AWS Lambda pythonでDynamoDBをS3 CSVへ出力

DynamoDBのデータをS3へ、CSV形式で出力するLambda関数です。

Datapipelineやglueのジョブを使えないときのスクリプトです。

また、csvをインラインで処理しているため、外部関数を固めてアップする必要もありません。

Lambda関数全体

from __future__ import print_function # Python 2/3 compatibility
import json
import boto3
import logging
import os
import datetime
import calendar
import decimal
from boto3.dynamodb.conditions import Key, Attr

from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1')
s3 = boto3.resource('s3')

class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)

def setcsv(dt):
    dtj=json.loads(json.dumps(dt, cls=DecimalEncoder))
    aaaa=dtj['aaaa']
    bbbb=dtj['bbbb']
    cccc=dtj['cccc']
    csv=""
    csv=csv+str('\n')
    csv=csv+aaaa+','
    csv=csv+bbbb+','
    csv=csv+cccc
    return csv
    
def lambda_handler(event, context):
    bucket = s3.Bucket('S3バケット名')
    table = dynamodb.Table('DynamoDBテーブル名')
    response = table.scan()
    csv="aaaa,bbbb,cccc"
    for dt in response['Items']:
        csv=csv+setcsv(dt)
        
    while 'LastEvaluatedKey' in response:
        response = table.scan(
            ExclusiveStartKey=response['LastEvaluatedKey']
        )
        for dt in response['Items']:
            csv=csv+setcsv(dt)

#    logger.info(str(csv))
    ret = bucket.put_object( Body= str(csv), Key='出力ファイル名.csv', ContentType='text/csv' )
     
    return str(ret)

DynamoDBのテーブルからデータを取得する部分

    table = dynamodb.Table('DynamoDBテーブル名')
    response = table.scan()
    csv="aaaa,bbbb,cccc"
    for dt in response['Items']:
        csv=csv+setcsv(dt)
        
    while 'LastEvaluatedKey' in response:
        response = table.scan(
            ExclusiveStartKey=response['LastEvaluatedKey']
        )
        for dt in response['Items']:
            csv=csv+setcsv(dt)

DynamoDBのscanは、responseに次に読むブロックがあるかどうかを渡しているため、LastEvaluatedKeyをチェックしてそこから読み込むループ処理が必要です。