亚马逊AWS官方博客

使用 GLUE 构建无服务器架构的 ETL Pipeline

ETL 是将业务系统的数据经过抽取、清洗转换之后加载到数据处理平台的过程,其目的是将企业中的分散、凌乱、标准不统一的数据整合到一起,为企业的决策提供分析依据。众所周知,ETL 是 BI 项目重要的一个环节,通常在 BI 项目中 ETL 会花掉整个项目中至少1/3的时间。ETL 设计的好坏直接关系整个 BI 项目的成败。目前市场上主流的 ETL 工具譬如 Oracle 的 OWB、SQL Server 的 DTS 和 SSIS 服务、Informatic 等,它们要么通过单纯的 SQL 方式实现,屏蔽编码的复杂性,但是缺少灵活性,要么与后端数据源集成度不够高,缺少友好开发环境支持,对触发器,脚本化方面支持不够好。基于上面的 ETL 工具的不足,AWS 推出 Glue 服务可以很方便的解决上面的问题。AWS Glue 易于使用,内置常用的 Classifiers,轻松识别常用的数据格式,同时支持自定义 Classifiers,自动抽取多种数据源的元数据;完全托管的无服务器架构的服务,用户完全不用担心底层基础架构的故障,软件的更新,以及并发能力等,只需要将精力聚焦在如何提高 ETL 的效率方面;另外在开发编程环境,触发器,自动化脚本作业方面都提供很好的支持。

默认情况下,AWS Glue 提供的内置 Classifiers 如果不能满足数据抽取的需求我们需要创建自定义的 Classifiers,本文将演示如何通过 AWS Glue 构建无服务器架构的 ETL  Pipeline 实现自定义文本识别器和将多个 CSV 文件在同一 Job 中完成数据的清洗,并将目标格式转换为 Parquet。

准备数据

选择数据源 Web 服务器的日志文件,演示如何自定义文本识别器。Web 服务器的日志样例格式如下:

该日志文件直接使用 AWS Glue 默认的文本识别器是无法识别的,类型显示是 unkonwn 的。

另外一个数据源是选择从公共数据集 NYTaxi 中下载的3个 Excel 文 件,演示如何在一个 Job 中实现批量转换多个表。该 Excel 文件样例格式如下:

创建自定义文本识别器

Step 1  创建 Metadata 数据库

打开 AWS Glue 服务的 Console,在左面的导航面板中选择 database,点击 Add Database,输入数据库名:testdb,如下图所示:

选择 create,数据库创建完成。

Step 2 创建 S3 bucket 用于存放源数据

打开 AWS S3 Console,点击 Create Bucket, 输入 Bucket Name: teetttt,如下图所示:

Step 3 创建自定义的 Classifiers

在 AWS Glue Console 左面的导航面板,选择 Classifier,点击 Add Classifier,输入 Classifier Name为errlog,选择 Classifier Type 为 Grok,输入 Grok pattern 为:

%{TESTTIME:timestamp} %{IPORHOST:login_host} \[\S+ %{USER:login_user}/%{NUMBER:pid} as %{USER:sudouser}/%{NUMBER:sudouser_pid} on %{WORD:tty}/%{NUMBER:tty_id}/%{IPORHOST:host_ip}:%{NUMBER:source_port}-\>%{IPORHOST:local_ip}:%{NUMBER:dest_port}\](?:\:|) (%{UNIXPATH:current_path} %{GREEDYDATA:command}|%{GREEDYDATA:detail})

Custom Pattern 为:

TESTTIME ([A-z][A-z][A-z] [A-z][A-z][A-z]  [0-9] [0-9][0-9][:][0-9][0-9][:][0-9][0-9] [0-9][0-9][0-9][0-9])

如下图所示:

点击 Apply,配置完成。

注意在自定义 classifier 时,Grok 中支持的模式变量和 AWS Glue 中内置的正则变量可以直接在 Grok pattern 使用,譬如 GREEDDATA,UNIXPATH,USER 等。但是如果使用自己定义的正则模式变量,则需要在 Custom pattern 中定义,在本例中 TESTTIME 变量,由于默认的时间模式不能识别 Sun Mar  7 16:05:29 2004 日期格式,因而需要自己定义。

Step 4 创建 Crawlers,使用自定义的 classifier(errlog) 进行元数据爬取

在 AWS Glue Console 的左面导航面板中选择 Crawler,点击 Add Crawler,输入名称 customerrlog,自定义的 classifier 显示在页面下,如下图所示:

点击 Add,添加 errlog 到该 Crawler 中。

点击 Next,选择 Datastore 为 S3,指定 Step 2 创建的 bucket,如下图所示:

继续点击 Next,指定角色,允许该角色访问 S3:

指定调度的频率为 Run On Demand,如下:

设置 Crawler 的元数据输出为 Step 1 中创建的数据库 testdb,如下:

继续选择 Next,review 信息无误,点击 Finish 创建完成。

Step 5 运行 Crawler(customerrlog),爬取 S3 bucket(teetttt)的数据

点击 Run Crawler,运行完毕,生成一个元数据表,如下图所示:

Step 6 调用 AWS Athena 浏览元数据表

选择 errlogbucket 表,点击 view data,AWS Glue 会自动调用 AWS Athena,显示元数据表,如下图所示:

日志文件通过自定义的 Classifier 抓取结果如下:

至此整个自定义 Classifier 对存储在 S3 上的数据进行爬取完成。这样用户可以根据元数据表的信息进行任意的数据转换和加载工作。

下面演示利用在一个 Job 中实现对多个元数据表数据进行清洗,由于 CSV 格式在大数据处理过程中效率相对于 parquet 稍差,需要将源数据(默认格式是 CSV 文件),转换为 parquet 格式,并再次写会到 S3 存储桶中。

创建一个 Job 实现对多个数据源文件进行清洗及格式转换

Step 7 创建 Crawler 命名为 crawlertaxidata,并创建 S3 数据输出存储桶

其具体操作方式参照上面的步骤介绍。

Step 8 运行该 Crawler,自动创建三张元数据表

分别为 flv, green和 vendorinfo,点击 View data 可以查看具体的元数据信息,如下图所示:

Step 9 创建 Job,实现单元数据表到单元数据表的数据抽取和转换

选择创建 Job,输入 Job 名称,指定 IAM Role,脚本的存储路径和临时路径,如下图所示:

Step 10 选择元数据表

此处只允许选择一个元数据表,选择 green,如下图所示:

Step 11 选择目标数据输出的存储位置 S3 及格式 Parquet

如下图:

AWS Glue 支持5种输出格式,分别为 CSV,Parquet,ORC,Json,Avro。输出输出支持 S3 和 JDBC 两种方式。

Step 12 数据清洗及数据类型转换选择

根据需求自动进行数据的清洗及类型选择如下图所示:

Step 13 生成 Job 脚本,点击保存和编辑脚本

如下所示:

由于在实际项目中进行数据抽取的数据源会非常多,因为不能真对单个数据创建一个 Job,需要实现同一个 Job 进行多个数据源的转换操作,需要编辑脚本。

Step 14 编辑 Job 脚本,点击保存脚本

具体编辑完的脚本如下:

def map_function(dynamicRecord):
	return dynamicRecord
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "testdb", table_name = "green", transformation_ctx = "datasource0")
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "testdb", table_name = "flv",  transformation_ctx = "datasource1")
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "testdb", table_name = "vendorinfo", transformation_ctx = "datasource2")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("vendorid", "long", "vendorid", "long"), ("lpep_pickup_datetime", "string", "lpep_pickup_datetime", "string"), ("lpep_dropoff_datetime", "string", "lpep_dropoff_datetime", "string"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude", "double"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("ehail_fee", "string", "ehail_fee", "string"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "long"), ("trip_type", "long", "trip_type", "long")], transformation_ctx = "applymapping1")
applymapping2 = ApplyMapping.apply(frame = datasource1, mappings = [("vendorid", "bigint", "vendorid", "long"), ("tpep_pickup_datetime", "string", "tpep_pickup_datetime", "string"), ("tpep_dropoff_datetime", "string", "tpep_dropoff_datetime", "string"), ("passenger_count","bigint","passenger_count","long"),("trip_distance", "double", "trip_distance", "double"),("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude","double"), ("ratecodeid", "bigint", "ratecodeid", "long"),("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"),("payment_type", "bigint", "payment_type", "long"),  ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"),  ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping2")
applymapping3 = ApplyMapping.apply(frame = datasource2, mappings = [("vendorid", "string", "vendorid", "bigint"), ("comname", "string", "comname", "string")], transformation_ctx = "applymapping3")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
resolvechoice3 = ResolveChoice.apply(frame = applymapping2, choice = "make_struct", transformation_ctx = "resolvechoice3")
resolvechoice4 = ResolveChoice.apply(frame = applymapping3, choice = "make_struct", transformation_ctx = "resolvechoice4")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
dropnullfields4 = DropNullFields.apply(frame = resolvechoice3, transformation_ctx = "dropnullfields4")
dropnullfields5 = DropNullFields.apply(frame = resolvechoice3, transformation_ctx = "dropnullfields5")


datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://nytaxisdata"}, format = "parquet", transformation_ctx = "datasink4")
datasink5 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields4, connection_type = "s3", connection_options = {"path": "s3://nytaxisdata"}, format = "parquet", transformation_ctx = "datasink5")
datasink6 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields5, connection_type = "s3", connection_options = {"path": "s3://nytaxisdata"}, format = "parquet", transformation_ctx = "datasink6")

job.commit()

保存该脚本,点击 Run,完成数据的 ETL 工作。

结论

使用 AWS Glue 可以非常方便的帮助用户构建无服务器架构的 ETL Pipeline,用户不需要担心基础架构的可靠性以及后台的数据处理能力,只需专注于 Job 作业的编写。通过内置的 Classifier 和支持自定义创建 Classifier 两种方式为用户在数据爬取方面提供更大的灵活性,友好的脚本开发环境为用户的脚本开发和数据清洗提供更大便利。

王友升

王友升拥有超过13年的 IT 从业经验,负责基于 AWS 的云计算方案架构咨询和设计,推广 AWS 云平台技术和各种解决方案。在加入 AWS 之前,王友升曾在中地数码,浪潮,惠普等公司担任软件开发工程师、DBA 和解决方案架构师。他在服务器、存储、数据库优化方面拥有多年的经验,同时对大数据、Openstack 及人工智能和机器学习方面也进行一定的研究和积累。