Elasticsearch Connector

This connector provides sinks that can request document actions to anElasticsearch Index. To use this connector, add oneof the following dependencies to your project, depending on the versionof the Elasticsearch installation:

Maven DependencySupported sinceElasticsearch version
flink-connector-elasticsearch2_2.111.0.02.x
flink-connector-elasticsearch5_2.111.3.05.x
flink-connector-elasticsearch6_2.111.6.06 and later versions

Note that the streaming connectors are currently not part of the binarydistribution. See here for informationabout how to package the program with the libraries for cluster execution.

Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be foundhere.Make sure to set and remember a cluster name. This must be set whencreating an ElasticsearchSink for requesting document actions against your cluster.

Elasticsearch Sink

The ElasticsearchSink uses a TransportClient (before 6.x) or RestHighLevelClient (starting with 6.x) to communicate with anElasticsearch cluster.

The example below shows how to configure and create a sink:

  1. import org.apache.flink.api.common.functions.RuntimeContext;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  4. import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
  5. import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
  6. import org.elasticsearch.action.index.IndexRequest;
  7. import org.elasticsearch.client.Requests;
  8. import java.net.InetAddress;
  9. import java.net.InetSocketAddress;
  10. import java.util.ArrayList;
  11. import java.util.HashMap;
  12. import java.util.List;
  13. import java.util.Map;
  14. DataStream<String> input = ...;
  15. Map<String, String> config = new HashMap<>();
  16. config.put("cluster.name", "my-cluster-name");
  17. // This instructs the sink to emit after every element, otherwise they would be buffered
  18. config.put("bulk.flush.max.actions", "1");
  19. List<InetSocketAddress> transportAddresses = new ArrayList<>();
  20. transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
  21. transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
  22. input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
  23. public IndexRequest createIndexRequest(String element) {
  24. Map<String, String> json = new HashMap<>();
  25. json.put("data", element);
  26. return Requests.indexRequest()
  27. .index("my-index")
  28. .type("my-type")
  29. .source(json);
  30. }
  31. @Override
  32. public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
  33. indexer.add(createIndexRequest(element));
  34. }
  35. }));
  1. import org.apache.flink.api.common.functions.RuntimeContext;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  4. import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
  5. import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
  6. import org.apache.http.HttpHost;
  7. import org.elasticsearch.action.index.IndexRequest;
  8. import org.elasticsearch.client.Requests;
  9. import java.util.ArrayList;
  10. import java.util.HashMap;
  11. import java.util.List;
  12. import java.util.Map;
  13. DataStream<String> input = ...;
  14. List<HttpHost> httpHosts = new ArrayList<>();
  15. httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
  16. httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
  17. // use a ElasticsearchSink.Builder to create an ElasticsearchSink
  18. ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
  19. httpHosts,
  20. new ElasticsearchSinkFunction<String>() {
  21. public IndexRequest createIndexRequest(String element) {
  22. Map<String, String> json = new HashMap<>();
  23. json.put("data", element);
  24. return Requests.indexRequest()
  25. .index("my-index")
  26. .type("my-type")
  27. .source(json);
  28. }
  29. @Override
  30. public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
  31. indexer.add(createIndexRequest(element));
  32. }
  33. }
  34. );
  35. // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
  36. esSinkBuilder.setBulkFlushMaxActions(1);
  37. // provide a RestClientFactory for custom configuration on the internally created REST client
  38. esSinkBuilder.setRestClientFactory(
  39. restClientBuilder -> {
  40. restClientBuilder.setDefaultHeaders(...)
  41. restClientBuilder.setMaxRetryTimeoutMillis(...)
  42. restClientBuilder.setPathPrefix(...)
  43. restClientBuilder.setHttpClientConfigCallback(...)
  44. }
  45. );
  46. // finally, build and add the sink to the job's pipeline
  47. input.addSink(esSinkBuilder.build());
  1. import org.apache.flink.api.common.functions.RuntimeContext
  2. import org.apache.flink.streaming.api.datastream.DataStream
  3. import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
  4. import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
  5. import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
  6. import org.elasticsearch.action.index.IndexRequest
  7. import org.elasticsearch.client.Requests
  8. import java.net.InetAddress
  9. import java.net.InetSocketAddress
  10. import java.util.ArrayList
  11. import java.util.HashMap
  12. import java.util.List
  13. import java.util.Map
  14. val input: DataStream[String] = ...
  15. val config = new java.util.HashMap[String, String]
  16. config.put("cluster.name", "my-cluster-name")
  17. // This instructs the sink to emit after every element, otherwise they would be buffered
  18. config.put("bulk.flush.max.actions", "1")
  19. val transportAddresses = new java.util.ArrayList[InetSocketAddress]
  20. transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
  21. transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
  22. input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  23. def createIndexRequest(element: String): IndexRequest = {
  24. val json = new java.util.HashMap[String, String]
  25. json.put("data", element)
  26. return Requests.indexRequest()
  27. .index("my-index")
  28. .type("my-type")
  29. .source(json)
  30. }
  31. }))
  1. import org.apache.flink.api.common.functions.RuntimeContext
  2. import org.apache.flink.streaming.api.datastream.DataStream
  3. import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
  4. import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
  5. import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
  6. import org.apache.http.HttpHost
  7. import org.elasticsearch.action.index.IndexRequest
  8. import org.elasticsearch.client.Requests
  9. import java.util.ArrayList
  10. import java.util.List
  11. val input: DataStream[String] = ...
  12. val httpHosts = new java.util.ArrayList[HttpHost]
  13. httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
  14. httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
  15. val esSinkBuilder = new ElasticsearchSink.Builer[String](
  16. httpHosts,
  17. new ElasticsearchSinkFunction[String] {
  18. def createIndexRequest(element: String): IndexRequest = {
  19. val json = new java.util.HashMap[String, String]
  20. json.put("data", element)
  21. return Requests.indexRequest()
  22. .index("my-index")
  23. .type("my-type")
  24. .source(json)
  25. }
  26. }
  27. )
  28. // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
  29. esSinkBuilder.setBulkFlushMaxActions(1)
  30. // provide a RestClientFactory for custom configuration on the internally created REST client
  31. esSinkBuilder.setRestClientFactory(
  32. restClientBuilder -> {
  33. restClientBuilder.setDefaultHeaders(...)
  34. restClientBuilder.setMaxRetryTimeoutMillis(...)
  35. restClientBuilder.setPathPrefix(...)
  36. restClientBuilder.setHttpClientConfigCallback(...)
  37. }
  38. )
  39. // finally, build and add the sink to the job's pipeline
  40. input.addSink(esSinkBuilder.build)

For Elasticsearch versions that still uses the now deprecated TransportClient to communicatewith the Elasticsearch cluster (i.e., versions equal or below 5.x), note how a Map of Stringsis used to configure the ElasticsearchSink. This config map will be directlyforwarded when creating the internally used TransportClient.The configuration keys are documented in the Elasticsearch documentationhere.Especially important is the cluster.name parameter that must correspond tothe name of your cluster.

For Elasticsearch 6.x and above, internally, the RestHighLevelClient is used for cluster communication.By default, the connector uses the default configurations for the REST client. To have customconfiguration for the REST client, users can provide a RestClientFactory implementation when setting up the ElasticsearchClient.Builder that builds the sink.

Also note that the example only demonstrates performing a single indexrequest for each incoming element. Generally, the ElasticsearchSinkFunctioncan be used to perform multiple requests of different types (ex.,DeleteRequest, UpdateRequest, etc.).

Internally, each parallel instance of the Flink Elasticsearch Sink usesa BulkProcessor to send action requests to the cluster.This will buffer elements before sending them in bulk to the cluster. The BulkProcessorexecutes bulk requests one at a time, i.e. there will be no two concurrentflushes of the buffered actions in progress.

Elasticsearch Sinks and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guaranteesat-least-once delivery of action requests to Elasticsearch clusters. It doesso by waiting for all pending action requests in the BulkProcessor at thetime of checkpoints. This effectively assures that all requests before thecheckpoint was triggered have been successfully acknowledged by Elasticsearch, beforeproceeding to process more records sent to the sink.

More details on checkpoints and fault tolerance are in the fault tolerance docs.

To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.enableCheckpointing(5000); // checkpoint every 5000 msecs
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment()
  2. env.enableCheckpointing(5000) // checkpoint every 5000 msecs

NOTE: Users can disable flushing if they wish to do so, by callingdisableFlushOnCheckpoint() on the created ElasticsearchSink. Be awarethat this essentially means the sink will not provide any strongdelivery guarantees anymore, even with checkpoint for the topology enabled.

Handling Failing Elasticsearch Requests

Elasticsearch action requests may fail due to a variety of reasons, includingtemporarily saturated node queue capacity or malformed documents to be indexed.The Flink Elasticsearch Sink allows the user to specify how requestfailures are handled, by simply implementing an ActionRequestFailureHandler andproviding it to the constructor.

Below is an example:

  1. DataStream<String> input = ...;
  2. input.addSink(new ElasticsearchSink<>(
  3. config, transportAddresses,
  4. new ElasticsearchSinkFunction<String>() {...},
  5. new ActionRequestFailureHandler() {
  6. @Override
  7. void onFailure(ActionRequest action,
  8. Throwable failure,
  9. int restStatusCode,
  10. RequestIndexer indexer) throw Throwable {
  11. if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
  12. // full queue; re-add document for indexing
  13. indexer.add(action);
  14. } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
  15. // malformed document; simply drop request without failing sink
  16. } else {
  17. // for all other failures, fail the sink
  18. // here the failure is simply rethrown, but users can also choose to throw custom exceptions
  19. throw failure;
  20. }
  21. }
  22. }));
  1. val input: DataStream[String] = ...
  2. input.addSink(new ElasticsearchSink(
  3. config, transportAddresses,
  4. new ElasticsearchSinkFunction[String] {...},
  5. new ActionRequestFailureHandler {
  6. @throws(classOf[Throwable])
  7. override def onFailure(ActionRequest action,
  8. Throwable failure,
  9. int restStatusCode,
  10. RequestIndexer indexer) {
  11. if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
  12. // full queue; re-add document for indexing
  13. indexer.add(action)
  14. } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
  15. // malformed document; simply drop request without failing sink
  16. } else {
  17. // for all other failures, fail the sink
  18. // here the failure is simply rethrown, but users can also choose to throw custom exceptions
  19. throw failure
  20. }
  21. }
  22. }))

The above example will let the sink re-add requests that failed due toqueue capacity saturation and drop requests with malformed documents, withoutfailing the sink. For all other failures, the sink will fail. If a ActionRequestFailureHandleris not provided to the constructor, the sink will fail for any kind of error.

Note that onFailure is called for failures that still occur only after theBulkProcessor internally finishes all backoff retry attempts.By default, the BulkProcessor retries to a maximum of 8 attempts withan exponential backoff. For more information on the behaviour of theinternal BulkProcessor and how to configure it, please see the following section.

By default, if a failure handler is not provided, the sink uses aNoOpFailureHandler that simply fails for all kinds of exceptions. Theconnector also provides a RetryRejectedExecutionFailureHandler implementationthat always re-add requests that have failed due to queue capacity saturation.

IMPORTANT: Re-adding requests back to the internal BulkProcessoron failures will lead to longer checkpoints, as the sink will alsoneed to wait for the re-added requests to be flushed when checkpointing.For example, when using RetryRejectedExecutionFailureHandler, checkpointswill need to wait until Elasticsearch node queues have enough capacity forall the pending requests. This also means that if re-added requests neversucceed, the checkpoint will never finish.

Configuring the Internal Bulk Processor

The internal BulkProcessor can be further configured for its behaviouron how buffered action requests are flushed, by setting the following values inthe provided Map<String, String>:

  • bulk.flush.max.actions: Maximum amount of actions to buffer before flushing.
  • bulk.flush.max.size.mb: Maximum size of data (in megabytes) to buffer before flushing.
  • bulk.flush.interval.ms: Interval at which to flush regardless of the amount or size of buffered actions.For versions 2.x and above, configuring how temporary request errors areretried is also supported:

  • bulk.flush.backoff.enable: Whether or not to perform retries with backoff delay for a flush if one or more of its actions failed due to a temporary EsRejectedExecutionException.

  • bulk.flush.backoff.type: The type of backoff delay, either CONSTANT or EXPONENTIAL
  • bulk.flush.backoff.delay: The amount of delay for backoff. For constant backoff, this is simply the delay between each retry. For exponential backoff, this is the initial base delay.
  • bulk.flush.backoff.retries: The amount of backoff retries to attempt.More information about Elasticsearch can be found here.

Packaging the Elasticsearch Connector into an Uber-Jar

For the execution of your Flink program, it is recommended to build aso-called uber-jar (executable jar) containing all your dependencies(see here for further information).

Alternatively, you can put the connector’s jar file into Flink’s lib/ folder to make it availablesystem-wide, i.e. for all job being run.