Parse CSV and load as DataFrame/DataSet with Spark 2.x
First, initialize the SparkSession object. By default it is available in shells as spark:
val spark = org.apache.spark.sql.SparkSession.builder
.master("local") # Change it as per your cluster
.appName("Spark CSV Reader")
.getOrCreate;
Use any one of the following ways to load CSV as a DataFrame/Dataset.
1. Do it in a programmatic way
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
Update: Adding all the options from here in case the link is broken in the future (see the combined example after this list).
- path: location of files. As elsewhere in Spark, standard Hadoop globbing expressions are accepted.
- header: when set to true, the first line of the files will be used to name columns and will not be included in the data. All types will be assumed to be string. The default value is false.
- delimiter: by default columns are delimited using a comma (,), but the delimiter can be set to any character
- quote: by default the quote character is ", but can be set to any character. Delimiters inside quotes are ignored
- escape: by default, the escape character is \, but can be set to any character. Escaped quote characters are ignored
- parserLib: by default it is "commons", but can be set to "univocity" to use that library for CSV parsing.
- mode: determines the parsing mode. By default it is PERMISSIVE. Possible values are:
- PERMISSIVE: tries to parse all lines: nulls are inserted for missing tokens and extra tokens are ignored.
- DROPMALFORMED: drops lines which have fewer or more tokens than expected or tokens which do not match the schema
- FAILFAST: aborts with a RuntimeException if it encounters any malformed line
- charset: defaults to 'UTF-8' but can be set to other valid charset names
- inferSchema: automatically infers column types. It requires one extra pass over the data and is false by default
- comment: skip lines beginning with this character. Default is "#". Disable comments by setting this to null.
- nullValue: specifies a string that indicates a null value; any fields matching this string will be set to null in the DataFrame
- dateFormat: specifies a string that indicates the date format to use when reading dates or timestamps. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to both DateType and TimestampType. By default it is null, which means Spark tries to parse times and dates with java.sql.Timestamp.valueOf() and java.sql.Date.valueOf().
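To illustrate, here is a sketch combining several of the options above; the delimiter, null marker, date format and path are placeholder values for a hypothetical semicolon-separated file:
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", ";")           // semicolon-separated columns
  .option("nullValue", "NA")          // treat "NA" as null
  .option("inferSchema", "true")      // extra pass to detect column types
  .option("dateFormat", "yyyy-MM-dd") // how date columns are written
  .load("hdfs:///csv/file/dir/file.csv")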
2. You can do it in a SQL way as well
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
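You can also register the result as a temporary view and keep querying it in SQL; a minimal sketch (the view name people is a placeholder):
df.createOrReplaceTempView("people")
spark.sql("SELECT COUNT(*) FROM people").show()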
Dependencies:
"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,
Spark version < 2.0
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
Dependencies:
"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,