Overview

一个Pipeline对应一个Sink。

Concurrency相关配置使用可参照自适应sink流量控制

Sink通用配置

Example

  1. sink:
  2. type: "dev"
  3. codec:
  4. type: json
  5. pretty: true
  6. parallelism: 16
  7. concurrency:
  8. enabled: true
  9. rtt:
  10. blockJudgeThreshold: 120%
  11. newRttWeigh: 0.4
  12. goroutine:
  13. initThreshold: 8
  14. maxGoroutine: 20
  15. unstableTolerate: 3
  16. channelLenOfCap: 0.4
  17. ratio:
  18. multi: 2
  19. linear: 2
  20. linearWhenBlocked: 4
  21. duration:
  22. unstable: 15
  23. stable: 30

parallelism

字段类型是否必填默认值含义
parallelismint非必填1sink客户端的并发度,可同时启动多个增大发送吞吐量,可设置的最大值为100

codec

字段类型是否必填默认值含义
codec非必填sink发送数据给下游时,数据使用的格式
codec.typestring非必填jsoncodec类型

type: json

字段类型是否必填默认值含义
codec.pretty非必填false是否进行json格式美化
codec.beatsFormat非必填false日志转成类filebeats格式:增加@timestamp字段,同时body字段命名为message

type: raw

用于发送采集的原始body数据。

Example

  1. sink:
  2. type: dev
  3. codec:
  4. type: raw

concurrency

Example

  1. sink:
  2. type: kafka
  3. concurrency:
  4. enabled: true
字段类型是否必填默认值含义
enabledbool非必填false是否开启sink自适应并发度控制

注:默认此功能不开启。

concurrency.goroutine

字段类型是否必填默认值含义
initThresholdint非必填16初始阈值,低于该值,协程数指数增长(快启动阶段),高于该值,线性增长
maxGoroutineint非必填30最大协程数
unstableTolerateint非必填3进入平稳阶段后,对网络波动的容忍,协程数需减少的情况包括rtt增大,请求失败,协程数需增大的情况包括网络平稳且channel饱和,相同情况出现3次才会触发协程数改变,若出现第四次,将会追加,直到相反的情况触发协程数改变。
channelLenOfCapfloat非必填0.4channel饱和阈值,超过该值认为协程数需增大,仅在rtt稳定情况下才会计算该值

concurrency.rtt

字段类型是否必填默认值含义
blockJudgeThresholdstring非必填120%判断rtt增大阈值,当新rtt超过当前平均rtt的到达一定程度,认为协程数需减少
newRttWeighfloat非必填0.5计算新的平均rtt时,新rtt的权重

注:blockJudgeThreshold(b)支持百分比和浮点数两种。

若为百分比,则判断是否 (新rtt/平均rtt)>b 。

若为浮点数,则判断是否 (新rtt-平均rtt)>b 。

concurrency.ratio

字段类型是否必填默认值含义
multiint非必填2快启动阶段,协程数指数增长速率
linearint非必填2(快启动之后)协程数线性增长或减少速率
linearWhenBlockedint非必填4channel满时(上游阻塞),协程数线性增长速率

concurrency.duration

字段类型是否必填默认值含义
unstableint非必填15非平稳阶段,收集数据计算协程数的时间间隔,单位秒
stableint非必填30平稳阶段,收集数据计算协程数的时间间隔,单位秒