项目配置

本节将向你展示如何通过流行的构建工具 (MavenGradle) 配置你的项目,必要的依赖项(比如连接器和格式),以及覆盖一些高级配置主题。

每个 Flink 应用程序都依赖于一组 Flink 库。应用程序至少依赖于 Flink API,此外还依赖于某些连接器库(比如 Kafka、Cassandra),以及用户开发的自定义的数据处理逻辑所需要的第三方依赖项。

开始

要开始使用 Flink 应用程序,请使用以下命令、脚本和模板来创建 Flink 项目。

Maven

你可以使用如下的 Maven 命令或快速启动脚本,基于原型创建一个项目。

Maven 命令

  1. $ mvn archetype:generate \
  2. -DarchetypeGroupId=org.apache.flink \
  3. -DarchetypeArtifactId=flink-quickstart-java \
  4. -DarchetypeVersion=1.16.0

这允许你命名新建的项目,而且会交互式地询问 groupId、artifactId、package 的名字。

快速启动脚本

  1. $ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.16.0

Gradle

你可以创建一个空项目,你需要在其中手动创建 src/main/javasrc/main/resources 目录并开始在其中编写一些类,使用如下 Gradle 构建脚本或下面提供的快速启动脚本以获得功能齐全的启动项目。

Gradle 构建脚本

请在脚本的所在目录执行 gradle 命令来执行这些构建配置脚本。

build.gradle

  1. plugins {
  2. id 'java'
  3. id 'application'
  4. // shadow plugin to produce fat JARs
  5. id 'com.github.johnrengelman.shadow' version '7.1.2'
  6. }
  7. // artifact properties
  8. group = 'org.quickstart'
  9. version = '0.1-SNAPSHOT'
  10. mainClassName = 'org.quickstart.DataStreamJob'
  11. description = """Flink Quickstart Job"""
  12. ext {
  13. javaVersion = '1.8'
  14. flinkVersion = '1.16.0'
  15. scalaBinaryVersion = '_2.12'
  16. slf4jVersion = '1.7.32'
  17. log4jVersion = '2.17.1'
  18. }
  19. sourceCompatibility = javaVersion
  20. targetCompatibility = javaVersion
  21. tasks.withType(JavaCompile) {
  22. options.encoding = 'UTF-8'
  23. }
  24. applicationDefaultJvmArgs = ["-Dlog4j.configurationFile=log4j2.properties"]
  25. // declare where to find the dependencies of your project
  26. repositories {
  27. mavenCentral()
  28. maven {
  29. url "https://repository.apache.org/content/repositories/snapshots"
  30. mavenContent {
  31. snapshotsOnly()
  32. }
  33. }
  34. }
  35. // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
  36. // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
  37. // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
  38. // -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
  39. configurations {
  40. flinkShadowJar // dependencies which go into the shadowJar
  41. // always exclude these (also from transitive dependencies) since they are provided by Flink
  42. flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
  43. flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
  44. flinkShadowJar.exclude group: 'org.slf4j'
  45. flinkShadowJar.exclude group: 'org.apache.logging.log4j'
  46. }
  47. // declare the dependencies for your production and test code
  48. dependencies {
  49. // --------------------------------------------------------------
  50. // Compile-time dependencies that should NOT be part of the
  51. // shadow (uber) jar and are provided in the lib folder of Flink
  52. // --------------------------------------------------------------
  53. implementation "org.apache.flink:flink-streaming-java:${flinkVersion}"
  54. implementation "org.apache.flink:flink-clients:${flinkVersion}"
  55. // --------------------------------------------------------------
  56. // Dependencies that should be part of the shadow jar, e.g.
  57. // connectors. These must be in the flinkShadowJar configuration!
  58. // --------------------------------------------------------------
  59. //flinkShadowJar "org.apache.flink:flink-connector-kafka:${flinkVersion}"
  60. runtimeOnly "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
  61. runtimeOnly "org.apache.logging.log4j:log4j-api:${log4jVersion}"
  62. runtimeOnly "org.apache.logging.log4j:log4j-core:${log4jVersion}"
  63. // Add test dependencies here.
  64. // testCompile "junit:junit:4.12"
  65. }
  66. // make compileOnly dependencies available for tests:
  67. sourceSets {
  68. main.compileClasspath += configurations.flinkShadowJar
  69. main.runtimeClasspath += configurations.flinkShadowJar
  70. test.compileClasspath += configurations.flinkShadowJar
  71. test.runtimeClasspath += configurations.flinkShadowJar
  72. javadoc.classpath += configurations.flinkShadowJar
  73. }
  74. run.classpath = sourceSets.main.runtimeClasspath
  75. jar {
  76. manifest {
  77. attributes 'Built-By': System.getProperty('user.name'),
  78. 'Build-Jdk': System.getProperty('java.version')
  79. }
  80. }
  81. shadowJar {
  82. configurations = [project.configurations.flinkShadowJar]
  83. }

settings.gradle

  1. rootProject.name = 'quickstart'

快速启动脚本

  1. bash -c "$(curl https://flink.apache.org/q/gradle-quickstart.sh)" -- 1.16.0 _2.12

需要哪些依赖项?

要开始一个 Flink 作业,你通常需要如下依赖项:

除此之外,若要开发自定义功能,你还要添加必要的第三方依赖项。

Flink提供了两大 API:Datastream APITable API & SQL,它们可以单独使用,也可以混合使用,具体取决于你的使用场景:

你要使用的 API你需要添加的依赖项
DataStreamflink-streaming-java
DataStream Scala 版flink-streaming-scala_2.12
Table APIflink-table-api-java
Table API Scala 版flink-table-api-scala_2.12
Table API + DataStreamflink-table-api-java-bridge
Table API + DataStream Scala 版flink-table-api-scala-bridge_2.12

你只需将它们包含在你的构建工具脚本/描述符中,就可以开发你的作业了!

运行和打包

如果你想通过简单地执行主类来运行你的作业,你需要 classpath 里有 flink-runtime。对于 Table API 程序,你还需要 flink-table-runtimeflink-table-planner-loader

根据经验,我们建议将应用程序代码及其所有必需的依赖项打包进一个 fat/uber JAR 中。这包括打包你作业用到的连接器、格式和第三方依赖项。此规则不适用于 Java API、DataStream Scala API 以及前面提到的运行时模块,它们已经由 Flink 本身提供,不应包含在作业的 uber JAR 中。你可以把该作业 JAR 提交到已经运行的 Flink 集群,也可以轻松将其添加到 Flink 应用程序容器镜像中,而无需修改发行版。

下一步是什么?