データアナリストのメモ帳

データアナリストのメモ帳

ITベンチャーで働くデータアナリストのブログ

【GCP】Cloud Functionsを定期実行してBigQueryにデータを流す

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109161507.jpg

GCPのCloud Schedulerを用いて、Cloud Functionsを定期実行してみます。
また、Cloud FunctionsではデータをBigQueryにインサートする処理をPythonで書きます。

この一連の処理を応用すれば、「Pythonを用いてあるサイトからスクレイピングして得られたデータを、BigQuery内に作ったdatasetの中に格納する」という処理を毎日定期実行することもできます。便利ですよね!

1. BigQueryに空のtableを作る

まずは、BigQueryに空のテーブルを作ります。
プロジェクトの中にdatasetを作り、そこに空のテーブルを作りましょう。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109144346.png

ここでは、test_datasetの中にtest_tableというテーブルを作ります。
スキーマはdatetimeとvalueを、どちらもFLOAT型で指定しておきます。
「テーブルを作成」ボタンをクリックすれば、空のテーブルが出来上がります。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109144733.png

上の画像の様に、空のテーブルができていればOKです。

2. Cloud FunctionsでBigQueryにデータを送る処理をPythonで書く

次はCloud Functionsで定期実行したい処理を作ります。

2.1. 構成の設定

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109145111.png https://console.cloud.google.com/functions/

「関数を作成」をクリックします。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109145713.png

上の画像のようなページに遷移します。
関数名をtest-functionとし、トリガーのタイプとしてCloud Pub/Subを選択してください。
すると、「Cloud Pub/Subトピックを選択してください」と表示されるので、「トピックを作成する」ボタンをクリックして、トピックを作成してください。
ここでは、トピック名をtest-function-shceduler-topicとしました。

2.2. Pythonコードを書く

次に、Pythonコードを書きます。
ランタイムでPython 3.7を選択し、main.pyとrequirements.txtを書き換えます。

デフォルトでは以下のようになっていると思います。 https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109150223.png

main.pyを書き換えましょう。
ここでは、insert_rows_jsonを使って、json形式でBigQueryにデータをインサートします。
インサートするデータは、現在の時刻とランダムな値です。

value = np.random.random()
dt = datetime.datetime.now().timestamp()

json形式でインサートするため、.timestamp()を用いてUNIX時間に変換します。
書き換えるコードは以下の通りです。GCPのプロジェクトネーム(your-project-name)をご自身のものに変更してください。

import base64
from google.cloud import bigquery
import datetime
import numpy as np

def hello_pubsub(event, context):

    value = np.random.random()
    dt = datetime.datetime.now().timestamp()

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of table to append to.
    # table_id = "your-project.your_dataset.your_table"

    rows_to_insert = [
        {u"datetime": dt, u"value": value},
    ]

    errors = client.insert_rows_json('your-project-name.test_dataset.test_table', rows_to_insert)  # Make an API request.
    if errors == []:
      print("New rows have been added.")
    else:
      print("Encountered errors while inserting rows: {}".format(errors))

次に、requirements.txtを書き換えます。
main.pyで使ったパッケージのバージョンを記載しておきます。

# Function dependencies, for example:
# package>=version
numpy==1.19.4
bigquery==0.0.8

「デプロイ」ボタンをクリックして、デプロイします。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109151857.png

エラーなくデプロイできていれば、緑色のチェックマークが付くはずです。
エラーが出ている場合は右にある「操作」→「ログを表示」でエラー文言を確認してみましょう。

これで、Cloud Functionsの設定は完了です!

3. Cloud Schedulerの設定をする

Cloud Schedulerの設定をしましょう。これで上で作った関数を定期実行できるようになります。

https://console.cloud.google.com/cloudscheduler

ジョブの作成ページで、
・名前(ここではtest-function-schedulerとしました)
・頻度(ここでは毎分にしています)
タイムゾーン
・トピック(2で作成したものを選択)
ペイロード(testと書いておく)
以上5つを入力します。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109152746.png

これで「作成」ボタンを押せばOK!

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109152753.png

上の画像のようになっていればOKです。
「今すぐ実行」ボタンをクリックして、エラーが出ないか確認しましょう。エラーが出る場合はログを表示してみましょう。

4. BigQueryでインサートされたデータを確認

最後に、BigQueryにデータがインサートされているか確認してみましょう。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210109/20210109153536.png

上の画像のように、指定した通りdatetimeにUNIX時間のtimestampが、valueにランダムな数値が入っていればOKです。

以上、Cloud FunctionsをCloud Schedulerで定期実行してBigQueryにデータを流す方法でした!

参考記事: Cloud Functionsを定期実行させてみる | Developers.IO

BigQueryでローカルのCSVファイルをアップロードする方法

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210106/20210106000228.jpg BigQueryにローカルのCSVファイルをアップロードして、色々分析したいって時におすすめです。

1. 前提

  1. Google Cloud プロジェクトが選択されていること
  2. BigQuery API が有効になっていること

2. BigQueryのページでデータセットを作成する

まず、BigQueryのページから「データセットを作成」ボタンをクリックします。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210105/20210105224355.png

以下のようにデータセット IDとロケーションを入力して、下にある「データセットを作成」ボタンをクリック。 https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210105/20210105225507.png

これでデータセットの作成が完了です。

3. CSVファイルをアップロードする

作成したデータセットを選択し、テーブルを作成ボタンをクリックします。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210105/20210105230057.png

以下のように、入力します。
※今回は 以下のデータをローカルにダウンロードして使っています。 http://www.ssa.gov/OACT/babynames/names.zip

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210105/20210105230322.png

テーブルの作成元:アップロード
ファイルを選択:アップロードしたいファイルを選ぶ
ファイル形式:CSV
テーブル名:好きな名前を付ける

スキーマは以下のようにテキストで編集できますが、

name:string,gender:string,count:integer

一つずつ指定していくことも可能です。

これでテーブルを作成ボタンを押せばOK!

test_datasetの中にtest_tableがあることが確認できると思います。
プレビューするとこんな感じ。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/stinkydofu/20210105/20210105230955.png

以上、BigQueryでCSVファイルを扱う方法でした!

参照元記事: Cloud Console を使用したクイックスタート

【Python】現在のディレクトリを変更する方法

osモジュールを使って、現在のディレクトリを変更する方法です。

import os

# ディレクトリ変更
os.chdir('/Users/username/downloads/dirname')

また、現在のディレクトリを確認する方法は以下です。

os.getcwd()

【Python/glob】フォルダ内の複数のCSVファイルを一括で読み込んで縦に連結させる方法

globモジュールを使って、ディレクトリ内にある複数のCSVファイルを読み込み、さらにそれらを縦に連結させる方法です。

例えば、ある特定のディレクトリの中に、何かのログがCSVとして毎日吐き出されるような場合に使えます。
CSVファイルを一括で読み込んで一つのファイルに集約できると、何かと便利ですよね。CSVExcelで開いて一つずつ繋げていく手間が省けます。

まずは、モジュールをimportします。

import pandas as pd
import os
import glob

osを使ってディレクトリを変更しましょう。
読み込みたいCSVファイルが置いてあるディレクトリを指定します。

また、読み込んだCSVファイルを連結させる下準備として、空のDataFrameを用意しておきます。

# ディレクトリ変更
os.chdir('/Users/username/downloads/dirname')

df = pd.DataFrame(columns = [])

globモジュールを使います。
引数に指定されたパターン(正規表現)にマッチするファイルパス名を取得することが出来ます。
ここでは,log_nameという文字列を含むファイルパスを取得します。

log_name_20200101.csv
log_name_20200102.csv
log_name_20200103.csv

このような形で、ディレクトリ内にCSVファイルが置かれていることを想定しています。

for i in glob.glob("*log_name*"):
    tmp = pd.read_csv(i)
    df = pd.concat([df, tmp])

for文でglobで取得したファイル名を一つずつ処理していきます。
pd.read_csvで読み込み、pd.concatで縦方向に連結していきます。
空のDataFrameの中に読み込んだCSVファイルを足していくイメージですね。

以下はまとめです。

import pandas as pd
import os
import glob

# ディレクトリ変更
os.chdir('/Users/username/downloads/dirname')

df = pd.DataFrame(columns = [])

for i in glob.glob("*log_name*"):
    tmp = pd.read_csv(i)
    df = pd.concat([df, tmp])

以上、フォルダ内の複数のCSVファイルを一括で読み込んで縦に連結させる方法でした!