Apache Spark: RDD Performance I

Apache Spark es un framework de programación distribuida que actualmente goza de mucha popularidad, tal y como explicábamos en artículos anteriores. En esta entrada hablaremos sobre las consideraciones generales a tener en cuenta a la hora de utilizar Apache Spark y best practices para mejorar el rendimiento. 

Best Practices

A continuación vamos a desarrollar cuatro conceptos claves para programar de manera más eficiente con Apache Spark. Si bien es cierto que existen muchos más, consideramos que esta introducción abarca conceptos claves para optimizar el rendimiento. Como nota aclarativa, los ejemplos de código que se han usado están escritos en Scala.

Evitar el uso de groupByKey

El ejemplo básico para introducirse a Apache Spark es el contador de palabras. A continuación vamos a analizar dos maneras de enfocar la solución:

val words = Seq("Green", "Green", "Blue", "Blue", "White")
val wordsRDD = ss.sparkContext.parallelize(words)
  .map({
    Word =>
      (Word, 1)
  })
//Ejemplo con reduceByKey
val counterWithReduce = wordsRDD
  .reduceByKey(_+_)
//Ejemplo con groupBKyey
val counterWithGroupBy = wordsRDD
  .groupByKey()
.map({
  case (word, listCounter) =>
    (word, listCounter.sum)
})

Las dos soluciones son perfectamente válidas, sin embargo, la opción que utiliza reduceByKey es más eficiente para RDDs de gran volumen. Esto sucede gracias a que se pueden combinar las salidas con la misma clave en cada partición antes de mezclar los datos.

En cada partición, se combinan los pares que contienen la misma clave haciendo uso de la función lambda antes de mezclarse. Posteriormente, se aplica de nuevo la función lambda al valor obtenido de cada partición y se llega al resultado final.
Por otro lado, la opción que utiliza groupByKey es menos eficiente, ya que todos los pares clave-valor se mezclan al principio. Para determinar qué máquina mezcla qué par, Spark llama a una función particionada en la clave del par. Cuando una máquina tiene más datos de los que puede almacenar en memoria, Spark los envía a disco. Por tanto, si una clave tiene más pares clave-valor de los que puede almacenar en memoria se producirá una excepción.

Podemos hacernos una idea de que cuánto mayor es el volumen de datos que estamos tratando, mayores serán estas diferencias. Por tanto es importante conocer de antemano la volumetría con la que vamos a trabajar, lo que ayudará a decantarse por una solución u otra.

Evitar el uso de reduceByKey cuando la entrada y la salida son de tipos diferentes:

Por ejemplo, si tenemos un RDD que contiene la tupla (palabra, sinónimo) y queremos conseguir un RDD con la tupla (palabra, lista de sinónimos), una opción sería adaptar la entrada para convertir sinónimo en una lista y luego aplicar reduceByKey:

val example = Seq(("abrazar", "ceñir"),("abrazar", "rodear"),
  ("navaja", "puñal"), ("navaja", "cuchillo"))
val exampleRDD = ss.sparkContext.parallelize(example)
  .map({
    case(word, synonimous) =>
      (word, Seq(synonimous))
  })
val exampleWithReduce = exampleRDD
  .reduceByKey(_++_)

Este código sería ineficiente porque se almacenaría una lista por cada registro. La manera adecuada de enfocar la solución es utilizar aggregateByKey:

val example = Seq(("abrazar", "ceñir"), ("abrazar", "rodear"),
  ("navaja", "puñal"), ("navaja", "cuchillo"))
val exampleRDD = ss.sparkContext.parallelize(example)
val zero = ListBuffer.empty[(String)]
val exampleWithAggregate = exampleRDD.aggregateByKey(zero)((list, v) => list += v,
  (list1, list2) => list1 ++= list2)
  .mapValues(_.toSeq)

No copiar cada uno de los elementos de un RDD de gran tamaño al Driver:

Cuando trabajamos con RDDs de gran tamaño hay que evitar realizar cierto tipo de operaciones que implican mover de memoria a disco, ya que podemos colapsar la máquina del driver. Por ejemplo la función collect:

val exampleWithWrongAction=exampleRDD.collect()

La función collect intentará copiar cada elemento del RDD en el programa del driver y, si nuestro RDD es pesado, a continuación se quedará sin memoria y se bloqueará. Para que no suceda, esto debemos intentar reducir el número de elementos del RDD con take o takesample, realizando filtrados o muestreando.
De manera similar, hay que prestar atención también a las funciones countByKey, countByValue y collectAsMap.

Si realmente necesitamos cada uno de los elementos del RDD, podemos copiar los datos a fichero o a base de datos. De esta manera evitaremos que se produzcan excepciones de falta de memoria.

Tratar de manera elegante las entradas de datos incorrectos:

Es común que cuando trabajamos con grandes volúmenes de datos, éstos vengan con un formato no esperado o malformados. Para que esto no suponga un problema, es necesario esgrimir un plan de actuación. Como primera aproximación, podemos utilizar la función filter para descartarlos. Si queremos ir un paso más allá y conocemos la manera de adaptarlos podemos hacerlo mediante la función map.

En próximas entregas haremos hincapié en otros aspectos a tener en cuenta así como resolución de errores típicos y optimizaciones necesarias. Cualquier duda o sugerencia no dudéis en dejarnos vuestros comentarios.