DynamoDBのデータをS3へ、CSV形式で出力するLambda関数です。
Datapipelineやglueのジョブを使えないときのスクリプトです。
また、csvをインラインで処理しているため、外部関数を固めてアップする必要もありません。
Lambda関数全体
from __future__ import print_function # Python 2/3 compatibility import json import boto3 import logging import os import datetime import calendar import decimal from boto3.dynamodb.conditions import Key, Attr from base64 import b64decode from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError logger = logging.getLogger() logger.setLevel(logging.INFO) dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1') s3 = boto3.resource('s3') class DecimalEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, decimal.Decimal): if o % 1 > 0: return float(o) else: return int(o) return super(DecimalEncoder, self).default(o) def setcsv(dt): dtj=json.loads(json.dumps(dt, cls=DecimalEncoder)) aaaa=dtj['aaaa'] bbbb=dtj['bbbb'] cccc=dtj['cccc'] csv="" csv=csv+str('\n') csv=csv+aaaa+',' csv=csv+bbbb+',' csv=csv+cccc return csv def lambda_handler(event, context): bucket = s3.Bucket('S3バケット名') table = dynamodb.Table('DynamoDBテーブル名') response = table.scan() csv="aaaa,bbbb,cccc" for dt in response['Items']: csv=csv+setcsv(dt) while 'LastEvaluatedKey' in response: response = table.scan( ExclusiveStartKey=response['LastEvaluatedKey'] ) for dt in response['Items']: csv=csv+setcsv(dt) # logger.info(str(csv)) ret = bucket.put_object( Body= str(csv), Key='出力ファイル名.csv', ContentType='text/csv' ) return str(ret)
DynamoDBのテーブルからデータを取得する部分
table = dynamodb.Table('DynamoDBテーブル名') response = table.scan() csv="aaaa,bbbb,cccc" for dt in response['Items']: csv=csv+setcsv(dt) while 'LastEvaluatedKey' in response: response = table.scan( ExclusiveStartKey=response['LastEvaluatedKey'] ) for dt in response['Items']: csv=csv+setcsv(dt)
DynamoDBのscanは、responseに次に読むブロックがあるかどうかを渡しているため、LastEvaluatedKeyをチェックしてそこから読み込むループ処理が必要です。