[apache-spark] How to read multiple text files into a single RDD?

I want to read a bunch of text files from a hdfs location and perform mapping on it in an iteration using spark.

JavaRDD<String> records = ctx.textFile(args[1], 1); is capable of reading only one file at a time.

I want to read more than one file and process them as a single RDD. How?

There is a straight forward clean solution available. Use the wholeTextFiles() method. This will take a directory and forms a key value pair. The returned RDD will be a pair RDD. Find below the description from Spark docs:

SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file

In PySpark, I have found an additional useful way to parse files. Perhaps there is an equivalent in Scala, but I am not comfortable enough coming up with a working translation. It is, in effect, a textFile call with the addition of labels (in the below example the key = filename, value = 1 line from file).

"Labeled" textFile


import glob
from pyspark import SparkContext
sc = SparkContext("local","example") # if running locally
sqlContext = SQLContext(sc)

for filename in glob.glob(Data_File + "/*"):
    Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)

output: array with each entry containing a tuple using filename-as-key and with value = each line of file. (Technically, using this method you can also use a different key besides the actual filepath name- perhaps a hashing representation to save on memory). ie.

[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'),
 ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),

You can also recombine either as a list of lines:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
 ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]

Or recombine entire files back to single strings (in this example the result is the same as what you get from wholeTextFiles, but with the string "file:" stripped from the filepathing.):

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()

You can use this

First You can get a Buffer/List of S3 Paths :

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket

    //Your Folder path or Prefix

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name

    //Creating a Scala List for same

Now Pass this List object to the following piece of code, note : sc is an object of SQLContext

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf

Now you got a final Unified RDD i.e. df

Optional, And You can also repartition it in a single BigRDD

val files = sc.textFile(filename, 1).repartition(1)

Repartitioning always works :D

Use union as follows:

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)

Then the bigRdd is the RDD with all files.

you can use

JavaRDD<String , String> records = sc.wholeTextFiles("path of your directory")

here you will get the path of your file and content of that file. so you can perform any action of a whole file at a time that saves the overhead

You can use a single textFile call to read multiple files. Scala:


rdd = textFile('/data/{1.txt,2.txt}')

All answers are correct with sc.textFile

I was just wondering why not wholeTextFiles For example, in this case...

val minPartitions = 2
val path = "/pathtohdfs"
      .flatMap{case (path, text) 

one limitation is that, we have to load small files otherwise performance will be bad and may lead to OOM.

Note :

  • The wholefile should fit in to memory
  • Good for file formats that are NOT splittable by line... such as XML files

