[scala] How do I convert csv file to rdd

I'm new to spark. I want to perform some operations on particular data in a CSV record.

I'm trying to read a CSV file and convert it to an RDD. My further operations depend on the header provided in the CSV file.

(From comments) This is my code so far:

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
        return Arrays.asList(EOL.split(s));
    }
});
final String heading=lines.first().toString();

I can get the header values like this:

final String[] header=heading.split(" "); 

Now I want to map these header values to each record in the CSV file.

In Java I'm using CSVReader's record.getColumnValue(Column header) to get a particular value; I need to do something similar to that here.

Tags: scala, apache-spark

Answers:


I'd recommend reading the header directly from the driver, not through Spark. Two reasons for this: 1) It's a single line. There's no advantage to a distributed approach. 2) We need this line in the driver, not the worker nodes.

It goes something like this:

// Ridiculous amount of code to read one line.
import org.apache.hadoop

val uri = new java.net.URI(filename)
val conf = sc.hadoopConfiguration
val fs = hadoop.fs.FileSystem.get(uri, conf)
val path = new hadoop.fs.Path(filename)
val stream = fs.open(path)
val source = scala.io.Source.fromInputStream(stream)
val header = source.getLines.head

Now when you make the RDD you can discard the header.

val csvRDD = sc.textFile(filename).filter(_ != header)

Then we can make an RDD from one column, for example:

val idx = header.split(",").indexOf(columnName)
val columnRDD = csvRDD.map(_.split(",")(idx))

We can use the new DataFrameRDD for reading and writing the CSV data. There are a few advantages of a DataFrameRDD over a normal RDD:

  1. A DataFrameRDD is a bit faster than a normal RDD, since we specify the schema, which allows a lot of optimization at runtime and gives a significant performance gain.
  2. Even if a column shifts position in the CSV, it automatically picks up the correct column, because we are not hard-coding the column number the way we would when reading the data as a textFile, splitting it, and then indexing by column number to get the data.
  3. You can read the CSV file directly in a few lines of code.

You will need this library; add it to build.sbt:

libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"

Spark Scala code for it:

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val csvInPath = "/path/to/csv/abc.csv"
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
// format specifies the type of file you are reading
// header = "true" indicates that the first line of the file is the header

To convert it to a normal RDD, taking only some of the columns from it:

import org.apache.spark.sql.Row
val rddData = df.map(x => Row(x.getAs[String]("colA")))
// Do other RDD operations on it

Saving the RDD to CSV format:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
val aDf = sqlContext.createDataFrame(rddData, StructType(Array(StructField("colANew", StringType, true))))
aDf.write.format("com.databricks.spark.csv").option("header", "true").save("/csvOutPath/aCSVOp")

Since header is set to true, we will get the header names in all the output files.


I think you can try to load the CSV into an RDD and then create a DataFrame from that RDD; here is the documentation on creating a DataFrame from an RDD: http://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds
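
As a rough illustration of that approach, here is a minimal sketch using the programmatic-schema route from that guide. The file path is made up, and every column is simply treated as a string:

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)

// Read the raw CSV and separate the header line from the data (assumed path).
val raw = sc.textFile("/path/to/data.csv")
val headerLine = raw.first()
val rows = raw.filter(_ != headerLine).map(line => Row.fromSeq(line.split(",", -1)))

// Build a schema from the header names, treating every column as a nullable string.
val schema = StructType(headerLine.split(",").map(name => StructField(name, StringType, nullable = true)))

val df = sqlContext.createDataFrame(rows, schema)
df.show()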


For Spark with Scala, this is what I typically use when I can't use the Spark CSV packages...

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv")
val header = rawdata.first()
// keep only lines whose first character differs from the header's first character
val tbldata = rawdata.filter(_(0) != header(0))

Here is another example using Spark/Scala to convert a CSV to RDD. For a more detailed description see this post.

def main(args: Array[String]): Unit = {
  val csv = sc.textFile("/path/to/your/file.csv")

  // split / clean data
  val headerAndRows = csv.map(line => line.split(",").map(_.trim))
  // get header
  val header = headerAndRows.first
  // filter out header (eh. just check if the first val matches the first header name)
  val data = headerAndRows.filter(_(0) != header(0))
  // splits to map (header/value pairs)
  val maps = data.map(splits => header.zip(splits).toMap)
  // filter out the user "me"
  val result = maps.filter(map => map("user") != "me")
  // print result
  result.foreach(println)
}

How about this?

val Delimiter = ","
val textFile = sc.textFile("data.csv").map(line => line.split(Delimiter))

As of Spark 2.0, CSV can be read directly into a DataFrame.

If the data file does not have a header row, then it would be:

val df = spark.read.csv("file://path/to/data.csv")

That will load the data, but it gives each column a generic name like _c0, _c1, etc.
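
If you do want meaningful names on a headerless file, one option is to rename the generated columns with toDF. A small sketch (the column names here are just illustrative assumptions):

val named = spark.read.csv("file://path/to/data.csv").toDF("user", "topic", "hits")
// named.printSchema() now shows user, topic and hits instead of _c0, _c1, _c2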

If there are headers then adding .option("header", "true") will use the first row to define the columns in the DataFrame:

val df = spark.read
  .option("header", "true")
  .csv("file://path/to/data.csv")

For a concrete example, let's say you have a file with the contents:

user,topic,hits
om,scala,120
daniel,spark,80
3754978,spark,1

Then the following will get the total hits grouped by topic:

import org.apache.spark.sql.functions._
import spark.implicits._

val rawData = spark.read
  .option("header", "true")
  .csv("file://path/to/data.csv")

// specifies the query, but does not execute it
val grouped = rawData.groupBy($"topic").agg(sum($"hits"))

// runs the query, pulling the data to the master node
// can fail if the amount of data is too much to fit 
// into the master node's memory!
val collected = grouped.collect

// runs the query, writing the result back out
// in this case, changing format to Parquet since that can
//   be nicer to work with in Spark
grouped.write.parquet("hdfs://some/output/directory/")

// runs the query, writing the result back out
// in this case, in CSV format with a header and 
// coalesced to a single file.  This is easier for human 
// consumption but usually much slower.
grouped.coalesce(1)
  .write
  .option("header", "true")
  .csv("hdfs://some/output/directory/")

Firstly I must say that it's much much simpler if you put your headers in separate files - this is the convention in big data.

Anyway, Daniel's answer is pretty good, but it has an inefficiency and a bug, so I'm going to post my own. The inefficiency is that you don't need to check every record to see whether it's the header; you only need to check the first record of each partition. The bug is that with .split(",") you could get an exception thrown, or get the wrong column, when entries are empty strings at the start or end of the record; to correct that you need to use .split(",", -1) (a quick illustration of the difference follows the code). So here is the full code:

import org.apache.hadoop

val header =
  scala.io.Source.fromInputStream(
    hadoop.fs.FileSystem.get(new java.net.URI(path), sc.hadoopConfiguration)
      .open(new hadoop.fs.Path(path)))
    .getLines.head

val columnIndex = header.split(",").indexOf(columnName)

sc.textFile(path).mapPartitions(iterator => {
  val head = iterator.next()
  if (head == header) iterator else Iterator(head) ++ iterator
})
.map(_.split(",", -1)(columnIndex))
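
As a quick illustration of why the -1 limit matters (this is standard String.split behaviour, nothing Spark-specific):

"a,b,,".split(",")      // Array(a, b)       -- trailing empty strings are dropped
"a,b,,".split(",", -1)  // Array(a, b, , )   -- all four columns are preserved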

Final points: consider Parquet if you only want to fish out certain columns, or at least consider implementing a lazily evaluated split function if you have wide rows.
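
For instance, if the data were written out as Parquet once, later reads only touch the columns they actually select. A sketch under the assumption that you already have a DataFrame df and a made-up output path:

// Write the data once as Parquet (assumes an existing DataFrame `df`).
df.write.parquet("/path/to/data.parquet")

// Later reads that select a column only scan that column from disk.
val onlyOneColumn = sqlContext.read.parquet("/path/to/data.parquet").select(columnName)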


Another alternative is to use the mapPartitionsWithIndex method, as you get the partition index number and an iterator over all the lines within that partition. Partition 0, line 0 will be the header.

import scala.collection.mutable.ArrayBuffer

val rows = sc.textFile(path)
  .mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) =>
    val results = new ArrayBuffer[String]

    var first = true
    while (rows.hasNext) {
      // check for the header line
      if (index == 0 && first) {
        first = false
        rows.next // skip the first row
      } else {
        results += rows.next
      }
    }

    results.toIterator
  }, true)

rows.flatMap { row => row.split(",") }

I would suggest that you try the approach described here:

https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds

JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

You need to have a Person class in this example that matches the spec of your file's header; you then associate your data with that schema and apply criteria, much as you would in MySQL, to get the desired result.
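
For reference, a hedged Scala sketch of the same pattern, following the reflection-based approach in the linked guide (the Person case class and the SQL criteria are illustrative):

case class Person(name: String, age: Int)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(parts => Person(parts(0), parts(1).trim.toInt))

// Register the data as a table and apply criteria with SQL, much like in MySQL.
people.toDF().registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")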


You can use the spark-csv library: https://github.com/databricks/spark-csv

This is directly from the documentation:

import java.util.HashMap;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc);

HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");

DataFrame df = sqlContext.load("com.databricks.spark.csv", options);