I would like to read a CSV in spark and convert it as DataFrame and store it in HDFS with df.registerTempTable("table_name")
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
What is the right command to load CSV file as DataFrame in Apache Spark?
This question is related to
scala
apache-spark
hadoop
apache-spark-sql
hdfs
With Spark 2.4+, if you want to load a csv from a local directory, then you can use 2 sessions and load that into hive. The first session should be created with master() config as "local[*]" and the second session with "yarn" and Hive enabled.
The below one worked for me.
import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._
object testCSV {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()
import spark_local.implicits._
spark_local.sql("SET").show(100,false)
val local_path="/tmp/data/spend_diversity.csv" // Local file
val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
df_local.show(false)
val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()
import spark.implicits._
spark.sql("SET").show(100,false)
val df = df_local
df.createOrReplaceTempView("lcsv")
spark.sql(" drop table if exists work.local_csv ")
spark.sql(" create table work.local_csv as select * from lcsv ")
}
When ran with spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
it went fine and created the table in hive.
Penny's Spark 2 example is the way to do it in spark2. There's one more trick: have that header generated for you by doing an initial scan of the data, by setting the option inferSchema
to true
Here, then, assumming that spark
is a spark session you have set up, is the operation to load in the CSV index file of all the Landsat images which amazon host on S3.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
val csvdata = spark.read.options(Map(
"header" -> "true",
"ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
"inferSchema" -> "true",
"mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
The bad news is: this triggers a scan through the file; for something large like this 20+MB zipped CSV file, that can take 30s over a long haul connection. Bear that in mind: you are better off manually coding up the schema once you've got it coming in.
(code snippet Apache Software License 2.0 licensed to avoid all ambiguity; something I've done as a demo/integration test of S3 integration)
It's for whose Hadoop is 2.6 and Spark is 1.6 and without "databricks" package.
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val", IntegerType, true))
val df = sqlContext.createDataFrame(rdd, schema)
Default file format is Parquet with spark.read.. and file reading csv that why you are getting the exception. Specify csv format with api you are trying to use
With in-built Spark csv, you can get it done easily with new SparkSession object for Spark > 2.0.
val df = spark.
read.
option("inferSchema", "false").
option("header","true").
option("mode","DROPMALFORMED").
option("delimiter", ";").
schema(dataSchema).
csv("/csv/file/dir/file.csv")
df.show()
df.printSchema()
There are various options you can set.
header
: whether your file includes header line at the topinferSchema
: whether you want to infer schema automatically or not. Default is true
. I always prefer to provide schema to ensure proper datatypes.mode
: parsing mode, PERMISSIVE, DROPMALFORMED or FAILFASTdelimiter
: to specify delimiter, default is comma(',')In case you are building a jar with scala 2.11 and Apache 2.0 or higher.
There is no need to create a sqlContext
or sparkContext
object. Just a SparkSession
object suffices the requirement for all needs.
Following is mycode which works fine:
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}
object driver {
def main(args: Array[String]) {
val log = LogManager.getRootLogger
log.info("**********JAR EXECUTION STARTED**********")
val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
val df = spark.read.format("csv")
.option("header", "true")
.option("delimiter","|")
.option("inferSchema","true")
.load("d:/small_projects/spark/test.pos")
df.show()
}
}
In case you are running in cluster just change .master("local")
to .master("yarn")
while defining the sparkBuilder
object
The Spark Doc covers this: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
First, initialize SparkSession
object by default it will available in shells as spark
val spark = org.apache.spark.sql.SparkSession.builder
.master("local") # Change it as per your cluster
.appName("Spark CSV Reader")
.getOrCreate;
Use any one of the following ways to load CSV as
DataFrame/DataSet
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
Dependencies:
"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
Dependencies:
"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
To read from relative path on the system use System.getProperty method to get current directory and further uses to load the file using relative path.
scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)
spark:2.4.4 scala:2.11.12
In Java 1.8 This code snippet perfectly working to read CSV files
POM.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
Java
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
//("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
There are a lot of challenges to parsing a CSV file, it keeps adding up if the file size is bigger, if there are non-english/escape/separator/other characters in the column values, that could cause parsing errors.
The magic then is in the options that are used. The ones that worked for me and hope should cover most of the edge cases are in code below:
### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path,
header=True,
multiLine=True,
ignoreLeadingWhiteSpace=True,
ignoreTrailingWhiteSpace=True,
encoding="UTF-8",
sep=',',
quote='"',
escape='"',
maxColumns=2,
inferSchema=True)
Hope that helps. For more refer: Using PySpark 2 to read CSV having HTML source code
Note: The code above is from Spark 2 API, where the CSV file reading API comes bundled with built-in packages of Spark installable.
Note: PySpark is a Python wrapper for Spark and shares the same API as Scala/Java.
Add following Spark dependencies to POM file :
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
//Spark configuration:
val spark = SparkSession.builder().master("local").appName("Sample App").getOrCreate()
//Read csv file:
val df = spark.read.option("header", "true").csv("FILE_PATH")
// Display output
df.show()
Try this if using spark 2.0+
For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")
For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")
For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")
Note:- this work for any delimited file. Just use option(“delimiter”,) to change value.
Hope this is helpful.
With Spark 2.0, following is how you can read CSV
val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
.config(conf = conf)
.appName("spark session example")
.getOrCreate()
val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
csv(path)
Source: Stackoverflow.com