Reading a File Into a Spark RDD (Scala Cookbook recipe)

This is an excerpt from the Scala Cookbook, 2nd Edition. This is Recipe 20.2, Reading a File Into an Apache Spark RDD.

Problem

You want to start reading data files into a Spark RDD.

Solution

The canonical example for showing how to read a data file into an RDD is a “word count” application, so not to disappoint, this recipe shows how to read the text of the Gettysburg Address by Abraham Lincoln, and find out how many times each word in the text is used.

After starting the Spark shell, the first step in the process is to read a file named Gettysburg-Address.txt using the textFile method of the SparkContext variable sc that was introduced in the previous recipe:

scala> val fileRdd = sc.textFile("Gettysburg-Address.txt")
fileRdd: org.apache.spark.rdd.RDD[String] = Gettysburg-Address.txt
         MapPartitionsRDD[1] at textFile at <console>:24

This example assumes that Gettysburg-Address.txt is in the current directory.

The textFile method is used to read plain text files, and returns an object of type RDD[String]. It’s also lazy, which means that nothing happens yet. In fact, if you spell the filename incorrectly, you won’t find out until some time later when you call a non-lazy action method.

The rest of the word-count solution is pure Scala code. You just call a series of transformation methods on the RDD to get the solution you want:

val counts = fileRdd.map(_.replaceAll("[.,]", ""))
                    .map(_.replace("—", " "))
                    .flatMap(line => line.split(" "))
                    .map(word => (word, 1))
                    .reduceByKey(_ + _)
                    .sortBy(_._2)
                    .collect

The only things unique to Spark in this solution are:

  • Calling a map method on fileRdd (which has the type RDD[String])

  • Calling the collect method at the end

As mentioned in the previous recipe, because all of the Spark transformation methods are lazy, you have to call an eager action method like collect to get the action started.

Note Pasting multiline expressions == With Spark 3.1, when you have a multiline expression like this, you have to paste it into the Spark shell using its :paste command. The steps are:

Type [Control][d] to end the past operation

The shell will then interpret your expression. ==

Discussion

Here’s an explanation of how that code works. First, I create fileRdd as an RDD[String] with this code:

val fileRdd = sc.textFile("Gettysburg-Address.txt")

Because textFile is lazy, nothing actually happens yet.

Next, because I want to count words, I get rid of decimals, commas, and hyphens in the text using these two map transformation calls:

.map(_.replaceAll("[.,]", ""))
.map(_.replace("—", " "))

Next, I use this flatMap expression to convert the text into an array of words:

.flatMap(line => line.split(" "))

If you look at the result of the two map expressions and this flatMap expression, you’d see an Array[String]:

Array(Four, score, and, seven, years, ago, our, fathers ...

To get from this Array[String] to a solution that has a set of all words and the number of times they occur — a Map[String, Int] — the next step is to turn each word into a tuple:

.map(word => (word, 1))

At this point the intermediate data looks like this:

Array[(String, Int)] = Array((Four,1), (score,1), (and,1), (seven,1) ...

Next I use reduceByKey:

.reduceByKey(_ + _)

This transforms the intermediate data into this data structure:

Array[(String, Int)] = Array((nobly,1), (under,1), (this,4), (have,5) ...

As shown, this is an array of tuples, where the first value of each tuple is a word from the speech, and the second value is a count of how many times that word occurs. This is what reduceByKey does for us.

Finally I sort that data by the second tuple element:

.sortBy(_._2)

Finally you invoke the collect method to force all of the transformations to be run:

.collect

Because the data structure has the type Array[(String, Int)], you can call counts.foreach(println) on it. The end of its output shows the most common words in the speech:

scala> counts.foreach(println)
[data omitted here]
(here,8)
(we,8)
(to,8)
(the,9)
(that,13)

Methods to read text files into an RDD

There are two main methods to read text files into an RDD:

  • sparkContext.textFile

  • sparkContext.wholeTextFiles

The textFile method reads a file as a collection of lines. It can read a file from the local filesystem, or from a Hadoop or Amazon S3 filesystem using "hdfs://" and "s3a://" URLs, respectively.

The textFile method takes an optional second parameter related to partitioning. Per the RDD Programming Guide, textFile “takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.”

Here are some examples of how to use textFile:

textFile("/foo/file.txt")                  // read a file, using the default
                                           // number of partitions
textFile("/foo/file.txt", 8)               // same, but with 8 partitions
textFile("/foo/bar.txt", "/foo/baz.txt")   // read multiple files
textFile("/foo/ba*.txt")                   // read multiple files
textFile("/foo/*")                         // read all files in 'foo'
textFile("/a/1.txt", "/b/2.txt")           // multiple files in different
                                           // directories
textFile("hdfs://.../myfile.csv")          // use a Hadoop URL
textFile("s3a://.../myfile.csv")           // use an Amazon S3 URL

Note that the s3a URL prefix is the name of the Hadoop S3 connector, and was previously named s3 and s3n, so you may see those uses in older documentation.

The wholeTextFiles method reads the entire file into a single String, and returns an RDD that contains a tuple, so its return type is RDD[(String, String)]. The first string in the tuple is the filename that was read, and the second string is the entire contents of the file. This example shows how that works:

scala> val rdd = sc.wholeTextFiles("Gettysburg-Address.txt")
rdd: org.apache.spark.rdd.RDD[(String, String)] = Gettysburg-Address.txt
     MapPartitionsRDD[5] at wholeTextFiles at <console>:24

scala> rdd.foreach(t => println(s"${t._1} | ${t._2.take(15)}"))
file:/Users/al/Projects/Books/ScalaCookbook2Examples/16_Ecosystem/Spark/ ↵
     SparkApp1/Gettysburg-Address.txt | Four score and

The output from the second expression shows that the tuple contains the filename and file content.

Spark also contains other methods for reading files into a DataFrame or Dataset:

  • spark.read.text() is used to read a text file into DataFrame

  • spark.read.textFile() is used to read a text file into a Dataset[String]

  • spark.read.csv() and spark.read.format("csv").load("<path>") are used to read a CSV file into a DataFrame

These methods are demonstrated in the following recipes.

Saving an RDD to disk

When you obtain your final result using RDD transformation and action methods, you may want to save your results. You can save an RDD to disk using its saveAsTextFile method. This command saves an RDD to a directory named MyRddOutput under the /tmp directory:

myRdd.saveAsTextFile("/tmp/MyRddOutput")

After you do this you’ll find a series of files under the /tmp/MyRddOutput that represent the RDD named myRdd. Note that if the directory already exists, this operation will fail with an exception.

Reading more complicated text file formats

On a MacOS system, the /etc/passwd file contains seven fields that are delimited by the : character, but it also contains initial lines of comments, with each comment line beginning with the # character. To read this file into an RDD you need to skip those initial comment lines.

One way to do this is to read the file into an RDD as usual:

val fileRdd = spark.sparkContext.textFile("/etc/passwd")

Next, create a case class to model the seven-column format:

case class PasswordRecord (
    username: String,
    password: String,
    userId: Int,
    groupId: Int,
    comment: String,
    homeDirectory: String,
    shell: String
)

Now you can convert fileRdd into a new RDD by first filtering out all records that start with the # character, and then converting the remaining seven-column fields into a PasswordRecord:

val rdd = fileRdd
    .filter(! _.startsWith("#"))
    .map { line =>
        val row = line.split(":")
        PasswordRecord(
            row(0), row(1), row(2).toInt, row(3).toInt, row(4), row(5), row(6)
        )
    }

After doing this, you’ll see that rdd only contains the colon-separated rows from the file:

scala> rdd.take(3)
res1: Array[PasswordRecord] = Array(
PasswordRecord(nobody,*,-2,-2,Unprivileged User,/var/empty,/usr/bin/false),
PasswordRecord(root,*,0,0,System Administrator,/var/root,/bin/sh),
PasswordRecord(daemon,*,1,1,System Services,/var/root,/usr/bin/false)
)

Now you can work with this RDD as shown previously.

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

See Also