Suppose I give three files paths to a Spark context to read and each file has a schema in the first row. How can we skip schema lines from headers?
val rdd=sc.textFile("file1,file2,file3")
Now, how can we skip header lines from this rdd?
This question is related to
scala
csv
apache-spark
//Find header from the files lying in the directory
val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{
case (fileName, stream)=>
val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
(fileName, header)
}.collect().toMap
val fileNameHeaderBr = sc.broadcast(fileNameHeader)
// Now let's skip the header. mapPartition will ensure the header
// can only be the first line of the partition
sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter =>
if(iter.hasNext){
val firstLine = iter.next()
println(s"Comparing with firstLine $firstLine")
if(firstLine == fileNameHeaderBr.value.head._2)
new WrappedIterator(null, iter)
else
new WrappedIterator(firstLine, iter)
}
else {
iter
}
).collect().foreach(println)
class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
var isFirstIteration = true
override def hasNext: Boolean = {
if (isFirstIteration && firstLine != null){
true
}
else{
iter.hasNext
}
}
override def next(): String = {
if (isFirstIteration){
println(s"For the first time $firstLine")
isFirstIteration = false
if (firstLine != null){
firstLine
}
else{
println(s"Every time $firstLine")
iter.next()
}
}
else {
iter.next()
}
}
}
In Spark 2.0 a CSV reader is build into Spark, so you can easily load a CSV file as follows:
spark.read.option("header","true").csv("filePath")
Working in 2018 (Spark 2.3)
Python
df = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
Scala
val myDf = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
PD1: myManualSchema is a predefined schema written by me, you could skip that part of code
UPDATE 2021 The same code works for Spark 3.x
df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.format("csv")
.csv("mycsv.csv")
In PySpark you can use a dataframe and set header as True:
df = spark.read.csv(dataPath, header=True)
From Spark 2.0 onwards what you can do is use SparkSession to get this done as a one liner:
val spark = SparkSession.builder.config(conf).getOrCreate()
and then as @SandeepPurohit said:
val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
I hope it solved your question !
P.S: SparkSession is the new entry point introduced in Spark 2.0 and can be found under spark_sql package
It's an option that you pass to the read()
command:
context = new org.apache.spark.sql.SQLContext(sc)
var data = context.read.option("header","true").csv("<path>")
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header) #filter out header
Alternatively, you can use the spark-csv package (or in Spark 2.0 this is more or less available natively as CSV). Note that this expects the header on each file (as you desire):
schema = StructType([
StructField('lat',DoubleType(),True),
StructField('lng',DoubleType(),True)])
df = sqlContext.read.format('com.databricks.spark.csv'). \
options(header='true',
delimiter="\t",
treatEmptyValuesAsNulls=True,
mode="DROPMALFORMED").load(input_file,schema=schema)
You could load each file separately, filter them with file.zipWithIndex().filter(_._2 > 0)
and then union all the file RDDs.
If the number of files is too large, the union could throw a StackOverflowExeption
.
For python developers. I have tested with spark2.0. Let's say you want to remove first 14 rows.
sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
withColumn is df function. So below will not work in RDD style as used above.
parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
Use the filter()
method in PySpark by filtering out the first column name to remove the header:
# Read file (change format for other file formats)
contentRDD = sc.textfile(<filepath>)
# Filter out first column of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>)
# Check your result
for i in filterDD.take(5) : print (i)
Source: Stackoverflow.com