[scala] Provide schema while reading csv file as a dataframe

I am trying to read a csv file into a dataframe. I know what the schema of my dataframe should be since I know my csv file. Also I am using spark csv package to read the file. I trying to specify the schema like below.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

But when I check the schema of the data frame I created, it seems to have taken its own schema. Am I doing anything wrong ? how to make spark to pick up the schema I mentioned ?

> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)

This question is related to scala apache-spark dataframe apache-spark-sql spark-csv

The answer is


schema definition as simple string

Just in case if some one is interested in schema definition as simple string with date and time stamp

data file creation from Terminal or shell

echo " 
2019-07-02 22:11:11.000999, 01/01/2019, Suresh, abc  
2019-01-02 22:11:11.000001, 01/01/2020, Aadi, xyz 
" > data.csv

Defining the schema as String

    user_schema = 'timesta TIMESTAMP,date DATE,first_name STRING , last_name STRING'

reading the data

    df = spark.read.csv(path='data.csv', schema = user_schema, sep=',', dateFormat='MM/dd/yyyy',timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS')

    df.show(10, False)

    +-----------------------+----------+----------+---------+
    |timesta                |date      |first_name|last_name|
    +-----------------------+----------+----------+---------+
    |2019-07-02 22:11:11.999|2019-01-01| Suresh   | abc     |
    |2019-01-02 22:11:11.001|2020-01-01| Aadi     | xyz     |
    +-----------------------+----------+----------+---------+

Please note defining the schema explicitly instead of letting spark infer the schema also improves the spark read performance.


In pyspark 2.4 onwards, you can simply use header parameter to set the correct header:

data = spark.read.csv('data.csv', header=True)

Similarly, if using scala you can use header parameter as well.


You can also do like this by using sparkSession and implicit

import sparkSession.implicits._
val pagecount:DataFrame = sparkSession.read
.option("delimiter"," ")
.option("quote","")
.option("inferSchema","true")
.csv("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
.toDF("project","article","requests","bytes_served")


Try the below code, you need not specify the schema. When you give inferSchema as true it should take it from your csv file.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

If you want to manually specify the schema, you can do it as below:

import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("project", StringType, true),
  StructField("article", StringType, true),
  StructField("requests", IntegerType, true),
  StructField("bytes_served", DoubleType, true))
)

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .schema(customSchema)
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

Here's how you can work with a custom schema, a complete demo:

$> shell code,

echo "
Slingo, iOS 
Slingo, Android
" > game.csv

Scala code:

import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("game_id", StringType, true),
  StructField("os_id", StringType, true)
))

val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv")
csv_df.show 

csv_df.orderBy(asc("game_id"), desc("os_id")).show
csv_df.createOrReplaceTempView("game_view")
val sort_df = sql("select * from game_view order by game_id, os_id desc")
sort_df.show 

For those interested in doing this in Python here is a working version.

customSchema = StructType([
    StructField("IDGC", StringType(), True),        
    StructField("SEARCHNAME", StringType(), True),
    StructField("PRICE", DoubleType(), True)
])
productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema)

testProduct.csv
ID|SEARCHNAME|PRICE
6607|EFKTON75LIN|890.88
6612|EFKTON100HEN|55.66

Hope this helps.


I'm using the solution provided by Arunakiran Nulu in my analysis (see the code). Despite it is able to assign the correct types to the columns, all the values returned are null. Previously, I've tried to the option .option("inferSchema", "true") and it returns the correct values in the dataframe (although different type).

val customSchema = StructType(Array(
    StructField("numicu", StringType, true),
    StructField("fecha_solicitud", TimestampType, true),
    StructField("codtecnica", StringType, true),
    StructField("tecnica", StringType, true),
    StructField("finexploracion", TimestampType, true),
    StructField("ultimavalidacioninforme", TimestampType, true),
    StructField("validador", StringType, true)))

val df_explo = spark.read
        .format("csv")
        .option("header", "true")
        .option("delimiter", "\t")
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") 
        .schema(customSchema)
        .load(filename)

Result

root


|-- numicu: string (nullable = true)
 |-- fecha_solicitud: timestamp (nullable = true)
 |-- codtecnica: string (nullable = true)
 |-- tecnica: string (nullable = true)
 |-- finexploracion: timestamp (nullable = true)
 |-- ultimavalidacioninforme: timestamp (nullable = true)
 |-- validador: string (nullable = true)

and the table is:

|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
+------+---------------+----------+-------+--------------+-----------------------+---------+
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|

This is one of option where we can pass the column names to the dataframe while loading CSV.

import pandas
    names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
    dataset = pandas.read_csv("C:/Users/NS00606317/Downloads/Iris.csv", names=names, header=0)
print(dataset.head(10))

Output

    sepal-length  sepal-width  petal-length  petal-width        class
1            5.1          3.5           1.4          0.2  Iris-setosa
2            4.9          3.0           1.4          0.2  Iris-setosa
3            4.7          3.2           1.3          0.2  Iris-setosa
4            4.6          3.1           1.5          0.2  Iris-setosa
5            5.0          3.6           1.4          0.2  Iris-setosa
6            5.4          3.9           1.7          0.4  Iris-setosa
7            4.6          3.4           1.4          0.3  Iris-setosa
8            5.0          3.4           1.5          0.2  Iris-setosa
9            4.4          2.9           1.4          0.2  Iris-setosa
10           4.9          3.1           1.5          0.1  Iris-setosa

// import Library
import java.io.StringReader ;

import au.com.bytecode.opencsv.CSVReader

//filename

var train_csv = "/Path/train.csv";

//read as text file

val train_rdd = sc.textFile(train_csv)   

//use string reader to convert in proper format

var full_train_data  = train_rdd.map{line =>  var csvReader = new CSVReader(new StringReader(line)) ; csvReader.readNext();  }   

//declares  types

type s = String

// declare case class for schema

case class trainSchema (Loan_ID :s ,Gender :s, Married :s, Dependents :s,Education :s,Self_Employed :s,ApplicantIncome :s,CoapplicantIncome :s,
    LoanAmount :s,Loan_Amount_Term :s, Credit_History :s, Property_Area :s,Loan_Status :s)

//create DF RDD with custom schema 

var full_train_data_with_schema = full_train_data.mapPartitionsWithIndex{(idx,itr)=> if (idx==0) itr.drop(1); 
                     itr.toList.map(x=> trainSchema(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9),x(10),x(11),x(12))).iterator }.toDF


Thanks to the answer by @Nulu, it works for pyspark with minimal tweaking

from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType

customSchema = StructType(Array(
    StructField("project", StringType, true),
    StructField("article", StringType, true),
    StructField("requests", IntegerType, true),
    StructField("bytes_served", DoubleType, true)))

pagecount = sc.read.format("com.databricks.spark.csv")
         .option("delimiter"," ")
         .option("quote","")
         .option("header", "false")
         .schema(customSchema)
         .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

here my solution is:

import org.apache.spark.sql.types._
  val spark = org.apache.spark.sql.SparkSession.builder.
  master("local[*]").
  appName("Spark CSV Reader").
  getOrCreate()

val movie_rating_schema = StructType(Array(
  StructField("UserID", IntegerType, true),
  StructField("MovieID", IntegerType, true),
  StructField("Rating", DoubleType, true),
  StructField("Timestamp", TimestampType, true)))

val df_ratings: DataFrame = spark.read.format("csv").
  option("header", "true").
  option("mode", "DROPMALFORMED").
  option("delimiter", ",").
  //option("inferSchema", "true").
  option("nullValue", "null").
  schema(movie_rating_schema).
  load(args(0)) //"file:///home/hadoop/spark-workspace/data/ml-20m/ratings.csv"

val movie_avg_scores = df_ratings.rdd.map(_.toString()).
  map(line => {
    // drop "[", "]" and then split the str 
    val fileds = line.substring(1, line.length() - 1).split(",")
    //extract (movie id, average rating)
    (fileds(1).toInt, fileds(2).toDouble)
  }).
  groupByKey().
  map(data => {
    val avg: Double = data._2.sum / data._2.size
    (data._1, avg)
  })

The previous solutions have used the custom StructType.

With spark-sql 2.4.5 (scala version 2.12.10) it is now possible to specify the schema as a string using the schema function

import org.apache.spark.sql.SparkSession;

val sparkSession = SparkSession.builder()
            .appName("sample-app")
            .master("local[2]")
            .getOrCreate();

val pageCount = sparkSession.read
  .format("csv")
  .option("delimiter","|")
  .option("quote","")
  .schema("project string ,article string ,requests integer ,bytes_served long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

Examples related to scala

Intermediate language used in scalac? Why does calling sumr on a stream with 50 tuples not complete Select Specific Columns from Spark DataFrame Joining Spark dataframes on the key Provide schema while reading csv file as a dataframe how to filter out a null value from spark dataframe Fetching distinct values on a column using Spark DataFrame Can't push to the heroku Spark - Error "A master URL must be set in your configuration" when submitting an app Add jars to a Spark Job - spark-submit

Examples related to apache-spark

Select Specific Columns from Spark DataFrame Select columns in PySpark dataframe What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? How to find count of Null and Nan values for each column in a PySpark dataframe efficiently? Spark dataframe: collect () vs select () How does createOrReplaceTempView work in Spark? Spark difference between reduceByKey vs groupByKey vs aggregateByKey vs combineByKey Filter df when values matches part of a string in pyspark Filtering a pyspark dataframe using isin by exclusion Convert date from String to Date format in Dataframes

Examples related to dataframe

Trying to merge 2 dataframes but get ValueError How to show all of columns name on pandas dataframe? Python Pandas - Find difference between two data frames Pandas get the most frequent values of a column Display all dataframe columns in a Jupyter Python Notebook How to convert column with string type to int form in pyspark data frame? Display/Print one column from a DataFrame of Series in Pandas Binning column with python pandas Selection with .loc in python Set value to an entire column of a pandas dataframe

Examples related to apache-spark-sql

Select Specific Columns from Spark DataFrame Pyspark: Filter dataframe based on multiple conditions Select columns in PySpark dataframe What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? How to find count of Null and Nan values for each column in a PySpark dataframe efficiently? Spark dataframe: collect () vs select () How does createOrReplaceTempView work in Spark? Filter df when values matches part of a string in pyspark Convert date from String to Date format in Dataframes Take n rows from a spark dataframe and pass to toPandas()

Examples related to spark-csv

Provide schema while reading csv file as a dataframe How to show full column content in a Spark Dataframe? Write single CSV file using spark-csv