Portable 插件 Go SDK

用户可利用 GO SDK 来开发 portable 插件,这个 SDK 提供了类似原生插件的 API,另外它提供了启动函数,用户只需填充插件信息即可。

插件开发

Symbols

由于 portable 插件 GO SDK 提供了类似原生插件的API,用户做简单的修改即可复用以前编写的原生插件

用户只需依赖 github.com/lf-edge/ekuiper/sdk/go 而不是 eKuiper 主项目即可编写 portable 插件,用户需要实现 github.com/lf-edge/ekuiper/sdk/go/api 中的相应接口即可

对于源,实现跟原生源插件中一样的接口即可

  1. type Source interface {
  2. // Open Should be sync function for normal case. The container will run it in go func
  3. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  4. // Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties read from the yaml
  5. Configure(datasource string, props map[string]interface{}) error
  6. Closable
  7. }

对于目标,实现跟原生目标插件中一样的接口即可

  1. type Sink interface {
  2. //Should be sync function for normal case. The container will run it in go func
  3. Open(ctx StreamContext) error
  4. //Called during initialization. Configure the sink with the properties from rule action definition
  5. Configure(props map[string]interface{}) error
  6. //Called when each row of data has transferred to this sink
  7. Collect(ctx StreamContext, data interface{}) error
  8. Closable
  9. }

对于函数,实现跟原生函数插件中一样的接口即可

  1. type Function interface {
  2. //The argument is a list of xsql.Expr
  3. Validate(args []interface{}) error
  4. //Execute the function, return the result and if execution is successful.
  5. //If execution fails, return the error and false.
  6. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  7. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  8. IsAggregate() bool
  9. }

插件主程序

由于 portable 插件是一个独立的程序,需要编写成一个可执行程序。在 GO SDK 中, 提供了启动函数,用户只需填充插件信息即可。启动函数如下:

  1. package main
  2. import (
  3. "github.com/lf-edge/ekuiper/sdk/go/api"
  4. sdk "github.com/lf-edge/ekuiper/sdk/go/runtime"
  5. "os"
  6. )
  7. func main() {
  8. sdk.Start(os.Args, &sdk.PluginConfig{
  9. Name: "mirror",
  10. Sources: map[string]sdk.NewSourceFunc{
  11. "random": func() api.Source {
  12. return &randomSource{}
  13. },
  14. },
  15. Functions: map[string]sdk.NewFunctionFunc{
  16. "echo": func() api.Function {
  17. return &echo{}
  18. },
  19. },
  20. Sinks: map[string]sdk.NewSinkFunc{
  21. "file": func() api.Sink {
  22. return &fileSink{}
  23. },
  24. },
  25. })
  26. }

在主函数中调用了 sdk.Start来启动插件进程。在参数中,PluginConfig 定义了插件名字,源,目标,函数构造函数。注意这些信息必须跟插件安装包中的 json 描述文件一致

完整例子请参考这个例子

打包发布

我们需要将可执行文件和 json 描述文件一起打包,使用 GO SDK,仅仅需要 go build编译出可执行文件即可。由于在不同操作系统下编译出到的可执行文件名字有所不同,需要确保 json 描述文件中可执行文件名字的准确性。详细信息,请参考