[scala] How do I skip a header from CSV files in Spark?

Suppose I give three file paths to a Spark context to read, and each file has a schema (header) in its first row. How can we skip the header lines?

val rdd = sc.textFile("file1,file2,file3")

Now, how can we skip header lines from this rdd?

This question is related to: scala, csv, apache-spark

The answers are:


import java.io.{BufferedReader, InputStreamReader}

// Read the first line (the header) of every file in the directory
val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt", 1).map {
    case (fileName, stream) =>
        val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
        (fileName, header)
}.collect().toMap

val fileNameHeaderBr = sc.broadcast(fileNameHeader)

// Now skip the headers. With mapPartitions a header can only appear
// as the first line of a partition
sc.textFile("E:\\sss\\*.txt", 1).mapPartitions(iter =>
    if (iter.hasNext) {
        val firstLine = iter.next()
        println(s"Comparing with firstLine $firstLine")
        // Drop the line if it matches any of the collected file headers
        if (fileNameHeaderBr.value.values.exists(_ == firstLine))
            new WrappedIterator(null, iter)
        else
            new WrappedIterator(firstLine, iter)
    }
    else {
        iter
    }
).collect().foreach(println)

// Iterator wrapper that re-injects a saved first line (when it was not a header)
// and otherwise just delegates to the underlying iterator
class WrappedIterator(firstLine: String, iter: Iterator[String]) extends Iterator[String] {
    var isFirstIteration = true

    override def hasNext: Boolean = {
        if (isFirstIteration && firstLine != null) true
        else iter.hasNext
    }

    override def next(): String = {
        if (isFirstIteration) {
            isFirstIteration = false
            // Emit the saved first line if there is one, otherwise fall through
            if (firstLine != null) firstLine
            else iter.next()
        }
        else {
            iter.next()
        }
    }
}

In Spark 2.0 a CSV reader is built into Spark, so you can easily load a CSV file as follows:

spark.read.option("header","true").csv("filePath")

Working in 2018 (Spark 2.3)

Python

df = (spark.read
    .option("header", "true")
    .format("csv")
    .schema(myManualSchema)
    .load("mycsv.csv"))

Scala

val myDf = spark.read
  .option("header", "true")
  .format("csv")
  .schema(myManualSchema)
  .load("mycsv.csv")

P.S.: myManualSchema is a predefined schema I wrote myself; you can skip that part of the code.
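
For reference, here is a minimal sketch of what such a manual schema could look like in Scala; the column names and types are placeholders I am assuming, not part of the original answer:

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

// Hypothetical example schema; replace the fields with your CSV's actual columns
val myManualSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("value", DoubleType, nullable = true)
))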

UPDATE 2021: The same approach works in Spark 3.x:

df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("mycsv.csv"))

In PySpark you can read directly into a DataFrame and set header to True:

df = spark.read.csv(dataPath, header=True)

From Spark 2.0 onwards you can use SparkSession to get this done as a one-liner:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.config(conf).getOrCreate()

and then as @SandeepPurohit said:

val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)

I hope this solves your question!

P.S.: SparkSession is the new entry point introduced in Spark 2.0 and can be found in the spark.sql package.


It's an option that you pass to the read() command:

val context = new org.apache.spark.sql.SQLContext(sc)

var data = context.read.option("header", "true").csv("<path>")

data = sc.textFile('path_to_data')
header = data.first()  # extract the header line
data = data.filter(lambda row: row != header)  # filter out the header

Alternatively, you can use the spark-csv package (in Spark 2.0 the CSV reader is available natively). Note that this expects the header to be present in each file (as you want):

from pyspark.sql.types import StructType, StructField, DoubleType

schema = StructType([
        StructField('lat', DoubleType(), True),
        StructField('lng', DoubleType(), True)])

df = sqlContext.read.format('com.databricks.spark.csv'). \
     options(header='true',
             delimiter="\t",
             treatEmptyValuesAsNulls=True,
             mode="DROPMALFORMED").load(input_file, schema=schema)

You could load each file separately, filter out its first line with file.zipWithIndex().filter(_._2 > 0), and then union all the file RDDs.

If the number of files is too large, the union could throw a StackOverflowException.
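
A minimal sketch of that approach, assuming the three file paths from the question; using sc.union avoids the deeply nested chain of rdd1.union(rdd2).union(...) calls that can cause the stack overflow:

// Hypothetical file list; adjust to your own paths
val files = Seq("file1", "file2", "file3")

// Drop the first line (the header) of each file by its index within that file
val withoutHeaders = files.map { path =>
  sc.textFile(path).zipWithIndex().filter(_._2 > 0).map(_._1)
}

// One flat union over all per-file RDDs
val all = sc.union(withoutHeaders)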


For Python developers (tested with Spark 2.0). Let's say you want to remove the first 14 rows:

sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
# Indices 0-13 are the first 14 rows, so keep everything from index 14 on
parts = parts.zipWithIndex().filter(lambda tup: tup[1] >= 14).map(lambda x: x[0])

withColumn is a DataFrame function, so the following will not work on the RDD used above:

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)

Use the filter() method in PySpark to remove the header, filtering out lines that start with the first column name:

# Read file (change the method for other file formats)
contentRDD = sc.textFile(<filepath>)

# Filter out lines that start with the first column name (the header)
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>))

# Check your result
for i in filterDD.take(5): print(i)
