AWS Big Data Blog

Performance updates to Apache Spark in Amazon EMR 5.24 – Up to 13x better performance compared to Amazon EMR 5.16

Amazon EMR release 5.24.0 includes several optimizations in Spark that improve query performance. To evaluate the performance improvements, we ran the TPC-DS benchmark queries at 3-TB scale on a 6-node c4.8xlarge EMR cluster with data in Amazon S3. We observed up to 13x better query performance on EMR 5.24 compared to EMR 5.16 when operating with a similar configuration.

Customers use Spark for a wide array of analytics use cases ranging from large-scale transformations to streaming, data science, and machine learning. They choose to run Spark on EMR because EMR provides the latest, stable open source community innovations, performant storage with Amazon S3, and the unique cost savings capabilities of Spot Instances and Auto Scaling.

Each monthly EMR release offers the latest open source packages, alongside new features such as multiple master nodes and cluster reconfiguration. The team also adds performance improvements with each release.

Each of those optimizations helps you run faster and reduce costs. With EMR 5.24, we have made several new optimizations and are detailing three critical ones in this post.

Setup

To get started with EMR, sign into the console, launch a cluster, and process data.

To replicate the setup for the benchmarking queries, use the following configuration:

  • Applications installed on the cluster: Ganglia, Hive, Spark, Hadoop (installed by default).
  • EMR release: EMR 5.24.0
  • Cluster configuration
    • Master instance group: 1 c4.8xlarge instance with 512 GiB of GP2 EBS storage (4 volumes of 128 GiB each)
    • Core instance group: 5 c4.8xlarge instances with 512 GiB of GP2 EBS storage (4 volumes of 128 GiB each)
Classification    Properties
yarn-site         yarn.nodemanager.resource.memory-mb : 53248
                  yarn.scheduler.maximum-allocation-vcores : 36
spark-defaults    spark.executor.memory : 4743m
                  spark.driver.memory : 2g
                  spark.sql.optimizer.distinctBeforeIntersect.enabled : true
                  spark.sql.dynamicPartitionPruning.enabled : true
                  spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled : true
                  spark.executor.cores : 4
                  spark.executor.memoryOverhead : 890m
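
The classifications listed above can be written as a JSON configuration list, which is the shape EMR expects when you supply configurations at cluster creation. The following Python sketch builds that list from the values given in this post and runs a rough check of how many executors fit on one node. The helper name executors_per_node is illustrative, and the check ignores YARN's container-size rounding, so treat the result as an approximation rather than a statement of how the cluster was actually sized.

```python
import json

# Property values copied from the configuration listed in this post.
configurations = [
    {
        "Classification": "yarn-site",
        "Properties": {
            "yarn.nodemanager.resource.memory-mb": "53248",
            "yarn.scheduler.maximum-allocation-vcores": "36",
        },
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.memory": "4743m",
            "spark.driver.memory": "2g",
            "spark.sql.optimizer.distinctBeforeIntersect.enabled": "true",
            "spark.sql.dynamicPartitionPruning.enabled": "true",
            "spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled": "true",
            "spark.executor.cores": "4",
            "spark.executor.memoryOverhead": "890m",
        },
    },
]


def executors_per_node(node_memory_mb, executor_memory_mb, overhead_mb):
    """Illustrative helper: whole executors that fit in one node's YARN memory.

    Assumption: each executor needs heap plus overhead, with no rounding.
    """
    return node_memory_mb // (executor_memory_mb + overhead_mb)


fit = executors_per_node(53248, 4743, 890)
cores_in_use = fit * 4  # spark.executor.cores is 4

config_json = json.dumps(configurations, indent=2)
print(config_json)
print(f"{fit} executors per node, {cores_in_use} executor cores per node")
```

Under these assumptions, 9 executors of 4 cores each fit on a node, which lines up with the yarn.scheduler.maximum-allocation-vcores value of 36. The printed JSON can be saved to a file and passed to the --configurations option of aws emr create-cluster.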

Results observed using TPC-DS benchmarks

The following two graphs compare the total aggregate runtime and the geometric mean of runtimes for all queries in the TPC-DS 3-TB query dataset between the EMR releases.

The per-query runtime improvement between EMR 5.16 and EMR 5.24 is also illustrated in the following chart. The horizontal axis shows each of the queries in the TPC-DS 3 TB benchmark. The vertical axis shows the orders of magnitude of performance improvement seen in EMR 5.24.0 relative to EMR 5.16.0 as measured by query execution time. The largest performance improvements can be seen in 26 of the queries. In each of these queries, the performance was at least 2X better than EMR 5.16.
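
The three metrics used in this section (total runtime, geometric mean, and per-query improvement) can be computed as in the following Python sketch. The query names and runtimes are hypothetical values invented for illustration; they are not the benchmark results from this post.

```python
from statistics import geometric_mean

# HYPOTHETICAL runtimes in seconds, invented for illustration only.
runtimes_516 = {"query_a": 120.0, "query_b": 300.0, "query_c": 45.0}
runtimes_524 = {"query_a": 60.0, "query_b": 100.0, "query_c": 45.0}


def summarize(runtimes):
    """Return (total runtime, geometric mean of runtimes) for one release."""
    values = list(runtimes.values())
    return sum(values), geometric_mean(values)


total_516, geomean_516 = summarize(runtimes_516)
total_524, geomean_524 = summarize(runtimes_524)

# Per-query improvement: old runtime divided by new runtime.
speedups = {q: runtimes_516[q] / runtimes_524[q] for q in runtimes_516}

# Queries that run at least 2x faster on the newer release.
at_least_2x = sorted(q for q, s in speedups.items() if s >= 2.0)

print(f"total: {total_516:.0f}s -> {total_524:.0f}s")
print(f"geometric mean: {geomean_516:.1f}s -> {geomean_524:.1f}s")
print("at least 2x faster:", at_least_2x)
```

The geometric mean is less sensitive than the total to a few very long queries, which is why the two are usually reported together.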

Performance optimizations in EMR 5.24

While AWS made several incremental performance improvements aggregating to the overall speedup, this post describes three major improvements in EMR 5.24 that affect the most common customer workloads:

  • Dynamic partition pruning
  • Flatten scalar subqueries
  • DISTINCT before INTERSECT

Dynamic partition pruning

Dynamic partition pruning improves job performance by selecting specific partitions within a table that must be read and processed for a query. By reducing the amount of data read and processed, queries run faster. The open source version of Spark (2.4.2) only supports pushing down static predicates that can be resolved at plan time. Examples of static predicate push down include the following:

partition_col = 5

partition_col IN (1,3,5)

partition_col BETWEEN 1 AND 3

partition_col = 1 + 3

With dynamic partition pruning turned on, Spark on EMR infers the partitions that must be read at runtime. Dynamic partition pruning is disabled by default, and can be enabled by setting the Spark property spark.sql.dynamicPartitionPruning.enabled from within Spark or when creating clusters. For more information, see Configure Spark.

Here’s an example that joins two tables and relies on dynamic partition pruning to improve performance. The store_sales table contains total sales data partitioned by region, and store_regions table contains a mapping of regions for each country. In this representative query, you want to only get data from a specific country.

SELECT ss.quarter, ss.region, ss.store, ss.total_sales
FROM store_sales ss, store_regions sr
WHERE ss.region = sr.region AND sr.country = 'North America'

Without dynamic partition pruning, this query reads all regions before filtering out the subset of regions that match the country filter on store_regions. With dynamic partition pruning, only the partitions for the regions that pass that filter are read and processed. This saves time and resources by both reading less data from storage and processing fewer records.
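
The difference between the two behaviors can be modeled in a few lines of Python. This is a conceptual sketch only, not Spark's implementation: store_sales is represented as a dictionary from partition key (region) to the rows in that partition, and the region names, rows, and function names are made up for illustration.

```python
# Conceptual model only: one dict entry per partition of store_sales.
# Rows are (quarter, region, store, total_sales); all values are made up.
store_sales = {
    "us-east": [("Q1", "us-east", "store-1", 500)],
    "us-west": [("Q1", "us-west", "store-2", 700)],
    "eu-west": [("Q1", "eu-west", "store-3", 300)],
    "ap-south": [("Q1", "ap-south", "store-4", 900)],
}
store_regions = [
    ("us-east", "North America"),
    ("us-west", "North America"),
    ("eu-west", "Europe"),
    ("ap-south", "Asia"),
]


def regions_for(country):
    """Evaluate the filter on the small table first."""
    return {region for region, c in store_regions if c == country}


def scan_then_filter(country):
    """No pruning: read every partition, then discard non-matching rows."""
    wanted = regions_for(country)
    rows, partitions_read = [], 0
    for region, partition in store_sales.items():
        partitions_read += 1
        rows.extend(r for r in partition if r[1] in wanted)
    return rows, partitions_read


def prune_then_scan(country):
    """Pruning: use the runtime filter result to skip partitions entirely."""
    wanted = regions_for(country)
    rows, partitions_read = [], 0
    for region, partition in store_sales.items():
        if region not in wanted:
            continue  # partition is never read
        partitions_read += 1
        rows.extend(partition)
    return rows, partitions_read


baseline_rows, baseline_reads = scan_then_filter("North America")
pruned_rows, pruned_reads = prune_then_scan("North America")
print("same result:", baseline_rows == pruned_rows)
print("partitions read:", baseline_reads, "vs", pruned_reads)
```

Both functions return the same rows; the only difference is how many partitions are touched. The set of wanted regions is known only after the filter on store_regions has been evaluated, which is why this pruning cannot happen at plan time.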

The following graph shows the performance improvements to Queries 72, 80, 17, and 25 from the TPC-DS suite that we tested with 3-TB data.

Flatten scalar subqueries

This optimization can improve query performance where multiple conditions must be applied to rows from a specific table. Without it, the table is read once for each condition. The optimization detects such cases and rewrites the query so that the table is read only one time.

Flatten scalar subqueries is disabled by default and can be enabled by setting the Spark property spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled from within Spark or when creating clusters.

To give an example of how this works, use the same store_sales table from the previous optimization. In this example, you want to compute the average sales for stores whose total sales fall within specific ranges.

SELECT (SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 5000000 AND 10000000) AS group1, 
(SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 10000000 AND 15000000) AS group2, 
(SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 15000000 AND 20000000) AS group3  

With this optimization disabled, the store_sales table is read once for each subquery. With the optimization enabled, the query is rewritten as follows to apply each of the conditions to the rows returned by reading the table only one time.

SELECT c1 AS group1, c2 AS group2, c3 AS group3 
FROM (SELECT avg (IF(total_sales BETWEEN 5000000 AND 10000000, total_sales, null)) AS c1, 
avg (IF(total_sales BETWEEN 10000000 AND 15000000, total_sales, null)) AS c2, 
avg (IF(total_sales BETWEEN 15000000 AND 20000000, total_sales, null)) AS c3 FROM store_sales);  

This optimization saves time and resources by both reading less data from storage, and processing fewer records.
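
To check that the two query forms return the same values, the following Python sketch runs both against a small in-memory SQLite table. This is an assumption-laden stand-in for Spark: SQLite has no IF() function, so the rewritten form uses the equivalent CASE WHEN expression, and the sample rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE store_sales (store TEXT, total_sales INTEGER)")
# Made-up sample rows for illustration.
conn.executemany(
    "INSERT INTO store_sales VALUES (?, ?)",
    [
        ("a", 6000000),
        ("b", 8000000),
        ("c", 12000000),
        ("d", 16000000),
        ("e", 18000000),
        ("f", 25000000),
    ],
)

# Original form: three scalar subqueries, each scanning store_sales.
original = """
SELECT (SELECT avg(total_sales) FROM store_sales
        WHERE total_sales BETWEEN 5000000 AND 10000000) AS group1,
       (SELECT avg(total_sales) FROM store_sales
        WHERE total_sales BETWEEN 10000000 AND 15000000) AS group2,
       (SELECT avg(total_sales) FROM store_sales
        WHERE total_sales BETWEEN 15000000 AND 20000000) AS group3
"""

# Flattened form: one scan with conditional aggregates.
# avg() ignores NULLs, so rows outside a range do not affect that average.
flattened = """
SELECT avg(CASE WHEN total_sales BETWEEN 5000000 AND 10000000
                THEN total_sales END) AS group1,
       avg(CASE WHEN total_sales BETWEEN 10000000 AND 15000000
                THEN total_sales END) AS group2,
       avg(CASE WHEN total_sales BETWEEN 15000000 AND 20000000
                THEN total_sales END) AS group3
FROM store_sales
"""

original_row = conn.execute(original).fetchone()
flattened_row = conn.execute(flattened).fetchone()
print(original_row)
print(flattened_row)
```

The rewrite relies on the aggregate ignoring NULL inputs: the conditional expression turns every out-of-range row into NULL, so each average sees exactly the rows its original WHERE clause would have selected.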

To illustrate, take the example of Q9 from the TPC-DS suite. The query runs 2.9x faster in EMR 5.24 compared to EMR 5.16 when the relevant Spark property is switched on.

DISTINCT before INTERSECT

When producing the intersection of two collections, the result is the set of unique values found in both collections. When dealing with large collections, many duplicate records must be both processed and shuffled between hosts to finally calculate the intersection. This optimization eliminates duplicate values in each collection before computing the intersection, improving performance by reducing the amount of data shuffled between hosts.

This optimization is disabled by default and can be enabled by setting the Spark property spark.sql.optimizer.distinctBeforeIntersect.enabled from within Spark or when creating clusters.

For example (simplified from TPC-DS query 14), you want to find all of the brands that are sold in both the store and catalog sales channels. In this example, the store_sales table contains sales made through the store channel, the catalog_sales table contains sales made through the catalog channel, and the item table contains each unique product's attributes (for example, brand and manufacturer).

(SELECT item.brand ss_brand FROM store_sales, item
WHERE store_sales.item_id = item.item_id)
INTERSECT
(SELECT item.brand cs_brand FROM catalog_sales, item 
WHERE catalog_sales.item_id = item.item_id) 

With this optimization disabled, the first SELECT statement produces 2,600,000 records (same number of records as store_sales) with only 1,200 unique brands. The second SELECT statement produces 1,500,000 records (same number of records as catalog_sales) with 300 unique brands. This results in all 4,100,000 rows being fed into the intersect operation to produce the 200 brands that exist in both results.

With the optimization enabled, a distinct operation is performed on each collection before it is fed into the intersect operator, so only 1,200 + 300 = 1,500 records reach the intersect operator. This optimization saves time and resources by shuffling less data between hosts.
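
The effect can be reproduced at small scale with the following Python sketch, which uses an in-memory SQLite database as a stand-in for Spark. The table contents and the helper names brands_query and count_rows are made up for illustration. SQLite does not accept parentheses around the members of a compound SELECT, so the two sides are written without them.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE item (item_id INTEGER, brand TEXT);
    CREATE TABLE store_sales (item_id INTEGER);
    CREATE TABLE catalog_sales (item_id INTEGER);
    """
)
# Made-up sample data: many sales rows, few distinct brands.
conn.executemany(
    "INSERT INTO item VALUES (?, ?)", [(1, "acme"), (2, "bolt"), (3, "crest")]
)
conn.executemany(
    "INSERT INTO store_sales VALUES (?)", [(1,), (1,), (1,), (2,), (2,)]
)
conn.executemany(
    "INSERT INTO catalog_sales VALUES (?)", [(2,), (2,), (3,), (3,), (3,), (3,)]
)


def brands_query(sales_table, distinct):
    """Illustrative helper: build one side of the INTERSECT."""
    keyword = "SELECT DISTINCT" if distinct else "SELECT"
    return (
        f"{keyword} item.brand FROM {sales_table}, item "
        f"WHERE {sales_table}.item_id = item.item_id"
    )


def count_rows(query):
    """Illustrative helper: number of rows a query would feed downstream."""
    return conn.execute(f"SELECT count(*) FROM ({query})").fetchone()[0]


results = {}
rows_fed = {}
for distinct in (False, True):
    left = brands_query("store_sales", distinct)
    right = brands_query("catalog_sales", distinct)
    results[distinct] = conn.execute(f"{left} INTERSECT {right}").fetchall()
    rows_fed[distinct] = count_rows(left) + count_rows(right)

print("result without DISTINCT:", results[False], "rows fed:", rows_fed[False])
print("result with DISTINCT:   ", results[True], "rows fed:", rows_fed[True])
```

The result is the same either way because INTERSECT already returns unique values; applying DISTINCT first only changes how many rows reach the operator. In the example from this post, the same effect reduces the input from 4,100,000 rows to 1,500.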

Summary

With each of these performance optimizations to Apache Spark, you benefit from better query performance on EMR 5.24 compared to EMR 5.16. We look forward to feedback on how these optimizations benefit your real world workloads.

Stay tuned as we roll out additional updates to improve Apache Spark performance in EMR. To keep up-to-date, subscribe to the Big Data blog’s RSS feed to learn about more great Apache Spark optimizations, configuration best practices, and tuning advice. Be sure not to miss other great optimizations like using S3 Select with Spark, and the EMRFS S3-Optimized Committer from previous EMR releases.

About the Authors

Paul Codding is a senior product manager for EMR at Amazon Web Services.

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

Joseph Marques is a principal engineer for EMR at Amazon Web Services.

Yuzhou Sun is a software development engineer for EMR at Amazon Web Services.

Atul Payapilly is a software development engineer for EMR at Amazon Web Services.

Surya Vadan Akivikolanu is a software development engineer for EMR at Amazon Web Services.