[scala] Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

FYI in Spark 2.4 a lot of you will probably encounter this issue. Kryo serialization has gotten better but in many cases you cannot use spark.kryo.unsafe=true or the naive kryo serializer.

For a quick fix try changing the following in your Spark configuration

spark.kryo.unsafe="false"

OR

spark.serializer="org.apache.spark.serializer.JavaSerializer"

I modify custom RDD transformations that I encounter or personally write by using explicit broadcast variables and utilizing the new inbuilt twitter-chill api, converting them from rdd.map(row => to rdd.mapPartitions(partition => { functions.

Example

Old (not-great) Way

val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
    val value = sampleMap.get(row._1)
    value
})

Alternative (better) Way

import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))

rdd.mapPartitions(partition => {
    val deSerSampleMap = brdSerSampleMap.value.get
    partition.map(row => {
        val value = sampleMap.get(row._1)
        value
    }).toIterator
})

This new way will only call the broadcast variable once per partition which is better. You will still need to use Java Serialization if you do not register classes.

Examples related to scala

Intermediate language used in scalac? Why does calling sumr on a stream with 50 tuples not complete Select Specific Columns from Spark DataFrame Joining Spark dataframes on the key Provide schema while reading csv file as a dataframe how to filter out a null value from spark dataframe Fetching distinct values on a column using Spark DataFrame Can't push to the heroku Spark - Error "A master URL must be set in your configuration" when submitting an app Add jars to a Spark Job - spark-submit

Examples related to apache-spark

Select Specific Columns from Spark DataFrame Select columns in PySpark dataframe What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? How to find count of Null and Nan values for each column in a PySpark dataframe efficiently? Spark dataframe: collect () vs select () How does createOrReplaceTempView work in Spark? Spark difference between reduceByKey vs groupByKey vs aggregateByKey vs combineByKey Filter df when values matches part of a string in pyspark Filtering a pyspark dataframe using isin by exclusion Convert date from String to Date format in Dataframes

Examples related to serialization

laravel Unable to prepare route ... for serialization. Uses Closure TypeError: Object of type 'bytes' is not JSON serializable Best way to save a trained model in PyTorch? Convert Dictionary to JSON in Swift Java: JSON -> Protobuf & back conversion Understanding passport serialize deserialize How to generate serial version UID in Intellij Parcelable encountered IOException writing serializable object getactivity() Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects Cannot deserialize the JSON array (e.g. [1,2,3]) into type ' ' because type requires JSON object (e.g. {"name":"value"}) to deserialize correctly