Pulsar Storm is an adaptor for integrating with Apache Storm topologies. It provides core Storm implementations for sending and receiving data.

An application can inject data into a Storm topology via a generic Pulsar spout, as well as consume data from a Storm topology via a generic Pulsar bolt.

使用 Pulsar Storm Adaptor

引入 Pulsar Storm Adaptor 依赖:

  1. <dependency>
  2. <groupId>org.apache.pulsar</groupId>
  3. <artifactId>pulsar-storm</artifactId>
  4. <version>${pulsar.version}</version>
  5. </dependency>

Pulsar Spout

Pulsar Spout允许通过 Storm topology 发布数据到一个topic用以消费. It emits a Storm tuple based on the message received and the MessageToValuesMapper provided by the client.

The tuples that fail to be processed by the downstream bolts will be re-injected by the spout with an exponential backoff, within a configurable timeout (the default is 60 seconds) or a configurable number of retries, whichever comes first, after which it is acknowledged by the consumer. Here’s an example construction of a spout:

  1. MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
  2. @Override
  3. public Values toValues(Message msg) {
  4. return new Values(new String(msg.getData()));
  5. }
  6. @Override
  7. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  8. // declare the output fields
  9. declarer.declare(new Fields("string"));
  10. }
  11. };
  12. // Configure a Pulsar Spout
  13. PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
  14. spoutConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
  15. spoutConf.setTopic("persistent://my-property/usw/my-ns/my-topic1");
  16. spoutConf.setSubscriptionName("my-subscriber-name1");
  17. spoutConf.setMessageToValuesMapper(messageToValuesMapper);
  18. // Create a Pulsar Spout
  19. PulsarSpout spout = new PulsarSpout(spoutConf);

Pulsar Bolt

The Pulsar bolt allows data in a Storm topology to be published on a topic. It publishes messages based on the Storm tuple received and the TupleToMessageMapper provided by the client.

A partitioned topic can also be used to publish messages on different topics. In the implementation of the TupleToMessageMapper, a “key” will need to be provided in the message which will send the messages with the same key to the same topic. Here’s an example bolt:

  1. TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
  2. @Override
  3. public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
  4. String receivedMessage = tuple.getString(0);
  5. // message processing
  6. String processedMsg = receivedMessage + "-processed";
  7. return msgBuilder.value(processedMsg.getBytes());
  8. }
  9. @Override
  10. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  11. // declare the output fields
  12. }
  13. };
  14. // Configure a Pulsar Bolt
  15. PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
  16. boltConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
  17. boltConf.setTopic("persistent://my-property/usw/my-ns/my-topic2");
  18. boltConf.setTupleToMessageMapper(tupleToMessageMapper);
  19. // Create a Pulsar Bolt
  20. PulsarBolt bolt = new PulsarBolt(boltConf);

示例

You can find a complete example here.