Wait the light to fall

可查询状态

焉知非鱼

Queryable State Beta

注意:可查询状态的客户端 API 目前处于不断发展的状态,对所提供接口的稳定性不做保证。在即将到来的 Flink 版本中,客户端的 API 很可能会有突破性的变化。

简而言之,这个功能将 Flink 的 managed keyed (partitioned) state(参见 Working with State)暴露给外界,并允许用户从 Flink 外部查询作业的状态。对于某些场景来说,可查询状态消除了与外部系统(如键值存储)进行分布式操作/交易的需求,而这往往是实践中的瓶颈。此外,该功能对于调试目的可能特别有用。

注意事项: 当查询一个状态对象时,该对象是在没有任何同步或复制的情况下从一个并发线程访问的。这是一个设计上的选择,因为上述任何一种情况都会导致作业延迟的增加,这是我们想要避免的。因为任何使用 Java 堆空间的状态后端,如 MemoryStateBackend 或 FsStateBackend,在检索值时都不会使用副本,而是直接引用存储的值,所以读-修改-写模式是不安全的,可能会导致可查询状态服务器因并发修改而失败。RocksDBStateBackend 则可以避免这些问题。

架构 #

在展示如何使用可查询状态之前,先简单介绍一下构成它的实体。Queryable State 功能由三个主要实体组成。

  1. QueryableStateClient,它(可能)运行在 Flink 集群之外,并提交用户查询。
  2. QueryableStateClientProxy,它运行在每个 TaskManager 上(即 Flink 集群内部),负责接收客户端的查询,代表他从负责的 TaskManager 中获取所请求的状态,并将其返回给客户端,以及
  3. QueryableStateServer,它运行在每个 TaskManager 上,负责为本地存储的状态提供服务。

客户端连接到其中一个代理,并发送一个与特定键 k 相关联的状态的请求。正如在使用状态中所述,keyed state 被组织在键组(Key Groups)中,每个 TaskManager 都被分配了一些这样的键组(Key Groups)。为了发现哪个 TaskManager 负责持有 k 的键组,代理将询问 JobManager。根据答案,代理将查询运行在该 TaskManager 上的 QueryableStateServer,以获取与 k 相关联的状态,并将响应转发到客户端。

激活可查询状态 #

要在 Flink 集群上启用可查询状态,你需要做以下工作。

  1. flink-queryable-state-runtime_2.11-1.11.0.jarFlink 发行版opt/ 文件夹中复制到 lib/ 文件夹中。
  2. 设置属性 queryable-state.enabletrue。请参阅配置文档了解详情和附加参数。

要验证您的群集是否在启用可查询状态后运行,请检查任何 TaskManager 的日志中的行。“Started the Queryable State Proxy Server @ …"。

使状态可查询 #

现在你已经在集群上激活了可查询状态,现在是时候看看如何使用它了。为了使一个状态对外界可见,它需要通过使用以下方式明确地成为可查询状态。

  • QueryableStateStream, 一个方便的对象,它作为一个接收器(sink),并把它的传入值作为可查询的状态提供,或者是
  • stateDescriptor.setQueryable(String queryableStateName) 方法,使得状态描述符所代表的 keyed state,可以查询。

下面的章节将解释这两种方法的使用。

可查询的状态流 #

在 KeyedStream 上调用 .asQueryableState(stateName, stateDescriptor) 会返回一个 QueryableStateStream,它将其值作为可查询状态提供。根据状态的类型,asQueryableState() 方法有以下几种变体。

// ValueState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ValueStateDescriptor stateDescriptor)

// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)

// FoldingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    FoldingStateDescriptor stateDescriptor)

// ReducingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ReducingStateDescriptor stateDescriptor)

注意:没有可查询的 ListState 接收器,因为这会导致一个不断增长的列表,可能无法清理,因此最终会消耗过多的内存。

返回的 QueryableStateStream 可以被看作是一个接收器(sink),不能被进一步转换。在内部,一个 QueryableStateStream 会被翻译成一个操作符,它使用所有传入的记录来更新可查询状态实例。更新逻辑是由 asQueryableState 调用中提供的 StateDescriptor 的类型暗示的。在像下面这样的程序中,keyed stream 的所有记录将通过 ValueState.update(value) 来更新状态实例:

stream.keyBy(0).asQueryableState("query-name")

这就像 Scala API 的 flatMapWithState 一样。

管理的 Keyed State #

通过 StateDescriptor.setQueryable(String queryableStateName) 使相应的状态描述符成为可查询的状态,可以使操作符的托管键控状态(参见使用 Managed Keyed State))成为可查询的状态,如下面的例子。

ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
                "average", // the state name
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information

descriptor.setQueryable("query-name"); // queryable state name

注意:queryableStateName 参数可以任意选择,并且只用于查询。它不一定要与状态本身的名称相同。

这个变体对于哪种类型的状态可以被查询没有限制。这意味着它可以用于任何 ValueState、ReduceState、ListState、MapState、AggregatingState 以及目前已被废弃的 FoldingState。

查询状态 #

到目前为止,你已经设置了你的集群以可查询的状态运行,并且你已经将你的(部分)状态声明为可查询。现在是时候看看如何查询这个状态了。

为此,你可以使用 QueryableStateClient 辅助类。它可以在 flink-queryable-state-client jar 中找到,它必须和 flink-core 一起被显式地包含在项目的 pom.xml 中作为依赖,如下所示。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.11.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-queryable-state-client-java</artifactId>
  <version>1.11.0</version>
</dependency>

更多的内容,可以查看如何设置 Flink 程序

QueryableStateClient 会将你的查询提交给内部代理,然后代理会处理你的查询并返回最终结果。初始化客户端的唯一要求是提供一个有效的 TaskManager 主机名(记住每个 TaskManager 上都有一个可查询状态代理运行)和代理监听的端口。更多关于如何配置代理和状态服务器端口的信息请参见配置部分

QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort)

客户端准备好后,要查询一个类型为 V 的状态,与类型为 K 的键相关联,可以使用该方法。

CompletableFuture<S> getKvState(
    JobID jobId,
    String queryableStateName,
    K key,
    TypeInformation<K> keyTypeInfo,
    StateDescriptor<S, V> stateDescriptor)

以上返回一个 CompletableFuture,最终持有 ID 为 jobID 的作业的 queryableStateName 所标识的可查询状态实例的状态值。key 是你对其状态感兴趣的键,keyTypeInfo 将告诉 Flink 如何序列化/解序列化它。最后,stateDescriptor 包含了关于所请求的状态的必要信息,即它的类型(Value、Reduce 等)和如何序列化/解序列化它的必要信息。

细心的读者会注意到,返回的 future 包含一个 S 类型的值,即一个包含实际值的 State 对象。这可以是 Flink 支持的任何一种状态类型。ValueState,ReduceState,ListState,MapState,AggregatingState,以及目前已经废弃的 FoldingState。

注意:这些状态对象不允许对包含的状态进行修改。您可以使用它们来获取状态的实际值,例如使用 valueState.get(),或者迭代包含的 <K,V> 条目,例如使用 mapState.entry(),但您不能修改它们。举个例子,在返回的列表状态上调用 add() 方法会抛出一个 UnsupportedOperationException

注意:客户端是异步的,可以被多个线程共享。在未使用时需要通过 QueryableStateClient.shutdown() 来关闭它,以释放资源。

例子 #

下面的例子扩展了 CountWindowAverage 的例子(请看使用 Managed Keyed State),使其可查询,并展示了如何查询这个值。

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum = sum.value();
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;
        sum.update(currentSum);

        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
        descriptor.setQueryable("query-name");
        sum = getRuntimeContext().getState(descriptor);
    }
}
Once used in a job, you can retrieve the job ID and then query any keys current state from this operator:

QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);

// the state descriptor of the state to be fetched.
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
          "average",
          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
        client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);

// now handle the returned value
resultFuture.thenAccept(response -> {
        try {
            Tuple2<Long, Long> res = response.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
});

配置 #

以下配置参数会影响可查询状态服务器和客户端的行为,它们被定义在 QueryableStateOptions 中。

状态服务器 #

  • queryable-state.server.ports:可查询状态服务器的服务器端口范围。如果在同一台机器上运行多个 task manager,这对避免端口冲突很有用。指定的范围可以是:一个端口: “9123”,一个端口范围: “50100-50200”,或者一个范围和或点的列表: “50100-50200,50300-50400,51234”。默认端口为 9067。
  • queryable-state.server.network-threads: 接收状态服务器传入请求的网络(事件循环)线程数(0 => #slots)。
  • queryable-state.server.query-threads: 为状态服务器处理/服务传入请求的线程数(0 => #slots)。

代理 #

  • queryable-state.proxy.ports:可查询状态代理服务器的端口范围。如果在同一台机器上运行多个 task manager,这对避免端口冲突很有用。指定的范围可以是:一个端口: “9123”,一个端口范围: “50100-50200”,或者一个范围和或点的列表: “50100-50200,50300-50400,51234”。默认端口为 9069。
  • queryable-state.proxy.network-threads:为客户端代理接收传入请求的网络(事件循环)线程数(0 => #slots)。
  • queryable-state.proxy.query-threads:为客户端代理处理/服务传入请求的线程数(0 => #slots)。

限制条件 #

  • 可查询状态的生命周期与任务的生命周期绑定,例如,任务在启动时注册可查询状态,在处置时取消注册。在未来的版本中,我们希望将其解耦,以便在任务完成后允许查询,并通过状态复制加快恢复速度。
  • 关于可用 KvState 的通知是通过一个简单的告诉发生的。将来应该改进这个功能,使其更加强大,包括询问和确认。
  • 服务器和客户端会跟踪查询的统计数据。目前默认情况下,这些数据是被禁用的,因为它们不会暴露在任何地方。一旦有更好的支持通过 Metrics 系统发布这些数字,我们应该启用统计。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/queryable_state.html