[scala] Extract column values of Dataframe as List in Apache Spark

I want to convert a string column of a data frame to a list. What I can find in the DataFrame API is RDD, so I tried converting it back to an RDD first and then applying the toArray function to the RDD. In this case, the length and the SQL both work just fine. However, the result I get from the RDD has square brackets around every element, like [A00001]. I was wondering whether there is an appropriate way to convert a column to a list, or a way to remove the square brackets.

Any suggestions would be appreciated. Thank you!

This question is related to scala apache-spark apache-spark-sql

The answer is


This should return a collection containing a single list:

dataFrame.select("YOUR_COLUMN_NAME").rdd.map(r => r(0)).collect()

Without the mapping, you just get a Row object, which contains every column from the database.

Keep in mind that this will probably get you a list of type Any. If you want to specify the result type, you can use .asInstanceOf[YOUR_TYPE] in the mapping: r => r(0).asInstanceOf[YOUR_TYPE]
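For example, a minimal sketch of that cast, assuming the column holds strings (YOUR_COLUMN_NAME is the same placeholder as above):

// Cast each value so the result is List[String] rather than List[Any]
val values: List[String] =
  dataFrame.select("YOUR_COLUMN_NAME").rdd
    .map(r => r(0).asInstanceOf[String])
    .collect()
    .toList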

P.S. Due to automatic conversion, you can skip the .rdd part.


An updated solution that gets you a list:

dataFrame.select("YOUR_COLUMN_NAME").map(r => r.getString(0)).collect.toList

I know the answer given and asked for assumes Scala, so I am just providing a little snippet of Python code in case a PySpark user is curious. The syntax is similar to the answer above, but to properly pop the list out I actually have to reference the column name a second time in the mapping function, and I do not need the select statement.

i.e. A DataFrame, containing a column named "Raw"

To get the values in "Raw" combined into a list, where each entry is one row's value from "Raw", I simply use:

MyDataFrame.rdd.map(lambda x: x.Raw).collect()

In Scala and Spark 2+, try this (assuming your column name is "s"): df.select('s).as[String].collect
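For context, here is a small self-contained sketch of that one-liner; the sample values and the column name "s" are made up for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val df = Seq("A00001", "A00002", "A00003").toDF("s")

// as[String] turns the single-column DataFrame into a typed Dataset[String],
// so collect returns Array[String] with no Row wrapper or square brackets
val values = df.select('s).as[String].collect.toList
// values: List[String] = List(A00001, A00002, A00003)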


This is a Java answer.

df.select("id").collectAsList();

import java.util.List;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;

List<String> whatever_list = df.toJavaRDD().map(new Function<Row, String>() {
    public String call(Row row) {
        // read the column by name and render it as a String
        return row.getAs("column_name").toString();
    }
}).collect();

logger.info(String.format("list is %s",whatever_list)); //verification

Since no one had given any solution in Java (a real programming language) yet, you can thank me later.


Below is for Python. flatMap flattens each Row (which is iterable over its values) into the values themselves, so you get a plain list without the Row wrappers:

df.select("col_name").rdd.flatMap(lambda x: x).collect()

With Spark 2.x and Scala 2.11

I'd think of 3 possible ways to convert values of a specific column to List.

Common code snippets for all the approaches

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate    
import spark.implicits._ // for .toDF() method

val df = Seq(
    ("first", 2.0),
    ("test", 1.5), 
    ("choose", 8.0)
  ).toDF("id", "val")

Approach 1

df.select("id").collect().map(_(0)).toList
// res9: List[Any] = List(first, test, choose)

What happens now? We are collecting data to Driver with collect() and picking element zero from each record.

This is not an ideal way of doing it; let's improve it with the next approach.


Approach 2

df.select("id").rdd.map(r => r(0)).collect.toList 
// res10: List[Any] = List(first, test, choose)

How is it better? We have distributed the map transformation load among the workers rather than doing it all on a single Driver.

I know rdd.map(r => r(0)) may not seem elegant to you, so let's address that in the next approach.


Approach 3

df.select("id").map(r => r.getString(0)).collect.toList 
// res11: List[String] = List(first, test, choose)

Here we are not converting the DataFrame to an RDD. Look at map: it won't accept r => r(0) (or _(0)) as in the previous approach, because Dataset's map needs an Encoder and there is none for the resulting Any type. So we end up using r => r.getString(0), which yields a String, for which spark.implicits._ provides an encoder; this may be addressed in future versions of Spark.
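As a side note, a small variant of Approach 3 (a sketch reusing the df from the common snippet above) sidesteps getString by going through a typed Dataset first:

// as[String] supplies the encoder, so no manual extraction from the Row is needed
val ids: List[String] = df.select("id").as[String].collect.toList
// ids: List[String] = List(first, test, choose)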

Conclusion

All the options give the same output, but 2 and 3 are more effective; finally, the third one is effective as well as elegant (I'd think).

Databricks notebook


sqlContext.sql(" select filename from tempTable").rdd.map(r => r(0)).collect.toList.foreach(out_streamfn.println) //remove brackets

It works perfectly.


from pyspark.sql.functions import col

df.select(col("column_name")).collect()

Here collect is the function that converts the result into a list. Be wary of using this on a huge data set, since it pulls everything to the driver and will hurt performance. It is good to check the data first.
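If the data set might be large, one hedge (a Scala sketch with a made-up row cap) is to bound what you pull back before collecting:

// Collect only a limited number of rows to the driver instead of the whole column
val sample = df.select("column_name").limit(1000).collect()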

