At work, I was recently asked to write an AWS Lambda with the following logic (a small sketch of the resulting document format follows the list):
- An application drops CSV files into an AWS S3 bucket.
- A Python script has to authenticate against the bucket with the proper credentials and fetch the files from it.
- The CSV files pulled from the S3 bucket have to be parsed and the data converted to JSON.
- The JSON data has to be sent to Elasticsearch under a specific index.
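Before diving into the full script, here is a minimal sketch of the core transformation it performs: every CSV row becomes a small JSON document with a content field and a createdDate timestamp. The sample CSV below is made up purely for illustration; in the real script the data comes from S3.

import csv
import datetime
import json
from io import StringIO

# Illustrative CSV payload; in the real script this is read from an S3 object.
sample_csv = "name,status\napp-1,ok\napp-2,failed\n"

for row in csv.DictReader(StringIO(sample_csv)):
    doc = {
        'content': json.dumps(row),  # the CSV row serialized as JSON
        'createdDate': datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    }
    print(doc)  # the real script POSTs this document to Elasticsearch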
Overall, the task is fairly trivial and, for me, quite simple: I know Python and AWS boto3 well enough to write this solution.
I won't pad this out with filler, so here is the code right away:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import argparse
import datetime
import time
import csv
import json
import logging
import urllib3
import os
from gzip import GzipFile
from io import BytesIO, StringIO
import boto3
import botocore
from botocore.config import Config
# Initialize Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
"""
Can Override the global variables using Lambda Environment Parameters
"""
globalVars = {}
globalVars['esIndexPrefix'] = "s3-to-es-"
globalVars['esIndexDocType'] = "s3_to_es_docs"
def s3_connector(aws_auth):
if (aws_auth['role_name'] is None or aws_auth['role_name'] == "None") \
and (aws_auth['role_session'] is None or aws_auth['role_session'] == "None"):
try:
session = boto3.session.Session(profile_name=aws_auth['profile_name'])
# Will retry any method call at most 3 time(s)
s3 = session.client(service_name=aws_auth['client'],
region_name=aws_auth['region'],
config=Config(retries={'max_attempts': 3})
)
return s3
except Exception as err:
print("Failed to create a boto3 client connection to S3:\n", str(err))
logger.error('ERROR: Failed to create a boto3 client connection to S3')
return False
    elif (aws_auth['profile_name'] is None or aws_auth['profile_name'] == "None") \
            and (aws_auth['role_name'] is not None and aws_auth['role_name'] != "None") \
            and (aws_auth['role_session'] is not None and aws_auth['role_session'] != "None"):
try:
session = boto3.session.Session()
sts = session.client(service_name='sts',
region_name=aws_auth['region'],
config=Config(retries={'max_attempts': 3})
)
assumed_role_object = sts.assume_role(
RoleArn="{0}".format(aws_auth['role_name']),
RoleSessionName='{0}'.format(aws_auth['role_session'])
)
            # Create the S3 client using the temporary credentials returned by the assumed role
s3 = session.client(aws_access_key_id=assumed_role_object['Credentials']['AccessKeyId'],
aws_secret_access_key=assumed_role_object['Credentials']['SecretAccessKey'],
aws_session_token=assumed_role_object['Credentials']['SessionToken'],
service_name=aws_auth['client'],
region_name=aws_auth['region'],
config=Config(retries={'max_attempts': 3})
)
return s3
except Exception as err:
print("Failed to create a boto3 client connection to S3:\n", str(err))
logger.error('ERROR: Failed to create a boto3 client connection to S3')
return False
else:
print('Please use/set [--profile-name] or [--role-name] with [--role-session]')
return False
def s3_bucket(aws_auth, s3_bucket_name):
s3_bucket_status = False
s3 = s3_connector(aws_auth)
if s3:
try:
s3.head_bucket(Bucket=s3_bucket_name)
print("A bucket {} is already exists!".format(s3_bucket_name))
s3_bucket_status = True
return s3_bucket_status
except botocore.exceptions.ClientError as e:
error_code = int(e.response['Error']['Code'])
if error_code == 403:
print("Private {} bucket. Forbidden Access!".format(s3_bucket_name))
logger.error('ERROR: Private {0} Bucket. Forbidden Access!'.format(s3_bucket_name))
elif error_code == 404:
print("The {} bucket does not exist!".format(s3_bucket_name))
logger.error('ERROR: The {0} bucket does not exist!'.format(s3_bucket_name))
s3_bucket_status = False
return s3_bucket_status
else:
exit(-1)
return s3_bucket_status
def s3_objects(aws_auth, s3_bucket_name):
s3objects = []
s3 = s3_connector(aws_auth)
bucket_name = s3_bucket(aws_auth, s3_bucket_name)
if bucket_name:
try:
for key in s3.list_objects(Bucket=s3_bucket_name)['Contents']:
# print(key['Key'])
key_name = key['Key']
if (key_name.endswith('.gz')) or (key_name.endswith('.tar.gz')):
retr = s3.get_object(Bucket=s3_bucket_name, Key=key_name)
bytestream = BytesIO(retr['Body'].read())
content = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
s3objects.append(content)
else:
data = s3.get_object(Bucket=s3_bucket_name, Key=key_name)
contents = data['Body'].read()
s3objects.append(contents)
logger.info('SUCCESS: Retrieved object(s) from S3 {0} bucket'.format(s3_bucket_name))
except Exception as e:
print(e)
            logger.error('ERROR: Could not retrieve object(s) from S3 {0} bucket'.format(s3_bucket_name))
return s3objects
def sending_data_to_elastisearch(es_url, docData):
# Index each line to ES Domain
index_name = globalVars['esIndexPrefix'] + str(datetime.date.today().year) + '-' + str(datetime.date.today().month)
elastic_searh_url = es_url + '/' + index_name + '/' + globalVars['esIndexDocType']
try:
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
http = urllib3.PoolManager()
response = http.request('POST',
elastic_searh_url,
body=json.dumps(docData),
headers=headers,
retries=False,
timeout=30)
# print('Response status: ', response.status, "\nResponse data: ", response.data.decode('utf-8'))
        logger.info('Response status: {0}'.format(response.status))
if response.status == 201:
logger.info('INFO: Response status: {0}\nResponse data: {1}'.format(response.status,
response.data.decode('utf-8')))
logger.info('INFO: Successfully inserted element into ES')
elif response.status == 405:
logger.error('ERROR: Something is wrong with sending DATA: \n\t {}'.format(response.data.decode('utf-8')))
exit(1)
else:
logger.error('FAILURE: Got an error: \n\t {}'.format(response.data.decode('utf-8')))
exit(1)
except Exception as e:
logger.error('ERROR: {0}'.format(str(e)))
logger.error('ERROR: Unable to index line:"{0}"'.format(str(docData['content'])))
print(e)
exit(1)
    return
def pushing_locally(aws_auth, s3_bucket_name, es_url):
s3objects = s3_objects(aws_auth, s3_bucket_name)
for obj in s3objects:
reader = csv.DictReader(StringIO(obj), fieldnames=None, restkey=None, restval=None, dialect='excel')
for row in reader:
json_out = json.loads(json.dumps(row))
docData = {}
docData['content'] = str(json.dumps(json_out))
docData['createdDate'] = '{}'.format(datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
sending_data_to_elastisearch(es_url, docData)
    return
def lambda_handler(event, context):
aws_auth = {
"client": os.environ['aws_boto3_client'],
"region": os.environ['aws_region'],
"profile_name": os.environ['aws_profile_name'],
"role_name": os.environ['aws_role_name'],
"role_session": os.environ['aws_role_session']
}
s3_bucket_name = os.environ['aws_s3_bucket_name']
es_url = os.environ['elasticsearch_url']
logger.info("Received event: " + json.dumps(event, indent=2))
s3objects = s3_objects(aws_auth, s3_bucket_name)
for obj in s3objects:
        reader = csv.DictReader(StringIO(obj),
                                fieldnames=None,
                                restkey=None,
                                restval=None,
                                dialect='excel')
for row in reader:
json_out = json.loads(json.dumps(row))
docData = {}
docData['content'] = str(json.dumps(json_out))
docData['createdDate'] = '{}'.format(datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
sending_data_to_elastisearch(es_url, docData)
    logger.info('SUCCESS: Successfully indexed the entire doc into Elasticsearch')
return {"Status": "AWS Lambda handler has been finished"}
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov'''
)
parser.add_argument('--version', action='version', version='v0.1.0')
parser.add_argument('--bclient', dest='boto3_client', help='Set boto3 client', default='s3')
parser.add_argument('--region', dest='region', help='Set AWS region for boto3', default='us-east-1')
parser.add_argument('--pname', '--profile-name', dest='profile_name', help='Set profile name of AWS',
default=None)
parser.add_argument('--rname', '--role-name', dest='role_name', help='Set role ARN name',
default=None)
parser.add_argument('--rsession', '--role-session', dest='role_session', help='Set role session name',
default=None)
parser.add_argument('--s3-bucket', '--s3bucket', dest='s3_bucket', help='Set S3 bucket name',
default="test-s3-to-elastisearch")
    parser.add_argument('--es-url', '--esurl', dest='es_url', help='Set Elasticsearch URL',
                        default="http://localhost:9200")
    parser.add_argument('--lambda', dest='aws_lambda', help='Set lambda usage', default='True')
results = parser.parse_args()
boto3_client = results.boto3_client
region = results.region
profile_name = results.profile_name
role_name = results.role_name
role_session = results.role_session
s3_bucket_name = results.s3_bucket
es_url = results.es_url
aws_lambda = results.aws_lambda
if aws_lambda == 'True':
lambda_handler(None, None)
else:
start__time = time.time()
aws_auth = {
"client": boto3_client,
"region": region,
"profile_name": profile_name,
"role_name": role_name,
"role_session": role_session
}
pushing_locally(aws_auth, s3_bucket_name, es_url)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
To get help, run:
$ python3 s3-to-elastisearch.py --help
usage: python3 script_name.py {ARGS}
optional arguments:
-h, --help show this help message and exit
--version show program's version number and exit
--bclient BOTO3_CLIENT
Set boto3 client
--region REGION Set AWS region for boto3
--pname PROFILE_NAME, --profile-name PROFILE_NAME
Set profile name of AWS
--rname ROLE_NAME, --role-name ROLE_NAME
Set role ARN name
--rsession ROLE_SESSION, --role-session ROLE_SESSION
Set role session name
--s3-bucket S3_BUCKET, --s3bucket S3_BUCKET
Set S3 bucket name
  --es-url ES_URL, --esurl ES_URL
                        Set Elasticsearch URL
--lambda AWS_LAMBDA Set lambda usage
created by Vitalii Natarov
Usage example:
$ python3 s3-to-elastisearch.py --lambda=False --profile-name=default
Or:
$ python3 s3-to-elastisearch.py --lambda=False --role-name="role_here" --role-session="session"
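To double-check that the documents actually landed in Elasticsearch, you can query the index the script writes to (it is named s3-to-es-<year>-<month>). A minimal sketch, assuming Elasticsearch is listening on http://localhost:9200:

import datetime
import json
import urllib3

# The script writes to an index named s3-to-es-<year>-<month>
index_name = 's3-to-es-{0}-{1}'.format(datetime.date.today().year, datetime.date.today().month)
http = urllib3.PoolManager()
response = http.request('GET', 'http://localhost:9200/' + index_name + '/_search?size=5')
print(json.dumps(json.loads(response.data.decode('utf-8')), indent=2))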
As for AWS Lambda itself, you will need to set Environment variables with the following keys (a boto3 sketch for setting them follows the list):
- aws_s3_bucket_name: the name of the AWS S3 bucket (i.e. where your CSV files are stored).
- elasticsearch_url: the Elasticsearch URL, for example "http://localhost:9200".
- aws_boto3_client: the boto3 client to use, for example s3.
- aws_region: the AWS region, for example us-east-1.
- aws_profile_name: the AWS profile used to connect and pull resources from AWS, for example default.
- aws_role_name: if aws_profile_name is not set, set aws_role_name to be able to use AWS resources.
- aws_role_session: if aws_profile_name is not set, set aws_role_session to be able to use AWS resources.
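The variables can be set in the AWS console or from code. Here is a minimal boto3 sketch for an already-created function; the function name s3-to-elasticsearch, the account ID, and the role ARN are assumptions, so substitute your own values:

import boto3

# Assumed function name; replace it with the name of your Lambda.
FUNCTION_NAME = 's3-to-elasticsearch'

lambda_client = boto3.client('lambda', region_name='us-east-1')
lambda_client.update_function_configuration(
    FunctionName=FUNCTION_NAME,
    Environment={
        'Variables': {
            'aws_s3_bucket_name': 'test-s3-to-elastisearch',
            'elasticsearch_url': 'http://localhost:9200',
            'aws_boto3_client': 's3',
            'aws_region': 'us-east-1',
            # The string "None" tells the script that profile-based auth is not used.
            'aws_profile_name': 'None',
            # Placeholder account ID and role ARN; replace with your own.
            'aws_role_name': 'arn:aws:iam::123456789012:role/test-lambda-role',
            'aws_role_session': 's3-to-elasticsearch'
        }
    }
)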
For example:
aws_lambda_role_trust_policies.json looks like this:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "LambdaAssumeRoleAllow",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
aws_lambda_policy.json looks like this:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "LogGroupAllows",
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:us-east-1:167127734783:*"
},
{
"Sid": "LogStreamAllows",
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:us-east-1:167127734783:log-group:/aws/lambda/s3-to-elasticsearch:*"
]
},
{
"Sid": "ESAllows",
"Effect": "Allow",
"Action": [
"es:ListElasticsearchInstanceTypeDetails",
"es:CreateElasticsearchDomain",
"es:ListTags",
"es:ESHttpDelete",
"es:DeleteElasticsearchServiceRole",
"es:GetUpgradeHistory",
"es:ESHttpHead",
"es:DeleteElasticsearchDomain",
"es:DescribeElasticsearchDomain",
"es:UpgradeElasticsearchDomain",
"es:ESHttpPost",
"es:ESHttpPatch",
"es:DescribeElasticsearchDomains",
"es:DescribeReservedElasticsearchInstanceOfferings",
"es:CreateElasticsearchServiceRole",
"es:ESHttpGet",
"es:DescribeElasticsearchDomainConfig",
"es:PurchaseReservedElasticsearchInstanceOffering",
"es:DescribeReservedElasticsearchInstances",
"es:ListDomainNames",
"es:UpdateElasticsearchDomainConfig",
"es:GetCompatibleElasticsearchVersions",
"es:GetUpgradeStatus",
"es:ListElasticsearchInstanceTypes",
"es:ListElasticsearchVersions",
"es:DescribeElasticsearchInstanceTypeLimits",
"es:ESHttpPut"
],
"Resource": "*"
},
{
"Sid": "S3Allows",
"Effect": "Allow",
"Action": [
"S3:*"
],
"Resource": [
"*"
]
}
]
}
python_script_role_policy.json:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "LogGroupAllows",
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:us-east-1:167127734783:*"
},
{
"Sid": "LogStreamAllows",
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:us-east-1:167127734783:log-group:/aws/lambda/s3-to-elasticsearch:*"
]
},
{
"Sid": "ESAllows",
"Effect": "Allow",
"Action": [
"es:ListElasticsearchInstanceTypeDetails",
"es:CreateElasticsearchDomain",
"es:ListTags",
"es:ESHttpDelete",
"es:DeleteElasticsearchServiceRole",
"es:GetUpgradeHistory",
"es:ESHttpHead",
"es:DeleteElasticsearchDomain",
"es:DescribeElasticsearchDomain",
"es:UpgradeElasticsearchDomain",
"es:ESHttpPost",
"es:ESHttpPatch",
"es:DescribeElasticsearchDomains",
"es:DescribeReservedElasticsearchInstanceOfferings",
"es:CreateElasticsearchServiceRole",
"es:ESHttpGet",
"es:DescribeElasticsearchDomainConfig",
"es:PurchaseReservedElasticsearchInstanceOffering",
"es:DescribeReservedElasticsearchInstances",
"es:ListDomainNames",
"es:UpdateElasticsearchDomainConfig",
"es:GetCompatibleElasticsearchVersions",
"es:GetUpgradeStatus",
"es:ListElasticsearchInstanceTypes",
"es:ListElasticsearchVersions",
"es:DescribeElasticsearchInstanceTypeLimits",
"es:ESHttpPut"
],
"Resource": "*"
},
{
"Sid": "S3Allows",
"Effect": "Allow",
"Action": [
"S3:*"
],
"Resource": [
"*"
]
}
]
}
python_script_role_trust_policies.json:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "sts.amazonaws.com"
},
"Action": "sts:AssumeRole"
},
{
"Sid": "ff",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:sts::167127734783:assumed-role/test-lambda-role/s3-to-elasticsearch"
},
"Action": "sts:AssumeRole"
}
]
}
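These roles can be created in the AWS console, but if you prefer to do it from code, here is a minimal boto3 sketch that creates both roles from the JSON files above. The role name test-python-script-role and the inline policy names are assumptions (test-lambda-role is the one referenced in the trust policy above), so adjust them to your environment:

import boto3

iam = boto3.client('iam')

def create_role(role_name, trust_policy_file, policy_name, policy_file):
    # Create the role with its trust (assume-role) policy document
    with open(trust_policy_file) as f:
        iam.create_role(RoleName=role_name, AssumeRolePolicyDocument=f.read())
    # Attach the permissions as an inline policy
    with open(policy_file) as f:
        iam.put_role_policy(RoleName=role_name,
                            PolicyName=policy_name,
                            PolicyDocument=f.read())

create_role('test-lambda-role',
            'aws_lambda_role_trust_policies.json',
            'aws-lambda-policy',
            'aws_lambda_policy.json')
create_role('test-python-script-role',
            'python_script_role_trust_policies.json',
            'python-script-role-policy',
            'python_script_role_policy.json')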
You are good to go! If you need help setting it up, write to me and I will help.
And yes, the code will be pushed to GitHub, where you can clone it:
$ git clone git@github.com:SebastianUA/lambda-s3-elastisearch.git
That's it; this concludes the article "AWS Lambda in python3 on Unix/Linux".