Wait the light to fall

概念和通用 API

焉知非鱼

Concepts and Common API

概念和通用 API #

Table API 和 SQL 被集成在一个联合 API 中。这个 API 的核心概念是一个 Table,作为查询的输入和输出。本文档介绍了具有 Table API 和 SQL 查询的程序的常用结构,如何注册 Table,如何查询 Table,如何发出 Table。

两种 Planners 的主要区别 #

  1. Blink 将批处理作业视为流式作业的一种特殊情况。因此,也不支持 Table 和 DataSet 之间的转换,批处理作业不会被翻译成 DateSet 程序,而是翻译成 DataStream 程序,和流作业一样。
  2. Blink 计划器不支持 BatchTableSource,请使用有界的 StreamTableSource 代替。
  3. 旧计划器和 Blink 计划器的 FilterableTableSource 的实现是不兼容的。旧的规划者会将 PlannerExpressions 推送到 FilterableTableSource 中,而 Blink 规划者会将 Expressions 推送下去。
  4. 基于字符串的键值配置选项(详情请看配置文档)只用于 Blink 规划器。
  5. PlannerConfig 在两个规划器中的实现(CalciteConfig)是不同的。
  6. Blink 规划师将在 TableEnvironment 和 StreamTableEnvironment 上把多个汇优化成一个 DAG。旧的规划器总是会将每个汇优化成一个新的 DAG,其中所有的 DAG 是相互独立的。
  7. 现在老的计划器不支持目录统计,而 Blink 计划器支持。

Table API 和 SQL 程序的结构 #

所有用于批处理和流处理的 Table API 和 SQL 程序都遵循相同的模式。下面的代码示例显示了 Table API 和 SQL 程序的共同结构。

// create a TableEnvironment for specific planner batch or streaming
val tableEnv = ... // see "Create a TableEnvironment" section

// create a Table
tableEnv.connect(...).createTemporaryTable("table1")
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable")

// create a Table from a Table API query
val tapiResult = tableEnv.from("table1").select(...)
// create a Table from a SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")

// emit a Table API result Table to a TableSink, same for SQL result
val tableResult = tapiResult.executeInsert("outputTable")
tableResult...

注意:表 API 和 SQL 查询可以很容易地与 DataStream 或 DataSet 程序集成并嵌入其中。请查看与 DataStream 和 DataSet API 的集成部分,了解如何将 DataStream 和 DataSets 转换为表,反之亦然。

创建一个 TableEnvironment #

TableEnvironment 是 Table API 和 SQL 集成的核心概念。它负责

  • 在内部目录(catalog)中注册一个 Table
  • 登记目录(catalog)
  • 加载可插拔模块
  • 执行 SQL 查询
  • 注册一个用户定义的(标量、表或聚合)函数
  • 将 DataStream 或 DataSet 转换为 Table
  • 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用。

一个 Table 总是绑定在一个特定的 TableEnvironment 上。在同一个查询中,不可能将不同 TableEnvironments 的表组合起来,例如,将它们连接或联合起来。

通过调用静态的 BatchTableEnvironment.create()StreamTableEnvironment.create() 方法创建一个 TableEnvironment,其中包含一个 StreamExecutionEnvironment 或 ExecutionEnvironment 和一个可选的 TableConfig。TableConfig 可以用来配置 TableEnvironment 或自定义查询优化和翻译过程(参见 Query Optimization)。

确保选择与你的编程语言相匹配的特定规划器 BatchTableEnvironment/StreamTableEnvironment。

如果这两个规划器 jar 都在 classpath 上(默认行为),你应该明确设置在当前程序中使用哪个规划器。

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment

val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

注意:如果在 /lib 目录下只有一个 planner jar,可以使用 AnyPlanner(python 的 use_any_planner) 来创建特定的环境设置。

在目录(Catalog)中创建表 #

一个 TableEnvironment 维护着一个表的目录图,这些表是用一个标识符创建的。每个标识符由 3 部分组成:目录名、数据库名和对象名。如果没有指定目录或数据库,将使用当前的默认值(参见Table 标识符展开部分的例子)。

表可以是虚拟的(VIEWS)或常规的(TABLES)。VIEWS 可以从现有的 Table 对象创建,通常是 Table API 或 SQL 查询的结果。TABLES 描述外部数据,如文件、数据库表或消息队列。

临时表与永久表 #

表可以是临时的,与单个 Flink 会话的生命周期挂钩,也可以是永久的,在多个 Flink 会话和集群中可见。

永久表需要一个目录(如 Hive Metastore)来维护表的元数据。一旦创建了永久表,它对连接到目录的任何 Flink 会话都是可见的,并将继续存在,直到表被显式放弃。

另一方面,临时表总是存储在内存中,并且只在它们创建的 Flink 会话的持续时间内存在。这些表对其他会话不可见。它们不绑定到任何目录或数据库,但可以在一个目录或数据库的命名空间中创建。如果相应的数据库被删除,临时表不会被删除。

Shadowing #

可以用与现有永久表相同的标识符登记一个临时表。只要临时表存在,临时表就会对永久表产生遮盖,使永久表无法访问。所有使用该标识符的查询都将针对临时表执行。

这可能对实验很有用。它允许首先对临时表运行完全相同的查询,例如,只有一个数据子集,或者数据被混淆了。一旦验证了查询的正确性,就可以针对真正的生产表运行。

创建一个 Table #

虚拟表 #

表 API 对象对应于 SQL 术语中的 VIEW(虚拟表)。它封装了一个逻辑查询计划。它可以在一个目录中创建,具体如下。

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// table is the result of a simple projection query 
val projTable: Table = tableEnv.from("X").select(...)

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)

注意:Table 对象与关系型数据库系统中的 VIEW 类似,即定义 Table 的查询不进行优化,但当另一个查询引用注册的 Table 时,会被内联。如果多个查询引用同一个注册表,则会对每个引用查询进行内联,并执行多次,即注册表的结果不会被共享。

连接器表 #

也可以从连接器声明中创建一个关系型数据库中已知的 TABLE。连接器描述的是存储表数据的外部系统。这里可以声明 Apacha Kafka 或普通文件系统等存储系统。

DDL
tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

扩展 Table 标识符 #

表总是用目录(catalog)、数据库、表名三部分组成的标识符进行注册。

用户可以将其中的一个目录和一个数据库设置为"当前目录"和"当前数据库"。其中,上述 3 部分标识符中的前两部分可以选择,如果不提供,则引用当前目录和当前数据库。用户可以通过表 API 或 SQL 切换当前目录和当前数据库。

标识符遵循 SQL 的要求,这意味着它们可以用反引号符(`)进行转义。

// get a TableEnvironment
val tEnv: TableEnvironment = ...;
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")

val table: Table = ...;

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("exampleView", table)

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_database.exampleView", table)

// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("`example.View`", table)

// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)

查询一个 Table #

Table API #

Table API 是 Scala 和 Java 的语言集成查询 API。与 SQL 不同的是,查询不是指定为 Strings,而是在宿主语言中一步步组成。

该 API 基于 Table 类,它表示一个表(流式或批处理),并提供了应用关系操作的方法。这些方法返回一个新的 Table 对象,该对象表示对输入的 Table 应用关系操作的结果。有些关系操作由多个方法调用组成,如 table.groupBy(...).select(),其中 groupBy(...) 指定表的分组,select(...) 是表的分组上的投影。

Table API 文档描述了流式表和批处理表上支持的所有 Table API 操作。

下面的示例显示了一个简单的 Table API 聚合查询。

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
val orders = tableEnv.from("Orders")
// compute revenue for all customers from France
val revenue = orders
  .filter($"cCountry" === "FRANCE")
  .groupBy($"cID", $"cName")
  .select($"cID", $"cName", $"revenue".sum AS "revSum")

// emit or convert Table
// execute query

注意:Scala Table API 使用以美元符号($)开头的 Scala 字符串插值来引用 Table 的属性。Table API 使用 Scala implicits。请确保导入

  • org.apache.flink.table.api._ - 用于隐式表达式转换
  • org.apache.flink.api.scala._org.apache.flink.table.api.bridge.scala._,如果你想从 DataStream 转换到 DataStream。

SQL #

Flink 的 SQL 集成是基于 Apache Calcite,它实现了 SQL 标准。SQL 查询被指定为常规 Strings。

SQL 文档描述了 Flink 对流和批处理表的 SQL 支持。

下面的例子展示了如何指定一个查询并将结果以表的形式返回。

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// emit or convert Table
// execute query

下面的示例显示了如何指定一个更新查询,将其结果插入到注册表中。

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

混合 Table API 和 SQL #

表 API 和 SQL 查询可以很容易地混合,因为两者都返回 Table 对象。

  • 可以在 SQL 查询返回的 Table 对象上定义 Table API 查询。
  • 通过在 TableEnvironment 中注册生成的 Table并在 SQL 查询的 FROM 子句中引用它,可以在 Table API 查询的结果上定义一个 SQL 查询。

发出一个表 #

一个 Table 是通过将其写入 TableSink 而发出的。TableSink 是一个通用接口,它支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息系统(如 Apache Kafka、RabbitMQ)。

批量表只能写入 BatchTableSink,而流式表则需要 AppendStreamTableSink、RetractStreamTableSink 或 UpsertStreamTableSink。

请参阅有关 Table Sources & Sink 的文档,以了解可用的 Sink 的详细信息以及如何实现自定义 TableSink 的说明。

Table.executeInsert(String tableName) 方法将 Table 排放到一个注册的 TableSink 中。该方法通过名称从目录中查找 TableSink,并验证 Table 的模式与 TableSink 的模式是否相同。

下面的示例展示了如何发射 Table。

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// create an output Table
val schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.LONG())

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable")

// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...

// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable")

翻译和执行查询 #

两个规划器翻译和执行查询的行为是不同的。

  • Blink 计划器

表 API 和 SQL 查询无论其输入是流式还是批处理,都会被翻译成 DataStream 程序。一个查询在内部表示为一个逻辑查询计划,并分两个阶段进行翻译。

  1. 逻辑计划的优化。
  2. 翻译成 DataStream 程序。

Table API 或 SQL 查询被翻译时:

  • TableEnvironment.executeSql() 被调用。这个方法用于执行给定的语句,一旦这个方法被调用,sql 查询就会立即被翻译。
  • Table.executeInsert() 被调用。该方法用于将表的内容插入到给定的 sink 路径中,一旦调用该方法,Table API 立即被翻译。
  • 调用 Table.execute()。该方法用于将表内容收集到本地客户端,一旦调用该方法,Table API 立即被翻译。
  • StatementSet.execute() 被调用。一个 Table(通过 StatementSet.addInsert() 向 sink 发出)或一个 INSERT 语句(通过 StatementSet.addInsertSql() 指定)将首先在 StatementSet 中被缓冲。一旦 StatementSet.execute() 被调用,它们就会被翻译。所有接收器将被优化成一个 DAG。
  • 当一个表被转换为 DataStream 时,它就会被翻译(参见与 DataStream 和 DataSet API 的集成)。一旦翻译完毕,它就是一个常规的 DataStream 程序,并在调用 StreamExecutionEnvironment.execut()时被执行。 注意: 从 1.11 版本开始,sqlUpdate() 方法和 insertInto() 方法已被废弃。如果 Table 程序是由这两个方法构建的,我们必须使用 StreamTableEnvironment.execution() 方法代替 StreamExecutionEnvironment.execution() 方法来执行。

与 DataStream 和 DataSet API 的集成 #

两种流上的计划器都可以与 DataStream API 集成,只有老的计划器可以与 DataSet API 集成,批处理的 Blink 计划器不能与两者结合。只有旧的计划器可以与 DataSet API 集成,批处理的 Blink 计划器不能与两者结合。注:下面讨论的 DataSet API 只适用于批处理的旧版规划器。

Table API 和 SQL 查询可以很容易地与 DataStreamDataSet 程序集成并嵌入其中。例如,可以查询一个外部表(例如来自 RDBMS),做一些预处理,如过滤、投影、聚合或加入元数据,然后用 DataStream 或 DataSet API(以及建立在这些 API 之上的任何库,如 CEP 或 Gelly)进一步处理数据。反之,也可以在 DataStream 或 DataSet 程序的结果上应用 Table API 或 SQL 查询。

这种交互可以通过将 DataStream 或 DataSet 转换为表来实现,反之亦然。在本节中,我们将描述这些转换是如何完成的。

Scala 隐式转换 #

Scala Table API 为 DataSet、DataStream 和 Table 类提供了隐式转换的功能。这些转换是通过导入包 org.apache.flink.table.api.bridge.scala._ 来实现的,此外还可以导入 org.apache.flink.api.scala._ 来实现 Scala DataStream API。

从 DataStream 或 DataSet 创建视图 #

DataStream 或 DataSet 可以作为视图在 TableEnvironment 中注册。由此产生的视图的模式取决于注册的 DataStream 或 DataSet 的数据类型。请查看有关数据类型到表模式的映射部分以了解详情。

注意:从 DataStream 或 DataSet 创建的视图只能注册为临时视图。

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream)

// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)

将 DataStream 或 DataSet 转换为 Table #

不需要在 TableEnvironment 中注册一个 DataStream 或 DataSet,也可以直接将其转换为 Table。如果你想在 Table API 查询中使用 Table,这很方便。

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with default fields "_1", "_2"
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with fields "myLong", "myString"
val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")

将 Table 转换为 DataStream 或 DataSet #

Table 可以被转换为 DataStream 或 DataSet。通过这种方式,可以在表 API 或 SQL 查询的结果上运行自定义 DataStream 或 DataSet 程序。

当将 Table 转换为 DataStream 或 DataSet 时,您需要指定生成的 DataStream 或 DataSet 的数据类型,即表的行要转换为的数据类型。通常,最方便的转换类型是 Row。下面的列表给出了不同选项的功能概述。

  • Row:字段按位置映射,字段数量任意,支持 null 值,无类型安全访问。
  • POJO:字段按名称映射(POJO 字段必须与表字段一样命名),任意数量的字段,支持 null 值,类型安全访问。
  • Case Class:字段按位置映射,不支持 null 值,类型安全访问。
  • Tuple:字段按位置映射,限制为 22 个(Scala)或 25 个(Java)字段,不支持 null 值,类型安全访问。
  • 原子类型:表必须有一个字段,不支持空值,类型安全访问。表必须有一个字段,不支持 null 值,类型安全访问。

将 Table 转换为 DataStream #

作为流式查询结果的表将被动态更新,即随着查询输入流中新记录的到达而变化。因此,将这种动态查询转换成的 DataStream 需要对表的更新进行编码。

有两种模式可以将表转换为 DataStream。

  1. Append 模式。只有当动态 Table 只被 INSERT 修改时,才可以使用这种模式,即只进行追加,之前发出的结果永远不会更新。
  2. 收回模式。这种模式可以一直使用。它将 INSERT 和 DELETE 更改用布尔标志编码。
// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

注意:关于动态表及其属性的详细讨论在动态表文档中给出。

注意: 一旦表转换为 DataStream,请使用 StreamExecutionEnvironment.execute() 方法来执行 DataStream 程序。

将 Table 转换为 DataSet #

Table 转换为 DataStream 的过程如下:

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv = BatchTableEnvironment.create(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

注意: 一旦 Table 转换为 DataSet,我们必须使用 ExecutionEnvironment.execute 方法来执行 DataSet 程序。

数据类型到 Table Schema 的映射 #

Flink 的 DataStream 和 DataSet API 支持非常多样化的类型。复合类型,如 Tuples(内置的 Scala 和 Flink Java tuples)、POJOs、Scala case 类和 Flink 的 Row 类型,允许嵌套具有多个字段的数据结构,这些字段可以在 Table 表达式中访问。其他类型被视为原子类型。在下文中,我们将描述 Table API 如何将这些类型转换为内部行表示,并展示将 DataStream 转换为 Table 的例子。

数据类型到 Table Schema 的映射可以通过两种方式进行:基于字段位置或基于字段名。

  • 基于位置的映射

基于位置的映射可以用来给字段一个更有意义的名字,同时保持字段顺序。这种映射可用于具有定义字段顺序的复合数据类型以及原子类型。复合数据类型如元组、行和 case 类都有这样的字段顺序。然而,POJO 的字段必须根据字段名进行映射(见下一节)。字段可以被投影出来,但不能使用别名作为重命名。

当定义基于位置的映射时,指定的名称必须不存在于输入数据类型中,否则 API 将假设映射应该基于字段名发生。如果没有指定字段名,则使用复合类型的默认字段名和字段顺序,对于原子类型则使用 f0。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field "myLong" only
val table: Table = tableEnv.fromDataStream(stream, $"myLong")

// convert DataStream into Table with field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myInt")
  • 基于名称的映射

基于名称的映射可以用于任何数据类型,包括 POJO。它是定义表模式映射的最灵活的方式。映射中的所有字段都是通过名称引用的,并可能使用别名重命名为。字段可以重新排序和投影出来。

如果没有指定字段名,则使用复合类型的默认字段名和字段顺序,对于原子类型则使用 f0。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field "_2" only
val table: Table = tableEnv.fromDataStream(stream, $"_2")

// convert DataStream into Table with swapped fields
val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "myLong")

原子类型 #

Flink 将原语(Integer、Double、String)或通用类型(不能分析和分解的类型)视为原子类型。原子类型的 DataStream 或 DataSet 会被转换为具有单一属性的 Table。属性的类型是从原子类型推断出来的,可以指定属性的名称。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[Long] = ...

// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"myLong")

Tuples(Scala 和 Java)和 Case 类(仅 Scala)。 #

Flink 支持 Scala 的内置元组,并为 Java 提供了自己的元组类。DataStreams 和 DataSets 这两种元组都可以转换为表。通过为所有字段提供名称(基于位置的映射),可以重命名字段。如果没有指定字段名,则使用默认的字段名。如果引用了原始的字段名(对于 Flink Tuples 来说是 f0, f1, …,对于 Scala Tuples 来说是 _1, _2, …),API 会假定映射是基于名称而不是基于位置的。基于名称的映射允许重新排序字段和用别名(as)进行投影。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")

// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")

// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2")

// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myString", $"_1" as "myLong")

// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)

// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, $"myName", $"myAge")

// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")

POJO(Java 和 Scala) #

Flink 支持 POJO 作为复合类型。这里记录了确定 POJO 的规则。

当将 POJO DataStream 或 DataSet 转换为 Table 而不指定字段名时,会使用原始 POJO 字段的名称。名称映射需要原始名称,不能通过位置来完成。字段可以使用别名(使用 as 关键字)重命名,重新排序,并进行投影。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name")

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")

Row #

Row 数据类型支持任意数量的字段和具有 null 值的字段。字段名可以通过 RowTypeInfo 来指定,也可以在将 Row DataStream 或 DataSet 转换为 Table 时指定。Row 类型支持通过位置和名称对字段进行映射。可以通过为所有字段提供名称(基于位置的映射)或单独选择字段进行投影/排序/重命名(基于名称的映射)来重命名字段。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, $"myName", $"myAge")

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName", $"age" as "myAge")

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name")

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")

查询优化 #

  • Blink 计划器

Apache Flink 利用并扩展了 Apache Calcite 来执行复杂的查询优化。这包括一系列基于规则和成本的优化,如:

  • 基于 Apache Calcite 的子查询装饰相关。
  • 投影修剪
  • 分区修剪
  • 过滤器下推
  • 子计划重复复制,避免重复计算。
  • 特殊子查询重写,包括两部分。
    • 将 IN 和 EXISTS 转换为左半连接。
    • 将 NOT IN 和 NOT EXISTS 转换为左反连接。
  • 可选的 join 重新排序
    • 通过 table.optimizer.join-reorder-enabled 启用。

注:IN/EXISTS/NOT IN/NOT EXISTS 目前只支持子查询重写中的连词条件。

优化器做出智能决策,不仅基于计划,还基于数据源提供的丰富统计数据,以及每个操作符(如 io、cpu、网络和内存)的细粒度成本。

高级用户可以通过 CalciteConfig 对象提供自定义优化,该对象可以通过调用 TableEnvironment#getConfig#setPlannerConfig 提供给 table 环境。

解释表 #

Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划。这是通过 Table.explain() 方法或 StatementSet.explain() 方法完成的。Table.explain() 返回一个 Table 的计划。StatementSet.explain() 返回多个接收器的计划。它返回一个描述三个计划的字符串。

  1. 关系查询的抽象语法树,即未优化的逻辑查询计划。
  2. 优化的逻辑查询计划,以及
  3. 物理执行计划。

TableEnvironment.explainSql()TableEnvironment.executeSql() 支持执行 EXPLAIN 语句来获取计划,请参考 EXPLAIN 页面。

下面的代码显示了一个使用 Table.explain() 方法给定 Table 的例子和相应的输出。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table = table1
  .where($"word".like("F%"))
  .unionAll(table2)
println(table.explain())

上述例子的结果是:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : collect elements with CollectionInputFormat

Stage 2 : Data Source
	content : collect elements with CollectionInputFormat

	Stage 3 : Operator
		content : from: (count, word)
		ship_strategy : REBALANCE

		Stage 4 : Operator
			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
			ship_strategy : FORWARD

			Stage 5 : Operator
				content : from: (count, word)
				ship_strategy : REBALANCE

下面的代码显示了使用 StatementSet.explain() 方法进行多重接收器计划的一个例子和相应的输出。

val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tEnv = TableEnvironment.create(settings)

val schema = new Schema()
    .field("count", DataTypes.INT())
    .field("word", DataTypes.STRING())

tEnv.connect(new FileSystem("/source/path1"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySource1")
tEnv.connect(new FileSystem("/source/path2"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySource2")
tEnv.connect(new FileSystem("/sink/path1"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySink1")
tEnv.connect(new FileSystem("/sink/path2"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySink2")
    
val stmtSet = tEnv.createStatementSet()

val table1 = tEnv.from("MySource1").where($"word".like("F%"))
stmtSet.addInsert("MySink1", table1)

val table2 = table1.unionAll(tEnv.from("MySource2"))
stmtSet.addInsert("MySink2", table2)

val explanation = stmtSet.explain()
println(explanation)

多重接收器计划的结果是:

== Abstract Syntax Tree ==
LogicalLegacySink(name=[MySink1], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])

LogicalLegacySink(name=[MySink2], fields=[count, word])
+- LogicalUnion(all=[true])
   :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
   +- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])

== Optimized Logical Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])

LegacySink(name=[MySink1], fields=[count, word])
+- Reused(reference_id=[1])

LegacySink(name=[MySink2], fields=[count, word])
+- Union(all=[true], union=[count, word])
   :- Reused(reference_id=[1])
   +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : collect elements with CollectionInputFormat

	Stage 2 : Operator
		content : CsvTableSource(read fields: count, word)
		ship_strategy : REBALANCE

		Stage 3 : Operator
			content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
			ship_strategy : FORWARD

			Stage 4 : Operator
				content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
				ship_strategy : FORWARD

				Stage 5 : Operator
					content : SinkConversionToRow
					ship_strategy : FORWARD

					Stage 6 : Operator
						content : Map
						ship_strategy : FORWARD

Stage 8 : Data Source
	content : collect elements with CollectionInputFormat

	Stage 9 : Operator
		content : CsvTableSource(read fields: count, word)
		ship_strategy : REBALANCE

		Stage 10 : Operator
			content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
			ship_strategy : FORWARD

			Stage 12 : Operator
				content : SinkConversionToRow
				ship_strategy : FORWARD

				Stage 13 : Operator
					content : Map
					ship_strategy : FORWARD

					Stage 7 : Data Sink
						content : Sink: CsvTableSink(count, word)
						ship_strategy : FORWARD

						Stage 14 : Data Sink
							content : Sink: CsvTableSink(count, word)
							ship_strategy : FORWARD

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html