Elasticsearch Connector

This connector provides sinks that can request document actions against an Elasticsearch index. To use this connector, add one of the following dependencies to your project, depending on the version of the Elasticsearch installation:

Maven Dependency                         Supported since    Elasticsearch version
flink-connector-elasticsearch_2.11       1.0.0              1.x
flink-connector-elasticsearch2_2.11      1.0.0              2.x
flink-connector-elasticsearch5_2.11      1.3.0              5.x
flink-connector-elasticsearch6_2.11      1.6.0              6 and later versions
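
For example, with a Maven build the 6.x flavor of the connector could be declared as shown in the sketch below; the version shown follows the "supported since" column above, so adjust it to match your Flink release:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>1.6.0</version>
</dependency>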

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name. This must be set when creating an ElasticsearchSink that requests document actions against your cluster.

Elasticsearch Sink

The ElasticsearchSink uses a TransportClient (before 6.x) or a RestHighLevelClient (starting with 6.x) to communicate with an Elasticsearch cluster.

The examples below show how to configure and create a sink:

// Java, Elasticsearch 1.x

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<TransportAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));

// Java, Elasticsearch 2.x and 5.x

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));

// Java, Elasticsearch 6.x and later

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setDefaultHeaders(...);
        restClientBuilder.setMaxRetryTimeoutMillis(...);
        restClientBuilder.setPathPrefix(...);
        restClientBuilder.setHttpClientConfigCallback(...);
    }
);

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());

// Scala, Elasticsearch 1.x

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.common.transport.TransportAddress

import java.net.InetAddress
import java.util.ArrayList
import java.util.HashMap
import java.util.List
import java.util.Map

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[TransportAddress]
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    return Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))

// Scala, Elasticsearch 2.x and 5.x

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

import java.net.InetAddress
import java.net.InetSocketAddress
import java.util.ArrayList
import java.util.HashMap
import java.util.List
import java.util.Map

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    return Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))

// Scala, Elasticsearch 6.x and later

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink

import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

import java.util.ArrayList
import java.util.List

val input: DataStream[String] = ...

val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))

// use the ElasticsearchSink.Builder to create an ElasticsearchSink
val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    def createIndexRequest(element: String): IndexRequest = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      return Requests.indexRequest()
              .index("my-index")
              .`type`("my-type")
              .source(json)
    }

    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  }
)

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1)

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
  restClientBuilder => {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
)

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build())

For Elasticsearch versions that still use the now deprecated TransportClient to communicate with the Elasticsearch cluster (i.e., versions equal to or below 5.x), note how a Map of Strings is used to configure the ElasticsearchSink. This config map will be forwarded directly when creating the internally used TransportClient. The configuration keys are documented here in the Elasticsearch documentation. Especially important is the cluster.name parameter, which must correspond to the name of your cluster.

For Elasticsearch 6.x and above, a RestHighLevelClient is used internally for cluster communication. By default, the connector uses the REST client's default configuration. To provide custom configuration for the REST client, users can supply a RestClientFactory implementation when setting up the ElasticsearchSink.Builder that builds the sink.
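
As an illustration, a RestClientFactory could also be provided as an anonymous class instead of the lambda placeholder shown above. This is only a sketch; the header name, header value, and path prefix are made-up examples:

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;

import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClientBuilder;

esSinkBuilder.setRestClientFactory(new RestClientFactory() {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        // illustrative values only
        restClientBuilder.setDefaultHeaders(
            new Header[] { new BasicHeader("X-Custom-Header", "some-value") });
        restClientBuilder.setPathPrefix("/es"); // e.g. when Elasticsearch sits behind a reverse proxy
    }
});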

Also note that the examples only demonstrate performing a single index request for each incoming element. Generally, the ElasticsearchSinkFunction can be used to perform multiple requests of different types (e.g., DeleteRequest, UpdateRequest, etc.).
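
For instance, a single ElasticsearchSinkFunction could decide per element whether to index or delete a document. The sketch below is purely illustrative: the "DELETE:" element prefix used as a tombstone convention is an assumption made up for the example.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

// Hypothetical sink function: elements prefixed with "DELETE:" remove a document,
// everything else is indexed as in the examples above.
ElasticsearchSinkFunction<String> indexOrDelete = new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        if (element.startsWith("DELETE:")) {
            DeleteRequest delete = Requests.deleteRequest("my-index")
                    .type("my-type")
                    .id(element.substring("DELETE:".length()));
            indexer.add(delete);
        } else {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);
            indexer.add(Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json));
        }
    }
};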

Internally, each parallel instance of the Flink Elasticsearch Sink uses a BulkProcessor to send action requests to the cluster. This buffers elements before sending them to the cluster in bulk. The BulkProcessor executes bulk requests one at a time, i.e., there will be no two concurrent flushes of the buffered actions in progress.

Elasticsearch Sinks and Fault Tolerance

With Flink's checkpointing enabled, the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to Elasticsearch clusters. It does so by waiting for all pending action requests in the BulkProcessor at the time of checkpoints. This effectively ensures that all requests before the checkpoint was triggered have been successfully acknowledged by Elasticsearch, before proceeding to process more records sent to the sink.

For more details on checkpointing and fault tolerance, see the fault tolerance docs.

To use fault-tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled in the execution environment:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

NOTE: Users can disable flushing by calling disableFlushOnCheckpoint() on the created ElasticsearchSink if they wish to do so. Be aware that this essentially means the sink will no longer provide any strong delivery guarantees, even with checkpointing of the topology enabled.
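
A minimal sketch of opting out of the flush, shown for the pre-6.x sink that is constructed directly (config, transportAddresses, and elasticsearchSinkFunction stand for the objects defined in the earlier examples):

ElasticsearchSink<String> sink =
    new ElasticsearchSink<>(config, transportAddresses, elasticsearchSinkFunction);

// No longer wait for pending requests at checkpoints; delivery guarantees are weakened accordingly.
sink.disableFlushOnCheckpoint();

input.addSink(sink);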

Communicating using an Embedded Node (only for Elasticsearch 1.x)

For Elasticsearch 1.x, communication using an embedded node is also supported. See here for information about the differences between communicating with Elasticsearch through an embedded node versus a TransportClient.

Below is an example of how to create an ElasticsearchSink that uses an embedded node instead of a TransportClient:

// Java

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));

// Scala

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    return Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))

The difference is that now we do not need to provide a list of addresses of Elasticsearch nodes.

Handling Failing Elasticsearch Requests

Elasticsearch action requests may fail for a variety of reasons, including temporarily saturated node queue capacity or malformed documents to be indexed. The Flink Elasticsearch Sink allows the user to specify how request failures are handled, by simply implementing an ActionRequestFailureHandler and providing it to the constructor.

Below is an example:

// Java

DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throws Throwable {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
}));

// Scala

val input: DataStream[String] = ...

input.addSink(new ElasticsearchSink(
  config, transportAddresses,
  new ElasticsearchSinkFunction[String] {...},
  new ActionRequestFailureHandler {
    @throws(classOf[Throwable])
    override def onFailure(action: ActionRequest,
                           failure: Throwable,
                           restStatusCode: Int,
                           indexer: RequestIndexer): Unit = {

      if (ExceptionUtils.containsThrowable(failure, classOf[EsRejectedExecutionException])) {
        // full queue; re-add document for indexing
        indexer.add(action)
      } else if (ExceptionUtils.containsThrowable(failure, classOf[ElasticsearchParseException])) {
        // malformed document; simply drop request without failing sink
      } else {
        // for all other failures, fail the sink
        // here the failure is simply rethrown, but users can also choose to throw custom exceptions
        throw failure
      }
    }
}))

The above example lets the sink re-add requests that failed due to saturated queue capacity, and drop requests with malformed documents, without failing the sink. For all other failures, the sink will fail. If an ActionRequestFailureHandler is not provided to the constructor, the sink will fail for any kind of error.

Note that onFailure is called for failures only after the BulkProcessor has internally finished all backoff retry attempts. By default, the BulkProcessor retries up to 8 attempts with exponential backoff. For more information on the behavior of the internal BulkProcessor and how to configure it, see the following section.

By default, if no failure handler is provided, the sink uses a NoOpFailureHandler that simply fails for all kinds of exceptions. The connector also provides a RetryRejectedExecutionFailureHandler implementation that always re-adds requests that have failed due to saturated queue capacity.
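
For example, with the pre-6.x constructors, plugging in the provided handler might look like the following sketch (config, transportAddresses, and elasticsearchSinkFunction stand for the objects defined in the earlier examples):

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

input.addSink(new ElasticsearchSink<>(
    config,
    transportAddresses,
    elasticsearchSinkFunction,                    // the ElasticsearchSinkFunction from the examples above
    new RetryRejectedExecutionFailureHandler())); // re-adds requests rejected due to a full queue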

IMPORTANT: Re-adding requests back to the internal BulkProcessor on failures will lead to longer checkpoints, as the sink will also need to wait for the re-added requests to be flushed when checkpointing. For example, when using the RetryRejectedExecutionFailureHandler, checkpoints will need to wait until Elasticsearch node queues have enough capacity for all the pending requests. This also means that if re-added requests never succeed, the checkpoint will never finish.

Failure handling for Elasticsearch 1.x: For Elasticsearch 1.x, it is not feasible to match on the type of the failure, because the exact type cannot be retrieved through the older version's Java client APIs (the types will be general Exceptions that only differ in the failure message). In this case, it is recommended to match on the provided REST status code instead.
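
A sketch of what status-code matching might look like inside such a handler; the 429 ("too many requests") and 400 ("bad request") codes used here are assumptions for the rejected-execution and malformed-document cases:

new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest action,
            Throwable failure,
            int restStatusCode,
            RequestIndexer indexer) throws Throwable {

        if (restStatusCode == 429) {           // assumed: rejected execution / full queue
            indexer.add(action);               // re-add the document for indexing
        } else if (restStatusCode == 400) {    // assumed: malformed document
            // drop the request without failing the sink
        } else {
            throw failure;                     // fail the sink for everything else
        }
    }
}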

Configuring the Internal Bulk Processor

The internal BulkProcessor can be further configured for its behavior on how buffered action requests are flushed, by setting the following values in the provided Map<String, String> (a configuration sketch follows the list below):

  • bulk.flush.max.actions: Maximum amount of actions to buffer before flushing.
  • bulk.flush.max.size.mb: Maximum size of data (in megabytes) to buffer before flushing.
  • bulk.flush.interval.ms: Interval at which to flush regardless of the amount or size of buffered actions.

For versions 2.x and above, configuring how temporary request errors are retried is also supported:

  • bulk.flush.backoff.enable: Whether or not to perform retries with backoff delay for a flush if one or more of its actions failed due to a temporary EsRejectedExecutionException.
  • bulk.flush.backoff.type: The type of backoff delay, either CONSTANT or EXPONENTIAL.
  • bulk.flush.backoff.delay: The amount of delay for backoff. For constant backoff, this is simply the delay between each retry. For exponential backoff, this is the initial base delay.
  • bulk.flush.backoff.retries: The amount of backoff retries to attempt.

More information about Elasticsearch can be found here.
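
As a sketch for the Map-based (pre-6.x) sinks, the flush and backoff behavior could be tuned like this; the concrete values below are placeholders, not recommendations:

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");

// flush once 500 actions or 5 MB are buffered, or at the latest every 60 seconds
config.put("bulk.flush.max.actions", "500");
config.put("bulk.flush.max.size.mb", "5");
config.put("bulk.flush.interval.ms", "60000");

// retry temporarily rejected actions with exponential backoff (2.x and above)
config.put("bulk.flush.backoff.enable", "true");
config.put("bulk.flush.backoff.type", "EXPONENTIAL");
config.put("bulk.flush.backoff.delay", "50");
config.put("bulk.flush.backoff.retries", "8");

For the 6.x connector, the ElasticsearchSink.Builder shown earlier exposes setter methods for the same settings (e.g., setBulkFlushMaxActions(1)).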

Packaging the Elasticsearch Connector into an Uber-Jar

For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all of its dependencies (see here for further information).

Alternatively, you can put the connector's jar file into Flink's lib/ folder to make it available system-wide, i.e., for all jobs being run.