Wait the light to fall

数据集中的 zipping 元素

焉知非鱼

Zipping Elements in a Dataset

Zipping 数据集中的元素 #

在某些算法中,人们可能需要为数据集元素分配唯一的标识符。本文档介绍了如何将 DataSetUtils 用于该目的。

使用密集索引进行 Zip #

zipWithIndex 给元素分配连续的标签,接收一个数据集作为输入,并返回一个新的(唯一id,初始值)2-tuples的数据集。这个过程需要两次传递,先计数再给元素贴标签,而且由于计数的同步性,不能采用流水线方式。备选的 zipWithUniqueId 以流水线的方式工作,当唯一的标签已经足够时,首选 zip。例如,下面的代码。

import org.apache.flink.api.scala._

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")

val result: DataSet[(Long, String)] = input.zipWithIndex

result.writeAsCsv(resultPath, "\n", ",")
env.execute()

可以得到元组: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)

带有唯一标识符的 Zip #

在许多情况下,人们可能不需要分配连续的标签,zipWithUniqueId 以流水线的方式工作,加快了标签分配过程。该方法接收一个数据集作为输入,并返回一个由(唯一id,初始值)2-tuples组成的新数据集。例如,下面的代码。

import org.apache.flink.api.scala._

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")

val result: DataSet[(Long, String)] = input.zipWithUniqueId

result.writeAsCsv(resultPath, "\n", ",")
env.execute()

可以得到元组: (0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/zip_elements_guide.html