This is an excerpt from the Scala Cookbook, 2nd Edition. This is Recipe 20.2, Reading a File Into an Apache Spark RDD.
Problem
You want to start reading data files into a Spark RDD.
Solution
The canonical example for showing how to read a data file into an RDD is a “word count” application, so as not to disappoint, this recipe shows how to read the text of the Gettysburg Address by Abraham Lincoln and find out how many times each word in the text is used.
After starting the Spark shell, the first step in the process is to read a file named Gettysburg-Address.txt using the textFile method of the SparkContext variable sc that was introduced in the previous recipe:
scala> val fileRdd = sc.textFile("Gettysburg-Address.txt")
fileRdd: org.apache.spark.rdd.RDD[String] = Gettysburg-Address.txt MapPartitionsRDD[1] at textFile at <console>:24
This example assumes that Gettysburg-Address.txt is in the current directory.
The textFile method is used to read plain text files, and returns an object of type RDD[String]. It’s also lazy, which means that nothing happens yet. In fact, if you spell the filename incorrectly, you won’t find out until some time later when you call a non-lazy action method.
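The laziness described above can be seen with a small sketch. It assumes a local SparkSession (in the Spark shell, spark and sc already exist) and a hypothetical file name, No-Such-File.txt, that is assumed not to exist in the current directory:

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

// Assumption: a local session created here; the Spark shell provides one for you.
val spark = SparkSession.builder().master("local[*]").appName("LazyTextFile").getOrCreate()
val sc = spark.sparkContext

// Hypothetical file name that is assumed not to exist.
val missing = sc.textFile("No-Such-File.txt")   // no error here: nothing is read yet

// The problem only surfaces when an action such as count runs.
val result = Try(missing.count())
println(result.isFailure)
```

Wrapping the action in Try is only a convenience for the demonstration; without it, the exception is thrown at the call to count.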
The rest of the word-count solution is pure Scala code. You just call a series of transformation methods on the RDD to get the solution you want:
val counts = fileRdd.map(_.replaceAll("[.,]", ""))
                    .map(_.replace("—", " "))
                    .flatMap(line => line.split(" "))
                    .map(word => (word, 1))
                    .reduceByKey(_ + _)
                    .sortBy(_._2)
                    .collect
The only things unique to Spark in this solution are:
- Calling a map method on fileRdd (which has the type RDD[String])
- Calling the collect method at the end
As mentioned in the previous recipe, because all of the Spark transformation methods are lazy, you have to call an eager action method like collect to get the action started.
Note: Pasting multiline expressions
With Spark 3.1, when you have a multiline expression like this, you have to paste it into the Spark shell using its :paste command. The steps are:
1. Type :paste in the shell
2. Paste your expression into the shell
3. Type [Control][d] to end the paste operation
The shell will then interpret your expression.
Discussion
Here’s an explanation of how that code works. First, I create fileRdd as an RDD[String] with this code:
val fileRdd = sc.textFile("Gettysburg-Address.txt")
Because textFile is lazy, nothing actually happens yet.
Next, because I want to count words, I get rid of periods, commas, and em dashes in the text using these two map transformation calls:
.map(_.replaceAll("[.,]", ""))
.map(_.replace("—", " "))
Next, I use this flatMap expression to split each line into words, so the result is an RDD[String] with one word per element:
.flatMap(line => line.split(" "))
If you were to collect the result of the two map expressions and this flatMap expression, you’d see an Array[String]:
Array(Four, score, and, seven, years, ago, our, fathers ...
To get from this Array[String] to a solution that has a set of all words and the number of times they occur (a Map[String, Int]), the next step is to turn each word into a tuple:
.map(word => (word, 1))
At this point the intermediate data looks like this:
Array[(String, Int)] = Array((Four,1), (score,1), (and,1), (seven,1) ...
Next I use reduceByKey:
.reduceByKey(_ + _)
This transforms the intermediate data into this data structure:
Array[(String, Int)] = Array((nobly,1), (under,1), (this,4), (have,5) ...
As shown, this is an array of tuples, where the first value of each tuple is a word from the speech, and the second value is a count of how many times that word occurs. This is what reduceByKey does for us.
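To make the effect of reduceByKey concrete, here is a rough analog that uses only plain Scala collections, not Spark. The sample pairs are made up for illustration; the point is that values sharing a key are combined with the same `_ + _` function that the recipe passes to reduceByKey:

```scala
// Plain Scala collections only; no Spark is involved in this sketch.
// Hypothetical sample data in the same (word, 1) shape as the recipe.
val pairs = Seq(("we", 1), ("here", 1), ("we", 1), ("that", 1), ("we", 1))

// Group the pairs by key, then combine each key's values with `_ + _`.
val grouped: Map[String, Int] =
  pairs.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).reduce(_ + _) }

println(grouped("we"))   // 3
```

The difference in Spark is that reduceByKey performs this combination across partitions of a distributed dataset, rather than on one in-memory collection.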
Next I sort that data by the second tuple element, the count, in ascending order:
.sortBy(_._2)
Finally you invoke the collect method to force all of the transformations to be run:
.collect
Because the data structure has the type Array[(String, Int)], you can call counts.foreach(println) on it. The end of its output shows the most common words in the speech:
scala> counts.foreach(println)
[data omitted here]
(here,8)
(we,8)
(to,8)
(the,9)
(that,13)
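If you only want the most common words, one possible variation is to sort in descending order and use take instead of collect. This is a sketch under a few assumptions: it creates its own local SparkSession, and it uses a small in-memory RDD of made-up lines rather than the Gettysburg Address file:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session created here; the Spark shell provides one for you.
val spark = SparkSession.builder().master("local[*]").appName("TopWords").getOrCreate()
val sc = spark.sparkContext

// Hypothetical sample lines standing in for the text file.
val lines = sc.parallelize(Seq(
  "we can not dedicate",
  "we can not consecrate",
  "we can not hallow"
))

// Same pipeline as the recipe, but sorted from highest count to lowest,
// and returning only the first three results to the driver.
val top = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(3)

top.foreach(println)
```

Because take returns only the requested number of elements, it can be a gentler choice than collect when the full result is large.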
Methods to read text files into an RDD
There are two main methods to read text files into an RDD:
- sparkContext.textFile
- sparkContext.wholeTextFiles
The textFile method reads a file as a collection of lines. It can read a file from the local filesystem, or from a Hadoop or Amazon S3 filesystem using "hdfs://" and "s3a://" URLs, respectively.
The textFile method takes an optional second parameter related to partitioning. Per the RDD Programming Guide, textFile “takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.”
Here are some examples of how to use textFile:
textFile("/foo/file.txt")               // read a file, using the default
                                        // number of partitions
textFile("/foo/file.txt", 8)            // same, but with 8 partitions
textFile("/foo/bar.txt,/foo/baz.txt")   // read multiple files (one
                                        // comma-separated string)
textFile("/foo/ba*.txt")                // read multiple files
textFile("/foo/*")                      // read all files in 'foo'
textFile("/a/1.txt,/b/2.txt")           // multiple files in different
                                        // directories
textFile("hdfs://.../myfile.csv")       // use a Hadoop URL
textFile("s3a://.../myfile.csv")        // use an Amazon S3 URL
Note that the s3a URL prefix is the name of the Hadoop S3 connector, and was previously named s3 and s3n, so you may see those uses in older documentation.
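As a small sketch of reading several files and of the optional partition argument, the following example writes two hypothetical sample files to a temporary directory and reads them back. It assumes a local SparkSession, and it relies on textFile accepting multiple paths as a single comma-separated string. The second argument is treated by Spark as a suggested minimum, so no particular partition count is assumed here:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session created here; the Spark shell provides one for you.
val spark = SparkSession.builder().master("local[*]").appName("TextFileSketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical sample files in a temporary directory.
val dir = Files.createTempDirectory("text-file-sketch")
val bar = Files.write(dir.resolve("bar.txt"), "one\ntwo\n".getBytes("UTF-8"))
val baz = Files.write(dir.resolve("baz.txt"), "three\n".getBytes("UTF-8"))

// Several paths are passed as one comma-separated string.
val both = sc.textFile(s"$bar,$baz")
val lineCount = both.count()

// Ask for at least 4 partitions; the actual number is up to Spark.
val partitions = sc.textFile(bar.toString, 4).getNumPartitions

println(s"$lineCount lines, $partitions partitions")
```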
The wholeTextFiles method reads the entire file into a single String, and returns an RDD that contains a tuple, so its return type is RDD[(String, String)]. The first string in the tuple is the filename that was read, and the second string is the entire contents of the file. This example shows how that works:
scala> val rdd = sc.wholeTextFiles("Gettysburg-Address.txt")
rdd: org.apache.spark.rdd.RDD[(String, String)] = Gettysburg-Address.txt MapPartitionsRDD[5] at wholeTextFiles at <console>:24

scala> rdd.foreach(t => println(s"${t._1} | ${t._2.take(15)}"))
file:/Users/al/Projects/Books/ScalaCookbook2Examples/16_Ecosystem/Spark/SparkApp1/Gettysburg-Address.txt | Four score and
The output from the second expression shows that the tuple contains the filename and file content.
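Because each element pairs a filename with that file’s full contents, wholeTextFiles lends itself to per-file calculations. The following sketch counts the words in each file of a directory. The temporary directory, file names, and contents are made up for illustration, and a local SparkSession is assumed:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session created here; the Spark shell provides one for you.
val spark = SparkSession.builder().master("local[*]").appName("WholeTextFilesSketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical sample files in a temporary directory.
val dir = Files.createTempDirectory("whole-text-files")
Files.write(dir.resolve("a.txt"), "Four score and seven years ago".getBytes("UTF-8"))
Files.write(dir.resolve("b.txt"), "our fathers brought forth".getBytes("UTF-8"))

// Each element is (file URI, entire file contents); mapValues keeps the
// filename and replaces the contents with a word count.
val wordsPerFile = sc.wholeTextFiles(dir.toString)
  .mapValues(_.split("\\s+").length)
  .collect()

wordsPerFile.foreach { case (file, n) => println(s"$file | $n words") }
```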
Spark also contains other methods for reading files into a DataFrame or Dataset:
- spark.read.text() is used to read a text file into a DataFrame
- spark.read.textFile() is used to read a text file into a Dataset[String]
- spark.read.csv() and spark.read.format("csv").load("<path>") are used to read a CSV file into a DataFrame
These methods are demonstrated in the following recipes.
Saving an RDD to disk
When you obtain your final result using RDD transformation and action methods, you may want to save your results. You can save an RDD to disk using its saveAsTextFile method. This command saves an RDD to a directory named MyRddOutput under the /tmp directory:
myRdd.saveAsTextFile("/tmp/MyRddOutput")
After you do this you’ll find a series of files under the /tmp/MyRddOutput directory that represent the RDD named myRdd. Note that if the directory already exists, this operation will fail with an exception.
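Here is a brief sketch of that behavior. It assumes a local SparkSession and writes to a fresh temporary location rather than /tmp/MyRddOutput, so the first save succeeds, the second attempt to the same directory fails, and the saved data can be read back with textFile:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import scala.util.Try

// Assumption: a local session created here; the Spark shell provides one for you.
val spark = SparkSession.builder().master("local[*]").appName("SaveRddSketch").getOrCreate()
val sc = spark.sparkContext

// A child of a new temporary directory, so the output path does not exist yet.
val out = Files.createTempDirectory("rdd-out").resolve("MyRddOutput").toString

// Hypothetical sample RDD with two partitions.
val myRdd = sc.parallelize(Seq("a", "b", "c"), 2)

// The first save succeeds and creates the directory with one part file per partition.
myRdd.saveAsTextFile(out)

// Saving to the same directory again fails, because it now exists.
val secondAttempt = Try(myRdd.saveAsTextFile(out))
println(secondAttempt.isFailure)

// Pointing textFile at the directory reads all of its part files.
val readBack = sc.textFile(out).collect().sorted
```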
Reading more complicated text file formats
On a macOS system, the /etc/passwd file contains seven fields that are delimited by the : character, but it also contains initial lines of comments, with each comment line beginning with the # character. To read this file into an RDD you need to skip those initial comment lines.
One way to do this is to read the file into an RDD as usual:
val fileRdd = spark.sparkContext.textFile("/etc/passwd")
Next, create a case class to model the seven-column format:
case class PasswordRecord (
    username: String,
    password: String,
    userId: Int,
    groupId: Int,
    comment: String,
    homeDirectory: String,
    shell: String
)
Now you can convert fileRdd into a new RDD by first filtering out all records that start with the # character, and then converting the remaining seven-column fields into a PasswordRecord:
val rdd = fileRdd
    .filter(! _.startsWith("#"))
    .map { line =>
        val row = line.split(":")
        PasswordRecord(
            row(0), row(1), row(2).toInt, row(3).toInt, row(4), row(5), row(6)
        )
    }
After doing this, you’ll see that rdd only contains the colon-separated rows from the file:
scala> rdd.take(3)
res1: Array[PasswordRecord] = Array(
  PasswordRecord(nobody,*,-2,-2,Unprivileged User,/var/empty,/usr/bin/false),
  PasswordRecord(root,*,0,0,System Administrator,/var/root,/bin/sh),
  PasswordRecord(daemon,*,1,1,System Services,/var/root,/usr/bin/false)
)
Now you can work with this RDD as shown previously.
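The parsing code above assumes every non-comment line has seven well-formed fields. If that might not hold for your data, one possible approach is a small parsing function that returns an Option. This is a sketch in plain Scala (no Spark needed to try it); parseLine is a hypothetical helper name, not part of Spark:

```scala
import scala.util.Try

case class PasswordRecord(
    username: String,
    password: String,
    userId: Int,
    groupId: Int,
    comment: String,
    homeDirectory: String,
    shell: String
)

// Hypothetical helper: returns None for lines that don't have exactly
// seven fields or whose id fields aren't integers.
// split(":", -1) keeps trailing empty fields, which split(":") would drop.
def parseLine(line: String): Option[PasswordRecord] =
  line.split(":", -1) match {
    case Array(user, pw, uid, gid, comment, home, shell) =>
      Try(PasswordRecord(user, pw, uid.toInt, gid.toInt, comment, home, shell)).toOption
    case _ => None
  }

println(parseLine("root:*:0:0:System Administrator:/var/root:/bin/sh"))
println(parseLine("not:enough:fields"))
```

With a function like this, the map step could become something like flatMap(line => parseLine(line).toList), so malformed lines are skipped rather than causing the job to fail.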
See Also
- The SparkByExamples.com page on reading text files is an excellent resource