Reading a CSV File Into a Spark RDD (Scala Cookbook recipe)

This is Recipe 20.3, Reading a CSV File Into a Spark RDD, an excerpt from the Scala Cookbook, 2nd Edition.

Problem

You want to read a CSV file into an Apache Spark RDD.

Solution

To read a well-formatted CSV file into an RDD:

  1. Create a case class to model the file data

  2. Read the file using sc.textFile

  3. Create an RDD by mapping each row in the data to an instance of your case class

  4. Manipulate the data as desired

The following example demonstrates those steps, using a file named TomHanksMoviesNoHeader.csv, which has these contents:

1995, Toy Story,         8.3
2000, Cast Away,         7.8
2006, The Da Vinci Code, 6.6
2012, Cloud Atlas,       7.4
1994, Forrest Gump,      8.8

Create a case class to model the file data

First, create a case class that matches the data in the file:

case class Movie(year: Int, name: String, rating: Double)
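
Each field in the case class lines up with one column in the file: the year, the movie name, and its rating. As a quick sanity check, you can construct an instance by hand in the REPL, using the values from the file's first row:

scala> Movie(1995, "Toy Story", 8.3)
res0: Movie = Movie(1995,Toy Story,8.3)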

Read the file

Next, read the file into an initial RDD:

val fileRdd = sc.textFile("TomHanksMoviesNoHeader.csv")
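
This example assumes you're in the Spark shell, where sc, a SparkContext, is created for you. If instead you're building a standalone application, a minimal sketch for creating an equivalent context might look like this (the app name and the local master setting are arbitrary choices for this example):

import org.apache.spark.sql.SparkSession

// build a SparkSession, then grab its underlying SparkContext
val spark = SparkSession.builder
    .appName("CsvToRdd")     // hypothetical application name
    .master("local[*]")      // run locally on all available cores
    .getOrCreate()
val sc = spark.sparkContext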

Create an RDD by mapping each row to the case class

Then call map on the fileRdd to create a new RDD named movies:

val movies = fileRdd.map { row =>
    val fields = row.split(",").map(_.trim)
    Movie(fields(0).toInt, fields(1), fields(2).toDouble)
}

The first line in the block splits each row on the comma character and trims each resulting field; the second line uses those fields to create a Movie instance. When the block completes, movies has the type org.apache.spark.rdd.RDD[Movie].
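
Note that this conversion assumes every row is well formed; a row with a missing field or a bad number will throw an exception when the RDD is evaluated. If your data is less tidy, one defensive sketch is to wrap the conversion in scala.util.Try and use flatMap so malformed rows are quietly dropped (parseMovie is a hypothetical helper name):

import scala.util.Try

// returns None for rows that fail to parse, so flatMap skips them
def parseMovie(row: String): Option[Movie] = {
    val fields = row.split(",").map(_.trim)
    Try(Movie(fields(0).toInt, fields(1), fields(2).toDouble)).toOption
}

val safeMovies = fileRdd.flatMap(parseMovie)   // RDD[Movie]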

Work with the data as desired

Now you can work with the movies RDD as desired, using the transformation and action methods demonstrated in the previous recipes:

scala> :type movies
org.apache.spark.rdd.RDD[Movie]

scala> movies.first
res0: Movie = Movie(1995,Toy Story,8.3)

scala> movies.take(2)
res1: Array[Movie] = Array(Movie(1995,Toy Story,8.3), Movie(2000,Cast Away,7.8))

scala> movies.filter(_.rating > 7).filter(_.year > 2000).collect
res2: Array[Movie] = Array(Movie(2012,Cloud Atlas,7.4))
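
Because movies is a typed RDD, other operations read just as naturally. For instance, Spark's implicit DoubleRDDFunctions add numeric methods to any RDD[Double], so a sketch for averaging the ratings looks like this:

// extract the ratings, then average them with DoubleRDDFunctions.mean
val avgRating = movies.map(_.rating).mean()   // roughly 7.78 for the sample data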

Discussion

This recipe shows how to read a CSV file into an RDD. You can also work with CSV files using SQL — from the Spark SQL module — and that’s demonstrated in [spark-105-intro] and [spark-106-intro].
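
As a brief preview of that approach, Spark's DataFrame reader can load the same file directly. A minimal sketch, assuming a SparkSession is in scope as spark, and letting Spark infer column types since the file has no header row:

// read the CSV into a DataFrame; columns get default names (_c0, _c1, _c2)
val df = spark.read
    .option("header", "false")
    .option("inferSchema", "true")
    .csv("TomHanksMoviesNoHeader.csv")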

Working with a CSV file without using a case class

You can read a CSV file into an RDD without using a case class, but the process is a little more cumbersome. If you want to use this approach, start by reading the file as before:

// RDD[String]
val fileRdd = sc.textFile("TomHanksMoviesNoHeader.csv")

Then split each row on the comma character, and trim each resulting field:

// movies: RDD[Array[String]]
val movies = fileRdd.map(row => row.split(",")
                 .map(field => field.trim))

As shown in the comment, movies has the type RDD[Array[String]]. Now you can work with the data as desired, but you’ll have to treat each row as an array of strings:

scala> movies.take(2)
res0: Array[Array[String]] = Array(Array(1995, Toy Story, 8.3),
                             Array(2000, Cast Away, 7.8))

scala> movies.filter(row => row(0).toInt > 2000).collect
res1: Array[Array[String]] = Array(
    Array(2006, The Da Vinci Code, 6.6),
    Array(2012, Cloud Atlas, 7.4)
)
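
Conversely, if your file does have a header row, drop it before converting the fields. One common technique, sketched here, uses mapPartitionsWithIndex to drop the first line of the first partition, which is where the header lands when Spark reads a text file:

// skip the header line (the first row of partition 0)
val noHeader = fileRdd.mapPartitionsWithIndex { (idx, iter) =>
    if (idx == 0) iter.drop(1) else iter
}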