For example, the result of this:
df.filter("project = 'en'").select("title","count").groupBy("title").sum()
would return an Array.
How to save a Spark DataFrame as a CSV file on disk?
This question is related to: scala, apache-spark, apache-spark-sql
I had a similar problem. I needed to write a CSV file on the driver while connected to the cluster in client mode.
I wanted to reuse the same CSV parsing code as Apache Spark to avoid potential errors.
I checked the spark-csv code and found the code responsible for converting a DataFrame into a raw CSV RDD[String] in com.databricks.spark.csv.CsvSchemaRDD. Sadly it is hardcoded with sc.textFile at the end of the relevant method. I copy-pasted that code, removed the last lines with sc.textFile, and returned the RDD directly instead.
My code:
import org.apache.commons.csv.QuoteMode
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

/*
This is copypasta from com.databricks.spark.csv.CsvSchemaRDD.
Spark's code has a perfect method converting a DataFrame -> raw csv RDD[String],
but in the last lines of that method it's hardcoded against writing as a text file -
for our case we need the RDD.
*/
object DataframeToRawCsvRDD {

  val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat

  def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map()): RDD[String] = {
    val delimiter = parameters.getOrElse("delimiter", ",")
    val delimiterChar = if (delimiter.length == 1) {
      delimiter.charAt(0)
    } else {
      throw new Exception("Delimiter cannot be more than one character.")
    }

    val escape = parameters.getOrElse("escape", null)
    val escapeChar: Character = if (escape == null) {
      null
    } else if (escape.length == 1) {
      escape.charAt(0)
    } else {
      throw new Exception("Escape character cannot be more than one character.")
    }

    val quote = parameters.getOrElse("quote", "\"")
    val quoteChar: Character = if (quote == null) {
      null
    } else if (quote.length == 1) {
      quote.charAt(0)
    } else {
      throw new Exception("Quotation cannot be more than one character.")
    }

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
    val quoteMode: QuoteMode = if (quoteModeString == null) {
      null
    } else {
      QuoteMode.valueOf(quoteModeString.toUpperCase)
    }

    val nullValue = parameters.getOrElse("nullValue", "null")

    // Build the commons-csv format from the collected options.
    val csvFormat = defaultCsvFormat
      .withDelimiter(delimiterChar)
      .withQuote(quoteChar)
      .withEscape(escapeChar)
      .withQuoteMode(quoteMode)
      .withSkipHeaderRecord(false)
      .withNullString(nullValue)

    // Take the SparkContext from the DataFrame itself (the original answer
    // passed an implicit application-specific context here instead).
    val sc = dataFrame.sqlContext.sparkContext

    // Optionally prepend a single header record with the column names.
    val generateHeader = parameters.getOrElse("header", "false").toBoolean
    val headerRdd = if (generateHeader) {
      sc.parallelize(Seq(
        csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
      ))
    } else {
      sc.emptyRDD[String]
    }

    val rowsRdd = dataFrame.rdd.map(row => {
      csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
    })

    headerRdd union rowsRdd
  }
}
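For completeness, a hypothetical use of the helper above, assuming df is your DataFrame (the output path is just an example):
// Build the raw CSV lines as an RDD[String].
val csvLines = DataframeToRawCsvRDD(df, Map("header" -> "true"))
// Write distributed, or collect() to materialize the lines on the driver
// (the original use case) when the result is small.
csvLines.saveAsTextFile("/tmp/output-csv")  // example path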
Writing a DataFrame to disk as CSV is similar to reading from CSV. If you want your result as one file, you can use coalesce.
df.coalesce(1)
.write
.option("header","true")
.option("sep",",")
.mode("overwrite")
.csv("output/path")
Note that this still creates a directory containing a single part-*.csv file rather than a bare file with your chosen name. If your result is an array, you should use a language-specific solution rather than the Spark DataFrame API, because results of that kind are returned to the driver machine.
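As a minimal sketch of such a language-specific solution in Scala (the file name and the naive comma-joining are illustrative; real data may need proper quoting and escaping):
import java.io.PrintWriter

// collect() returns an Array[Row] on the driver; only do this for small results.
val rows = df.collect()
val writer = new PrintWriter("result.csv")  // hypothetical local path
try {
  writer.println(df.columns.mkString(","))                      // header line
  rows.foreach(row => writer.println(row.toSeq.mkString(",")))  // data lines, no escaping
} finally {
  writer.close()
}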
I had a similar issue where I had to save the contents of a DataFrame to a CSV file with a name I defined. df.write.format("csv").save("<my-path>")
was creating a directory rather than a file, so I had to come up with the following solution.
Most of the code is taken from the following dataframe-to-csv, with small modifications to the logic.
import java.io.File
import org.apache.spark.sql.DataFrame

def saveDfToCsv(df: DataFrame, tsvOutput: String,
                sep: String = ",", header: Boolean = false): Unit = {
  val tmpParquetDir = "Posts.tmp.parquet"

  // Write a single-partition CSV into a temporary directory.
  df.repartition(1).write.
    format("com.databricks.spark.csv").
    option("header", header.toString).
    option("delimiter", sep).
    save(tmpParquetDir)

  // Find the single part file (its name may or may not end in .csv depending
  // on the writer), rename it to the requested name, and clean up the
  // temporary directory.
  val dir = new File(tmpParquetDir)
  val newFileRegex = tmpParquetDir + File.separatorChar + "part-00000.*"
  val tmpTsvFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRegex))(0).toString
  new File(tmpTsvFile).renameTo(new File(tsvOutput))

  dir.listFiles.foreach(f => f.delete)
  dir.delete
}
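A hypothetical call of saveDfToCsv above (the output file name is illustrative):
// Produces a single local file named report.csv with a header row.
saveDfToCsv(df, "report.csv", header = true)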
Apache Spark does not support native CSV output on disk.
You have four available solutions though:
1. You can convert your DataFrame into an RDD:
def convertToReadableString(r: Row) = ???
df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath)
This will create a folder at filepath. Under that path, you'll find partition files (e.g. part-000*). A sketch of a possible convertToReadableString is given at the end of this solution.
What I usually do if I want to concatenate all the partitions into one big CSV is
cat filePath/part* > mycsvfile.csv
Some will use coalesce(1,false) to create one partition from the RDD. It's usually a bad practice, since it may overwhelm the driver by pulling all the data you are collecting to it.
Note that df.rdd will return an RDD[Row].
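A minimal sketch of what convertToReadableString could look like, assuming a naive comma join with no quoting or escaping:
import org.apache.spark.sql.Row

// Join each field with commas; nulls become empty strings.
// Fields containing the separator would need quoting in real data.
def convertToReadableString(r: Row): String =
  r.toSeq.map(f => Option(f).map(_.toString).getOrElse("")).mkString(",")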
2. With Spark < 2.x, you can use the Databricks spark-csv library:
Spark 1.4+:
df.write.format("com.databricks.spark.csv").save(filepath)
Spark 1.3:
df.save(filepath,"com.databricks.spark.csv")
3. With Spark 2.x the spark-csv package is not needed, as it's included in Spark.
df.write.format("csv").save(filepath)
4. You can convert to a local Pandas DataFrame and use the to_csv method (PySpark only).
Note: Solutions 1, 2 and 3 will result in CSV format files (part-*) generated by the underlying Hadoop API that Spark calls when you invoke save. You will have one part- file per partition.