Wait the light to fall

Hadoop 的兼容性

焉知非鱼

Hadoop Compatibility Beta

Hadoop 兼容性测试版 #

Flink 与 Apache Hadoop MapReduce 接口兼容,因此允许重用为 Hadoop MapReduce 实现的代码。

您可以:

本文档展示了如何将现有的 Hadoop MapReduce 代码与 Flink 一起使用。从 Hadoop 支持的文件系统读取代码,请参考连接到其他系统指南。

项目配置 #

对 Hadoop 输入/输出格式的支持是 flink-java 和 flink-scala Maven 模块的一部分,这些模块在编写 Flink 作业时总是需要的。这些代码位于 org.apache.flink.api.java.hadooporg.apache.flink.api.scala.hadoop 中的 mapred 和 mapreduce API 的附加子包中。

对 Hadoop Mappers 和 Reducers 的支持包含在 flink-hadoop-compatibility Maven 模块中。这段代码位于 org.apache.flink.hadoopcompatibility 包中。

如果您想重用 Mappers 和 Reducers,请在 pom.xml 中添加以下依赖关系。

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-hadoop-compatibility_2.11</artifactId>
	<version>1.11.0</version>
</dependency>

另请参见如何配置 hadoop 依赖关系

使用 Hadoop 输入格式 #

要使用 Flink 的 Hadoop InputFormats,必须先使用 HadoopInputs 实用程序类的 readHadoopFile 或 createHadoopInput 来包装格式。前者用于从 FileInputFormat 派生的输入格式,而后者必须用于通用的输入格式。通过使用 ExecutionEnvironmen#createInput,产生的 InputFormat 可以用来创建数据源。

生成的 DataSet 包含 2 个元组,其中第一个字段是键,第二个字段是从 Hadoop InputFormat 中检索的值。

下面的示例展示了如何使用 Hadoop 的 TextInputFormat。

val env = ExecutionEnvironment.getExecutionEnvironment

val input: DataSet[(LongWritable, Text)] =
  env.createInput(HadoopInputs.readHadoopFile(
                    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))

// Do something with the data.
[...]

使用 Hadoop 输出格式 #

Flink 为 Hadoop OutputFormat 提供了一个兼容性封装器,它支持任何实现 org.apache.hadoop.mapred.OutputFormat 或扩展 org.apache.hadoop.mapreduce.OutputFormat 的类。OutputFormat 包装器希望它的输入数据是一个包含2个key和value的 DataSet。这些数据将由 Hadoop OutputFormat 处理。

下面的示例展示了如何使用 Hadoop 的 TextOutputFormat。

// Obtain your result to emit.
val hadoopResult: DataSet[(Text, IntWritable)] = [...]

val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
  new TextOutputFormat[Text, IntWritable],
  new JobConf)

hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))

hadoopResult.output(hadoopOF)

使用 Hadoop Mappers 和 Reducers #

Hadoop Mappers 在语义上等同于 Flink 的 FlatMapFunctions,Hadoop Reducers 等同于 Flink 的 GroupReduceFunctions。Flink 为 Hadoop MapReduce 的 Mapper 和 Reducer 接口的实现提供了封装器,也就是说,你可以在常规的 Flink 程序中重用你的 Hadoop Mapper 和 Reducer。目前,只支持 Hadoop 的 mapred API(org.apache.hadoop.mapred)的 Mapper 和 Reduce 接口。

包装器将一个 DataSet<Tuple2<KEYIN,VALUEIN> 作为输入,并产生一个 DataSet<Tuple2<KEYOUT,VALUEOUT> 作为输出,其中 KEYIN 和 KEYOUT 是键,VALUEIN 和 VALUEOUT 是 Hadoop 函数处理的 Hadoop 键值对的值。对于 Reducers,Flink 提供了一个包装器,用于带(HadoopReduceCombineFunction)和不带 Combiner(HadoopReduceFunction)的 GroupReduceFunction。包装器接受一个可选的 JobConf 对象来配置 Hadoop Mapper 或 Reducer。

Flink 的函数包装器有:

  • sorg.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
  • sorg.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, 和
  • sorg.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction.

并可作为常规的 Flink FlatMapFunctionsGroupReduceFunctions 使用。

下面的例子展示了如何使用 Hadoop Mapper 和 Reducer 函数:

// Obtain data to process somehow.
DataSet<Tuple2<LongWritable, Text>> text = [...]

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as MapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));

请注意:Reducer 包装器工作在 Flink 的 groupBy() 操作所定义的组上。它不考虑您在 JobConf 中设置的任何自定义分区器、排序或分组比较器。

完整的 Hadoop WordCount 示例 #

下面的示例展示了使用 Hadoop 数据类型、Input-和 OutputFormats 以及 Mapper 和 Reducer 实现的完整 WordCount 实现。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
  new HadoopInputFormat<LongWritable, Text>(
    new TextInputFormat(), LongWritable.class, Text.class, job
  );
TextInputFormat.addInputPath(job, new Path(inputPath));

// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as MapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, LongWritable> hadoopOF =
  new HadoopOutputFormat<Text, LongWritable>(
    new TextOutputFormat<Text, LongWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
result.output(hadoopOF);

// Execute Program
env.execute("Hadoop WordCount");

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/hadoop_compatibility.html