[scala] How to save a spark DataFrame as csv on disk?

For example, the result of this:

df.filter("project = 'en'").select("title","count").groupBy("title").sum()

would return an Array.

How can I save a Spark DataFrame as a CSV file on disk?

This question is related to scala apache-spark apache-spark-sql

The answer is


I had a similar problem. I needed to write a CSV file on the driver while I was connected to the cluster in client mode.

I wanted to reuse the same CSV parsing code as Apache Spark to avoid potential errors.

I checked the spark-csv code and found the code responsible for converting a DataFrame into a raw CSV RDD[String] in com.databricks.spark.csv.CsvSchemaRDD.

Sadly, it is hardcoded with sc.textFile at the end of the relevant method.

I copy-pasted that code, removed the last lines that use sc.textFile, and returned the RDD directly instead.

My code:

/*
  This is copy-pasted from com.databricks.spark.csv.CsvSchemaRDD.
  spark-csv has a perfect method for converting a DataFrame -> raw csv RDD[String],
  but in the last lines of that method it is hardcoded to write a text file -
  for our case we need the RDD itself.
 */
import org.apache.commons.csv.QuoteMode
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

object DataframeToRawCsvRDD {

  val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat

  def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
           (implicit sc: SparkContext): RDD[String] = {
    val delimiter = parameters.getOrElse("delimiter", ",")
    val delimiterChar = if (delimiter.length == 1) {
      delimiter.charAt(0)
    } else {
      throw new Exception("Delimiter cannot be more than one character.")
    }

    val escape = parameters.getOrElse("escape", null)
    val escapeChar: Character = if (escape == null) {
      null
    } else if (escape.length == 1) {
      escape.charAt(0)
    } else {
      throw new Exception("Escape character cannot be more than one character.")
    }

    val quote = parameters.getOrElse("quote", "\"")
    val quoteChar: Character = if (quote == null) {
      null
    } else if (quote.length == 1) {
      quote.charAt(0)
    } else {
      throw new Exception("Quotation cannot be more than one character.")
    }

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
    val quoteMode: QuoteMode = if (quoteModeString == null) {
      null
    } else {
      QuoteMode.valueOf(quoteModeString.toUpperCase)
    }

    val nullValue = parameters.getOrElse("nullValue", "null")

    val csvFormat = defaultCsvFormat
      .withDelimiter(delimiterChar)
      .withQuote(quoteChar)
      .withEscape(escapeChar)
      .withQuoteMode(quoteMode)
      .withSkipHeaderRecord(false)
      .withNullString(nullValue)

    val generateHeader = parameters.getOrElse("header", "false").toBoolean
    val headerRdd = if (generateHeader) {
      sc.parallelize(Seq(
        csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
      ))
    } else {
      sc.emptyRDD[String]
    }

    val rowsRdd = dataFrame.rdd.map(row => {
      csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
    })

    headerRdd union rowsRdd
  }

}
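
To show how this helper might be used, here is a minimal sketch of writing the CSV on the driver; writeCsvOnDriver, resultDf and the output path are hypothetical names, and it assumes the spark-csv artifact is on the classpath:

import java.io.PrintWriter

import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame

// Hypothetical usage of DataframeToRawCsvRDD: collect the CSV lines and
// write them to a local file on the driver. Only suitable for small results,
// since collect() pulls all rows to the driver.
def writeCsvOnDriver(resultDf: DataFrame, path: String)(implicit sc: SparkContext): Unit = {
  val csvLines = DataframeToRawCsvRDD(resultDf, Map("header" -> "true"))
  val writer = new PrintWriter(path)
  try {
    csvLines.collect().foreach(writer.println)
  } finally {
    writer.close()
  }
}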

Writing a DataFrame to disk as CSV is similar to reading from CSV. If you want your result as one file, you can use coalesce.

df.coalesce(1)
  .write
  .option("header", "true")
  .option("sep", ",")
  .mode("overwrite")
  .csv("output/path")

If your result is an array, you should use a language-specific solution rather than the Spark DataFrame API, because results like that are returned to the driver machine.
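
For example, if you have already collected rows to the driver, plain JVM file I/O is enough. A minimal sketch, assuming an Array[Row] named rows obtained via df.collect() (no quoting or escaping is handled):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.Row

// Write rows already collected on the driver (e.g. df.collect()) to a local file.
// Naive CSV rendering: embedded commas, quotes and newlines are NOT escaped.
def writeRowsLocally(rows: Array[Row], path: String): Unit = {
  val lines = rows.map(_.toSeq.mkString(","))
  Files.write(Paths.get(path), lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
}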


I had a similar issue where I had to save the contents of a DataFrame to a CSV file with a name that I defined. df.write.format("csv").save("<my-path>") was creating a directory rather than a file, so I had to come up with the following solution. Most of the code is taken from dataframe-to-csv, with small modifications to the logic.

import java.io.File

import org.apache.spark.sql.DataFrame

def saveDfToCsv(df: DataFrame, tsvOutput: String,
                sep: String = ",", header: Boolean = false): Unit = {
    // Temporary directory that Spark writes its part file into.
    val tmpParquetDir = "Posts.tmp.parquet"

    // Write a single part file via spark-csv.
    df.repartition(1).write.
        format("com.databricks.spark.csv").
        option("header", header.toString).
        option("delimiter", sep).
        save(tmpParquetDir)

    // Find the single part file and rename it to the requested output name.
    val dir = new File(tmpParquetDir)
    val newFileRegex = tmpParquetDir + File.separatorChar + "part-00000.*"
    val tmpTsvFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRegex))(0).toString
    new File(tmpTsvFile).renameTo(new File(tsvOutput))

    // Clean up the temporary directory.
    dir.listFiles.foreach(f => f.delete)
    dir.delete
}
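
A hypothetical call, assuming a DataFrame named df is in scope and the output file name is just an example:

// Produce a single local file named report.csv with a header row.
saveDfToCsv(df, "report.csv", sep = ",", header = true)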

Apache Spark 1.x does not support native CSV output on disk out of the box.

You have four available solutions though:

  1. You can convert your Dataframe into an RDD :

    def convertToReadableString(r: Row) = ???
    df.rdd.map(convertToReadableString).saveAsTextFile(filepath)
    

    This will create a folder at filepath. Under that path you'll find the partition files (e.g. part-000*).

    What I usually do if I want to append all the partitions into a big CSV is

    cat filePath/part* > mycsvfile.csv
    

    Some will use coalesce(1,false) to create one partition from the RDD. It's usually a bad practice, since it may overwhelm the driver by pulling all the data you are collecting to it.

    Note that df.rdd will return an RDD[Row]. A possible convertToReadableString is sketched after this list.

  2. With Spark < 2.x, you can use the Databricks spark-csv library:

    • Spark 1.4+:

      df.write.format("com.databricks.spark.csv").save(filepath)
      
    • Spark 1.3:

      df.save(filepath,"com.databricks.spark.csv")
      
  3. With Spark 2.x the spark-csv package is not needed as it's included in Spark.

    df.write.format("csv").save(filepath)
    
  4. You can convert to a local Pandas DataFrame and use its to_csv method (PySpark only).

Note: Solutions 1, 2 and 3 will result in CSV format files (part-*) generated by the underlying Hadoop API that Spark calls when you invoke save. You will have one part- file per partition.
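
As referenced in solution 1, here is a minimal sketch of what convertToReadableString could look like; it does naive comma-joining without quoting or escaping, so treat it as illustrative only:

import org.apache.spark.sql.Row

// Naive CSV rendering of a Row: nulls become empty strings,
// embedded commas and quotes are NOT escaped.
def convertToReadableString(r: Row): String =
  r.toSeq.map(v => if (v == null) "" else v.toString).mkString(",")

// Solution 1 then becomes:
// df.rdd.map(convertToReadableString).saveAsTextFile(filepath)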

