最佳实践

此页面包含Flink程序员关于如何解决经常遇到的问题的最佳实践的集合。

几乎所有Flink应用程序(批处理和流式处理)都依赖于外部配置参数。它们用于指定输入和输出源(如路径或地址),系统参数(并行性,运行时配置)和特定于应用程序的参数(通常在用户函数中使用)。

Flink提供了一个简单的实用程序,ParameterTool用于提供一些解决这些问题的基本工具。请注意,您不必使用ParameterTool此处描述的内容。其他框架(如Commons CLIargparse4j)也适用于Flink。

获取配置值 ParameterTool

ParameterTool提供了一组用于读取配置的预定义静态方法。该工具在内部期望a Map<String, String>,因此很容易将其与您自己的配置风格集成。

来自.properties文件

以下方法将读取属性文件并提供键/值对:

  1. String propertiesFilePath = "/home/sam/flink/myjob.properties";
  2. ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
  3. File propertiesFile = new File(propertiesFilePath);
  4. ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
  5. InputStream propertiesFileInputStream = new FileInputStream(file);
  6. ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

从命令行参数

这允许—input hdfs:///mydata —elements 42从命令行获取参数

  1. public static void main(String[] args) {
  2. ParameterTool parameter = ParameterTool.fromArgs(args);
  3. // .. regular code ..

来自系统属性

启动JVM时,您可以将系统属性传递给它:-Dinput=hdfs:///mydata您还可以ParameterTool从这些系统属性初始化

  1. ParameterTool parameter = ParameterTool.fromSystemProperties();

现在我们已经从某个地方获得了参数(见上文),我们可以通过各种方式使用它们。

直接来自 ParameterTool

ParameterTool本身有访问值的方法。

  1. ParameterTool parameters = // ...
  2. parameter.getRequired("input");
  3. parameter.get("output", "myDefaultValue");
  4. parameter.getLong("expectedCount", -1L);
  5. parameter.getNumberOfParameters()
  6. // .. there are more methods available.

您可以直接在main()提交应用程序的客户端方法中使用这些方法的返回值例如,您可以设置 算子的并行度,如下所示:

  1. ParameterTool parameters = ParameterTool.fromArgs(args);
  2. int parallelism = parameters.get("mapParallelism", 2);
  3. DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

由于ParameterTool可序列化,您可以将其传递给函数本身:

  1. ParameterTool parameters = ParameterTool.fromArgs(args);
  2. DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

然后在函数内部使用它从命令行获取值。

全局注册参数

注册为全局作业参数的参数ExecutionConfig可以作为JobManager Web界面中的配置值以及用户定义的所有函数进行访问。

全局注册参数:

  1. ParameterTool parameters = ParameterTool.fromArgs(args);
  2. // set up the execution environment
  3. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  4. env.getConfig().setGlobalJobParameters(parameters);

在任何丰富的用户函数中访问它们:

  1. public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
  2. @Override
  3. public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  4. ParameterTool parameters = (ParameterTool)
  5. getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
  6. parameters.getRequired("input");
  7. // .. do more ..

命名大型TupleX类型

建议使用POJO(普通旧Java对象),而不是TupleX使用包含许多字段的数据类型。此外,POJO可用于为大型Tuple公式命名。

而不是使用:

  1. Tuple11<String, String, ..., String> var = new ...;

创建从大型元组类型扩展的自定义类型要容易得多。

  1. CustomType var = new ...;
  2. public static class CustomType extends Tuple11<String, String, ..., String> {
  3. // constructor matching super
  4. }

使用Logback而不是Log4j

注意:本教程适用于从Flink 0.10开始

Apache Flink使用slf4j作为代码中的日志记录抽象。建议用户在其用户函数中使用sfl4j。

Sfl4j是一个编译时日志记录接口,可以在运行时使用不同的日志记录实现,例如log4jLogback

Flink默认依赖于Log4j。本页介绍如何使用Flink和Logback。用户报告说他们也可以使用本教程使用Graylog设置集中式日志记录。

要在代码中获取记录器实例,请使用以下代码:

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. public class MyClass implements MapFunction {
  4. private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
  5. // ...

在所有情况下,类都是由依赖管理器(如Maven)创建的类路径执行的,Flink会将log4j拉入类路径。

因此,您需要从Flink的依赖项中排除log4j。以下描述将假设从Flink快速入门创建的Maven项目

pom.xml像这样更改项目文件:

  1. <dependencies>
  2. <!-- Add the two required logback dependencies -->
  3. <dependency>
  4. <groupId>ch.qos.logback</groupId>
  5. <artifactId>logback-core</artifactId>
  6. <version>1.1.3</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>ch.qos.logback</groupId>
  10. <artifactId>logback-classic</artifactId>
  11. <version>1.1.3</version>
  12. </dependency>
  13. <!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
  14. Hadoop is logging to log4j! -->
  15. <dependency>
  16. <groupId>org.slf4j</groupId>
  17. <artifactId>log4j-over-slf4j</artifactId>
  18. <version>1.7.7</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.apache.flink</groupId>
  22. <artifactId>flink-java</artifactId>
  23. <version>1.7-SNAPSHOT</version>
  24. <exclusions>
  25. <exclusion>
  26. <groupId>log4j</groupId>
  27. <artifactId>*</artifactId>
  28. </exclusion>
  29. <exclusion>
  30. <groupId>org.slf4j</groupId>
  31. <artifactId>slf4j-log4j12</artifactId>
  32. </exclusion>
  33. </exclusions>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.apache.flink</groupId>
  37. <artifactId>flink-streaming-java_2.11</artifactId>
  38. <version>1.7-SNAPSHOT</version>
  39. <exclusions>
  40. <exclusion>
  41. <groupId>log4j</groupId>
  42. <artifactId>*</artifactId>
  43. </exclusion>
  44. <exclusion>
  45. <groupId>org.slf4j</groupId>
  46. <artifactId>slf4j-log4j12</artifactId>
  47. </exclusion>
  48. </exclusions>
  49. </dependency>
  50. <dependency>
  51. <groupId>org.apache.flink</groupId>
  52. <artifactId>flink-clients_2.11</artifactId>
  53. <version>1.7-SNAPSHOT</version>
  54. <exclusions>
  55. <exclusion>
  56. <groupId>log4j</groupId>
  57. <artifactId>*</artifactId>
  58. </exclusion>
  59. <exclusion>
  60. <groupId>org.slf4j</groupId>
  61. <artifactId>slf4j-log4j12</artifactId>
  62. </exclusion>
  63. </exclusions>
  64. </dependency>
  65. </dependencies>

<dependencies>部分进行了以下更改

  • log4j从所有Flink依赖项中排除所有依赖项:这会导致Maven忽略Flink对log4j的传递依赖项。
  • slf4j-log4j12从Flink的依赖项中排除工件:由于我们将使用slf4j进行logback绑定,因此我们必须将slf4j删除为log4j绑定。
  • 添加Logback依赖项:logback-corelogback-classic
  • 添加依赖项log4j-over-slf4jlog4j-over-slf4j是一种工具,允许直接使用Log4j API的遗留应用程序使用Slf4j接口。Flink依赖于Hadoop,它直接使用Log4j进行日志记录。因此,我们需要将所有记录器调用从Log4j重定向到Slf4j,后者又记录到Logback。请注意,您需要手动将排除项添加到要添加到pom文件的所有新Flink依赖项中。

您可能还需要检查其他(非Flink)依赖项是否正在引入log4j绑定。您可以使用分析项目的依赖关系mvn dependency:tree

本教程适用于在YARN上运行Flink或作为独立群集。

要使用Logback而不是使用Flink的Log4j,您需要目录中删除log4j-1.2.xx.jarsfl4j-log4j12-xxx.jarlib/

接下来,您需要将以下jar文件放入该lib/文件夹:

  • logback-classic.jar
  • logback-core.jar
  • log4j-over-slf4j.jar:此桥接器需要存在于类路径中,以便将日志记录调用从Hadoop(使用Log4j)重定向到Slf4j。请注意,lib/在使用每个作业的YARN群集时,需要显式设置目录。

使用自定义记录器在YARN上提交Flink的命令是: ./bin/flink run -yt $FLINK_HOME/lib <… remaining arguments …>