En entradas anteriores del blog, se realizó una introducción al framework de procesamiento distribuido Apache Spark. El objetivo de esta entrada es introducir las ventajas del uso del optimizador Catalyst de Spark SQL.
Spark SQL es un módulo de Apache Spark para el procesamiento de datos estructurados. Una de las grandes diferencias respecto a la Spark API RDD es que sus interfaces proporcionan información adicional para realizar procesos más eficientes. Esta información además es útil para que Spark SQL se beneficie internamente del uso de su optimizador Catalyst y mejorar el rendimiento en el procesamiento de datos.
La mayor abstracción en la API de Spark SQL es el DataFrame. En Spark, un DataFrame es una colección distribuida de datos organizada en filas con el mismo esquema. Conceptualmente es equivalente a una tabla en una base de datos relacional. Los DataFrames en Spark tienen las mismas capacidades que los RDDs, como por ejemplo su inmutabilidad, en memoria, resilientes, computación distribuida. Además, le aplica una estructura llamada esquema a los datos. La mayor diferencia entre los DataFrames y los RDDs es que los DataFrames, al tener más información sobre la estructura de los datos, permite un mayor nivel de abstracción y técnicas de optimización que no serían posibles con los RDDs. A partir de la versión de Spark 2.0, se fusionaron en una sola API los DataFrames y Datasets, unificando así sus capacidades en el procesamiento de datos a través de sus librerías.
Qué es Catalyst
Spark SQL fue diseñado con un optimizador llamado Catalyst basado en la programación funcional de Scala. Sus dos propósitos principales son: primero, añadir nuevas técnicas de optimización para resolver algunos problemas con “big data” y segundo, permitir a los desarrolladores ampliar y personalizarnm las funciones del optimizador.
Arquitectura Spark SQL y la integración del optimizador Catalyst
Componentes Catalyst
Los componentes principales del optimizador de Catalyst son los siguientes:
Trees
El tipo de datos principal en Catalyst son los tree. Cada tree está compuesto por nodes, y cada node tiene un nodetype y cero o más hijos. Estos objetos son inmutables y pueden ser manipulados con lenguaje funcional.
Como ejemplo se muestra el uso de los siguientes nodes:
Merge(Attribute(x), Merge(Literal(1), Literal(2))
Donde:
- Literal(value: Int): un valor constante
- Attribute(name: String): un atributo como fila de entrada
- Merge(left: TreeNode, right: TreeNode): mezcla de dos expresiones
Rules
Los trees pueden ser manipulados usando reglas, que son funciones de un tree a otro tree. El método de transformación aplica la función pattern matching recursivamente en todos los nodos del tree transformando cada patrón al resultado. A continuación se muestra un ejemplo de regla aplicada a un tree.
tree.transform { case Merge(Literal(c1), Literal(c2)) => Literal(c1) + Literal(c2) }
Usando Catalyst en Spark SQL
El optimizador de Catalyst en Spark ofrece una optimización basada en reglas y basada en costes. La optimización basada en reglas indica cómo ejecutar la consulta a partir de un conjunto de reglas definidas. Mientras tanto, la optimización basada en costes genera múltiples planes de ejecución y los compara para elegir el de menor coste.
Fases
Las cuatro fases de la transformación que realiza Catalyst son las siguientes:
1. Análisis
La primera fase de la optimización de Spark SQL es el análisis. Spark SQL se empieza con una relación a ser procesada que puede ser de dos formas. Una forma seria desde un AST (abstract syntax tree) retornado por un parseador SQL, y por otra otra parte a partir de un objeto DataFrame de la API de Spark SQL.
2. Plan lógico de optimización
La segunda fase es el plan lógico de optimización. En esta fase se aplica la optimización basada en reglas al plan lógico. Es posible añadir nuevas reglas de forma sencilla.
3. Plan físico
En la fase del plan físico, Spark SQL toma el plan lógico y genera uno o más planes físicos usando los operadores físicos que coinciden con el motor de ejecución de Spark. El plan a ejecutarse se selecciona usando el modelo basado en costes (comparación entre costes de los modelos).
4. Generación de código
La generación de código es la fase final de la optimización de Spark SQL. Para ejecutar en cada máquina, es necesario la generación de código Java bytecode.
Fases del plan de consultas en Spark SQL. Los cuadrados redondeados representan los Catalyst trees
Ejemplo
El optimizador de Catalyst se encuentra habilitado por defecto a partir de Spark 2.0, y contiene optimizaciones para manipular Datasets. A continuación, se muestra un ejemplo del plan generado para una consulta de un Dataset a partir de la Spark SQL API de Scala:
// Business object case class Persona(id: String, nombre: String, edad: Int) // The dataset to query val peopleDataset = Seq( Persona("001", "Bob", 28), Persona("002", "Joe", 34)).toDS // The query to execute val query = peopleDataset.groupBy("nombre").count().as("total") // Get Catalyst optimization plan query.explain(extended = true)
Como resultado se obtiene el plan detallado para la consulta:
== Analyzed Logical Plan == nombre: string, count: bigint SubqueryAlias total +- Aggregate [nombre#4], [nombre#4, count(1) AS count#11L] +- LocalRelation [id#3, nombre#4, edad#5] == Optimized Logical Plan == Aggregate [nombre#4], [nombre#4, count(1) AS count#11L] +- LocalRelation [nombre#4] == Physical Plan == *(2) HashAggregate(keys=[nombre#4], functions=[count(1)], output=[nombre#4, count#11L]) +- Exchange hashpartitioning(nombre#4, 200) +- *(1) HashAggregate(keys=[nombre#4], functions=[partial_count(1)], output=[nombre#4, count#17L]) +- LocalTableScan [nombre#4]
Conclusiones
El optimizador Catalyst de Spark SQL mejora la productividad de los desarrolladores y el rendimiento de las consultas que escriben. Catalyst transforma automáticamente las consultas relacionales para ejecutarlas más eficientemente usando técnicas como filtrados, índices y asegurando que los joins de los orígenes de datos se realizan en el orden más eficiente. Además, su diseño permite a la comunidad de Spark implementar y extender el optimizador con nuevas funcionalidades.
Fuentes:
Spark SQL: Relational Data Processing in Spark
https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf
En BI Geek, empresa tecnológica centrada en Business Intelligence, Big Data & IA, apostamos por un nuevo modelo de consultoría orientado a hacer accesibles las soluciones informacionales y de tratamiento de datos para cualquier tipo de empresa.