Wait the light to fall

处理应用程序参数

焉知非鱼

Handling Application Parameters

处理应用程序参数

几乎所有的 Flink 应用,包括批处理和流式应用,都依赖于外部配置参数,它们用于指定输入和输出源(如路径或地址)、系统参数(并行性、运行时配置)和应用特定参数(通常在用户函数中使用)。它们用于指定输入和输出源(如路径或地址)、系统参数(并行性、运行时配置)和应用程序特定参数(通常在用户函数中使用)。

Flink 提供了一个名为 ParameterTool 的简单工具,为解决这些问题提供一些基本的工具。请注意,你不一定要使用这里描述的 ParameterTool。其他框架如 Commons CLI和argparse4j 也能很好地与 Flink 一起工作。

将你的配置值导入 ParameterTool 之中

ParameterTool 提供了一组预定义的静态方法来读取配置。该工具内部期待的是一个 Map<String,String>,所以很容易将其与自己的配置风格整合在一起。

.properties 文件中

下面的方法将读取一个属性文件并提供键/值对。

String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);

File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);

InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

从命令行参数来看

这就允许从命令行中获取 --input hdfs://mydata --elements 42 这样的参数。

public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
    // .. regular code ..

从系统属性

当启动 JVM 时,你可以将系统属性传递给它。-Dinput=hdfs://mydata。你也可以从这些系统属性中初始化 ParameterTool。

ParameterTool parameter = ParameterTool.fromSystemProperties();

在 Flink 程序中使用参数

现在我们已经从某个地方得到了参数(见上文),我们可以以各种方式使用它们。

直接从 ParameterTool 中使用

ParameterTool 本身有访问值的方法。

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.

你可以在客户端提交应用程序的 main() 方法中直接使用这些方法的返回值。例如,你可以这样设置一个操作符的并行性。

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

由于 ParameterTool 是可序列化的,所以你可以把它传递给函数本身。

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

然后在函数内部使用它从命令行获取值。

全局注册参数

在 ExecutionConfig 中注册为全局作业参数的参数可以作为配置值从 JobManager Web 界面和用户定义的所有功能中访问。

全局注册参数。

ParameterTool parameters = ParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

在任何丰富的用户功能中访问它们。

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
	ParameterTool parameters = (ParameterTool)
	    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
	parameters.getRequired("input");
	// .. do more ..

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/application_parameters.html