Set up a standalone Pulsar in Docker

可以在本地机器的 Docker 容器中运行单机模式的 Pulsar,进行本地开发和测试。

如果没有安装 Docker,可以下载社区版本,并按照相应操作系统的说明进行操作。

在 Docker 中启动 Pulsar

  • MacOS、Linux、Windows 用户:

    1. $ docker run -it \
    2. -p 6650:6650 \
    3. -p 8080:8080 \
    4. --mount source=pulsardata,target=/pulsar/data \
    5. --mount source=pulsarconf,target=/pulsar/conf \
    6. apachepulsar/pulsar:2.7.0 \
    7. bin/pulsar standalone

A few things to note about this command:

  • The data, metadata, and configuration are persisted on Docker volumes in order to not start “fresh” every time the container is restarted. For details on the volumes you can use docker volume inspect <sourcename>
  • For Docker on Windows make sure to configure it to use Linux containers

成功启动 Pulsar 后,你将看到如下所示的 INFO 级日志消息:

  1. 2017-08-09 22:34:04,030 - INFO - [main:WebService@213] - Web Service started at http://127.0.0.1:8080
  2. 2017-08-09 22:34:04,038 - INFO - [main:PulsarService@335] - messaging service is ready, bootstrap service on port=8080, broker url=pulsar://127.0.0.1:6650, cluster=standalone, configs=org.apache.pulsar.broker.ServiceConfiguration@4db60246
  3. ...

Tip

启动本地独立集群,将自动创建 public/default 命名空间。 自动创建的命名空间将用于开发用途。 所有Pulsar的topic主题都在命名空间中进行管理。 了解更多信息,参阅 Topics

在 Docker 中运行 Pulsar

Pulsar 支持多个客户端: JavaGo、<a href =“ client-libraries-python.md“>Python 和 C ++。 如果运行的是本地独立集群,则可以使用以下 URL 中的一个与其交互:

  • pulsar://localhost:6650
  • http://localhost:8080

以下示例展示了如何通过 Python 客户端的 API 快速入门 Pulsar。

直接从 PyPI 安装 Pulsar 的 Python 客户端库:

  1. $ pip install pulsar-client

Consume 一条消息

创建 consumer 并订阅 topic:

  1. import pulsar
  2. client = pulsar.Client('pulsar://localhost:6650')
  3. consumer = client.subscribe('my-topic',
  4. subscription_name='my-sub')
  5. while True:
  6. msg = consumer.receive()
  7. print("Received message: '%s'" % msg.data())
  8. consumer.acknowledge(msg)
  9. client.close()

Produce 一条消息

启动 producer,发送测试消息:

  1. import pulsar
  2. client = pulsar.Client('pulsar://localhost:6650')
  3. producer = client.create_producer('my-topic')
  4. for i in range(10):
  5. producer.send(('hello-pulsar-%d' % i).encode('utf-8'))
  6. client.close()

获取 topic 数据

In Pulsar, you can use REST, Java, or command-line tools to control every aspect of the system. For details on APIs, refer to Admin API Overview.

在最简单的示例中,您可以使用curl来获取特定主题的统计信息:

  1. $ curl http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | python -m json.tool

输出应如下所示:

  1. {
  2. "averageMsgSize": 0.0,
  3. "msgRateIn": 0.0,
  4. "msgRateOut": 0.0,
  5. "msgThroughputIn": 0.0,
  6. "msgThroughputOut": 0.0,
  7. "publishers": [
  8. {
  9. "address": "/172.17.0.1:35048",
  10. "averageMsgSize": 0.0,
  11. "clientVersion": "1.19.0-incubating",
  12. "connectedSince": "2017-08-09 20:59:34.621+0000",
  13. "msgRateIn": 0.0,
  14. "msgThroughputIn": 0.0,
  15. "producerId": 0,
  16. "producerName": "standalone-0-1"
  17. }
  18. ],
  19. "replication": {},
  20. "storageSize": 16,
  21. "subscriptions": {
  22. "my-sub": {
  23. "blockedSubscriptionOnUnackedMsgs": false,
  24. "consumers": [
  25. {
  26. "address": "/172.17.0.1:35064",
  27. "availablePermits": 996,
  28. "blockedConsumerOnUnackedMsgs": false,
  29. "clientVersion": "1.19.0-incubating",
  30. "connectedSince": "2017-08-09 21:05:39.222+0000",
  31. "consumerName": "166111",
  32. "msgRateOut": 0.0,
  33. "msgRateRedeliver": 0.0,
  34. "msgThroughputOut": 0.0,
  35. "unackedMessages": 0
  36. }
  37. ],
  38. "msgBacklog": 0,
  39. "msgRateExpired": 0.0,
  40. "msgRateOut": 0.0,
  41. "msgRateRedeliver": 0.0,
  42. "msgThroughputOut": 0.0,
  43. "type": "Exclusive",
  44. "unackedMessages": 0
  45. }
  46. }
  47. }