[scala] Fetching distinct values on a column using Spark DataFrame

Using Spark 1.6.1 version I need to fetch distinct values on a column and then perform some specific transformation on top of it. The column contains more than 50 million records and can grow larger.
I understand that doing a distinct.collect() will bring the call back to the driver program. Currently I am performing this task as below, is there a better approach?

 import sqlContext.implicits._
 preProcessedData.persist(StorageLevel.MEMORY_AND_DISK_2)

 preProcessedData.select(ApplicationId).distinct.collect().foreach(x => {
   val applicationId = x.getAs[String](ApplicationId)
   val selectedApplicationData = preProcessedData.filter($"$ApplicationId" === applicationId)
   // DO SOME TASK PER applicationId
 })

 preProcessedData.unpersist()  

The answer is


Well to obtain all different values in a Dataframe you can use distinct. As you can see in the documentation that method returns another DataFrame. After that you can create a UDF in order to transform each record.

For example:

val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("age", "salary")

// I obtain all different values. If you show you must see only {1, 3}
val distinctValuesDF = df.select(df("age")).distinct

// Define your udf. In this case I defined a simple function, but they can get complicated.
val myTransformationUDF = udf(value => value / 10)

// Run that transformation "over" your DataFrame
val afterTransformationDF = distinctValuesDF.select(myTransformationUDF(col("age")))

This solution demonstrates how to transform data with Spark native functions which are better than UDFs. It also demonstrates how dropDuplicates which is more suitable than distinct for certain queries.

Suppose you have this DataFrame:

+-------+-------------+
|country|    continent|
+-------+-------------+
|  china|         asia|
| brazil|south america|
| france|       europe|
|  china|         asia|
+-------+-------------+

Here's how to take all the distinct countries and run a transformation:

df
  .select("country")
  .distinct
  .withColumn("country", concat(col("country"), lit(" is fun!")))
  .show()
+--------------+
|       country|
+--------------+
|brazil is fun!|
|france is fun!|
| china is fun!|
+--------------+

You can use dropDuplicates instead of distinct if you don't want to lose the continent information:

df
  .dropDuplicates("country")
  .withColumn("description", concat(col("country"), lit(" is a country in "), col("continent")))
  .show(false)
+-------+-------------+------------------------------------+
|country|continent    |description                         |
+-------+-------------+------------------------------------+
|brazil |south america|brazil is a country in south america|
|france |europe       |france is a country in europe       |
|china  |asia         |china is a country in asia          |
+-------+-------------+------------------------------------+

See here for more information about filtering DataFrames and here for more information on dropping duplicates.

Ultimately, you'll want to wrap your transformation logic in custom transformations that can be chained with the Dataset#transform method.


Examples related to scala

Intermediate language used in scalac? Why does calling sumr on a stream with 50 tuples not complete Select Specific Columns from Spark DataFrame Joining Spark dataframes on the key Provide schema while reading csv file as a dataframe how to filter out a null value from spark dataframe Fetching distinct values on a column using Spark DataFrame Can't push to the heroku Spark - Error "A master URL must be set in your configuration" when submitting an app Add jars to a Spark Job - spark-submit

Examples related to apache-spark

Select Specific Columns from Spark DataFrame Select columns in PySpark dataframe What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? How to find count of Null and Nan values for each column in a PySpark dataframe efficiently? Spark dataframe: collect () vs select () How does createOrReplaceTempView work in Spark? Spark difference between reduceByKey vs groupByKey vs aggregateByKey vs combineByKey Filter df when values matches part of a string in pyspark Filtering a pyspark dataframe using isin by exclusion Convert date from String to Date format in Dataframes

Examples related to dataframe

Trying to merge 2 dataframes but get ValueError How to show all of columns name on pandas dataframe? Python Pandas - Find difference between two data frames Pandas get the most frequent values of a column Display all dataframe columns in a Jupyter Python Notebook How to convert column with string type to int form in pyspark data frame? Display/Print one column from a DataFrame of Series in Pandas Binning column with python pandas Selection with .loc in python Set value to an entire column of a pandas dataframe

Examples related to apache-spark-sql

Select Specific Columns from Spark DataFrame Pyspark: Filter dataframe based on multiple conditions Select columns in PySpark dataframe What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? How to find count of Null and Nan values for each column in a PySpark dataframe efficiently? Spark dataframe: collect () vs select () How does createOrReplaceTempView work in Spark? Filter df when values matches part of a string in pyspark Convert date from String to Date format in Dataframes Take n rows from a spark dataframe and pass to toPandas()

Examples related to spark-dataframe

How does createOrReplaceTempView work in Spark? Take n rows from a spark dataframe and pass to toPandas() how to filter out a null value from spark dataframe Spark RDD to DataFrame python Split Spark Dataframe string column into multiple columns Pyspark: display a spark data frame in a table format Fetching distinct values on a column using Spark DataFrame Convert spark DataFrame column to python list How to import multiple csv files in a single load? Converting Pandas dataframe into Spark dataframe error