Apache Spark: RDD Performance II

Anteriormente iniciamos una serie de entradas relacionadas con las consideraciones generales a tener en cuenta a la hora de programar en Apache Spark. En esta entrada vamos a centrarnos en dos aspectos. Por un lado, comentaremos los errores más comunes que ocurren cuando ejecutamos algún programa en Apache Spark, y por otro, veremos la necesidad de elegir el número de particiones que debe tener un RDD.

Problemas típicos en Apache Spark

A continuación, vamos a explicar los cuatro tipos de errores más comunes. Los ejemplos utilizados en cada problema están escritos en Scala:

Job aborted due to stage failure: task not serializable

Es muy común encontrarse con este error en el log.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

Este error ocurre cuando inicializamos una variable en el driver (master), y a continuación queremos acceder desde uno de los esclavos. En este caso Spark trata de serializar el objeto de cara a enviarlo al esclavo, y fallará si el objeto es no serializable. Veamos el siguiente código:

class Impresora(){
  def imprimir(word: String): Unit ={
    println(word)
  }
}

En este caso tenemos una clase Impresora, que tiene una función que imprime lo que recibe como argumento.

    val words = Seq("Green", "Green", "Blue", "Blue", "White")
    val impresora = new Impresora
    val exampleRDD = ss.sparkContext.parallelize(words)
      .map({
        word =>
          impresora.imprimir(word)
          word
      })
    .collect()

A continuación, se llama dentro de la función lambda la clase instanciada impresora, creada fuera del map.
Este código fallará, por tanto para solucionarlo se proponen las siguientes ideas:

  • Hacer la clase Serializable mediante el seralizador de Java o registrandolas mediante Kryo
  • Instanciar la clase sólo en la función lambda pasada en el map
  • Convertir el objeto NotSerializable a estático y crear uno por máquina
  • Llamar a la función forEachPartition y entonces crear el objeto NotSerializable

Vamos a poner un ejemplo de como se resuelve con la segunda opción:

val words = Seq("Green", "Green", "Blue", "Blue", "White")
    val exampleRDD = ss.sparkContext.parallelize(words)
      .map({
        word =>
          val impresora = new Impresora
          impresora.imprimir(word)
          word
      })
    .collect()

Class cannot be found: dependencias no encontradas

Si utilizamos Maven como gestor de dependencias, por defecto no incluirá los jars cuando hacemos build. Cuando ejecutamos un job de Spark, si los esclavos no tienen los jars declarados en <dependencies> se lanzará un error class cannot be found.
De cara a solucionar de manera sencilla este error, la mejor opción es crear un uber jar, el cuál empaqueta todas las dependencias en un único jar. Para evitar redundancias innecesarias, marcaremos las dependencias que ya tengamos en cada esclavo, por ejemplo Apache Spark, como <scope>provided</scope>.

        <!--&lt;!&ndash;Spark dependencies&ndash;&gt;-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>
        <!--&lt;!&ndash; https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 &ndash;&gt;-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>

A continuación se muesta el plugin necesario para hacer el jar con las dependencias:

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

Connection refused cuando ejecutamos start-all.sh

Localhost: ssh: connect to host localhost port 22: Connection refused
La manera de solucionarlo es activar “Remote Login” en la máquina. Se encuentra en System Preferences/Sharing.

Problemas de conectividad entre componentes de Spark

Dentro de los problemas de conectividad entre componentes de Spark existen varios warnings/error. Por ejemplo, podemos encontrarnos problemas si el master no es capaz de alcanzar a sus esclavos. O por otro lado los alcanza, pero no puede obtener respuesta por parte de ellos. En este caso, recomendamos seguir las siguientes recomendaciones sobre aspectos de la configuración:

  • Dirección exacta existente en el fichero de configuración
  • Asignar a la variable SPARK_LOCAL_IP una dirección adecuada para los procesos del driver, master y esclavos

Número de particiones adecuado para cada RDD

De cara a optimizar el procesado de RDDs, es importante conocer el número de particiones que tiene cada uno. Existen diferentes maneras de visualizar esta información, a continuación mostramos como se puede obtener más información en cada ejecución:

  • Usando el Spark User Interface

Utilizando la interfaz gráfica de Spark, que por defecto se encuentra en el puerto 4040 de la url del master de Spark, podemos visualizar cada una de las tareas que se ejecutan. Esta herramienta permite ver el detalle del número de particiones que se crean para cada tarea, así como las particiones cacheadas, en caso de que las hubiera, workers asignados para cada tarea/subtarea etc.

En próximas entregas seguiremos analizando más aspectos a tener en cuenta para optimizar los procesos desarrollados con Apache Spark, así como profundizar más en la herramienta Spark User Interface y planificadores de Spark. Cualquier duda, comentario o sugerencia por favor no dudéis en escribirnos.