Wait the light to fall

HiveCatalog

焉知非鱼

HiveCatalog

HiveCatalog

Hive Metastore 经过多年的发展,已经成为 Hadoop 生态系统中事实上的元数据中心。很多公司在生产中都有一个 Hive Metastore 服务实例来管理他们所有的元数据,无论是 Hive 元数据还是非 Hive 元数据,都是真理的来源。

对于同时部署了 Hive 和 Flink 的用户,HiveCatalog 可以让他们使用 Hive Metastore 来管理 Flink 的元数据。

对于只有 Flink 部署的用户来说,HiveCatalog 是 Flink 开箱即用的唯一持久化目录。如果没有持久化目录,用户使用 Flink SQL CREATE DDL 必须在每个会话中反复创建元对象,比如 Kafka 表,这就浪费了很多时间。HiveCatalog 填补了这一空白,使用户只需创建一次表和其他元对象,以后就可以跨会话方便地引用和管理它们。

设置 HiveCatalog #

依赖性 #

在 Flink 中设置 HiveCatalog 需要与 Flink-Hive 集成相同的依赖关系

配置 #

在 Flink 中设置 HiveCatalog 需要与 Flink-Hive 集成相同的配置

如何使用 HiveCatalog #

一旦配置得当,HiveCatalog 应该可以直接使用。用户可以用 DDL 创建 Flink 元对象,并在创建后立即看到它们。

HiveCatalog 可以用来处理两种表。Hive 兼容表和通用表。Hive-compatible 表是指那些以 Hive 兼容的方式存储的表,在存储层的元数据和数据方面都是如此。因此,通过 Flink 创建的 Hive 兼容表可以从 Hive 端进行查询。

而通用表则是针对 Flink 的。当使用 HiveCatalog 创建通用表时,我们只是使用 HMS 来持久化元数据。虽然这些表对 Hive 是可见的,但 Hive 不太可能理解这些元数据。因此在 Hive 中使用这样的表会导致未定义的行为。

Flink 使用属性 “is_generic” 来判断一个表是与 Hive 兼容还是通用。当用 HiveCatalog 创建一个表时,它默认被认为是通用的。如果你想创建一个 Hive 兼容的表,请确保在你的表属性中把 is_generic 设置为 false。

如上所述,通用表不应该从 Hive 使用。在 Hive CLI 中,你可以调用 describe FORMATTED 对一个表进行检查,通过检查 is_generic 属性来决定它是否是通用的。通用表会有 is_generic=true。

例子 #

我们在这里将通过一个简单的例子进行讲解。

第 1 步:设置 Hive Metastore。

有一个 Hive Metastore 在运行。

在这里,我们在本地路径 /opt/hive-conf/hive-site.xml 中设置一个本地 Hive Metastore 和我们的 hive-site.xml 文件。我们有如下的一些配置。

<configuration>
   <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value>
      <description>metadata is stored in a MySQL server</description>
   </property>

   <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
      <description>MySQL JDBC driver class</description>
   </property>

   <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>...</value>
      <description>user name for connecting to mysql server</description>
   </property>

   <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>...</value>
      <description>password for connecting to mysql server</description>
   </property>

   <property>
       <name>hive.metastore.uris</name>
       <value>thrift://localhost:9083</value>
       <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
   </property>

   <property>
       <name>hive.metastore.schema.verification</name>
       <value>true</value>
   </property>

</configuration>

用 Hive Cli 测试连接到 HMS。运行一些命令,我们可以看到我们有一个名为 default 的数据库,里面没有表。

hive> show databases;
OK
default
Time taken: 0.032 seconds, Fetched: 1 row(s)

hive> show tables;
OK
Time taken: 0.028 seconds, Fetched: 0 row(s)

步骤 2:配置 Flink 集群和 SQL CLI

将所有 Hive 的依赖关系添加到 Flink 发行版的 /lib 目录下,并修改 SQL CLI 的 yaml 配置文件 sql-cli-defaults.yaml 如下。

execution:
    planner: blink
    type: streaming
    ...
    current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
    current-database: mydatabase
    
catalogs:
   - name: myhive
     type: hive
     hive-conf-dir: /opt/hive-conf  # contains hive-site.xml

第三步:建立 Kafka 集群

Bootstrap 一个本地的 Kafka 2.3.0 集群,主题命名为 “test”,并以 name 和 age 的元组形式向主题产生一些简单的数据。

localhost$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>tom,15
>john,21

这些消息可以通过启动 Kafka 控制台消费者看到。

localhost$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

tom,15
john,21

第四步:启动 SQL Client,用 Flink SQL DDL 创建一个 Kafka 表。

启动 Flink SQL Client,通过 DDL 创建一个简单的 Kafka 2.3.0 表,并验证其模式。

Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'test',
   'connector.properties.bootstrap.servers' = 'localhost:9092',
   'format.type' = 'csv',
   'update-mode' = 'append'
);
[INFO] Table has been created.

Flink SQL> DESCRIBE mykafka;
root
 |-- name: STRING
 |-- age: INT

验证该表也是通过 Hive Cli 对 Hive 可见的,注意该表有属性 is_generic=true。

hive> show tables;
OK
mykafka
Time taken: 0.038 seconds, Fetched: 1 row(s)

hive> describe formatted mykafka;
OK
# col_name            	data_type           	comment


# Detailed Table Information
Database:           	default
Owner:              	null
CreateTime:         	......
LastAccessTime:     	UNKNOWN
Retention:          	0
Location:           	......
Table Type:         	MANAGED_TABLE
Table Parameters:
	flink.connector.properties.bootstrap.servers	localhost:9092
	flink.connector.topic	test
	flink.connector.type	kafka
	flink.connector.version	universal
	flink.format.type   	csv
	flink.generic.table.schema.0.data-type	VARCHAR(2147483647)
	flink.generic.table.schema.0.name	name
	flink.generic.table.schema.1.data-type	INT
	flink.generic.table.schema.1.name	age
	flink.update-mode   	append
	is_generic          	true
	transient_lastDdlTime	......

# Storage Information
SerDe Library:      	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:        	org.apache.hadoop.mapred.TextInputFormat
OutputFormat:       	org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:         	No
Num Buckets:        	-1
Bucket Columns:     	[]
Sort Columns:       	[]
Storage Desc Params:
	serialization.format	1
Time taken: 0.158 seconds, Fetched: 36 row(s)

第五步:运行 Flink SQL 查询 Kakfa 表。

在 Flink 集群中,无论是单机还是 yarn-session,从 Flink SQL Client 中运行一个简单的选择查询。

Flink SQL> select * from mykafka;

在 Kafka 主题中多产生一些信息。

localhost$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

tom,15
john,21
kitty,30
amy,24
kaiky,18

你现在应该可以在 SQL Client 中看到 Flink 产生的结果,如。

             SQL Query Result (Table)
 Refresh: 1 s    Page: Last of 1     

        name                       age
         tom                        15
        john                        21
       kitty                        30
         amy                        24
       kaiky                        18

支持的类型 #

HiveCatalog 支持通用表的所有 Flink 类型。

对于 Hive 兼容的表,HiveCatalog 需要将 Flink 数据类型映射到相应的 Hive 类型,如下表所述。

Flink Data Type Hive Data Type
CHAR(p) CHAR(p)
VARCHAR(p) VARCHAR(p)
STRING STRING
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT LONG
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
DATE DATE
TIMESTAMP(9) TIMESTAMP
BYTES BINARY
ARRAY LIST
MAP<K, V> MAP<K, V>
ROW STRUCT

关于类型映射需要注意的地方。

  • Hive 的 CHAR(p) 最大长度为 255。
  • Hive 的 VARCHAR(p) 最大长度为 65535。
  • Hive 的 MAP 只支持基元键类型,而 Flink 的 MAP 可以是任何数据类型。
  • 不支持 Hive 的 UNION 类型。
  • Hive 的 TIMESTAMP 的精度总是 9,不支持其他精度。而 Hive 的 UDF 则可以处理精度<=9 的 TIMESTAMP 值。
  • Hive 不支持 Flink 的 TIMESTAMP_WITH_TIME_ZONE、TIMESTAMP_WITH_LOCAL_TIME_ZONE 和 MULTISET。
  • Flink 的 INTERVAL 类型还不能映射到 Hive 的 INTERVAL 类型。

Scala Shell #

注意:由于目前 Scala Shell 并不支持 blink planner,所以不建议在 Scala Shell 中使用 Hive 连接器。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_catalog.html