Google Cloud Platform IT

【GCP】Cloud ComposerでETLを実現する

2020年11月10日

Cloud Composerとは?

GCPの数あるソリューションの中にあるCloud Composer
Cloud Composerは、GCP内のさまざまなデータを変換して別にエクスポートするGCPのETLツール的な役割を持っています。

ETLツールとは

  • Extract(抽出)
  • Transform(変換)
  • Load(出力)

を一括で行うツールのことです。

まさにComposerでは、さまざまなデータを抽出し、Composerで変換処理をかけて、別にエクスポートするという処理を行います。

テーブルに新規テーブルを作成する場合、
簡単に実験として既存テーブルのコピーをしてみる。
出力先テーブルがすでにある場合、エラーになってしまう。

ただ料金に関しては、処理を行う要素が多すぎて複雑です。
ここでは処理の金額についてまでは説明はしません。(いつか余裕ができたらしようと思います。)
※ Cloud Composerは作成するとずっと起動状態になります。裏で最低3つのCompute Engine(GCE)が動くのでかなりお金がかかります。なのでテスト的に作成して放置するのはやめた方がいいです。
※ Composerバージョン1では最低GCE3つですが、Composerバージョン2では最低はなく、オートスケーリングで作成されます。

start_date + scheduler_intervalが最初のDAGの実行時間になってしまうらしい。なので注意が必要。つまりstart_dateはおそらく1日前に設定しないといけないっぽい。
管理しやすいように、yamlファイルを作成してそこにsqlを記載して、import yamlで読み込むというようなことをして管理しやすくするのもいい。

test.yamlにsql: 100と記載して保存
f = open("test.yaml", "r+")
a = yaml.dump(f)

これでaにyamlファイルが辞書式(json式)で保存される

Cloud Composerのschedulerについて

start_date + scheduler_intervalが最初のDAGの実行時間になってしまうらしい。なので注意が必要。つまりstart_dateはおそらく1日前に設定しないといけないっぽい。
以下は例。
models.DAG(
schedule_interval=timedelta(minutes=10),
start_date=datetime.datetime(2020年, 11月, 10日, 14時, 20分, 0秒, tzinfo=pendulum.timezone('Asia/Tokyo')),...
)
これでデプロイして最初に実行されるのは、2020/11/10/14/30/0である。そこから10分ごとに実行される。

json型だとpythonでもjsのobjなので変数.キーでアクセスできるが、辞書式ではpythonなので、連想配列のようにアクセスする。
phpは連想配列、pythonは辞書式、javascriptはjsonがキーバリューの関係。
Cloud Composer のパイプラインは、Python を使用して有向非巡回グラフ(DAG)として構成されており、あらゆるユーザーが簡単に使用できます。

下のグラフで言えばここでタスク1を行い、ここでタスク2を行うというイメージ。それぞれのタスクでpythonだったりbashコマンドを実行したりとできる。そして有向グラフによって、1つ前の処理が完了したら、処理を実行するというように順番を決めて容易に実行させることが可能となる。

コード

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""Unnest Google Analytics realtime data."""
from airflow import AirflowException
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
import datetime
from google.cloud import storage
from datetime import timedelta
import pendulum

# 昨日の日付を取得
target_date = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(days=1) + datetime.timedelta(hours=9),
    datetime.datetime.min.time(),
).strftime('%Y%m%d')

# Composerの管理画面で設定した環境変数の取得
#project_id = models.Variable.get('project_id')
#bq_dataset_id = models.Variable.get('bq_dataset_id_ga_rt')
#bq_destination_table = models.Variable.get('bq_destination_table_ga_rt')+'_'+target_date
#gcs_config_bucket_name = models.Variable.get('gcs_config_bucket_name')
#config_file_name = models.Variable.get('config_file_name_ga_rt')

# set default DAG ARGs
default_dag_args = {
    'start_date': datetime.datetime(1970, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    #'retry_delay': datetime.timedelta(minutes=5),
    'project_id': "project_id",
}

sql = """
    #legacySQL
    select * from [project_id.ga_realtime_sessions_view_"""+target_date+"""]
    """

# ==== DAGの作成 ====
with models.DAG(
    dag_id='ga_realtime_sessions_unnest_yesterday',
    schedule_interval=timedelta(days=1),
    start_date=datetime.datetime(
        2020, 11, 10, 5, 30, 0, tzinfo=pendulum.timezone('Asia/Tokyo')),
    catchup=False,
    default_args=default_dag_args,

) as dag:
    task1 = BigQueryOperator(
        task_id = "task1",
        bql=sql,
        use_legacy_sql=True,
        allow_large_results=True,
        flatten_results=False,
        destination_dataset_table="project_name:realtime_session_view_copy.ga_raw_copy_"+str(target_date)
    )

    task1

composerでのパラメータ

パラメータ 説明
BashOperator bashつまりshellscriptのコマンドで処理を行うことができる
BigQueryOperator BigQueryのテーブルに対してなんらかの処理を行う。

DAGによるタスクの整理

色々

GCSのDAGフォルダに入れると、それをすぐairflowは検知して実行してくれる。
shecule_intervalsに設定した内容ではなく、初回は一回アップすると実行してくれる。
そのあとはオンの状態にしているとそのままcronの時間で実行してくれる。
そしてオフにしたら、cron実行はなしになり、
再度オンにしたらcronに乗っ取り実行される
あくまで初回だけアップしたら実行する

手動実行は以下の赤枠

そして、

one_failed

エラーのトリガー

DAGの処理実行方法

DAGでは好きな時に実行をする「手動実行」と、決まった時間に一定期間などで自動的に実行する「自動実行」の2つができます。

1. 自動実行
自動実行をするには、DAGを構成するロジック内で指定する、schedule_intervalというパラメータにcronと同じような指定方法で指定することができます。
2. 手動実行
手動実行は、AirflowのDAG一覧画面にて、右側にある下の画像の赤枠をクリックする。

そして、トリガーの設定ができ、GCSにデータがアップされたらトリガーで実行などの設定もできるが、手動実行の場合は、そのまま赤枠のトリガーをクリック

これでDAGの再実行を行うことができます。

さまざまなAirflowのOperator

Operator 説明
BigQueryOperator BigQueryを操作するタスク

-Google Cloud Platform, IT
-,

© 2025 Yosshi Labo. Powered by AFFINGER5