[scala] How to convert rdd object to dataframe in spark

How can I convert an RDD (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]) to a Dataframe org.apache.spark.sql.DataFrame. I converted a dataframe to rdd using .rdd. After processing it I want it back in dataframe. How can I do this ?

This question is related to scala apache-spark apache-spark-sql rdd

The answer is


On newer versions of spark (2.0+)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val spark = SparkSession
  .builder()
  .getOrCreate()
import spark.implicits._

val dfSchema = Seq("col1", "col2", "col3")
rdd.toDF(dfSchema: _*)

Assuming your RDD[row] is called rdd, you can use:

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()

Suppose you have a DataFrame and you want to do some modification on the fields data by converting it to RDD[Row].

val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[String]]("role").head))

To convert back to DataFrame from RDD we need to define the structure type of the RDD.

If the datatype was Long then it will become as LongType in structure.

If String then StringType in structure.

val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",StringType,nullable = true)))

Now you can convert the RDD to DataFrame using the createDataFrame method.

val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct)

This code works perfectly from Spark 2.x with Scala 2.11

Import necessary classes

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

Create SparkSession Object, and Here it's spark

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = spark.sparkContext // Just used to create test RDDs

Let's an RDD to make it DataFrame

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

Method 1

Using SparkSession.createDataFrame(RDD obj).

val dfWithoutSchema = spark.createDataFrame(rdd)

dfWithoutSchema.show()
+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

Method 2

Using SparkSession.createDataFrame(RDD obj) and specifying column names.

val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")

dfWithSchema.show()
+------+--------------------+
|    id|                vals|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

Method 3 (Actual answer to the question)

This way requires the input rdd should be of type RDD[Row].

val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

create the schema

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

Now apply both rowsRdd and schema to createDataFrame()

val df = spark.createDataFrame(rowsRdd, schema)

df.show()
+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+

Method 1: (Scala)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z")

Method 2: (Scala)

case class temp(val1: String,val3 : Double) 

val rdd = sc.parallelize(Seq(
  Row("foo",  0.5), Row("bar",  0.0)
))
val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF()
rows.show()

Method 1: (Python)

from pyspark.sql import Row
l = [('Alice',2)]
Person = Row('name','age')
rdd = sc.parallelize(l)
person = rdd.map(lambda r:Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.show()

Method 2: (Python)

from pyspark.sql.types import * 
l = [('Alice',2)]
rdd = sc.parallelize(l)
schema =  StructType([StructField ("name" , StringType(), True) , 
StructField("age" , IntegerType(), True)]) 
df3 = sqlContext.createDataFrame(rdd, schema) 
df3.show()

Extracted the value from the row object and then applied the case class to convert rdd to DF

val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" }
val temp2 = attrib2.map{case Row ( key: Int) => s"$key" }

case class RLT (id: String, attrib_1 : String, attrib_2 : String)
import hiveContext.implicits._

val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF

Note: This answer was originally posted here

I am posting this answer because I would like to share additional details about the available options that I did not find in the other answers


To create a DataFrame from an RDD of Rows, there are two main options:

1) As already pointed out, you could use toDF() which can be imported by import sqlContext.implicits._. However, this approach only works for the following types of RDDs:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(source: Scaladoc of the SQLContext.implicits object)

The last signature actually means that it can work for an RDD of tuples or an RDD of case classes (because tuples and case classes are subclasses of scala.Product).

So, to use this approach for an RDD[Row], you have to map it to an RDD[T <: scala.Product]. This can be done by mapping each row to a custom case class or to a tuple, as in the following code snippets:

val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

or

case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

The main drawback of this approach (in my opinion) is that you have to explicitly set the schema of the resulting DataFrame in the map function, column by column. Maybe this can be done programatically if you don't know the schema in advance, but things can get a little messy there. So, alternatively, there is another option:


2) You can use createDataFrame(rowRDD: RDD[Row], schema: StructType) as in the accepted answer, which is available in the SQLContext object. Example for converting an RDD of an old DataFrame:

val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

Note that there is no need to explicitly set any schema column. We reuse the old DF's schema, which is of StructType class and can be easily extended. However, this approach sometimes is not possible, and in some cases can be less efficient than the first one.


To convert an Array[Row] to DataFrame or Dataset, the following works elegantly:

Say, schema is the StructType for the row,then

val rows: Array[Row]=...
implicit val encoder = RowEncoder.apply(schema)
import spark.implicits._
rows.toDS

I tried to explain the solution using the word count problem. 1. Read the file using sc

  1. Produce word count
  2. Methods to create DF

    • rdd.toDF method
    • rdd.toDF("word","count")
      • spark.createDataFrame(rdd,schema)

    Read file using spark

    val rdd=sc.textFile("D://cca175/data/")  
    

    Rdd to Dataframe

    val df=sc.textFile("D://cca175/data/").toDF("t1") df.show

    Method 1

    Create word count RDD to Dataframe

    val df=rdd.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>(x+y)).toDF("word","count")
    

    Method2

    Create Dataframe from Rdd

    val df=spark.createDataFrame(wordRdd) 
    # with header   
    val df=spark.createDataFrame(wordRdd).toDF("word","count")  df.show
    

    Method3

    Define Schema

    import org.apache.spark.sql.types._

    val schema=new StructType(). add(StructField("word",StringType,true)). add(StructField("count",StringType,true))

    Create RowRDD

    import org.apache.spark.sql.Row
    val rowRdd=wordRdd.map(x=>(Row(x._1,x._2)))     
    

    Create DataFrame from RDD with schema

    val df=spark.createDataFrame(rowRdd,schema)
    df.show


One needs to create a schema, and attach it to the Rdd.

Assuming val spark is a product of a SparkSession.builder...

    import org.apache.spark._
    import org.apache.spark.sql._       
    import org.apache.spark.sql.types._

    /* Lets gin up some sample data:
     * As RDD's and dataframes can have columns of differing types, lets make our
     * sample data a three wide, two tall, rectangle of mixed types.
     * A column of Strings, a column of Longs, and a column of Doubules 
     */
    val arrayOfArrayOfAnys = Array.ofDim[Any](2,3)
    arrayOfArrayOfAnys(0)(0)="aString"
    arrayOfArrayOfAnys(0)(1)=0L
    arrayOfArrayOfAnys(0)(2)=3.14159
    arrayOfArrayOfAnys(1)(0)="bString"
    arrayOfArrayOfAnys(1)(1)=9876543210L
    arrayOfArrayOfAnys(1)(2)=2.71828

    /* The way to convert an anything which looks rectangular, 
     * (Array[Array[String]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to 
     * throw it into sparkContext.parallelize.
     * http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows
     * the parallelize definition as 
     *     def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)
     * so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys.
     * Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it. 
     */
    val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys)

    /* We'll be using the sqlContext.createDataFrame to add a schema our RDD.
     * The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have.
     * To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq)
     * As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type. 
     */     
    val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=>
        Row.fromSeq(f.toSeq)
    )

    /* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe.
     * https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as
     *   case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)
     * Will leave the two default values in place for each of the columns:
     *        nullability as true, 
     *        metadata as an empty Map[String,Any]
     *   
     */

    val schema = StructType(
        StructField("colOfStrings", StringType) ::
        StructField("colOfLongs"  , LongType  ) ::
        StructField("colOfDoubles", DoubleType) ::
        Nil
    )

    val df=spark.sqlContext.createDataFrame(rddOfRows,schema)
    /*
     *      +------------+----------+------------+
     *      |colOfStrings|colOfLongs|colOfDoubles|
     *      +------------+----------+------------+
     *      |     aString|         0|     3.14159|
     *      |     bString|9876543210|     2.71828|
     *      +------------+----------+------------+
    */ 
    df.show 

Same steps, but with fewer val declarations:

    val arrayOfArrayOfAnys=Array(
        Array("aString",0L         ,3.14159),
        Array("bString",9876543210L,2.71828)
    )

    val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq))

    /* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata:
     * Consider constructing the schema from an Array[StructField].  This would allow looping over 
     * the columns, with a match statement applying the appropriate sql datatypes as the second
     *  StructField arguments.   
     */
    val sf=new Array[StructField](3)
    sf(0)=StructField("colOfStrings",StringType)
    sf(1)=StructField("colOfLongs"  ,LongType  )
    sf(2)=StructField("colOfDoubles",DoubleType)        
    val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList))
    df.show

Here is a simple example of converting your List into Spark RDD and then converting that Spark RDD into Dataframe.

Please note that I have used Spark-shell's scala REPL to execute following code, Here sc is an instance of SparkContext which is implicitly available in Spark-shell. Hope it answer your question.

scala> val numList = List(1,2,3,4,5)
numList: List[Int] = List(1, 2, 3, 4, 5)

scala> val numRDD = sc.parallelize(numList)
numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:28

scala> val numDF = numRDD.toDF
numDF: org.apache.spark.sql.DataFrame = [_1: int]

scala> numDF.show
+---+
| _1|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+

Examples related to scala

Intermediate language used in scalac? Why does calling sumr on a stream with 50 tuples not complete Select Specific Columns from Spark DataFrame Joining Spark dataframes on the key Provide schema while reading csv file as a dataframe how to filter out a null value from spark dataframe Fetching distinct values on a column using Spark DataFrame Can't push to the heroku Spark - Error "A master URL must be set in your configuration" when submitting an app Add jars to a Spark Job - spark-submit

Examples related to apache-spark

Select Specific Columns from Spark DataFrame Select columns in PySpark dataframe What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? How to find count of Null and Nan values for each column in a PySpark dataframe efficiently? Spark dataframe: collect () vs select () How does createOrReplaceTempView work in Spark? Spark difference between reduceByKey vs groupByKey vs aggregateByKey vs combineByKey Filter df when values matches part of a string in pyspark Filtering a pyspark dataframe using isin by exclusion Convert date from String to Date format in Dataframes

Examples related to apache-spark-sql

Select Specific Columns from Spark DataFrame Pyspark: Filter dataframe based on multiple conditions Select columns in PySpark dataframe What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? How to find count of Null and Nan values for each column in a PySpark dataframe efficiently? Spark dataframe: collect () vs select () How does createOrReplaceTempView work in Spark? Filter df when values matches part of a string in pyspark Convert date from String to Date format in Dataframes Take n rows from a spark dataframe and pass to toPandas()

Examples related to rdd

How to create a DataFrame from a text file in Spark Spark - repartition() vs coalesce() Difference between DataFrame, Dataset, and RDD in Spark Spark specify multiple column conditions for dataframe join Spark read file from S3 using sc.textFile ("s3n://...) Spark: subtract two DataFrames How to convert rdd object to dataframe in spark What is the difference between cache and persist? Apache Spark: map vs mapPartitions?