Wait the light to fall

Hive 集成 - 概览

焉知非鱼

Overview

Hive 集成

Apache Hive 已经确立了自己作为数据仓库生态系统的焦点。它不仅是大数据分析和 ETL 的 SQL 引擎,也是一个数据管理平台,在这里,数据被发现、定义和发展。

Flink 与 Hive 提供了两方面的整合。

第一是利用 Hive 的 Metastore 作为一个持久性目录,与 Flink 的 HiveCatalog 进行跨会话存储 Flink 特定的元数据。例如,用户可以通过使用 HiveCatalog 将 Kafka 或 ElasticSearch 表存储在 Hive Metastore 中,并在以后的 SQL 查询中重复使用。

二是提供 Flink 作为读写 Hive 表的替代引擎。

HiveCatalog 的设计是 “开箱即用”,与现有的 Hive 安装兼容。您不需要修改现有的 Hive Metastore,也不需要改变数据位置或表的分区。

支持的 Hive 版本 #

Flink 支持以下 Hive 版本。

  • 1.0
    • 1.0.0
    • 1.0.1
  • 1.1
    • 1.1.0
    • 1.1.1
  • 1.2
    • 1.2.0
    • 1.2.1
    • 1.2.2
  • 2.0
    • 2.0.0
    • 2.0.1
  • 2.1
    • 2.1.0
    • 2.1.1
  • 2.2
    • 2.2.0
  • 2.3
    • 2.3.0
    • 2.3.1
    • 2.3.2
    • 2.3.3
    • 2.3.4
    • 2.3.5
    • 2.3.6
  • 3.1
    • 3.1.0
    • 3.1.1
    • 3.1.2

请注意 Hive 本身在不同的版本有不同的功能,这些问题不是 Flink 造成的。

  • 1.2.0 及以后版本支持 Hive 内置函数。
  • 3.1.0 及以后版本支持列约束,即 PRIMARY KEY 和 NOT NULL。
  • 在 1.2.0 及以后的版本中,支持修改表的统计数据。
  • 在 1.2.0 及以后的版本中支持 DATE 列统计。
  • 在 2.0.x 中不支持写入 ORC 表。

依赖性 #

为了与 Hive 集成,你需要在 Flink 发行版的 /lib/ 目录下添加一些额外的依赖关系,以使集成工作在 Table API 程序或 SQL 客户端中。另外,你也可以将这些依赖项放在一个专门的文件夹中,并分别用 -C-l 选项将它们添加到 classpath 中,用于 Table API 程序或 SQL Client。

Apache Hive 是建立在 Hadoop 上的,所以首先需要 Hadoop 依赖,请参考提供 Hadoop 类。

有两种方法可以添加 Hive 依赖。首先是使用 Flink 的捆绑式 Hive jars。你可以根据你使用的 metastore 的版本来选择捆绑的 Hive jar。第二种是分别添加每个所需的 jar。如果你使用的 Hive 版本没有在这里列出,第二种方式就会很有用。

使用捆绑的 Hive jar #

下表列出了所有可用的捆绑的 hive jar,你可以选择一个到 Flink 发行版的 /lib/ 目录下。

Metastore version Maven dependency SQL Client JAR
1.0.0 - 1.2.2 flink-sql-connector-hive-1.2.2 Download
2.0.0 - 2.2.0 flink-sql-connector-hive-2.2.0 Download
2.3.0 - 2.3.6 flink-sql-connector-hive-2.3.6 Download
3.0.0 - 3.1.2 flink-sql-connector-hive-3.1.2 Download

用户定义的依赖性 #

请在下面找到不同 Hive 主要版本所需的依赖关系。

  • Hive 2.3.4
/flink-1.11.0
   /lib

       // Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.11-1.11.0.jar

       // Hive dependencies
       hive-exec-2.3.4.jar
  • Hive 1.0.0
/flink-1.11.0
   /lib

       // Flink's Hive connector
       flink-connector-hive_2.11-1.11.0.jar

       // Hive dependencies
       hive-metastore-1.0.0.jar
       hive-exec-1.0.0.jar
       libfb303-0.9.0.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
       
       // Orc dependencies -- required by the ORC vectorized optimizations
       orc-core-1.4.3-nohive.jar
       aircompressor-0.8.jar // transitive dependency of orc-core
  • Hive 1.1.0
/flink-1.11.0
   /lib

       // Flink's Hive connector
       flink-connector-hive_2.11-1.11.0.jar

       // Hive dependencies
       hive-metastore-1.1.0.jar
       hive-exec-1.1.0.jar
       libfb303-0.9.2.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately

       // Orc dependencies -- required by the ORC vectorized optimizations
       orc-core-1.4.3-nohive.jar
       aircompressor-0.8.jar // transitive dependency of orc-core
  • Hive 1.2.1
/flink-1.11.0
   /lib

       // Flink's Hive connector
       flink-connector-hive_2.11-1.11.0.jar

       // Hive dependencies
       hive-metastore-1.2.1.jar
       hive-exec-1.2.1.jar
       libfb303-0.9.2.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately

       // Orc dependencies -- required by the ORC vectorized optimizations
       orc-core-1.4.3-nohive.jar
       aircompressor-0.8.jar // transitive dependency of orc-core
  • Hive 2.0.0
/flink-1.11.0
   /lib

       // Flink's Hive connector
       flink-connector-hive_2.11-1.11.0.jar

       // Hive dependencies
       hive-exec-2.0.0.jar
  • Hive 2.1.0
/flink-1.11.0
   /lib

       // Flink's Hive connector
       flink-connector-hive_2.11-1.11.0.jar

       // Hive dependencies
       hive-exec-2.1.0.jar
  • Hive 2.2.0
/flink-1.11.0
   /lib

       // Flink's Hive connector
       flink-connector-hive_2.11-1.11.0.jar

       // Hive dependencies
       hive-exec-2.2.0.jar

       // Orc dependencies -- required by the ORC vectorized optimizations
       orc-core-1.4.3.jar
       aircompressor-0.8.jar // transitive dependency of orc-core
  • Hive 3.1.0
/flink-1.11.0
   /lib

       // Flink's Hive connector
       flink-connector-hive_2.11-1.11.0.jar

       // Hive dependencies
       hive-exec-3.1.0.jar
       libfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately

Program maven #

如果你正在构建你自己的程序,你需要在你的 mvn 文件中加入以下依赖关系。建议不要在生成的 jar 文件中包含这些依赖关系。你应该在运行时添加上面所说的依赖关系。

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.11.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.11.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>

连接到 Hive #

通过表环境或 YAML 配置,使用目录接口和 HiveCatalog 连接到现有的 Hive 安装。

如果 hive-conf/hive-site.xml 文件存储在远程存储系统中,用户应先将 hive 配置文件下载到本地环境中。

请注意,虽然 HiveCatalog 不需要特定的规划师,但读/写 Hive 表只适用于 blink 规划师。因此强烈建议您在连接 Hive 仓库时使用 blink planner。

HiveCatalog 能够自动检测使用中的 Hive 版本。建议不要指定 Hive 版本,除非自动检测失败。

以 Hive 2.3.4 版本为例。

val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)

val name            = "myhive"
val defaultDatabase = "mydatabase"
val hiveConfDir     = "/opt/hive-conf" // a local path

val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
tableEnv.registerCatalog("myhive", hive)

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")

DDL #

建议使用 Hive 方言在 Flink 中执行 DDL 来创建 Hive 表、视图、分区、函数。

DML #

Flink 支持 DML 写入 Hive 表。请参考读写 Hive 表的细节。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/