Ingest 节点

Ingest 节点是 Elasticsearch 5.0 新增的节点类型和功能。其开启方式为:在 elasticsearch.yml 中定义:

  1. node.ingest: true

Ingest 节点的基础原理,是:节点接收到数据之后,根据请求参数中指定的管道流 id,找到对应的已注册管道流,对数据进行处理,然后将处理过后的数据,按照 Elasticsearch 标准的 indexing 流程继续运行。

创建管道流

  1. curl -XPUT http://localhost:9200/_ingest/pipeline/my-pipeline-id -d '
  2. {
  3. "description" : "describe pipeline",
  4. "processors" : [
  5. {
  6. "convert" : {
  7. "field": "foo",
  8. "type": "integer"
  9. }
  10. }
  11. ]
  12. }'

然后发送端带着这个 my-pipeline-id 发请求就好了。示例见本书 beats 章节的介绍。

测试管道流

想知道自己的 ingest 配置是否正确,可以通过仿真接口测试验证一下:

  1. curl -XPUT http://localhost:9200/_ingest/pipeline/_simulate -d '
  2. {
  3. "pipeline" : {
  4. "description" : "describe pipeline",
  5. "processors" : [
  6. {
  7. "set" : {
  8. "field": "foo",
  9. "value": "bar"
  10. }
  11. }
  12. ]
  13. },
  14. "docs" : [
  15. {
  16. "_index": "index",
  17. "_type": "type",
  18. "_id": "id",
  19. "_source": {
  20. "foo" : "bar"
  21. }
  22. }
  23. ]
  24. }'

处理器

Ingest 节点的处理器,相当于 Logstash 的 filter 插件。事实上其主要处理器就是直接移植了 Logstash 的 filter 代码成 Java 版本。目前最重要的几个处理器分别是:

convert

  1. {
  2. "convert": {
  3. "field" : "foo",
  4. "type": "integer"
  5. }
  6. }

grok

  1. {
  2. "grok": {
  3. "field": "message",
  4. "patterns": ["my %{FAVORITE_DOG:dog} is colored %{RGB:color}"]
  5. "pattern_definitions" : {
  6. "FAVORITE_DOG" : "beagle",
  7. "RGB" : "RED|GREEN|BLUE"
  8. }
  9. }
  10. }

gsub

  1. {
  2. "gsub": {
  3. "field": "field1",
  4. "pattern": "\.",
  5. "replacement": "-"
  6. }
  7. }

date

  1. {
  2. "date" : {
  3. "field" : "initial_date",
  4. "target_field" : "timestamp",
  5. "formats" : ["dd/MM/yyyy hh:mm:ss"],
  6. "timezone" : "Europe/Amsterdam"
  7. }
  8. }

其他处理器插件

除了内置的处理器之外,还有 3 个处理器,官方选择了以插件性质单独发布,它们是 attachement,geoip 和 user-agent 。原因应该是这 3 个处理器需要额外数据模块,而且处理性能一般,担心拖累 ES 集群。

它们可以和其他普通 ES 插件一样安装:

  1. sudo bin/elasticsearch-plugin install ingest-geoip

使用方式和其他处理器一样:

  1. curl -XPUT http://localhost:9200/_ingest/pipeline/my-pipeline-id-2 -d '
  2. {
  3. "description" : "Add geoip info",
  4. "processors" : [
  5. {
  6. "geoip" : {
  7. "field" : "ip",
  8. "target_field" : "geo",
  9. "database_file" : "GeoLite2-Country.mmdb.gz"
  10. }
  11. }
  12. ]
  13. }