AWS Lambda in Python 3 on Unix/Linux

At work, I was recently asked to write a Lambda for AWS with the following logic:

  • An application drops CSV files into an AWS S3 bucket.
  • Write a Python script that uses credentials to access the bucket and fetch the files from it.
  • Parse the CSV files retrieved from the AWS S3 bucket and convert the data to JSON.
  • Send the JSON data to Elasticsearch under a specific index (a minimal sketch of the CSV-to-JSON step follows this list).
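
For illustration, here is a minimal sketch of that CSV-to-JSON step; the sample CSV content and field names are made up:

import csv
import json
from io import StringIO

# Hypothetical CSV payload, as the application might upload it to S3
sample_csv = "host,status\nweb-01,200\nweb-02,500\n"

# Each CSV row becomes one JSON document
for row in csv.DictReader(StringIO(sample_csv)):
    print(json.dumps(row))
    # -> {"host": "web-01", "status": "200"} ... and so on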

Overall, the task is fairly trivial and, for me, quite simple: I know Python and AWS boto3 well enough to write this solution.

I won't pad this with filler, so here is the code right away:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import argparse
import datetime
import time
import csv
import json
import logging
import urllib3
import os
from gzip import GzipFile
from io import BytesIO, StringIO

import boto3
import botocore
from botocore.config import Config

# Initialize Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)

"""
Can Override the global variables using Lambda Environment Parameters
"""
globalVars = {}
globalVars['esIndexPrefix'] = "s3-to-es-"
globalVars['esIndexDocType'] = "s3_to_es_docs"


def s3_connector(aws_auth):
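    """Create a boto3 S3 client, either from a named AWS profile or by assuming an IAM role via STS."""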
    if (aws_auth['role_name'] is None or aws_auth['role_name'] == "None") \
            and (aws_auth['role_session'] is None or aws_auth['role_session'] == "None"):
        try:
            session = boto3.session.Session(profile_name=aws_auth['profile_name'])
            # Will retry any method call at most 3 time(s)
            s3 = session.client(service_name=aws_auth['client'],
                                region_name=aws_auth['region'],
                                config=Config(retries={'max_attempts': 3})
                                )
            return s3
        except Exception as err:
            print("Failed to create a boto3 client connection to S3:\n", str(err))
            logger.error('ERROR: Failed to create a boto3 client connection to S3')
            return False
    elif (aws_auth['profile_name'] is None or aws_auth['profile_name'] == "None") \
            and (aws_auth['role_name'] is not None and aws_auth['role_name'] != "None") \
            and (aws_auth['role_session'] is not None and aws_auth['role_session'] != "None"):
        try:
            session = boto3.session.Session()
            sts = session.client(service_name='sts',
                                 region_name=aws_auth['region'],
                                 config=Config(retries={'max_attempts': 3})
                                 )

            assumed_role_object = sts.assume_role(
                RoleArn=aws_auth['role_name'],
                RoleSessionName=aws_auth['role_session']
            )
            # Any session name can be used here, but consider restricting which names are allowed!
            s3 = session.client(aws_access_key_id=assumed_role_object['Credentials']['AccessKeyId'],
                                aws_secret_access_key=assumed_role_object['Credentials']['SecretAccessKey'],
                                aws_session_token=assumed_role_object['Credentials']['SessionToken'],
                                service_name=aws_auth['client'],
                                region_name=aws_auth['region'],
                                config=Config(retries={'max_attempts': 3})
                                )

            return s3
        except Exception as err:
            print("Failed to create a boto3 client connection to S3:\n", str(err))
            logger.error('ERROR: Failed to create a boto3 client connection to S3')
            return False
    else:
        print('Please use/set [--profile-name] or [--role-name] with [--role-session]')
        return False


def s3_bucket(aws_auth, s3_bucket_name):
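    """Check that the target S3 bucket exists and is accessible."""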
    s3_bucket_status = False
    s3 = s3_connector(aws_auth)
    if s3:
        try:
            s3.head_bucket(Bucket=s3_bucket_name)
            print("A bucket {} is already exists!".format(s3_bucket_name))
            s3_bucket_status = True
            return s3_bucket_status
        except botocore.exceptions.ClientError as e:
            error_code = int(e.response['Error']['Code'])
            if error_code == 403:
                print("Private {} bucket. Forbidden Access!".format(s3_bucket_name))
                logger.error('ERROR: Private {0} Bucket. Forbidden Access!'.format(s3_bucket_name))
            elif error_code == 404:
                print("The {} bucket does not exist!".format(s3_bucket_name))
                logger.error('ERROR: The {0} bucket does not exist!'.format(s3_bucket_name))
            s3_bucket_status = False
            return s3_bucket_status
    else:
        exit(-1)

    return s3_bucket_status


def s3_objects(aws_auth, s3_bucket_name):
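    """Fetch all objects from the bucket; gzipped objects are decompressed on the fly."""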
    s3objects = []

    s3 = s3_connector(aws_auth)
    bucket_exists = s3_bucket(aws_auth, s3_bucket_name)
    if bucket_exists:
        try:
            for key in s3.list_objects(Bucket=s3_bucket_name)['Contents']:
                # print(key['Key'])
                key_name = key['Key']
                if (key_name.endswith('.gz')) or (key_name.endswith('.tar.gz')):
                    retr = s3.get_object(Bucket=s3_bucket_name, Key=key_name)
                    bytestream = BytesIO(retr['Body'].read())
                    content = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
                    s3objects.append(content)
                else:
                    data = s3.get_object(Bucket=s3_bucket_name, Key=key_name)
                    contents = data['Body'].read().decode('utf-8')
                    s3objects.append(contents)

                logger.info('SUCCESS: Retrieved object(s) from S3 {0} bucket'.format(s3_bucket_name))
        except Exception as e:
            print(e)
            logger.error('ERROR: Could not retrieve object(s) from the S3 {0} bucket'.format(s3_bucket_name))

    return s3objects


def sending_data_to_elastisearch(es_url, docData):
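    """POST a single document to Elasticsearch over HTTP."""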
    # Index each line to ES Domain
    index_name = globalVars['esIndexPrefix'] + datetime.date.today().strftime('%Y-%m')
    elasticsearch_url = es_url + '/' + index_name + '/' + globalVars['esIndexDocType']
    try:
        headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
        http = urllib3.PoolManager()
        response = http.request('POST',
                                elasticsearch_url,
                                body=json.dumps(docData),
                                headers=headers,
                                retries=False,
                                timeout=30)

        logger.debug('Response status: {0}'.format(response.status))

        if response.status == 201:
            logger.info('INFO: Response status: {0}\nResponse data: {1}'.format(response.status,
                                                                                response.data.decode('utf-8')))
            logger.info('INFO: Successfully inserted element into ES')
        elif response.status == 405:
            logger.error('ERROR: Something is wrong with sending DATA: \n\t {}'.format(response.data.decode('utf-8')))
            exit(1)
        else:
            logger.error('FAILURE: Got an error: \n\t {}'.format(response.data.decode('utf-8')))
            exit(1)
    except Exception as e:
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error('ERROR: Unable to index line:"{0}"'.format(str(docData['content'])))
        print(e)
        exit(1)

    return True


def pushing_locally(aws_auth, s3_bucket_name, es_url):
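    """Local (non-Lambda) entry point: read CSV objects from S3 and index each row."""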
    s3objects = s3_objects(aws_auth, s3_bucket_name)
    for obj in s3objects:
        reader = csv.DictReader(StringIO(obj), fieldnames=None, restkey=None, restval=None, dialect='excel')
        for row in reader:
            docData = {}
            docData['content'] = json.dumps(row)
            # Use UTC so the trailing 'Z' in the timestamp is accurate
            docData['createdDate'] = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")

            sending_data_to_elastisearch(es_url, docData)

    return True


def lambda_handler(event, context):
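    """AWS Lambda entry point: configuration comes from environment variables."""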

    aws_auth = {
        "client": os.environ['aws_boto3_client'],
        "region": os.environ['aws_region'],
        "profile_name": os.environ['aws_profile_name'],
        "role_name": os.environ['aws_role_name'],
        "role_session": os.environ['aws_role_session']
    }

    s3_bucket_name = os.environ['aws_s3_bucket_name']
    es_url = os.environ['elasticsearch_url']

    logger.info("Received event: " + json.dumps(event, indent=2))

    s3objects = s3_objects(aws_auth, s3_bucket_name)

    for obj in s3objects:
        reader = csv.DictReader(StringIO(obj),
                                fieldnames=None,
                                restkey=None,
                                restval=None,
                                dialect='excel')
        for row in reader:
            docData = {}
            docData['content'] = json.dumps(row)
            # Use UTC so the trailing 'Z' in the timestamp is accurate
            docData['createdDate'] = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")

            sending_data_to_elastisearch(es_url, docData)
    logger.info('SUCCESS: Successfully indexed the entire doc into Elasticsearch')

    return {"Status": "AWS Lambda handler has been finished"}


if __name__ == '__main__':
    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov'''
                                     )
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--bclient', dest='boto3_client', help='Set boto3 client', default='s3')
    parser.add_argument('--region', dest='region', help='Set AWS region for boto3', default='us-east-1')
    parser.add_argument('--pname', '--profile-name', dest='profile_name', help='Set profile name of AWS',
                        default=None)
    parser.add_argument('--rname', '--role-name', dest='role_name', help='Set role ARN name',
                        default=None)
    parser.add_argument('--rsession', '--role-session', dest='role_session', help='Set role session name',
                        default=None)
    parser.add_argument('--s3-bucket', '--s3bucket', dest='s3_bucket', help='Set S3 bucket name',
                        default="test-s3-to-elastisearch")
    parser.add_argument('--es-url', '--esurl', dest='es_url', help='Set Elasticsearch URL',
                        default="http://localhost:9200")
    parser.add_argument('--lambda', dest='aws_lambda', help='Set lambda usage', default='True')

    results = parser.parse_args()

    boto3_client = results.boto3_client
    region = results.region
    profile_name = results.profile_name
    role_name = results.role_name
    role_session = results.role_session
    s3_bucket_name = results.s3_bucket
    es_url = results.es_url
    aws_lambda = results.aws_lambda

    if aws_lambda == 'True':
        lambda_handler(None, None)
    else:
        start_time = time.time()
        aws_auth = {
            "client": boto3_client,
            "region": region,
            "profile_name": profile_name,
            "role_name": role_name,
            "role_session": role_session
        }

        pushing_locally(aws_auth, s3_bucket_name, es_url)

        end_time = round(time.time() - start_time, 2)
        print("--- %s seconds ---" % end_time)

You can get help like this:

$ python3 s3-to-elastisearch.py --help
usage: python3 script_name.py {ARGS}

optional arguments:
  -h, --help            show this help message and exit
  --version             show program's version number and exit
  --bclient BOTO3_CLIENT
                        Set boto3 client
  --region REGION       Set AWS region for boto3
  --pname PROFILE_NAME, --profile-name PROFILE_NAME
                        Set profile name of AWS
  --rname ROLE_NAME, --role-name ROLE_NAME
                        Set role ARN name
  --rsession ROLE_SESSION, --role-session ROLE_SESSION
                        Set role session name
  --s3-bucket S3_BUCKET, --s3bucket S3_BUCKET
                        Set S3 bucket name
  --es-url ES_URL, --esurl ES_URL
                        Set Elasticsearch URL
  --lambda AWS_LAMBDA   Set lambda usage

created by Vitalii Natarov

Usage example:

$ python3 s3-to-elastisearch.py --lambda=False --profile-name=default

Or:

$ python3 s3-to-elastisearch.py --lambda=False --role-name="role_here" --role-session="session"

As for the AWS Lambda itself, you will need to set Environment variables with the following keys (an example of setting them via the AWS CLI follows the list):

  • aws_s3_bucket_name: the name of the AWS S3 bucket (i.e., where your CSV files are stored).
  • elasticsearch_url: the Elasticsearch URL, e.g. http://localhost:9200.
  • aws_boto3_client: the boto3 client to connect with, e.g. s3.
  • aws_region: the AWS region, e.g. us-east-1.
  • aws_profile_name: the profile used to connect to AWS and access its resources, e.g. default.
  • aws_role_name: if aws_profile_name is not set, set aws_role_name to access AWS resources.
  • aws_role_session: if aws_profile_name is not set, aws_role_session must be set as well.
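
These can be set, for instance, via the AWS CLI. Note that the handler reads every key unconditionally, so set the unused ones to the literal string "None". The function name and role ARN below are examples:

$ aws lambda update-function-configuration \
    --function-name s3-to-elasticsearch \
    --environment "Variables={aws_s3_bucket_name=test-s3-to-elastisearch,elasticsearch_url=http://localhost:9200,aws_boto3_client=s3,aws_region=us-east-1,aws_profile_name=None,aws_role_name=arn:aws:iam::167127734783:role/test-python-script-role,aws_role_session=s3-to-elasticsearch}"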

As an example of the IAM setup, aws_lambda_role_trust_policies.json looks like this:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "LambdaAssumeRoleAllow",
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
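
With this trust policy saved to a file, the role itself can be created via the AWS CLI, e.g. (the role name is an example, matching the ARN used elsewhere in this article):

$ aws iam create-role \
    --role-name test-lambda-role \
    --assume-role-policy-document file://aws_lambda_role_trust_policies.json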

aws_lambda_policy.json looks like this:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "LogGroupAllows",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:us-east-1:167127734783:*"
        },
        {
            "Sid": "LogStreamAllows",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:us-east-1:167127734783:log-group:/aws/lambda/s3-to-elasticsearch:*"
            ]
        },
        {
            "Sid": "ESAllows",
            "Effect": "Allow",
            "Action": [
                "es:ListElasticsearchInstanceTypeDetails",
                "es:CreateElasticsearchDomain",
                "es:ListTags",
                "es:ESHttpDelete",
                "es:DeleteElasticsearchServiceRole",
                "es:GetUpgradeHistory",
                "es:ESHttpHead",
                "es:DeleteElasticsearchDomain",
                "es:DescribeElasticsearchDomain",
                "es:UpgradeElasticsearchDomain",
                "es:ESHttpPost",
                "es:ESHttpPatch",
                "es:DescribeElasticsearchDomains",
                "es:DescribeReservedElasticsearchInstanceOfferings",
                "es:CreateElasticsearchServiceRole",
                "es:ESHttpGet",
                "es:DescribeElasticsearchDomainConfig",
                "es:PurchaseReservedElasticsearchInstanceOffering",
                "es:DescribeReservedElasticsearchInstances",
                "es:ListDomainNames",
                "es:UpdateElasticsearchDomainConfig",
                "es:GetCompatibleElasticsearchVersions",
                "es:GetUpgradeStatus",
                "es:ListElasticsearchInstanceTypes",
                "es:ListElasticsearchVersions",
                "es:DescribeElasticsearchInstanceTypeLimits",
                "es:ESHttpPut"
            ],
            "Resource": "*"
        },
        {
            "Sid": "S3Allows",
            "Effect": "Allow",
            "Action": [
                "S3:*"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
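
This policy can then be attached to the role as an inline policy, e.g.:

$ aws iam put-role-policy \
    --role-name test-lambda-role \
    --policy-name s3-to-elasticsearch-policy \
    --policy-document file://aws_lambda_policy.json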

python_script_role_policy.json:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "LogGroupAllows",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:us-east-1:167127734783:*"
        },
        {
            "Sid": "LogStreamAllows",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:us-east-1:167127734783:log-group:/aws/lambda/s3-to-elasticsearch:*"
            ]
        },
        {
            "Sid": "ESAllows",
            "Effect": "Allow",
            "Action": [
                "es:ListElasticsearchInstanceTypeDetails",
                "es:CreateElasticsearchDomain",
                "es:ListTags",
                "es:ESHttpDelete",
                "es:DeleteElasticsearchServiceRole",
                "es:GetUpgradeHistory",
                "es:ESHttpHead",
                "es:DeleteElasticsearchDomain",
                "es:DescribeElasticsearchDomain",
                "es:UpgradeElasticsearchDomain",
                "es:ESHttpPost",
                "es:ESHttpPatch",
                "es:DescribeElasticsearchDomains",
                "es:DescribeReservedElasticsearchInstanceOfferings",
                "es:CreateElasticsearchServiceRole",
                "es:ESHttpGet",
                "es:DescribeElasticsearchDomainConfig",
                "es:PurchaseReservedElasticsearchInstanceOffering",
                "es:DescribeReservedElasticsearchInstances",
                "es:ListDomainNames",
                "es:UpdateElasticsearchDomainConfig",
                "es:GetCompatibleElasticsearchVersions",
                "es:GetUpgradeStatus",
                "es:ListElasticsearchInstanceTypes",
                "es:ListElasticsearchVersions",
                "es:DescribeElasticsearchInstanceTypeLimits",
                "es:ESHttpPut"
            ],
            "Resource": "*"
        },
        {
            "Sid": "S3Allows",
            "Effect": "Allow",
            "Action": [
                "S3:*"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

python_script_role_trust_policies.json:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "sts.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Sid": "ff",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:sts::167127734783:assumed-role/test-lambda-role/s3-to-elasticsearch"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
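
Before wiring everything together, you can verify the trust relationship with a plain STS call, e.g. (assuming the script role was created under the hypothetical name test-python-script-role):

$ aws sts assume-role \
    --role-arn arn:aws:iam::167127734783:role/test-python-script-role \
    --role-session-name session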

Ready to use! If you need help setting it up, drop me a line and I'll be glad to help!

And yes, the code has been pushed to GitHub, so you can clone it:

$ git clone git@github.com:SebastianUA/lambda-s3-elastisearch.git

That's all; the article "AWS Lambda in Python 3 on Unix/Linux" is complete.
