This is an excerpt from the Scala Cookbook, 2nd Edition. This is Recipe 20.1, Getting Started with Apache Spark.
Problem
You’ve never used Spark before, and want to get started with it.
Solution
When you use Spark professionally, your data will probably be spread across multiple computers — maybe thousands of computers — but to get started with Spark you can do all of your work on one computer system in “local mode.” The easiest way to do this is to start the Spark shell, create an array, and go from there.
The Spark installation instructions may vary over time, but currently you just download Spark from its download page. The download is a tgz file, so unpack it and move it to your bin directory, just as you would manually install other downloaded applications. For example, I install Spark under my /Users/al/bin directory.
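For example, on a Unix-like system the steps look something like this; the exact file and directory names depend on the version you download, so treat this as a sketch:

$ tar xzf spark-3.5.0-bin-hadoop3.tgz          # the file name varies with the version
$ mv spark-3.5.0-bin-hadoop3 /Users/al/bin/spark
$ export PATH=$PATH:/Users/al/bin/spark/bin    # add this line to your shell startup file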
Once you have Spark installed, start the Scala Spark shell like this:
$ spark-shell
The Spark shell is a modified version of the normal Scala shell you get with the scala command, so anything you can do in the Scala shell you can also do in the Spark shell, such as creating an array:
val nums = Array.range(0, 100)
Once you have something like an array or map, you can create a Spark Resilient Distributed Dataset (RDD) by calling the Spark Context’s parallelize method:
scala> val rdd = spark.sparkContext.parallelize(nums)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
Notice from the output that rdd has the type RDD[Int]. As you’ll see in the Discussion, an RDD is one of Spark’s basic building blocks. For now you can think of it as a collection class like a list or array, but one whose data can be spread across all the computers in a cluster. It also has additional methods that can be called. Here are some examples of methods that look familiar from the Scala collections classes:
rdd.count // Long = 100
rdd.first // Int = 0
rdd.min // Int = 0
rdd.max // Int = 99
rdd.take(3) // Array[Int] = Array(0, 1, 2)
rdd.take(2).foreach(println) // prints 0 and 1 on separate lines
Here are a few RDD methods that may not look familiar:
// “sample” methods return random values from the RDD
rdd.sample(false, 0.05).collect // Array[Int] = Array(0, 16, 22, 27, 60, 73)
rdd.takeSample(false, 5) // Array[Int] = Array(35, 65, 31, 27, 1)
rdd.mean // Double = 49.5
rdd.stdev // Double = 28.866070047722115
rdd.getNumPartitions // Int = 8
rdd.stats // StatCounter = (count: 100, mean: 49.500000,
// stdev: 28.866070, max: 99.000000,
// min: 0.000000)
You can also use familiar collection methods like map and filter:
scala> rdd.map(_ + 5).filter(_ < 8)
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at filter at <console>:26

scala> rdd.filter(_ > 10).filter(_ < 20)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at filter at <console>:26
However, notice that these methods don’t return a result, at least not the result you were expecting. In Spark, transformation methods like these are evaluated lazily, so we refer to them as lazy or non-strict. To get a result from them you have to add an action method. An RDD has a collect method, which is an action method that forces all previous transformation methods to be run and then brings the result back to the computer your code is being run on. In these examples, adding collect causes a result to be calculated:
scala> rdd.map(_ + 5).filter(_ < 8).collect
res0: Array[Int] = Array(5, 6, 7)

scala> rdd.filter(_ > 10).filter(_ < 20).collect
res1: Array[Int] = Array(11, 12, 13, 14, 15, 16, 17, 18, 19)
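collect isn’t the only action method. As a quick sketch, actions like reduce and count also force the pending transformations to run; the results shown assume the rdd created earlier:

rdd.map(_ * 2).reduce(_ + _)   // Int = 9900, computed when reduce is called
rdd.filter(_ % 2 == 0).count   // Long = 50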
Discussion
In production situations Spark will work with data that’s spread across clusters of computers, but as shown, in Spark’s local mode all of the processing is done on your local computer.
The ‘spark’ object and spark context (‘sc’)
In the Solution I created an RDD in the Spark shell with these two lines of code:
val nums = Array.range(0, 100)
val rdd = spark.sparkContext.parallelize(nums)
As you might guess, spark is a pre-built object that’s available in the shell. You can see the type of spark and spark.sparkContext using the shell’s :type command:
scala> :type spark
org.apache.spark.sql.SparkSession

scala> :type spark.sparkContext
org.apache.spark.SparkContext
You’ll use these two objects quite a bit in your Spark programming. Because you use the SparkContext so often, there’s a shortcut available for it in the shell named sc, so instead of typing this:
val rdd = spark.sparkContext.parallelize(nums)
you can just type this:
val rdd = sc.parallelize(nums)
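Outside of the shell, such as in a small standalone application, these objects aren’t pre-built for you. Here’s a minimal sketch of creating them yourself, assuming the Spark SQL library is on your classpath and using an application name I made up:

import org.apache.spark.sql.SparkSession

// build a SparkSession that runs in local mode, using all cores
val spark = SparkSession.builder()
  .appName("GettingStarted")
  .master("local[*]")
  .getOrCreate()

// the SparkContext, the same object the shell calls sc
val sc = spark.sparkContext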
RDD
While there are higher-level ways to work with data in Spark (which you’ll see in the following recipes), the RDD is a fundamental data structure in Spark. The Spark RDD Programming Guide describes an RDD as “a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.” The Spark creators recommend thinking of an RDD as a large, distributed spreadsheet.
Technically an RDD is an immutable, fault-tolerant, parallel data structure. The book, Beginning Apache Spark 2, by Hien Luu (Apress), states that an RDD is represented by five pieces of information:
- A set of partitions, which are the chunks that make up the dataset
- A set of dependencies on parent RDDs
- A function for computing all the rows in the dataset
- Metadata about the partitioning scheme (optional)
- Where the data lives on the cluster (optional); if the data lives on HDFS, this is where the block locations are
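You can peek at some of this information through the RDD API. Here’s a small sketch using the rdd created in the Solution; the exact output depends on your setup:

rdd.partitions.length   // the number of partitions, e.g. 8
rdd.dependencies        // dependencies on parent RDDs; empty for a parallelized collection
rdd.partitioner         // Option[Partitioner] = None for this RDD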
When we say that an RDD is a parallel data structure, this means that a single large file can be split across many computers. In typical use cases, a dataset is so large that it can’t fit onto a single node, so it ends up being partitioned across multiple nodes in a cluster. Some basic things to know about partitions are:
- The partition is the main unit of parallelism in Spark
- Every node in a Spark cluster contains one or more partitions
- The number of partitions is configurable
- Spark provides a default value, but you can tune it
- Partitions do not span multiple nodes
- Spark runs one task for each partition of the cluster
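The number of partitions can also be changed after an RDD is created. This isn’t covered in the Solution, but as a brief sketch, the repartition and coalesce methods handle this:

val rdd4 = rdd.repartition(4)   // shuffles the data into four partitions
rdd4.getNumPartitions           // Int = 4

val rdd2 = rdd4.coalesce(2)     // reduces the partition count, avoiding a full shuffle
rdd2.getNumPartitions           // Int = 2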
When you use an RDD, each row in the data is typically represented as a Java/Scala object. The structure of this object is unknown to Spark, so Spark can’t help you with your processing, other than by providing methods like filter and map that know how to work with an RDD that’s broken into chunks and spread across many computers.
Because Spark can’t help with your processing at this level, it also provides the higher-level DataFrame and Dataset types, which you’ll see in later recipes. When you use those data structures you’ll be able to use SQL queries, and Spark has an optimization engine that can help with the execution of your queries.
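As a tiny preview of those recipes, here’s a sketch of the same data as a DataFrame queried with SQL; the column name n and the view name nums are just names chosen for this example:

import spark.implicits._                         // already in scope in the Spark shell
val df = nums.toSeq.toDF("n")                    // a one-column DataFrame
df.createOrReplaceTempView("nums")
spark.sql("SELECT n FROM nums WHERE n < 5").show()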
Three ways to create an RDD
There are three ways to create an RDD:
- Call parallelize on a collection
- Read the data from one or more files into an RDD (as you’ll see in the next recipe)
- Call a transformation method on an existing RDD to create a new RDD (a short example follows the parallelize discussion below)
The parallelize method is shown in the Solution. This method is generally only used for testing, and per the Parallelized Collections section of the RDD Programming Guide, it copies the contents of a collection “to create a distributed dataset that can be operated on in parallel.” The parallelize method also takes an optional parameter that lets you specify the number of partitions the dataset can be broken into:
val rdd = spark.sparkContext.parallelize(nums, 20)
rdd.getNumPartitions // Int = 20
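The third way, calling a transformation method on an existing RDD, is something you’ve already seen: every transformation produces a new RDD from an existing one. Here’s a short example that reuses the rdd from the Solution:

val evens  = rdd.filter(_ % 2 == 0)   // a new RDD derived from rdd
val bigger = evens.map(_ * 10)        // another new RDD, derived from evens
bigger.take(3)                        // Array[Int] = Array(0, 20, 40)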
RDD methods
There are dozens of methods available on an RDD. You can see these in the REPL as usual by creating an RDD, then typing a decimal point and pressing the [Tab] key. This includes implementations of the usual Scala collections methods like distinct, filter, foreach, map, and take, as well as other methods unique to Spark. See the RDD class Scaladoc and the RDD Programming Guide for more details on the available methods.
See Also
These articles provide more details on Spark partitions: