You can run Pulsar in standalone mode in a Docker container on your local machine for local development and testing.

If you have not installed Docker, download the Community edition and follow the instructions for your operating system.

Start Pulsar in Docker

  • For macOS and Linux:

        $ docker run -it \
          -p 6650:6650 \
          -p 8080:8080 \
          -v $PWD/data:/pulsar/data \
          apachepulsar/pulsar:2.6.1 \
          bin/pulsar standalone

  • For Windows:

        $ docker run -it \
          -p 6650:6650 \
          -p 8080:8080 \
          -v "$PWD/data:/pulsar/data".ToLower() \
          apachepulsar/pulsar:2.6.1 \
          bin/pulsar standalone

A few things to note about this command:

  • $PWD/data: On Windows, the Docker host directory must be lowercase, which is why the Windows command applies .ToLower() to the path. $PWD/data resolves to the data directory under the current working directory, for example: E:/data.
  • -v $PWD/data:/pulsar/data: This makes the process inside the container store its data and metadata in the filesystem outside the container, so that Pulsar does not start “fresh” every time the container is restarted.
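
The two `-p` flags in the command publish the broker's binary-protocol port (6650) and its HTTP port (8080) on the host. As a rough way to confirm that both are reachable once the container is up, the sketch below tries a TCP connection to each. The helper name `port_open` and the 1-second timeout are illustrative choices, not part of Pulsar.

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, unreachable hosts, and timeouts.
        return False


# Ports published by the `docker run` command.
for port in (6650, 8080):
    state = "reachable" if port_open("localhost", port) else "not reachable"
    print(f"localhost:{port} is {state}")
```

An open port only shows that something is listening; the startup log messages remain the more direct sign that the broker itself is ready.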

If you start Pulsar successfully, you will see INFO-level log messages like this:

    2017-08-09 22:34:04,030 - INFO - [main:WebService@213] - Web Service started at http://127.0.0.1:8080
    2017-08-09 22:34:04,038 - INFO - [main:PulsarService@335] - messaging service is ready, bootstrap service on port=8080, broker url=pulsar://127.0.0.1:6650, cluster=standalone, configs=org.apache.pulsar.broker.ServiceConfiguration@4db60246
    ...

> #### Tip
>
> When you start a local standalone cluster, a `public/default` namespace is created automatically. The namespace is used for development purposes. All Pulsar topics are managed within namespaces. For more information, see [Topics](concepts-messaging.md#topics).

## Use Pulsar in Docker

Pulsar offers client libraries for [Java](client-libraries-java.md), [Go](client-libraries-go.md), [Python](client-libraries-python.md) and [C++](client-libraries-cpp.md). If you're running a local standalone cluster, you can use one of these root URLs to interact with your cluster:

* `pulsar://localhost:6650`
* `http://localhost:8080`

The following example shows how to get started with Pulsar quickly by using the [Python](client-libraries-python.md) client API.

Install the Pulsar Python client library directly from [PyPI](https://pypi.org/project/pulsar-client/):

```shell
$ pip install pulsar-client
```

### Consume a message

Create a consumer and subscribe to the topic:

```python
import pulsar

# Connect to the standalone broker published on port 6650.
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic',
                            subscription_name='my-sub')

# Receive messages until the process is interrupted (for example with Ctrl+C).
while True:
    msg = consumer.receive()
    print("Received message: '%s'" % msg.data())
    consumer.acknowledge(msg)

# Not reached while the loop above is running.
client.close()
```
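
The consumer loop never exits, so `client.close()` is not reached until the process is interrupted. For short local tests, a bounded variant can be easier to work with. The sketch below assumes the `timeout_millis` argument of `consumer.receive()` in the Python client and treats any exception raised by `receive` as "no more messages for now". The helper name `drain` and its parameters are hypothetical.

```python
def drain(consumer, max_messages=10, timeout_millis=2000):
    """Receive and acknowledge up to max_messages, stopping on the first timeout.

    `consumer` is expected to behave like the object returned by
    client.subscribe(): receive(timeout_millis=...) and acknowledge(msg).
    """
    payloads = []
    for _ in range(max_messages):
        try:
            msg = consumer.receive(timeout_millis=timeout_millis)
        except Exception:
            # Assumption: the client raises when no message arrives in time.
            break
        payloads.append(msg.data())
        consumer.acknowledge(msg)
    return payloads


def main():
    import pulsar  # imported here so drain() can be tried without a broker

    client = pulsar.Client('pulsar://localhost:6650')
    try:
        consumer = client.subscribe('my-topic', subscription_name='my-sub')
        for data in drain(consumer):
            print("Received message: '%s'" % data)
    finally:
        client.close()  # always release the connection
```

Call `main()` while the standalone container is running. Catching the broad `Exception` keeps the sketch independent of the client version; narrower handling is preferable once the exact timeout exception of the installed client is known.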

Produce a message

Start a producer and send some test messages:

    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer('my-topic')

    for i in range(10):
        producer.send(('hello-pulsar-%d' % i).encode('utf-8'))

    client.close()
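
`producer.send()` takes bytes, and `msg.data()` on the consumer side returns bytes as well, so under Python 3 the consumer prints each payload in its `b'...'` form. A minimal sketch of a matching encode/decode pair follows. The helper names are illustrative, and UTF-8 is assumed because that is what the producer example uses.

```python
def encode_payload(i: int) -> bytes:
    """Build the same payload the producer loop sends, as UTF-8 bytes."""
    return ('hello-pulsar-%d' % i).encode('utf-8')


def decode_payload(data: bytes) -> str:
    """Turn the bytes returned by msg.data() back into text."""
    return data.decode('utf-8')


raw = encode_payload(3)
print(raw)                  # bytes, as passed to producer.send()
print(decode_payload(raw))  # text, after decoding on the consumer side
```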

Get the topic statistics

In Pulsar, you can use REST, Java, or command-line tools to control every aspect of the system. For details on APIs, refer to Admin API Overview.

In the simplest example, you can use curl to probe the stats for a particular topic:

    $ curl http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | python -m json.tool

The output looks something like this:

    {
      "averageMsgSize": 0.0,
      "msgRateIn": 0.0,
      "msgRateOut": 0.0,
      "msgThroughputIn": 0.0,
      "msgThroughputOut": 0.0,
      "publishers": [
        {
          "address": "/172.17.0.1:35048",
          "averageMsgSize": 0.0,
          "clientVersion": "1.19.0-incubating",
          "connectedSince": "2017-08-09 20:59:34.621+0000",
          "msgRateIn": 0.0,
          "msgThroughputIn": 0.0,
          "producerId": 0,
          "producerName": "standalone-0-1"
        }
      ],
      "replication": {},
      "storageSize": 16,
      "subscriptions": {
        "my-sub": {
          "blockedSubscriptionOnUnackedMsgs": false,
          "consumers": [
            {
              "address": "/172.17.0.1:35064",
              "availablePermits": 996,
              "blockedConsumerOnUnackedMsgs": false,
              "clientVersion": "1.19.0-incubating",
              "connectedSince": "2017-08-09 21:05:39.222+0000",
              "consumerName": "166111",
              "msgRateOut": 0.0,
              "msgRateRedeliver": 0.0,
              "msgThroughputOut": 0.0,
              "unackedMessages": 0
            }
          ],
          "msgBacklog": 0,
          "msgRateExpired": 0.0,
          "msgRateOut": 0.0,
          "msgRateRedeliver": 0.0,
          "msgThroughputOut": 0.0,
          "type": "Exclusive",
          "unackedMessages": 0
        }
      }
    }
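
The same statistics can be read from Python instead of `curl`. The sketch below fetches the stats document with the standard library and reports the backlog and consumer count for each subscription. It relies only on field names visible in the sample output; the function names are hypothetical, and the URL is the one from the `curl` example.

```python
import json
import urllib.request

STATS_URL = "http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats"


def fetch_stats(url: str = STATS_URL) -> dict:
    """Download and parse the topic stats JSON (needs the standalone broker running)."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def summarize(stats: dict) -> dict:
    """Map each subscription name to its backlog and number of connected consumers."""
    return {
        name: {
            "msgBacklog": sub.get("msgBacklog", 0),
            "consumers": len(sub.get("consumers", [])),
        }
        for name, sub in stats.get("subscriptions", {}).items()
    }


# Example with a trimmed copy of the sample output.
sample = {
    "publishers": [{"producerName": "standalone-0-1"}],
    "subscriptions": {
        "my-sub": {"consumers": [{"consumerName": "166111"}], "msgBacklog": 0}
    },
}
print(summarize(sample))
```

With the container running, `summarize(fetch_stats())` reports the live values for `my-topic`.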