应用程序参数处理

应用程序参数处理

几乎所有的批和流的 Flink 应用程序,都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源(如路径或地址)、系统参数(并行度,运行时配置)和特定的应用程序参数(通常使用在用户自定义函数)。

为解决以上问题,Flink 提供一个名为 Parametertool 的简单公共类,其中包含了一些基本的工具。请注意,这里说的 Parametertool 并不是必须使用的。Commons CLIargparse4j 等其他框架也可以非常好地兼容 Flink。

ParameterTool 读取配置值

ParameterTool 定义了一组静态方法,用于读取配置信息。该工具类内部使用了 Map<string,string> 类型,这样使得它可以很容易地与你的配置集成在一起。

配置值来自 .properties 文件

以下方法可以读取 Properties 文件并解析出键/值对:

  1. String propertiesFilePath = "/home/sam/flink/myjob.properties";
  2. ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
  3. File propertiesFile = new File(propertiesFilePath);
  4. ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
  5. InputStream propertiesFileInputStream = new FileInputStream(file);
  6. ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

配置值来自命令行

以下方法可以从命令行中获取参数,如 --input hdfs:///mydata --elements 42

  1. public static void main(String[] args) {
  2. ParameterTool parameter = ParameterTool.fromArgs(args);
  3. // .. regular code ..

配置值来自系统属性

启动 JVM 时,可以将系统属性传递给 JVM:-Dinput=hdfs:///mydata。你也可以从这些系统属性初始化 ParameterTool

  1. ParameterTool parameter = ParameterTool.fromSystemProperties();

现在我们已经从某处获取了参数(见上文),可以以各种不同的方式使用它们。

直接从 ParameterTool 获取

ParameterTool 本身具有访问配置值的方法。

  1. ParameterTool parameters = // ...
  2. parameter.getRequired("input");
  3. parameter.get("output", "myDefaultValue");
  4. parameter.getLong("expectedCount", -1L);
  5. parameter.getNumberOfParameters();
  6. // .. there are more methods available.

你可以在提交应用程序时直接在客户端的 main() 方法中使用这些方法的返回值。例如,你可以这样设置算子的并行度:

  1. ParameterTool parameters = ParameterTool.fromArgs(args);
  2. int parallelism = parameters.get("mapParallelism", 2);
  3. DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

由于 ParameterTool 是序列化的,你可以将其传递给函数本身:

  1. ParameterTool parameters = ParameterTool.fromArgs(args);
  2. DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

然后在函数内使用它以获取命令行的传递的参数。

注册全局参数

从 JobManager web 界面和用户定义的所有函数中可以以配置值的方式访问在 ExecutionConfig 中注册的全局作业参数。

注册全局参数:

  1. ParameterTool parameters = ParameterTool.fromArgs(args);
  2. // set up the execution environment
  3. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  4. env.getConfig().setGlobalJobParameters(parameters);

在任意富函数中访问参数:

  1. public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
  2. @Override
  3. public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  4. ParameterTool parameters = (ParameterTool)
  5. getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
  6. parameters.getRequired("input");
  7. // .. do more ..