First Logs Analyzer in Spark

Before beginning this section, go through the Spark Quick Start
and familiarize yourself with the Spark Programming Guide.

This section requires a dependency on the Spark Core library in the Maven file; note that you should update this dependency based on the version of Spark you have installed:

    <dependency> <!-- Spark -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.1</version>
    </dependency>

Before we can begin, we need two things: an Apache access log file to analyze
and a parser (with a model class) for its format. The example code uses an
Apache access log file since that’s a well-known and common log format. It
would be easy to rewrite the parser if you have data in a different log format.
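
The parser and model for the log lines come from the example’s ApacheAccessLog class. As a rough, illustrative sketch only (not the exact class shipped with the example, which also captures fields such as the request method, protocol, and date), a minimal model keeping just the fields used in this walkthrough might look like this:

    import java.io.Serializable;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Hypothetical sketch of an Apache Common Log Format model; field names
    // and the regex are illustrative, not the actual ApacheAccessLog.java.
    public class ApacheAccessLog implements Serializable {
      // host ident authuser [date] "method endpoint protocol" status bytes
      private static final Pattern LOG_PATTERN = Pattern.compile(
          "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)");

      private final String ipAddress;
      private final String endpoint;
      private final int responseCode;
      private final long contentSize;

      private ApacheAccessLog(String ipAddress, String endpoint,
                              int responseCode, long contentSize) {
        this.ipAddress = ipAddress;
        this.endpoint = endpoint;
        this.responseCode = responseCode;
        this.contentSize = contentSize;
      }

      public static ApacheAccessLog parseFromLogLine(String logLine) {
        Matcher m = LOG_PATTERN.matcher(logLine);
        if (!m.find()) {
          throw new RuntimeException("Cannot parse log line: " + logLine);
        }
        return new ApacheAccessLog(m.group(1), m.group(6),
            Integer.parseInt(m.group(8)), Long.parseLong(m.group(9)));
      }

      public String getIpAddress() { return ipAddress; }
      public String getEndpoint() { return endpoint; }
      public int getResponseCode() { return responseCode; }
      public long getContentSize() { return contentSize; }
    }

A production parser would also need to handle lines where the content size is "-" and other malformed input, which matters later when we discuss driver memory.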

The following statistics will be computed:

  • The average, min, and max content size of responses returned from the server.
  • A count of response codes returned.
  • All IPAddresses that have accessed this server more than N times.
  • The top endpoints requested by count.

Let’s first walk through the code before running the example in LogAnalyzer.java.

The main body of a simple Spark application is below.
The first step is to bring up a Spark context. Then the Spark context
can load data from a text file as an RDD, which it can then process. Finally, before exiting the function, the Spark context is stopped.

    public class LogAnalyzer {
      public static void main(String[] args) {
        // Create a Spark Context.
        SparkConf conf = new SparkConf().setAppName("Log Analyzer");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the text file into Spark.
        if (args.length == 0) {
          System.out.println("Must specify an access logs file.");
          System.exit(-1);
        }
        String logFile = args[0];
        JavaRDD<String> logLines = sc.textFile(logFile);

        // TODO: Insert code here for processing logs.

        sc.stop();
      }
    }

Given an RDD of log lines, use the map function to transform each line
to an ApacheAccessLog object. The ApacheAccessLog RDD is cached in memory,
since multiple transformations and actions will be called on it.

    // Convert the text log lines to ApacheAccessLog objects and
    // cache them since multiple transformations and actions
    // will be called on the data.
    JavaRDD<ApacheAccessLog> accessLogs =
        logLines.map(ApacheAccessLog::parseFromLogLine).cache();

It’s useful to define a sum reducer - a function that takes in
two Longs and returns their sum. It is used throughout this example.

    private static Function2<Long, Long, Long> SUM_REDUCER = (a, b) -> a + b;

Next, let’s calculate the average, minimum, and maximum content size of the
response returned. A map transformation extracts the content sizes, and
then different actions (reduce, count, min, and max) are called to output
various stats. Again, call cache on the content size RDD to avoid recalculating those values for each action called on it.

    // Calculate statistics based on the content size.
    // Note how the contentSizes are cached as well since multiple actions
    // are called on that RDD.
    JavaRDD<Long> contentSizes =
        accessLogs.map(ApacheAccessLog::getContentSize).cache();
    System.out.println(String.format("Content Size Avg: %s, Min: %s, Max: %s",
        contentSizes.reduce(SUM_REDUCER) / contentSizes.count(),
        contentSizes.min(Comparator.naturalOrder()),
        contentSizes.max(Comparator.naturalOrder())));
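
A small caveat on the average above: both reduce(SUM_REDUCER) and count() return whole numbers, so the division truncates any fractional part. If a fractional average is wanted, one minimal variation on the same RDD would be:

    // Convert the summed content sizes to double before dividing so the
    // average keeps its fractional part.
    double avgContentSize =
        contentSizes.reduce(SUM_REDUCER).doubleValue() / contentSizes.count();
    System.out.println("Content Size Avg: " + avgContentSize);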

To compute the response code counts, we have to work with key-value pairs - by using mapToPair and reduceByKey.
Notice that we call take(100) instead of collect() to gather the final output of the response code counts.
Use extreme caution before calling collect() on an RDD, since all of that data will be sent to a single Spark driver and can cause the driver to run out of memory. Even in this case, where there are only a limited number of response codes and it seems safe - if there are malformed lines in the Apache access log or a bug in the parser, there could be enough invalid response codes to cause an out-of-memory error.

    // Compute Response Code to Count.
    List<Tuple2<Integer, Long>> responseCodeToCount = accessLogs
        .mapToPair(log -> new Tuple2<>(log.getResponseCode(), 1L))
        .reduceByKey(SUM_REDUCER)
        .take(100);
    System.out.println(String.format("Response code counts: %s", responseCodeToCount));
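
As an aside, because the keys here are just response codes, the same counts could also be computed with countByValue on a plain RDD of codes. Note that countByValue, like collect(), returns its entire result to the driver (as a java.util.Map), so the caution above still applies unless the set of distinct codes is known to be small:

    // Alternative sketch: count response codes with countByValue.
    // The resulting Map is materialized on the driver, so this is only
    // appropriate when the number of distinct codes is small.
    Map<Integer, Long> responseCodeCounts =
        accessLogs.map(ApacheAccessLog::getResponseCode).countByValue();
    System.out.println("Response code counts: " + responseCodeCounts);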

To compute any IPAddress that has accessed this server more than 10 times,
we call the filter transformation and then map to retrieve only the IPAddress and discard the count. Again, we use take(100) to retrieve the values.

    List<String> ipAddresses =
        accessLogs.mapToPair(log -> new Tuple2<>(log.getIpAddress(), 1L))
            .reduceByKey(SUM_REDUCER)
            .filter(tuple -> tuple._2() > 10)
            .map(Tuple2::_1)
            .take(100);
    System.out.println(String.format("IPAddresses > 10 times: %s", ipAddresses));

Last, let’s calculate the top endpoints requested in this log file. We define
an inner class, ValueComparator, to help with that. This comparator tells us,
given two tuples, which one comes first in the ordering. The key of the tuple is ignored, and ordering is based only on the values.

    private static class ValueComparator<K, V>
        implements Comparator<Tuple2<K, V>>, Serializable {
      private Comparator<V> comparator;

      public ValueComparator(Comparator<V> comparator) {
        this.comparator = comparator;
      }

      @Override
      public int compare(Tuple2<K, V> o1, Tuple2<K, V> o2) {
        return comparator.compare(o1._2(), o2._2());
      }
    }

Then, we can use the ValueComparator with the top action to compute the top endpoints accessed on this server according to how many times the endpoint was accessed.

    List<Tuple2<String, Long>> topEndpoints = accessLogs
        .mapToPair(log -> new Tuple2<>(log.getEndpoint(), 1L))
        .reduceByKey(SUM_REDUCER)
        .top(10, new ValueComparator<>(Comparator.<Long>naturalOrder()));
    System.out.println("Top Endpoints: " + topEndpoints);

These code snippets are from LogAnalyzer.java.
Now that we’ve walked through the code, try running that example. See the README for language-specific instructions for building and running.