AWS Lambda（Python）でAthenaクエリーを実行し、結果をS3へ出力する関数

Athenaクエリーを自動で実行する必要があったため、作成しました。

Lambda関数

import os
import time
import boto3


# Name of the S3 bucket that holds the Athena query results (placeholder value).
S3_BUCKET = 'バケット名'
# Athena result location (OutputLocation). Derived from S3_BUCKET so the two
# settings cannot drift apart: the result file Athena writes here must land in
# S3_BUCKET for the copy step to find it.
S3_OUTPUT = 's3://' + S3_BUCKET
# Athena database and table queried by lambda_handler (placeholder values).
DATABASE = 'DB名'
TABLE = 'テーブル名'

# Maximum number of get_query_execution status polls before giving up.
RETRY_COUNT = 300

# main
def lambda_handler(event, context):
    """Run the Athena query, wait for it to finish, and copy the result CSV.

    Starts ``SELECT * FROM DATABASE.TABLE where a=b;`` with results written
    to ``S3_OUTPUT``, polls Athena until the query reaches a terminal state,
    then copies the result file to ``out.csv`` in ``S3_BUCKET``.

    Args:
        event: Lambda invocation event (not used).
        context: Lambda context object. When it provides
            ``get_remaining_time_in_millis``, polling stops (and the query is
            cancelled) before the invocation runs out of time.

    Raises:
        Exception: ``"STATUS:FAILED ..."`` / ``"STATUS:CANCELLED ..."`` when
            the query ends unsuccessfully, or ``"TIME OVER"`` when it does
            not finish within the retry or run-time budget.
    """
    query = "SELECT * FROM %s.%s where a=b;" % (DATABASE, TABLE)
    print(query)

    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': DATABASE},
        ResultConfiguration={'OutputLocation': S3_OUTPUT},
    )
    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)

    execution = _wait_for_query(client, query_execution_id, context)
    _copy_result(execution, query_execution_id, "out.csv")


def _wait_for_query(client, query_execution_id, context=None,
                    max_interval=10, margin_millis=10000):
    """Poll Athena until the query finishes; return its QueryExecution dict.

    Sleeps 1s, 2s, 3s, ... between polls, capped at ``max_interval``
    seconds. Without the cap, RETRY_COUNT=300 polls would sleep about 12.5
    hours, far beyond any Lambda timeout, so the give-up path below would
    never run.

    Raises:
        Exception: on FAILED/CANCELLED, or 'TIME OVER' (after stopping the
            query) when RETRY_COUNT polls are used up or the Lambda context
            reports that the next sleep would leave less than
            ``margin_millis`` of run time.
    """
    remaining_millis = getattr(context, 'get_remaining_time_in_millis', None)

    for attempt in range(1, 1 + RETRY_COUNT):
        execution = client.get_query_execution(
            QueryExecutionId=query_execution_id)['QueryExecution']
        status = execution['Status']
        state = status['State']

        if state == 'SUCCEEDED':
            print("STATUS:" + state)
            return execution

        if state in ('FAILED', 'CANCELLED'):
            # Terminal failure: polling again can never change the outcome.
            reason = status.get('StateChangeReason')
            raise Exception("STATUS:" + state + (" " + reason if reason else ""))

        # Still QUEUED/RUNNING: wait, unless that would exhaust the run time.
        print("STATUS:" + state)
        delay = min(attempt, max_interval)
        if (remaining_millis is not None
                and remaining_millis() < delay * 1000 + margin_millis):
            break
        time.sleep(delay)

    # Out of retries or run time: stop the query so it does not keep running.
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise Exception('TIME OVER')


def _copy_result(execution, query_execution_id, dest_key):
    """Copy the Athena result CSV to ``dest_key`` in S3_BUCKET.

    The source object comes from the OutputLocation Athena reports for this
    execution (an ``s3://bucket/key`` URI), so it is found even if S3_OUTPUT
    carries a key prefix. Falls back to ``<QueryExecutionId>.csv`` in
    S3_BUCKET when no location is reported.
    """
    location = execution.get('ResultConfiguration', {}).get('OutputLocation', '')
    if location.startswith('s3://'):
        source_bucket, _, source_key = location[len('s3://'):].partition('/')
    else:
        source_bucket, source_key = S3_BUCKET, query_execution_id + ".csv"

    s3 = boto3.client('s3')
    s3.copy_object(
        Bucket=S3_BUCKET,
        Key=dest_key,
        CopySource={'Bucket': source_bucket, 'Key': source_key},
    )

Athenaクエリー実行部分

start_query_execution はクエリーの完了を待たずに QueryExecutionId を返すため、get_query_execution で状態を確認しながら、クエリーが完了（SUCCEEDED）するまでループで待機しています。

   response = client.start_query_execution(
        QueryString=tele_query,
        QueryExecutionContext={
            'Database': 'DATABASE'
        },
        ResultConfiguration={
            'OutputLocation': 'S3_BUCKET',
        }
    )
    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)

    for i in range(1, 1 + RETRY_COUNT):

        # get query execution
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']

        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break

        if query_execution_status == 'FAILED':
            raise Exception("STATUS:" + query_execution_status)

        else:
            print("STATUS:" + query_execution_status)
            time.sleep(i)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')