Wait the light to fall

Streaming Aggregation

焉知非鱼

Streaming Aggregation

流式聚合

SQL 是数据分析中使用最广泛的语言。Flink 的 Table API 和 SQL 可以让用户用更少的时间和精力定义高效的流分析应用。此外,Flink Table API 和 SQL 还进行了有效的优化,它集成了大量的查询优化和调整后的运算符实现。但并不是所有的优化都是默认启用的,所以对于一些工作负载,可以通过开启一些选项来提高性能。

在本页面中,我们将介绍一些有用的优化选项和流式聚合的内部结构,在某些情况下会带来很大的改善。

注意: 目前,本页面中提到的优化选项仅在 Blink 计划器中支持。

注意: 目前,流式聚合的优化只支持无边界聚合。未来将支持窗口聚合的优化。

默认情况下,无界聚合运算符对输入记录进行逐一处理,即:(1)从状态中读取累加器,(2)累加/缩减记录到累加器,(3)将累加器写回状态,(4)下一条记录将从(1)开始重新做处理。这种处理模式可能会增加 StateBackend 的开销(尤其是对于 RocksDB StateBackend)。此外,在生产中很常见的数据偏斜也会使问题更加严重,容易出现作业背压的情况。

迷你批处理(MiniBatch)聚合 #

迷你批处理(mini-batch)聚合的核心思想是将一捆输入缓存在聚合运算器内部的缓冲区中。当触发处理该捆输入时,每个键只需要一个操作来访问状态。这样可以大大降低状态开销,获得更好的吞吐量。但是,这可能会增加一些延迟,因为它缓冲了一些记录,而不是在瞬间处理它们。这就是吞吐量和延迟之间的权衡。

下图解释了迷你批处理聚合如何减少状态操作。

img

MiniBatch 优化默认为禁用。为了启用此优化,您应该设置选项 table.exec.mini-batch.enabled、table.exec.mini-batch.allow-latency 和 table.exec.mini-batch.size。请参阅配置页面了解更多详情。

下面的示例展示了如何启用这些选项。

// instantiate table environment
val tEnv: TableEnvironment = ...

// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task

Local-Global 聚合 #

Local-Global 是为了解决数据偏斜问题而提出的,将一个分组聚合分为两个阶段,即先在上游做局部聚合,然后在下游做全局聚合,这类似于 MapReduce 中的 Combine+Reduce 模式。例如,考虑以下 SQL。

SELECT color, sum(id)
FROM T
GROUP BY color

有可能数据流中的记录是倾斜的,因此一些聚合运算符实例要处理的记录比其他实例多得多,从而导致热点。本地聚合可以帮助将一定数量的具有相同键的输入累积到一个累积器中。全局聚合将只接收减少的累加器,而不是大量的原始输入。这可以显著降低网络洗牌和状态访问的成本。本地聚合每次积累的输入数量是基于小批量间隔的。这意味着本地-全局聚合取决于迷你批量优化的启用。

下图显示了本地-全局聚合如何提高性能。

img

下面的例子展示了如何启用本地-全局聚合。

// instantiate table environment
val tEnv: TableEnvironment = ...

// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation

Split Distinct 聚合 #

Local-Global 优化对于一般的聚合,如 SUM、COUNT、MAX、MIN、AVG,可以有效地消除数据偏斜。但在处理不同的聚合时,其性能并不理想。

例如,如果我们想分析今天有多少独特的用户登录。我们可以有如下查询:

SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

如果 distinct key(即 user_id)的值很稀疏,count distinct 不好减少记录。即使启用了局部-全局优化,也没有什么帮助。因为累加器仍然包含了几乎所有的原始记录,而全局聚合将是瓶颈(大部分重度累加器是由一个任务处理的,即在同一天)。

这个优化的思路是将不同的聚合(如 COUNT(DISTINCT col))分成两个层次。第一层聚合由组键和一个额外的桶键进行洗牌。桶键使用 HASH_CODE(distinct_key) % BUCKET_NUM 计算。BUCKET_NUM 默认为 1024,可以通过 table.optimizer.distinct-agg.split.bucket-num 选项进行配置。第二次聚合是按原组键进行洗牌,用 SUM 聚合不同桶的 COUNT DISTINCT 值。因为相同的 distinct key 只会在同一个 bucket 中计算,所以转换是等价的。桶键起到了额外的组键的作用,分担组键中热点的负担。桶键使得工作具有可扩展性,可以解决 distinct aggregations 中的数据偏斜/热点问题。

拆分不同的聚合后,上面的查询会自动改写成下面的查询:

SELECT day, SUM(cnt)
FROM (
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

下图显示了拆分不同的聚合如何提高性能(假设颜色代表天数,字母代表 user_id)。

img

注:以上是最简单的例子,可以从这个优化中受益。除此之外,Flink 还支持拆分更复杂的聚合查询,例如,多个不同键的不同聚合(如 COUNT(DISTINCT a),SUM(DISTINCT b)),以及与其他非不同聚合(如 SUM, MAX, MIN, COUNT)一起工作。

注意,目前,分割优化不支持包含用户定义的 AggregateFunction 的聚合。

下面的例子展示了如何启用拆分不同的聚合优化。

// instantiate table environment
val tEnv: TableEnvironment = ...

tEnv.getConfig         // access high-level configuration
  .getConfiguration    // set low-level key-value options
  .setString("table.optimizer.distinct-agg.split.enabled", "true")  // enable distinct agg split

在 Distinct 聚合上使用 FILTER 修饰符 #

在某些情况下,用户可能需要从不同的维度来计算 UV(唯一访客)的数量,例如:Android 的 UV、iPhone 的 UV、Web 的 UV 以及总的 UV。很多用户会选择 CASE WHEN 来支持,比如。

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

但是,在这种情况下,建议使用 FILTER 语法,而不是 CASE WHEN。因为 FILTER 更符合 SQL 标准,会得到更多的性能提升。FILTER 是用于聚合函数上的修饰符,用于限制聚合中使用的值。用 FILTER 修饰符替换上面的例子,如下所示。

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day

Flink SQL 优化器可以识别同一个独立键上的不同过滤参数。例如,在上面的例子中,三个 COUNT DISTINCT 都在 user_id 列上。那么 Flink 就可以只使用一个共享状态实例而不是三个状态实例来减少状态访问和状态大小。在一些工作负载中,这可以得到显著的性能提升。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tuning/streaming_aggregation_optimization.html