transformer

带有条件判断的函数式数据处理interceptor。
属于source interceptor。

使用场景

  • 日志提取出日志级别level,并且drop掉DEBUG日志
  • 日志里混合包括有json和plain的日志形式,可以判断json形式的日志并且进行处理
  • 根据访问日志里的status code,增加不同的topic字段

示例参考

使用方式

transformer会按照配置的actions里顺序执行所有的action。action类似函数的方式,可以写入参数,参数一般为event里的字段。
同时,每个action里还可能包括额外的控制字段。比如下面regex(body),body即为regex的参数,pattern为额外的字段。

  1. interceptors:
  2. - type: transformer
  3. actions:
  4. - action: regex(body)
  5. pattern: ^(?P<time>[^ ^Z]+Z) (?P<level>[^ ]*) (?P<log>.*)$
  6. - action: add(topic, common)

另外,action还支持条件判断if-then-else的方式:

  1. - if: <condition>
  2. then:
  3. - action: funcA()
  4. else:
  5. - action: funcB()

其中,condition条件判断也为函数的形式。

  1. interceptors:
  2. - type: transformer
  3. actions:
  4. - if: equal(status, 404)
  5. then:
  6. - action: add(topic, not_found)
  7. - action: return()

action

共有字段

  • ignoreError: 表示是否忽略该action处理过程中的错误,并且不会打印错误日志。

Example

  1. - type: transformer
  2. actions:
  3. - action: regex(body)
  4. pattern: (?<ip>\S+) (?<id>\S+) (?<u>\S+) (?<time>\[.*?\]) (?<url>\".*?\") (?<status>\S+) (?<size>\S+)
  5. ignoreError: true

这里的ignoreError设置为true,表示会忽略该正则匹配的错误,并且会继续执行后续的action。

add(key, value)

给event添加额外的key:value。

Example

  1. - action: add(topic, loggie)

input:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body"
  3. }

output:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "topic": "loggie"
  4. }

copy(from, to)

复制event里的字段。

参数:

  • from: 原有的key
  • to: 复制后的key

Example

  1. - action: copy(foo, bar)

input:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "foo": "loggie"
  4. }

output:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "foo": "loggie",
  4. "bar": "loggie"
  5. }

move(from, to)

移动/重命名字段。

参数:

  • from: 原有的key
  • to: 移动后的key

Example

  1. - action: move(foo, bar)

input:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "foo": "loggie"
  4. }

output:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "bar": "loggie"
  4. }

set(key, value)

更新字段key的值为value。

参数:

  • key: 需更新的字段
  • value: 更新的后的值

Example

  1. - action: set(foo, test)

input:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "foo": "loggie"
  4. }

output:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "foo": "test"
  4. }

del(key1, key2…)

删除字段。可填写多个字段key。

Example

  1. - action: del(foo)

input:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "foo": "loggie"
  4. }

output:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. }

underRoot(key)

将嵌套的字段放在根部(最外层)。

Example

  1. - action: underRoot(state)

input:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "state": {
  4. "node": "127.0.0.1",
  5. "phase": "running"
  6. }
  7. }

output:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "node": "127.0.0.1",
  4. "phase": "running"
  5. }

fmt(key)

将某个字段的值重新渲染,可根据其他的字段值组成一个值。如果key不存在则会新增该字段。

额外字段:

  • pattern: 必填,表示格式化的规则,比如${state.node}-${state.phase}。如果pattern为固定值,则类似set(key, value)。

Example

  1. - action: fmt(status)
  2. pattern: ${state.node} is ${state.phase}

input:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "state": {
  4. "node": "127.0.0.1",
  5. "phase": "running"
  6. }
  7. }

output:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "state": {
  4. "node": "127.0.0.1",
  5. "phase": "running"
  6. },
  7. status: "127.0.0.1 is running"
  8. }

timestamp(key)

字段的时间格式转换。

额外字段:

  • fromLayout: 必填,指定字段的时间格式(golang形式),也可为unixunix_ms
  • fromLocation: 指定字段的时区,也可为UTC或者Local,如果为空,则为UTC
  • toLayout: 必填,转换后的时间格式(golang形式),也可为unixunix_ms
  • toLocation: 转换后的时间时区,也可为UTC或者Local,如果为空,则为UTC

Example

  1. - action: timestamp(time)
  2. fromLayout: "2006-01-02 15:04:05"
  3. fromLocation: Asia/Shanghai
  4. toLayout: unix_ms
  5. toLocation: Local

input:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "time": "2022-06-28 11:24:35"
  4. }

output:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "time": 1656386675000
  4. }

以上的layout参数需要填写golang形式,可参考:

  1. const (
  2. Layout = "01/02 03:04:05PM '06 -0700" // The reference time, in numerical order.
  3. ANSIC = "Mon Jan _2 15:04:05 2006"
  4. UnixDate = "Mon Jan _2 15:04:05 MST 2006"
  5. RubyDate = "Mon Jan 02 15:04:05 -0700 2006"
  6. RFC822 = "02 Jan 06 15:04 MST"
  7. RFC822Z = "02 Jan 06 15:04 -0700" // RFC822 with numeric zone
  8. RFC850 = "Monday, 02-Jan-06 15:04:05 MST"
  9. RFC1123 = "Mon, 02 Jan 2006 15:04:05 MST"
  10. RFC1123Z = "Mon, 02 Jan 2006 15:04:05 -0700" // RFC1123 with numeric zone
  11. RFC3339 = "2006-01-02T15:04:05Z07:00"
  12. RFC3339Nano = "2006-01-02T15:04:05.999999999Z07:00"
  13. Kitchen = "3:04PM"
  14. // Handy time stamps.
  15. Stamp = "Jan _2 15:04:05"
  16. StampMilli = "Jan _2 15:04:05.000"
  17. StampMicro = "Jan _2 15:04:05.000000"
  18. StampNano = "Jan _2 15:04:05.000000000"
  19. )

还可以根据实际情况修改。

regex(key)

使用正则的方式切分日志,提取字段。 另外也可以为regex(key, to)。

参数:

  • key: 必填,正则提取的字段
  • to: 非必填,提取后所有的字段放置到的key。默认为空,表示将字段提取到根部

额外字段:

  • pattern: 必填,正则表达式

Example

  1. - action: regex(body)
  2. pattern: (?<ip>\S+) (?<id>\S+) (?<u>\S+) (?<time>\[.*?\]) (?<url>\".*?\") (?<status>\S+) (?<size>\S+)

input:

  1. {
  2. "body": "10.244.0.1 - - [13/Dec/2021:12:40:48 +0000] 'GET / HTTP/1.1' 404 683",
  3. }

output:

  1. {
  2. "ip": "10.244.0.1",
  3. "id": "-",
  4. "u": "-",
  5. "time": "[13/Dec/2021:12:40:48 +0000]",
  6. "url": "GET / HTTP/1.1",
  7. "status": "404",
  8. "size": "683",
  9. }

jsonDecode(key)

将json文本反序列化。 也可以为jsonDecode(key, to)。

参数:

  • key: 必填,对应的字段key
  • to: 非必填,提取后所有的字段放置到的key。默认为空,则表示将字段提取到根部

Example

  1. - action: jsonDecode(body)

input:

  1. {
  2. "body": `{"log":"I0610 08:29:07.698664 Waiting for caches to sync", "stream":"stderr", "time":"2021-06-10T08:29:07.698731204Z"}`,
  3. }

output:

  1. {
  2. "log": "I0610 08:29:07.698664 Waiting for caches to sync",
  3. "stream": "stderr",
  4. "time": "2021-06-10T08:29:07.698731204Z"
  5. }

strconv(key, type)

字段值类型转换。

参数:

  • key: 目标字段
  • type: 转换后的类型,可为bool, int, float

Example

  1. - action: strconv(code, int)

input:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "code": "200"
  4. }

output:

  1. {
  2. "body": "2021-02-16T09:21:20.545525544Z DEBUG this is log body",
  3. "code": 200
  4. }

print()

打印event。一般用于调试阶段使用。

return()

控制类型函数,执行到return()后返回,不再继续执行下面的action。

dropEvent()

控制类型函数,执行到dropEvent()后会将该event直接丢弃。这意味着该条数据会丢失,也不会继续被后续的interceptor或者sink处理消费。

Example

  1. interceptors:
  2. - type: transformer
  3. actions:
  4. - action: regex(body)
  5. pattern: ^(?P<time>[^ ^Z]+Z) (?P<level>[^ ]*) (?P<log>.*)$
  6. - if: equal(level, DEBUG)
  7. then:
  8. - action: dropEvent()

假设日志为:2021-02-16T09:21:20.545525544Z DEBUG this is log body,则满足level字段为DEBUG,会直接丢弃该条日志。

condition

条件判断类函数。

操作符

  • AND:表示两个condition执行结果的

Example

  1. interceptors:
  2. - type: transformer
  3. actions:
  4. - if: equal(level, DEBUG) AND equal(code, 200)
  5. then:
  6. - action: dropEvent()
  • OR:表示两个condition执行结果的

Example

  1. interceptors:
  2. - type: transformer
  3. actions:
  4. - if: equal(level, DEBUG) OR equal(level, INFO)
  5. then:
  6. - action: dropEvent()
  • NOT:表示对condition执行结果取

Example

  1. interceptors:
  2. - type: transformer
  3. actions:
  4. - if: NOT equal(level, DEBUG)
  5. then:
  6. - action: dropEvent()

equal(key, target)

目标字段值是否和参数值target相等。

contain(key, target)

字段值是否包含参数值target。

exist(key)

目标字段是否存在或者是否为空。

greater(key, value)

目标字段的值是否大于参数值value。

less(key, value)

目标字段的值是否小于参数值value。

hasPrefix(key, prefix)

目标字段的值是否包含prefix前缀。

match(key, regex)

目标字段的值是否和参数regex正则匹配。

oneOf(key, value1, value2…)

目标字段的值是否是参数值value1…其中之一。