Hello. By way of introduction, I want to explain how I ended up here.
Before I met Big Data, and Spark in particular, I spent a lot of time optimizing SQL queries, first for MSSQL, then for Oracle, and now I have run into SparkSQL.
For a DBMS there are already plenty of good books describing the methodology and the knobs you can turn to get an optimal query plan, but I have not come across such books for Spark. What I found were mostly articles and collections of practices, and they dealt more with the RDD / Dataset API than with pure SQL. One of my reference books on SQL optimization is Jonathan Lewis's "Cost-Based Oracle Fundamentals", and I was looking for something of similar depth. Why did SparkSQL, rather than the underlying API, become the subject of this study? The interest came from the specifics of the project I work on.
For one of our customers, our company is developing a data warehouse whose detailed layer and some of the data marts live in a Hadoop cluster, while the final data marts are in Oracle. The project includes an extensive data transformation layer implemented on Spark. To speed up development and bring in ETL developers who know SQL and ETL tools but not the intricacies of Big Data technologies, we built a tool that resembles other ETL tools (Informatica, for example) and lets you design ETL processes visually and then generate Spark code from them. Because the algorithms are complex and the transformations numerous, developers mostly use SparkSQL queries.
This is where the story begins, because we had to answer a great many questions of the form "Why does the query not work / run slowly / behave differently than in Oracle?". The most interesting one for me was "Why is it slow?". And unlike the DBMSs I had worked with before, here you can go into the source code and find the answer yourself.
Spark 2.3.0 is used to run examples and analyze source code.
The reader is assumed to be familiar with the architecture of Spark and with the general principles of how the query optimizer of some DBMS works. At a minimum, the phrase "query plan" should not come as a surprise.
This article also tries not to turn into a prose retelling of the Spark optimizer code, so things that are very interesting from the optimizer's point of view but can be read in the source are only mentioned briefly, with references to the relevant classes.
Let's start with a small query to learn the basic stages through which it goes from parsing to execution.
scala> spark.read.orc("/user/test/balance").createOrReplaceTempView("bal")
scala> spark.read.orc("/user/test/customer").createOrReplaceTempView("cust")
scala> val df = spark.sql("""
     | select bal.account_rk, cust.full_name
     | from bal
     | join cust
     | on bal.party_rk = cust.party_rk
     | and bal.actual_date = cust.actual_date
     | where bal.actual_date = cast('2017-12-31' as date)
     | """)
df: org.apache.spark.sql.DataFrame = [account_rk: decimal(38,18), full_name: string]
scala> df.explain(true)
The main module responsible for parsing SQL and optimizing the query execution plan is Spark Catalyst.
The extended output of the query plan description (df.explain(true)) lets you trace every stage a query passes through:
== Parsed Logical Plan ==
'Project ['bal.account_rk, 'cust.full_name]
+- 'Filter ('bal.actual_date = cast(2017-12-31 as date))
   +- 'Join Inner, (('bal.party_rk = 'cust.party_rk) && ('bal.actual_date = 'cust.actual_date))
      :- 'UnresolvedRelation `bal`
      +- 'UnresolvedRelation `cust`

== Analyzed Logical Plan ==
account_rk: decimal(38,18), full_name: string
Project [account_rk#1, full_name#59]
+- Filter (actual_date#27 = cast(2017-12-31 as date))
   +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88))
      :- SubqueryAlias bal
      :  +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc
      +- SubqueryAlias cust
         +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57... 9 more fields] orc

== Optimized Logical Plan ==
Project [account_rk#1, full_name#59]
+- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88))
   :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27]
   :  +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18))
   :     +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc
   +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88]
      +- Filter ((isnotnull(actual_date#88) && isnotnull(party_rk#57)) && (actual_date#88 = 17531))
         +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57,... 9 more fields] orc

== Physical Plan ==
*(2) Project [account_rk#1, full_name#59]
+- *(2) BroadcastHashJoin [party_rk#18, actual_date#27], [party_rk#57, actual_date#88], Inner, BuildRight
   :- *(2) Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27]
   :  +- *(2) Filter isnotnull(party_rk#18)
   :     +- *(2) FileScan orc [ACCOUNT_RK#1,PARTY_RK#18,ACTUAL_DATE#27] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/balance], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#27), (ACTUAL_DATE#27 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<ACCOUNT_RK:decimal(38,18),PARTY_RK:decimal(38,18)>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,18), true], input[2, date, true]))
      +- *(1) Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88]
         +- *(1) Filter isnotnull(party_rk#57)
            +- *(1) FileScan orc [PARTY_RK#57,FULL_NAME#59,ACTUAL_DATE#88] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/customer], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#88), (ACTUAL_DATE#88 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<PARTY_RK:decimal(38,18),FULL_NAME:string>
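The literal 17531 in the optimized and physical plans is the date 2017-12-31 after the cast has been evaluated at planning time: a date value is held as the number of days since 1970-01-01. A minimal sketch in plain Scala (no Spark required; toSparkDays is a hypothetical helper name used only for illustration) reproduces the number:

```scala
import java.time.LocalDate

// Hypothetical helper: days since the Unix epoch (1970-01-01),
// the form in which the date literal shows up in the plan.
def toSparkDays(isoDate: String): Long =
  LocalDate.parse(isoDate).toEpochDay

val days = toSparkDays("2017-12-31")
println(days) // 17531
```

Going the other way, LocalDate.ofEpochDay(17531) gives back 2017-12-31, which is handy when reading filters in a plan.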
The subsequent optimization and execution stages (for example, WholeStageCodegen) are beyond the scope of this article; they are described in great detail, along with the stages above, in Mastering Spark SQL.
A query execution plan is usually read from the inside out and from the bottom up: the most deeply nested parts are executed first, and execution gradually moves toward the final projection at the top.
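The stages printed by explain(true) can also be inspected one at a time through the queryExecution field of a DataFrame. The sketch below is only an illustration: it starts a local session and uses a hypothetical one-column view named demo rather than the tables of this article, so that it can run on its own.

```scala
import org.apache.spark.sql.SparkSession

// Local session so the sketch is self-contained; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("plan-stages").getOrCreate()

// Hypothetical one-column view standing in for real tables.
spark.range(10).createOrReplaceTempView("demo")
val df = spark.sql("select id from demo where id = 1 + 2")

val qe = df.queryExecution
println(qe.logical)       // parsed plan, relations still unresolved
println(qe.analyzed)      // names and types resolved against the catalog
println(qe.optimizedPlan) // after the Catalyst optimizer rules
println(qe.sparkPlan)     // physical plan picked by the planner
println(qe.executedPlan)  // physical plan after preparation (exchanges, codegen)
```

Having each stage as an object rather than text makes it possible to search a plan for particular nodes instead of reading the printed tree by eye.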
There are two types of query optimizers: rule-based optimizers (RBO) and cost-based optimizers (CBO).
The former apply a fixed set of rules, for example applying the filter conditions from the where clause as early as possible, precomputing constants (constant folding), and so on.
To judge the quality of a candidate plan, a CBO uses a cost function, which usually depends on the volume of data processed, the number of rows that pass the filters, and the cost of performing particular operations.
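To make the idea of a fixed rewrite rule concrete, here is a toy sketch of constant precomputation on a tiny expression tree. It is plain Scala and deliberately much simpler than Catalyst; the classes Lit, Col, Add, Equal and the function foldConstants are invented for this illustration and are not Spark classes.

```scala
// A toy expression tree (hypothetical, not Catalyst's own classes).
sealed trait Expr
case class Lit(value: Int) extends Expr
case class Col(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class Equal(left: Expr, right: Expr) extends Expr

// One fixed rule: rewrite the children first, then replace an addition
// of two literals with its precomputed result.
def foldConstants(e: Expr): Expr = e match {
  case Add(l, r) =>
    (foldConstants(l), foldConstants(r)) match {
      case (Lit(a), Lit(b)) => Lit(a + b)
      case (fl, fr)         => Add(fl, fr)
    }
  case Equal(l, r) => Equal(foldConstants(l), foldConstants(r))
  case other       => other
}

// Models the condition: amount = 100 + 20 + 3
val before = Equal(Col("amount"), Add(Add(Lit(100), Lit(20)), Lit(3)))
val after  = foldConstants(before)
println(after) // Equal(Col(amount),Lit(123))
```

The rule never looks at the data: it fires whenever the pattern matches, which is what distinguishes a rule-based rewrite from a cost-based decision.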
The design of the CBO for Apache Spark is described in detail in its design specification and in the main JIRA task for its implementation.
The starting point for exploring the full suite of existing optimizations is the Optimizer.scala code.
Here is a short excerpt from a long list of available optimizations:
def batches: Seq[Batch] = {
  val operatorOptimizationRuleSet =
    Seq(
      // Operator push down
      PushProjectionThroughUnion,
      ReorderJoin,
      EliminateOuterJoin,
      PushPredicateThroughJoin,
      PushDownPredicate,
      LimitPushDown,
      ColumnPruning,
      InferFiltersFromConstraints,
      // Operator combine
      CollapseRepartition,
      CollapseProject,
      CollapseWindow,
      CombineFilters,
      CombineLimits,
      CombineUnions,
      // Constant folding and strength reduction
      NullPropagation,
      ConstantPropagation,
      ........
Note that this list contains both rule-based optimizations and optimizations based on a cost estimate for the query, which are discussed below.
A distinctive feature of the CBO is that, to work correctly, it has to know and store statistics on the data used in the query: the number of records, the record size, and histograms of the data distribution in the table columns.
Statistics are collected with the family of SQL commands ANALYZE TABLE ... COMPUTE STATISTICS. A set of tables is also needed to store this information; the API for it is provided by ExternalCatalog, or more precisely by HiveExternalCatalog.
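As an illustration of the commands mentioned above, the following fragment collects table-level and column-level statistics and then prints the table description. It assumes a spark-shell session with Hive support; dwh.balance is a hypothetical table name, and the column names are borrowed from the example query.

```scala
// Assumes an active SparkSession `spark` (as in spark-shell) with Hive support.
// `dwh.balance` is a hypothetical catalog table used only for illustration.

// Table-level statistics: row count and size in bytes.
spark.sql("ANALYZE TABLE dwh.balance COMPUTE STATISTICS")

// Column-level statistics for the columns used in joins and filters.
spark.sql("ANALYZE TABLE dwh.balance COMPUTE STATISTICS FOR COLUMNS party_rk, account_rk")

// The collected table statistics appear in the extended table description.
spark.sql("DESCRIBE EXTENDED dwh.balance").show(100, truncate = false)
```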
Since the CBO is currently disabled by default, the focus here is on the available optimizations and the nuances of the RBO.
The join strategy is chosen when the physical plan for the query is built. The following options are currently available in Spark (SparkStrategies.scala is a good place to start reading the code).
Broadcast hash join is the best option when one side of the join is small enough (the threshold is set by the spark.sql.autoBroadcastJoinThreshold parameter in SQLConf). That side is copied in full to every executor, where it is hash-joined with the main table. Size is not the only constraint: in an outer join only the outer-joined side (the right side of a left outer join) can be broadcast, so where possible the table with the most data should be the leading table of an outer join.
As in Oracle SQL, a hint can be used to request this strategy: /*+ broadcast(t1, t2) */
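A short sketch of the three ways to steer Spark toward a broadcast join: the threshold setting, the SQL hint, and the broadcast function of the DataFrame API. It assumes a spark-shell session in which the temp views bal and cust from the first example are registered; the 50 MB figure is an arbitrary value chosen for illustration.

```scala
import org.apache.spark.sql.functions.broadcast

// Assumes spark-shell with the temp views `bal` and `cust` registered earlier.

// Size limit in bytes below which a join side is broadcast automatically.
// 50 MB is an arbitrary illustration; -1 switches automatic broadcast off.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (50L * 1024 * 1024).toString)

// SQL hint asking for `cust` to be broadcast.
val hinted = spark.sql("""
  select /*+ broadcast(cust) */ bal.account_rk, cust.full_name
  from bal
  join cust
    on bal.party_rk = cust.party_rk
""")

// The same request through the DataFrame API.
val viaApi = spark.table("bal").join(broadcast(spark.table("cust")), Seq("party_rk"))

// The physical plan shows which join operator was actually chosen.
hinted.explain()
viaApi.explain()
```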
Sort merge join: with the default value of spark.sql.join.preferSortMergeJoin, this strategy is used by default whenever the join keys can be sorted.
One peculiarity: unlike broadcast hash join, code generation for this operator is available only for an inner join.
Shuffle hash join: if the keys cannot be sorted, or the preference for sort merge join is switched off, Catalyst tries a shuffle hash join. Besides the settings, it also checks that Spark has enough memory to build a local hash map for a single partition (the total number of partitions is set by spark.sql.shuffle.partitions).
Broadcast nested loop join: when the rows cannot be matched directly by key (for example, a like condition) or there are no join keys at all, either this strategy or CartesianProduct is chosen, depending on the size of the tables.
In any case, a join requires the tables to be shuffled by key. For that reason the order in which tables are listed currently matters, especially when several joins are performed in a row (strictly speaking, this applies when the CBO is off and the JOIN_REORDER_ENABLED setting is not enabled).
Where possible, the join order should minimize the number of shuffle operations on large tables, which means that joins on the same key should follow one another. Also remember to reduce the amount of data going into the join, so that Broadcast Hash Join can be used.
Consider the following query:
select bal.account_rk, cust.full_name
from balance bal
join customer cust
  on bal.party_rk = cust.party_rk
 and bal.actual_date = cust.actual_date
where bal.actual_date = cast('2017-12-31' as date)
Here we join two tables that are partitioned the same way by the actual_date field, and put an explicit partition filter only on the balance table.
As the optimized query plan shows, the date filter is also applied to customer, and when the data is read from disk it is determined that exactly one partition is needed.
== Optimized Logical Plan ==
Project [account_rk#1, full_name#59]
+- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88))
   :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27]
   :  +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18))
   :     +- Relation[,... 4 more fields] orc
   +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88]
      +- Filter (((actual_date#88 = 17531) && isnotnull(actual_date#88)) && isnotnull(party_rk#57))
         +- Relation[,... 9 more fields] orc
But as soon as the inner join in the query is replaced with a left outer join, the predicate pushdown for the customer table is lost and a full scan takes place, which is undesirable.
== Optimized Logical Plan ==
Project [account_rk#1, full_name#59]
+- Join LeftOuter, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88))
   :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27]
   :  +- Filter (isnotnull(actual_date#27) && (actual_date#27 = 17531))
   :     +- Relation[,... 4 more fields] orc
   +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88]
      +- Relation[,... 9 more fields] orc
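One possible way around this, offered as a sketch rather than a recommendation from the source, is to repeat the date condition for the right-hand table inside the on clause. A join condition that refers only to the right side of a left outer join can be pushed down to that side, and the result is unchanged because the where clause already fixes the date on the left side. The example runs in a local session on two hypothetical views, bal_demo and cust_demo, generated from spark.range instead of the real ORC tables.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join}

// Local session so the sketch is self-contained; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("left-outer-pushdown").getOrCreate()

// Hypothetical stand-ins for the balance and customer tables.
val dateExpr = "date_add(cast('2017-12-30' as date), cast(id % 2 as int)) as actual_date"
spark.range(4).selectExpr("id as party_rk", dateExpr).createOrReplaceTempView("bal_demo")
spark.range(4)
  .selectExpr("id as party_rk", "concat('name_', cast(id as string)) as full_name", dateExpr)
  .createOrReplaceTempView("cust_demo")

// The date condition is repeated for the right side inside the ON clause.
val rewritten = spark.sql("""
  select bal.party_rk, cust.full_name
  from bal_demo bal
  left join cust_demo cust
    on bal.party_rk = cust.party_rk
   and bal.actual_date = cust.actual_date
   and cust.actual_date = cast('2017-12-31' as date)
  where bal.actual_date = cast('2017-12-31' as date)
""")

// Check whether the optimized plan has a Filter somewhere under the right side of the join.
val rightSideFiltered = rewritten.queryExecution.optimizedPlan.collectFirst {
  case j: Join => j.right.collect { case f: Filter => f }.nonEmpty
}.getOrElse(false)

println(rightSideFiltered)
println(rewritten.queryExecution.optimizedPlan)
```

On real partitioned tables the thing to verify is the PartitionFilters entry of the customer scan in the physical plan.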
Consider a simple example: a select from a table with a filter on customer type, where the party_type field is a string in the schema.
select party_rk, full_name
from cust
where actual_date = cast('2017-12-31' as date)
  and party_type = 101 -- literal type does not match the column
  -- and party_type = '101' -- literal type matches the column
Now compare the two resulting plans: the first, where the literal has the wrong type (an implicit cast between int and string is added), and the second, where the literal type matches the schema.
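PushedFilters: [IsNotNull(PARTY_TYPE)] // literal type does not match: the equality filter is not pushed down
PushedFilters: [IsNotNull(PARTY_TYPE), EqualTo(PARTY_TYPE,101)] // literal type matches: the equality filter is pushed down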
The same problem appears when a date is compared with a string: the filter turns into a string comparison. Example:
where OPER_DATE = '2017-12-31'
Filter (isnotnull(oper_date#0) && (cast(oper_date#0 as string) = 2017-12-31))
PushedFilters: [IsNotNull(OPER_DATE)]

where OPER_DATE = cast('2017-12-31' as date)
PushedFilters: [IsNotNull(OPER_DATE), EqualTo(OPER_DATE,2017-12-31)]
When a safe implicit cast is possible, for example int -> decimal, the optimizer copes on its own.
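The effect of a mismatched literal type can be checked programmatically instead of by reading explain output. The sketch below is an illustration under several assumptions: it runs in a local session, writes a small hypothetical data set with a string column party_type to a temporary directory, uses Parquet rather than ORC to avoid depending on Hive classes, and reads the PushedFilters entry from the scan node of the physical plan. The helper pushedFilters is a name invented here.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.FileSourceScanExec

// Local session so the sketch is self-contained; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("pushed-filters").getOrCreate()

// Hypothetical sample data: a string column named like the one in the example.
val path = Files.createTempDirectory("cust_demo").resolve("data").toString
spark.range(200).selectExpr("cast(id as string) as party_type").write.parquet(path)
spark.read.parquet(path).createOrReplaceTempView("cust_demo")

// Hypothetical helper: the PushedFilters entry of every file scan in the physical plan.
def pushedFilters(df: DataFrame): Seq[String] =
  df.queryExecution.sparkPlan.collect {
    case scan: FileSourceScanExec => scan.metadata.getOrElse("PushedFilters", "")
  }

// Integer literal against a string column versus a string literal.
val mismatched = pushedFilters(spark.sql("select * from cust_demo where party_type = 101"))
val matched    = pushedFilters(spark.sql("select * from cust_demo where party_type = '101'"))

println(s"int literal:    $mismatched")
println(s"string literal: $matched")
```

A check like this can be run over generated queries to catch literals whose type does not match the column before they reach production.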
SQLConf.scala holds a lot of interesting information about the knobs that can be used to fine-tune Catalyst, as well as about the optimizer's features, present and future.
In particular, it shows that the cost-based optimizer is still disabled by default.
val CBO_ENABLED =
  buildConf("spark.sql.cbo.enabled")
    .doc("Enables CBO for estimation of plan statistics when set true.")
    .booleanConf
    .createWithDefault(false)
The same is true of the dependent optimizations that reorder joins:
val JOIN_REORDER_ENABLED =
  buildConf("spark.sql.cbo.joinReorder.enabled")
    .doc("Enables join reorder in CBO.")
    .booleanConf
    .createWithDefault(false)
or
val STARSCHEMA_DETECTION =
  buildConf("spark.sql.cbo.starSchemaDetection")
    .doc("When true, it enables join reordering based on star schema detection. ")
    .booleanConf
    .createWithDefault(false)
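For experiments, these settings can be switched on for the current session. The fragment assumes an active SparkSession named spark, as in spark-shell; the keys are the ones from the definitions above. Without collected statistics the cost-based optimizer has little to work with, so turning the flags on is only the first step.

```scala
// Assumes an active SparkSession `spark` (as in spark-shell).
// Session-level switches for the cost-based optimizer and join reordering.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.cbo.starSchemaDetection", "true")

// Read a setting back to confirm the value in effect for this session.
println(spark.conf.get("spark.sql.cbo.enabled"))
```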
This article has touched on only a small part of the existing optimizations. Still ahead are experiments with cost-based optimization, which can give much more room for query transformation. Another interesting question is how the set of optimizations compares when reading from Parquet and ORC files: judging by the project's JIRA, the two are heading toward parity, but is that really so?
Source: https://habr.com/ru/post/417103/