Athenaのクエリーを自動で実行する必要があったため、Lambda関数を作成しました。
## Lambda関数
import os
import time
import boto3
# Athena / S3 settings -- replace the placeholder names with real ones.

# Bucket that receives the Athena result CSV; lambda_handler also writes the
# renamed copy (out.csv) into this same bucket.
S3_BUCKET = 'バケット名'
# Athena's OutputLocation must be an s3:// URI. It is derived from S3_BUCKET so
# the two settings cannot drift apart (the handler expects results in S3_BUCKET).
S3_OUTPUT = 's3://' + S3_BUCKET
# Athena database and table that lambda_handler queries.
DATABASE = 'DB名'
TABLE = 'テーブル名'
# Maximum number of status polls before the running query is stopped.
RETRY_COUNT = 300
# Upper bound (seconds) on a single polling sleep, so the linear back-off does
# not grow to minutes between status checks.
_MAX_POLL_INTERVAL_SEC = 10
# Seconds kept in reserve before the Lambda deadline so there is still time to
# call stop_query_execution and raise before the function is killed.
_TIMEOUT_MARGIN_SEC = 10


def _remaining_seconds(context):
    """Return seconds left before the Lambda deadline, or infinity if unknown.

    When ``context`` is None (e.g. a local run) or has no
    ``get_remaining_time_in_millis`` method, no deadline is enforced.
    """
    get_remaining = getattr(context, 'get_remaining_time_in_millis', None)
    if get_remaining is None:
        return float('inf')
    return get_remaining() / 1000.0


def _wait_for_query(client, query_execution_id, context):
    """Poll Athena until the query identified by ``query_execution_id`` ends.

    Returns normally when the state becomes SUCCEEDED.

    Raises:
        Exception: "STATUS:FAILED" / "STATUS:CANCELLED" when the query ends
            without succeeding; 'TIME OVER' (after stopping the query) when the
            retry budget or the Lambda deadline is exhausted first.
    """
    for attempt in range(1, 1 + RETRY_COUNT):
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = query_status['QueryExecution']['Status']
        state = status['State']
        print("STATUS:" + state)
        if state == 'SUCCEEDED':
            return
        # CANCELLED is terminal too; polling it further could never succeed.
        if state in ('FAILED', 'CANCELLED'):
            reason = status.get('StateChangeReason')
            if reason:
                print(reason)
            raise Exception("STATUS:" + state)
        # Same linear back-off (1s, 2s, 3s, ...) as before, but capped:
        # uncapped, 300 retries would sleep 45150 s in total.
        delay = min(attempt, _MAX_POLL_INTERVAL_SEC)
        # Bail out before Lambda kills the function mid-sleep; otherwise the
        # cleanup below never runs and the query is left running.
        if _remaining_seconds(context) < delay + _TIMEOUT_MARGIN_SEC:
            break
        time.sleep(delay)
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise Exception('TIME OVER')


# main
def lambda_handler(event, context):
    """Run the Athena query, wait for it, then copy its CSV result to out.csv.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object; its remaining execution time bounds
            how long the query is waited for.

    Raises:
        Exception: "STATUS:FAILED" / "STATUS:CANCELLED" if the query does not
            succeed, or 'TIME OVER' if it is still running when time runs out.
    """
    # DATABASE and TABLE are module constants, not external input, so plain
    # string formatting is acceptable here.
    query = "SELECT * FROM %s.%s where a=b;" % (DATABASE, TABLE)
    print(query)
    client = boto3.client('athena')
    # start_query_execution only submits the query and returns an id to poll.
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': DATABASE},
        ResultConfiguration={'OutputLocation': S3_OUTPUT},
    )
    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)

    _wait_for_query(client, query_execution_id, context)

    # The result file is "<QueryExecutionId>.csv" under OutputLocation.
    # NOTE(review): this key assumes S3_OUTPUT is the root of S3_BUCKET;
    # adjust query_file if a prefix is ever added to S3_OUTPUT.
    query_file = query_execution_id + ".csv"
    out_file = "out.csv"
    # copy_object duplicates the object; the original <id>.csv is left in place.
    s3 = boto3.client('s3')
    s3.copy_object(
        Bucket=S3_BUCKET,
        Key=out_file,
        CopySource={'Bucket': S3_BUCKET, 'Key': query_file},
    )
## Athenaクエリー実行部分
start_query_execution はクエリーを開始して QueryExecutionId を返すだけで、完了までは待ちません。そのため get_query_execution で状態を確認しながら、クエリーが終了するまでループしています。
# Excerpt of lambda_handler: submit the query, then poll until it finishes.
# DATABASE / S3_OUTPUT are the module constants (not the quoted strings
# 'DATABASE' / 'S3_BUCKET', which would name a database literally "DATABASE"
# and pass an invalid, non-s3:// output location).
response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': DATABASE
    },
    ResultConfiguration={
        'OutputLocation': S3_OUTPUT,
    }
)
query_execution_id = response['QueryExecutionId']
print(query_execution_id)
# NOTE: with RETRY_COUNT = 300 these sleeps add up to 1 + 2 + ... + 300 = 45150 s,
# far beyond Lambda's 900 s maximum timeout, so the function is killed long
# before the TIME OVER branch can stop the query. Cap the sleep and/or check
# context.get_remaining_time_in_millis() in production code.
for i in range(1, 1 + RETRY_COUNT):
    # Fetch the current state of the query.
    query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
    query_execution_status = query_status['QueryExecution']['Status']['State']
    if query_execution_status == 'SUCCEEDED':
        print("STATUS:" + query_execution_status)
        break
    # CANCELLED is terminal as well; without it the loop would keep polling a
    # query that can never succeed.
    if query_execution_status in ('FAILED', 'CANCELLED'):
        raise Exception("STATUS:" + query_execution_status)
    print("STATUS:" + query_execution_status)
    time.sleep(i)
else:
    # Retries exhausted: cancel the still-running query before failing.
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise Exception('TIME OVER')
