Wait the light to fall

Flink Dataset API 编程指南

焉知非鱼

Flink Dataset Api Programming Guide

Flink 中的数据集程序是对数据集实现转换(如过滤、映射、加入、分组)的常规程序。数据集最初是从某些来源创建的(例如,通过读取文件,或从本地集合中创建)。结果通过汇返回,例如可以将数据写入(分布式)文件,或标准输出(例如命令行终端)。Flink 程序可以在各种环境下运行,独立运行,或者嵌入其他程序中。执行可以发生在本地 JVM 中,也可以发生在许多机器的集群中。

请参考 DataStream API 概述,了解 Flink API 的基本概念。该概述是针对 DataStream API 的,但这两个 API 的基本概念是一样的。

为了创建你自己的 Flink DataSet 程序,我们鼓励你从 Flink 程序的骨架开始,并逐步添加你自己的转换。其余部分作为附加操作和高级功能的参考。

程序示例 #

下面的程序是一个完整的、可以使用的 WordCount 的例子,你可以复制和粘贴代码在本地运行。你可以复制和粘贴代码在本地运行它。你只需要在你的项目中加入正确的 Flink 的库(参见与 Flink 的链接部分)并指定导入。然后你就可以开始了

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

DataSet 转换 #

数据转换将一个或多个 DataSet 转换为一个新的 DataSet。程序可以将多个转换组合成复杂的集合。

本节简要介绍了可用的转换。转换文档中有所有变换的完整描述和示例。

  • Map

接受一个元素,产生一个元素。

data.map { x => x.toInt }
  • FlatMap

接受一个元素并产生零、一个或多个元素。

data.flatMap { str => str.split(" ") }
  • MapPartition

在一个函数调用中转换一个并行分区。该函数以"迭代器"的形式获取分区,并可产生任意数量的结果值。每个分区的元素数量取决于平行度和之前的操作。

data.mapPartition { in => in map { (_, 1) } }
  • Filter

对每个元素进行布尔函数评估,并保留那些函数返回真的元素。 重要:系统假设函数不会修改应用谓词的元素。违反这个假设会导致错误的结果。

data.filter { _ > 1000 }
  • Reduce

通过重复将两个元素合并为一个元素,将一组元素合并为一个元素。换算可以应用于一个完整的数据集,也可以应用于一个分组的数据集。

data.reduce { _ + _ }
  • ReduceGroup

将一组元素合并成一个或多个元素。ReduceGroup 可以应用在一个完整的数据集上,也可以应用在一个分组的数据集上。

data.reduceGroup { elements => elements.sum }
  • Aggregate

将一组值聚合成一个值。Aggregation 函数可以被认为是内置的 reduce 函数。Aggregate 可以应用于一个完整的数据集,也可以应用于一个分组的数据集。

val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)

你也可以使用简写语法来进行 minimum, maximumsum 的聚合。

val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
  • Distinct

返回数据集的不同元素。它从输入的 DataSet 中删除元素的所有字段或字段子集的重复条目。

data.distinct()
  • Join

通过创建所有键值相等的元素对来连接两个数据集。可以选择使用 JoinFunction 将一对元素变成一个元素,或者使用 FlatJoinFunction 将一对元素变成任意多个(包括无)元素。请参阅部分了解如何定义连接键。

// In this case tuple fields are used as keys. "0" is the join field on the first tuple
// "1" is the join field on the second tuple.
val result = input1.join(input2).where(0).equalTo(1)

你可以通过 Join Hints 指定运行时执行连接的方式。这些提示描述了连接是通过分区还是广播进行的,以及它是使用基于排序还是基于散列的算法。请参考转换指南,了解可能的提示列表和示例。 如果没有指定提示,系统将尝试对输入大小进行估计,并根据这些估计选择最佳策略。

// This executes a join by broadcasting the first data set
// using a hash table for the broadcast data
val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
                   .where(0).equalTo(1)

请注意,连接转换只适用于等价连接。其他的连接类型需要使用 OuterJoin 或 CoGroup 来表达。

  • OuterJoin

在两个数据集上执行左联接、右联接或完全外联接。外联接与常规(内联接)类似,创建所有键值相同的元素对。此外,如果在另一侧没有找到匹配的键,“外侧"的记录(左、右或全联接时两者都有)将被保留。匹配的元素对(或一个元素和另一个输入的 null 值)被交给 JoinFunction 将这对元素变成单个元素,或交给 FlatJoinFunction 将这对元素变成任意多个(包括无)元素。请参阅部分,了解如何定义连接键。

val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
   (left, right) =>
     val a = if (left == null) "none" else left._1
     (a, right)
  }
  • CoGroup

减少操作的二维变体。在一个或多个字段上对每个输入进行分组,然后将分组合并。每一对组都会调用转换函数。请参阅部分,了解如何定义 coGroup 键。

data1.coGroup(data2).where(0).equalTo(1)
  • Cross

建立两个输入的笛卡尔乘积(交叉乘积),创建所有元素对。可选择使用交叉函数将一对元素变成一个单一元素。

val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)

注意:Cross 可能是一个非常耗费计算的操作,甚至可以挑战大型计算集群!建议使用 crossWithTiny()crossWithHuge() 来提示系统数据集的大小。

  • Union

  • 产生两个数据集的并集。

data.union(data2)
  • Rebalance

均匀地重新平衡数据集的并行分区,以消除数据倾斜。只有类似于 Map 的变换才可以跟随重新平衡(rebalance)变换。

val data1: DataSet[Int] = // [...]
val result: DataSet[(Int, String)] = data1.rebalance().map(...)
  • Hash-Partition

在给定的键上对数据集进行散列分区。键可以被指定为位置键、表达式键和键选择函数。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByHash(0).mapPartition { ... }
  • Range-Partition

在给定的键上按照范围分割数据集。键可以被指定为位置键、表达式键和键选择函数。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }
  • 自定义分区

使用自定义的 Partitioner 函数,根据键将记录分配到特定的分区。键可以指定为位置键、表达式键和键选择函数。 注意:此方法仅适用于单个字段键。

val in: DataSet[(Int, String)] = // [...]
val result = in
  .partitionCustom(partitioner, key).mapPartition { ... }
  • Sort Partition

按照指定的顺序对数据集的所有分区进行本地排序。字段可以指定为元组位置或字段表达式。对多个字段的排序是通过链式 sortPartition() 调用完成的。

val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
  • First-n

返回一个数据集的前 n 个(任意)元素。First-n 可以应用于一个常规数据集、一个分组数据集或一个分组排序数据集。分组键可以指定为键选择函数、元组位置或 case 类字段。

val in: DataSet[(Int, String)] = // [...]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)

以下转换可用于元组的数据集。

  • MinBy / MaxBy

从一组元组中选择一个元组,这些元组的一个或多个字段的值是最小的(最大的)。用于比较的字段必须是有效的键字段,即可比较。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。MinBy (MaxBy)可以应用于一个完整的数据集或一个分组数据集。

val in: DataSet[(Int, Double, String)] = // [...]
// a data set with a single tuple with minimum values for the Int and String fields.
val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
// a data set with one tuple for each group with the minimum value for the Double field.
val out2: DataSet[(Int, Double, String)] = in.groupBy(2)
                                             .minBy(1)

通过匿名模式匹配从 tuple、case 类和集合中提取,比如下面。

val data: DataSet[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}

不受 API 开箱即用的支持。要使用这个功能,你应该使用 Scala API 扩展

变换的并行度可以通过 setParallelism(int) 来定义,而 name(String) 可以给变换指定一个自定义的名称,这对调试很有帮助。数据源数据接收器也是如此。

withParameters(Configuration) 传递 Configuration 对象,这些对象可以从用户函数里面的 open() 方法访问。

指定键 #

一些转换(join、coGroup、groupBy)需要在元素集合上定义一个键。其他转换(Reduce、GroupReduce、Aggregate)允许在应用之前将数据按键分组。

一个 DataSet 被分组为:

DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);

Flink 的数据模型不是基于键值对的。因此,你不需要将数据集类型物理地打包成键和值。键是"虚拟的”:它们被定义为实际数据上的函数,以指导分组操作符。

为元组定义键 #

最简单的情况是对 Tuple 的一个或多个字段进行分组。

val input: DataSet[(Int, String, Long)] = // [...]
val keyed = input.groupBy(0)

元组在第一个字段(整数类型的字段)上进行分组。

val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)

在这里,我们将元组放在一个由第一个字段和第二个字段组成的复合键上。

关于嵌套 Tuple 的说明。如果你的 DataSet 有一个嵌套的元组,比如:

DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

指定 groupBy(0) 将使系统使用完整的 Tuple2 作为键(以 Integer 和 Float 为键)。如果要"导航"到嵌套的 Tuple2 中,就必须使用字段表达式键,下面将对其进行说明。

使用字段表达式定义键 #

你可以使用基于字符串的字段表达式来引用嵌套的字段,并为分组、排序、连接(join)或 coGrouping 定义键。

字段表达式可以非常容易地选择(嵌套的)复合类型中的字段,如 Tuple 和 POJO 类型。

在下面的例子中,我们有一个有两个字段 “word” 和 “count” 的 WC POJO。要按字段 word 进行分组,我们只需将其名称传递给 groupBy() 函数。

// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
  def this() { this("", 0L) }
}
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word")

// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word")

字段表达式语法 #

  • 通过字段名选择 POJO 字段。例如 “user” 指的是 POJO 类型的 “user” 字段。

  • 通过 1-offset 字段名或 0-offset 字段索引来选择 Tuple 字段。例如 “_1” 和 “5” 分别指 Scala Tuple 类型的第一和第六字段。

你可以在 POJO 和 Tuple 中选择嵌套字段。例如 “user.zip” 指的是 POJO 的 “zip” 字段,它存储在 POJO 类型的 “user” 字段中。支持 POJO 和 Tuple 的任意嵌套和混合,如 “_2.user.zip” 或 “user._4.1.zip”。

你可以使用 “_” 通配符表达式选择完整的类型。这也适用于不是 Tuple 或 POJO 类型的类型。

字段表达式示例:

class WC(var complex: ComplexNestedClass, var count: Int) {
  def this() { this(null, 0) }
}

class ComplexNestedClass(
    var someNumber: Int,
    someFloat: Float,
    word: (Long, Long, String),
    hadoopCitizen: IntWritable) {
  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}

这些都是上面例子代码的有效字段表达式。

  • “count”: WC 类中的计数字段

  • “complex”: 递归选择 POJO 类型 ComplexNestedClasscomplex 字段的所有字段。

  • “complex.word._3”: 选择嵌套的 Tuple3 的最后一个字段。

  • “complex.hadoopCitizen”: 选择 Hadoop IntWritable 类型。

使用键选择函数定义键 #

另一种定义键的方法是"键选择器"函数。键选择器函数将一个元素作为输入,并返回该元素的键。键可以是任何类型的,并且可以从确定性计算中得到。

下面的例子显示了一个简单返回对象字段的键选择函数。

// some ordinary case class
case class WC(word: String, count: Int)
val words: DataSet[WC] = // [...]
val keyed = words.groupBy( _.word )

数据源 #

数据源创建初始数据集,例如从文件或 Java 集合中创建。创建数据集的一般机制是在 InputFormat 后面抽象出来的。Flink 自带了几种内置的格式来从常见的文件格式创建数据集。其中许多格式在 ExecutionEnvironment 上有快捷方法。

基于文件的:

  • readTextFile(path) / TextInputFormat - 读取文件并以字符串形式返回。

  • readTextFileWithValue(path) / TextValueInputFormat - 以行的方式读取文件并以 StringValues 的形式返回。StringValues 是可变字符串。

  • readCsvFile(path) / CsvInputFormat - 解析以逗号(或其他字符)分隔的文件。返回一个由 tuple、case 类对象或 POJOs 组成的 DataSet。支持基本的 java 类型及其对应的 Value 类型作为字段类型。

  • readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat - 使用给定的定界符,解析新行(或其他字符序列)定界的基元数据类型的文件,如 String 或 Integer。

  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat - 创建一个 JobConf 并从指定的路径读取文件,文件类型为 SequenceFileInputFormat,Key 类和 Value 类,并以 Tuple2<Key, Value> 的形式返回。

基于集合的:

  • fromCollection(Iterable) - 从一个 Iterable 创建一个数据集。Iterable 返回的所有元素必须是相同的类型。

  • fromCollection(Iterator) - 从一个 Iterator 创建一个数据集。该类指定了迭代器返回的元素的数据类型。

  • fromElements(elements: _*) - 从给定的对象序列中创建一个数据集。所有对象必须是相同的类型。

  • fromParallelCollection(SplittableIterator) - 从迭代器中并行创建一个数据集。该类指定了迭代器返回的元素的数据类型。

  • generateSequence(from, to) - 在给定的区间内并行生成数字序列。

通用的:

  • readFile(inputFormat, path) / FileInputFormat - 接受一个文件输入格式。

  • createInput(inputFormat) / InputFormat - 接受一个通用的输入格式。

示例:

val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from an HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000)

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.createInput(HadoopInputs.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file"))

配置 CSV 解析 #

Flink 为 CSV 解析提供了许多配置选项。

  • lineDelimiter: 字符串指定单个记录的定界符。默认的行定界符是新行字符 '/n'

  • fieldDelimiter: 字符串指定分隔记录字段的定界符。默认的字段定界符是逗号字符 ','

  • includeFields: Array[Int] 定义从输入文件中读取哪些字段(以及忽略哪些字段)。默认情况下,前 n 个字段(由 type() 调用中的类型数定义)会被解析。

  • pojoFields: Array[String] 指定 POJO 的字段,这些字段被映射到 CSV 字段。CSV 字段的解析器会根据 POJO 字段的类型和顺序自动初始化。

  • parseQuotedStrings: 启用引号字符串解析的字符。如果字符串字段的第一个字符是引号字符,那么字符串将被解析为引号字符串(前导或尾部的空白不被修剪)。引号字符串中的字段定界符会被忽略。如果引号字符串字段的最后一个字符不是引号字符,则引号字符串解析失败。如果启用了引号字符串解析,且字段的第一个字符不是引号字符串,则该字符串将被解析为未引号字符串。默认情况下,引号字符串解析被禁用。

  • ignoreComments: 字符串指定一个注解前缀。所有以指定注解前缀开始的行都不会被解析和忽略。默认情况下,没有行被忽略。

  • lenient:布尔值,启用宽松解析。也就是说,不能正确解析的行会被忽略。默认情况下,禁用宽松解析,无效行会引发异常。

  • ignoreFirstLine: Boolean 配置 InputFormat 忽略输入文件的第一行。默认情况下,没有行被忽略。

Input Path 的递归遍历 #

对于基于文件的输入,当输入路径是一个目录时,默认情况下不会枚举嵌套文件。取而代之的是,只读取基础目录内的文件,而忽略嵌套文件。嵌套文件的递归枚举可以通过 recursive.file.enumeration 配置参数启用,就像下面的例子。

// enable recursive enumeration of nested input files
val env  = ExecutionEnvironment.getExecutionEnvironment

// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)

读取压缩文件 #

Flink 目前支持输入文件的透明解压,如果这些文件被标记为适当的文件扩展名。特别是,这意味着无需进一步配置输入格式,任何 FileInputFormat 都支持压缩,包括自定义输入格式。请注意,压缩文件可能不会被并行读取,从而影响作业的可扩展性。

下表列出了当前支持的压缩方法。

压缩方法 文件后缀 并行性
DEFLATE .deflate no
GZip .gz, .gzip no
Bzip2 .bz2 no
XZ .xz no

数据接收器 #

数据接收器消费 DataSet 并用于存储或返回它们。数据接收器的操作是用 OutputFormat 来描述的。Flink 带有各种内置的输出格式,这些格式被封装在对 DataSet 的操作后面。

  • writeAsText() / TextOutputFormat –将元素逐行写成 Strings。通过调用每个元素的 toString() 方法获得字符串。
  • writeAsCsv(...) / CsvOutputFormat - 将元组写成逗号分隔的值文件。行和字段定界符是可配置的。每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值。
  • write() / FileOutputFormat - 用于自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • output()/ OutputFormat - 最通用的输出方法,用于非基于文件的数据接收器(如将结果存储在数据库中)。

一个 DataSet 可以被输入到多个操作中。程序可以写入或打印一个数据集,同时还可以对其进行额外的转换。

示例

标准数据接收器方法:

// text data
val textData: DataSet[String] = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")

// write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)

// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file")

// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
  .writeAsText("file:///path/to/the/result/file")

本地排序输出 #

数据接收器的输出可以使用元组字段位置字段表达式对指定字段按指定顺序进行本地排序。这适用于每一种输出格式。

下面的示例展示了如何使用该功能。

val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print()

// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)

// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)

// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...)

目前还不支持全局排序输出。

迭代运算符 #

迭代在 Flink 程序中实现了循环。迭代运算符封装了程序的一部分,并反复执行,将一次迭代的结果(部分解)反馈到下一次迭代中。Flink 中的迭代有两种类型。BulkIterationDeltaIteration

本节提供了如何使用这两种运算符的快速示例。查看迭代介绍页面可以获得更详细的介绍。

批量迭代 #

要创建一个 BulkIteration,调用迭代开始的 DataSet 的 iterate(int) 方法,同时指定一个 step 函数。step 函数获取当前迭代的输入 DataSet,并且必须返回一个新的 DataSet。迭代调用的参数是最大的迭代次数,迭代过后要停止。

还有 iterateWithTermination(int) 函数,接受 step 函数,返回两个 DataSets。迭代步骤的结果和一个终止标准。一旦终止准则 DataSet 为空,就会停止迭代。

下面的例子是迭代估计数字 Pi。目标是计算随机点的数量,这些随机点落入单位圆中。在每一次迭代中,都会挑选一个随机点。如果这个点位于单位圆内,我们就递增计数。然后,Pi 的估计值是所得到的计数除以迭代次数乘以 4。

val env = ExecutionEnvironment.getExecutionEnvironment()

// Create initial DataSet
val initial = env.fromElements(0)

val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
  val result = iterationInput.map { i =>
    val x = Math.random()
    val y = Math.random()
    i + (if (x * x + y * y < 1) 1 else 0)
  }
  result
}

val result = count map { c => c / 10000.0 * 4 }

result.print()

env.execute("Iterative Pi Example")

你也可以查看 K-Means 的例子,它使用 BulkIteration 来聚类一组未标记的点。

增量迭代 #

增量迭代利用了某些算法在每次迭代中不改变解的每个数据点的事实。

除了在每次迭代中反馈的部分解(称为 workset),delta 迭代还保持着跨迭代的状态(称为解集),可以通过 delta 更新。迭代计算的结果是最后一次迭代后的状态。关于 delta 迭代的基本原理,请参考迭代简介

定义 DeltaIteration 与定义 BulkIteration 类似。对于 delta 迭代,两个数据集构成了每次迭代的输入(工作集和解集),并且在每次迭代中产生两个数据集作为结果(新工作集,解集 delta)。

要创建一个 DeltaIteration 在初始解集上调用 iterateDelta(initialWorkset,maxIterations,key)step 函数需要两个参数。(solutionSet, workset), 并且必须返回两个值: (solutionSetDelta, newWorkset).

下面是一个 delta 迭代语法的例子。

// read the initial data sets
val initialSolutionSet: DataSet[(Long, Double)] = // [...]

val initialWorkset: DataSet[(Long, Double)] = // [...]

val maxIterations = 100
val keyPosition = 0

val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
  (solution, workset) =>
    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())

    val nextWorkset = deltas.filter(new FilterByThreshold())

    (deltas, nextWorkset)
}

result.writeAsCsv(outputPath)

env.execute()

在函数中对数据对象进行操作 #

Flink 的运行时以 Java 对象的形式与用户函数交换数据。函数从运行时接收输入对象作为方法参数,并返回输出对象作为结果。因为这些对象是由用户函数和运行时代码访问的,所以理解和遵循用户代码如何访问,即读取和修改这些对象的规则是非常重要的。

用户函数以常规方法参数(如 MapFunction)或通过 Iterable 参数(如 GroupReduceFunction)从 Flink 的运行时接收对象。我们把运行时传递给用户函数的对象称为输入对象。用户函数可以将对象作为方法返回值(像 MapFunction)或通过 Collector(像 FlatMapFunction)发射给 Flink 运行时。我们将用户函数向运行时发射的对象称为输出对象。

Flink 的 DataSet API 具有两种模式,它们在 Flink 的运行时如何创建或重用输入对象方面有所不同。这种行为会影响用户函数如何与输入和输出对象交互的保证和约束。下面的章节定义了这些规则,并给出了编写安全用户函数代码的编码指南。

禁用对象重用(DEFAULT) #

默认情况下,Flink 在禁用对象重用模式下运行。这种模式可以保证函数在函数调用中总是接收新的输入对象。对象重用禁用模式能提供更好的保证,使用起来也更安全。但是,它有一定的处理开销,可能会引起较高的 Java 垃圾收集活动。下表解释了在禁用对象重用模式下,用户函数如何访问输入和输出对象。

操作 保证和限制
读取输入对象 在一个方法调用中,保证输入对象的值不会改变。这包括由 Iterable 服务的对象。例如,在 List 或 Map 中收集由 Iterable 服务的输入对象是安全的。请注意,在方法调用离开后,对象可能会被修改。跨函数调用记忆对象是不安全的。
修改输入对象 你可以修改输入对象。
发射输入对象 你可以发射输入对象。输入对象的值可能在发射后发生变化。读取发射后的输入对象是不安全的。
读取输出对象 给予收集器的对象或作为方法结果返回的对象可能已经改变了其值。读取输出对象是不安全的。
修改输出对象 你可以在对象被发射后对其进行修改,然后再次发射。

对象重用禁用(默认)模式的编码准则。

  • 不要跨方法调用记忆和读取输入对象。
  • 不要在发出对象后读取对象。

启用对象重用 #

在启用对象重用模式下,Flink 的运行时会尽量减少对象实例化的数量。这可以提高性能,并且可以减少 Java 垃圾收集的压力。通过调用 ExecutionConfig.enableObjectReuse() 激活对象重用启用模式。下表解释了在启用对象重用模式下,用户函数如何访问输入和输出对象。

操作 保证和限制
读取作为常规方法参数接收的输入对象 作为常规方法参数接收的输入对象在一次函数调用中不被修改。对象可能在方法调用结束后被修改。跨函数调用记忆对象是不安全的。
读取从 Iterable 参数中接收到的输入对象 从 Iterable 中接收到的输入对象只在调用 next()方法之前有效。一个 Iterable 或 Iterator 可以多次服务于同一个对象实例。记住从 Iterable 接收的输入对象是不安全的,例如,把它们放在 List 或 Map 中。
修改输入对象 除了 MapFunction、FlatMapFunction、MapPartitionFunction、GroupReduceFunction、GroupCombineFunction、CoGroupFunction 和 InputFormat.next(reuse)的输入对象外,你不得修改输入对象。
发射输入对象 除了 MapFunction、FlatMapFunction、MapPartitionFunction、GroupReduceFunction、GroupCombineFunction、CoGroupFunction 和 InputFormat.next(重用)的输入对象外,你不得发射输入对象。
读取输出对象 一个被交给 Collector 或作为方法结果返回的对象可能已经改变了它的值。读取输出对象是不安全的。
修改输出对象 你可以修改一个输出对象并再次发出它。

启用对象重用的编码准则。

  • 不记忆从 Iterable 接收的输入对象。
  • 不记忆和读取跨方法调用的输入对象。
  • 除了 MapFunction、FlatMapFunction、MapPartitionFunction、GroupReduceFunction、GroupCombineFunction、CoGroupFunction 和 InputFormat.next(reuse)的输入对象外,不要修改或发出输入对象。
  • 为了减少对象实例化,你总是可以发出一个专门的输出对象,这个对象被反复修改,但从不读取。

调试 #

在分布式集群中的大型数据集上运行数据分析程序之前,最好确保所实现的算法能够按照预期的方式运行。因此,实现数据分析程序通常是一个检查结果、调试和改进的渐进过程。

Flink 提供了一些不错的功能,通过支持 IDE 内的本地调试、注入测试数据和收集结果数据,大大简化了数据分析程序的开发过程。本节给大家一些提示,如何简化 Flink 程序的开发。

本地执行环境 #

LocalEnvironment 在它创建的同一个 JVM 进程中启动 Flink 系统。如果你从 IDE 中启动 LocalEnvironment,你可以在代码中设置断点,轻松调试你的程序。

LocalEnvironment 的创建和使用方法如下。

val env = ExecutionEnvironment.createLocalEnvironment()

val lines = env.readTextFile(pathToTextFile)
// build your program

env.execute()

收集数据源和接收器 #

为分析程序提供输入并检查其输出,如果通过创建输入文件和读取输出文件来完成,是很麻烦的。Flink 具有特殊的数据源和接收器,这些数据源和接收器由 Java 集合支持,以方便测试。一旦程序经过测试,源和接收器可以很容易地被从 HDFS 等外部数据存储中读取/写入的源和接收器所替代。

集合数据源可以使用以下方式。

val env = ExecutionEnvironment.createLocalEnvironment()

// Create a DataSet from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataSet from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataSet from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注:目前,集合数据源要求数据类型和迭代器实现 Serializable。此外,集合数据源不能并行执行( parallelism = 1)。

语义注解 #

语义注解可以用来给 Flink 提供关于函数行为的提示。它们告诉系统,函数读取并评估了函数输入的哪些字段,以及它将哪些字段从输入转发到输出,而没有进行修改。语义注解是加快执行速度的有力手段,因为它们允许系统推理出在多个操作中重复使用排序顺序或分区的问题。使用语义注解最终可能会使程序免于不必要的数据洗牌或不必要的排序,并显著提高程序的性能。

注意:语义注解的使用是可选的。然而,在提供语义注解时,保守地使用语义注解是绝对关键的! 不正确的语义注解将导致 Flink 对你的程序做出不正确的假设,并可能最终导致不正确的结果。如果一个操作符的行为不是明确可预测的,就不应该提供注解。请仔细阅读文档。

目前支持以下语义注解。

转发字段注解

转发字段信息声明了未被修改的输入字段被函数转发到输出中的同一位置或另一位置。该信息被优化器用来推断数据属性(如排序或分区)是否被函数保留。对于对输入元素组进行操作的函数,如 GroupReduce、GroupCombine、CoGroup 和 MapPartition,所有被定义为转发字段的字段必须总是从同一个输入元素联合转发。由组智函数发出的每个元素的转发字段可能来源于函数的输入组的不同元素。

字段转发信息使用字段表达式来指定。在输出中转发到同一位置的字段可以通过其位置来指定。指定的位置必须对输入和输出的数据类型有效,并具有相同的类型。例如字符串 “f2 “声明 Java 输入元组的第三个字段总是等于输出元组中的第三个字段。

将输入中的源字段和输出中的目标字段指定为字段表达式,就可以声明未修改的字段转发到输出中的另一个位置。字符串 "f0->f2" 表示将 Java 输入元组的第一个字段不变的复制到 Java 输出元组的第三个字段。通配符表达式 * 可以用来指代整个输入或输出类型,即 "f0->*" 表示一个函数的输出总是等于其 Java 输入元组的第一个字段。

多个转发字段可以在一个字符串中用分号隔开声明为 "f0; f2->f1; f3->f2",也可以在单独的字符串中声明为 “f0”、“f2->f1”、“f3->f2”。当指定转发字段时,不要求所有的转发字段都声明,但所有的声明必须正确。

转发字段信息可以通过在函数类定义上附加 Java 注解来声明,或者在调用 DataSet 上的函数后将其作为操作符参数传递,如下图所示。

函数类注解

  • @ForwardedFields 用于单输入的函数,如 Map 和 Reduce。
  • @ForwardedFieldsFirst 代表有两个输入的函数的第一个输入,如 Join 和 CoGroup。
  • @ForwardedFieldsSecond 代表有两个输入的函数的第二个输入,如 Join 和 CoGroup。

操作符参数

  • data.map(myMapFnc).withForwardedFields() 用于单输入的函数,如 Map 和 Reduce。
  • data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsFirst() 用于有两个输入的函数的第一个输入,如 Join 和 CoGroup。
  • data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsSecond() 用于有两个输入的函数的第二个输入,如 Join 和 CoGroup。

请注意,不可能覆盖通过操作符参数指定为类注解的字段前向信息。

例子:在函数的第二个输入端,如 Join 和 CoGroup,请注意不能覆盖通过运算符参数指定的类注解的字段前向信息。

下面的例子显示了如何使用函数类注解来声明转发的字段信息。

@ForwardedFields("_1->_3")
class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
   def map(value: (Int, Int)): (String, Int, Int) = {
    return ("foo", value._2 / 2, value._1)
  }
}

非转发字段

非转发字段信息声明了所有在函数输出中不保留在同一位置的字段。所有其他字段的值都被认为保留在输出的同一位置。因此,非转发字段信息与转发字段信息是相反的。分组运算符(如 GroupReduce、GroupCombine、CoGroup 和 MapPartition)的非转发字段信息必须满足与转发字段信息相同的要求。

重要:非转发字段信息的规范是可选的。但是如果使用,必须指定 ALL! 非转发字段,因为所有其他字段都被认为是原地转发的。将一个转发字段声明为非转发字段是安全的。

非转发字段被指定为字段表达式的列表。这个列表既可以是由分号分隔的字段表达式组成的单个字符串,也可以是多个字符串。例如 “f1; f3” 和 “f1”、“f3” 都声明 Java 元组的第二个和第四个字段不保留在原地,其他所有字段都保留在原地。非前向字段信息只能为输入和输出类型相同的函数指定。

非转发字段信息是作为函数类注解使用以下注解来指定的。

  • @NonForwardedFields 用于单个输入函数,如 Map 和 Reduce。
  • @NonForwardedFieldsFirst 用于有两个输入的函数的第一个输入,如 Join 和 CoGroup。
  • @NonForwardedFieldsSecond 用于函数的第二个输入,如 Join 和 CoGroup。

例子

下面的例子显示了如何声明非转发字段信息。

@NonForwardedFields("_2") // second field is not forwarded
class MyMap extends MapFunction[(Int, Int), (Int, Int)]{
  def map(value: (Int, Int)): (Int, Int) = {
    return (value._1, value._2 / 2)
  }
}

读取字段

读取字段信息声明了所有被函数访问和评估的字段,也就是说,所有被函数用来计算结果的字段。例如,在条件语句中被评估的字段或用于计算的字段必须在指定读取字段信息时被标记为读取。仅仅是未经修改就转发到输出而不评估其值的字段,或者根本没有被访问的字段都不被认为是读。

重要:读取字段信息的指定是可选的。但是如果使用,必须指定 ALL! 读取字段。将一个非读字段声明为读字段是安全的。

读取字段被指定为字段表达式的列表。这个列表可以是一个由分号分隔的字段表达式组成的单个字符串,也可以是多个字符串。例如 “f1; f3” 和 “f1”、“f3” 都声明 Java 元组的第二和第四字段被函数读取和评估。

读取字段信息是以函数类注解的形式指定的,使用以下注解。

  • @ReadFields 用于单输入函数,如 Map 和 Reduce。
  • @ReadFieldsFirst 用于有两个输入的函数的第一个输入,如 Join 和 CoGroup。
  • @ReadFieldsSecond 用于有两个输入的函数的第二个输入,如 Join 和 CoGroup。

示例:

下面的例子显示了如何声明读取字段信息。

@ReadFields("_1; _4") // _1 and _4 are read and evaluated by the function.
class MyMap extends MapFunction[(Int, Int, Int, Int), (Int, Int)]{
   def map(value: (Int, Int, Int, Int)): (Int, Int) = {
    if (value._1 == 42) {
      return (value._1, value._2)
    } else {
      return (value._4 + 10, value._2)
    }
  }
}

广播变量 #

广播变量允许你在操作的常规输入之外,将一个数据集提供给操作的所有并行实例。这对辅助数据集或数据依赖性参数化很有用。然后,该数据集将作为一个集合在操作者处被访问。

  • 广播:广播集通过 withBroadcastSet(DataSet,String) 按名称注册,并通过
  • 访问方式:通过目标操作者处的 getRuntimeContext().getBroadcastVariable(String) 访问。
// 1. The DataSet to be broadcast
val toBroadcast = env.fromElements(1, 2, 3)

val data = env.fromElements("a", "b")

data.map(new RichMapFunction[String, String]() {
    var broadcastSet: Traversable[String] = null

    override def open(config: Configuration): Unit = {
      // 3. Access the broadcast DataSet as a Collection
      broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
    }

    def map(in: String): String = {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet

在注册和访问广播数据集时,确保名称(前面例子中的 broadcastSetName)匹配。关于完整的示例程序,可以看一下 KMeans 算法

注意:由于广播变量的内容在每个节点上都保存在内存中,所以它不应该变得太大。对于像标量值这样简单的东西,你可以简单地将参数作为函数闭包的一部分,或者使用 withParameters(...) 方法来传递配置。

分布式缓存 #

Flink 提供了一个类似于 Apache Hadoop 的分布式缓存,以使用户函数的并行实例可以在本地访问文件。该功能可用于共享包含静态外部数据的文件,如字典或机器学习的回归模型。

缓存的工作原理如下。程序在其 ExecutionEnvironment 中以特定的名称将本地或远程文件系统(如 HDFS 或 S3)的文件或目录注册为缓存文件。当程序执行时,Flink 会自动将该文件或目录复制到所有工作者的本地文件系统中。用户函数可以查找指定名称下的文件或目录,并从工作者的本地文件系统中访问它。

分布式缓存的使用方法如下。

ExecutionEnvironment 中注册文件或目录。

val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

在一个用户函数(这里是 MapFunction)中访问缓存文件。该函数必须扩展一个 RichFunction 类,因为它需要访问 RuntimeContext。

// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

    // access cached file via RuntimeContext and DistributedCache
    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    // read the file (or navigate the directory)
    ...
  }

  override def map(value: String): Int = {
    // use content of cached file
    ...
  }
}

向函数传递参数 #

可以使用构造函数或 withParameters(Configuration) 方法将参数传递给函数。参数会被序列化为函数对象的一部分,并传送给所有并行任务实例。

通过构造函数

val toFilter = env.fromElements(1, 2, 3)

toFilter.filter(new MyFilter(2))

class MyFilter(limit: Int) extends FilterFunction[Int] {
  override def filter(value: Int): Boolean = {
    value > limit
  }
}

通过 withParameters(配置)

本方法以一个 Configuration 对象作为参数,它将被传递给富函数open() 方法。配置对象是一个从 String 键到不同值类型的 Map。

val toFilter = env.fromElements(1, 2, 3)

val c = new Configuration()
c.setInteger("limit", 2)

toFilter.filter(new RichFilterFunction[Int]() {
    var limit = 0

    override def open(config: Configuration): Unit = {
      limit = config.getInteger("limit", 0)
    }

    def filter(in: Int): Boolean = {
        in > limit
    }
}).withParameters(c)

在全局范围内通过 ExecutionConfig

Flink 还允许将自定义配置值传递到环境的 ExecutionConfig 接口。由于执行配置可以在所有(丰富的)用户函数中访问,因此自定义配置将在所有函数中全局可用。

设置一个自定义的全局配置:

val env = ExecutionEnvironment.getExecutionEnvironment
val conf = new Configuration()
conf.setString("mykey", "myvalue")
env.getConfig.setGlobalJobParameters(conf)

请注意,你也可以传递一个扩展 ExecutionConfig.GlobalJobParameters 类的自定义类作为全局作业参数给执行配置。该接口允许实现 Map<String, String> toMap() 方法,该方法将在 web 前端显示来自配置的值。

从全局配置中访问值

全局工作参数中的对象在系统中的很多地方都可以访问。所有实现 RichFunction 接口的用户函数都可以通过运行时上下文访问。

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private String mykey;
    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
      Configuration globConf = (Configuration) globalParams;
      mykey = globConf.getString("mykey", null);
    }
    // ... more here ...

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/