Pulsar adaptor for Apache Storm

Pulsar Storm is an adaptor for integrating with Apache Storm topologies. It provides core Storm implementations for sending and receiving data.

An application can inject data into a Storm topology through a generic Pulsar spout, and can consume data from a Storm topology through a generic Pulsar bolt.

Using the Pulsar Storm Adaptor

Add the Pulsar Storm Adaptor dependency:

  <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-storm</artifactId>
    <version>${pulsar.version}</version>
  </dependency>

Pulsar Spout

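The Pulsar Spout allows a Storm topology to consume data published on a topic. It emits a Storm tuple based on the message received and the MessageToValuesMapper provided by the client.

Tuples that fail to be processed by downstream bolts are re-injected by the spout with exponential backoff, within a configurable timeout (60 seconds by default) or up to a configurable number of retries, whichever comes first. After that, the consumer acknowledges the message. Here is an example spout: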

  import java.nio.charset.StandardCharsets;
  import org.apache.pulsar.client.api.Message;
  import org.apache.pulsar.storm.MessageToValuesMapper;
  import org.apache.pulsar.storm.PulsarSpout;
  import org.apache.pulsar.storm.PulsarSpoutConfiguration;
  import org.apache.storm.topology.OutputFieldsDeclarer;
  import org.apache.storm.tuple.Fields;
  import org.apache.storm.tuple.Values;

  MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
      @Override
      public Values toValues(Message msg) {
          // decode the payload explicitly as UTF-8 rather than the platform default
          return new Values(new String(msg.getData(), StandardCharsets.UTF_8));
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
          // declare the output fields
          declarer.declare(new Fields("string"));
      }
  };

  // Configure a Pulsar Spout
  PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
  spoutConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
  spoutConf.setTopic("persistent://my-property/usw/my-ns/my-topic1");
  spoutConf.setSubscriptionName("my-subscriber-name1");
  spoutConf.setMessageToValuesMapper(messageToValuesMapper);

  // Create a Pulsar Spout
  PulsarSpout spout = new PulsarSpout(spoutConf);

Click here to see a complete example.
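
The retry rule for failed tuples (exponential backoff, bounded by a timeout or a retry count, whichever comes first) can be illustrated with a small standalone sketch. This is not the PulsarSpout implementation: the class BackoffSketch, the 100 ms base delay, and the limit of 10 retries are hypothetical values chosen for illustration, and only the 60-second timeout comes from the description of the spout.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative sketch only; names and most values are assumptions, not Pulsar APIs. */
final class BackoffSketch {
    // Default timeout mentioned in the spout description.
    static final long TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
    // Hypothetical values, picked only to make the example concrete.
    static final long BASE_DELAY_MILLIS = 100;
    static final int MAX_RETRIES = 10;

    /** Delay before re-injecting a tuple that has failed `failures` times (1 or more). */
    static long delayMillis(int failures) {
        // Doubles with every failure: 100, 200, 400, ... The shift is capped to avoid overflow.
        int shift = Math.min(Math.max(failures, 1) - 1, 30);
        return BASE_DELAY_MILLIS << shift;
    }

    /** True while neither the retry limit nor the timeout has been reached. */
    static boolean shouldRetry(int failures, long elapsedMillis) {
        return failures <= MAX_RETRIES && elapsedMillis < TIMEOUT_MILLIS;
    }

    public static void main(String[] args) {
        long elapsed = 0;
        for (int failures = 1; shouldRetry(failures, elapsed); failures++) {
            long delay = delayMillis(failures);
            System.out.println("failure " + failures + ": retry in " + delay + " ms");
            elapsed += delay;
        }
        System.out.println("limit reached after " + elapsed + " ms: acknowledge and stop retrying");
    }
}
```

Whichever limit is hit first ends the retries. At that point the message is acknowledged, so it is not redelivered even though no bolt processed it successfully.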

Pulsar Bolt

The Pulsar bolt allows data in a Storm topology to be published on a topic. It publishes messages based on the Storm tuple received and the TupleToMessageMapper provided by the client.

A partitioned topic can also be used to spread messages across its partitions, each of which is an internal topic. To do so, the TupleToMessageMapper implementation needs to set a "key" on each message; messages with the same key are sent to the same partition. Here's an example bolt:

  import java.nio.charset.StandardCharsets;
  import org.apache.pulsar.client.api.TypedMessageBuilder;
  import org.apache.pulsar.storm.PulsarBolt;
  import org.apache.pulsar.storm.PulsarBoltConfiguration;
  import org.apache.pulsar.storm.TupleToMessageMapper;
  import org.apache.storm.topology.OutputFieldsDeclarer;
  import org.apache.storm.tuple.Tuple;

  TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
      @Override
      public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
          String receivedMessage = tuple.getString(0);
          // message processing
          String processedMsg = receivedMessage + "-processed";
          // encode the payload explicitly as UTF-8 rather than the platform default
          return msgBuilder.value(processedMsg.getBytes(StandardCharsets.UTF_8));
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
          // no output fields are declared in this example
      }
  };

  // Configure a Pulsar Bolt
  PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
  boltConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
  boltConf.setTopic("persistent://my-property/usw/my-ns/my-topic2");
  boltConf.setTupleToMessageMapper(tupleToMessageMapper);

  // Create a Pulsar Bolt
  PulsarBolt bolt = new PulsarBolt(boltConf);
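
The bolt example does not set a message key. For a partitioned topic, the mapper would typically call key(...) on the TypedMessageBuilder before value(...). Why equal keys land in the same place can be illustrated with a small standalone sketch of hash-based routing. The class KeyRoutingSketch and the use of String.hashCode are assumptions made for illustration; Pulsar's own router may use a different hash function.

```java
/** Illustrative sketch only; not Pulsar's actual message router. */
final class KeyRoutingSketch {
    /** Maps a key to a partition index in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4; // hypothetical partition count
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Because the partition index depends only on the key and the partition count, every message carrying the same key goes to the same partition, which preserves per-key ordering.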