消费者示例

1 Consumer 示例

TubeMQ 提供了两种方式来消费消息: PullConsumer and PushConsumer。

1.1 PullConsumer

  1. public class PullConsumerExample {
  2. public static void main(String[] args) throws Throwable {
  3. final String masterHostAndPort = "localhost:8000";
  4. final String topic = "test";
  5. final String group = "test-group";
  6. final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
  7. consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
  8. final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
  9. final PullMessageConsumer messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
  10. messagePullConsumer.subscribe(topic, null);
  11. messagePullConsumer.completeSubscribe();
  12. // wait for client to join the exact consumer queue that consumer group allocated
  13. while (!messagePullConsumer.isPartitionsReady(1000)) {
  14. ThreadUtils.sleep(1000);
  15. }
  16. while (true) {
  17. ConsumerResult result = messagePullConsumer.getMessage();
  18. if (result.isSuccess()) {
  19. List<Message> messageList = result.getMessageList();
  20. for (Message message : messageList) {
  21. System.out.println("received message : " + message);
  22. }
  23. messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
  24. }
  25. }
  26. }
  27. }

1.2 PushConsumer

  1. public class PushConsumerExample {
  2. public static void test(String[] args) throws Throwable {
  3. final String masterHostAndPort = "localhost:8000";
  4. final String topic = "test";
  5. final String group = "test-group";
  6. final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
  7. consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
  8. final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
  9. final PushMessageConsumer pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
  10. pushConsumer.subscribe(topic, null, new MessageListener() {
  11. @Override
  12. public void receiveMessages(PeerInfo peerInfo, List<Message> messages) throws InterruptedException {
  13. for (Message message : messages) {
  14. System.out.println("received message : " + new String(message.getData()));
  15. }
  16. }
  17. @Override
  18. public Executor getExecutor() {
  19. return null;
  20. }
  21. @Override
  22. public void stop() {
  23. //
  24. }
  25. });
  26. pushConsumer.completeSubscribe();
  27. CountDownLatch latch = new CountDownLatch(1);
  28. latch.await(10, TimeUnit.MINUTES);
  29. }
  30. }

Back to top