Pulsar的Spark Streaming receiver经过专门的定制,使 Apache Spark Streaming 能够从Pulsar接收数据.

应用程序可以通过 Spark Streaming Pulsar receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可以通过多种方式对其进行处理.

先决条件

若要使用 receiver,请在 java 配置中包含 pulsar-spark 库的依赖。

Maven

如果你使用maven,添加以下内容到你的 pom.xml 中:

  1. <!-- in your <properties> block -->
  2. <pulsar.version>2.6.1</pulsar.version>
  3. <!-- in your <dependencies> block -->
  4. <dependency>
  5. <groupId>org.apache.pulsar</groupId>
  6. <artifactId>pulsar-spark</artifactId>
  7. <version>${pulsar.version}</version>
  8. </dependency>

Gradle

如果你使用Gradle,添加以下内容到你的 build.gradle 中:

  1. def pulsarVersion = "2.6.1"
  2. dependencies {
  3. compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
  4. }

用法

把一个 SparkStreamingPulsarReceiver 实例,传入 JavaStreamingContextreceiverStream方法中作为的参数:

  1. String serviceUrl = "pulsar://localhost:6650/";
  2. String topic = "persistent://public/default/test_src";
  3. String subs = "test_sub";
  4. SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
  5. JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
  6. ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();
  7. Set<String> set = new HashSet<>();
  8. set.add(topic);
  9. pulsarConf.setTopicNames(set);
  10. pulsarConf.setSubscriptionName(subs);
  11. SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
  12. serviceUrl,
  13. pulsarConf,
  14. new AuthenticationDisabled());
  15. JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

示例

You can find a complete example here. 该示例统计接收的消息有多少条包含字符串 “Pulsar”。