Elasticsearch Connector

Elasticsearch is a distributed, RESTful search and analytics engine. Flink officially provides an Elasticsearch connector for writing data into Elasticsearch, offering at-least-once processing semantics.

ElasticsearchSink uses a TransportClient (before 6.x) or a RestHighLevelClient (from 6.x on) to communicate with the Elasticsearch cluster. StreamX further wraps flink-connector-elasticsearch6, hiding the development details and simplifying writes to Elasticsearch 6 and above.

Tip

Because different versions of the Flink Connector Elasticsearch conflict with each other, StreamX currently only supports writes to Elasticsearch 6 and above. To write to Elasticsearch 5, exclude the flink-connector-elasticsearch6 dependency, add the flink-connector-elasticsearch5 dependency, and create an org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink instance to write the data, as sketched below.
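A minimal sketch of that dependency swap, assuming the StreamX artifact you depend on is what pulls in flink-connector-elasticsearch6 (the StreamX artifactId below is hypothetical; adjust it to the one actually used in your project):

<!-- hypothetical StreamX dependency: exclude the bundled ES6 connector -->
<dependency>
  <groupId>com.streamxhub.streamx</groupId>
  <artifactId>streamx-flink-core</artifactId>
  <version>${streamx.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<!-- then bring in the ES5 connector instead -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
  <version>1.14.3</version>
</dependency>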

Elasticsearch write dependencies

Different Elasticsearch versions require different Flink Connector Elasticsearch dependencies. The following information comes from the flink-docs-release-1.14 documentation:
Maven dependency for ES 5.x

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
  <version>1.14.3</version>
</dependency>

Maven dependency for ES 6.x

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>1.14.3</version>
</dependency>

Maven dependency for ES 7.x and above

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
  <version>1.14.3</version>
</dependency>

Writing to Elasticsearch with the official connector

The following code is taken from the official documentation:

Java, Elasticsearch 6.x and above:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// create an ElasticsearchSink using the ElasticsearchSink.Builder
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// configuration for the bulk requests; this setting makes the sink emit after every element, otherwise elements would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration of the internally created REST client
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setDefaultHeaders(...)
        restClientBuilder.setMaxRetryTimeoutMillis(...)
        restClientBuilder.setPathPrefix(...)
        restClientBuilder.setHttpClientConfigCallback(...)
    }
);

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());
Scala, Elasticsearch 6.x and above:

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory

import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.client.RestClientBuilder

import java.util.ArrayList
import java.util.List

val input: DataStream[String] = ...

val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      val rqst: IndexRequest = Requests.indexRequest
        .index("my-index")
        .`type`("my-type")
        .source(json)

      indexer.add(rqst)
    }
  }
)

// configuration for the bulk requests; this setting makes the sink emit after every element, otherwise elements would be buffered
esSinkBuilder.setBulkFlushMaxActions(1)

// provide a RestClientFactory for custom configuration of the internally created REST client
esSinkBuilder.setRestClientFactory(new RestClientFactory {
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
})

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build)

Creating an ElasticsearchSink this way and wiring up all its parameters is quite inflexible. StreamX follows convention over configuration and automatic configuration: you only configure the Elasticsearch connection parameters and the Flink runtime parameters, and StreamX assembles the source and sink automatically, greatly simplifying the development logic and improving development efficiency and maintainability.

Writing to Elasticsearch with StreamX

With Flink checkpointing enabled, ESSink guarantees at-least-once delivery of action requests to the Elasticsearch cluster.

1. Configure the policy and connection information

# es sink configuration
# required; multiple nodes are written as host1:port,host2:port
host: localhost:9200
# optional parameters
#es:
#  disableFlushOnCheckpoint: true # false by default
#  auth:
#    user:
#    password:
#  rest:
#    max.retry.timeout:
#    path.prefix:
#    content.type:
#  connect:
#    request.timeout:
#    timeout:
#  cluster.name: elasticsearch
#  client.transport.sniff:
#  bulk.flush.:
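For example, a sketch of a multi-node cluster with basic auth enabled (the host names and credentials below are placeholders):

# es sink configuration
host: node1:9200,node2:9200,node3:9200
es:
  auth:
    user: elastic
    password: changeme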

2. Write to Elasticsearch

Writing to Elasticsearch with StreamX is very simple; the code is as follows:

Scala:

import com.streamxhub.streamx.flink.core.scala.FlinkStreaming
import com.streamxhub.streamx.flink.core.scala.sink.ESSink
import com.streamxhub.streamx.flink.core.scala.util.ElasticSearchUtils
import org.apache.flink.api.scala._
import org.elasticsearch.action.index.IndexRequest
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

import java.util.Date

object ConnectorApp extends FlinkStreaming {

  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

  override def handle(): Unit = {
    val ds = context.fromCollection(List(
      OrderEntity(1, 1, 11.3d, 3.1d, new Date()),
      OrderEntity(2, 1, 12.3d, 3.2d, new Date()),
      OrderEntity(3, 1, 13.3d, 3.3d, new Date()),
      OrderEntity(4, 1, 14.3d, 3.4d, new Date()),
      OrderEntity(5, 1, 15.3d, 7.5d, new Date()),
      OrderEntity(6, 1, 16.3d, 3.6d, new Date()),
      OrderEntity(7, 1, 17.3d, 3.7d, new Date())
    ))

    // es sink
    // 1) define the rule for building the IndexRequest
    implicit def indexReq(x: OrderEntity): IndexRequest = ElasticSearchUtils.indexRequest(
      "flink_order",
      "_doc",
      s"${x.id}_${x.time.getTime}",
      Serialization.write(x)
    )
    // 2) define the ESSink and sink the data
    ESSink().sink6[OrderEntity](ds)
  }

  case class OrderEntity(id: Int, num: Int, price: Double, gmv: Double, time: Date) extends Serializable

}

Flink's ElasticsearchSinkFunction can issue several kinds of requests (DeleteRequest, UpdateRequest, IndexRequest). StreamX supports these as well; the corresponding methods are listed below, followed by a usage sketch:

import com.streamxhub.streamx.flink.core.scala.StreamingContext
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.update.UpdateRequest

import java.util.Properties
import scala.annotation.meta.param

class ESSink(@(transient@param) context: StreamingContext,
             property: Properties = new Properties(),
             parallelism: Int = 0,
             name: String = null,
             uid: String = null) {

  /**
   * for ElasticSearch6
   *
   * @param stream            the DataStream to write
   * @param suffix            index suffix
   * @param restClientFactory custom configuration for the internally created REST client
   * @param failureHandler    strategy for action requests that failed
   * @param f                 conversion from an element to an IndexRequest
   * @tparam T element type
   * @return the resulting DataStreamSink
   */
  def sink6[T](stream: DataStream[T],
               suffix: String = "",
               restClientFactory: RestClientFactory = null,
               failureHandler: ActionRequestFailureHandler = new RetryRejectedExecutionFailureHandler)
              (implicit f: T => IndexRequest): DataStreamSink[T] = {
    new ES6Sink(context, property, parallelism, name, uid).sink[T](stream, suffix, restClientFactory, failureHandler, f)
  }

  def update6[T](stream: DataStream[T],
                 suffix: String = "",
                 restClientFactory: RestClientFactory = null,
                 failureHandler: ActionRequestFailureHandler = new RetryRejectedExecutionFailureHandler)
                (f: T => UpdateRequest): DataStreamSink[T] = {
    new ES6Sink(context, property, parallelism, name, uid).sink[T](stream, suffix, restClientFactory, failureHandler, f)
  }

  def delete6[T](stream: DataStream[T],
                 suffix: String = "",
                 restClientFactory: RestClientFactory = null,
                 failureHandler: ActionRequestFailureHandler = new RetryRejectedExecutionFailureHandler)
                (f: T => DeleteRequest): DataStreamSink[T] = {
    new ES6Sink(context, property, parallelism, name, uid).sink[T](stream, suffix, restClientFactory, failureHandler, f)
  }
}
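For illustration, a minimal sketch of calling update6 and delete6, reusing the OrderEntity stream, index name, and json4s serialization from the example above (the UpdateRequest/DeleteRequest construction is the standard Elasticsearch 6.x client API; the document id scheme is the one used when indexing):

import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

// partially update existing documents, addressed by the same id scheme used for indexing
ESSink().update6[OrderEntity](ds) { x =>
  new UpdateRequest("flink_order", "_doc", s"${x.id}_${x.time.getTime}")
    .doc(Serialization.write(x), XContentType.JSON)
}

// delete documents by id
ESSink().delete6[OrderEntity](ds) { x =>
  new DeleteRequest("flink_order", "_doc", s"${x.id}_${x.time.getTime}")
}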
Notes

With Flink checkpointing enabled, the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to the Elasticsearch cluster. It does so by waiting, at checkpoint time, for all pending action requests in the BulkProcessor. This effectively guarantees that every request has been successfully acknowledged by Elasticsearch before the checkpoint is triggered and processing of records sent to the sink continues. To disable this flushing, set disableFlushOnCheckpoint to true; this essentially means the sink no longer provides any reliable delivery guarantees, even with checkpointing enabled for the job topology.
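In StreamX this switch is the disableFlushOnCheckpoint key from the configuration above:

es:
  disableFlushOnCheckpoint: true # trades away the at-least-once guarantee for lower checkpoint latency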

Other configuration

Handling failed Elasticsearch requests

Elasticsearch action requests may fail for a variety of reasons; you can specify the failure-handling logic by implementing ActionRequestFailureHandler. See the "Handling Failing Elasticsearch Requests" section of the official documentation, and the sketch below.
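As a sketch (modeled on the example in the official Flink documentation, not a StreamX-specific API), here is a handler that re-queues requests rejected because the bulk queue was full and fails the sink on anything else. It can be handed to ESSink through the failureHandler parameter of sink6, assuming the implicit IndexRequest conversion from the earlier example is in scope:

import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
import org.apache.flink.util.ExceptionUtils
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

val failureHandler = new ActionRequestFailureHandler {
  override def onFailure(action: ActionRequest,
                         failure: Throwable,
                         restStatusCode: Int,
                         indexer: RequestIndexer): Unit = {
    if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
      // the bulk queue was full: re-add the request so it is retried
      indexer.add(action)
    } else {
      // any other failure: rethrow and let the sink fail
      throw failure
    }
  }
}

ESSink().sink6[OrderEntity](ds, failureHandler = failureHandler)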

Configuring the internal bulk processor

The internal BulkProcessor can be further configured to control how it flushes buffered action requests; for details see the "Configuring the Internal Bulk Processor" section of the official documentation.
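In StreamX these settings would live under the bulk.flush. prefix shown in the configuration above. The keys below mirror the Flink connector's bulk-flush options; treat the exact StreamX mapping and the values as illustrative assumptions:

es:
  bulk.flush.max.actions: 1000 # flush after this many buffered actions
  bulk.flush.max.size.mb: 5 # flush after this much buffered data
  bulk.flush.interval.ms: 5000 # flush at this interval regardless of volume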

StreamX configuration

All other settings must follow the StreamX configuration conventions; for the available options and the role of each parameter, see Project Configuration.