Note: I originally wrote this article many years ago using Apache Spark 0.9.x. Hopefully the content below is still useful, but I wanted to warn you up front that it is old.
Introduction
I don’t want to make my original Parsing Apache access log records with Spark and Scala article any longer, so I’m putting some new, better code here.
Assuming that you've read that article, or that you’re comfortable with Spark, I’ll jump right in and say that I use this piece of code to load my (a) Apache access log parser and (b) sample access log data file into the Spark REPL:
import com.alvinalexander.accesslogparser._

val p = new AccessLogParser
val log = sc.textFile("accesslog.sample")
To be clear, those statements, and all of the following expressions, are entered in the REPL. I start the Spark interpreter (REPL) with this command at my *nix command line:
$ MASTER=local[4] SPARK_CLASSPATH=AlsApacheLogParser.jar spark-shell
That command brings my Apache access log parser library (jar) into the Spark REPL workspace.
Getting a list of URIs
After those first few lines are loaded, I use the following expression, which yields a list of URIs. It’s not exactly what I want, but it’s a start:
val uris = log.map(p.parseRecordReturningNullObjectOnFailure(_).request)
              .filter(_ != "")
              .map(_.split(" ")(1))
If you’re new to Scala, this version of that same code may be more readable:
// a request looks like "GET /foo HTTP/1.1"
val uris = log.map(line => p.parseRecordReturningNullObjectOnFailure(line).request)
              .filter(request => request != "")
              .map(request => request.split(" ")(1))
(Note that I paste long expressions like these into the REPL using the :paste command.)
Sidebar: Handling the 'bytes' field
I used this next expression to find the lines that my Apache access log file parser project wasn’t parsing properly:
for { line <- log if p.parseRecord(line) == None } yield line
Using this expression, I found that the bytes field in an Apache access log record can contain the - character. I assumed that it would always be an integer value, so I fixed that in my parser library before proceeding.
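For reference, here’s a minimal sketch of the kind of fix I mean — treating the - as zero when converting the field to a number. This is not the actual code in the parser library, and the method and parameter names are just examples:

// a sketch, not the parser library's real code; Apache writes "-" in the
// bytes field when no bytes were sent, so treat that as zero
def bytesToLong(bytes: String): Long =
  if (bytes == "-") 0L else bytes.toLong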
Getting what I want: URIs sorted by hit count
Next up, this handsome expression returns an Array[(String, Int)], which is an array of (URI -> numOccurrences) pairs:
// works: use the previous example to get to a series of "(URI, COUNT)" pairs (MapReduce-like)
val uriCount = log.map(p.parseRecordReturningNullObjectOnFailure(_).request)
                  .filter(request => request != "")  // filter out records that wouldn't parse properly
                  .map(_.split(" ")(1))              // get the uri field
                  .map(uri => (uri, 1))              // create a tuple for each record
                  .reduceByKey((a, b) => a + b)      // reduce to get counts like (/java/java_oo/up.png,2)
                  .collect                           // convert to Array[(String, Int)], i.e., Array[(URI, numOccurrences)]
Now that I have the hit count for each URI, I can get what I want, the data sorted, with the highest number of hits listed first:
import scala.collection.immutable.ListMap

// (/bar, 10), (/foo, 3), (/baz, 1)
val uriHitCount = ListMap(uriCount.toSeq.sortWith(_._2 > _._2): _*)
Now I print the first 20 records to verify that I have what I want:
// print the top-20 most-hit URIs
uriHitCount.take(20).foreach(println)

(/styles/mobile1024.css,80603)
(/styles/mobile480.css,80582)
(/styles/mobile768.css,80375)
(/images/icons/home.png,79147)
(/images/icons/gear.png,78993)
(/images/icons/person.png,78988)
(/images/icons/search.png,78961)
(/sites/default/files/chrysalis2_logo.png,78925)
(/images/icons/rss.png,78849)
(/sites/default/files/V6-Flattened-FrontOnly-300pxH-60b.jpg,78694)
(/sites/default/files/HowISoldMyBusiness-300pxH-5.jpg,78662)
(/modules/node/node.css?g,78598)
(/sites/all/modules/ckeditor/ckeditor.css?g,78509)
(/sites/default/files/imagecache/preview/photos/scala-cookbook-oreilly-alvin-alexander.gif,78453)
(/sites/all/modules/filefield/filefield.css?g,78367)
(/sites/all/themes/chrysalis2/style.css?g,78353)
(/modules/system/system.css?g,78267)
(/sites/all/modules/cck/theme/content-module.css?g,78232)
(/modules/user/user.css?g,78201)
(/sites/all/modules/mollom/mollom.css?g,78163)
In the real world what I’m going to do next is get rid of all image, CSS, and JavaScript hits so I can see which blog posts are the most popular, but that’s just a series of simple filter method calls (there’s a sketch of those after the formatted output below). When I add those filter calls to this code, I can print the results onscreen as shown above, or with a little more formatting:
val formatter = java.text.NumberFormat.getIntegerInstance

uriHitCount.take(50).foreach { pair =>
  val uri = pair._1
  val count = pair._2
  println(s"${formatter.format(count)} => $uri")
}
This code gives me 50 lines of output that look like this:
29,228 => /
10,379 => /foo
9,512 => /bar
9,015 => /baz
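As for the filter calls I mentioned above, here’s a minimal sketch of the idea. The exact suffix list, and the choice to filter the collected ListMap rather than the RDD, are my assumptions; adjust them for whatever asset types show up in your own logs:

// a sketch: drop image, CSS, and JavaScript hits from the hit-count data
// so only "real" page URIs remain; the suffix list is just an example
val assetPattern = ".*\\.(png|gif|jpg|jpeg|ico|css|js)(\\?.*)?$"

val blogPostHitCount = uriHitCount.filter { case (uri, _) => !uri.matches(assetPattern) }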
Writing my results to file
The last thing I want to show is how to write this data to a file from the Spark command line. To do that, I use this code:
import java.io._

val file = new File("UriHitCount.out")
val bw = new BufferedWriter(new FileWriter(file))

for {
  record <- uriHitCount
  uri = record._1
  count = record._2
} bw.write(s"$count => $uri\n")

bw.close()
Note that I can’t use Spark’s saveAsTextFile method because uriHitCount is a ListMap.
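If you’d rather let Spark write the output itself, one alternative — a sketch, assuming you stay with the RDD instead of collecting into a ListMap — is to sort the pair RDD by count and call saveAsTextFile on it, which writes a directory of part files rather than a single file. The output directory name here is just an example:

// a sketch: keep the (uri, count) data as an RDD, sort it by count, and let
// Spark write the output; note there is no .collect this time
val uriCountRdd = log.map(p.parseRecordReturningNullObjectOnFailure(_).request)
                     .filter(_ != "")
                     .map(_.split(" ")(1))
                     .map(uri => (uri, 1))
                     .reduceByKey(_ + _)

uriCountRdd.map(_.swap)                           // (count, uri)
           .sortByKey(ascending = false)          // highest counts first
           .map { case (count, uri) => s"$count => $uri" }
           .saveAsTextFile("UriHitCountDir")      // writes part files into that directory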
For my current small sample data set, the first ten lines of the resulting UriHitCount.out file look like this:
80603 => /styles/mobile1024.css
80582 => /styles/mobile480.css
80375 => /styles/mobile768.css
79147 => /images/icons/home.png
78993 => /images/icons/gear.png
78988 => /images/icons/person.png
78961 => /images/icons/search.png
78925 => /sites/default/files/chrysalis2_logo.png
78849 => /images/icons/rss.png
78694 => /sites/default/files/V6-Flattened-FrontOnly-300pxH-60b.jpg
Summary
The next thing I want to do with this is to get all of my Apache access log files onto an Amazon cluster, and test Spark on multiple servers. But alas, that will have to wait until I can find some more free time. Until then, if you were looking for an example of how to use Apache Spark with Scala, and in particular, how to process Apache access log records with Spark and Scala, I hope this has been helpful.