[scala] Spark - load CSV file as DataFrame?

Parse CSV and load as DataFrame/DataSet with Spark 2.x

First, initialize a SparkSession object. In the Spark shells it is already available by default as spark.

val spark = org.apache.spark.sql.SparkSession.builder()
        .master("local") // Change it as per your cluster
        .appName("Spark CSV Reader")
        .getOrCreate()

Use any one of the following ways to load CSV as DataFrame/DataSet

1. Do it in a programmatic way

 val df = spark.read
         .format("csv")
         .option("header", "true") //first line in file has headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")
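With only the options shown above, every column is read as a string. A minimal sketch of supplying an explicit schema instead, assuming a two-column file; the sample data, the column names and the temporary file are made up for illustration:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .master("local[*]") // assumption: local run, change for your cluster
  .appName("csv-schema-sketch")
  .getOrCreate()

// Write a tiny sample file so the sketch is self-contained (hypothetical data).
val path = Files.createTempFile("people", ".csv")
Files.write(path, "name,age\nalice,30\nbob,25\n".getBytes("UTF-8"))

// Column names and types are declared up front, so no inference pass is needed.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

val df = spark.read
  .format("csv")
  .option("header", "true") // skip the header line, names come from the schema
  .schema(schema)
  .load(path.toString)

df.printSchema()
```

Declaring the schema avoids the extra pass that inferSchema needs and makes the column types predictable.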

Update: all options from the linked documentation are listed here in case the link breaks in the future.

  • path: location of files. As elsewhere in Spark, it can accept standard Hadoop globbing expressions.
  • header: when set to true, the first line of files will be used to name columns and will not be included in data. All types will be assumed string. The default value is false.
  • delimiter: by default columns are delimited using a comma (,), but the delimiter can be set to any character.
  • quote: by default the quote character is ", but it can be set to any character. Delimiters inside quotes are ignored.
  • escape: by default the escape character is \, but it can be set to any character. Escaped quote characters are ignored.
  • parserLib: by default it is "commons"; it can be set to "univocity" to use that library for CSV parsing.
  • mode: determines the parsing mode. By default it is PERMISSIVE. Possible values are:
    • PERMISSIVE: tries to parse all lines: nulls are inserted for missing tokens and extra tokens are ignored.
    • DROPMALFORMED: drops lines which have fewer or more tokens than expected or tokens which do not match the schema.
    • FAILFAST: aborts with a RuntimeException if it encounters any malformed line.
  • charset: defaults to 'UTF-8' but can be set to other valid charset names.
  • inferSchema: automatically infers column types. It requires one extra pass over the data and is false by default.
  • comment: skip lines beginning with this character. Default is "#". Disable comments by setting this to null.
  • nullValue: specifies a string that indicates a null value; any fields matching this string will be set as nulls in the DataFrame.
  • dateFormat: specifies a string that indicates the date format to use when reading dates or timestamps. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to both DateType and TimestampType. By default it is null, which means times and dates are parsed with java.sql.Timestamp.valueOf() and java.sql.Date.valueOf().
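Several of the options listed above can be combined on one reader. The following is a sketch only: the semicolon-delimited sample file and its contents are made up, and the option names are taken from the list above.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]") // assumption: local run, change for your cluster
  .appName("csv-options-sketch")
  .getOrCreate()

// Hypothetical sample: semicolon-delimited, with "NA" standing for a missing value.
val path = Files.createTempFile("people", ".csv")
Files.write(path, "name;age\nalice;30\nbob;NA\n".getBytes("UTF-8"))

val df = spark.read
  .format("csv")
  .option("header", "true")      // first line holds the column names
  .option("delimiter", ";")      // override the default comma
  .option("inferSchema", "true") // costs one extra pass over the data
  .option("nullValue", "NA")     // fields equal to "NA" become null
  .option("mode", "FAILFAST")    // abort on the first malformed line
  .load(path.toString)

df.printSchema()
df.show()
```

FAILFAST is a reasonable choice while developing, since malformed input surfaces immediately instead of being silently dropped or padded with nulls.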

2. You can do this the SQL way as well

 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
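Querying the file path directly, as above, leaves no obvious place to pass reader options such as header. One alternative, sketched here under the assumption that a small header-bearing file is being read, is to load the file with the reader and register it as a temporary view; the view name, the data and the query are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]") // assumption: local run, change for your cluster
  .appName("csv-sql-sketch")
  .getOrCreate()

// Hypothetical sample data written to a temporary file.
val path = Files.createTempFile("people", ".csv")
Files.write(path, "name,age\nalice,30\nbob,12\n".getBytes("UTF-8"))

// Load with reader options first, then expose the result to SQL.
val people = spark.read
  .format("csv")
  .option("header", "true")
  .load(path.toString)

people.createOrReplaceTempView("people")

// Columns are strings here, so the age is cast explicitly before comparing.
val adults = spark.sql("SELECT name FROM people WHERE CAST(age AS INT) >= 18")
adults.show()
```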

Dependencies:

 "org.apache.spark" % "spark-core_2.11" % "2.0.0",
 "org.apache.spark" % "spark-sql_2.11" % "2.0.0",

Spark version < 2.0

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Dependencies:

"org.apache.spark" % "spark-sql_2.10" % "1.6.0",
"com.databricks" % "spark-csv_2.10" % "1.6.0",
"com.univocity" % "univocity-parsers" % LATEST,
