亚马逊AWS官方博客

AWS Glue 增量数据加载和优化的 Parquet 写入器

Original URL: https://aws.amazon.com/blogs/big-data/load-data-incrementally-and-optimized-parquet-writer-with-aws-glue/

AWS Glue 提供一个无服务器环境,用于准备(提取和转换)和从各种来源加载大量数据集,以通过 Apache Spark ETL 作业进行分析和数据处理。该系列的第一篇文章 《使用 AWS Glue 扩展 Apache Spark 作业和分区数据的最佳实践》,讨论了帮助 Apache Spark 应用程序和 Glue ETL 作业的开发人员、大数据架构师、数据工程师和业务分析师自动扩展在 AWS Glue 上运行的数据处理作业的最佳实践。本文将介绍如何从 Amazon S3 数据湖以及JDBC连接数据库中的数据源增量加载数据,并且还会展示如何通过作业书签使 AWS Glue ETL 作业仅读取新添加的数据,以及如何通过在之前的作业运行结束时重置作业书签,让 AWS Glue ETL 作业处理晚到达的数据。本文还将回顾作业书签与复杂的AWS Glue ETL 脚本和工作负载配合使用的最佳实践。最后,本文将介绍如何使用经过性能优化的自定义 AWS Glue Parquet 写入器,可以在运行时计算架构,避免额外的数据传输。AWS Glue Parquet 写入器还通过添加或删除列支持数据集的架构演变。

AWS Glue 作业书签

AWS Glue 的 Spark 运行时有一个存储状态的机制,这个机制用于跟踪由某个 ETL 作业的特定运行处理过的数据。该持久保存的状态信息称为作业书签

上面的截图显示了在Glue 控制台界面上同一个 ETL 作业在不同的时间运行了多次。AWS Glue 作业使用作业书签来处理自上次作业运行以来的增量数据。作业书签由各种作业元素(如数据源、转换和目标)的状态构成。例如,AWS Glue 作业可以读取由 S3 支持的表中的新分区。AWS Glue 跟踪作业已成功处理的分区,以防止重复处理和向目标数据存储多次写入相同的数据。

作业书签 API

当使用 AWS Glue 控制台或 AWS Glue API 启动作业时,作业书签将以参数形式传递。

作业书签有以下三种设置选项:

  • 启用 – 此选项使作业在每次成功运行后更新书签状态,以跟踪已处理的数据。在同一数据源上运行的后续作业仅处理自上一个检查点以来的新增数据。
  • 禁用 – 确保不使用作业书签,这会导致作业始终在处理整个数据集。这是默认选项。
  • 暂停 – 读取状态信息并处理自上一个检查点以来的增量数据,但不进行更新。可以使用此选项指定每个后续运行都从相同的时间点开始处理数据。

在所有情况下,都需要由您负责管理来自上一个作业的输出。有关更多信息,请参阅本系列的第一篇文章:《使用 AWS Glue 扩展 Apache Spark 作业和分区数据的最佳实践》。有关向作业(特别是向作业书签)传递参数的详细信息,请参阅 由 AWS Glue 使用的特殊参数

对于从由 Amazon S3 支持的 AWS Glue 表中读取的 Glue ETL 作业,以下代码示例显示了如何在其中使用作业书签。作业以 JSON 格式从 Kinesis Firehose 事件流接收新文件,然后对表中的两列进行重命名,再转换并输出到 Amazon Redshift。transformation_ctx 是与此数据源相关联的作业书签的标识符。要正确执行操作,您需要使用 job.init 和 job.commit 来初始化和持久保存书签的状态。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "firehose_s3_db",
                table_name = "firehose_s3_raw_table",
                transformation_ctx = "datasource0")
applymapping = ApplyMapping.apply(frame = datasource0,
                mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")],
                transformation_ctx = "applymapping1")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping, catalog_connection = "redshift", connection_options = {"dbtable": "name", "database": "kinesis_db"}, redshift_tmp_dir= "s3://redshift_tmp_dir_path")

job.commit()

当使用 API 或 CLI 来开始作业运行时,需要添加以下参数以启用作业书签。

Job Arguments :

--job-bookmark-option, job-bookmark-enable
--JOB_NAME, glue-job-incremental

对于 S3 输入源,AWS Glue 作业书签会检查对象的最后修改时间,以验证要重新处理哪些对象。如果有来自 Kinesis Firehose 的新文件,或者从最后一个作业运行至今已有文件发生了更改,则当通过周期性Glue 作业触发器S3 触发器通知再次运行作业时,作业将会重新处理这些文件。

如果您想要使用相同的作业重新处理所有数据,请重置作业书签。要重置作业书签状态,请使用 AWS Glue 控制台,ResetJobBookmark操作(Python: reset_job_bookmark) API 操作或 AWS CLI。例如,使用 AWS CLI 输入以下命令:

aws glue reset-job-bookmark --job-name my-job-name

您也可以通过传入作业运行 ID,使用 ResetJobBookmark API 将计划作业运行重置到特定时间点,该 API 将作业书签的状态重置到作业运行 ID 完成后的时间点。举例来说,此功能类似于时空穿梭,您现在可以从过去的时间点重新处理输入数据,并在 ETL 脚本中或 ETL 管道中与 AWS Glue 工作流编排在一起的下游作业中,使用一组不同的转换操作。您可以在 AWS Glue 控制台中,使用倒回作业书签选项,将作业书签状态重置为先前的作业运行。

AWS Glue 会跟踪每个作业的书签。如果删除一个作业,相应的作业书签也将被删除。常见的S3 源存储格式包括 JSON、CSV、Apache Avro、XML,以及 JDBC 源都支持作业书签。从 AWS Glue 1.0 版本开始, Apache Parquet 和 ORC 等列式存储格式也同样支持作业书签。

最佳实践 1:使用作业书签进行开发

在某些情况中,您可能已启用了 AWS Glue 作业书签,但 AWS Glue 作业却重新处理了之前运行中已处理过的数据。发生这种情况可能是由于以下原因:

  • 缺少作业提交 – AWS Glue ETL 脚本末尾的 job.commit() 语句用于更新作业书签的状态。如果未包括该语句,则作业将重新处理之前处理过的文件和新文件。请确保在您的用户脚本中所有通向作业完成的代码路径中,都会执行该作业提交语句。
  • 缺少转换上下文 – 转换上下文在 GlueContext 中是可选参数,但作业标签需要该参数才能正常工作,请确认在创建 DynamicFrame 时包括了转换上下文参数。请参阅以下代码示例:

sample_dynF=glueContext.create_dynamic_frame_from_catalog(database,

table_name,

transformation_ctx=”sample_dynF”)

  • JDBC – 在使用 JDBC 连接访问关系数据库时,作业书签要求源表要么包含主键列,要么包含单调递增或递减值的列(需要在源选项中指定)。作业书签可以仅捕获新增行。此行为不适用于 S3 上存储的源表。
  • 最后修改时间 – 为确定处理存储在 S3 上的哪些文件,作业书签将检查对象的最后修改时间,而不是文件名。如果自作业最后一次运行以来更改过输入对象,则当作业再次运行时,将重新处理这些对象。

最佳实践 2:监控作业书签

有三种方式可以检查所有作业运行的作业书签的行为:

  • 存储在临时目录中的文件列表 – 所有运行 Apache Spark 并使用 DynamicFrames 读取数据的AWS Glue ETL 作业都会输出一个清单文件,按路径列出已处理的文件。清单文件存储在作业中指定的临时位置。文件路径为:<temporary location>/partitionlisting/<job name>/<run id>/<source transformation_ctx>.input-files.json此文件捕捉为相应数据源读取的文件列表,无论是否启用作业书签。
  • 作业指标 – 可以使用 AWS Glue 作业指标来检查 S3 读取和写入操作,并跟踪作业通过书签读取的字节数。您还可以在 AWS Glue 控制台跟踪一个作业多次运行读取的数据。有关更多信息,请参阅监控多个作业的进度
  • Glue 作业日志 – AWS Glue 作业还会在 Spark 驱动程序日志流中发出与 S3 中处理分区和跳过分区有关的日志,这些日志存储在 Amazon CloudWatch 中。

跳过分区

当分区为空或在 AWS Glue Data Catalog 中创建该特定分区的时间戳早于作业书签捕获到的最后一次作业运行的时间戳时,作业将跳过分区。以下示例日志消息指出已跳过的分区:

19/05/21 14:49:22 WARN HadoopDataSource: Skipping Partition
{"year": "2019", "month": "03", "day": "26", "hour": "13"}
has no new files detected 
@ s3://input-s3-prefix/Year=2019/Month=03/Day=26/Hour=13/ 
or path does not exist

处理分区

当作业发现在最后一次作业运行之后创建的新 S3 分区或者有需要处理的新文件时,它将生成日志消息。日志消息还会指出特定分区占文件总数的百分比。当前作业运行的初始和最终作业书签筛选器将处理这些文件。以下示例说明了作业书签的筛选逻辑。

如果是新分区(从分区创建时间来看,晚于最近作业运行的时间),则作业将处理该分区中的所有文件。分区创建时间为 1559235148000,这在最后作业运行时间之后。请参阅以下示例日志消息:

19/05/31 10:39:55 INFO PartitionFilesListerUsingBookmark:
Found new partition DynamicFramePartition(com.amazonaws.services.glue.DynamicRecord@35309577,
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000) 
with 47 files

现有分区将触发第一个书签筛选器。此筛选器选择修改时间戳在最后一次作业运行之后的文件。在以下示例日志消息中,分区中共有 47 个文件,其中 15 个文件是新文件,应予以处理:

19/05/31 10:40:31 INFO PartitionFilesListerUsingBookmark:
After initial job bookmarks filter,
processing 31.91% of 47 files 
in partition DynamicFramePartition(com.amazonaws.services.glue.DynamicRecord@aa39e364,
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000)

最终的书签筛选器会执行额外的筛选,以避免出现与 S3 最终一致性相关的竞争状况。如果到达的大量文件具有相同的修改时间,则此筛选器可能会将这些文件排除在处理范围之外。在以下示例日志消息中,筛选器处理了初始书签筛选器捕捉到的全部 15 个文件:

19/05/31 10:50:31 INFO PartitionFilesListerUsingBookmark:
After final job bookmarks filter, processing 100.00% of 15 files 
Found new partition DynamicFramePartition(com.amazonaws.services.glue.DynamicRecord@35309577,
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000)

优化的 Apache Parquet 写入器

当使用 DynamicFrames 时,AWS Glue 提供了优化的 Apache Parquet 写入器以改进性能。通常情况下,Apache Parquet 格式的读取速度比写入速度要快,因为它采用列式存储布局以及将数据写入文件中的预计算架构。AWS Glue 的 Parquet 写入器提供了更高的写入性能,以及在处理不断演变的数据集时的灵活性。与默认的 Apache Spark Parquet 写入器不同,它不需要预计算架构,也不需要通过对输入数据集执行额外扫描而推断出架构。

您可以通过将 write_dynamic_frame.from_options 功能的 format 参数设置为 glueparquet 来启用 AWS Glue Parquet 写入器。由于数据通过 AWS Glue 作业传输并写入 S3,该优化的写入器将在运行时动态计算和合并架构,从而减少作业运行时间。AWS Glue Parquet 写入器还允许删除和添加新列,从而为架构演变提供支持。

您可以通过设置 format_options 参数来进一步调整 AWS Glue Parquet 写入器。请参阅以下代码示例:

block_size = 128*1024*1024
page_size = 1024*1024
glueContext.write_dynamic_frame.from_options(frame = dyFrame,
connection_type = "s3", connection_options = {"path": output_dir},
format = "glueparquet",
format_options = {"compression": "snappy",
                  blockSize = block_size, pageSize = page_size})

format_options 的默认值如下:

  • compressionsnappy
  • blockSize 为 128 MB
  • pageSize 为 1 MB

blockSize 指定在内存中缓冲的 Parquet 文件的一个行组的大小。pageSize 指定在访问单个记录时,必须从 Parquet 文件中完全读取的最小单元的大小。

小结

本文讨论了 AWS Glue 作业书签如何帮助以增量方式处理从 S3 和关系型数据库收集的数据。您还学习了使用作业书签可以让回填历史数据变得简单。与作业书签进行交互非常容易; 您可以启用、禁用、暂停它们,或将其还原到之前的时间点。通过监控作业书签的进度和状态,您可以更好地调整作业并进行检查以确保正确处理了所有数据。

您还可以使用 AWS Glue Parquet 写入器来优化在数据湖中写入 Apache Parquet 文件的性能。该写入器经过优化,支持 Parquet 文件的架构演变,让您可以自动管理数据更改。

建议您使用 AWS Glue 在 Apache Spark 应用程序中加载和写入您的数据,亲自试一试这些功能。

本系列的第三篇文章将讨论 AWS Glue 的自动代码生成功能如何让您轻松处理和转换复杂的数据集。该文章还会展示如何直接从 AWS Glue ETL 脚本对数据集执行 SQL 查询,以及如何使用 AWS Glue 工作流计划和编排数据管道。

 


关于作者

Mohit Saxena 是 AWS Glue 的技术主管经理。他热衷于构建可扩展的分布式系统,以有效地管理云中的数据。他喜欢看电影和了解最新技术前沿。

 

 

 

 

Bijay Bisht 是 AWS 的高级软件开发工程师。