一般社団法人 全国個人事業主支援協会

COLUMN コラム

  • GCPのCloud Composerで構築するデータパイプライン管理

Cloud Composerとは何か

GCPのCloud Composerは、Apache Airflowをフルマネージドで提供するサービスです。データパイプラインのオーケストレーションにおいて、スケジューリング、依存関係管理、モニタリングを一元化できるため、多くの企業がETL処理やデータ分析基盤の中核として採用しています。

筆者自身、オンプレミスのAirflow環境からCloud Composerへ移行したプロジェクトを複数経験してきましたが、インフラ管理の負荷が大幅に軽減される点は非常に大きなメリットです。一方で、マネージドサービスならではの制約やコスト面での注意点もあります。本記事では、実践的な構築手順とともに、現場で役立つ知見を共有します。

Cloud Composer環境の構築

環境の作成とバージョン選定

Cloud Composerには現在、Composer 1とComposer 2が存在します。新規プロジェクトであれば、パフォーマンスとコスト効率に優れたComposer 2を選択すべきです。Composer 2はオートスケーリングに対応しており、ワーカーノードの数がワークロードに応じて自動調整されます。

GCPコンソールからの作成も可能ですが、再現性を考慮するとTerraformやgcloudコマンドでの構築を推奨します。

gcloud composer environments create my-data-pipeline \
--location=asia-northeast1 \
--image-version=composer-2.6.0-airflow-2.7.3 \
--environment-size=small \
--scheduler-cpu=1 \
--scheduler-memory=2 \
--web-server-cpu=0.5 \
--web-server-memory=2

ポイントとして、environment-sizeはsmallから始めて、負荷に応じてスケールアップするのが現実的です。初期段階から大きなサイズを選ぶとコストが嵩みます。

ネットワーク構成の考慮

本番環境ではVPCネイティブのプライベートIP構成を採用するケースが多いでしょう。Cloud SQLやGCSへのアクセスはプライベート接続で行い、外部APIとの通信にはCloud NATを利用します。この設計により、セキュリティ要件を満たしつつ柔軟なデータパイプラインを構築できます。

DAGの設計と実装

基本的なDAG構造

Cloud ComposerのDAG(Directed Acyclic Graph)は、タスク間の依存関係を定義するワークフローの単位です。以下はBigQueryからデータを抽出し、加工してGCSに格納する典型的なパイプラインの例です。

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': True,
'email': ['data-alert@example.com'],
'retries': 2,
'retry_delay': timedelta(minutes=5),
}

with DAG(
dag_id='daily_sales_pipeline',
default_args=default_args,
schedule_interval='0 3 * * *',
start_date=datetime(2025, 1, 1),
catchup=False,
tags=['sales', 'daily'],
) as dag:

extract_task = BigQueryInsertJobOperator(
task_id='extract_raw_sales',
configuration={
'query': {
'query': 'SELECT * FROM raw.sales WHERE date = "{{ ds }}"',
'destinationTable': {
'projectId': 'my-project',
'datasetId': 'staging',
'tableId': 'sales_{{ ds_nodash }}',
},
'writeDisposition': 'WRITE_TRUNCATE',
'useLegacySql': False,
}
},
)

export_task = BigQueryToGCSOperator(
task_id='export_to_gcs',
source_project_dataset_table='my-project.staging.sales_{{ ds_nodash }}',
destination_cloud_storage_uris=['gs://my-bucket/sales/{{ ds }}/data_*.csv'],
export_format='CSV',
)

extract_task >> export_task

タスク分割の粒度

DAG設計で最も悩むのがタスクの粒度です。経験上、以下の方針が実用的です。

  • リトライ単位で分割する:失敗時に再実行したい最小単位をタスクにする
  • データソースごとに分割する:異なるデータソースからの取得は並列実行可能にする
  • 過度な分割は避ける:タスク数が多すぎるとスケジューラの負荷が上がり、全体のスループットが低下する

運用で押さえるべきポイント

エラーハンドリングとアラート

本番運用では、障害発生時の迅速な対応が求められます。Cloud Composerでは、Airflowのon_failure_callbackを活用してSlack通知やPagerDuty連携を実装するのが一般的です。

def slack_failure_alert(context):
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
message = f":red_circle: DAG失敗: {context['dag'].dag_id}\nタスク: {context['task'].task_id}\n実行日: {context['ds']}"
hook.send(text=message)

default_args = {
'on_failure_callback': slack_failure_alert,
}

コスト最適化

Cloud Composerのコストは、主にGKEクラスタ、Cloud SQL、GCSの使用量で構成されます。開発環境ではスケジューラやワーカーのリソースを最小限に抑え、本番環境でのみスケールアップする運用を徹底しましょう。また、不要になったComposer環境は速やかに削除することが重要です。停止中でもCloud SQLのコストは発生し続けます。

CI/CDパイプラインとの統合

DAGファイルのデプロイは、Cloud BuildやGitHub Actionsと連携させるのがベストプラクティスです。DAGファイルをGCSバケットに同期する仕組みを自動化することで、コードレビューからデプロイまでのフローを一元管理できます。テスト環境でのDAG検証も忘れずに組み込みましょう。

まとめ

Cloud Composerは、GCPエコシステムとの親和性が高く、BigQuery・GCS・Dataflowなどとシームレスに連携できる強力なデータパイプライン管理ツールです。マネージドサービスとしての利便性を享受しつつも、DAGの設計品質やコスト管理には継続的な注意が必要です。まずは小規模なパイプラインから始めて、チームの習熟度に合わせて段階的に拡張していくアプローチをお勧めします。

この記事をシェアする

  • Twitterでシェア
  • Facebookでシェア
  • LINEでシェア