Spark SQL: Catalyst Optimizer

In previous blog posts, we introduced the Apache Spark distributed processing framework. This post presents the advantages of using the Spark SQL Catalyst optimizer.

Spark SQL is an Apache Spark module for structured data processing. One of the big differences from the RDD API is that its interfaces give Spark additional information about the structure of the data and of the computation, which makes more efficient processing possible. Spark SQL uses this information internally, through its Catalyst optimizer, to improve data processing performance.

The main abstraction in the Spark SQL API is the DataFrame. In Spark, a DataFrame is a distributed collection of data organized in rows that share the same schema. Conceptually, it is equivalent to a table in a relational database. DataFrames share the core properties of RDDs: they are immutable, held in memory, resilient, and distributed. In addition, a DataFrame applies a structure, called a schema, to the data. The biggest difference between DataFrames and RDDs is that DataFrames, having more information about the structure of the data, allow a higher level of abstraction and optimization techniques that would not be possible with RDDs. As of Spark 2.0, the DataFrame and Dataset APIs were merged into a single API, unifying their data processing capabilities.

What is Catalyst

Spark SQL was designed with an optimizer called Catalyst, which is built on functional programming constructs in Scala. It has two main purposes: first, to make it easy to add new optimization techniques that address problems specific to “big data”; and second, to allow developers to extend and customize the optimizer.

Figure: Spark SQL architecture and Catalyst optimizer integration

Catalyst components

The main components of the Catalyst optimizer are as follows:

Trees

The main data type in Catalyst is the tree. Each tree is composed of nodes, and each node has a node type and zero or more children. These objects are immutable and can be manipulated using functional transformations.

As an example, consider the following tree:

Merge(Attribute(x), Merge(Literal(1), Literal(2)))

Where:

  • Literal(value: Int): a constant value
  • Attribute(name: String): an attribute from an input row
  • Merge(left: TreeNode, right: TreeNode): the combination of two expressions
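
The three node types listed above can be modelled in plain Scala as immutable case classes. What follows is only a minimal sketch and not Catalyst's actual class hierarchy: the object name `TreeSketch` and the helpers `children` and `size` are assumptions introduced here for illustration.

```scala
// Toy model of the node types described above; not Catalyst's real classes.
object TreeSketch {
  sealed trait TreeNode {
    // Every node exposes zero or more children.
    def children: Seq[TreeNode]
    // Number of nodes in the subtree rooted at this node.
    def size: Int = 1 + children.map(_.size).sum
  }

  // Leaves have no children.
  case class Literal(value: Int) extends TreeNode {
    val children: Seq[TreeNode] = Nil
  }
  case class Attribute(name: String) extends TreeNode {
    val children: Seq[TreeNode] = Nil
  }

  // A binary node with exactly two children.
  case class Merge(left: TreeNode, right: TreeNode) extends TreeNode {
    val children: Seq[TreeNode] = Seq(left, right)
  }

  // The example tree from the text.
  val tree: TreeNode = Merge(Attribute("x"), Merge(Literal(1), Literal(2)))

  def main(args: Array[String]): Unit = {
    println(tree)
    println(tree.size)
  }
}
```

Because case classes are immutable, any "change" to a tree produces a new tree, which is what makes the functional transformations described next safe to compose.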

Rules

Trees can be manipulated using rules, which are functions from one tree to another tree. The transform method applies a pattern-matching function recursively to all nodes of the tree, replacing each node that matches a pattern with the corresponding result. Below is an example of a rule applied to a tree.

tree.transform {
 case Merge(Literal(c1), Literal(c2)) => Literal(c1 + c2)
}
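
To make the rule above concrete, here is a small self-contained sketch of a `transform` method over a toy tree. It is an illustration under stated assumptions, not Catalyst's implementation: the object name `RuleSketch` is hypothetical, `Merge` is treated as integer addition, and the rule is applied bottom-up for simplicity, which is not claimed to match the traversal order Catalyst itself uses.

```scala
// Toy rule application over a toy tree; not Catalyst's real classes.
object RuleSketch {
  sealed trait TreeNode {
    // Rebuild the children first, then try the rule on the rebuilt node.
    // Nodes the rule does not match are returned unchanged.
    def transform(rule: PartialFunction[TreeNode, TreeNode]): TreeNode = {
      val rebuilt: TreeNode = this match {
        case Merge(l, r) => Merge(l.transform(rule), r.transform(rule))
        case leaf        => leaf
      }
      rule.applyOrElse(rebuilt, (n: TreeNode) => n)
    }
  }
  case class Literal(value: Int) extends TreeNode
  case class Attribute(name: String) extends TreeNode
  case class Merge(left: TreeNode, right: TreeNode) extends TreeNode

  val tree: TreeNode = Merge(Attribute("x"), Merge(Literal(1), Literal(2)))

  // Constant folding: two merged literals collapse into a single literal.
  val folded: TreeNode = tree.transform {
    case Merge(Literal(c1), Literal(c2)) => Literal(c1 + c2)
  }

  def main(args: Array[String]): Unit =
    println(folded) // Merge(Attribute(x),Literal(3))
}
```

The rule is a partial function, so it only needs to describe the patterns it cares about; every other node passes through untouched.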

Using Catalyst in Spark SQL

The Catalyst optimizer in Spark offers both rule-based and cost-based optimization. Rule-based optimization rewrites the query plan by applying a set of predefined rules. Cost-based optimization generates multiple execution plans and compares them to choose the one with the lowest cost.

Phases

The four phases of the transformation that Catalyst performs are as follows:

1. Analysis

The first phase of Spark SQL optimization is analysis. Spark SQL starts with a relation to be computed, which can come from one of two sources: an abstract syntax tree (AST) returned by a SQL parser, or a DataFrame object built with the Spark SQL API.
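
According to the paper cited under Sources, this phase resolves attribute references and their types by looking them up in a catalog. The sketch below illustrates that idea on a toy expression tree. All names here (`AnalysisSketch`, `UnresolvedAttribute`, `ResolvedAttribute`, the `catalog` map) are assumptions made for illustration and do not mirror Spark's internal classes.

```scala
// Toy analysis step: resolve attribute names against a catalog.
object AnalysisSketch {
  sealed trait Expr
  case class UnresolvedAttribute(name: String) extends Expr
  case class ResolvedAttribute(name: String, dataType: String) extends Expr
  case class Literal(value: Int) extends Expr
  case class Merge(left: Expr, right: Expr) extends Expr

  // Hypothetical catalog: column name -> type, standing in for table metadata.
  val catalog: Map[String, String] = Map("x" -> "int")

  // Replace each unresolved attribute with a typed one, or report the
  // first name that the catalog does not know about.
  def analyze(e: Expr): Either[String, Expr] = e match {
    case UnresolvedAttribute(n) =>
      catalog.get(n).map(t => ResolvedAttribute(n, t)).toRight(s"cannot resolve '$n'")
    case Merge(l, r) =>
      for { al <- analyze(l); ar <- analyze(r) } yield Merge(al, ar)
    case other =>
      Right(other)
  }

  def main(args: Array[String]): Unit = {
    println(analyze(Merge(UnresolvedAttribute("x"), Literal(1))))
    println(analyze(UnresolvedAttribute("y")))
  }
}
```

Whichever source the relation comes from, the output of this step is a tree in which every attribute has a known type, which is what the later phases operate on.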

2. Logical optimization

The second phase is logical optimization. In this phase, rule-based optimizations are applied to the logical plan. New rules can be added easily.
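
The paper cited under Sources describes rules as being grouped into batches that are run repeatedly until the tree stops changing (a fixed point). The following sketch shows that loop on a toy tree, and shows that adding a rule amounts to appending it to the batch. The names `OptimizerSketch`, `transformDown`, `fixedPoint`, `foldConstants` and `dropZero` are hypothetical, and `Merge` is assumed to behave like integer addition.

```scala
// Toy rule batch executed to a fixed point; not Catalyst's real classes.
object OptimizerSketch {
  sealed trait Expr
  case class Literal(value: Int) extends Expr
  case class Attribute(name: String) extends Expr
  case class Merge(left: Expr, right: Expr) extends Expr

  type Rule = PartialFunction[Expr, Expr]

  // Pre-order pass: try the rule on a node, then recurse into the children
  // of whatever the rule returned.
  def transformDown(e: Expr, rule: Rule): Expr =
    rule.applyOrElse(e, (n: Expr) => n) match {
      case Merge(l, r) => Merge(transformDown(l, rule), transformDown(r, rule))
      case leaf        => leaf
    }

  // Two example rules; adding another one means appending it to the batch.
  val foldConstants: Rule = { case Merge(Literal(a), Literal(b)) => Literal(a + b) }
  val dropZero: Rule = { case Merge(e, Literal(0)) => e }

  // Run every rule in the batch, and repeat until nothing changes
  // or the iteration budget is used up.
  @annotation.tailrec
  def fixedPoint(e: Expr, batch: Seq[Rule], maxIterations: Int = 100): Expr = {
    val next = batch.foldLeft(e)((acc, rule) => transformDown(acc, rule))
    if (next == e || maxIterations <= 1) next
    else fixedPoint(next, batch, maxIterations - 1)
  }

  val batch: Seq[Rule] = Seq(foldConstants, dropZero)

  // One pass only folds the inner pair; a second pass folds the result.
  val optimized: Expr = fixedPoint(Merge(Merge(Literal(1), Literal(2)), Literal(3)), batch)

  def main(args: Array[String]): Unit =
    println(optimized) // Literal(6)
}
```

The nested example needs more than one pass because a single top-down traversal only sees two literals side by side after the inner node has already been folded.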

3. Physical plan

In the physical planning phase, Spark SQL takes the logical plan and generates one or more physical plans using physical operators that match the Spark execution engine. It then selects the plan to execute with a cost model, comparing the estimated cost of each candidate.
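
The selection step can be pictured as generating candidate plans and keeping the cheapest one. The sketch below does this for a single join with two hypothetical strategies. Everything in it is an assumption for illustration: the names `PhysicalPlanSketch`, `BroadcastJoin`, `ShuffleJoin` and `choose`, and especially the cost formulas, which are invented and are not Spark's cost model.

```scala
// Toy cost-based choice between two join strategies; the costs are made up.
object PhysicalPlanSketch {
  sealed trait PhysicalPlan { def cost: Double }

  case class BroadcastJoin(leftRows: Long, rightRows: Long) extends PhysicalPlan {
    // Invented cost: copying the smaller side everywhere is expensive per row,
    // while the larger side is only scanned once.
    def cost: Double =
      math.min(leftRows, rightRows) * 10.0 + math.max(leftRows, rightRows)
  }

  case class ShuffleJoin(leftRows: Long, rightRows: Long) extends PhysicalPlan {
    // Invented cost: both sides are repartitioned over the network.
    def cost: Double = (leftRows + rightRows) * 3.0
  }

  // All physical plans considered for one logical join.
  def candidates(leftRows: Long, rightRows: Long): Seq[PhysicalPlan] =
    Seq(BroadcastJoin(leftRows, rightRows), ShuffleJoin(leftRows, rightRows))

  // Keep the candidate with the lowest estimated cost.
  def choose(leftRows: Long, rightRows: Long): PhysicalPlan =
    candidates(leftRows, rightRows).minBy(_.cost)

  def main(args: Array[String]): Unit = {
    println(choose(1000000L, 100L))     // small right side
    println(choose(1000000L, 1000000L)) // two large sides
  }
}
```

With these invented formulas, a very small input favours the broadcast strategy and two large inputs favour the shuffle strategy, which is the kind of trade-off a cost model is meant to capture.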

4. Code generation

Code generation is the final phase of Spark SQL optimization. To run the query on each machine, Catalyst generates Java bytecode.
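
The idea behind this phase is to turn an expression tree into code once, instead of walking the tree for every row. The sketch below emits source text as a plain string, which is a deliberate simplification and not how Catalyst produces bytecode. The names `CodegenSketch` and `generate`, and the `row.getInt` accessor in the emitted text, are hypothetical, and `Merge` is assumed to behave like addition.

```scala
// Toy code generator: expression tree in, Java-like source text out.
object CodegenSketch {
  sealed trait Expr
  case class Literal(value: Int) extends Expr
  case class Attribute(name: String) extends Expr
  case class Merge(left: Expr, right: Expr) extends Expr

  // Emit one source fragment per node; `row.getInt` is a made-up accessor.
  def generate(e: Expr): String = e match {
    case Literal(v)   => v.toString
    case Attribute(n) => s"""row.getInt("$n")"""
    case Merge(l, r)  => s"(${generate(l)} + ${generate(r)})"
  }

  val source: String =
    generate(Merge(Attribute("x"), Merge(Literal(1), Literal(2))))

  def main(args: Array[String]): Unit =
    println(source) // (row.getInt("x") + (1 + 2))
}
```

In a real system the generated text would then be compiled, so the per-row work becomes a direct arithmetic expression rather than a recursive tree traversal.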

Figure: Phases of query planning in Spark SQL. Rounded squares represent Catalyst trees.

Example

The Catalyst optimizer is enabled by default as of Spark 2.0, and contains optimizations for manipulating Datasets. Below is an example of the plan generated for a query on a Dataset using the Spark SQL API in Scala:

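import org.apache.spark.sql.SparkSession

// Local session for illustration; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("catalyst-example").getOrCreate()
// Brings .toDS into scope for local collections
import spark.implicits._

// Business object
case class Persona(id: String, nombre: String, edad: Int)
// The dataset to query
val peopleDataset = Seq(
  Persona("001", "Bob", 28),
  Persona("002", "Joe", 34)).toDS
// The query to execute: count rows per name and alias the result as "total"
val query = peopleDataset.groupBy("nombre").count().as("total")
// Print the logical and physical plans produced by Catalyst
query.explain(extended = true)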

As a result, the detailed plan for the query is printed:

== Analyzed Logical Plan ==
nombre: string, count: bigint
SubqueryAlias total
+- Aggregate [nombre#4], [nombre#4, count(1) AS count#11L]
   +- LocalRelation [id#3, nombre#4, edad#5]
== Optimized Logical Plan ==
Aggregate [nombre#4], [nombre#4, count(1) AS count#11L]
+- LocalRelation [nombre#4]
== Physical Plan ==
*(2) HashAggregate(keys=[nombre#4], functions=[count(1)], output=[nombre#4, count#11L])
+- Exchange hashpartitioning(nombre#4, 200)
   +- *(1) HashAggregate(keys=[nombre#4], functions=[partial_count(1)], output=[nombre#4, count#17L])
      +- LocalTableScan [nombre#4]

Conclusions

The Spark SQL Catalyst optimizer improves developer productivity and the performance of the queries developers write. Catalyst automatically transforms relational queries so that they execute more efficiently, using techniques such as filtering, the use of indexes, and performing data source joins in the most efficient order. In addition, its design allows the Spark community to extend the optimizer with new features.

Sources:

Spark SQL: Relational Data Processing in Spark

https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf