可查询状态Beta

注意:可查询状态的客户端API当前处于不断发展的状态,并且不保证所提供接口的稳定性。在即将推出的Flink版本中,客户端可能会出现API更改。

简而言之,此函数将Flink的托管Keys(分区)状态(请参阅使用状态暴露给外部世界,并允许用户从Flink外部查询作业的状态。对于某些情况,可查询状态消除了对外部系统(例如键值存储)的分布式 算子操作/事务的需要,这通常是实践中的瓶颈。此外,此函数对于调试目的可能特别有用。

注意:查询状态对象时,无需任何同步或复制即可从并发线程访问该对象。这是一种设计选择,因为上述任何一种都会导致增加的作业延迟,我们希望避免这种情况。由于使用Java堆空间的任何状态后台(例如 MemoryStateBackend或)FsStateBackend在检索值时不能与副本一起使用,而是直接引用存储的值,因此读取 - 修改 - 写入模式是不安全的,并且可能导致可查询状态服务器由于并发修改而失败。RocksDBStateBackend从这些问题的安全。

建筑

在展示如何使用可查询状态之前,简要描述组成它的实体是很有用的。可查询状态函数包含三个主要实体:

  • QueryableStateClient,所述Flink群集外,其(可能)运行并提交用户查询,
  • QueryableStateClientProxy,它运行在每个TaskManager在Flink集群内),并负责接收客户的疑问,代表他取出由负责TaskManager所请求的状态,并返回给客户端,
  • QueryableStateServer它运行在每个TaskManager并负责提供本地存储的状态。客户端连接到其中一个代理,并发送与特定Keys关联的状态的请求k如“ 使用状态”中所述,被Keys化状态按键组进行组织,每个键组TaskManager都分配了许多这些键组。要发现哪个TaskManager负责Keys组持有k,代理将询问JobManager根据答案,代理将在QueryableStateServer运行时查询TaskManager与之关联的状态k,并将响应转发回客户端。

激活可查询状态

要在Flink群集上启用可查询状态,只需Flink分发flink-queryable-state-runtime_2.11-1.7-SNAPSHOT.jaropt/文件夹复制到该文件夹即可。否则,未启用可查询状态函数。lib/

要验证群集是否在启用了可查询状态的情况下运行,请检查该行的任何TaskManager的日志:"Started the Queryable State Proxy Server @ …"

使状态可查询

现在您已在群集上激活了可查询状态,现在是时候看看如何使用它了。为了使状态对外界可见,需要使用以下方法明确查询状态:

  • a QueryableStateStream,作为接收器的便利对象,并将其传入值作为可查询状态提供,或者
  • stateDescriptor.setQueryable(String queryableStateName)方法使得由状态描述符表示的被Keys化状态可查询。以下部分解释了这两种方法的用法。

可查询状态流

调用.asQueryableState(stateName, stateDescriptor)一个KeyedStream回报QueryableStateStream它提供其值可查询状态。根据状态的类型,该asQueryableState()方法有以下变体

  1. // ValueState
  2. QueryableStateStream asQueryableState(
  3. String queryableStateName,
  4. ValueStateDescriptor stateDescriptor)
  5. // Shortcut for explicit ValueStateDescriptor variant
  6. QueryableStateStream asQueryableState(String queryableStateName)
  7. // FoldingState
  8. QueryableStateStream asQueryableState(
  9. String queryableStateName,
  10. FoldingStateDescriptor stateDescriptor)
  11. // ReducingState
  12. QueryableStateStream asQueryableState(
  13. String queryableStateName,
  14. ReducingStateDescriptor stateDescriptor)

注意:没有可查询的接收ListState器,因为它会导致不断增长的列表,这些列表可能无法清理,因此最终会消耗太多内存。

返回QueryableStateStream可以看作是一个接收器,不能进一步转换。在内部,aQueryableStateStream被转换为 算子,该 算子使用所有传入记录来更新可查询状态实例。更新逻辑由调用中StateDescriptor提供的类型隐式asQueryableState在如下所示的程序中,被Key化的数据流的所有记录将用于通过以下方式更新状态实例ValueState.update(value)

  1. stream.keyBy(0).asQueryableState("query-name")

这就像Scala API一样flatMapWithState

管理Keys状态

通过使相应的状态描述符可查询可以使算子的管理被Keys化状态(请参阅使用托管被Keys化状态)可查询StateDescriptor.setQueryable(String queryableStateName),如下例所示:

  1. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  2. new ValueStateDescriptor<>(
  3. "average", // the state name
  4. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
  5. descriptor.setQueryable("query-name"); // queryable state name

注意:queryableStateName参数可以任意选择并且仅用于查询。它不必与状态自己的名字相同。

该变体对于哪种类型的状态可以被查询没有限制。这意味着,这可以用于任何ValueStateReduceStateListStateMapStateAggregatingState,和当前已过时FoldingState

查询状态

到目前为止,您已将集群设置为以可查询状态运行,并且已将(某些)状态声明为可查询状态。现在是时候看看如何查询这个状态了。

为此,您可以使用QueryableStateClient帮助程序类。这是可用的flink-queryable-state-client,必须被明确列入作为一个依赖罐子pom.xml项目与沿flink-core,如下图所示:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-core</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-queryable-state-client-java_2.11</artifactId>
  9. <version>1.7-SNAPSHOT</version>
  10. </dependency>

有关详细信息,您可以查看如何设置Flink程序

QueryableStateClient将提交查询到内部代理,然后将处理您的查询并返回最终结果。初始化客户端的唯一要求是提供有效的TaskManager主机名(请记住,每个TaskManager上都运行可查询的状态代理)以及代理侦听的端口。有关如何在“ 配置”部分中配置代理和状态服务器端口的更多信息

  1. QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);

在客户端准备好的情况下,要查询与类型V的键相关联的类型状态K,您可以使用以下方法:

  1. CompletableFuture<S> getKvState(
  2. JobID jobId,
  3. String queryableStateName,
  4. K key,
  5. TypeInformation<K> keyTypeInfo,
  6. StateDescriptor<S, V> stateDescriptor)

以上内容返回CompletableFuture最终保存由queryableStateName具有ID的作业标识的可查询状态实例的状态值jobIDkey是您感兴趣的状态的Keys,keyTypeInfo它将告诉Flink如何序列化/反序列化它。最后,stateDescriptor包含了请求的状态,即它的类型(必要的信息ValueReduce等等),并就如何序列化/反序列化的必要信息。

细心的读者会注意到返回的future包含一个type值S一个State包含实际值对象。这可以通过任何支持Flink状态类型:ValueStateReduceStateListStateMapStateAggregatingState,和当前已过时FoldingState

注意:这些状态对象不允许修改包含的状态。您可以使用它们来获取状态的实际值,例如,使用valueState.get()或迭代所包含的<K, V>条目,例如使用mapState.entries(),但您无法修改它们。例如,add()在返回的列表状态上调用该方法将抛出一个UnsupportedOperationException

注意:客户端是异步的,可以由多个线程共享。它需要QueryableStateClient.shutdown()在未使用时关闭才能释放资源。

以下示例通过使CountWindowAverage示例可查询来扩展示例(请参阅使用托管被Keys化状态),并显示如何查询此值:

  1. public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
  2. private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
  3. @Override
  4. public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
  5. Tuple2<Long, Long> currentSum = sum.value();
  6. currentSum.f0 += 1;
  7. currentSum.f1 += input.f1;
  8. sum.update(currentSum);
  9. if (currentSum.f0 >= 2) {
  10. out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
  11. sum.clear();
  12. }
  13. }
  14. @Override
  15. public void open(Configuration config) {
  16. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  17. new ValueStateDescriptor<>(
  18. "average", // the state name
  19. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
  20. descriptor.setQueryable("query-name");
  21. sum = getRuntimeContext().getState(descriptor);
  22. }
  23. }

在作业中使用后,您可以检索作业ID,然后从该 算子查询任何键的当前状态:

  1. QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
  2. // the state descriptor of the state to be fetched.
  3. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  4. new ValueStateDescriptor<>(
  5. "average",
  6. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
  7. CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
  8. client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
  9. // now handle the returned value
  10. resultFuture.thenAccept(response -> {
  11. try {
  12. Tuple2<Long, Long> res = response.get();
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. });

配置

以下配置参数会影响可查询状态服务器和客户端的行为。它们的定义是QueryableStateOptions

状态服务器

  • query.server.ports:可查询状态服务器的服务器端口范围。如果有多个TaskManager在同一台机器上运行,这对于避免端口冲突很有用。指定的范围可以是:port:“9123”,一系列端口:“50100-50200”,或范围和/或点列表:“50100-50200,50300-50400,51234”。默认端口为9067。
  • query.server.network-threads:接收状态服务器传入请求的网络(事件循环)线程数(0 => #slots)
  • query.server.query-threads:处理/服务状态服务器的传入请求的线程数(0 => #slots)。

代理

  • query.proxy.ports:可查询状态代理的服务器端口范围。如果有多个TaskManager在同一台机器上运行,这对于避免端口冲突很有用。指定的范围可以是:port:“9123”,一系列端口:“50100-50200”,或范围和/或点列表:“50100-50200,50300-50400,51234”。默认端口为9069。
  • query.proxy.network-threads:接收客户端代理的传入请求的网络(事件循环)线程数(0 => #slots)
  • query.proxy.query-threads:处理/提供客户端代理的传入请求的线程数(0 => #slots)。

限制

  • 可查询状态生命周期绑定到作业的生命周期,例如,任务在启动时注册可查询状态,并在处理时注销它。在将来的版本中,需要将其解耦以便在任务完成后允许查询,并通过状态复制加速恢复。
  • 关于可用KvState的通知通​​过简单的告诉发生。在未来,应该通过询问和确认来改进这一点。
  • 服务器和客户端会跟踪查询的统计信息。默认情况下,这些函数目前处于禁用状态,因为它们不会在任 只要有更好的支持通过Metrics系统发布这些数字,我们就应该启用统计数据。