Wait the light to fall

Dataset 变换

焉知非鱼

Dataset Transformations

DataSet 转换

本文档深入介绍了 DataSets 上可用的转换。关于 Flink Java API 的一般介绍,请参考编程指南

对于密集索引的数据集中的压缩元素,请参考压缩元素指南

Map #

Map 转换将用户定义的映射函数应用于 DataSet 的每个元素。它实现了一对一的映射,也就是说,函数必须准确地返回一个元素。

下面的代码将一个由整数对组成的 DataSet 转化为一个由整数组成的 DataSet。

val intPairs: DataSet[(Int, Int)] = // [...]
val intSums = intPairs.map { pair => pair._1 + pair._2 }

FlatMap #

FlatMap 转换在 DataSet 的每个元素上应用了一个用户定义的 flat-map 函数。这种映射函数的变体可以为每个输入元素返回任意多个结果元素(包括没有)。

下面的代码将一个文本行的 DataSet 转换为一个单词的 DataSet。

val textLines: DataSet[String] = // [...]
val words = textLines.flatMap { _.split(" ") }

MapPartition #

MapPartition 在一次函数调用中转换一个并行分区。map-partition 函数以 Iterable 的形式获取分区,并可以产生任意数量的结果值。每个分区中元素的数量取决于平行度和之前的操作。

下面的代码将文本行的 DataSet 转换为每个分区的计数 DataSet。

val textLines: DataSet[String] = // [...]
// Some is required because the return value must be a Collection.
// There is an implicit conversion from Option to a Collection.
val counts = texLines.mapPartition { in => Some(in.size) }

Filter #

过滤器转换将用户定义的过滤器函数应用于 DataSet 的每个元素,并且只保留那些函数返回为真的元素。

以下代码从数据集中删除所有小于零的整数。

val intNumbers: DataSet[Int] = // [...]
val naturalNumbers = intNumbers.filter { _ > 0 }

重要:系统假设函数不会修改应用谓词的元素。违反这个假设会导致错误的结果。

元组数据集的投影(Projection) #

Project 转换删除或移动 Tuple DataSet 的 Tuple 字段。project(int...) 方法通过其索引选择应该保留的 Tuple 字段,并定义它们在输出 Tuple 中的顺序。

投影(Projection)不需要定义用户函数。

下面的代码显示了在 DataSet 上应用 Project 转换的不同方法。

DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0);
# scala
Not supported.

分组数据集上的变换 #

reduce 操作可以对分组的数据集进行操作。指定用于分组的键可以通过多种方式进行。

  • 键表达式
  • 键选择器函数
  • 一个或多个字段位置键(仅限元组数据集)。
  • case 类字段(仅 case 类)

请看一下 reduce 的例子,看看如何指定分组键。

换算分组数据集 #

应用于分组数据集的 Reduce 转换,使用用户定义的 Reduce 函数将每个分组换算为一个元素。对于每一组输入元素,一个 Reduce 函数将成对的元素连续组合成一个元素,直到每组只剩下一个元素。

请注意,对于一个 ReduceFunction,返回对象的键字段应该与输入值相匹配。这是因为 reduce 是隐式可组合的,当传递给 reduce 运算符时,从 combine 运算符发出的对象又是按键分组的。

在按键表达式分组的数据集上进行 Reduce 操作 #

键表达式指定了 DataSet 中每个元素的一个或多个字段。每个键表达式都是一个公共字段的名称或一个 getter 方法。点号可以用来深入到对象中。键表达式 "*" 可以选择所有字段。下面的代码展示了如何使用键表达式对 POJO 数据集进行分组,并使用 reduce 函数对其进行换算。

// some ordinary POJO
class WC(val word: String, val count: Int) {
  def this() {
    this(null, -1)
  }
  // [...]
}

val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce {
  (w1, w2) => new WC(w1.word, w1.count + w2.count)
}

对按键选择器分组的数据集进行换算 #

键选择器函数从数据集的每个元素中提取一个键值。提取的键值用于对 DataSet 进行分组。下面的代码展示了如何使用键选择器函数对 POJO 数据集进行分组,并使用 reduce 函数对其进行换算。

// some ordinary POJO
class WC(val word: String, val count: Int) {
  def this() {
    this(null, -1)
  }
  // [...]
}

val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy { _.word } reduce {
  (w1, w2) => new WC(w1.word, w1.count + w2.count)
}

对按字段位置键分组的数据集进行换算(仅元组数据集) #

字段位置键指定了一个 Tuple DataSet 的一个或多个字段,这些字段被用作分组键。下面的代码显示了如何使用字段位置键和应用 reduce 函数。

val tuples = DataSet[(String, Int, Double)] = // [...]
// group on the first and second Tuple field
val reducedTuples = tuples.groupBy(0, 1).reduce { ... }

对按 case 类字段分组的数据集进行换算 #

当使用 Case Classes 时,你也可以使用字段的名称来指定分组键。

case class MyClass(val a: String, b: Int, c: Double)
val tuples = DataSet[MyClass] = // [...]
// group on the first and second field
val reducedTuples = tuples.groupBy("a", "b").reduce { ... }

在分组数据集上进行分组换算 #

应用在分组 DataSet 上的 GroupReduce 转换,会对每个组调用用户定义的 group-reduce 函数。这与 Reduce 之间的区别在于,用户定义的函数可以一次性获得整个组。该函数是在一个组的所有元素上用一个 Iterable 调用的,并且可以返回任意数量的结果元素。

在按字段位置键分组的数据集上进行分组 Reduce(只适用于元组数据集) #

下面的代码显示了如何从一个按 Integer 分组的 DataSet 中删除重复的字符串。

val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).reduceGroup {
      (in, out: Collector[(Int, String)]) =>
        in.toSet foreach (out.collect)
    }

对按键表达式、键选择器函数或 case 类字段分组的数据集进行分组换算 #

类似于 Reduce 变换中的键表达式键选择器函数case 类字段的工作。

对排序组进行 GroupReduce #

一个 group-reduce 函数使用一个 Iterable 访问一个组的元素。可选地,Iterable 可以按照指定的顺序输出一个组的元素。在许多情况下,这有助于降低用户定义的 group-reduce 函数的复杂性,并提高其效率。

下面的代码显示了另一个例子,如何在一个由整数分组并按 String 排序的 DataSet 中删除重复的 String。

val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup {
      (in, out: Collector[(Int, String)]) =>
        var prev: (Int, String) = null
        for (t <- in) {
          if (prev == null || prev != t)
            out.collect(t)
            prev = t
        }
    }

注意:如果在 reduce 操作之前,使用运算符的基于排序的执行策略建立了分组,那么 GroupSort 通常是免费的。

可组合的 GroupReduceFunctions #

与 reduce 函数不同,group-reduce 函数是不可隐式组合的。为了使一个分组换算函数可以组合,它必须实现 GroupCombineFunction 接口。

重要:GroupCombineFunction 接口的通用输入和输出类型必须等于 GroupReduceFunction 的通用输入类型,如下例所示。

// Combinable GroupReduceFunction that computes two sums.
class MyCombinableGroupReducer
  extends GroupReduceFunction[(String, Int), String]
  with GroupCombineFunction[(String, Int), (String, Int)]
{
  override def reduce(
    in: java.lang.Iterable[(String, Int)],
    out: Collector[String]): Unit =
  {
    val r: (String, Int) =
      in.iterator.asScala.reduce( (a,b) => (a._1, a._2 + b._2) )
    // concat key and sum and emit
    out.collect (r._1 + "-" + r._2)
  }

  override def combine(
    in: java.lang.Iterable[(String, Int)],
    out: Collector[(String, Int)]): Unit =
  {
    val r: (String, Int) =
      in.iterator.asScala.reduce( (a,b) => (a._1, a._2 + b._2) )
    // emit tuple with key and sum
    out.collect(r)
  }
}

在分组数据集上进行分组合并 #

GroupCombine 变换是可组合的 GroupReduceFunctioncombine 步骤的泛化形式。与此相反,GroupReduce 函数中的 combine 步骤只允许从输入类型 I 到输出类型 I 的组合。这是因为 GroupReduce 函数中的 reduce 步骤期望输入类型 I。

在某些应用中,希望在执行额外的转换(例如减少数据大小)之前,将一个数据集合并成中间格式。这可以通过一个 CombineGroup 转换来实现,而且成本很低。

注意:对分组数据集的 GroupCombine 是在内存中以贪婪的策略执行的,它可能不会一次处理所有数据,而是分多个步骤进行。它也是在各个分区上执行的,而不像 GroupReduce 变换那样进行数据交换。这可能会导致部分结果。

下面的例子演示了如何使用 CombineGroup 变换来实现另一种 WordCount

val input: DataSet[String] = [..] // The words received as input

val combinedWords: DataSet[(String, Int)] = input
  .groupBy(0)
  .combineGroup {
    (words, out: Collector[(String, Int)]) =>
        var key: String = null
        var count = 0

        for (word <- words) {
            key = word
            count += 1
        }
        out.collect((key, count))
}

val output: DataSet[(String, Int)] = combinedWords
  .groupBy(0)
  .reduceGroup {
    (words, out: Collector[(String, Int)]) =>
        var key: String = null
        var sum = 0

        for ((word, sum) <- words) {
            key = word
            sum += count
        }
        out.collect((key, sum))
}

上面的另一种 WordCount 实现演示了 GroupCombine 如何在执行 GroupReduce 转换之前组合单词。上面的例子只是一个概念证明。请注意,组合步骤如何改变 DataSet 的类型,通常在执行 GroupReduce 之前需要进行额外的 Map 转换。

在分组元组数据集上进行聚合 #

有一些常用的聚合操作是经常使用的。Aggregate 转换提供了以下内置的聚合函数。

  • Sum,
  • Min,
  • Max.

Aggregate 变换只能应用在 Tuple 数据集上,并且只支持字段位置键进行分组。

下面的代码显示了如何在按字段位置键分组的数据集上应用"聚合"变换。

val input: DataSet[(Int, String, Double)] = // [...]
val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)

要在一个 DataSet 上应用多个聚合,必须在第一个聚合之后使用 .and() 函数,也就是说 .aggregary(SUM, 0).and(MIN, 2) 会产生原始 DataSet 的字段 0 和字段 2 的最小值之和。与此相反,.aggregary(SUM,0).aggregary(MIN,2) 将在一个聚合上应用一个聚合。在给定的示例中,它将在计算字段 0 与字段 1 分组后产生字段 2 的最小值。

注意:聚合函数集将在未来得到扩展。

对分组元组数据集的 MinBy / MaxBy 函数 #

MinBy (MaxBy) 转换为每组元组选择一个元组。被选择的元组是一个或多个指定字段的值是最小(最大)的元组。用于比较的字段必须是有效的关键字段,即可比较的字段。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。

下面的代码显示了如何从 DataSet<Tuple3<Integer, String, Double>> 中选择具有相同 String 值的每组元组的 Integer 和 Double 字段最小值的元组。

val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input
                                   .groupBy(1)  // group DataSet on second field
                                   .minBy(0, 2) // select tuple with minimum values for first and third field.

换算整个数据集 #

Reduce 转换将用户定义的 reduce 函数应用于一个数据集的所有元素。随后,reduce 函数将元素对组合成一个元素,直到只剩下一个元素。

下面的代码显示了如何对一个整数数据集的所有元素进行求和。

val intNumbers = env.fromElements(1,2,3)
val sum = intNumbers.reduce (_ + _)

使用 Reduce 转换换算一个完整的 DataSet 意味着最后的 Reduce 操作不能并行完成。然而,reduce 函数是可以自动组合的,因此 Reduce 转换不会限制大多数用例的可扩展性。

对整个数据集进行分组换算 #

GroupReduce 转换将用户定义的 group-reduce 函数应用于 DataSet 的所有元素。group-reduce 可以遍历 DataSet 的所有元素,并返回任意数量的结果元素。

下面的示例展示了如何在一个完整的 DataSet 上应用 GroupReduce 转换。

val input: DataSet[Int] = // [...]
val output = input.reduceGroup(new MyGroupReducer())

注意:如果 group-reduce 函数不可组合,那么在一个完整的 DataSet 上的 GroupReduce 转换不能并行完成。因此,这可能是一个非常耗费计算的操作。请参阅上面的"可组合的 GroupReduceFunctions" 部分,了解如何实现可组合的 group-reduce 函数。

在完整的数据集上进行分组合并(GroupCombine) #

在一个完整的 DataSet 上的 GroupCombine 的工作原理类似于在一个分组的 DataSet 上的 GroupCombine。在所有节点上对数据进行分区,然后以贪婪的方式进行合并(即只有适合内存的数据才会一次性合并)。

在完整的 Tuple 数据集上进行聚合 #

有一些常用的聚合操作是经常使用的。Aggregate 转换提供了以下内置的聚合函数。

  • Sum,
  • Min, 和
  • Max.

Aggregate 变换只能应用于 Tuple 数据集。

下面的代码显示了如何在一个完整的数据集上应用聚合转换。

val input: DataSet[(Int, String, Double)] = // [...]
val output = input.aggregate(SUM, 0).and(MIN, 2)

注意:扩展支持的聚合函数集是我们的路线图。

在完整的元组数据集上实现 MinBy / MaxBy #

MinBy (MaxBy) 转换从一个元组数据集中选择一个元组。被选择的元组是一个或多个指定字段的值是最小(最大)的元组。用于比较的字段必须是有效的键字段,即可比较的字段。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。

以下代码显示了如何从 DataSet<Tuple3<Integer, String, Double>> 中选择具有 Integer 和 Double 字段最大值的元组。

val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input                          
                                   .maxBy(0, 2) // select tuple with maximum values for first and third field.

Distinct #

Distinct 转换计算源 DataSet 中不同元素的 DataSet。下面的代码从 DataSet 中删除所有重复的元素。

val input: DataSet[(Int, String, Double)] = // [...]
val output = input.distinct()

也可以使用以下方法改变 DataSet 中元素的区分方式。

  • 一个或多个字段位置键(仅元组数据集)。
  • 一个键选择器函数,或
  • 一个键表达式

用字段位置键去重(Distinct) #

val input: DataSet[(Int, Double, String)] = // [...]
val output = input.distinct(0,2)

用 KeySelector 函数去重(Distinct) #

val input: DataSet[Int] = // [...]
val output = input.distinct {x => Math.abs(x)}

用键表达式去重(Distinct) #

// some ordinary POJO
case class CustomType(aName : String, aNumber : Int) { }

val input: DataSet[CustomType] = // [...]
val output = input.distinct("aName", "aNumber")

也可以用通配符表示使用所有字段:

// some ordinary POJO
val input: DataSet[CustomType] = // [...]
val output = input.distinct("_")

Join #

Join 转换将两个 DataSets 连接成一个 DataSet。两个数据集的元素在一个或多个键上进行连接(join),这些键可以通过使用

  • 键选择器函数
  • 一个或多个字段位置键(仅限 Tuple DataSet)。
  • case 类字段

有几种不同的方法来执行 Join 转换,如下所示。

默认的 Join (Join into Tuple2) #

默认的 Join 变换会产生一个新的 Tuple DataSet,它有两个字段。每个元组在第一个元组字段中持有第一个输入 DataSet 的 join 元素,在第二个字段中持有第二个输入 DataSet 的匹配元素。

下面的代码显示了一个使用字段位置键的默认 Join 转换。

val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Double, Int)] = // [...]
val result = input1.join(input2).where(0).equalTo(1)

用 Join 函数连接 #

Join 转换也可以调用用户定义的 join 函数来处理连接(joining)元组。join 函数接收第一个输入 DataSet 的一个元素和第二个输入 DataSet 的一个元素,并准确返回一个元素。

下面的代码使用键选择器函数执行了一个带有自定义 java 对象的 DataSet 和一个 Tuple DataSet 的连接,并展示了如何使用用户定义的连接(join)函数。

case class Rating(name: String, category: String, points: Int)

val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]

val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
  (rating, weight) => (rating.name, rating.points * weight._2)
}

用 Flat-Join 函数连接 #

类似于 Map 和 FlatMap,FlatJoin 的行为方式与 Join 相同,但它不是返回一个元素,而是可以返回(收集)、零个、一个或多个元素。

case class Rating(name: String, category: String, points: Int)

val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]

val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
  (rating, weight, out: Collector[(String, Double)]) =>
    if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2)
}

用 Projection (Java Only) 连接 #

Join 变换可以使用投影(projection)构造结果元组,如下所示:

DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, String, Double, Byte>>
            result =
            input1.join(input2)
                  // key definition on first DataSet using a field position key
                  .where(0)
                  // key definition of second DataSet using a field position key
                  .equalTo(0)
                  // select and reorder fields of matching tuples
                  .projectFirst(0,2).projectSecond(1).projectFirst(1);
// scala
Not supported.

用数据集大小提示 Join #

为了引导优化器选择正确的执行策略,你可以提示要连接(join)的 DataSet 的大小,如下所示:

val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]

// hint that the second DataSet is very small
val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)

// hint that the second DataSet is very large
val result1 = input1.joinWithHuge(input2).where(0).equalTo(0)

Join 算法提示 #

Flink 运行时可以以各种方式执行连接(join)。每一种可能的方式在不同的情况下都会优于其他方式。系统会尝试自动选择一种合理的方式,但也允许你手动选择一种策略,以防你想强制执行特定的连接(join)方式。

val input1: DataSet[SomeType] = // [...]
val input2: DataSet[AnotherType] = // [...]

// hint that the second DataSet is very small
val result1 = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")

有以下提示:

  • OPTIMIZER_CHOOSES: 相当于完全不给提示,让系统来选择。

  • BROADCAST_HASH_FIRST:广播第一个输入,并据此建立一个哈希表,由第二个输入探测。如果第一个输入的数据非常小,这是一个很好的策略。

  • BROADCAST_HASH_SECOND: 广播第二个输入,并从中建立一个哈希表,由第一个输入探测。如果第二个输入非常小,是一个很好的策略。

  • REPARTITION_HASH_FIRST:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第一个输入建立一个哈希表。如果第一个输入比第二个输入小,但两个输入都很大,这个策略就很好。注意:如果无法估计大小,也无法重新使用已有的分区和排序,系统就会使用这个默认的后备策略。

  • REPARTITION_HASH_SECOND:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第二个输入建立一个哈希表。如果第二个输入比第一个输入小,但两个输入仍然很大,这个策略就很好。

  • REPARTITION_SORT_MERGE:系统对每个输入进行分区(洗牌)(除非输入已经分区),并对每个输入进行排序(除非已经排序)。通过对排序后的输入进行流式合并来连接(join)这些输入。如果一个或两个输入都已经被排序,这个策略就很好。

外连接 #

OuterJoin 转换在两个数据集上执行左、右或全外连接。外连接与常规(内连接)类似,创建所有键值相等的元素对。此外,如果在另一侧没有找到匹配的键,“外侧"的记录(左、右,或者在完全的情况下两者都有)将被保留。匹配的一对元素(或一个元素和另一个输入的空值)被交给 JoinFunction 将这对元素变成一个元素,或交给 FlatJoinFunction 将这对元素变成任意多个(包括无)元素。

两个 DataSets 的元素都是在一个或多个键上连接的,这些键可以通过使用

  • 键选择器函数
  • 一个或多个字段位置键(仅限 Tuple DataSet)。
  • case 类字段

OuterJoins 只支持 Java 和 Scala DataSet API。

用 Join 函数进行外连接 #

OuterJoin 转换调用一个用户定义的 join 函数来处理连接元组。join 函数接收第一个输入 DataSet 的一个元素和第二个输入 DataSet 的一个元素,并准确地返回一个元素。根据外连接的类型(左、右、全),连接函数的两个输入元素中可以有一个是空的。

下面的代码使用键选择器函数执行 DataSet 与自定义 java 对象和 Tuple DataSet 的左外连接,并展示了如何使用用户定义的连接函数。

case class Rating(name: String, category: String, points: Int)

val movies: DataSet[(String, String)] = // [...]
val ratings: DataSet[Ratings] = // [...]

val moviesWithPoints = movies.leftOuterJoin(ratings).where(0).equalTo("name") {
  (movie, rating) => (movie._1, if (rating == null) -1 else rating.points)
}

使用 Flat-Join 函数进行外连接 #

类似于 Map 和 FlatMap,一个带有 flat-join 函数的 OuterJoin 的行为与带有 join 函数的 OuterJoin 相同,但它不是返回一个元素,而是可以返回(收集)、零个、一个或多个元素。

Not supported.

Join 算法提示 #

Flink 运行时可以以各种方式执行外连接。每一种可能的方式在不同的情况下都会优于其他方式。系统试图自动选择一种合理的方式,但允许你手动选择一种策略,以防你想强制执行特定的外连接方式。

val input1: DataSet[SomeType] = // [...]
val input2: DataSet[AnotherType] = // [...]

// hint that the second DataSet is very small
val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key")

val result2 = input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")

有以下提示:

  • OPTIMIZER_CHOOSES: 相当于完全不给提示,让系统来选择。

  • BROADCAST_HASH_FIRST:广播第一个输入,并据此建立一个哈希表,由第二个输入探测。如果第一个输入的数据非常小,这是一个很好的策略。

  • BROADCAST_HASH_SECOND: 广播第二个输入,并从中建立一个哈希表,由第一个输入探测。如果第二个输入非常小,是一个很好的策略。

  • REPARTITION_HASH_FIRST:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第一个输入建立一个哈希表。如果第一个输入比第二个输入小,但两个输入仍然很大,这个策略就很好。

  • REPARTITION_HASH_SECOND:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第二个输入建立一个哈希表。如果第二个输入比第一个输入小,但两个输入仍然很大,这个策略就很好。

  • REPARTITION_SORT_MERGE:系统对每个输入进行分区(洗牌)(除非输入已经分区),并对每个输入进行排序(除非已经排序)。通过对排序后的输入进行流式合并来连接(join)这些输入。如果一个或两个输入都已经被排序,这个策略就很好。

注意:目前还不是所有的外连接类型都支持所有的执行策略。

  • LeftOuterJoin 支持:

    • OPTIMIZER_CHOOSES
    • BROADCAST_HASH_SECOND
    • REPARTITION_HASH_SECOND
    • REPARTITION_SORT_MERGE
  • RightOuterJoin 支持:

    • OPTIMIZER_CHOOSES
    • BROADCAST_HASH_FIRST
    • REPARTITION_HASH_FIRST
    • REPARTITION_SORT_MERGE
  • FullOuterJoin 支持:

    • OPTIMIZER_CHOOSES
    • REPARTITION_SORT_MERGE

Cross #

Cross 变换将两个 DataSets 组合成一个 DataSet。它建立了两个输入数据集元素的所有 pairwise 组合,即建立了一个笛卡尔积。Cross 变换要么在每对元素上调用用户定义的 cross 函数,要么输出一个 Tuple2。这两种模式如下所示。

注意:Cross 是一个潜在的计算密集型操作,甚至可以挑战大型计算集群。

使用用户定义函数进行交叉运算 #

Cross 变换可以调用一个用户定义的 cross 函数。cross 函数接收第一个输入的一个元素和第二个输入的一个元素,并正好返回一个结果元素。

下面的代码展示了如何使用 cross 函数对两个 DataSets 进行交叉变换。

case class Coord(id: Int, x: Int, y: Int)

val coords1: DataSet[Coord] = // [...]
val coords2: DataSet[Coord] = // [...]

val distances = coords1.cross(coords2) {
  (c1, c2) =>
    val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2))
    (c1.id, c2.id, dist)
}

用数据集大小提示交叉 #

为了引导优化器选择正确的执行策略,你可以提示要交叉的 DataSet 的大小,如下所示。

val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]

// hint that the second DataSet is very small
val result1 = input1.crossWithTiny(input2)

// hint that the second DataSet is very large
val result1 = input1.crossWithHuge(input2)

CoGroup #

CoGroup 转换联合(jointly)处理两个 DataSets 的组。两个 DataSets 根据定义的键进行分组,共享同一键的两个 DataSets 的组被一起交给用户定义的共组(co-group)函数。如果对于一个特定的键来说,只有一个 DataSet 有一个组,那么 co-group 函数就会和这个组以及一个空组一起被调用。共组(co-group)函数可以分别迭代两个组的元素,并返回任意数量的结果元素。

与 Reduce、GroupReduce 和 Join 类似,可以使用不同的键选择器方法来定义键。

数据集上的 CoGroup #

val iVals: DataSet[(String, Int)] = // [...]
val dVals: DataSet[(String, Double)] = // [...]

val output = iVals.coGroup(dVals).where(0).equalTo(0) {
  (iVals, dVals, out: Collector[Double]) =>
    val ints = iVals map { _._2 } toSet

    for (dVal <- dVals) {
      for (i <- ints) {
        out.collect(dVal._2 * i)
      }
    }
}

Union #

产生两个 DataSets 的联合(union),这两个 DataSets 必须是同一类型。两个以上 DataSets 的联合(union)可以通过多个联合(union)调用来实现,如下所示。

val vals1: DataSet[(String, Int)] = // [...]
val vals2: DataSet[(String, Int)] = // [...]
val vals3: DataSet[(String, Int)] = // [...]

val unioned = vals1.union(vals2).union(vals3)

Rebalance #

均匀地重新平衡 DataSet 的并行分区,以消除数据倾斜。

val in: DataSet[String] = // [...]
// rebalance DataSet and apply a Map transformation.
val out = in.rebalance().map { ... }

Hash-Partition #

在给定的键上对 DataSet 进行散列分割。键可以被指定为位置键、表达式键和键选择器函数(关于如何指定键,请参见 Reduce 示例)。

val in: DataSet[(String, Int)] = // [...]
// hash-partition DataSet by String value and apply a MapPartition transformation.
val out = in.partitionByHash(0).mapPartition { ... }

Range-Partition #

在给定的键上 Range-partitions 一个 DataSet。键可以被指定为位置键、表达式键和键选择器函数(关于如何指定键,请参见 Reduce 示例)。

val in: DataSet[(String, Int)] = // [...]
// range-partition DataSet by String value and apply a MapPartition transformation.
val out = in.partitionByRange(0).mapPartition { ... }

Sort Partition #

按照指定的顺序,在指定的字段上对 DataSet 的所有分区进行本地排序。字段可以被指定为字段表达式或字段位置(关于如何指定键,请参阅 Reduce 示例)。通过链式 sortPartition() 调用,可以在多个字段上对分区进行排序。

val in: DataSet[(String, Int)] = // [...]
// Locally sort partitions in ascending order on the second String field and
// in descending order on the first String field.
// Apply a MapPartition transformation on the sorted partitions.
val out = in.sortPartition(1, Order.ASCENDING)
            .sortPartition(0, Order.DESCENDING)
            .mapPartition { ... }

First-n #

返回一个 DataSet 的前 n 个(任意)元素。First-n 可以应用于一个常规的 DataSet、一个分组的 DataSet 或一个分组排序的 DataSet。分组键可以被指定为键选择器函数或字段位置键(关于如何指定键,请参见 Reduce 示例)。

val in: DataSet[(String, Int)] = // [...]
// Return the first five (arbitrary) elements of the DataSet
val out1 = in.first(5)

// Return the first two (arbitrary) elements of each String group
val out2 = in.groupBy(0).first(2)

// Return the first three elements of each String group ordered by the Integer field
val out3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/dataset_transformations.html