kafka-logger

描述

kafka-logger 是一个插件,可用作 ngx_lua nginx 模块的 Kafka 客户端驱动程序。

它可以将接口请求日志以 JSON 的形式推送给外部 Kafka 集群。如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。

有关 Apache APISIX 中 Batch-Processor 的更多信息,请参考。 Batch-Processor

属性

名称类型必选项默认值有效值描述
broker_listobject必须要推送的 kafka 的 broker 列表。
kafka_topicstring必须要推送的 topic。
producer_typestring可选async[“async”, “sync”]生产者发送消息的模式。
required_acksinteger可选1[0, 1, -1]生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。这个参数是为了保证发送请求的可靠性。语义同 kafka 生产者的 acks 参数(如果设置 acks=0,则 producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。如果设置 acks=1,leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。如果设置 acks=-1,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。)。
keystring可选用于消息的分区分配。
timeoutinteger可选3[1,…]发送数据的超时时间。
namestring可选“kafka logger”batch processor 的唯一标识。
meta_formatenum可选“default”[“default”,”origin”]default:获取请求信息以默认的 JSON 编码方式。origin:获取请求信息以 HTTP 原始请求方式。具体示例
include_req_bodyboolean可选false[false, true]是否包括请求 body。false: 表示不包含请求的 body ;true: 表示包含请求的 body。注意:如果请求 body 没办法完全放在内存中,由于 Nginx 的限制,我们没有办法把它记录下来。
include_req_body_exprarray可选include_req_body 开启时,基于 lua-resty-expr 表达式的结果进行记录。如果该选项存在,只有在表达式为真的时候才会记录请求 body。
include_resp_bodyboolean可选false[false, true]是否包括响应体。包含响应体,当为true
include_resp_body_exprarray可选是否采集响体,基于 lua-resty-expr。 该选项需要开启 include_resp_body
cluster_nameinteger可选1[0,…]kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。
producer_batch_numinteger可选200[1,…]对应 lua-resty-kafka 中的batch_num参数,聚合消息批量提交,单位为消息条数
producer_batch_sizeinteger可选1048576[0,…]对应 lua-resty-kafka 中的batch_size参数,单位为字节
producer_max_bufferinginteger可选50000[1,…]对应 lua-resty-kafka 中的max_buffering参数,最大缓冲区,单位为条
producer_time_lingerinteger可选1[1,…]对应 lua-resty-kafka 中的flush_time参数,单位为秒

本插件支持使用批处理器来聚合并批量处理条目(日志/数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 5 秒钟或队列中的数据达到 1000 条时提交数据,如需了解或自定义批处理器相关参数设置,请参考 Batch-Processor 配置部分。

meta_format 参考示例

  • default:

    1. {
    2. "upstream": "127.0.0.1:1980",
    3. "start_time": 1619414294760,
    4. "client_ip": "127.0.0.1",
    5. "service_id": "",
    6. "route_id": "1",
    7. "request": {
    8. "querystring": {
    9. "ab": "cd"
    10. },
    11. "size": 90,
    12. "uri": "/hello?ab=cd",
    13. "url": "http://localhost:1984/hello?ab=cd",
    14. "headers": {
    15. "host": "localhost",
    16. "content-length": "6",
    17. "connection": "close"
    18. },
    19. "body": "abcdef",
    20. "method": "GET"
    21. },
    22. "response": {
    23. "headers": {
    24. "connection": "close",
    25. "content-type": "text/plain; charset=utf-8",
    26. "date": "Mon, 26 Apr 2021 05:18:14 GMT",
    27. "server": "APISIX/2.5",
    28. "transfer-encoding": "chunked"
    29. },
    30. "size": 190,
    31. "status": 200
    32. },
    33. "server": {
    34. "hostname": "localhost",
    35. "version": "2.5"
    36. },
    37. "latency": 0
    38. }
  • origin:

    1. GET /hello?ab=cd HTTP/1.1
    2. host: localhost
    3. content-length: 6
    4. connection: close
    5. abcdef

工作原理

消息将首先写入缓冲区。 当缓冲区超过 batch_max_size 时,它将发送到 kafka 服务器, 或每个 buffer_duration 刷新缓冲区。

如果成功,则返回 true。 如果出现错误,则返回 nil,并带有描述错误的字符串(buffer overflow)。

Broker 列表

插件支持一次推送到多个 Broker,如下配置:

  1. {
  2. "127.0.0.1":9092,
  3. "127.0.0.1":9093
  4. }

如何启用

  1. 为特定路由启用 kafka-logger 插件。
  1. curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
  2. {
  3. "plugins": {
  4. "kafka-logger": {
  5. "broker_list" :
  6. {
  7. "127.0.0.1":9092
  8. },
  9. "kafka_topic" : "test2",
  10. "key" : "key1"
  11. }
  12. },
  13. "upstream": {
  14. "nodes": {
  15. "127.0.0.1:1980": 1
  16. },
  17. "type": "roundrobin"
  18. },
  19. "uri": "/hello"
  20. }'

测试插件

成功

  1. $ curl -i http://127.0.0.1:9080/hello
  2. HTTP/1.1 200 OK
  3. ...
  4. hello, world

插件元数据设置

名称类型必选项默认值有效值描述
log_formatobject可选{“host”: “$host”, “@timestamp”: “$time_iso8601”, “client_ip”: “$remote_addr”}以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 $ 开头,则表明是要获取 APISIX 变量Nginx 内置变量。请注意,该设置是全局生效的,因此在指定 log_format 后,将对所有绑定 kafka-logger 的 Route 或 Service 生效。

设置日志格式示例

  1. curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
  2. {
  3. "log_format": {
  4. "host": "$host",
  5. "@timestamp": "$time_iso8601",
  6. "client_ip": "$remote_addr"
  7. }
  8. }'

在日志收集处,将得到类似下面的日志:

  1. {"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
  2. {"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}

禁用插件

当您要禁用 kafka-logger 插件时,这很简单,您可以在插件配置中删除相应的 json 配置,无需重新启动服务,它将立即生效:

  1. $ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
  2. {
  3. "methods": ["GET"],
  4. "uri": "/hello",
  5. "plugins": {},
  6. "upstream": {
  7. "type": "roundrobin",
  8. "nodes": {
  9. "127.0.0.1:1980": 1
  10. }
  11. }
  12. }'