Kuiper 插件开发教程

EMQ X Kuiper - 基于 SQL 的轻量级流式数据处理软件插件开发教程 - 图1 (opens new window)提供了一套插件机制用于实现自定义源(source),目标(sink)以及 SQL 函数(function)以扩展流处理功能。本教程详细介绍了 Kuiper 插件的开发编译和部署过程。

概览

Kuiper 插件机制基于 Go 语言的插件机制,使用户可以构建松散耦合的插件程序,在运行时动态加载和绑定。同时,由于 GO 语言插件系统的限制, Kuiper 插件的编译和使用也有相应的限制:

  • 插件不支持 windows 系统
  • 插件编译环境要求跟 Kuiper 编译环境尽量一致,包括但不限于
    • 相同的 GO 版本
    • 插件与 Kuiper 自身依赖的相同包版本必须完全一致,包括 Kuiper 自身
    • 插件与 Kuiper 编译环境的 GOPATH 必须完全一致

这些限制较为苛刻,几乎要求插件和 Kuiper 在同一台机器编译运行,经常导致开发环境编译出的插件无法在生产 Kuiper 上使用。本文详细介绍了一种切实可用的插件开发环境设置和流程,推荐给 Kuiper 插件开发者使用。插件的开发和使用一般有如下流程:

  • 开发
    • 创建并开发插件项目
    • 编译调试插件
  • 部署
    • 编译生产环境可用插件
    • 部署插件到生产环境

插件开发

插件开发一般在开发环境中进行。在开发环境调试运行通过后再部署到生产环境中。

创建并开发插件项目

Kuiper 项目源代码的 plugins 目录下有一些插件范例。用户自定义的插件也可以在 Kuiper 项目中开发。但是为了便于代码管理,一般应当在 Kuiper 项目之外另建项目开发自定义插件。插件项目建议使用 Go module,典型的项目目录如下图所示:

  1. plugin_project
  2. sources //源(source)插件源代码目录
  3. mysource.go
  4. sinks //目标(sink)插件源代码目录
  5. mysink.go
  6. functions //函数(function)插件源代码目录
  7. myfunction.go
  8. target //编译结果目录
  9. go.mod //go module文件

插件开发需要扩展 Kuiper 内的接口,因此必须依赖于 Kuiper 项目。最简单的 go.mod 也需要包含对 Kuiper 的依赖。典型的 go.mod 如下:

  1. module samplePlugin
  2. go 1.13
  3. require (
  4. github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
  5. )

Kuiper 插件有三种类型,源代码可放入对应的目录中。插件开发的详细方法请参看 EMQ X Kuiper 扩展插件开发教程 - 图2 (opens new window)。本文以目标(sink)为例,介绍插件的开发部署过程。我们将开发一个最基本的 MySql 目标,用于将流输出写入到 MySql 数据库中。

  • 新建名为 samplePlugin 的插件项目,采用上文的目录结构
  • 在 sinks 目录下,新建 mysql.go 文件
  • 编辑 mysql.go 文件以实现插件
    • 实现 api.Sink插件开发教程 - 图3 (opens new window)接口
    • 导出 Symbol:Mysql。它既可以是一个“构造函数”,也可以是结构体本身。当导出构造函数时,使用该插件的规则初始化时会用此函数创建该插件的实例;当导出为结构体时,所有使用该插件的规则将公用该插件同一个单例。如果插件有状态,例如数据库连接,建议使用第一种方法。
  • 编辑 go.mod, 添加 mysql 驱动模块

mysql.go 完整代码如下

  1. package main
  2. // 该例子为简化样例,仅建议测试时使用
  3. import (
  4. "database/sql"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xstream/api"
  8. _ "github.com/go-sql-driver/mysql"
  9. )
  10. type mysqlConfig struct {
  11. Url string `json:"url"`
  12. Table string `json:"table"`
  13. }
  14. type mysqlSink struct {
  15. conf *mysqlConfig
  16. //数据库连接实例
  17. db *sql.DB
  18. }
  19. func (m *mysqlSink) Configure(props map[string]interface{}) error {
  20. cfg := &mysqlConfig{}
  21. err := common.MapToStruct(props, cfg)
  22. if err != nil {
  23. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  24. }
  25. if cfg.Url == ""{
  26. return fmt.Errorf("property Url is required")
  27. }
  28. if cfg.Table == ""{
  29. return fmt.Errorf("property Table is required")
  30. }
  31. return nil
  32. }
  33. func (m *mysqlSink) Open(ctx api.StreamContext) (err error) {
  34. logger := ctx.GetLogger()
  35. logger.Debug("Opening mysql sink")
  36. m.db, err = sql.Open("mysql", m.conf.Url)
  37. return
  38. }
  39. // 该函数为数据处理简化函数。
  40. func (m *mysqlSink) Collect(ctx api.StreamContext, item interface{}) error {
  41. logger := ctx.GetLogger()
  42. if v, ok := item.([]byte); ok {
  43. //TODO 生产环境中需要处理item unmarshall后的各种类型。
  44. // 默认的类型为 []map[string]interface{}
  45. // 如果sink的`dataTemplate`属性有设置,则可能为各种其他的类型
  46. logger.Debugf("mysql sink receive %s", item)
  47. //TODO 此处列名写死。生产环境中一般可从item中的键值对获取列名
  48. sql := fmt.Sprintf("INSERT INTO %s (`name`) VALUES ('%s')", m.conf.Table, v)
  49. logger.Debugf(sql)
  50. insert, err := m.db.Query(sql)
  51. if err != nil {
  52. return err
  53. }
  54. defer insert.Close()
  55. } else {
  56. logger.Debug("mysql sink receive non byte data")
  57. }
  58. return nil
  59. }
  60. func (m *mysqlSink) Close(ctx api.StreamContext) error {
  61. if m.db != nil {
  62. return m.db.Close()
  63. }
  64. return nil
  65. }
  66. // export the constructor function to be used to instantiates the plugin
  67. func Mysql() api.Sink {
  68. return &mysqlSink{}
  69. }

go.mod 完整代码如下

  1. module samplePlugin
  2. go 1.13
  3. require (
  4. github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
  5. github.com/go-sql-driver/mysql v1.5.0
  6. )

编译调试插件

编译插件应当与编译 Kuiper 的环境一致。在开发环境中,典型的用法是在本地下载并编译 Kuiper 和插件,然后在本地 Kuiper 上调试插件功能;也可以在 Kuiper 的 docker 容器中编译插件,并用 Kuiper 容器运行调试。

本地编译

开发者可以在本地自行编译 Kuiper 和插件进行调试。其步骤如下:

  1. 下载 Kuiper 源代码 git clone https://github.com/emqx/kuiper.git
  2. 编译 Kuiper:在 Kuiper 目录下,运行 make
  3. 编译插件:

    1. 在插件项目下,运行 go mod edit -replace github.com/emqx/kuiper=$kuiperPath,使得 Kuiper 依赖指向本地 Kuiper,请替换 $kuiperPath 到步骤1下载目录,下同。
    2. 编译插件 so 到 Kuiper 插件目录下
    1. go build -trimpath --buildmode=plugin -o $kuiperPath/_build/$build/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go

Docker 编译

从0.3.0版本开始,Kuiper 提供了开发版本 docker 镜像。其中, 0.4.0及之后版本的开发镜像为x.x.x,例如``uiper:0.4.0;而 0.3.x 版本的开发镜像名为 x.x.x-dev,例如kuiper:0.3.0-dev`。与运行版本相比,开发版提供了 go 开发环境,使得用户可以在编译出在 Kuiper 正式发布版本中完全兼容的插件。Docker 中编译步骤如下:

  1. 运行 Kuiper 开发版本 docker。需要把本地插件目录 mount 到 docker 里的目录中,这样才能在 docker 中访问插件项目并编译。笔者的插件项目位于本地 /var/git 目录。下面的命令中,我们把本地的 /var/git目录映射到 docker 内的 /home 目录中。

    1. docker run -d --name kuiper-dev --mount type=bind,source=/var/git,target=/home emqx/kuiper:0.3.0-dev
  2. 在 docker 环境中编译插件,其原理与本地编译一致。编译出的插件置于插件项目的 target 目录中

    1. -- In host
    2. # docker exec -it kuiper-dev /bin/sh
    3. -- In docker instance
    4. # cd /home/samplePlugin
    5. # go mod edit -replace github.com/emqx/kuiper=/go/kuiper
    6. # go build -trimpath --buildmode=plugin -o /home/samplePlugin/target/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go

在插件项目中可以使用如下 shell 脚本自动编译及打包插件。修改脚本开头的参数以满足不同环境下的开发调试需求。

  1. #!/bin/sh
  2. export KUIPER_SOURCE=../kuiper
  3. export PLUGIN_TARGET=$KUIPER_SOURCE/plugins
  4. export ETC_TARGET=$KUIPER_SOURCE/etc
  5. export ZIP_TARGET=plugins
  6. export VERSION=0.0.1
  7. go mod edit -replace github.com/emqx/kuiper=$KUIPER_SOURCE
  8. go build -trimpath --buildmode=plugin -o $PLUGIN_TARGET/sinks/Mysql@v$VERSION.so sinks/mysql.go
  9. ## zip the output
  10. mkdir $ZIP_TARGET/sinks
  11. zip -o $ZIP_TARGET/sinks/mysql.zip $PLUGIN_TARGET/sinks/Mysql@v$VERSION.so

调试运行插件

在本地或 开发 Docker 中启动 Kuiper,创建流和规则,规则的 action 设置为 mysql 即可对自定义的 mysql sink 插件进行测试。创建流和规则的步骤请参考 Kuiper 文档插件开发教程 - 图4 (opens new window)。以下提供一个使用了 mysql 插件的规则供参考。

  1. {
  2. "id": "ruleTest",
  3. "sql": "SELECT * from demo",
  4. "actions": [
  5. {
  6. "log": {},
  7. "mysql":{
  8. "url": "user:test@tcp(localhost:3307)/user",
  9. "table": "test"
  10. }
  11. }
  12. ]
  13. }

开发调试中,也可以直接把插件 so 文件复制到相应 plugins 目录下,并重启 Kuiper 进行调试。开发环境的Docker 镜像,Kuiper默认在 /usr/local/kuiper 目录下。需要注意的是,插件重新编译后需要重启 Kuiper 才能载入新的版本。

插件部署

Kuiper 生产环境和开发环境如果不同,开发的插件需要重新编译并部署到生产环境。假设生产环境采用 Kuiper docker 进行部署,本节将描述如何部署插件到生产环境中。

插件编译

插件原则上应该与生产环境 Kuiper 采用相同环境进行编译。假设生产环境为 Kuiper docker,则应当采用与生产环境相同版本的 dev docker 环境编译插件。例如,生产环境采用 emqx/kuiper:0.3.0插件开发教程 - 图5 (opens new window)的 docker 镜像,则插件需要在emqx/kuiper:0.3.0-dev插件开发教程 - 图6 (opens new window) 的环境中进行编译。

编译过程请参考 Docker 编译。编译完成的插件可以直接在开发 Docker 中进行调试。

插件部署

可以采用 REST API插件开发教程 - 图7 (opens new window) 或者 CLI插件开发教程 - 图8 (opens new window) 进行插件管理。下文以 REST API 为例,将上一节编译的插件部署到生产环境中。

  1. 插件打包并放到 http 服务器。将上一节编译好的插件 .so 文件及默认配置文件(只有 source 需要) .yaml 文件一起打包到一个 .zip 文件中,假设为 mysqlSink.zip。把该文件放置到生产环境也可访问的 http 服务器中。

    • 某些插件可能依赖 Kuiper 环境未安装的库。用户可以选择自行到 Kuiper 服务器安装依赖或者在插件包中放入名为 install.sh 安装脚本和依赖。插件管理系统会运行插件包中的 install.sh 文件。详情请参考 插件文件格式
  2. 使用 REST API 创建插件:

    1. POST http://{$production_kuiper_ip}:9081/plugins/sinks
    2. Content-Type: application/json
    3. {"name":"mysql","file":"http://{$http_server_ip}/plugins/sinks/mysqlSink.zip"}
  3. 验证插件是否创建成功

    1. GET http://{$production_kuiper_ip}:9081/plugins/sinks/mysql

    返回

    1. {
    2. "name": "mysql",
    3. "version": "1.0.0"
    4. }

至此,插件部署成功。可以创建带有 mysql sink 的规则进行验证。