Wait the light to fall

查询配置

焉知非鱼

Query Configuration

查询配置

表 API 和 SQL 查询具有相同的语义,无论其输入是有限的行集还是无限制的表变化流。在许多情况下,对流输入的连续查询能够计算出与离线计算结果相同的准确结果。然而,对于一些连续查询,你必须限制它们所维持的状态的大小,以避免在摄取无约束的输入流时耗尽存储。这取决于输入数据的特性和查询本身是否需要限制状态大小,以及它是否和如何影响计算结果的准确性。

Flink 的 Table API 和 SQL 接口提供了参数来调整连续查询的准确性和资源消耗。这些参数是通过 TableConfig 对象指定的,可以从 TableEnvironment 中获得。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// obtain query configuration from TableEnvironment
val tConfig: TableConfig = tableEnv.getConfig
// set query parameters
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

// define query
val result: Table = ???

// create TableSink
val sink: TableSink[Row] = ???

// register TableSink
tableEnv.registerTableSink(
  "outputTable",                  // table name
  Array[String](...),             // field names
  Array[TypeInformation[_]](...), // field types
  sink)                           // table sink

// emit result Table via a TableSink
result.executeInsert("outputTable")

// convert result Table into a DataStream[Row]
val stream: DataStream[Row] = result.toAppendStream[Row]

下面我们介绍 TableConfig 的参数,以及它们如何影响查询的准确性和资源消耗。

闲置状态保留时间 #

许多查询在一个或多个键属性上聚合或连接记录。当这样的查询在一个流上执行时,连续查询需要收集记录或维护每个键的部分结果。如果输入流的键域是不断变化的,即活跃的键值是随着时间的推移而变化的,那么随着观察到越来越多不同的键,连续查询会积累越来越多的状态。然而,往往键在一段时间后就会变得不活跃,其相应的状态也就变得陈旧无用。

例如下面的查询计算每节课的点击次数。

SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

sessionId 属性被用作分组键,连续查询会对它观察到的每个 sessionId 进行计数。sessionId 属性是随着时间的推移而不断变化的,sessionId 值只有在会话结束之前才是有效的,即在有限的时间内。然而,连续查询无法知道 sessionId 的这一属性,它期望每个 sessionId 值都能在任何时间点出现。它为每一个观察到的 sessionId 值维持一个计数。因此,随着观察到的 sessionId 值越来越多,查询的总状态大小也在不断增加。

闲置状态保留时间参数定义了一个键的状态在被移除之前不被更新的保留时间。对于前面的示例查询,只要在配置的时间段内没有更新,sessionId 的计数就会被删除。

通过删除一个键的状态,连续查询就会完全忘记它以前见过这个键。如果处理一条带有键的记录,其状态在之前已经被删除,则该记录将被视为带有相应键的第一条记录。对于上面的例子来说,这意味着一个 sessionId 的计数将重新开始为 0。

有两个参数可以配置空闲状态保留时间。

  • 最小空闲状态保留时间定义了一个非活动键的状态在被移除之前至少保留多长时间。
  • 最大空闲状态保留时间定义了非活动键的状态在被删除前最多保留多长时间。

参数指定如下:

val tConfig: TableConfig = ???

// set idle state retention time: min = 12 hours, max = 24 hours
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

清理状态需要额外的记账,对于 minTime 和 maxTime 的较大差异,记账成本较低。minTime 和 maxTime 之间的差异必须至少为 5 分钟。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/query_configuration.html