Wait the light to fall

Catalogs

焉知非鱼

Catalogs

Catalogs

目录提供了元数据,如数据库、表、分区、视图以及访问数据库或其他外部系统中存储的数据所需的功能和信息。

数据处理中最关键的一个方面是管理元数据。它可能是短暂的元数据,如临时表,或针对表环境注册的 UDF。或者是永久性的元数据,比如 Hive Metastore 中的元数据。目录为管理元数据提供了统一的 API,并使其可以从表 API 和 SQL 查询中访问。

Catalog 使用户能够引用数据系统中现有的元数据,并自动将它们映射到 Flink 的相应元数据中。例如,Flink 可以将 JDBC 表自动映射到 Flink 表,用户不必在 Flink 中手动重新编写 DDL。Catalog 大大简化了用户现有系统上手 Flink 所需的步骤,大大提升了用户体验。

Catalog 类型 #

GenericInMemoryCatalog #

GenericInMemoryCatalog 是一个目录的内存实现。所有对象只在会话的生命周期内可用。

JdbcCatalog #

JdbcCatalog 使用户能够通过 JDBC 协议将 Flink 与关系型数据库连接起来。PostgresCatalog 是目前 JDBC Catalog 的唯一实现。关于设置目录的更多细节,请参见 JdbcCatalog 文档

HiveCatalog #

HiveCatalog 有两个目的,一是作为纯 Flink 元数据的持久化存储,二是作为读写现有 Hive 元数据的接口。Flink 的 Hive 文档提供了设置目录和与现有 Hive 安装接口的完整细节。

警告: Hive Metastore 将所有的元对象名称都存储为小写。这与 GenericInMemoryCatalog 不同,后者是区分大小写的。

用户定义的 Catalog #

目录是可插拔的,用户可以通过实现 Catalog 接口来开发自定义目录。要在 SQL CLI 中使用自定义目录,用户应该通过实现 CatalogFactory 接口同时开发目录和它对应的目录工厂。

目录工厂定义了一组属性,用于在 SQL CLI 引导时配置目录。该属性集将被传递给发现服务,服务会尝试将属性与 CatalogFactory 匹配,并启动相应的目录实例。

使用 SQL DDL #

用户可以使用 SQL DDL 在 Table API 和 SQL 中创建目录中的表。

  • Flink SQL
// the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);

Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);

Flink SQL> SHOW TABLES;
mytable

详细信息,请查看 Flink SQL CREATE DDL

  • Scala
val tableEnv = ...

// Create a HiveCatalog 
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")

// Register the catalog
tableEnv.registerCatalog("myhive", catalog)

// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")

// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")

tableEnv.listTables() // should return the tables in current catalog and database.

For detailed information, please check out Flink SQL CREATE DDL.

使用 Java, Scala 或 Python #

用户可以使用 Java、Scala 或 Python 来编程创建目录表。

import org.apache.flink.table.api._
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors.Kafka

val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance.build)

// Create a HiveCatalog 
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")

// Register the catalog
tableEnv.registerCatalog("myhive", catalog)

// Create a catalog database 
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))

// Create a catalog table
val schema = TableSchema.builder()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT())
    .build()

catalog.createTable(
        new ObjectPath("mydb", "mytable"), 
        new CatalogTableImpl(
            schema,
            new Kafka()
                .version("0.11")
                ....
                .startFromEarlist()
                .toProperties(),
            "my comment"
        ),
        false
    )
    
val tables = catalog.listTables("mydb") // tables should contain "mytable"

Catalog API #

注意:这里只列出了目录程序的 API,用户可以通过 SQL DDL 实现许多相同的功能。用户可以通过 SQL DDL 实现许多相同的功能。详细的 DDL 信息,请参考 SQL CREATE DDL

数据库操作 #

// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// drop database
catalog.dropDatabase("mydb", false);

// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);

// get database
catalog.getDatabase("mydb");

// check if a database exist
catalog.databaseExists("mydb");

// list databases in a catalog
catalog.listDatabases();

Table 操作 #

// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");

// get table
catalog.getTable("mytable");

// check if a table exist or not
catalog.tableExists("mytable");

// list tables in a database
catalog.listTables("mydb");

视图操作 #

// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);

// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

// get view
catalog.getTable("myview");

// check if a view exist or not
catalog.tableExists("mytable");

// list views in a database
catalog.listViews("mydb");

Partition 操作 #

// create view
catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

// alter partition
catalog.alterPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));

// list partitions of a table under a give partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table by expression filter
catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));

Function 操作 #

// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// get function
catalog.getFunction("myfunc");

// check if a function exist or not
catalog.functionExists("myfunc");

// list functions in a database
catalog.listFunctions("mydb");

目录的 Table API 和 SQL #

注册目录 #

用户可以访问一个名为 default_catalog 的默认内存目录,这个目录总是默认创建的。该目录默认有一个名为 default_database 的单一数据库。用户也可以在现有的 Flink 会话中注册额外的目录。

  • Scala
tableEnv.registerCatalog(new CustomCatalog("myCatalog"));
  • YAML

所有使用 YAML 定义的目录必须提供一个 type 属性,指定目录的类型。以下类型是开箱即用的。

Catalog Type Value
GenericInMemory generic_in_memory
Hive hive
catalogs:
   - name: myCatalog
     type: custom_catalog
     hive-conf-dir: ...

更改当前目录和数据库 #

Flink 将始终搜索当前目录和数据库中的表、视图和 UDF。

  • Scala
tableEnv.useCatalog("myCatalog");
tableEnv.useDatabase("myDb");
  • Flink SQL
Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;

通过提供 catalog.database.object 形式的完全限定名称,可以访问非当前目录的元数据。

  • Scala
tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");
  • Flink SQL
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;

列出可用的目录 #

  • Scala
tableEnv.listCatalogs();
  • Flink SQL
Flink SQL> show catalogs;

列出可用的数据库 #

  • Scala
tableEnv.listDatabases();
  • Flink SQL
Flink SQL> show databases;

列出可用的表 #

  • Scala
tableEnv.listTables();
  • Flink SQL
Flink SQL> show tables;

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html