Athenaクエリーを自動で実行する必要があったため、クエリーを実行するLambda関数を作成しました。
Lambda関数
import os
import time

import boto3

# S3 location handed to Athena as the query result OutputLocation.
S3_OUTPUT = 's3://バケット名'
# Bucket used for the copy step; the copy assumes the result file sits at the
# root of this bucket, i.e. that S3_OUTPUT points at the same bucket.
S3_BUCKET = 'バケット名'
DATABASE = 'DB名'
TABLE = 'テーブル名'

# Maximum number of status polls before the query is stopped.
RETRY_COUNT = 300


def lambda_handler(event, context):
    """Run a fixed Athena query, wait for it to finish and copy the result.

    The query is started with ``start_query_execution`` and its state is
    polled with ``get_query_execution`` up to ``RETRY_COUNT`` times.  On
    success the object ``<QueryExecutionId>.csv`` in ``S3_BUCKET`` is copied
    to the fixed key ``out.csv`` in the same bucket.

    Args:
        event: Lambda invocation event (not used).
        context: Lambda context object (not used).

    Raises:
        Exception: ``"STATUS:FAILED"`` / ``"STATUS:CANCELLED"`` when the query
            ends in that state, or ``"TIME OVER"`` when it is still not
            finished after ``RETRY_COUNT`` polls (the query is stopped first).
    """
    # create query
    query = "SELECT * FROM %s.%s where a=b;" % (DATABASE, TABLE)
    print(query)

    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': S3_OUTPUT,
        }
    )

    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)

    for i in range(1, 1 + RETRY_COUNT):
        # get query execution
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = query_status['QueryExecution']['Status']
        query_execution_status = status['State']

        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break

        # FAILED and CANCELLED are both terminal: polling further can never
        # succeed, so give up immediately instead of burning the retries.
        if query_execution_status in ('FAILED', 'CANCELLED'):
            reason = status.get('StateChangeReason')
            if reason:
                # Surface Athena's explanation in the logs for diagnosis.
                print("REASON:" + reason)
            raise Exception("STATUS:" + query_execution_status)

        print("STATUS:" + query_execution_status)
        # Linear back-off: wait i seconds after the i-th poll.
        # NOTE(review): worst case this adds up to 1 + 2 + ... + RETRY_COUNT
        # seconds; confirm the function timeout is compatible with that.
        time.sleep(i)
    else:
        # Retries exhausted without reaching a terminal state.
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')

    # query file name
    QUERY_FILE = query_execution_id + ".csv"
    QUERY1_FILE = "out.csv"

    # "rename" the csv file: copy the result to a fixed key (the original
    # object is left in place).
    s3 = boto3.client('s3')
    s3.copy_object(
        Bucket=S3_BUCKET,
        Key=QUERY1_FILE,
        CopySource={'Bucket': S3_BUCKET, 'Key': QUERY_FILE},
    )
Athenaクエリー実行部分
クエリーは非同期で実行されるため、実行状態を繰り返し取得し、完了するまでループで待機しています。
# Excerpt of the Athena part of the Lambda function: client, query, DATABASE,
# S3_OUTPUT and RETRY_COUNT are the names defined in the full listing.
response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': DATABASE
    },
    ResultConfiguration={
        'OutputLocation': S3_OUTPUT,
    }
)

query_execution_id = response['QueryExecutionId']
print(query_execution_id)

for i in range(1, 1 + RETRY_COUNT):
    # get query execution
    query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
    status = query_status['QueryExecution']['Status']
    query_execution_status = status['State']

    if query_execution_status == 'SUCCEEDED':
        print("STATUS:" + query_execution_status)
        break

    # FAILED and CANCELLED are both terminal, so stop polling right away.
    if query_execution_status in ('FAILED', 'CANCELLED'):
        reason = status.get('StateChangeReason')
        if reason:
            # Surface Athena's explanation in the logs for diagnosis.
            print("REASON:" + reason)
        raise Exception("STATUS:" + query_execution_status)

    print("STATUS:" + query_execution_status)
    # Linear back-off: wait i seconds after the i-th poll.
    time.sleep(i)
else:
    # Retries exhausted without reaching a terminal state: stop the query.
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise Exception('TIME OVER')