Cloud Functionsを定期実行してBigQueryにデータを流す【GCP/Python】
GCPのCloud Schedulerを用いて、Cloud Functionsを定期実行してみます。
また、Cloud FunctionsではデータをBigQueryにインサートする処理をPythonで書きます。
この一連の処理を応用すれば、「Pythonを用いてあるサイトからスクレイピングして得られたデータを、BigQuery内に作ったdatasetの中に格納する」という処理を毎日定期実行することもできます。便利ですよね!
- 1. BigQueryに空のtableを作る
- 2. Cloud FunctionsでBigQueryにデータを送る処理をPythonで書く
- 3. Cloud Schedulerの設定をする
- 4. BigQueryでインサートされたデータを確認
1. BigQueryに空のtableを作る
まずは、BigQueryに空のテーブルを作ります。
プロジェクトの中にdatasetを作り、そこに空のテーブルを作りましょう。
ここでは、test_datasetの中にtest_tableというテーブルを作ります。
スキーマはdatetimeとvalueを、どちらもFLOAT型で指定しておきます。
「テーブルを作成」ボタンをクリックすれば、空のテーブルが出来上がります。
上の画像の様に、空のテーブルができていればOKです。
2. Cloud FunctionsでBigQueryにデータを送る処理をPythonで書く
次はCloud Functionsで定期実行したい処理を作ります。
2.1. 構成の設定
https://console.cloud.google.com/functions/
「関数を作成」をクリックします。
上の画像のようなページに遷移します。
関数名をtest-functionとし、トリガーのタイプとしてCloud Pub/Subを選択してください。
すると、「Cloud Pub/Subトピックを選択してください」と表示されるので、「トピックを作成する」ボタンをクリックして、トピックを作成してください。
ここでは、トピック名をtest-function-shceduler-topicとしました。
2.2. Pythonコードを書く
次に、Pythonコードを書きます。
ランタイムでPython 3.7を選択し、main.pyとrequirements.txtを書き換えます。
main.pyを書き換えましょう。
ここでは、insert_rows_jsonを使って、json形式でBigQueryにデータをインサートします。
インサートするデータは、現在の時刻とランダムな値です。
value = np.random.random() dt = datetime.datetime.now().timestamp()
json形式でインサートするため、.timestamp()を用いてUNIX時間に変換します。
書き換えるコードは以下の通りです。GCPのプロジェクトネーム(your-project-name)をご自身のものに変更してください。
import base64 from google.cloud import bigquery import datetime import numpy as np def hello_pubsub(event, context): value = np.random.random() dt = datetime.datetime.now().timestamp() # Construct a BigQuery client object. client = bigquery.Client() # TODO(developer): Set table_id to the ID of table to append to. # table_id = "your-project.your_dataset.your_table" rows_to_insert = [ {u"datetime": dt, u"value": value}, ] errors = client.insert_rows_json('your-project-name.test_dataset.test_table', rows_to_insert) # Make an API request. if errors == []: print("New rows have been added.") else: print("Encountered errors while inserting rows: {}".format(errors))
次に、requirements.txtを書き換えます。
main.pyで使ったパッケージのバージョンを記載しておきます。
# Function dependencies, for example: # package>=version numpy==1.19.4 bigquery==0.0.8
「デプロイ」ボタンをクリックして、デプロイします。
エラーなくデプロイできていれば、緑色のチェックマークが付くはずです。
エラーが出ている場合は右にある「操作」→「ログを表示」でエラー文言を確認してみましょう。
これで、Cloud Functionsの設定は完了です!
3. Cloud Schedulerの設定をする
Cloud Schedulerの設定をしましょう。これで上で作った関数を定期実行できるようになります。
https://console.cloud.google.com/cloudscheduler
ジョブの作成ページで、
・名前(ここではtest-function-schedulerとしました)
・頻度(ここでは毎分にしています)
・タイムゾーン
・トピック(2で作成したものを選択)
・ペイロード(testと書いておく)
以上5つを入力します。
これで「作成」ボタンを押せばOK!
上の画像のようになっていればOKです。
「今すぐ実行」ボタンをクリックして、エラーが出ないか確認しましょう。エラーが出る場合はログを表示してみましょう。
4. BigQueryでインサートされたデータを確認
最後に、BigQueryにデータがインサートされているか確認してみましょう。
上の画像のように、指定した通りdatetimeにUNIX時間のtimestampが、valueにランダムな数値が入っていればOKです。
以上、Cloud FunctionsをCloud Schedulerで定期実行してBigQueryにデータを流す方法でした!