Spark Streaming 整合 Flume

一、简介

Apache Flume 是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中。Spark Straming 提供了以下两种方式用于 Flume 的整合。

二、推送式方法

在推送式方法 (Flume-style Push-based Approach) 中,Spark Streaming 程序需要对某台服务器的某个端口进行监听,Flume 通过 avro Sink 将数据源源不断推送到该端口。这里以监听日志文件为例,具体整合方式如下:

2.1 配置日志收集Flume

新建配置 netcat-memory-avro.properties,使用 tail 命令监听文件内容变化,然后将新的文件内容通过 avro sink 发送到 hadoop001 这台服务器的 8888 端口:

  1. #指定agent的sources,sinks,channels
  2. a1.sources = s1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. #配置sources属性
  6. a1.sources.s1.type = exec
  7. a1.sources.s1.command = tail -F /tmp/log.txt
  8. a1.sources.s1.shell = /bin/bash -c
  9. a1.sources.s1.channels = c1
  10. #配置sink
  11. a1.sinks.k1.type = avro
  12. a1.sinks.k1.hostname = hadoop001
  13. a1.sinks.k1.port = 8888
  14. a1.sinks.k1.batch-size = 1
  15. a1.sinks.k1.channel = c1
  16. #配置channel类型
  17. a1.channels.c1.type = memory
  18. a1.channels.c1.capacity = 1000
  19. a1.channels.c1.transactionCapacity = 100

2.2 项目依赖

项目采用 Maven 工程进行构建,主要依赖为 spark-streamingspark-streaming-flume

  1. <properties>
  2. <scala.version>2.11</scala.version>
  3. <spark.version>2.4.0</spark.version>
  4. </properties>
  5. <dependencies>
  6. <!-- Spark Streaming-->
  7. <dependency>
  8. <groupId>org.apache.spark</groupId>
  9. <artifactId>spark-streaming_${scala.version}</artifactId>
  10. <version>${spark.version}</version>
  11. </dependency>
  12. <!-- Spark Streaming 整合 Flume 依赖-->
  13. <dependency>
  14. <groupId>org.apache.spark</groupId>
  15. <artifactId>spark-streaming-flume_${scala.version}</artifactId>
  16. <version>2.4.3</version>
  17. </dependency>
  18. </dependencies>

2.3 Spark Streaming接收日志数据

调用 FlumeUtils 工具类的 createStream 方法,对 hadoop001 的 8888 端口进行监听,获取到流数据并进行打印:

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.streaming.flume.FlumeUtils
  4. object PushBasedWordCount {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf()
  7. val ssc = new StreamingContext(sparkConf, Seconds(5))
  8. // 1.获取输入流
  9. val flumeStream = FlumeUtils.createStream(ssc, "hadoop001", 8888)
  10. // 2.打印输入流的数据
  11. flumeStream.map(line => new String(line.event.getBody.array()).trim).print()
  12. ssc.start()
  13. ssc.awaitTermination()
  14. }
  15. }

2.4 项目打包

因为 Spark 安装目录下是不含有 spark-streaming-flume 依赖包的,所以在提交到集群运行时候必须提供该依赖包,你可以在提交命令中使用 --jar 指定上传到服务器的该依赖包,或者使用 --packages org.apache.spark:spark-streaming-flume_2.12:2.4.3 指定依赖包的完整名称,这样程序在启动时会先去中央仓库进行下载。

这里我采用的是第三种方式:使用 maven-shade-plugin 插件进行 ALL IN ONE 打包,把所有依赖的 Jar 一并打入最终包中。需要注意的是 spark-streaming 包在 Spark 安装目录的 jars 目录中已经提供,所以不需要打入。插件配置如下:

  1. <build>
  2. <plugins>
  3. <plugin>
  4. <groupId>org.apache.maven.plugins</groupId>
  5. <artifactId>maven-compiler-plugin</artifactId>
  6. <configuration>
  7. <source>8</source>
  8. <target>8</target>
  9. </configuration>
  10. </plugin>
  11. <!--使用 shade 进行打包-->
  12. <plugin>
  13. <groupId>org.apache.maven.plugins</groupId>
  14. <artifactId>maven-shade-plugin</artifactId>
  15. <configuration>
  16. <createDependencyReducedPom>true</createDependencyReducedPom>
  17. <filters>
  18. <filter>
  19. <artifact>*:*</artifact>
  20. <excludes>
  21. <exclude>META-INF/*.SF</exclude>
  22. <exclude>META-INF/*.sf</exclude>
  23. <exclude>META-INF/*.DSA</exclude>
  24. <exclude>META-INF/*.dsa</exclude>
  25. <exclude>META-INF/*.RSA</exclude>
  26. <exclude>META-INF/*.rsa</exclude>
  27. <exclude>META-INF/*.EC</exclude>
  28. <exclude>META-INF/*.ec</exclude>
  29. <exclude>META-INF/MSFTSIG.SF</exclude>
  30. <exclude>META-INF/MSFTSIG.RSA</exclude>
  31. </excludes>
  32. </filter>
  33. </filters>
  34. <artifactSet>
  35. <excludes>
  36. <exclude>org.apache.spark:spark-streaming_${scala.version}</exclude>
  37. <exclude>org.scala-lang:scala-library</exclude>
  38. <exclude>org.apache.commons:commons-lang3</exclude>
  39. </excludes>
  40. </artifactSet>
  41. </configuration>
  42. <executions>
  43. <execution>
  44. <phase>package</phase>
  45. <goals>
  46. <goal>shade</goal>
  47. </goals>
  48. <configuration>
  49. <transformers>
  50. <transformer
  51. implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
  52. <transformer
  53. implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  54. </transformer>
  55. </transformers>
  56. </configuration>
  57. </execution>
  58. </executions>
  59. </plugin>
  60. <!--打包.scala 文件需要配置此插件-->
  61. <plugin>
  62. <groupId>org.scala-tools</groupId>
  63. <artifactId>maven-scala-plugin</artifactId>
  64. <version>2.15.1</version>
  65. <executions>
  66. <execution>
  67. <id>scala-compile</id>
  68. <goals>
  69. <goal>compile</goal>
  70. </goals>
  71. <configuration>
  72. <includes>
  73. <include>**/*.scala</include>
  74. </includes>
  75. </configuration>
  76. </execution>
  77. <execution>
  78. <id>scala-test-compile</id>
  79. <goals>
  80. <goal>testCompile</goal>
  81. </goals>
  82. </execution>
  83. </executions>
  84. </plugin>
  85. </plugins>
  86. </build>

本项目完整源码见:spark-streaming-flume

使用 mvn clean package 命令打包后会生产以下两个 Jar 包,提交 非 original 开头的 Jar 即可。

Spark Streaming 整合 Flume - 图1

2.5 启动服务和提交作业

启动 Flume 服务:

  1. flume-ng agent \
  2. --conf conf \
  3. --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
  4. --name a1 -Dflume.root.logger=INFO,console

提交 Spark Streaming 作业:

  1. spark-submit \
  2. --class com.heibaiying.flume.PushBasedWordCount \
  3. --master local[4] \
  4. /usr/appjar/spark-streaming-flume-1.0.jar

2.6 测试

这里使用 echo 命令模拟日志产生的场景,往日志文件中追加数据,然后查看程序的输出:

Spark Streaming 整合 Flume - 图2

Spark Streaming 程序成功接收到数据并打印输出:

Spark Streaming 整合 Flume - 图3

2.7 注意事项

1. 启动顺序

这里需要注意的,不论你先启动 Spark 程序还是 Flume 程序,由于两者的启动都需要一定的时间,此时先启动的程序会短暂地抛出端口拒绝连接的异常,此时不需要进行任何操作,等待两个程序都启动完成即可。

Spark Streaming 整合 Flume - 图4

2. 版本一致

最好保证用于本地开发和编译的 Scala 版本和 Spark 的 Scala 版本一致,至少保证大版本一致,如都是 2.11


三、拉取式方法

拉取式方法 (Pull-based Approach using a Custom Sink) 是将数据推送到 SparkSink 接收器中,此时数据会保持缓冲状态,Spark Streaming 定时从接收器中拉取数据。这种方式是基于事务的,即只有在 Spark Streaming 接收和复制数据完成后,才会删除缓存的数据。与第一种方式相比,具有更强的可靠性和容错保证。整合步骤如下:

3.1 配置日志收集Flume

新建 Flume 配置文件 netcat-memory-sparkSink.properties,配置和上面基本一致,只是把 a1.sinks.k1.type 的属性修改为 org.apache.spark.streaming.flume.sink.SparkSink,即采用 Spark 接收器。

  1. #指定agent的sources,sinks,channels
  2. a1.sources = s1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. #配置sources属性
  6. a1.sources.s1.type = exec
  7. a1.sources.s1.command = tail -F /tmp/log.txt
  8. a1.sources.s1.shell = /bin/bash -c
  9. a1.sources.s1.channels = c1
  10. #配置sink
  11. a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
  12. a1.sinks.k1.hostname = hadoop001
  13. a1.sinks.k1.port = 8888
  14. a1.sinks.k1.batch-size = 1
  15. a1.sinks.k1.channel = c1
  16. #配置channel类型
  17. a1.channels.c1.type = memory
  18. a1.channels.c1.capacity = 1000
  19. a1.channels.c1.transactionCapacity = 100

2.2 新增依赖

使用拉取式方法需要额外添加以下两个依赖:

  1. <dependency>
  2. <groupId>org.scala-lang</groupId>
  3. <artifactId>scala-library</artifactId>
  4. <version>2.12.8</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.commons</groupId>
  8. <artifactId>commons-lang3</artifactId>
  9. <version>3.5</version>
  10. </dependency>

注意:添加这两个依赖只是为了本地测试,Spark 的安装目录下已经提供了这两个依赖,所以在最终打包时需要进行排除。

2.3 Spark Streaming接收日志数据

这里和上面推送式方法的代码基本相同,只是将调用方法改为 createPollingStream

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.streaming.flume.FlumeUtils
  4. object PullBasedWordCount {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf()
  7. val ssc = new StreamingContext(sparkConf, Seconds(5))
  8. // 1.获取输入流
  9. val flumeStream = FlumeUtils.createPollingStream(ssc, "hadoop001", 8888)
  10. // 2.打印输入流中的数据
  11. flumeStream.map(line => new String(line.event.getBody.array()).trim).print()
  12. ssc.start()
  13. ssc.awaitTermination()
  14. }
  15. }

2.4 启动测试

启动和提交作业流程与上面相同,这里给出执行脚本,过程不再赘述。

启动 Flume 进行日志收集:

  1. flume-ng agent \
  2. --conf conf \
  3. --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-sparkSink.properties \
  4. --name a1 -Dflume.root.logger=INFO,console

提交 Spark Streaming 作业:

  1. spark-submit \
  2. --class com.heibaiying.flume.PullBasedWordCount \
  3. --master local[4] \
  4. /usr/appjar/spark-streaming-flume-1.0.jar

参考资料