Wait the light to fall

时间属性

焉知非鱼

Time Attributes

时间属性

Flink 能够根据不同的时间概念来处理流数据。

  • 处理时间是指正在执行相应操作的机器的系统时间(也称为"挂钟时间")。
  • 事件时间指的是基于时间戳对流媒体数据的处理,时间戳附加在每一行上。时间戳可以编码事件发生的时间。
  • 摄取时间是事件进入 Flink 的时间;在内部,它的处理方式与事件时间类似。

关于 Flink 中时间处理的更多信息,请参见关于事件时间和水印的介绍。

本页解释了如何在 Flink 的表 API 和 SQL 中为基于时间的操作定义时间属性。

时间属性介绍 #

Table APISQL 中的窗口等基于时间的操作都需要时间概念及其来源的信息。因此,表可以提供逻辑时间属性,用于指示时间和在表程序中访问相应的时间戳。

时间属性可以成为每个表模式的一部分。它们是在从 CREATE TABLE DDL 或 DataStream 创建表时定义的,或者是在使用 TableSource 时预先定义的。一旦在开始时定义了时间属性,它就可以作为一个字段被引用,并且可以在基于时间的操作中使用。

只要时间属性没有被修改,只是从查询的一个部分转发到另一个部分,它仍然是一个有效的时间属性。时间属性的行为就像常规的时间戳一样,可以被访问进行计算。如果在计算中使用了时间属性,它将被具体化并成为常规时间戳。常规时间戳不与 Flink 的时间和水印系统合作,因此不能再用于基于时间的操作。

表程序要求已经为流环境指定了相应的时间特征。

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

处理时间 #

处理时间允许表程序根据本地机器的时间产生结果。它是最简单的时间概念,但不提供确定性。它既不需要提取时间戳,也不需要生成水印。

有三种方法可以定义处理时间属性。

在创建表 DDL 中定义 #

处理时间属性是在创建表 DDL 中使用系统 PROCTIME()函数定义为计算列。关于计算列的更多信息请参见 CREATE TABLE DDL

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

在 DataStream-to-Table 转换期间 #

处理时间属性是在模式定义过程中用 .proctime 属性定义的。时间属性只能通过一个额外的逻辑字段来扩展物理模式。因此,它只能在模式定义的最后定义。

val stream: DataStream[(String, String)] = ...

// declare an additional logical field as a processing time attribute
val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)

val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

使用 Table Source #

处理时间属性由实现 DefinedProctimeAttribute 接口的 TableSource 定义。逻辑时间属性附加到由 TableSource 的返回类型定义的物理模式中。

// define a table source with a processing attribute
class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {

	override def getReturnType = {
		val names = Array[String]("user_name" , "data")
		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
		Types.ROW(names, types)
	}

	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
		// create stream
		val stream = ...
		stream
	}

	override def getProctimeAttribute = {
		// field with this name will be appended as a third field
		"user_action_time"
	}
}

// register table source
tEnv.registerTableSource("user_actions", new UserActionSource)

val windowedTable = tEnv
	.from("user_actions")
	.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

事件时间 #

事件时间允许表格程序根据每条记录中包含的时间产生结果。这使得即使在事件失序或事件迟到的情况下,也能得到一致的结果。当从持久存储中读取记录时,它还能保证表程序的结果可重放。

此外,事件时间允许在批处理和流环境中对表程序进行统一的语法。流式环境中的时间属性可以是批处理环境中记录的常规字段。

为了处理失序事件,区分流式环境中事件的准时和迟到,Flink 需要从事件中提取时间戳,并在时间上做出某种进展(所谓的水印)。

事件时间属性既可以在创建表 DDL 中定义,也可以在 DataStream 到表的转换过程中定义,或者使用 TableSource 定义。

在创建表 DDL 中定义 #

事件时间属性是在 CREATE TABLE DDL 中使用 WATERMARK 语句定义的。水印语句在现有的事件时间字段上定义了一个水印生成表达式,将事件时间字段标记为事件时间属性。关于水印语句和水印策略的更多信息,请参见 CREATE TABLE DDL

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

在 DataStream-to-Table 转换期间 #

事件时间属性是在模式定义期间用 .rowtime 属性定义的。时间戳和水印必须在被转换的 DataStream 中被分配。

在将 DataStream 转换为表时,有两种方法可以定义时间属性。根据指定的.rowtime 字段名是否存在于 DataStream 的模式中,时间戳字段要么是

  • 作为一个新的字段添加到模式中,或
  • 替换一个现有的字段。

无论哪种情况,事件时间戳字段都将持有 DataStream 事件时间戳的值。

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// declare an additional logical field as an event time attribute
val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")

// Usage:

val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

使用 TableSource #

事件时间属性由一个实现 DefinedRowtimeAttributes 接口的 TableSource 定义。getRowtimeAttributeDescriptors() 方法返回一个 RowtimeAttributeDescriptor 列表,用于描述时间属性的最终名称,一个用于导出属性值的时间戳提取器,以及与属性相关的水印策略。

请确保 getDataStream() 方法返回的 DataStream 与定义的时间属性一致。只有当定义了 StreamRecordTimestamp 时间戳提取器时,才会考虑 DataStream 的时间戳(由 TimestampAssigner 分配的时间戳)。只有定义了 PreserveWatermarks 水印策略,DataStream 的水印才会被保留。否则,只有 TableSource 的 rowtime 属性的值是相关的。

// define a table source with a rowtime attribute
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

	override def getReturnType = {
		val names = Array[String]("user_name" , "data", "user_action_time")
		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
		Types.ROW(names, types)
	}

	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
		// create stream
		// ...
		// assign watermarks based on the "user_action_time" attribute
		val stream = inputStream.assignTimestampsAndWatermarks(...)
		stream
	}

	override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
		// Mark the "user_action_time" attribute as event-time attribute.
		// We create one attribute descriptor of "user_action_time".
		val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
			"user_action_time",
			new ExistingField("user_action_time"),
			new AscendingTimestamps)
		val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
		listRowtimeAttrDescr
	}
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource)

val windowedTable = tEnv
	.from("user_actions")
	.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/time_attributes.html