I'm getting strange behavior when calling a function outside of a closure:
Task not serializable: java.io.NotSerializableException: testing
The problem is that I need my code in a class and not an object. Any idea why this is happening? Is a Scala object serialized by default?
This is a working code example:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
This is the non-working example:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable won't help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the function someFunc
val after = rddList.map(someFunc(_))
//this will crash (Spark is lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
A complete talk that fully explains the problem and proposes a paradigm-shifting way to avoid these serialization problems: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
The top-voted answer basically suggests throwing away an entire language feature: no longer using methods and only using functions. It is true that in functional programming methods in classes should be avoided, but turning them into functions does not solve the design issue here (see the link above).
As a quick fix in this particular situation, you could just use the @transient annotation to tell it not to try to serialize the offending value (here, Spark.ctx is a custom class, not Spark's own, following the OP's naming):
@transient
val rddList = Spark.ctx.parallelize(list)
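To see what @transient changes, here is a small sketch that uses plain Java serialization instead of Spark, so it runs without a cluster. DriverHandle is a hypothetical stand-in for a driver-only value like Spark.ctx, and the serializes helper is likewise just for illustration:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-only, non-serializable resource such as Spark.ctx.
class DriverHandle

// Fails to serialize: the handle field is written along with the instance.
class WithoutTransient extends Serializable {
  val handle = new DriverHandle
}

// Serializes: @transient tells Java serialization to skip the field.
class WithTransient extends Serializable {
  @transient val handle = new DriverHandle
}

object TransientDemo {
  // True if Java serialization of obj succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(new WithoutTransient)) // false
    println(serializes(new WithTransient))    // true
  }
}
```

Keep in mind that a @transient field comes back as null after deserialization, so this only suits values the closure never touches on the workers (as with rddList here).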
You can also restructure code so that rddList lives somewhere else, but that is also nasty.
A proposed future Scala feature called "spores" should allow fine-grained control over exactly what a closure does and does not pull in. It should also turn mistakes such as accidentally capturing non-serializable types (or any other unwanted values) into compile errors, rather than the horrible runtime exceptions and memory leaks we get now.
http://docs.scala-lang.org/sips/pending/spores.html
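Until something like spores is available, you can get a similar effect by hand: copy whatever the closure needs into a local val before building the closure, so only that value is captured instead of the enclosing instance. A minimal sketch checked with Java serialization (Offsetter and serializes are hypothetical names; this is a manual pattern, not the spores API):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Not Serializable, like the `testing` class in the question.
class Offsetter(val offset: Int) {
  // Reads the field `offset` (i.e. this.offset), so the closure captures `this`.
  def capturingThis: Int => Int = x => x + offset

  // Copies the field into a local val first; the closure captures only that Int.
  def capturingLocal: Int => Int = {
    val localOffset = offset
    x => x + localOffset
  }
}

object CaptureDemo {
  // True if Java serialization of obj succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val o = new Offsetter(10)
    println(serializes(o.capturingThis))  // false
    println(serializes(o.capturingLocal)) // true
    println(o.capturingLocal(5))          // 15
  }
}
```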
When using Kryo, make registration mandatory; that way you get errors instead of memory leaks:
"Finally, I know that kryo has kryo.setRegistrationOptional(true) but I am having a very difficult time trying to figure out how to use it. When this option is turned on, kryo still seems to throw exceptions if I haven't registered classes."
(Quoted from the Stack Overflow question "Strategy for registering classes with kryo".)
Of course, this only gives you type-level control, not value-level control.
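In Spark, mandatory registration corresponds to the spark.kryo.registrationRequired setting. A sketch of a SparkConf that turns it on, assuming a hypothetical MyRecord class as the only application type your job ships to executors:

```scala
import org.apache.spark.SparkConf

// Hypothetical application type standing in for whatever your closures ship to executors.
case class MyRecord(id: Int, name: String)

object KryoConf {
  def build(): SparkConf =
    new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Throw for unregistered classes instead of silently writing their full class names.
      .set("spark.kryo.registrationRequired", "true")
      .registerKryoClasses(Array(classOf[MyRecord]))
}
```

With this set, serializing an unregistered class fails with an exception that names the class, which tells you which types are ending up inside your closures.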
... more ideas to come.
import org.apache.spark.sql.functions.udf
def upper(name: String): String = name.toUpperCase()
val toUpperName = udf { (EmpName: String) => upper(EmpName) }
val emp_details = """[{"id": "1","name": "James Butt","country": "USA"},
{"id": "2", "name": "Josephine Darakjy","country": "USA"},
{"id": "3", "name": "Art Venere","country": "USA"},
{"id": "4", "name": "Lenna Paprocki","country": "USA"},
{"id": "5", "name": "Donette Foller","country": "USA"},
{"id": "6", "name": "Leota Dilliard","country": "USA"}]"""
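import spark.implicits._ // for toDS() and the $"col" syntax
val df_emp = spark.read.json(Seq(emp_details).toDS())
val df_name = df_emp.select($"id", $"name")
val df_upperName = df_name.withColumn("name", toUpperName($"name")).filter("id='5'")
display(df_upperName) // display() is Databricks-specific; use df_upperName.show() elsewhere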
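This fails with org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304), most likely because the UDF calls upper, a method of the enclosing (non-serializable) notebook class, so the closure drags that whole class along.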
Solution: move the method and the UDF into a standalone Serializable object:
import org.apache.spark.sql.functions.udf
object obj_upper extends Serializable {
  def upper(name: String): String = name.toUpperCase()
  val toUpperName = udf { (EmpName: String) => upper(EmpName) }
}
val df_upperName =
  df_name.withColumn("name", obj_upper.toUpperName($"name")).filter("id='5'")
display(df_upperName)
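The same effect can be reproduced outside Spark. In this sketch, NotebookCell is a hypothetical stand-in for the non-serializable class a notebook wraps your code in, and UpperFns plays the role of obj_upper. Assuming Scala 2.12 or later, the lambda that calls the class's own method captures the instance, while the one that calls the object's method does not (checked with plain Java serialization):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Plays the role of obj_upper: a top-level object, reached statically.
object UpperFns {
  def upper(name: String): String = name.toUpperCase()
}

// Hypothetical stand-in for the non-serializable class a notebook wraps cells in.
class NotebookCell {
  def upper(name: String): String = name.toUpperCase()

  // Calls this class's own method, so the lambda captures `this`.
  val viaMethod: String => String = name => upper(name)

  // Calls the object's method, so nothing from NotebookCell is captured.
  val viaObject: String => String = name => UpperFns.upper(name)
}

object UdfCaptureDemo {
  // True if Java serialization of obj succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val cell = new NotebookCell
    println(serializes(cell.viaMethod))       // false
    println(serializes(cell.viaObject))       // true
    println(cell.viaObject("Donette Foller")) // DONETTE FOLLER
  }
}
```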
FYI, in Spark 2.4 many of you will probably run into this issue. Kryo serialization has improved, but in many cases you cannot use spark.kryo.unsafe=true or the naive Kryo serializer; instead, set one of:
spark.kryo.unsafe="false"
OR
spark.serializer="org.apache.spark.serializer.JavaSerializer"
I modify the custom RDD transformations that I encounter or write myself so that they use explicit broadcast variables and the built-in twitter-chill API, converting them from rdd.map(row => ...) functions to rdd.mapPartitions(partition => { ... }) functions.
The old way:
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
val value = sampleMap.get(row._1)
value
})
New way:
import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))
rdd.mapPartitions(partition => {
  // Unwrap the broadcast map once per partition
  val deSerSampleMap = brdSerSampleMap.value.get
  partition.map(row => {
    // Use the deserialized copy, not the driver-side sampleMap
    val value = deSerSampleMap.get(row._1)
    value
  })
})
This new way reads the broadcast variable only once per partition, which is better. You will still need to use Java serialization if you do not register classes.
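The once-per-partition claim can be modeled without Spark. In this sketch CountingBroadcast is a hypothetical stand-in for a Broadcast that counts how often .value is read, and each Iterator plays the role of one partition:

```scala
// Hypothetical stand-in for a Spark Broadcast: counts how often .value is read.
final class CountingBroadcast[T](v: T) {
  var reads = 0
  def value: T = { reads += 1; v }
}

object PerPartitionDemo {
  // mapPartitions style: read the shared map once, then look up every row.
  def lookupPartition(bc: CountingBroadcast[Map[String, Int]])(
      partition: Iterator[(String, Int)]): Iterator[Option[Int]] = {
    val localMap = bc.value
    partition.map(row => localMap.get(row._1))
  }

  // map style: the shared map is read again for every row.
  def lookupPerRow(bc: CountingBroadcast[Map[String, Int]])(
      partition: Iterator[(String, Int)]): Iterator[Option[Int]] =
    partition.map(row => bc.value.get(row._1))

  // Two "partitions" holding three rows in total (fresh iterators on each call).
  def partitions: Seq[Iterator[(String, Int)]] =
    Seq(Iterator("index1" -> 0, "index3" -> 0), Iterator("index2" -> 0))

  def main(args: Array[String]): Unit = {
    val sampleMap = Map("index1" -> 1234, "index2" -> 2345)

    val perPartition = new CountingBroadcast(sampleMap)
    println(partitions.flatMap(p => lookupPartition(perPartition)(p).toList))
    // List(Some(1234), None, Some(2345))
    println(perPartition.reads) // 2

    val perRow = new CountingBroadcast(sampleMap)
    partitions.foreach(p => lookupPerRow(perRow)(p).toList)
    println(perRow.reads) // 3
  }
}
```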
I solved this problem using a different approach. You simply need to serialize the objects before passing them through the closure, and deserialize them afterwards. This approach works even if your classes aren't Serializable, because it uses Kryo behind the scenes. All you need is some curry. ;)
Here's an example of how I did it:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
             (foo: Foo): Bar = {
  kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
class Blah(abc: ABC) extends (Foo => Bar) {
  def apply(foo: Foo): Bar = ??? // This is the real function
}
Feel free to make Blah as complicated as you want: a class, a companion object, nested classes, references to multiple third-party libraries.
KryoSerializationWrapper refers to: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
I had a similar experience.
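The error was triggered when I initialized a variable on the driver (master) but then tried to use it on one of the workers. When that happens, Spark Streaming will try to serialize the object to send it over to the worker, and fail if the object is not serializable.
I solved the error by making the variable static. Static fields belong to the class rather than to an instance, so they are not serialized with the object; each worker JVM initializes its own copy when it loads the class.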
Previous (non-working) code:
private final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
Working code:
private static final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
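A Scala equivalent of the static fix is to keep the non-serializable helper on a companion object rather than in an instance field. A sketch under that assumption, with a hypothetical Formatter class standing in for PhoneNumberUtil and plain Java serialization standing in for Spark's task serialization:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable helper, playing the role of PhoneNumberUtil.
class Formatter {
  def digitsOnly(s: String): String = s.filter(_.isDigit)
}

// Like "private final": the helper is an instance field, so it is serialized with the object.
class InstanceField extends Serializable {
  private val util = new Formatter
  def clean(s: String): String = util.digitsOnly(s)
}

// Like "private static final": the helper lives on the companion object, which is
// not part of the instance's serialized state; each JVM creates its own on first use.
class StaticField extends Serializable {
  def clean(s: String): String = StaticField.util.digitsOnly(s)
}

object StaticField {
  private val util = new Formatter
}

object StaticFieldDemo {
  // True if Java serialization of obj succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(new InstanceField)) // false
    println(serializes(new StaticField))   // true
    println(new StaticField().clean("+1 (555) 010-9999")) // 15550109999
  }
}
```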
I faced a similar issue, and what I understand from Grega's answer is this: in the question's code, the closure passed to rddList.map in doIT calls someFunc, which is a method of testing. Methods cannot be serialized on their own, so Spark tries to serialize the whole testing instance, which is not serializable either.
To make your code work, define someFunc inside the doIT method. For example:
def doIT = {
  //function definition
  def someFunc(a:Int) = a+1
  val after = rddList.map(someFunc(_))
  after.collect().map(println(_))
}
If multiple functions are involved, all of them should likewise be defined in that parent context (inside doIT).
I'm not entirely certain that this applies to Scala, but in Java I solved the NotSerializableException by refactoring my code so that the closure did not access a non-serializable final field.
Grega's answer does a great job of explaining why the original code does not work and gives two ways to fix the issue. However, this solution is not very flexible: consider the case where your closure includes a method call on a non-Serializable class that you have no control over. You can neither add the Serializable tag to this class nor change the underlying implementation to turn the method into a function.
Nilesh presents a great workaround for this, but the solution can be made both more concise and more general:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
This function-serializer can then be used to automatically wrap closures and method calls:
rdd map genMapper(someFunc)
This technique also has the benefit of not requiring the additional Shark dependencies in order to access KryoSerializationWrapper, since Twitter's Chill is already pulled in by core Spark.
Source: Stackoverflow.com