8 OpenMessaging样例

OpenMessaging旨在建立消息和流处理规范,以为金融、电子商务、物联网和大数据领域提供通用框架及工业级指导方案。在分布式异构环境中,设计原则是面向云、简单、灵活和独立于语言。符合这些规范将帮助企业方便的开发跨平台和操作系统的异构消息传递应用程序。提供了openmessaging-api 0.3.0-alpha的部分实现,下面的示例演示如何基于OpenMessaging访问RocketMQ。

8.1 OMSProducer样例

下面的示例演示如何在同步、异步或单向传输中向RocketMQ代理发送消息。

  1. import io.openmessaging.Future;
  2. import io.openmessaging.FutureListener;
  3. import io.openmessaging.Message;
  4. import io.openmessaging.MessagingAccessPoint;
  5. import io.openmessaging.OMS;
  6. import io.openmessaging.producer.Producer;
  7. import io.openmessaging.producer.SendResult;
  8. import java.nio.charset.Charset;
  9. import java.util.concurrent.CountDownLatch;
  10. public class SimpleProducer {
  11. public static void main(String[] args) {
  12. final MessagingAccessPoint messagingAccessPoint =
  13. OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
  14. final Producer producer = messagingAccessPoint.createProducer();
  15. messagingAccessPoint.startup();
  16. System.out.printf("MessagingAccessPoint startup OK%n");
  17. producer.startup();
  18. System.out.printf("Producer startup OK%n");
  19. {
  20. Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
  21. SendResult sendResult = producer.send(message);
  22. //final Void aVoid = result.get(3000L);
  23. System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
  24. }
  25. final CountDownLatch countDownLatch = new CountDownLatch(1);
  26. {
  27. final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
  28. result.addListener(new FutureListener<SendResult>() {
  29. @Override
  30. public void operationComplete(Future<SendResult> future) {
  31. if (future.getThrowable() != null) {
  32. System.out.printf("Send async message Failed, error: %s%n", future.getThrowable().getMessage());
  33. } else {
  34. System.out.printf("Send async message OK, msgId: %s%n", future.get().messageId());
  35. }
  36. countDownLatch.countDown();
  37. }
  38. });
  39. }
  40. {
  41. producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
  42. System.out.printf("Send oneway message OK%n");
  43. }
  44. try {
  45. countDownLatch.await();
  46. Thread.sleep(500); // 等一些时间来发送消息
  47. } catch (InterruptedException ignore) {
  48. }
  49. producer.shutdown();
  50. }
  51. }

8.2 OMSPullConsumer

用OMS PullConsumer 来从指定的队列中拉取消息

  1. import io.openmessaging.Message;
  2. import io.openmessaging.MessagingAccessPoint;
  3. import io.openmessaging.OMS;
  4. import io.openmessaging.OMSBuiltinKeys;
  5. import io.openmessaging.consumer.PullConsumer;
  6. import io.openmessaging.producer.Producer;
  7. import io.openmessaging.producer.SendResult;
  8. public class SimplePullConsumer {
  9. public static void main(String[] args) {
  10. final MessagingAccessPoint messagingAccessPoint =
  11. OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
  12. messagingAccessPoint.startup();
  13. final Producer producer = messagingAccessPoint.createProducer();
  14. final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
  15. OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
  16. messagingAccessPoint.startup();
  17. System.out.printf("MessagingAccessPoint startup OK%n");
  18. final String queueName = "TopicTest";
  19. producer.startup();
  20. Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes());
  21. SendResult sendResult = producer.send(msg);
  22. System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
  23. producer.shutdown();
  24. consumer.attachQueue(queueName);
  25. consumer.startup();
  26. System.out.printf("Consumer startup OK%n");
  27. // 运行直到发现一个消息被发送了
  28. boolean stop = false;
  29. while (!stop) {
  30. Message message = consumer.receive();
  31. if (message != null) {
  32. String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
  33. System.out.printf("Received one message: %s%n", msgId);
  34. consumer.ack(msgId);
  35. if (!stop) {
  36. stop = msgId.equalsIgnoreCase(sendResult.messageId());
  37. }
  38. } else {
  39. System.out.printf("Return without any message%n");
  40. }
  41. }
  42. consumer.shutdown();
  43. messagingAccessPoint.shutdown();
  44. }
  45. }

8.3 OMSPushConsumer

以下示范如何将 OMS PushConsumer 添加到指定的队列,并通过 MessageListener 消费这些消息。

  1. import io.openmessaging.Message;
  2. import io.openmessaging.MessagingAccessPoint;
  3. import io.openmessaging.OMS;
  4. import io.openmessaging.OMSBuiltinKeys;
  5. import io.openmessaging.consumer.MessageListener;
  6. import io.openmessaging.consumer.PushConsumer;
  7. public class SimplePushConsumer {
  8. public static void main(String[] args) {
  9. final MessagingAccessPoint messagingAccessPoint = OMS
  10. .getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
  11. final PushConsumer consumer = messagingAccessPoint.
  12. createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
  13. messagingAccessPoint.startup();
  14. System.out.printf("MessagingAccessPoint startup OK%n");
  15. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  16. @Override
  17. public void run() {
  18. consumer.shutdown();
  19. messagingAccessPoint.shutdown();
  20. }
  21. }));
  22. consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
  23. @Override
  24. public void onReceived(Message message, Context context) {
  25. System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
  26. context.ack();
  27. }
  28. });
  29. consumer.startup();
  30. System.out.printf("Consumer startup OK%n");
  31. }
  32. }