[hadoop] How to export data from Spark SQL to CSV

This command works with HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;

But with Spark SQL I'm getting an error with an org.apache.spark.sql.hive.HiveQl stack trace:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

Please guide me on how to write an export-to-CSV feature in Spark SQL.

The answer is


The simplest way is to map over the DataFrame's RDD and use mkString:

  df.rdd.map(x=>x.mkString(","))
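
That only builds the comma-separated lines; to actually write them out you can chain saveAsTextFile. A minimal sketch (the output path is an assumption):

  // write one text line per row; the path is a hypothetical output directory
  df.rdd.map(_.mkString(",")).saveAsTextFile("/data/home_csv")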

As of Spark 1.5 (or even before that) df.map(r => r.mkString(",")) would do the same. If you want CSV escaping, you can use Apache commons-lang for that. For example, here's the code we're using:

  import org.apache.commons.lang3.StringEscapeUtils
  import org.apache.hadoop.io.compress.GzipCodec
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, Row}

  def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = {

    // optionally truncate a column, then CSV-escape it (quotes, embedded delimiters)
    def trimColumnLength(c: String) = {
      val col = maxColumnLength match {
        case None => c
        case Some(len: Int) => c.take(len)
      }
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    }

    def rowToString(r: Row) = {
      // remove control characters and the Unicode replacement character
      val st = r.mkString("~-~").replaceAll("[\\p{C}\\uFFFD]", "")
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    }

    def addHeader(r: RDD[String]) = {
      // headers are only supported for single partitions;
      // sc is the ambient SparkContext (e.g. in spark-shell)
      val rdd = for (h <- header;
                     if partitions == 1;
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    }

    val rdd = df.rdd.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  }
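
A hypothetical call might look like this (the path and header string are assumptions); with the defaults it writes a single gzipped part file under the given directory:

  DfToTextFile("/data/home_csv", df, header = Some("id,name,city"))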

The spark-csv answer is correct, but there is an issue: the library creates several files based on the DataFrame's partitioning, which is usually not what we need. So you can combine all partitions into one:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

and then rename the library's output (a file named "part-00000" inside that directory) to the desired filename.
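
If you want to script that rename, one option is the Hadoop FileSystem API. A rough sketch, assuming the directory written above and a hypothetical target filename:

import org.apache.hadoop.fs.{FileSystem, Path}

// spark-csv wrote the data as myfile.csv/part-00000; move it out to a single named file
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.rename(new Path("myfile.csv/part-00000"), new Path("myfile_single.csv"))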

This blog post provides more details: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/


To read a CSV file into a DataFrame:

val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")

With the help of spark-csv we can write to a CSV file.

val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")

Since Spark 2.x, spark-csv is integrated as a native data source. Therefore, the necessary statement simplifies to (Windows):

df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")

or (UNIX):

df.write
  .option("header", "true")
  .csv("/var/out.csv")

Notice: as the comments say, this creates a directory with that name containing the partition files, not a standard CSV file. This, however, is most likely what you want, since otherwise you would either crash your driver (out of RAM) or be working in a non-distributed environment.
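
If you do want a single part file with the native writer and the data is small enough to fit on one executor, you can combine it with the coalesce(1) trick shown earlier (the path is an assumption):

// still writes a directory, but it will contain exactly one part file
df.coalesce(1)
  .write
  .option("header", "true")
  .csv("/var/out_single.csv")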


The error message suggests this is not a supported feature in the query language. But you can always save a DataFrame through the RDD interface (df.rdd.saveAsTextFile), mapping each Row to a CSV line first, or you can check out https://github.com/databricks/spark-csv.

