IoT Fleet Management - Spark and Kafka

Attention: This page documents an earlier version. Go to the latest (v2.1) version.

Overview

This is an end-to-end functional application that serves as a blueprint for IoT applications. It uses YugabyteDB (through its Cassandra-compatible API) as the database, Kafka as the message broker, Spark for real-time analytics and Spring Boot as the application framework. The stack is very similar to the SMACK stack (Spark, Mesos, Akka, Cassandra, Kafka), which is popular for developing IoT applications, with YugabyteDB taking the place of Cassandra.

Scenario

Assume that a fleet management company wants to track its fleet of vehicles that are delivering shipments. The vehicles are of different types (18-wheelers, buses, large trucks, etc.), and the shipments happen over three routes (Route-37, Route-82, Route-43). The company wants to track:

  • the breakdown of their vehicle types per shipment delivery route
  • which vehicles are near road closures so that they can predict delays in deliveries

This app renders a dashboard showing both of the above. Below is a view of the real-time, auto-refreshing dashboard.

[Image: YB IoT Fleet Management Dashboard]

App Architecture

This application has the following subcomponents:

  • Data Store - YugabyteDB
  • Data Producer - Test program writing into Kafka
  • Data Processor - Spark reading from Kafka
  • Data Dashboard - Spring Boot app using WebSockets, jQuery and Bootstrap

We will look at each of these components in detail. Below is an architecture diagram showing how these components fit together.

[Image: YB IoT Fleet Management Architecture]

Data Store

The data store holds all the user-facing data. YugabyteDB is used here and is accessed through CQL (Cassandra Query Language).

All the data is stored in the keyspace TrafficKeySpace:

    CREATE KEYSPACE IF NOT EXISTS TrafficKeySpace;

There are three tables that hold the user-facing data - Total_Traffic for the lifetime traffic information, Window_Traffic for the last 30 seconds of traffic and poi_traffic for the traffic near a point of interest (road closures). The data processor constantly updates these tables, and the dashboard reads from these tables. Below are the schemas for these tables.

    -- Lifetime traffic counts per route, date and vehicle type.
    CREATE TABLE TrafficKeySpace.Total_Traffic (
      routeId text,
      vehicleType text,
      totalCount bigint,
      timeStamp timestamp,
      recordDate text,
      PRIMARY KEY (routeId, recordDate, vehicleType)
    );

    -- Traffic counts for the last 30 seconds.
    CREATE TABLE TrafficKeySpace.Window_Traffic (
      routeId text,
      vehicleType text,
      totalCount bigint,
      timeStamp timestamp,
      recordDate text,
      PRIMARY KEY (routeId, recordDate, vehicleType)
    );

    -- Vehicles near a point of interest (road closure).
    CREATE TABLE TrafficKeySpace.poi_traffic (
      vehicleid text,
      vehicletype text,
      distance bigint,
      timeStamp timestamp,
      PRIMARY KEY (vehicleid)
    );

Data Producer

The data producer is a program that generates random test data and publishes it to the Kafka topic iot-data-event. It emulates the data that connected vehicles would send through a message broker in the real world.

A single data point is a JSON payload and looks as follows:

    {
      "vehicleId":"0bf45cac-d1b8-4364-a906-980e1c2bdbcb",
      "vehicleType":"Taxi",
      "routeId":"Route-37",
      "longitude":"-95.255615",
      "latitude":"33.49808",
      "timestamp":"2017-10-16 12:31:03",
      "speed":49.0,
      "fuelLevel":38.0
    }
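Note that longitude, latitude and timestamp arrive as JSON strings rather than numbers or dates. As a rough illustration of how a consumer might convert them into typed values, here is a minimal sketch in plain Java. The class name PayloadFields and its methods are hypothetical and not part of the application; the timestamp pattern is inferred from the sample value above, and JSON decoding itself is left out.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not taken from the application source.
final class PayloadFields {
    // Pattern inferred from the sample value "2017-10-16 12:31:03" (an assumption).
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parses the "timestamp" field. No time zone is attached because the payload carries none.
    static LocalDateTime parseTimestamp(String timestamp) {
        return LocalDateTime.parse(timestamp, TIMESTAMP_FORMAT);
    }

    // "latitude" and "longitude" are JSON strings, so they are converted to numbers explicitly.
    static double parseCoordinate(String value) {
        return Double.parseDouble(value);
    }

    public static void main(String[] args) {
        LocalDateTime ts = parseTimestamp("2017-10-16 12:31:03");
        double latitude = parseCoordinate("33.49808");
        double longitude = parseCoordinate("-95.255615");
        System.out.println(ts + " at (" + latitude + ", " + longitude + ")");
    }
}
```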

Data Processor

This is a Spark Streaming application that consumes the data stream from the Kafka topic, converts it into meaningful insights and writes the resulting data back to YugabyteDB.

Spark communicates with YugabyteDB using the Cassandra connector. This is done as follows:

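    import org.apache.spark.SparkConf;
    // prop holds the application properties (defined outside this snippet).
    SparkConf conf = new SparkConf()
        .setAppName(prop.getProperty("com.iot.app.spark.app.name"))
        // Point the Cassandra connector at the YugabyteDB host.
        .set("spark.cassandra.connection.host", prop.getProperty("com.iot.app.cassandra.host"));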

The data is consumed from a Kafka stream and collected in 5-second batches. This is achieved as follows:

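    import kafka.serializer.StringDecoder;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    // Collect incoming records into 5-second batches.
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
    // Read directly from Kafka; keys are strings and values are decoded into IoTData.
    JavaPairInputDStream<String, IoTData> directKafkaStream =
        KafkaUtils.createDirectStream(
            jssc,
            String.class,            // key type
            IoTData.class,           // value type
            StringDecoder.class,     // key decoder
            IoTDataDecoder.class,    // value decoder (app-specific class)
            kafkaParams,             // Kafka parameters
            topicsSet);              // topics to read from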

The data processor computes the following:

  • A breakdown by vehicle type and shipment route across all the vehicles and shipments done so far
  • The same breakdown for active shipments, obtained by computing the breakdown by vehicle type and shipment route over the last 30 seconds
  • The vehicles that are within a 20-mile radius of a given Point of Interest (POI), which represents a road closure
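To make these three computations concrete, here is a minimal, dependency-free sketch in plain Java. It is not the application's Spark code: the FleetMetrics class, its Event type and all method names are hypothetical, the breakdown simply counts events (whether the real processor deduplicates by vehicle is not stated here), and the haversine formula is one common way to measure distance that is assumed for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the three computations; not the application's Spark code.
final class FleetMetrics {
    // Simplified stand-in for one decoded message (fields follow the JSON payload).
    static final class Event {
        final String vehicleType;
        final String routeId;
        final double latitude;
        final double longitude;
        final long epochSeconds;

        Event(String vehicleType, String routeId, double latitude, double longitude, long epochSeconds) {
            this.vehicleType = vehicleType;
            this.routeId = routeId;
            this.latitude = latitude;
            this.longitude = longitude;
            this.epochSeconds = epochSeconds;
        }
    }

    // Counts events per (routeId, vehicleType); the map key is "routeId|vehicleType".
    static Map<String, Long> breakdown(List<Event> events) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            counts.merge(e.routeId + "|" + e.vehicleType, 1L, Long::sum);
        }
        return counts;
    }

    // Same breakdown, restricted to events in the interval (now - window, now].
    static Map<String, Long> windowedBreakdown(List<Event> events, long nowSeconds, long windowSeconds) {
        List<Event> recent = new ArrayList<>();
        for (Event e : events) {
            if (e.epochSeconds > nowSeconds - windowSeconds && e.epochSeconds <= nowSeconds) {
                recent.add(e);
            }
        }
        return breakdown(recent);
    }

    // Great-circle distance in miles using the haversine formula (assumed for this sketch).
    static double distanceMiles(double lat1, double lon1, double lat2, double lon2) {
        final double earthRadiusMiles = 3958.8;
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
            + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
            * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * earthRadiusMiles * Math.asin(Math.sqrt(a));
    }

    // True when the event's position lies within radiusMiles of the point of interest.
    static boolean isNearPoi(Event e, double poiLat, double poiLon, double radiusMiles) {
        return distanceMiles(e.latitude, e.longitude, poiLat, poiLon) <= radiusMiles;
    }

    public static void main(String[] args) {
        // Made-up sample events; timestamps are seconds on an arbitrary clock.
        List<Event> events = Arrays.asList(
            new Event("Taxi", "Route-37", 33.49808, -95.255615, 100),
            new Event("Taxi", "Route-37", 33.60000, -95.255615, 125),
            new Event("Large Truck", "Route-82", 35.00000, -96.000000, 140));
        System.out.println("total:  " + breakdown(events));
        System.out.println("window: " + windowedBreakdown(events, 140, 30));
        System.out.println("near POI: " + isNearPoi(events.get(0), 33.3, -95.255615, 20.0));
    }
}
```

In the streaming application the same ideas would be expressed as transformations over each batch rather than loops over a list; the sketch only shows the arithmetic behind each result.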

Data Dashboard

This is a Spring Boot application that queries the data from YugabyteDB and pushes it to the web page using WebSockets and jQuery. The data is pushed at fixed intervals, so the dashboard refreshes automatically. The page uses bootstrap.js to lay out the dashboard, which displays the data in charts and tables.
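The fixed-interval push can be pictured with a small sketch in plain Java. This is only an illustration of the pattern and not the application's code, which relies on Spring and WebSockets: the DashboardPusher class is hypothetical, the Supplier stands in for the database query, the Consumer stands in for the WebSocket send, and the one-second interval is chosen only for the demo.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the dashboard's refresh loop; not the application's Spring code.
final class DashboardPusher {
    private final Supplier<String> query;    // stands in for reading the traffic tables
    private final Consumer<String> publish;  // stands in for sending to connected browsers
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    DashboardPusher(Supplier<String> query, Consumer<String> publish) {
        this.query = query;
        this.publish = publish;
    }

    // One refresh cycle: read the latest data and hand it to the publisher.
    void pushOnce() {
        publish.accept(query.get());
    }

    // Repeats pushOnce at a fixed rate, starting immediately.
    void start(long intervalSeconds) {
        scheduler.scheduleAtFixedRate(this::pushOnce, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    // Stops the periodic pushes.
    void stop() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        DashboardPusher pusher = new DashboardPusher(
            () -> "{\"routeId\":\"Route-37\",\"totalCount\":3}",  // made-up snapshot
            message -> System.out.println("push: " + message));
        pusher.start(1);     // interval chosen for the demo only
        Thread.sleep(2500);  // let a few pushes happen
        pusher.stop();
    }
}
```

Because the browser only receives what the server pushes, the refresh rate of the dashboard is set entirely by this interval on the server side.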

We create entity classes for the three tables Total_Traffic, Window_Traffic and poi_traffic, and DAO interfaces for all the entities extending CassandraRepository. For example, we create the DAO interface for the TotalTrafficData entity as follows.

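    import org.springframework.data.cassandra.repository.CassandraRepository;
    import org.springframework.data.cassandra.repository.Query;
    import org.springframework.stereotype.Repository;

    @Repository
    public interface TotalTrafficDataRepository extends CassandraRepository<TotalTrafficData> {
        // ?0 binds the first method argument (date) into the query.
        @Query("SELECT * FROM traffickeyspace.total_traffic WHERE recorddate = ?0 ALLOW FILTERING")
        Iterable<TotalTrafficData> findTrafficDataByDate(String date);
    }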

To connect to the YugabyteDB cluster and obtain a connection for database operations, we write the CassandraConfig class. This is done as follows:

    public class CassandraConfig extends AbstractCassandraConfiguration {
        @Bean
        public CassandraClusterFactoryBean cluster() {
            // Create a Cassandra cluster to access Yugabyte using CQL.
            CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
            // Set the database host.
            cluster.setContactPoints(environment.getProperty("com.iot.app.cassandra.host"));
            // Set the database port.
            cluster.setPort(Integer.parseInt(environment.getProperty("com.iot.app.cassandra.port")));
            return cluster;
        }
    }

Summary

This application is a blueprint for building IoT applications. The instructions to build and run the application, as well as the source code, can be found in this GitHub repo.