Wait the light to fall

Hive Read and Write

焉知非鱼

Hive Read and Write

Hive 读写

使用 HiveCatalog 和 Flink 与 Hive 的连接器,Flink 可以从 Hive 数据中读取和写入数据,作为 Hive 批处理引擎的替代。请务必按照说明在你的应用中加入正确的依赖关系。同时请注意,Hive 连接器只适用于 blink planner。

从 Hive 读取数据 #

假设 Hive 在其默认的数据库中包含一个名为 people 的单表,该表包含多条记录。

hive> show databases;
OK
default
Time taken: 0.841 seconds, Fetched: 1 row(s)

hive> show tables;
OK
Time taken: 0.087 seconds

hive> CREATE TABLE mytable(name string, value double);
OK
Time taken: 0.127 seconds

hive> SELECT * FROM mytable;
OK
Tom   4.72
John  8.0
Tom   24.2
Bob   3.14
Bob   4.72
Tom   34.9
Mary  4.79
Tiff  2.72
Bill  4.33
Mary  77.7
Time taken: 0.097 seconds, Fetched: 10 row(s)

数据准备好后,你可以连接到现有的 Hive 安装并开始查询。

Flink SQL> show catalogs;
myhive
default_catalog

# ------ Set the current catalog to be 'myhive' catalog if you haven't set it in the yaml file ------

Flink SQL> use catalog myhive;

# ------ See all registered database in catalog 'mytable' ------

Flink SQL> show databases;
default

# ------ See the previously registered table 'mytable' ------

Flink SQL> show tables;
mytable

# ------ The table schema that Flink sees is the same that we created in Hive, two columns - name as string and value as double ------ 
Flink SQL> describe mytable;
root
 |-- name: name
 |-- type: STRING
 |-- name: value
 |-- type: DOUBLE

# ------ Select from hive table or hive view ------ 
Flink SQL> SELECT * FROM mytable;

   name      value
__________ __________

    Tom      4.72
    John     8.0
    Tom      24.2
    Bob      3.14
    Bob      4.72
    Tom      34.9
    Mary     4.79
    Tiff     2.72
    Bill     4.33
    Mary     77.7

查询 Hive 视图 #

如果你需要查询 Hive 视图,请注意。

在查询该目录中的视图之前,必须先使用 Hive 目录作为当前目录。可以通过 Table API 中的 tableEnv.useCatalog(...) 或者 SQL Client 中的 USE CATALOG …来实现。 Hive 和 Flink SQL 有不同的语法,例如,不同的保留关键字和字元。请确保视图的查询与 Flink 语法兼容。

写入 Hive #

同样,也可以使用 INSERT 子句将数据写入 hive 中。

考虑有一个名为 “mytable “的示例表,表中有两列:name 和 age,类型为 string 和 int。

# ------ INSERT INTO will append to the table or partition, keeping the existing data intact ------ 
Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;

# ------ INSERT OVERWRITE will overwrite any existing data in the table or partition ------ 
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;

我们也支持分区表,考虑有一个名为 myparttable 的分区表,有四列:name、age、my_type 和 my_date,在 type 中…my_typemy_date 是分区键。

# ------ Insert with static partition ------ 
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;

# ------ Insert with dynamic partition ------ 
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';

# ------ Insert with static(my_type) and dynamic(my_date) partition ------ 
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';

格式 #

我们测试了以下表格存储格式:文本、csv、SequenceFile、ORC 和 Parquet。

优化 #

分区修剪 #

Flink 使用分区修剪作为一种性能优化,以限制 Flink 在查询 Hive 表时读取的文件和分区的数量。当你的数据被分区后,当查询符合某些过滤条件时,Flink 只会读取 Hive 表中的分区子集。

投影下推 #

Flink 利用投影下推,通过从表扫描中省略不必要的字段,最大限度地减少 Flink 和 Hive 表之间的数据传输。

当一个表包含许多列时,它尤其有利。

限制下推 #

对于带有 LIMIT 子句的查询,Flink 会尽可能地限制输出记录的数量,以减少跨网络传输的数据量。

读取时的向量优化 #

当满足以下条件时,会自动使用优化功能。

  • 格式: ORC 或 Parquet。
  • 没有复杂数据类型的列,如 hive 类型: List, Map, Struct, Union。

这个功能默认是开启的。如果出现问题,可以使用这个配置选项来关闭 Vectorized Optimization。

table.exec.hive.fallback-mapred-reader=true

Source 并行性推断 #

默认情况下,Flink 根据分割次数来推断 hive 源的并行度,分割次数是根据文件的数量和文件中的块数来推断的。

Flink 允许你灵活配置并行度推断的策略。你可以在 TableConfig 中配置以下参数(注意,这些参数会影响作业的所有源)。

Key Default Type Description
table.exec.hive.infer-source-parallelism true Boolean 如果为真,则根据分割数来推断源的并行度,如果为假,则根据配置来设置源的并行度。如果为 false,则通过配置来设置源的并行度。
table.exec.hive.infer-source-parallelism.max 1000 Integer 设置源运算符的最大推断并行度。

路线图 #

我们正在规划并积极开发支持功能,如:

  • ACID 表
  • 分桶表
  • 更多格式

更多功能需求请联系社区 https://flink.apache.org/community.html#mailing-lists

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_read_write.html