This is an excerpt from the Scala Cookbook, 2nd Edition: Recipe 20.3, Reading a CSV File Into a Spark RDD.
Problem
You want to read a CSV file into an Apache Spark RDD.
Solution
To read a well-formatted CSV file into an RDD:

- Create a case class to model the file data
- Read the file using sc.textFile
- Create an RDD by mapping each row in the data to an instance of your case class
- Manipulate the data as desired
The following example demonstrates those steps, using a file named TomHanksMoviesNoHeader.csv, which has these contents:
1995, Toy Story, 8.3
2000, Cast Away, 7.8
2006, The Da Vinci Code, 6.6
2012, Cloud Atlas, 7.4
1994, Forrest Gump, 8.8
Create a case class to model the file data
First, create a case class that matches the data in the file:
case class Movie(year: Int, name: String, rating: Double)
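The case class gives each column a name and a type. As a quick illustration (the value m here is hypothetical, just to show the typed access you get):

val m = Movie(1995, "Toy Story", 8.3)
m.year   // Int = 1995
m.rating // Double = 8.3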
Read the file
Next, read the file into an initial RDD:
val fileRdd = sc.textFile("TomHanksMoviesNoHeader.csv")
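The sc reference in that expression is the SparkContext that the Spark shell creates for you. If you're running this code in a standalone application instead, you have to create the context yourself; a minimal sketch, where the application name and master URL are placeholders to adjust for your environment:

import org.apache.spark.{SparkConf, SparkContext}

// placeholder configuration: run locally, using all available cores
val conf = new SparkConf().setAppName("CsvToRdd").setMaster("local[*]")
val sc = new SparkContext(conf)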
Create an RDD by mapping each row to the case class
Then call map on the fileRdd to create a new RDD named movies:
val movies = fileRdd.map { row =>
    val fields = row.split(",").map(_.trim)
    Movie(fields(0).toInt, fields(1), fields(2).toDouble)
}
The first line in the block splits each row on the comma character and then trims each resulting field. When the block completes, movies has the type org.apache.spark.rdd.RDD[Movie].
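Note that this code assumes every row parses cleanly; a malformed row will throw an exception when Spark evaluates the RDD. If that's a concern for your data, one option is to wrap the conversion in scala.util.Try and use flatMap so that unparseable rows are silently dropped. A minimal sketch (safeMovies is just an illustrative name):

import scala.util.Try

// Try(...).toOption yields Some(movie) for good rows and None for bad ones,
// and flatMap keeps only the successfully parsed movies
val safeMovies = fileRdd.flatMap { row =>
    val fields = row.split(",").map(_.trim)
    Try(Movie(fields(0).toInt, fields(1), fields(2).toDouble)).toOption
}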
Work with the data as desired
Now you can work with the movies RDD as desired, using the transformation and action methods demonstrated in the previous recipes:
scala> :type movies
org.apache.spark.rdd.RDD[Movie]

scala> movies.first
res0: Movie = Movie(1995,Toy Story,8.3)

scala> movies.take(2)
res1: Array[Movie] = Array(Movie(1995,Toy Story,8.3), Movie(2000,Cast Away,7.8))

scala> movies.filter(_.rating > 7).filter(_.year > 2000).collect
res2: Array[Movie] = Array(Movie(2012,Cloud Atlas,7.4))
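Because mapping to a numeric field yields an RDD[Double], which picks up Spark's implicit DoubleRDDFunctions, you can also call numeric actions like mean directly. A quick sketch (avgRating is just an illustrative name):

// average the rating column across all movies
val avgRating = movies.map(_.rating).mean()
// avgRating: Double = 7.78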
Discussion
This recipe shows how to read a CSV file into an RDD. You can also work with CSV files using SQL — from the Spark SQL module — and that’s demonstrated in [spark-105-intro] and [spark-106-intro].
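As a quick taste of that approach, here's a minimal sketch using the DataFrame reader. It assumes a SparkSession named spark is in scope (as it is in the Spark shell), and the column names year, name, and rating are supplied manually because the file has no header row:

// read the same file with the Spark SQL module; ignoreLeadingWhiteSpace
// handles the spaces after each comma in the file
val df = spark.read
    .option("inferSchema", "true")
    .option("ignoreLeadingWhiteSpace", "true")
    .csv("TomHanksMoviesNoHeader.csv")
    .toDF("year", "name", "rating")

// the usual DataFrame transformations are then available
df.where("rating > 7").show()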
Working with a CSV file without using a case class
You can read a CSV file into an RDD without using a case class, but the process is a little more cumbersome. If you want to use this approach, start by reading the file as before:
// RDD[String]
val fileRdd = sc.textFile("TomHanksMoviesNoHeader.csv")
Then split each row on the comma character, and trim each resulting field:
// movies: RDD[Array[String]]
val movies = fileRdd.map(row =>
    row.split(",").map(field => field.trim))
As shown in the comment, movies has the type RDD[Array[String]]. Now you can work with the data as desired, but you'll have to treat each row as an array of strings:
scala> movies.take(2)
Array[Array[String]] = Array(Array(1995, Toy Story, 8.3), Array(2000, Cast Away, 7.8))

scala> movies.filter(row => row(0).toInt > 2000).collect
res0: Array[Array[String]] = Array(
    Array(2006, The Da Vinci Code, 6.6),
    Array(2012, Cloud Atlas, 7.4)
)
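If you'd rather not work with positional indexes like row(0), one lightweight alternative, short of defining a case class, is to pattern match each row into a typed tuple. A sketch, assuming every row has exactly three fields (movieTuples is an illustrative name):

// pattern match each well-formed row into a (year, name, rating) tuple;
// a row without exactly three fields would throw a MatchError
val movieTuples = movies.map {
    case Array(year, name, rating) => (year.toInt, name, rating.toDouble)
}

// movieTuples: RDD[(Int, String, Double)]
movieTuples.filter(_._1 > 2000).collect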