DynamoDBのデータをS3へ、CSV形式で出力するLambda関数です。
Datapipelineやglueのジョブを使えないときのスクリプトです。
また、csvをインラインで処理しているため、外部関数を固めてアップする必要もありません。
Lambda関数全体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | from __future__ import print_function # Python 2/3 compatibility import json import boto3 import logging import os import datetime import calendar import decimal from boto3.dynamodb.conditions import Key, Attr from base64 import b64decode from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError logger = logging.getLogger() logger.setLevel(logging.INFO) dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1') s3 = boto3.resource('s3') class DecimalEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, decimal.Decimal): if o % 1 > 0: return float(o) else: return int(o) return super(DecimalEncoder, self).default(o) def setcsv(dt): dtj=json.loads(json.dumps(dt, cls=DecimalEncoder)) aaaa=dtj['aaaa'] bbbb=dtj['bbbb'] cccc=dtj['cccc'] csv="" csv=csv+str('\n') csv=csv+aaaa+',' csv=csv+bbbb+',' csv=csv+cccc return csv def lambda_handler(event, context): bucket = s3.Bucket('S3バケット名') table = dynamodb.Table('DynamoDBテーブル名') response = table.scan() csv="aaaa,bbbb,cccc" for dt in response['Items']: csv=csv+setcsv(dt) while 'LastEvaluatedKey' in response: response = table.scan( ExclusiveStartKey=response['LastEvaluatedKey'] ) for dt in response['Items']: csv=csv+setcsv(dt) # logger.info(str(csv)) ret = bucket.put_object( Body= str(csv), Key='出力ファイル名.csv', ContentType='text/csv' ) return str(ret) |
DynamoDBのテーブルからデータを取得する部分
1 2 3 4 5 6 7 8 9 10 11 12 | table = dynamodb.Table('DynamoDBテーブル名') response = table.scan() csv="aaaa,bbbb,cccc" for dt in response['Items']: csv=csv+setcsv(dt) while 'LastEvaluatedKey' in response: response = table.scan( ExclusiveStartKey=response['LastEvaluatedKey'] ) for dt in response['Items']: csv=csv+setcsv(dt) |