Wait the light to fall

Hive Streaming

焉知非鱼

Hive Streaming

Hive 流

一个典型的 hive 作业是周期性地安排执行的,所以会有较大的延迟。

Flink 支持以流式的形式写入、读取和加入 hive 表。

流式数据有三种类型。

  • 将流式数据写入 Hive 表。
  • 以流的形式增量读取 Hive 表。
  • 流式表使用 Temporal 表连接 Hive 表。

流式写入 #

Hive 表支持流式写入,基于 Filesystem Streaming Sink

Hive Streaming Sink 重用 Filesystem Streaming Sink,将 Hadoop OutputFormat/RecordWriter 整合到流式写入。Hadoop RecordWriters 是 Bulk-encoded Formats,Bulk Formats 在每个检查点上滚动文件。

默认情况下,现在只有重命名提交者,这意味着 S3 文件系统不能支持精确的 once,如果你想在 S3 文件系统中使用 Hive 流媒体汇,你可以在 TableConfig 中把以下参数配置为 false,以使用 Flink 原生写入器(只对 parquet 和 orc 有效)(注意这些参数会影响所有作业的汇)。

Key Default Type Description
table.exec.hive.fallback-mapred-writer true Boolean 如果是假的,用 flink native writer 写 parquet 和 orc 文件;如果是真的,用 hadoop mapred record writer 写 parquet 和 orc 文件。

下面展示了如何使用流接收器写一个流式查询,将数据从 Kafka 写到一个有 partition-commit 的 Hive 表中,并运行一个批处理查询将这些数据读回。

SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);

-- streaming sql, insert into hive table
INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

流式读取 #

为了提高 hive 读取的实时性,Flink 支持实时 Hive 表流读取。

  • 分区表,监控分区的生成,并逐步读取新分区。
  • 非分区表,监控文件夹中新文件的生成,并增量读取新文件。

甚至可以采用 10 分钟级别的分区策略,利用 Flink 的 Hive 流式读取和 Hive 流式写入,大大提高 Hive 数据仓库的实时性能,达到准实时分钟级别。

Key Default Type Description
streaming-source.enable false Boolean 是否启用流媒体源。注意:请确保每个分区/文件都是以原子方式写入,否则读者可能会得到不完整的数据。请确保每个分区/文件都应该以原子方式写入,否则读者可能会得到不完整的数据。
streaming-source.monitor-interval 1 m Duration 连续监控分区/文件的时间间隔。
streaming-source.consume-order create-time String 流源的消耗顺序,支持 create-time 和 partition-time。create-time 比较的是分区/文件的创建时间,这不是 Hive metaStore 中的分区创建时间,而是文件系统中的文件夹/文件修改时间;partition-time 比较的是分区名称所代表的时间,如果分区文件夹以某种方式得到更新,比如在文件夹中添加新文件,就会影响数据的消耗方式。对于非分区表,这个值应该一直是 “创建时间”。
streaming-source.consume-start-offset 1970-00-00 String 流式消费的起始偏移量。如何解析和比较偏移量取决于你的顺序。对于创建时间和分区时间,应该是一个时间戳字符串(yyyy-[m]m-[d]d [hh:mm:ss])。对于分区时间,将使用分区时间提取器从分区中提取时间。

注意:

  • 监控策略是现在扫描位置路径中的所有目录/文件。如果分区太多,会出现性能问题。
  • 非分区的流式读取需要将每个文件原子地放入目标目录中。
  • 分区的流式读取要求在 hive metastore 的视图中原子地添加每个分区。这意味着新添加到现有分区的数据不会被消耗掉。
  • 流读取不支持 Flink DDL 中的水印语法。所以它不能用于窗口操作符。

下面展示了如何增量读取 Hive 表。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;

Hive 表作为临时表 #

您可以使用 Hive 表作为时态表,并将流式数据加入其中。请按照示例来了解如何连接一个时态表。

在执行 join 时,Hive 表将被缓存在 TM 内存中,并在 Hive 表中查找来自流的每一条记录,以决定是否找到匹配。你不需要任何额外的设置就可以使用 Hive 表作为时态表。但可以选择用以下属性配置 Hive 表缓存的 TTL。缓存过期后,将再次扫描 Hive 表以加载最新的数据。

Key Default Type Description
lookup.join.cache.ttl 60 min Duration 在查找连接中构建表的缓存 TTL(例如 10 分钟)。默认情况下,TTL 为 60 分钟。

注意:

  1. 每个加入子任务都需要保留自己的 Hive 表的缓存。请确保 Hive 表可以放入 TM 任务槽的内存中。
  2. 你应该为 lookup.join.cache.ttl 设置一个相对较大的值。如果你的 Hive 表需要太频繁的更新和重载,你可能会有性能问题。
  3. 目前,每当缓存需要刷新时,我们只是简单地加载整个 Hive 表。没有办法区分新数据和旧数据。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html