In-depth explanation into the New Features of Apache Spark 3.0




New Features of Apache Spark 3.0

We will mention the exciting new developments within the Spark 3.0 as well as some other major initiatives that are coming within the future. during this talk, we would like to share with the community many of the more important changes with the examples and demos.




Spark 3.0 Highlights

In Spark 3.0, the entire community resolved quite 3,400 JIRAs. Spark SQL and therefore the Core are the new core module, and every one the opposite components are built on Spark SQL and therefore the Core. Today, the pull requests for Spark SQL and therefore the core constitute quite 60% of Spark 3.0. within the previous couple of releases, the share keeps rising . Today, we’ll specialize in the key features in both Spark SQL and therefore the Core.
This release provides many new capabilities, performance gains, and extended compatibility for the Spark ecosystem. this is often a mixture of the tremendous contributions from the open-source community. it’s impossible to debate the new features within 16 minutes. We resolved quite 3,400 JIRAs. Even during this light, I did my best, but I only can put 24 new Spark 3.0 features. Today, we might wish to present a number of them. First, allow us to mention the performance-related features.

Spark 3.0 Enhanced Performance

High performance is one among the main advantages when people select Spark as their computation engine. This release keeps enhancing the performance for interactive, batch, streaming, and workloads. Here, i will be able to first cover four of the performance features in SQL query compilers.

The four major features in query compilers include a replacement framework for adaptive query execution and a replacement runtime filtering for dynamic partition pruning. And also, we reduce the overhead of our query compiler by quite a half, especially on the optimizer overhead and therefore the SQL cache synchronization. Supporting an entire set of join hints is another useful features many of us are expecting .

Adaptive query execution was accessible at the previous releases. However, the previous framework features a few major drawbacks. only a few companies are using it within the production systems. during this release,

Spark Catalyst Optimizer

Michael Armbrust. he’s the creator of Spark SQL and also, Catalyst Optimizer. within the initial release of Spark SQL, all the optimizer rules are heuristic-based. to get good query plans, the query optimizer must understand the info characteristics. Then in Spark 2.x, we inaugurate a cost-based optimizer. However, in most cases, data statistics are commonly absent, especially when statistics collection is even costlier than the info processing within the albeit the statistics are available, the statistics are likely out of date. supported the storage and therefore the compute separation in Spark, the characteristics of knowledge is unpredictable. the prices are often mis-estimated thanks to the various deployment environment and therefore the recorder user-defined functions. We are unable to estimate the value for the UDF. Basically, in many cases, Spark optimizer is enabled to get the simplest plan thanks to this limitation.

Adaptive Query Execution

For  these reasons, runtime adaptivity becomes more critical for Spark than the normal systems. So this release introduced a replacement adaptive query execution framework called AQE. the essential idea of adaptive planning is straightforward . We optimize the execution plan using the prevailing rules of the optimizer and therefore the planner after we collect more accurate statistics from the finished plans.

The line shows the new logics we added during this release. rather than directly optimizing the execution plans, we remit the unfinished plan segments then use an existing optimizer and planner to optimize them at the top and build a replacement execution plan. This release includes three adaptive features. we will convert the soft merge join to broadcast hash join, supported the runtime statistics. we will shrink the amount of reducers after over-partitioning. we will also handle the skew join at runtime. If you would like to understand more details, please read the blog post I posted here. Today, i will be able to briefly explain them one by one.



Maybe many of you already learn many performance tuning tips. for instance , to form your join faster, you would possibly guide your optimizer to settle on a broadcast hash join rather than the type merge join. you can expand the spark.sql.autobroadcastjointhreshold or use a broadcast join hint. However, it’s hard to tune it. you would possibly hit out of memory exceptions and even worsen performance. albeit it works now, it’s hard to take care of over time because it’s sensitive to your data workloads.

You might be wondering why Spark is unable to form the wise choice by itself. I can easily list multiple reasons.

  • The statistics could be missing or out of date.
  • The file is compressed.
  • The file format is column-based, therefore the file size doesn’t represent the
  • Particular data volume.
  • The filters might be compressed
  • The filters may additionally contain the recorder UDFs.
  • The whole query fragments could be large, complex, and it’s hard to estimate
  • The particular data volume for Spark to form the simplest choice.

Convert Sort Merge Join to Broadcast Hash Join

So this is often an example to point out how AQE converts a kind merge join to a broadcast hash join at runtime. First, execute the leave stages. Examine the statistics from the shuffle operators which materialize the query fragments. you’ll see the particular size of stage two is far smaller than the estimated size reduced from 30 megabytes to eight megabytes so we will optimize the remaining plan and alter the join algorithm from sort merge join to broadcast hash join.

Another accessible performance tuning tip is to tune the configuration spark.sql.shuffle.partitions. The default value may be a atomic number , 200. Previously, the first default is 8. Later, it had been increased to 200. i think nobody knows the rational why it become 200 rather than 50, 400, or 2,000. it’s very hard to tune it, to be honest. Because it’s a worldwide configuration, it’s almost impossible to make a decision the simplest value for each query’s fragment employing a single configuration, especially when your query plan is large and sophisticated .

If you set it to very small values, the partition are going to be huge, and therefore therefore the aggregation and the sort might got to spew the info to the disk. If the configuration values are too big, the partition are going to be small. But the amount of partitions is big. it’ll cause inefficient IO and therefore the performance bottleneck might be the task scheduler. Then it’ll hamper everybody. Also, it’s very hard to take care of over time.

Dynamically Coalesce Shuffle Partitions

Until you’ll solve it during a smart way, we will first increase our initial partition number to an enormous one. After we execute the leave query stage, we will know the particular size of every partition. Then we will automatically correlate the nearby partitions and automatically reduce the amount of partitions to a smaller number. this instance shows how we reduce the amount of partitions from 50 to five at runtime. and that we added the particular coalesce at runtime.

Data Skew

Data skew is extremely annoying. you’ll see some long-running or frozen task and tons of disks spinning and a really low resource authorization rate in most nodes and even out of memory. Our Spark community might tell you a great many alternative ways to unravel such a typical performance problem. you’ll find the skew value and therefore the right queries to handle the skew value separately. And also, you’ll add the particular skew keys which will remove the info skew, either new columns or some existing columns. Anyway, you’ve got to manually rewrite your queries, and this is often annoying and sensitive to your workloads, too, which might be changed over time.

This is an example without the skew optimization. due to data skew, after the shuffle, the shuffle partition, A0, are going to be very large. If we do a join on these two tables, the entire performance bottleneck is to hitch the values for this specific partition, A0. For this partition, A0, the value of shuffle, sort, and merge are much bigger than the opposite partitions. most are expecting the partition 0 to finish and hamper the execution of the entire query.

Our adaptive query execution can handle it alright within the data skew case. After executing the leaf stages (stages one and stage two), we will optimize our queries with a skew shuffle reader. Basically, it’ll split the skew partitions into smaller sub-partitions after we realize some shuffle partitions are too big.

Let us use same example to point out the way to resolve it using adaptive query execution. After realizing partitions are overlarge , AQE will add a skew reader to automatically split table A’s partition part 0 to 3 segments: split 0, split 1, and split 2. Then it’ll also duplicate another side for table B. Then we’ll have three copies for table B’s part 0.

After this step, we will parallelize the shuffle reading, sorting, merging for this split partition A0. we will avoid generating very big partition for the type merge join. Overall, it’ll be much faster.

Based on a terabyte of TPC-DS benchmark, without statistics, Spark 3.0 can make Q7 eight times faster and also achieve twice fast and speed up for Q5 and quite 1.1 speed up for an additional 26 queries. So this is often just the start . within the future releases, we’ll still improve the compiler and introduce more new adaptive rules.

Dynamic Partition Pruning

The second performance features i would like to spotlight is dynamic partition pruning. So this is often another runtime optimization rule. Basically, dynamic partition pruning is to avoid partition scanning supported the queried results of the opposite query fragments. it’s important for star schema queries. we will achieve a big speed up in TPC-DS queries.

So this is often variety , during a TPC-DS benchmark, 60 out of 102 queries show a big speed up between 2 times and 18 times. it’s to prune the partitions that joins read from the very fact table T1 by identifying those partitions that result from filtering the dimension table, T2.

Let us explain it step by step. First, we’ll do the filter down within the left side. And on the proper side, we will generate a replacement filter for the partition column PP because join P may be a partition column. Then we get the query results of the left side. we will reuse our query results and generate the lists of constant values, EPP, and filter result. Now, we will down the in filter within the right side. this may avoid scanning all the partitions of the large fact table, T1. For this instance , we will avoid scanning 90% of partitioning. With this dynamic partition pruning, we will achieve 33 times speed up.

 




 

JOIN Optimizer Hints

So the last performance feature is join hints. Join hints are quite common optimizer hints. It can influence the optimizer to settle on an expected join strategies. Previously, we have already got a broadcast hash join. during this release, we also add the hints for the opposite three join strategies: sort merge join, shuffle hash join, and therefore the shuffle nested loop join.

Please remember, this could be used very carefully. it’s difficult to manage over time because it’s sensitive to your workloads. If your workloads’ patterns aren’t stable, the hint could even make your query much slower.

Here are examples the way to use these hints within the SQL queries. you furthermore may can do an equivalent thing within the DataFrame API. once we decide the join strategies.

So a broadcast hash join requires one side to be small, no shuffle, no sort, so it performs in no time .
For the shuffle hash join, it must shuffle the info but no sort is required . So it can handle the massive tables but will still hit out of memory if the info is skewed.
Sort merge join is far more robust. It can handle any data size. It must shuffle and salt data slower in most cases when the table size is little compared with a broadcast hash join.
And also, shuffle nested loop join, it doesn’t require the join keys, unlike the opposite three join strategies.
Richer APIs: new features and simplify development
To enable new use cases and simplify the Spark application development, this release delivers a replacement capability and enhanced interesting features.

Pandas UDF

Let’s, first, mention Pandas UDF. this is often a reasonably popular performance features for the PySpark users.

So allow us to mention the history of UDF support in PySpark. within the first release of Python support, 2013, we already support Python lambda functions for RDD API. Ranging from Spark 2.0, Python UDF registration is session-based. then next year, users can register the utilization of Java UDF in Python API. In 2018, we introduced Pandas UDF. during this release, we redesigned the interface for Pandas UDF by using the Python tab hints and added more tabs for the Pandas UDFs.

To adjust our compatibility with the old Pandas UDFs from Apache Spark 2.0 with the Python 2.6 and above, Python [inaudible] like pandas.Series, Pandas DataFrame, cube hole, and therefore the iterator are often wont to impress new Pandas UDF types. for instance , in Spark 2.3, we’ve a Scala UDF. The input may be a pandas.Series and its output is additionally pandas.Series. In Spark 2.0, we don’t require users to recollect any UDF types. you only got to specify the input and therefore the output types. In Spark 2.3, we even have a Grouped Map Pandas UDF, so input may be a Pandas DataFrame, and therefore the output is additionally Pandas DataFrames.

Old vs New Pandas UDF interface

This slide shows the difference between the old and therefore the new interface. an equivalent here. The new interface also can be used for the prevailing Grouped Aggregate Pandas UDFs. additionally , the old Pandas UDF was split into two API categories: Pandas UDFs and Pandas function APIs. you’ll treat Pandas UDFs within the same way that you simply use the opposite PySpark column instance.

For example, here, calculate the values. you’re calling the Pandas UDF calculate. We do support the new Pandas UDF types from iterators of series to iterator other series and from iterators of multiple series to iterator of series. So this is often useful for [inaudible] state initialization of your Pandas UDFs and also useful for Pandas UDF parquet.

However, you’ll now use Pandas function APIs with this column instance. Here are these two examples: map Pandas function API and therefore the core , the map Pandas UDF, the APIs. These APIs are newly added in these units.