[apache-spark] How to import multiple CSV files in a single load?

Reader's Digest: (Spark 2.x)

For example, if you have 3 directories holding CSV files:

dir1, dir2, dir3

You then define paths as a comma-delimited string of paths, as follows:

paths = "dir1/*,dir2/*,dir3/*"

Then use the following function, passing it this paths variable:

def get_df_from_csv_paths(paths):
    # `spark` is the active SparkSession and `custom_schema` a predefined
    # StructType; both are assumed to exist in the enclosing scope.
    df = spark.read.format("csv") \
        .option("header", "false") \
        .schema(custom_schema) \
        .option("delimiter", "\t") \
        .option("mode", "DROPMALFORMED") \
        .load(paths.split(','))
    return df

By then running:

df = get_df_from_csv_paths(paths)

df will then be a single Spark DataFrame containing the data from all the CSVs found in these 3 directories.
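Note that the function references spark and custom_schema without defining them. Here is a minimal, self-contained sketch; the two-column schema is a placeholder assumption, since the answer does not specify the fields:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("multi_csv_load").getOrCreate()

# Placeholder schema -- replace the fields with your actual columns.
custom_schema = StructType([
    StructField("col_a", StringType(), True),
    StructField("col_b", StringType(), True),
])

paths = "dir1/*,dir2/*,dir3/*"
df = get_df_from_csv_paths(paths)
df.show(5)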

===========================================================================

Full Version:

If you want to ingest multiple CSVs from multiple directories, you simply need to pass a comma-delimited string of paths that use wildcards.

For example, if your data_path looks like this:

's3://bucket_name/subbucket_name/2016-09-*/184/*,
s3://bucket_name/subbucket_name/2016-10-*/184/*,
s3://bucket_name/subbucket_name/2016-11-*/184/*,
s3://bucket_name/subbucket_name/2016-12-*/184/*, ... '

you can use the above function to ingest all the CSVs in all these directories and subdirectories at once.

This would ingest all directories under s3://bucket_name/subbucket_name/ that match the wildcard patterns specified. For example, the first pattern looks in bucket_name/subbucket_name/ for every directory whose name starts with 2016-09-, takes only the subdirectory named 184 within each match, and collects all CSV files found there. The same expansion is performed for each pattern in the comma-delimited list.
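As an illustration, here is one hypothetical way to build such a comma-delimited path string programmatically (the bucket and prefix names are the placeholders from the example above):

# Build the comma-delimited path string for a range of months, then load it.
months = ["2016-09", "2016-10", "2016-11", "2016-12"]
data_path = ",".join(
    "s3://bucket_name/subbucket_name/{}-*/184/*".format(m) for m in months
)
df = get_df_from_csv_paths(data_path)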

This works far better than reading each path separately and unioning the resulting DataFrames.
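For comparison, a sketch of the union-based alternative (same assumptions as above); every path needs its own read, with the options repeated each time:

from functools import reduce
from pyspark.sql import DataFrame

# Union alternative: one read per directory, then pairwise unions.
dfs = [
    spark.read.format("csv")
        .option("header", "false")
        .schema(custom_schema)
        .option("delimiter", "\t")
        .option("mode", "DROPMALFORMED")
        .load(p)
    for p in ["dir1/*", "dir2/*", "dir3/*"]
]
df = reduce(DataFrame.union, dfs)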
