Contents
Cloud Composerとは?
GCPの数あるソリューションの中にあるCloud Composer
Cloud Composerは、GCP内のさまざまなデータを変換して別にエクスポートするGCPのETLツール的な役割を持っています。
ETLツールとは
- Extract(抽出)
- Transform(変換)
- Load(出力)
を一括で行うツールのことです。
まさにComposerでは、さまざまなデータを抽出し、Composerで変換処理をかけて、別にエクスポートするという処理を行います。
テーブルに新規テーブルを作成する場合、
簡単に実験として既存テーブルのコピーをしてみる。
出力先テーブルがすでにある場合、エラーになってしまう。
ただ料金に関しては、処理を行う要素が多すぎて複雑です。
ここでは処理の金額についてまでは説明はしません。(いつか余裕ができたらしようと思います。)
※ Cloud Composerは作成するとずっと起動状態になります。裏で最低3つのCompute Engine(GCE)が動くのでかなりお金がかかります。なのでテスト的に作成して放置するのはやめた方がいいです。
※ Composerバージョン1では最低GCE3つですが、Composerバージョン2では最低はなく、オートスケーリングで作成されます。
start_date + scheduler_intervalが最初のDAGの実行時間になってしまうらしい。なので注意が必要。つまりstart_dateはおそらく1日前に設定しないといけないっぽい。
管理しやすいように、yamlファイルを作成してそこにsqlを記載して、import yamlで読み込むというようなことをして管理しやすくするのもいい。
test.yamlにsql: 100と記載して保存
f = open("test.yaml", "r+")
a = yaml.dump(f)
これでaにyamlファイルが辞書式(json式)で保存される
Cloud Composerのschedulerについて
start_date + scheduler_intervalが最初のDAGの実行時間になってしまうらしい。なので注意が必要。つまりstart_dateはおそらく1日前に設定しないといけないっぽい。
以下は例。
models.DAG(
schedule_interval=timedelta(minutes=10),
start_date=datetime.datetime(2020年, 11月, 10日, 14時, 20分, 0秒, tzinfo=pendulum.timezone('Asia/Tokyo')),...
)
これでデプロイして最初に実行されるのは、2020/11/10/14/30/0である。そこから10分ごとに実行される。
json型だとpythonでもjsのobjなので変数.キーでアクセスできるが、辞書式ではpythonなので、連想配列のようにアクセスする。
phpは連想配列、pythonは辞書式、javascriptはjsonがキーバリューの関係。
Cloud Composer のパイプラインは、Python を使用して有向非巡回グラフ(DAG)として構成されており、あらゆるユーザーが簡単に使用できます。
下のグラフで言えばここでタスク1を行い、ここでタスク2を行うというイメージ。それぞれのタスクでpythonだったりbashコマンドを実行したりとできる。そして有向グラフによって、1つ前の処理が完了したら、処理を実行するというように順番を決めて容易に実行させることが可能となる。
コード
#!/usr/bin/env python # -*- coding: UTF-8 -*- """Unnest Google Analytics realtime data.""" from airflow import AirflowException from airflow.contrib.operators.bigquery_operator import BigQueryOperator from airflow import models from airflow.operators.python_operator import PythonOperator from airflow.operators.bash_operator import BashOperator import datetime from google.cloud import storage from datetime import timedelta import pendulum # 昨日の日付を取得 target_date = datetime.datetime.combine( datetime.datetime.today() - datetime.timedelta(days=1) + datetime.timedelta(hours=9), datetime.datetime.min.time(), ).strftime('%Y%m%d') # Composerの管理画面で設定した環境変数の取得 #project_id = models.Variable.get('project_id') #bq_dataset_id = models.Variable.get('bq_dataset_id_ga_rt') #bq_destination_table = models.Variable.get('bq_destination_table_ga_rt')+'_'+target_date #gcs_config_bucket_name = models.Variable.get('gcs_config_bucket_name') #config_file_name = models.Variable.get('config_file_name_ga_rt') # set default DAG ARGs default_dag_args = { 'start_date': datetime.datetime(1970, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, #'retry_delay': datetime.timedelta(minutes=5), 'project_id': "project_id", } sql = """ #legacySQL select * from [project_id.ga_realtime_sessions_view_"""+target_date+"""] """ # ==== DAGの作成 ==== with models.DAG( dag_id='ga_realtime_sessions_unnest_yesterday', schedule_interval=timedelta(days=1), start_date=datetime.datetime( 2020, 11, 10, 5, 30, 0, tzinfo=pendulum.timezone('Asia/Tokyo')), catchup=False, default_args=default_dag_args, ) as dag: task1 = BigQueryOperator( task_id = "task1", bql=sql, use_legacy_sql=True, allow_large_results=True, flatten_results=False, destination_dataset_table="project_name:realtime_session_view_copy.ga_raw_copy_"+str(target_date) ) task1
composerでのパラメータ
パラメータ | 説明 |
---|---|
BashOperator | bashつまりshellscriptのコマンドで処理を行うことができる |
BigQueryOperator | BigQueryのテーブルに対してなんらかの処理を行う。 |
DAGによるタスクの整理
色々
GCSのDAGフォルダに入れると、それをすぐairflowは検知して実行してくれる。
shecule_intervalsに設定した内容ではなく、初回は一回アップすると実行してくれる。
そのあとはオンの状態にしているとそのままcronの時間で実行してくれる。
そしてオフにしたら、cron実行はなしになり、
再度オンにしたらcronに乗っ取り実行される
あくまで初回だけアップしたら実行する
手動実行は以下の赤枠
そして、
one_failed
エラーのトリガー
DAGの処理実行方法
DAGでは好きな時に実行をする「手動実行」と、決まった時間に一定期間などで自動的に実行する「自動実行」の2つができます。
1. 自動実行
自動実行をするには、DAGを構成するロジック内で指定する、schedule_intervalというパラメータにcronと同じような指定方法で指定することができます。
2. 手動実行
手動実行は、AirflowのDAG一覧画面にて、右側にある下の画像の赤枠をクリックする。
そして、トリガーの設定ができ、GCSにデータがアップされたらトリガーで実行などの設定もできるが、手動実行の場合は、そのまま赤枠のトリガーをクリック
これでDAGの再実行を行うことができます。
さまざまなAirflowのOperator
Operator | 説明 |
---|---|
BigQueryOperator | BigQueryを操作するタスク |