Windowed Calculations: window()

A typical use case for log analysis is monitoring a web server,
in which case you may only be interested in what’s happened for the last one hour of time and want those statistics to refresh every minute. One hour is
the window length, while one minute is the slide interval. In this
example, we use a window length of 30 seconds and a slide interval of
10 seconds as a comfortable choice for development.

The windows feature of Spark Streaming makes it very easy to compute
stats for a window of time, using the window function.

The first step is to initiate the SparkSession and context objects - in particular
a streaming context. Note how only SparkSession is created and the streaming context
is obtained from it. Next, the main body should be written. Finally, the example
calls start() on the streaming context, and awaitTermination()to keep
the streaming context running and accepting streaming input.

  1. public class LogAnalyzerStreamingSQL {
  2. public static void main(String[] args) throws InterruptedException {
  3. // Initialize SparkSession instance.
  4. // Note: Only SparkSession instance is created,
  5. //other flavors of Spark context are obtained from it.
  6. SparkSession sparkSession = SparkSession
  7. .builder()
  8. .appName("Log Analyzer Streaming SQL")
  9. .getOrCreate();
  10. JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
  11. JavaStreamingContext jssc = new JavaStreamingContext(sc,
  12. SLIDE_INTERVAL); // This sets the update window to be every 10 seconds.
  13. // TODO: Insert code here to process logs.
  14. // Start the streaming server.
  15. jssc.start(); // Start the computation
  16. jssc.awaitTermination(); // Wait for the computation to terminate
  17. }
  18. }

The first step of the main body is to create a DStream from reading the socket.

  1. JavaReceiverInputDStream<String> logDataDStream =
  2. jssc.socketTextStream("localhost", 9999);

Next, call the map transformation to convert the logDataDStream into a ApacheAccessLog DStream.

  1. JavaDStream<ApacheAccessLog> accessLogDStream =
  2. logDataDStream.map(ApacheAccessLog::parseFromLogLine);

Next, call window on the accessLogDStream to create a windowed DStream. The window function nicely packages the input data that is being
streamed into RDDs containing a window length of data, and creates a new
RDD every SLIDE_INTERVAL of time.

  1. JavaDStream<ApacheAccessLog> windowDStream =
  2. accessLogDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL);

Then call foreachRDD on the windowDStream. The function
passed into foreachRDD is called on each new RDD in the windowDStream as the RDD
is created, so every slide_interval. The RDD passed into the function contains
all the input for the last window_length of time. Now that there is
an RDD of ApacheAccessLogs, simply reuse code from either two batch examples (regular or SQL). In this example, the code was just copied and pasted, but you could refactor this code into one place nicely for reuse in your production code base - you can reuse all your batch processing code for streaming!

  1. windowDStream.foreachRDD(accessLogs -> {
  2. if (accessLogs.count() == 0) {
  3. System.out.println("No access logs in this time interval");
  4. return;
  5. }
  6. // Insert code verbatim from LogAnalyzer.java or LogAnalyzerSQL.java here.
  7. // Calculate statistics based on the content size.
  8. JavaRDD<Long> contentSizes =
  9. accessLogs.map(ApacheAccessLog::getContentSize).cache();
  10. System.out.println(String.format("Content Size Avg: %s, Min: %s, Max: %s",
  11. contentSizes.reduce(SUM_REDUCER) / contentSizes.count(),
  12. contentSizes.min(Comparator.naturalOrder()),
  13. contentSizes.max(Comparator.naturalOrder())));
  14. //...Won't copy the rest here...
  15. }

Now that we’ve walked through the code, run
LogAnalyzerStreaming.java
and/or LogAnalyzerStreamingSQL.java now. Use the cat command as explained before to add data to the log file periodically once you have your program up.