[scala] Select Specific Columns from Spark DataFrame

I have loaded CSV data into a Spark DataFrame.

I need to slice this dataframe into two different dataframes, where each one contains a set of columns from the original dataframe.

How do I select a subset of the columns into a new Spark dataframe?

This question is related to scala, apache-spark and apache-spark-sql.

The answer is


Let's say our parent DataFrame has 'n' columns.

We can create 'x' child DataFrames (let's consider 2 in our case).

The columns for a child DataFrame can be chosen as desired from any of the parent DataFrame's columns.

Consider a source with 10 columns that we want to split into 2 DataFrames, each containing columns referenced from the parent DataFrame.

The columns for a child DataFrame can be selected using the select DataFrame API:

val parentDF = spark.read.format("csv").load("/path of the CSV file")

// select returns a new DataFrame; call show() separately,
// since show() returns Unit and cannot be usefully assigned
val child1_DF = parentDF.select("col1", "col2", "col3", "col9", "col10")
child1_DF.show()

val child2_DF = parentDF.select("col5", "col6", "col7", "col8", "col1", "col2")
child2_DF.show()

Notice that the child DataFrames can have different column counts, each at most the parent DataFrame's column count.

We can also refer to columns without mentioning their real names, by using the default positional names (_c0, _c1, ...) that Spark assigns when a CSV is loaded without a header.

Import the Spark implicits first; they provide the $-notation for referring to columns, which we use further below:

import spark.implicits._
import org.apache.spark.sql.functions._

val child3_DF = parentDF.select("_c0", "_c1", "_c2", "_c8", "_c9")
child3_DF.show()

We can also select columns based on certain conditions. Let's say we want only the even-numbered columns in the child DataFrame. By even we mean even-indexed columns, with the index starting from 0.

val parentColumns = parentDF.columns.toList
// parentColumns: List[String] = List(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7, _c8, _c9)

val evenParentColumns = parentColumns.zipWithIndex.filter(_._2 % 2 == 0).map(_._1)
// evenParentColumns: List[String] = List(_c0, _c2, _c4, _c6, _c8)

Now feed these columns into the parentDF's select. Note that the varargs overload of select takes a first column name followed by the remaining names, so we pass the head and the tail of the list separately:

val child4_DF = parentDF.select(evenParentColumns.head, evenParentColumns.tail: _*)
child4_DF.show()

This shows the even-indexed columns from the parent DataFrame:


+-----------+----+----+---+---+
|        _c0| _c2| _c4|_c6|_c8|
+-----------+----+----+---+---+
|ITE00100554|TMAX|null|  E|  1|
| TE00100554|TMIN|null|  E|  4|
|GM000010962|PRCP|null|  E|  7|
+-----------+----+----+---+---+

Now we are left with only the even-indexed columns in the DataFrame.

Similarly, we can apply other operations to DataFrame columns, as shown below:

val child5_DF = parentDF.select($"_c0", $"_c8" + 1)
child5_DF.show()

So, as shown above, there are many ways to select columns from a DataFrame.


Just by using select you can select particular columns, give them readable names and cast them. For example, like this:

import spark.implicits._ // enables the 'symbol syntax for columns
import org.apache.spark.sql.types.{DoubleType, StringType}

spark.read.csv(path)
  .select(
    '_c0.alias("stn").cast(StringType),
    '_c1.alias("wban").cast(StringType),
    '_c2.alias("lat").cast(DoubleType),
    '_c3.alias("lon").cast(DoubleType)
  )
  .where('_c2.isNotNull && '_c3.isNotNull && '_c2 =!= 0.0 && '_c3 =!= 0.0)

The problem was to select the columns of one dataframe after joining it with another dataframe.

I tried the below and selected the columns of salaryDf from the joined dataframe.

Hope this will help.

val empDf = spark.read.option("header", "true").csv("/data/tech.txt")

val salaryDf = spark.read.option("header", "true").csv("/data/salary.txt")

val joinData = empDf.join(salaryDf,
  empDf.col("first") === salaryDf.col("first") and empDf.col("last") === salaryDf.col("last"))

// below will select the columns of salaryDf only
val finalDF = joinData.select(salaryDf.columns map salaryDf.col: _*)

// the same way we can select the columns of empDf
joinData.select(empDf.columns map empDf.col: _*)

You can use the code below to select columns based on their index (position). You can alter the numbers in colNos to select only those columns:

import org.apache.spark.sql.functions.col

val colNos = Seq(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17,
  18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35)
// look up each index in Df.columns, then turn each name into a Column
val Df_01 = Df.select(colNos map Df.columns map col: _*)
Df_01.show(20, false)

I liked dehasis's approach, because it allowed me to select, rename and convert columns in one step. However, I had to adjust it to make it work for me in PySpark:

from pyspark.sql.functions import col

spark.read.csv(path).select(
    col('_c0').alias("stn").cast('string'),
    col('_c1').alias("wban").cast('string'),
    col('_c2').alias("lat").cast('double'),
    col('_c3').alias("lon").cast('double')
).where(
    col('_c2').isNotNull() & col('_c3').isNotNull()
    & (col('_c2') != 0.0) & (col('_c3') != 0.0)
)

There are multiple options (especially in Scala) to select a subset of columns of a DataFrame. The following lines all select the two columns colA and colB:

import spark.implicits._
import org.apache.spark.sql.functions.{col, column, expr}

inputDf.select(col("colA"), col("colB"))
inputDf.select(inputDf.col("colA"), inputDf.col("colB"))
inputDf.select(column("colA"), column("colB"))
inputDf.select(expr("colA"), expr("colB"))

// only available in Scala
inputDf.select($"colA", $"colB")
inputDf.select('colA, 'colB) // makes use of Scala's Symbol

// selecting columns based on a given iterable of Strings
val selectedColumns: Seq[Column] = Seq("colA", "colB").map(c => col(c))
inputDf.select(selectedColumns: _*)

// select the first or last 2 columns
inputDf.selectExpr(inputDf.columns.take(2): _*)
inputDf.selectExpr(inputDf.columns.takeRight(2): _*)

The usage of $ is possible because Spark provides an implicit class that wraps a StringContext and offers the method $, which builds a ColumnName:

implicit class StringToColumn(val sc : scala.StringContext) extends scala.AnyRef {
  def $(args : scala.Any*) : org.apache.spark.sql.ColumnName = { /* compiled code */ }
}
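
To illustrate, here is a small sketch of what the interpolator boils down to (assuming only that import spark.implicits._ has brought the implicit class above into scope):

import spark.implicits._

val viaInterpolator = $"colA"                  // the usual sugar
val desugared = new StringContext("colA").$()  // the same call, written out

// both are ColumnName instances and behave like any other Column
inputDf.select(viaInterpolator, desugared.alias("colA_again"))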

Typically, when you want to derive multiple DataFrames from one DataFrame, it may improve performance to persist the original DataFrame before creating the others. At the end you can unpersist the original DataFrame.
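
A minimal sketch of that pattern, reusing the inputDf and column names from the examples above:

inputDf.persist() // cache the parent so each child select does not recompute it

val childA = inputDf.select("colA", "colB")
val childB = inputDf.select("colC")

childA.show() // materializes childA from the cached parent
childB.show() // materializes childB from the cached parent

inputDf.unpersist() // release the cache once the children have been materialized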

Keep in mind that columns are not resolved at compile time, but only when they are compared against the column names of your catalog, which happens during the analysis phase of query execution. In case you need stronger type safety, you could create a Dataset.
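
A minimal sketch of the Dataset alternative, assuming a case class that mirrors the example CSV below (the name Record is made up for illustration):

import spark.implicits._ // provides the Encoders for case classes

case class Record(colA: String, colB: String, colC: String)

val typedDs = inputDf.as[Record]

// field access is now checked by the compiler, unlike col("colA")
typedDs.map(_.colA).show()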

For completeness, here is the CSV to try out the above code:

// csv file:
// colA,colB,colC
// 1,"foo","bar"

val inputDf = spark.read.format("csv").option("header", "true").load(csvFilePath)

// resulting DataFrame schema
root
 |-- colA: string (nullable = true)
 |-- colB: string (nullable = true)
 |-- colC: string (nullable = true)

Solved, just use the select method on the dataframe to select columns:

val df = spark.read.csv("C:\\Users\\Ahmed\\Desktop\\cabs_trajectories\\cabs_trajectories\\green\\2014\\green_tripdata_2014-09.csv")

val df1 = df.select("_c0")

This would subset the first column of the dataframe.

