[apache-spark] How to read multiple text files into a single RDD?

I want to read a bunch of text files from a hdfs location and perform mapping on it in an iteration using spark.

JavaRDD<String> records = ctx.textFile(args[1], 1); is capable of reading only one file at a time.

I want to read more than one file and process them as a single RDD. How?

This question is related to apache-spark

The answer is


There is a straight forward clean solution available. Use the wholeTextFiles() method. This will take a directory and forms a key value pair. The returned RDD will be a pair RDD. Find below the description from Spark docs:

SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file


In PySpark, I have found an additional useful way to parse files. Perhaps there is an equivalent in Scala, but I am not comfortable enough coming up with a working translation. It is, in effect, a textFile call with the addition of labels (in the below example the key = filename, value = 1 line from file).

"Labeled" textFile

input:

import glob
from pyspark import SparkContext
SparkContext.stop(sc)
sc = SparkContext("local","example") # if running locally
sqlContext = SQLContext(sc)

for filename in glob.glob(Data_File + "/*"):
    Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)

output: array with each entry containing a tuple using filename-as-key and with value = each line of file. (Technically, using this method you can also use a different key besides the actual filepath name- perhaps a hashing representation to save on memory). ie.

[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'),
 ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),
  ...]

You can also recombine either as a list of lines:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
 ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]

Or recombine entire files back to single strings (in this example the result is the same as what you get from wholeTextFiles, but with the string "file:" stripped from the filepathing.):

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()


You can use this

First You can get a Buffer/List of S3 Paths :

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

Now Pass this List object to the following piece of code, note : sc is an object of SQLContext

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

Now you got a final Unified RDD i.e. df

Optional, And You can also repartition it in a single BigRDD

val files = sc.textFile(filename, 1).repartition(1)

Repartitioning always works :D


Use union as follows:

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)

Then the bigRdd is the RDD with all files.


you can use

JavaRDD<String , String> records = sc.wholeTextFiles("path of your directory")

here you will get the path of your file and content of that file. so you can perform any action of a whole file at a time that saves the overhead


You can use a single textFile call to read multiple files. Scala:

sc.textFile(','.join(files)) 

rdd = textFile('/data/{1.txt,2.txt}')

TRY THIS Interface used to write a DataFrame to external storage systems (e.g. file systems, key-value stores, etc). Use DataFrame.write() to access this.

New in version 1.4.

csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None) Saves the content of the DataFrame in CSV format at the specified path.

Parameters: path – the path in any Hadoop supported file system mode – specifies the behavior of the save operation when data already exists.

append: Append contents of this DataFrame to existing data. overwrite: Overwrite existing data. ignore: Silently ignore this operation if data already exists. error (default case): Throw an exception if data already exists. compression – compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate). sep – sets the single character as a separator for each field and value. If None is set, it uses the default value, ,. quote – sets the single character used for escaping quoted values where the separator can be part of the value. If None is set, it uses the default value, ". If you would like to turn off quotations, you need to set an empty string. escape – sets the single character used for escaping quotes inside an already quoted value. If None is set, it uses the default value, \ escapeQuotes – A flag indicating whether values containing quotes should always be enclosed in quotes. If None is set, it uses the default value true, escaping all values containing a quote character. quoteAll – A flag indicating whether all values should always be enclosed in quotes. If None is set, it uses the default value false, only escaping values containing a quote character. header – writes the names of columns as the first line. If None is set, it uses the default value, false. nullValue – sets the string representation of a null value. If None is set, it uses the default value, empty string. dateFormat – sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type. If None is set, it uses the default value value, yyyy-MM-dd. timestampFormat – sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type. If None is set, it uses the default value value, yyyy-MM-dd'T'HH:mm:ss.SSSZZ.


All answers are correct with sc.textFile

I was just wondering why not wholeTextFiles For example, in this case...

val minPartitions = 2
val path = "/pathtohdfs"
    sc.wholeTextFiles(path,minPartitions)
      .flatMap{case (path, text) 
    ...

one limitation is that, we have to load small files otherwise performance will be bad and may lead to OOM.

Note :

  • The wholefile should fit in to memory
  • Good for file formats that are NOT splittable by line... such as XML files

Further reference to visit