Spark SQL: Optimizador Catalyst

En entradas anteriores del blog, se realizó una introducción al framework de procesamiento distribuido Apache Spark. El objetivo de esta entrada es introducir las ventajas del uso del optimizador Catalyst de Spark SQL.

Spark SQL es un módulo de Apache Spark para el procesamiento de datos estructurados. Una de las grandes diferencias respecto a la Spark API RDD es que sus interfaces proporcionan información adicional para realizar procesos más eficientes. Esta información además es útil para que Spark SQL se beneficie internamente del uso de su optimizador Catalyst y mejorar el rendimiento en el procesamiento de datos.

La mayor abstracción en la API de Spark SQL es el DataFrame. En Spark, un DataFrame es una colección distribuida de datos organizada en filas con el mismo esquema. Conceptualmente es equivalente a una tabla en una base de datos relacional. Los DataFrames en Spark tienen las mismas capacidades que los RDDs, como por ejemplo su inmutabilidad, en memoria, resilientes, computación distribuida. Además, le aplica una estructura llamada esquema a los datos. La mayor diferencia entre los DataFrames y los RDDs es que los DataFrames, al tener más información sobre la estructura de los datos, permite un mayor nivel de abstracción y técnicas de optimización que no serían posibles con los RDDs. A partir de la versión de Spark 2.0, se fusionaron en una sola API los DataFrames y Datasets, unificando así sus capacidades en el procesamiento de datos a través de sus librerías.

Qué es Catalyst

Spark SQL fue diseñado con un optimizador llamado Catalyst basado en la programación funcional de Scala. Sus dos propósitos principales son: primero, añadir nuevas técnicas de optimización para resolver algunos problemas con “big data” y segundo, permitir a los desarrolladores ampliar y personalizarnm las funciones del optimizador.

Arquitectura Spark SQL y la integración  del optimizador Catalyst

Componentes Catalyst

Los componentes principales del optimizador de Catalyst son los siguientes:

Trees

El tipo de datos principal en Catalyst son los tree. Cada tree está compuesto por nodes, y cada node tiene un nodetype y cero o más hijos. Estos objetos son inmutables y pueden ser manipulados con lenguaje funcional.
Como ejemplo se muestra el uso de los siguientes nodes:

Merge(Attribute(x), Merge(Literal(1), Literal(2))

Donde:

  • Literal(value: Int): un valor constante
  • Attribute(name: String): un atributo como fila de entrada
  • Merge(left: TreeNode, right: TreeNode): mezcla de dos expresiones

Rules

Los trees pueden ser manipulados usando reglas, que son funciones de un tree a otro tree. El método de transformación aplica la función pattern matching recursivamente en todos los nodos del tree transformando cada patrón al resultado.  A continuación se muestra un ejemplo de regla aplicada a un tree.

tree.transform {
 case Merge(Literal(c1), Literal(c2)) => Literal(c1) + Literal(c2)
}

Usando Catalyst en Spark SQL

El optimizador de Catalyst en Spark ofrece una optimización basada en reglas y basada en costes. La optimización basada en reglas indica cómo ejecutar la consulta a partir de un conjunto de reglas definidas. Mientras tanto, la optimización basada en costes genera múltiples planes de ejecución y los compara para elegir el de menor coste.

Fases

Las cuatro fases de la transformación que realiza Catalyst son las siguientes:

1. Análisis

La primera fase de la optimización de Spark SQL es el análisis. Spark SQL se empieza con una relación a ser procesada que puede ser de dos formas.  Una forma seria desde un AST (abstract syntax tree) retornado por un parseador SQL, y por otra otra parte a partir de un objeto DataFrame de la API de Spark SQL.

2. Plan lógico de optimización

La segunda fase es el plan lógico de optimización. En esta fase se aplica la optimización basada en reglas al plan lógico. Es posible añadir nuevas reglas de forma sencilla.

3. Plan físico

En la fase del plan físico, Spark SQL toma el plan lógico y genera uno o más planes físicos usando los operadores físicos que coinciden con el motor de ejecución de Spark. El plan a ejecutarse se selecciona usando el modelo basado en costes (comparación entre costes de los modelos).

4. Generación de código

La generación de código es la fase final de la optimización de Spark SQL. Para ejecutar en cada máquina, es necesario la generación de código Java bytecode.

Fases del plan de consultas en Spark SQL. Los cuadrados redondeados representan los Catalyst trees

 

Ejemplo

El optimizador de Catalyst se encuentra habilitado por defecto a partir de Spark 2.0, y contiene optimizaciones para manipular Datasets.  A continuación, se muestra un ejemplo del plan generado para una consulta de un Dataset a partir de la Spark SQL API de Scala:

// Business object
case class Persona(id: String, nombre: String, edad: Int)
// The dataset to query
val peopleDataset  = Seq(
      Persona("001", "Bob", 28),
      Persona("002", "Joe", 34)).toDS
// The query to execute
val query = peopleDataset.groupBy("nombre").count().as("total")
// Get Catalyst optimization plan
query.explain(extended = true)

Como resultado se obtiene el plan detallado para la consulta:

== Analyzed Logical Plan ==
nombre: string, count: bigint
SubqueryAlias total
+- Aggregate [nombre#4], [nombre#4, count(1) AS count#11L]
   +- LocalRelation [id#3, nombre#4, edad#5]
== Optimized Logical Plan ==
Aggregate [nombre#4], [nombre#4, count(1) AS count#11L]
+- LocalRelation [nombre#4]
== Physical Plan ==
*(2) HashAggregate(keys=[nombre#4], functions=[count(1)], output=[nombre#4, count#11L])
+- Exchange hashpartitioning(nombre#4, 200)
   +- *(1) HashAggregate(keys=[nombre#4], functions=[partial_count(1)], output=[nombre#4, count#17L])
      +- LocalTableScan [nombre#4]

Conclusiones

El optimizador Catalyst de Spark SQL mejora la productividad de los desarrolladores y el rendimiento de las consultas que escriben. Catalyst transforma automáticamente las consultas relacionales para ejecutarlas más eficientemente usando técnicas como filtrados, índices y asegurando que los joins de los orígenes de datos se realizan en el orden más eficiente. Además, su diseño permite a la comunidad de Spark implementar y extender el optimizador con nuevas funcionalidades.

Fuentes:

Spark SQL: Relational Data Processing in Spark

https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf