Datalabで簡易webクローリングし、BigQueryにアップロードする方法
Datalabで簡易webクローリングし、BigQueryにアップロードする方法:
GCP Cloud Datalab(Python3)で、requestsライブラリを利用してcsvを特定サイトから取得し、取得したデータをBigQueryに格納するコードを説明します。
Datalabで簡易webクローリングし、GoogleCloudStorage(GCS)にアップロードする方法、のちょっと修正版です。
こんなユーザが対象です。
まず、必要なライブラリ読み込みます。storage,bigquery。。
適宜、変数を定義します。
対象のURLからrequestsを利用して、ファイルを取得。response.contentにコンテンツが格納されます。typeはbytes。
GCSにアップロードする関数は以下です。返り値に、objectのURIを返します(gs://形式)。
BQにデータを格納する関数は以下です。スキーマを定義しておき、load関数でGCSにあるデータをBQにinsertします。
最後に、main関数は以下です。実行後、BigQueryに、新たなテーブルが作成されるとともに、データが格納されます。
*https://googledatalab.github.io/pydatalab/google.datalab.bigquery.html
Datalabを利用して、簡単に、WEBから取得したcsvファイルを、BQに格納する方法を説明しました。変数は自分の環境用に変更をして試してみてください。
はじめに
GCP Cloud Datalab(Python3)で、requestsライブラリを利用してcsvを特定サイトから取得し、取得したデータをBigQueryに格納するコードを説明します。Datalabで簡易webクローリングし、GoogleCloudStorage(GCS)にアップロードする方法、のちょっと修正版です。
こんなユーザが対象です。
- ローカルでJupyterNotebookを使っていたが、クラウドでいつでもどこからでも利用したい
- WEBのデータ(csv)を簡単に取得してBQへアップロードして、機械学習や可視化したい
- Python3を使いたい!
環境
- Cloud Datalab(Python3)
- 読み込むsample.csvは以下のようなものでテストします。
date | memo | number |
---|---|---|
2018-10-08 | test1 | 100 |
2018-10-09 | test2 | 200 |
2018-10-10 | test3 | 300 |
Datalab上でのPythonコード詳細
まず、必要なライブラリ読み込みます。storage,bigquery。。import requests import sys import google.datalab.storage as storage import google.datalab.bigquery as bq
## バケットの定義、適宜変更 ## バケットの定義 bucket_name = '<bucket name>' // バケット名 bucket_path = 'gs://' + bucket_name //バケットのPATH bucket_dir = '<bucket directory>/' //GCSの保存先フォルダ名 upload_filename = 'sample_upload.csv' //GCSに保存する名前 bucket = storage.Bucket(bucket_name) ## アクセスするURL access_url = 'https://<replace to site>/sample.csv' //ダウンロードしたいサイトのcsv ## BigQuery datasetname = '<bigquery datasets name>' // 保存先のBigQueryのデータセット名 table_id = '<bigquery table name' // BigQueryのテーブル名
## URLからダウンロードする def download(url): data = "" response = requests.get(url) #print(type(response.content)) ## <class 'bytes'> if response.status_code == 200: data = response.content return data else: print('download error') exit() return data
## GCSにアップロードする def upload_gcs(bucket, uploadObject, data): upload_object = bucket.object(uploadObject) upload_object.write_stream(data,content_type='text/csv') return upload_object.uri
## BigQueryに格納する def upload2bq(file_path,id): schema = [ {'name': 'date', 'type': 'DATE'}, {'name': 'memo', 'type': 'STRING'}, {'name': 'number', 'type': 'INT64'} ] ## csvオプション設定、最初の行をスキップして、urf-8形式とします。 csv_option = bq.CSVOptions(skip_leading_rows=1, encoding=u'utf-8') bq_schema = bq.Schema.from_data(schema) tablename = datasetname + '.' + id bq_table = bq.Table(tablename).create(schema = bq_schema, overwrite = True) ## GCSのpathを指定して、BQにデータをinsertします。 bq_table.load(file_path, mode='append', source_format = 'csv', csv_options=csv_option)
# main if __name__ == "__main__": ## アクセスする先のURL url = access_url ## バケットインスタンスの作成 bucket = storage.Bucket(bucket_name) ## URLにGETアクセスした結果のコンテンツを取得 data = download(url) if data != "": ## 正常に取得できたら、指定したGCSへアップロードし、返り値として、URIを取得する(filepath) filepath = upload_gcs(bucket, bucket_dir + upload_filename, data) table_id = table_id upload2bq(filepath,table_id) print('Success!') else: print('not Success!')
コメント
コメントを投稿