Tuesday, April 25, 2017

Spark and serialization

One of the trickiest issues for Spark developers is serialization. I'm going to provide a checklist of things to think about when your Spark job doesn't work as expected. This article is directed at Scala programmers, although of course the principles are the same for Java developers (with the exception that Java has no case classes).

Serialization of functions

This is the one that newbies run into pretty quickly. Any function that you pass to one of Spark's higher-order functions (such as the map method of RDD) must be serializable. Once you think about it, it's pretty obvious, but when you're new to Spark it may not be so clear. All of the real work of Spark is performed on the executors, and so any such function has to be sent "over the wire" to the executors. Hence, the function must be serializable. A function will be serializable in any of the following three situations:

  • the function is a "pure" function, for example one that is created as an anonymous function (i.e. a lambda);
  • the function is a partially applied method (that is to say, the method has undergone η (eta) expansion, typically written by following the method name with an underscore), where the method belongs to:
    • an object
    • a serializable class
Note what's missing: a function derived from a method of a non-serializable class. This is because an instance method (i.e. one belonging to a class rather than an object) has an implicit parameter for this, where this is the appropriate instance of the class, so serializing the function means serializing that whole instance. The sketch below shows both the failing and the working cases.
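
Here's a minimal sketch of the difference (the class, object and method names are all invented for illustration):

import org.apache.spark.SparkContext

class Multiplier(factor: Int) {          // NOT serializable
  def multiply(x: Int): Int = x * factor
}

object Doubler {                         // methods on objects are fine
  def double(x: Int): Int = x * 2
}

def examples(sc: SparkContext): Unit = {
  val rdd = sc.parallelize(1 to 10)

  rdd.map(x => x * 2)                    // OK: anonymous function (lambda)
  rdd.map(Doubler.double _)              // OK: eta-expanded method on an object

  val multiplier = new Multiplier(3)
  // rdd.map(multiplier.multiply _)      // fails with "Task not serializable":
  //                                     // the function carries a reference to
  //                                     // `multiplier`, which isn't serializable
}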

UDFs and Broadcasts

Don't forget, too, that any object which is passed as a parameter into a class representing a UDF (user-defined function), or anything which you broadcast, must also be serializable.
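
For example, here's a sketch (the SparkSession, the CountryCodes helper and the data are all made up) of a UDF whose closure captures a helper object, and a broadcast of the same object; in both cases the helper has to be serializable:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Being a case class, this is serializable; a plain class without Serializable
// would make both the UDF and the broadcast below fail.
case class CountryCodes(codes: Map[String, String])

val spark = SparkSession.builder().appName("serialization-demo").getOrCreate()
val countryCodes = CountryCodes(Map("GB" -> "United Kingdom", "FR" -> "France"))

// The UDF captures countryCodes, so it is shipped to the executors.
val countryName = udf { (code: String) => countryCodes.codes.getOrElse(code, "unknown") }

// Broadcast values are also sent to every executor and must be serializable.
val broadcastCodes = spark.sparkContext.broadcast(countryCodes)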

Abstract Classes

Any class whose instances must be serialized (including, as described above, classes whose methods are used as functions) needs to be marked Serializable. But even if you mark your concrete class as Serializable, you may still run into a problem with serialization: you must also ensure that any abstract classes which your class extends are marked Serializable.
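
For example (the class names are invented), the following fails when an instance is deserialized on an executor, because the non-serializable abstract parent cannot be reconstructed (it has no no-argument constructor for the deserializer to call):

// The abstract parent is NOT Serializable.
abstract class Formatter(val pattern: String)

// Marking only the concrete class Serializable is not enough: deserializing a
// Renderer fails because its Formatter parent cannot be reconstructed.
class Renderer(fmt: String) extends Formatter(fmt) with Serializable {
  def render(value: Double): String = pattern.format(value)
}

// The fix: mark the abstract class Serializable as well.
abstract class SerializableFormatter(val pattern: String) extends Serializable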

Case Classes

Case classes (and case objects) in Scala are inherently serializable. This is a good thing, but it can have unforeseen consequences if your case class is mutable. Let's say you create a case class which has a mutable field, and perhaps a method which can be called to initialize that field. When you run it in the driver, everything looks good. But if that object needs to be serialized, the initialization method won't be called again after deserialization. If you had had to mark the case class as serializable explicitly, you might have thought about this problem beforehand.
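
Here's a sketch of one way this bites (the names are invented, and I'm assuming the mutable state is held in a @transient field, a common pattern for caches and other lazily initialized state):

case class Dictionary(name: String) {
  // Skipped during serialization, so it reverts to null on the executors.
  @transient private var entries: Map[String, Int] = _

  def init(): Unit = { entries = Map("spark" -> 1, "scala" -> 2) }

  def lookup(word: String): Option[Int] = Option(entries).flatMap(_.get(word))
}

val dictionary = Dictionary("demo")
dictionary.init()   // fine on the driver...
// ...but on an executor, after deserialization, init() has never been called
// there, so `entries` is null and every lookup returns None.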

Preemptive serialization checking

If you want to check that objects and functions are serializable before you get too far into your Spark job, or perhaps because you are writing a functional test, you can get hold of Spark's own closure serializer from the Spark environment:

SparkEnv.get.closureSerializer

(this is the same serializer that Spark itself will use). Here's a handy case class whose validate method checks that serialization round-trips correctly:

import org.apache.spark.serializer.Serializer
import scala.reflect.ClassTag

case class Serialization(serializer: Serializer) {
  def validate[T: ClassTag](obj: T): Boolean = {
    // Round-trip the object through the serializer and check that the
    // deserialized result equals the original.
    val serializerInstance = serializer.newInstance()
    val result: T = serializerInstance.deserialize[T](serializerInstance.serialize(obj))
    result == obj
  }
}

Just pass the result of invoking SparkEnv.get.closureSerializer into the Serialization class and run the validate method on anything you want to check. If it returns true, then all is well. If it returns false, then the deserialized version isn't the same as the original (e.g. you tried to serialize a mutable class as described above). If it throws an exception, then something in the serialization stack (essentially the relevant class hierarchy) isn't marked Serializable when it should be.
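
For example (the values being checked here are just stand-ins for whatever you actually want to verify, and SparkEnv.get requires an active SparkContext):

import org.apache.spark.SparkEnv

val serialization = Serialization(SparkEnv.get.closureSerializer)

// Stand-ins for whatever you actually want to check before shipping it to Spark.
assert(serialization.validate(Map("a" -> 1, "b" -> 2)))
assert(serialization.validate("some driver-side value"))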