A lightweight loT edge stream processing - Kuiper plugin development tutorial

EMQ X KuiperDevelopment tutorial - 图1 (opens new window) is a lightweight loT streaming data processing software based on SQL. It provides a set of plugin mechanism for implementing customized source, sink and SQL function to extend the ability of stream processing. This tutorial gives a detailed introduction to the process of development, compilation, and deployment of the Kuiper plugin.

Overview

Kuiper plugin is based on the plugin mechanism of Golang, users can build loosely-coupled plugin applications, dynamic loading and binding when it is running. However, because the limitation of the Golang plugin system, the compilation and usage of the Kuiper plugin also have corresponding limitations:

  • The plugin does not support Windows system
  • The compilation environment of the plugin is required to be as consistent as possible with Kuiper. Including but not limited to:
    • The same Go version
    • The version of the same libraries plugin and Kuiper depend on must be completely the same, including Kuiper itself
    • The plugin needs to be completely consistent with GOPATH of the Kuiper compilation environment

These limitations are relatively strict, and they almost require compiling and running the plugin and Kuiper on the same machine. It often results in the plugin which complied by the development environment can not be used in producing Kuiper. This article gives a detailed introduction to one reliable and available plugin development environment setting and process, which is recommended to the Kuiper plugin developer to use. Generally, the process for development and usage of the plugin is as follows:

  • Development
    • Create and develop plugin project
    • Compile and debug plugin
  • Deployment
    • Compile plugins which can be used for the production environment
    • Deploy plugin to the production environment

Plugin development

Developing plugin is generally carried out in the development environment. Kuiper plugins will be deployed to the production environment after passing debugging and running the development environment.

Create and develop the plugin project

There are some plugin examples in the plugins directory of the Kuiper project source code. The user customized plugin can also be developed in the Kuiper project. However, users usually need to create the new project outside of the Kuiper project to develop customized plugins, to manage code more conveniently. It’s recommended to use Go module to develop plugin projects, the typical structure of project is listed as following.

  1. plugin_project
  2. sources //source code directory of the plugin source
  3. mysource.go
  4. sinks //source code directory of the plugin sink
  5. mysink.go
  6. functions //source code directory of the plugin function
  7. myfunction.go
  8. target //directory of compiling results
  9. go.mod //file go module

Developing a plugin needs to extend the interface in Kuiper, so it must depend on the Kuiper project. The simplest go.mod also needs to include the dependency for Kuiper. A typical go.mod is as follows:

  1. module samplePlugin
  2. go 1.13
  3. require (
  4. github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
  5. )

The Kuiper plugin has three types. The source code can be put into the corresponding directory. For the detailed method of plugin development: EMQ X Kuiper extension. This article will take the Sink plugin as an example to introduce the process of plugin development and deployment. We will develop a basic MySql sink, for write stream output data to the MySql database.

  • Create plugin project samplePlugin with the above directory structure
  • Create file mysql.go under the sinks directory
  • Edit file mysql.go for implementing the plugin
    • Implement api.SinkDevelopment tutorial - 图2 (opens new window) interface
    • Export Symbol: Mysql. It could be a constructor function so that each rule can instantiate an own mysql plugin instance. Or it could be the struct which means every rule will share a singleton of the plugin. If the plugin has states like the connection, the first approach is preferred.
  • Edit go.mod, add Mysql driver module

The complete source code of mysql.go is as follows:

  1. package main
  2. // This is a simplified mysql sink which is for test and tutorial only
  3. import (
  4. "database/sql"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xstream/api"
  8. _ "github.com/go-sql-driver/mysql"
  9. )
  10. type mysqlConfig struct {
  11. Url string `json:"url"`
  12. Table string `json:"table"`
  13. }
  14. type mysqlSink struct {
  15. conf *mysqlConfig
  16. //The db connection instance
  17. db *sql.DB
  18. }
  19. func (m *mysqlSink) Configure(props map[string]interface{}) error {
  20. cfg := &mysqlConfig{}
  21. err := common.MapToStruct(props, cfg)
  22. if err != nil {
  23. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  24. }
  25. if cfg.Url == "" {
  26. return fmt.Errorf("property Url is required")
  27. }
  28. if cfg.Table == "" {
  29. return fmt.Errorf("property Table is required")
  30. }
  31. return nil
  32. }
  33. func (m *mysqlSink) Open(ctx api.StreamContext) (err error) {
  34. logger := ctx.GetLogger()
  35. logger.Debug("Opening mysql sink")
  36. m.db, err = sql.Open("mysql", m.conf.Url)
  37. return
  38. }
  39. // This is a simplified version of data collect which just insert the received string into hardcoded name column of the db
  40. func (m *mysqlSink) Collect(ctx api.StreamContext, item interface{}) error {
  41. logger := ctx.GetLogger()
  42. if v, ok := item.([]byte); ok {
  43. //TODO in production: deal with various data type of the unmarshalled item.
  44. // It is a json string of []map[string]interface{} by default;
  45. // And it is possible to be any other kind of data if the sink `dataTemplate` is set
  46. logger.Debugf("mysql sink receive %s", item)
  47. //TODO hard coded column here. In production, we'd better get the column/value pair from the item
  48. sql := fmt.Sprintf("INSERT INTO %s (`name`) VALUES ('%s')", m.conf.Table, v)
  49. logger.Debugf(sql)
  50. insert, err := m.db.Query(sql)
  51. if err != nil {
  52. return err
  53. }
  54. defer insert.Close()
  55. } else {
  56. logger.Debug("mysql sink receive non byte data")
  57. }
  58. return nil
  59. }
  60. func (m *mysqlSink) Close(ctx api.StreamContext) error {
  61. if m.db != nil {
  62. return m.db.Close()
  63. }
  64. return nil
  65. }
  66. // export the constructor function to be used to instantiates the plugin
  67. func Mysql() api.Sink {
  68. return &mysqlSink{}
  69. }

The complete code of go.mod is as follows:

  1. module samplePlugin
  2. go 1.13
  3. require (
  4. github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
  5. github.com/go-sql-driver/mysql v1.5.0
  6. )

Compile and debug the plugin

The environment of compiling the plugin should be consistent with that of Kuiper. In the development environment, the typical usage is that locally download and compile Kuiper and plugin, and then debug plugin functions in the local Kuiper, or compile the plugin in the docker container of Kuiper and use the Kuiper container to debug it.

Compile locally

Developers can locally compile Kuiper and the plugin for debugging, which steps are as follows:

  1. Download Kuiper source code: git clone https://github.com/emqx/kuiper.git
  2. Compile Kuiper: run make under the Kuiper directory
  3. Compile the plugin:

    1. Run go mod edit -replace github.com/emqx/kuiper=$kuiperPath under the plugin project, make the Kuiper dependence point to the local Kuiper, and then please replace the download directory of step 1 by $kuiperPath, the same below.
    2. Compile the plugin so to the directory of Kuiper plugin
    1. go build -trimpath --buildmode=plugin -o $kuiperPath/_build/$build/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go

Docker compile

Kuiper provides different docker images for different purpose. The development docker image should be used for compiling plugins. From 0.4.0, the kuiper image with tag x.x.x (e.g. kuiper:0.4.0) is the development docker image. For 0.3.x, kuiper image with tag x.x.x-dev (e.g. kuiper:0.3.0-dev) is the development docker image. Compared with the running version, the development version provides the development environment of Go, which lets users compile the plugin that can be completely compatible with the officially published version of Kuiper. The compiling steps in docker are as follows:

  1. Run docker of the development version of Kuiper. Users need to mount the local plugin directory to the directory in docker, and then they can access and compile the plugin project in docker. The author’s plugin project is located in the local /var/git directory. We map the local directory /var/git to the /home directory in docker by using the following commands.

    1. docker run -d --name kuiper-dev --mount type=bind,source=/var/git,target=/home emqx/kuiper:0.3.0-dev
  2. The principle of compiling plugins in docker environment is the same as the local compilation. The compiled plugin is locating in the target directory of the plugin project.

    1. -- In host
    2. # docker exec -it kuiper-dev /bin/sh
    3. -- In docker instance
    4. # cd /home/samplePlugin
    5. # go mod edit -replace github.com/emqx/kuiper=/go/kuiper
    6. # go build -trimpath --buildmode=plugin -o /home/samplePlugin/target/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go

You can use below sample shell script in your plugin project to automatically build and package the plugins. Please modify the variables at the beginning of the script to meet the requirements of different environments.

  1. #!/bin/sh
  2. export KUIPER_SOURCE=../kuiper
  3. export PLUGIN_TARGET=$KUIPER_SOURCE/plugins
  4. export ETC_TARGET=$KUIPER_SOURCE/etc
  5. export ZIP_TARGET=plugins
  6. export VERSION=0.0.1
  7. go mod edit -replace github.com/emqx/kuiper=$KUIPER_SOURCE
  8. go build -trimpath --buildmode=plugin -o $PLUGIN_TARGET/sinks/Mysql@v$VERSION.so sinks/mysql.go
  9. ## zip the output
  10. mkdir $ZIP_TARGET/sinks
  11. zip -o $ZIP_TARGET/sinks/mysql.zip $PLUGIN_TARGET/sinks/Mysql@v$VERSION.so

Debug and run the plugin

Run Kuiper in the local or Develop Docker, create streams and rules, set action of the rule to mysql, then users can test the customized mysql sink plugin. Please refer Kuiper documentationDevelopment tutorial - 图3 (opens new window) for the steps of creating streams and rules. The following provides a rule using the mysql plugin for reference.

  1. {
  2. "id": "ruleTest",
  3. "sql": "SELECT * from demo",
  4. "actions": [
  5. {
  6. "log": {},
  7. "mysql":{
  8. "url": "user:test@tcp(localhost:3307)/user",
  9. "table": "test"
  10. }
  11. }
  12. ]
  13. }

During development testing, it is also fine to manually copy the compiled .so file and the .yaml file(if any) to the corresponding folders and then restart Kuiper. In development docker image, the default Kuiper location is /usr/local/kuiper. It should be noted that loading the new version after compiling the plugin again needs to restart Kuiper.

Plugin deployment

If the production environment and development environment are different, the developed plugin needs to be compiled again and deployed to the production environment. Assuming that the production environment adopts Kuiper docker to deploy, this article will describe how to deploy the plugin to the production environment.

Plugin compilation

The plugin should use the same environment as the production environment Kuiper to compile in principle. If the production environment is Kuiper docker, should use the dev docker environment that has the same version as the production environment to compile the plugin. For example, if the production environment uses docker mirroring emqx/kuiper:0.3.0Development tutorial - 图4 (opens new window), the plugin should be compiled in emqx/kuiper:0.3.0-devDevelopment tutorial - 图5 (opens new window) environment.

Please refer Docker compile for the compilation process. The compiled plugin can be tested in the Development Docker image before deploying.

Plugin deployment

Users can use REST APIDevelopment tutorial - 图6 (opens new window) or CLIDevelopment tutorial - 图7 (opens new window) to manage plugins. The following takes the REST API as an example to deploy the plugin compiled in the previous step to the production environment.

  1. Package the plugin and put it into the http server. Package the file .so of the plugin compiled in the previous step and the default configuration file (only required for source) .yaml into a .zip file (assuming that the file is mysqlSink.zip). Put this file into the http server that the production environment can also access.
    • Some plugin may depend on libs that are not installed on Kuiper environment. The user can either install them manually in the Kuiper server or put the install script and dependencies in the plugin zip and let the plugin management system do the installation. Please refer to Plugin File Format for detail.
  2. Use REST API to create plugins:

    1. POST http://{$production_kuiper_ip}:9081/plugins/sinks
    2. Content-Type: application/json
    3. {"name":"mysql","file":"http://{$http_server_ip}/plugins/sinks/mysqlSink.zip"}
  3. Verify whether the plugin was created successfully or not

    1. GET http://{$production_kuiper_ip}:9081/plugins/sinks/mysql

    Return

    1. {
    2. "name": "mysql",
    3. "version": "1.0.0"
    4. }

So far, the plugin has been deployed successfully. Users can create rules with mysql sink for verification.