Wait the light to fall

批处理例子

焉知非鱼

Batch Examples

Batch 示例 #

下面的示例程序展示了 Flink 的不同应用,从简单的单词计数到图形算法。这些代码样本说明了 Flink 的 DataSet API 的使用。

以下和更多例子的完整源代码可以在 Flink 源码库的 flink-examples-batch 模块中找到。

运行一个例子 #

为了运行一个 Flink 实例,我们假设你有一个正在运行的 Flink 实例。导航中的 “Quickstart” 和 “Setup” 选项卡描述了启动 Flink 的各种方法。

最简单的方法是运行 ./bin/start-cluster.sh,默认情况下,它用一个 JobManager 和一个 TaskManager 启动一个本地集群。

Flink 的每个二进制版本都包含一个例子目录,其中有本页每个例子的 jar 文件。

要运行 WordCount 示例,请发出以下命令。

./bin/flink run ./examples/batch/WordCount.jar

其他的例子也可以用类似的方式启动。

请注意,许多例子在运行时没有传递任何参数,而是使用内置的数据。要使用真实数据运行 WordCount,你必须传递数据的路径。

./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result

请注意,非本地文件系统需要一个模式前缀,如 hdfs://

WordCount #

WordCount 是大数据处理系统中的 “Hello World”。它计算文本集合中的单词频率。该算法分两步工作。首先,文本被分割成单个单词。第二,对单词进行分组和计数。

val env = ExecutionEnvironment.getExecutionEnvironment

// get input data
val text = env.readTextFile("/path/to/file")

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

counts.writeAsCsv(outputPath, "\n", " ")

WordCount 的例子实现了上面描述的算法,输入参数:--input <path> --output <path>。作为测试数据,任何文本文件都可以。

页面排名 #

PageRank 算法计算由链接定义的图中页面的"重要性",这些链接从一个页面指向另一个页面。它是一种迭代图算法,这意味着它反复应用相同的计算。在每一次迭代中,每个页面将其当前的排名分布在所有的邻居上,并计算其新的排名,作为它从邻居那里得到的排名的累加和。PageRank 算法是由 Google 搜索引擎推广的,它利用网页的重要性来对搜索查询的结果进行排名。

在这个简单的例子中,PageRank 的实现方式是批量迭代和固定的迭代次数。

// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read the pages and initial ranks by parsing a CSV file
val pages = env.readCsvFile[Page](pagesInputPath)

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
val links = env.readCsvFile[Link](linksInputPath)

// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))

// build adjacency list from link input
val adjacencyLists = links
  // initialize lists
  .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
  // concatenate lists
  .groupBy("sourceId").reduce {
  (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
  }

// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
  currentRanks =>
    val newRanks = currentRanks
      // distribute ranks to target pages
      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
        (page, adjacent, out: Collector[Page]) =>
        for (targetId <- adjacent.targetIds) {
          out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
        }
      }
      // collect ranks and sum them up
      .groupBy("pageId").aggregate(SUM, "rank")
      // apply dampening factor
      .map { p =>
        Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
      }

    // terminate if no rank update was significant
    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
      (current, next, out: Collector[Int]) =>
        // check for significant update
        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
    }

    (newRanks, termination)
}

val result = finalRanks

// emit result
result.writeAsCsv(outputPath, "\n", " ")

PageRank 程序实现了上述示例。它需要以下参数才能运行。--pages <path> --links <path> --output <path> --numPages <n> --iterations <n>

输入文件是纯文本文件,必须按以下格式进行。

  • 页数用一个(长)ID 表示,用换行字符分隔。
    • 例如 “1/n2/n12/n42/n63/n” 给出了 5 个 ID 为 1、2、12、42 和 63 的页面。
  • 链接用页面 ID 对表示,用空格分隔。链接用换行符分隔。
    • 例如 “1 2\n2 12\n1 12\n42 63\n” 给出了四个(定向)链接(1)->(2),(2)->(12),(1)->(12)和(42)->(63)。

对于这个简单的实现,要求每个页面至少有一个入站链接和一个出站链接(一个页面可以指向自己)。

连接的组件 #

Connected Components 算法通过给同一连接部分中的所有顶点分配相同的组件 ID,来识别较大图中相互连接的部分。与 PageRank 类似,Connected Components 是一种迭代算法。在每一步中,每个顶点将其当前的组件 ID 传播给所有的邻居。如果一个顶点接受来自邻居的组件 ID,如果它小于自己的组件 ID。

本实现使用增量迭代。没有改变组件 ID 的顶点不参与下一步。这产生了更好的性能,因为后面的迭代通常只处理一些离群的顶点。

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges by emitting for each input edge the input edges itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
  (s, ws) =>

    // apply the step logic: join with the edges
    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
      (edge._2, vertex._2)
    }

    // select the minimum neighbor
    val minNeighbors = allNeighbors.groupBy(0).min(1)

    // update if the component of the candidate is smaller
    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    }

    // delta and new workset are identical
    (updatedComponents, updatedComponents)
}

verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

ConnectedComponents 程序实现了上面的例子。它需要以下参数才能运行: --vertices <path> --edges <path> --output <path> --iterations <n>

输入文件是纯文本文件,必须按如下格式编写。

  • 顶点用 ID 表示,并用换行符隔开。
    • 例如 “1/n2/n12/n42/n63/n” 给出了五个顶点,分别是(1)、(2)、(12)、(42)和(63)。
  • 边缘用一对顶点 ID 表示,这些顶点 ID 用空格字符分隔。边缘用换行符隔开。
    • 例如,“1 2/n2 12/n1 12/n42 63/n” 给出了四个(非直接)联系(1)-(2)、(2)-(12)、(1)-(12)和(42)-(63)。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/examples.html