Configure the data flow from EdgeX to eKuiper

Sources feed data into eKuiper from other systems, such as EdgeX Foundry, and are defined as streams. The EdgeX source defines the properties that configure how data flows from EdgeX into eKuiper. In this tutorial, we demonstrate the different data flows from EdgeX to eKuiper and how to configure the source to adopt each of them.

Typical Data Flow Model

Typically, there are two kinds of data flow from EdgeX to eKuiper:

  • From EdgeX app service to eKuiper
  • From EdgeX message bus directly to eKuiper

[Figure: data flow]

Note that the EdgeX message bus receives messages from various services, such as device services and core data. Even in the first kind of data flow, the app service publishes its result to the message bus, where eKuiper consumes it. The difference between the two kinds is whether an app service processes the message before eKuiper consumes it.

By default, the first kind of data flow is used. It allows users to prepare (transform, enrich, filter, etc.) and groom (format, compress, encrypt, etc.) the data before it is sent to the eKuiper rule engine. If users don’t need to transform the data and would like to process the raw data in eKuiper to reduce overhead, they can connect to the message bus directly.

The full list of EdgeX source properties is documented in the EdgeX source reference. Two properties are critical to the connection model: topic and messageType. Let’s explore how to configure them for each connection model.

Connect to the App Service

In the default EdgeX docker compose file, a default app service, app-service-rules, is defined as the upstream of eKuiper. Its default publish topic is rules-events, as defined in its default configuration. In the rulesengine section of the same docker compose file, the environment variable EDGEX__DEFAULT__TOPIC: rules-events is defined. This sets the default topic of the eKuiper EdgeX source to “rules-events”, which matches the publish topic of app-service-rules. Thus, when you create an edgex type stream with the default configuration, data flows from app-service-rules to eKuiper automatically.
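The rulesengine override appears to follow a naming convention: the EDGEX prefix selects edgex.yaml, the middle segment selects the confkey (default), and the last segment names the property (topic). The Python sketch below is a simplified model of that mapping, not eKuiper's code. Matching existing keys case-insensitively and keeping every value as a string are assumptions of the sketch, and `apply_env_overrides` is a hypothetical helper.

```python
import copy


def _find_key(node, part):
    """Return the existing key in `node` matching `part` case-insensitively."""
    for key in node:
        if key.lower() == part.lower():
            return key
    return None


def apply_env_overrides(conf, prefix, environ):
    """Overlay PREFIX__CONFKEY__PROPERTY variables onto a nested config dict.

    Assumption: simplified model; values stay strings, unrelated vars are ignored.
    """
    result = copy.deepcopy(conf)  # leave the file-based config untouched
    for name, value in environ.items():
        parts = name.split("__")
        # Need at least a prefix, a confkey and a property name.
        if len(parts) < 3 or parts[0] != prefix.upper():
            continue
        node = result
        for part in parts[1:-1]:
            key = _find_key(node, part) or part.lower()
            node = node.setdefault(key, {})
        # Reuse the existing spelling (e.g. messageType) when the key exists.
        leaf = _find_key(node, parts[-1]) or parts[-1].lower()
        node[leaf] = value
    return result
```

Because the override lands in the default confkey, any edgex stream created without CONF_KEY picks it up.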

Modify the connected app service

In some cases, users may have multiple app services that perform different kinds of data transformation. To let eKuiper connect to another app service, change the topic to match that app service's publish topic. In the docker compose file, add the environment variable TRIGGER_EDGEXMESSAGEBUS_PUBLISHHOST_PUBLISHTOPIC to the app service to explicitly specify its publish topic. Then update EDGEX__DEFAULT__TOPIC in the rulesengine section to the same topic to connect them.

    ...
    app-service-rules:
      ...
      environment:
        ...
        TRIGGER_EDGEXMESSAGEBUS_PUBLISHHOST_PUBLISHTOPIC: new-rules-events
        ...
    ...
    rulesengine:
      ...
      environment:
        ...
        EDGEX__DEFAULT__TOPIC: new-rules-events
        ...

Connect to the Message Bus

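To bypass the app service and gain a performance boost, users can connect to the message bus directly. Besides setting the topic, users also need to configure the messageType property, because messages published directly to the bus by device services or core data use the request model rather than the event model produced by app services.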
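The two messageType values differ in how the payload is unwrapped. The sketch below assumes the request form nests the event under an `event` field next to request metadata such as `apiVersion`; the sample payloads are hypothetical, and `extract_event` is an illustrative helper, not part of eKuiper.

```python
import json


def extract_event(payload, message_type):
    """Decode a message bus payload into an EdgeX event dict.

    message_type mirrors the EdgeX source messageType property:
    "event"   -> payload is already an event (app service output),
    "request" -> payload wraps the event (assumed nested under "event").
    """
    message = json.loads(payload)
    if message_type == "event":
        return message
    if message_type == "request":
        # Assumption: the event sits under the request's "event" field.
        return message["event"]
    raise ValueError(f"unsupported messageType: {message_type!r}")


# Hypothetical sample payloads; field values are illustrative only.
event = {"deviceName": "Random-Integer-Device",
         "readings": [{"resourceName": "int8", "value": "12"}]}
request = {"apiVersion": "v2", "event": event}
```

Decoding a request payload with messageType event would hand the rule the wrapper instead of the event, so its readings would not be where the stream expects them.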

The EdgeX v2 message bus uses multi-level topics so that consumers can filter messages by topic efficiently. Please refer to the topic filter examples.

For example, if the rules only consider data from Random-Integer-Device, we can modify the rulesengine section of the docker compose file as below to change the topic and set the message type to request.

    ...
    ...
    rulesengine:
      ...
      environment:
        ...
        EDGEX__DEFAULT__TOPIC: edgex/events/#/Random-Integer-Device/#
        EDGEX__DEFAULT__MESSAGETYPE: request
        ...

In this way, eKuiper detaches from the app service and connects to the message bus directly. When you create an EdgeX stream with the default configuration:

    CREATE STREAM edgexAll() WITH (FORMAT="JSON", TYPE="edgex")

Only the events from the Random-Integer-Device will be received.
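Why only Random-Integer-Device events arrive can be seen by matching topics level by level. The sketch below assumes `#` matches one or more whole topic levels wherever it appears in the filter; the real matching is done by the message bus implementation and may differ in edge cases (MQTT, for instance, only allows `#` as the last level). The sample topics are hypothetical.

```python
def topic_matches(pattern, topic):
    """Check a '/'-separated topic against a filter containing '#' wildcards.

    Assumption: '#' matches one or more whole levels, anywhere in the filter.
    """
    p_levels = pattern.split("/")
    t_levels = topic.split("/")

    def match(i, j):
        if i == len(p_levels):
            return j == len(t_levels)  # filter used up: topic must be too
        if p_levels[i] == "#":
            # Let '#' consume 1, 2, ... of the remaining topic levels.
            return any(match(i + 1, k) for k in range(j + 1, len(t_levels) + 1))
        return (j < len(t_levels)
                and p_levels[i] == t_levels[j]
                and match(i + 1, j + 1))

    return match(0, 0)


device_filter = "edgex/events/#/Random-Integer-Device/#"
print(topic_matches(device_filter, "edgex/events/device/Random-Integer-Device/int8"))   # True
print(topic_matches(device_filter, "edgex/events/device/Random-Float-Device/float32"))  # False
```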

Multiple Streams

In the real world, users usually have multiple rules. Some rules only concern a specific profile, device, or even a single reading on the message bus. It is best practice to create multiple streams that map to the individual points of interest, so that each rule only processes the subset of messages it needs.

In this scenario, users will have multiple topics in the EdgeX message bus, either published by app services or selected directly from the message bus with filtered topics. In eKuiper, each topic can be mapped to its own EdgeX source configuration. An example EdgeX configuration file, edgex.yaml, with multiple configurations is shown below:

    # Default conf connects to app-service-rules
    default:
      protocol: tcp
      server: localhost
      port: 5563
      topic: rules-events
      type: redis
      messageType: event
    # Override the global configurations
    device_conf: # Filter only Random-Integer-Device
      topic: edgex/events/#/Random-Integer-Device/#
      messageType: request
    another_app_service_conf:
      topic: new-rules-events
    int8_conf: # Filter only Random-Integer-Device int8 reading
      topic: edgex/events/#/Random-Integer-Device/int8
      messageType: request
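The comment in edgex.yaml says the named sections override the global configuration. A minimal sketch of that resolution is below, assuming a shallow merge in which any property a confkey omits falls back to default; `resolve_conf` is a hypothetical helper, and the dict simply transcribes the YAML above.

```python
# The edgex.yaml above, transcribed as a Python dict for illustration.
EDGEX_CONF = {
    "default": {"protocol": "tcp", "server": "localhost", "port": 5563,
                "topic": "rules-events", "type": "redis", "messageType": "event"},
    "device_conf": {"topic": "edgex/events/#/Random-Integer-Device/#",
                    "messageType": "request"},
    "another_app_service_conf": {"topic": "new-rules-events"},
    "int8_conf": {"topic": "edgex/events/#/Random-Integer-Device/int8",
                  "messageType": "request"},
}


def resolve_conf(conf, conf_key="default"):
    """Return the effective properties for a confkey.

    Assumption: start from default, then let the confkey's own properties win.
    """
    effective = dict(conf["default"])
    if conf_key != "default":
        effective.update(conf[conf_key])
    return effective
```

Under this model, another_app_service_conf inherits messageType: event, which fits because it reads from an app service, while the two filtered confkeys switch to request because they read the bus directly.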

With this configuration, users have three confkeys in addition to default, each connecting to a different EdgeX data flow. For example, suppose a user has two rules: rule1 needs to process all events, while rule2 only processes the int8 reading of Random-Integer-Device. The user can then create two streams: edgexAll and edgexInt8.

    CREATE STREAM edgexAll() WITH (FORMAT="JSON", TYPE="edgex")

With this definition, the default confkey will be used and the rules using this stream will receive all events.

    CREATE STREAM edgexInt8(int8 bigint) WITH (FORMAT="JSON", TYPE="edgex", CONF_KEY="int8_conf")

In contrast, edgexInt8 explicitly specifies the confkey int8_conf, which is configured with the filtered topic for the int8 reading of Random-Integer-Device. Thus, the stream receives only the int8 reading of each event, and the event structure is fixed. Therefore, the stream definition declares a schema instead of being schemaless.

Similarly, users can create a stream for each confkey, and each rule can pick the streams that match its interests.
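Streams can also be created through eKuiper's REST API by POSTing the statement to the /streams endpoint as a JSON body with an `sql` field. The sketch below only builds the requests with Python's standard library and does not send them. The base URL assumes a standalone eKuiper on its default REST port 9081, so adjust it for your deployment; the helper name is hypothetical, and passing CONF_KEY="default" is assumed to behave the same as omitting it.

```python
import json
import urllib.request


def create_stream_request(base_url, name, schema, conf_key):
    """Build (but do not send) a POST /streams request for an edgex stream."""
    sql = (f'CREATE STREAM {name}({schema}) '
           f'WITH (FORMAT="JSON", TYPE="edgex", CONF_KEY="{conf_key}")')
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/streams",  # assumption: standalone eKuiper REST endpoint
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# One stream per point of interest: name -> (schema, confkey).
STREAMS = {
    "edgexAll": ("", "default"),
    "edgexInt8": ("int8 bigint", "int8_conf"),
}

pending = [create_stream_request("http://localhost:9081", name, schema, key)
           for name, (schema, key) in STREAMS.items()]
# To submit them: for req in pending: urllib.request.urlopen(req)
```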

Shared instance

When rules are running, each rule has its own source instance, separate from the others, even if the rules use the same stream definition. To reduce overhead and guarantee the same data sequence across rules, multiple rules may want to share the same source instance. This is especially common for the default EdgeX stream, which reads all events on the message bus, so running multiple instances of it can add a lot of overhead.

So, for the edgexAll stream, we recommend creating a shared instance and letting all rules that need the full data use it.

    CREATE STREAM edgexAll() WITH (FORMAT="JSON", TYPE="edgex", SHARED="true")
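The benefit of a shared instance can be illustrated with a toy model: one subscription feeds every attached rule, so the bus is read once and all rules see messages in the same order. This is a conceptual sketch with hypothetical `MessageBus` and `SharedSource` classes, not how eKuiper implements shared sources.

```python
class MessageBus:
    """Hypothetical stand-in for the EdgeX message bus that counts subscriptions."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.subscriptions = 0

    def subscribe(self):
        self.subscriptions += 1
        return iter(self.messages)


class SharedSource:
    """One subscription whose messages are fanned out to every attached rule."""

    def __init__(self, bus):
        self.bus = bus
        self.rules = []

    def attach(self, rule):
        self.rules.append(rule)

    def run(self):
        for message in self.bus.subscribe():  # single subscription for all rules
            for rule in self.rules:           # same message, same order, per rule
                rule(message)
```

Without sharing, each rule would call subscribe() itself, opening one subscription per rule, and independent subscriptions need not deliver messages in the same relative order.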

Summary

In previous tutorials, we usually created a single overall stream for EdgeX, and it was not obvious how to configure and filter EdgeX events. In this tutorial, we learned how to configure EdgeX and eKuiper together to filter events into multiple streams, so that each rule only processes the events it is interested in. Finally, we discussed how to use a shared source instance for performance and consistency.