データアナリストのメモ帳

データアナリストのメモ帳

IT企業で働くデータアナリストのブログ

Cloud Functionsを定期実行してBigQueryにデータを流す【GCP/Python】

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109161507.jpg

GCPのCloud Schedulerを用いて、Cloud Functionsを定期実行してみます。
また、Cloud FunctionsではデータをBigQueryにインサートする処理をPythonで書きます。

この一連の処理を応用すれば、「Pythonを用いてあるサイトからスクレイピングして得られたデータを、BigQuery内に作ったdatasetの中に格納する」という処理を毎日定期実行することもできます。便利ですよね!

1. BigQueryに空のtableを作る

まずは、BigQueryに空のテーブルを作ります。
プロジェクトの中にdatasetを作り、そこに空のテーブルを作りましょう。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109144346.png

ここでは、test_datasetの中にtest_tableというテーブルを作ります。
スキーマはdatetimeとvalueを、どちらもFLOAT型で指定しておきます。
「テーブルを作成」ボタンをクリックすれば、空のテーブルが出来上がります。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109144733.png

上の画像の様に、空のテーブルができていればOKです。

2. Cloud FunctionsでBigQueryにデータを送る処理をPythonで書く

次はCloud Functionsで定期実行したい処理を作ります。

2.1. 構成の設定

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109145111.png https://console.cloud.google.com/functions/

「関数を作成」をクリックします。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109145713.png

上の画像のようなページに遷移します。
関数名をtest-functionとし、トリガーのタイプとしてCloud Pub/Subを選択してください。
すると、「Cloud Pub/Subトピックを選択してください」と表示されるので、「トピックを作成する」ボタンをクリックして、トピックを作成してください。
ここでは、トピック名をtest-function-shceduler-topicとしました。

2.2. Pythonコードを書く

次に、Pythonコードを書きます。
ランタイムでPython 3.7を選択し、main.pyとrequirements.txtを書き換えます。

デフォルトでは以下のようになっていると思います。 https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109150223.png

main.pyを書き換えましょう。
ここでは、insert_rows_jsonを使って、json形式でBigQueryにデータをインサートします。
インサートするデータは、現在の時刻とランダムな値です。

value = np.random.random()
dt = datetime.datetime.now().timestamp()

json形式でインサートするため、.timestamp()を用いてUNIX時間に変換します。
書き換えるコードは以下の通りです。GCPのプロジェクトネーム(your-project-name)をご自身のものに変更してください。

import base64
from google.cloud import bigquery
import datetime
import numpy as np

def hello_pubsub(event, context):

    value = np.random.random()
    dt = datetime.datetime.now().timestamp()

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of table to append to.
    # table_id = "your-project.your_dataset.your_table"

    rows_to_insert = [
        {u"datetime": dt, u"value": value},
    ]

    errors = client.insert_rows_json('your-project-name.test_dataset.test_table', rows_to_insert)  # Make an API request.
    if errors == []:
      print("New rows have been added.")
    else:
      print("Encountered errors while inserting rows: {}".format(errors))

次に、requirements.txtを書き換えます。
main.pyで使ったパッケージのバージョンを記載しておきます。

# Function dependencies, for example:
# package>=version
numpy==1.19.4
bigquery==0.0.8

「デプロイ」ボタンをクリックして、デプロイします。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109151857.png

エラーなくデプロイできていれば、緑色のチェックマークが付くはずです。
エラーが出ている場合は右にある「操作」→「ログを表示」でエラー文言を確認してみましょう。

これで、Cloud Functionsの設定は完了です!

3. Cloud Schedulerの設定をする

Cloud Schedulerの設定をしましょう。これで上で作った関数を定期実行できるようになります。

https://console.cloud.google.com/cloudscheduler

ジョブの作成ページで、
・名前(ここではtest-function-schedulerとしました)
・頻度(ここでは毎分にしています)
タイムゾーン
・トピック(2で作成したものを選択)
ペイロード(testと書いておく)
以上5つを入力します。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109152746.png

これで「作成」ボタンを押せばOK!

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109152753.png

上の画像のようになっていればOKです。
「今すぐ実行」ボタンをクリックして、エラーが出ないか確認しましょう。エラーが出る場合はログを表示してみましょう。

4. BigQueryでインサートされたデータを確認

最後に、BigQueryにデータがインサートされているか確認してみましょう。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109153536.png

上の画像のように、指定した通りdatetimeにUNIX時間のtimestampが、valueにランダムな数値が入っていればOKです。

以上、Cloud FunctionsをCloud Schedulerで定期実行してBigQueryにデータを流す方法でした!

参考記事: Cloud Functionsを定期実行させてみる | DevelopersIO