组件开发

推荐参考已有的组件开发一个新的组件

公共接口

  1. // 生命周期接口
  2. type Lifecycle interface {
  3. // 初始化,例如初始化Kafka连接
  4. Init(context Context)
  5. // 启动运行,例如开始消费Kafka
  6. Start()
  7. // 停止
  8. Stop()
  9. }
  10. // 描述接口
  11. type Describable interface {
  12. // 类别,例如source
  13. Category() Category
  14. // 类型,例如kafka
  15. Type() Type
  16. // 自定义描述
  17. String() string
  18. }
  19. // 配置获取接口
  20. type Config interface {
  21. // 获取配置
  22. Config() interface{}
  23. }
  24. // 组件接口
  25. type Component interface {
  26. // 生命周期管理
  27. Lifecycle
  28. // 描述管理
  29. Describable
  30. // 配置管理
  31. Config
  32. }

source组件

source组件对接数据源输入,开发一个新的source插件需要实现如下接口

  1. // source组件接口
  2. type Source interface {
  3. Component
  4. Producer
  5. // 提交接口,确认sink端成功然后提交
  6. Commit(events []Event)
  7. }
  8. // 生产接口,source组件需要实现
  9. type Producer interface {
  10. // 对接数据源
  11. ProductLoop(productFunc ProductFunc)
  12. }

sink组件

sink组件对接输出端,开发一个新的sink插件需要实现如下接口

  1. // sink组件接口
  2. type Sink interface {
  3. Component
  4. Consumer
  5. }
  6. // 消费接口,sink组件需要实现
  7. type Consumer interface {
  8. // 对接输出端
  9. Consume(batch Batch) Result
  10. }

interceptor组件

interceptor组件对事件进行拦截处理,开发一个新的interceptor插件需要实现如下接口

  1. // interceptor组件接口
  2. type Interceptor interface {
  3. Component
  4. // 拦截处理
  5. Intercept(invoker Invoker, invocation Invocation) api.Result
  6. }

Note

请注意新增的组件需要放到pkg/include/include.go的import当中注册