Native Plugin

eKuiper allows users to build different kinds of extensions with the native Go plugin system.

  • The source extension is used to add new stream sources, such as consuming data from other message brokers. eKuiper has built-in source support for the MQTT broker.
  • The sink/action extension is used to publish or push data to different targets, such as databases, other messaging systems, web interfaces, or file systems. eKuiper provides built-in actions; see MQTT & log files.
  • The function extension allows users to add functions that can be used in SQL. eKuiper provides built-in functions; see functions.

Please read the following to learn how to implement different extensions.

Naming

We recommend using camel case for plugin names. Note that the following restrictions apply to names:

  1. The export symbol of the plugin must be camel case with an upper-case first letter, and it must match the plugin name except for the case of the first letter. For example, a plugin named file must export a symbol named File.
  2. The name of the .so file must be the same as either the export symbol name or the plugin name. For example, MySource.so or mySink.so.

State storage

eKuiper exposes a key-value state storage interface through the context parameter. It is available to all types of extensions, including source, sink, and function extensions.

States are key-value pairs, where the key is a string and the value is arbitrary data. Keys are scoped to the current extension instance.

Users can access the state storage through the context object. State-related methods include PutState, GetState, IncrCounter, GetCounter and DeleteState.

Below is an example of a function extension that accesses state. The function counts the words passed in and saves the cumulative count in the state.

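    // Requires the "fmt" and "strings" imports.
    func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
        // args[0] is the text to split and args[1] is the separator; both must be strings.
        text, ok1 := args[0].(string)
        sep, ok2 := args[1].(string)
        if !ok1 || !ok2 {
            return fmt.Errorf("expected two string arguments, got %v and %v", args[0], args[1]), false
        }
        // Add the word count of this call to the running total kept in state.
        if err := ctx.IncrCounter("allwordcount", len(strings.Split(text, sep))); err != nil {
            return err, false
        }
        // Read the cumulative count back and return it as the function result.
        c, err := ctx.GetCounter("allwordcount")
        if err != nil {
            return err, false
        }
        return c, true
    }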

Runtime dependencies

Some plugins may need to access dependencies in the file system. Those files are placed under the {{eKuiperPath}}/etc/{{pluginType}}/{{pluginName}} directory. When packaging the plugin, put those files in the etc directory. After installation, they will be moved to the recommended location.

In the plugin source code, developers can locate these files by getting the eKuiper root path from the context:

    ctx.GetRootPath()