Pulsar Python client

The Pulsar Python client library is a wrapper over the existing C++ client library and exposes all of the same features. You can find the code in the python subdirectory of the C++ client code.

All the methods in producer, consumer, and reader of a Python client are thread-safe.

pdoc-generated API docs for the Python client are available here.

Installation

You can install the pulsar-client library either from PyPI using pip, or by building the library from source.

Install using pip

To install the pulsar-client library as a pre-built package using the pip package manager:

    $ pip install pulsar-client==2.9.2

Optional dependencies

To support aspects like Pulsar Functions or Avro serialization, you can install additional optional components alongside the pulsar-client library.

    # Avro serialization
    $ pip install 'pulsar-client[avro]==2.9.2'
    # Functions runtime
    $ pip install 'pulsar-client[functions]==2.9.2'
    # All optional components
    $ pip install 'pulsar-client[all]==2.9.2'

Installation via PyPI is available for the following Python versions:

Platform | Supported Python versions
--- | ---
MacOS 10.13 (High Sierra), 10.14 (Mojave) | 2.7, 3.7
Linux | 2.7, 3.4, 3.5, 3.6, 3.7, 3.8
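If you script your installs, you can restate the table above as data and check the running interpreter against it before relying on a pre-built wheel. This is only a minimal sketch, not part of pulsar-client. The `PYPI_WHEEL_VERSIONS` dictionary and the `has_prebuilt_wheel` helper are hypothetical names that copy the table for pulsar-client 2.9.2, so you must update them if the table changes.

```python
import sys

# Hypothetical data copied from the table above (pulsar-client 2.9.2 wheels).
PYPI_WHEEL_VERSIONS = {
    "macos": {(2, 7), (3, 7)},
    "linux": {(2, 7), (3, 4), (3, 5), (3, 6), (3, 7), (3, 8)},
}


def has_prebuilt_wheel(platform_name, version_info=None):
    """Return True if the table lists a wheel for this platform and Python (major, minor)."""
    if version_info is None:
        version_info = sys.version_info
    major_minor = (version_info[0], version_info[1])
    return major_minor in PYPI_WHEEL_VERSIONS.get(platform_name.lower(), set())


if __name__ == "__main__":
    platform_name = "macos" if sys.platform == "darwin" else "linux"
    if has_prebuilt_wheel(platform_name):
        print("A pre-built wheel is listed for this interpreter.")
    else:
        print("No wheel listed; consider installing from source.")
```

If the check returns False, the "Install from source" path below is the fallback.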

Install from source

To install the pulsar-client library by building from source, follow the instructions to compile the Pulsar C++ client library. That also builds the Python binding for the library.

To install the built Python bindings:

    $ git clone https://github.com/apache/pulsar
    $ cd pulsar/pulsar-client-cpp/python
    $ sudo python setup.py install

API reference

The complete Python API reference is available at api/python.

Examples

You can find a variety of Python code examples for the pulsar-client library.

Producer example

The following example creates a Python producer for the my-topic topic and sends 10 messages on that topic:

    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer('my-topic')

    for i in range(10):
        producer.send(('Hello-%d' % i).encode('utf-8'))

    client.close()

Consumer example

The following example creates a consumer with the my-subscription subscription name on the my-topic topic, receives incoming messages, prints the content and ID of messages that arrive, and acknowledges each message to the Pulsar broker.

    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe('my-topic', 'my-subscription')

    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed
            consumer.negative_acknowledge(msg)

    client.close()

This example shows how to configure negative acknowledgement.

    from pulsar import Client, schema

    client = Client('pulsar://localhost:6650')
    consumer = client.subscribe('negative_acks', 'test', schema=schema.StringSchema())
    producer = client.create_producer('negative_acks', schema=schema.StringSchema())

    for i in range(10):
        print('send msg "hello-%d"' % i)
        producer.send_async('hello-%d' % i, callback=None)
    producer.flush()

    # Negatively acknowledge every message so that the broker redelivers it
    for i in range(10):
        msg = consumer.receive()
        consumer.negative_acknowledge(msg)
        print('receive and nack msg "%s"' % msg.data())

    # Receive the redelivered messages and acknowledge them this time
    for i in range(10):
        msg = consumer.receive()
        consumer.acknowledge(msg)
        print('receive and ack msg "%s"' % msg.data())

    try:
        # No more messages expected: wait at most 100 ms
        msg = consumer.receive(100)
    except Exception:
        print("no more msg")

    client.close()
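The acknowledgment flow in this example can be hard to picture from the client calls alone, so here is a toy in-memory model of it. It is purely illustrative. `ToyConsumer` is a hypothetical class, not the pulsar API. It also requeues a negatively acknowledged message immediately, whereas the real client redelivers it only after a delay.

```python
from collections import deque


class ToyConsumer:
    """In-memory stand-in (NOT the pulsar API) for ack/nack bookkeeping."""

    def __init__(self, payloads):
        self._backlog = deque(payloads)  # messages waiting to be delivered
        self._unacked = set()            # delivered but not yet acknowledged

    def receive(self):
        if not self._backlog:
            raise TimeoutError("no more messages")
        msg = self._backlog.popleft()
        self._unacked.add(msg)
        return msg

    def acknowledge(self, msg):
        # An acknowledged message is done and is never delivered again.
        self._unacked.discard(msg)

    def negative_acknowledge(self, msg):
        # A nacked message goes back to the backlog for redelivery.
        # (The real client waits for a redelivery delay; this toy does not.)
        self._unacked.discard(msg)
        self._backlog.append(msg)


consumer = ToyConsumer(['hello-%d' % i for i in range(10)])

first_pass = [consumer.receive() for _ in range(10)]
for m in first_pass:
    consumer.negative_acknowledge(m)

second_pass = [consumer.receive() for _ in range(10)]
for m in second_pass:
    consumer.acknowledge(m)
```

As in the real example, the second loop sees the same ten messages again, and after they are acknowledged a further `receive()` has nothing left to return.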

Reader interface example

You can use the Pulsar Python API to work with the Pulsar reader interface. Here's an example:

    # MessageId taken from a previously fetched message
    msg_id = msg.message_id()

    reader = client.create_reader('my-topic', msg_id)

    while True:
        msg = reader.read_next()
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # No acknowledgment

Multi-topic subscriptions

In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously. To use multi-topic subscriptions, supply either a regular expression (regex) or a list of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.

The following is an example.

    import re
    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    # 'topic-.*' matches every topic whose name starts with "topic-"
    consumer = client.subscribe(re.compile('persistent://public/default/topic-.*'), 'my-subscription')

    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed
            consumer.negative_acknowledge(msg)

    client.close()
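When you build a topic regex, keep two points in mind. First, `*` repeats the preceding character, so `topic-*` does not mean "topic- followed by anything"; `topic-.*` does. Second, all matched topics must be in one namespace. The sketch below checks both points offline with Python's `re` module. It assumes the whole topic name must match (`re.fullmatch`), which may not be exactly how the broker resolves a regex subscription. The topic names and the `matching_topics` and `namespace_of` helpers are hypothetical.

```python
import re

# Hypothetical topic names used only for this illustration.
TOPICS = [
    "persistent://public/default/topic-1",
    "persistent://public/default/topic-2",
    "persistent://public/default/other",
]


def matching_topics(pattern, topics):
    """Return the topics whose whole name matches pattern (assumed full-match semantics)."""
    regex = re.compile(pattern)
    return [t for t in topics if regex.fullmatch(t)]


def namespace_of(topic):
    """Return 'tenant/namespace' for a name like 'persistent://tenant/namespace/topic'."""
    _, path = topic.split("://", 1)
    tenant, namespace, _ = path.split("/", 2)
    return tenant + "/" + namespace


# 'topic-*' means "topic" followed by zero or more '-' characters,
# so under full matching it selects none of the names above.
print(matching_topics("persistent://public/default/topic-*", TOPICS))  # []

# 'topic-.*' means "topic-" followed by any characters.
selected = matching_topics("persistent://public/default/topic-.*", TOPICS)
print(selected)

# A regex subscription must stay within a single namespace.
assert len({namespace_of(t) for t in selected}) == 1
```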

Schema

Declare and validate schema

You can declare a schema by passing a class that inherits from pulsar.schema.Record and defines the fields as class variables. For example:

    from pulsar.schema import *

    class Example(Record):
        a = String()
        b = Integer()
        c = Boolean()

With this simple schema definition, you can create producer, consumer, and reader instances that refer to it.

    producer = client.create_producer(
        topic='my-topic',
        schema=AvroSchema(Example))

    producer.send(Example(a='Hello', b=1))

When the producer is created, the Pulsar broker validates that the existing topic schema is indeed of “Avro” type and that the format is compatible with the schema definition of the Example class.

If there is a mismatch, producer creation fails with an exception.

Once a producer is created with a certain schema definition, it will only accept objects that are instances of the declared schema class.

Similarly, for a consumer or reader, msg.value() returns an instance of the schema record class rather than the raw bytes:

    consumer = client.subscribe(
        topic='my-topic',
        subscription_name='my-subscription',
        schema=AvroSchema(Example))

    while True:
        msg = consumer.receive()
        ex = msg.value()
        try:
            print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed
            consumer.negative_acknowledge(msg)

Supported schema types

You can use different built-in schema types in Pulsar. All the definitions are in the pulsar.schema package.

Schema | Notes
--- | ---
BytesSchema | Get the raw payload as a bytes object. No serialization/deserialization is performed. This is the default schema mode.
StringSchema | Encode/decode the payload as a UTF-8 string. Uses str objects.
JsonSchema | Requires a record definition. Serializes the record into a standard JSON payload.
AvroSchema | Requires a record definition. Serializes in Avro format.
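To make the first three rows of the table concrete, the sketch below shows the payload transformations they describe, using only the standard library. It is an approximation. The function names are hypothetical and not part of pulsar.schema. The JSON helpers only show the general idea of turning a record's fields into a JSON object, so the real JsonSchema's exact bytes (key order, whitespace) may differ. Avro is omitted because its binary encoding needs an Avro library.

```python
import json


def bytes_schema_encode(value):
    # BytesSchema: the bytes payload is passed through unchanged.
    return bytes(value)


def string_schema_encode(text):
    # StringSchema: a str is sent as its UTF-8 encoding.
    return text.encode("utf-8")


def string_schema_decode(payload):
    return payload.decode("utf-8")


def json_like_encode(fields):
    # Rough stand-in for JsonSchema: record fields serialized as a JSON object.
    return json.dumps(fields).encode("utf-8")


def json_like_decode(payload):
    return json.loads(payload.decode("utf-8"))
```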

Schema definition reference

The schema definition is done through a class that inherits from pulsar.schema.Record.

This class has a number of fields, each of which can be either a pulsar.schema.Field type or another nested Record. All the fields are specified in the pulsar.schema package. The fields match the Avro field types.

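Field type | Python type | Notes
--- | --- | ---
Boolean | bool |
Integer | int |
Long | int |
Float | float |
Double | float |
Bytes | bytes |
String | str |
Array | list | Need to specify the record type for items.
Map | dict | Key is always String. Need to specify the value type.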

Additionally, any Python Enum type can be used as a valid field type.
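As a sanity check while you populate records, you can encode the table's Python-type column, plus the Enum rule, as a small checker. This is a hypothetical helper for illustration, not something pulsar.schema provides. It is deliberately strict: an int is not accepted for Float/Double, and Array element and Map value types are not inspected.

```python
from enum import Enum

# Restates the field-type table above; hypothetical helper data, not pulsar.schema.
PYTHON_TYPES = {
    "Boolean": bool,
    "Integer": int,
    "Long": int,
    "Float": float,
    "Double": float,
    "Bytes": bytes,
    "String": str,
    "Array": list,
    "Map": dict,
}


def matches_field_type(field_type, value):
    """Check value against a field-type name from the table, or against an Enum class."""
    if isinstance(field_type, type) and issubclass(field_type, Enum):
        # Any Python Enum type is a valid field type; its values are the members.
        return isinstance(value, field_type)
    expected = PYTHON_TYPES[field_type]
    if expected is int and isinstance(value, bool):
        # bool is a subclass of int in Python; don't let True pass as an Integer.
        return False
    if field_type == "Map":
        # Map keys are always strings.
        return isinstance(value, dict) and all(isinstance(k, str) for k in value)
    return isinstance(value, expected)


class Color(Enum):
    red = 1
    green = 2
    blue = 3
```

For example, `matches_field_type("Map", {1: "v"})` is False because the key is not a string.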

Field parameters

When adding a field, you can use these parameters in the constructor.

Argument | Default | Notes
--- | --- | ---
default | | Set a default value for the field, e.g. a = Integer(default=5).
required | False | Mark the field as “required”. It is set in the schema accordingly.

Schema definition examples

Simple definition

    from pulsar.schema import *

    class Example(Record):
        a = String()
        b = Integer()
        c = Array(String())
        i = Map(String())
Using enums

    from enum import Enum
    from pulsar.schema import *

    class Color(Enum):
        red = 1
        green = 2
        blue = 3

    class Example(Record):
        name = String()
        color = Color
Complex types

    from pulsar.schema import *

    class MySubRecord(Record):
        x = Integer()
        y = Long()
        z = String()

    class Example(Record):
        a = String()
        sub = MySubRecord()

End-to-end encryption

End-to-end encryption allows applications to encrypt messages at producers and decrypt messages at consumers.

Configuration

To use the end-to-end encryption feature in the Python client, you need to configure publicKeyPath and privateKeyPath for both the producer and the consumer.

    publicKeyPath: "./public.pem"
    privateKeyPath: "./private.pem"
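Both settings point at PEM files. Before you pass the paths to `pulsar.CryptoKeyReader`, you can cheaply check that each file holds the expected kind of key. The sketch below does this by reading the PEM header label. It is a heuristic under stated assumptions. `pem_label` and `looks_like_key_pair` are hypothetical helpers. They assume OpenSSL-style PEM text, where a private key may be labelled "RSA PRIVATE KEY" or "PRIVATE KEY" depending on the OpenSSL version. They do not verify that the two keys actually belong together.

```python
def pem_label(pem_text):
    """Return the label of the first PEM block, e.g. 'PUBLIC KEY', or None."""
    for line in pem_text.splitlines():
        line = line.strip()
        if line.startswith("-----BEGIN ") and line.endswith("-----"):
            return line[len("-----BEGIN "):-len("-----")]
    return None


def looks_like_key_pair(public_pem, private_pem):
    """Heuristic: one PEM text holds a public key, the other a private key."""
    # 'RSA PRIVATE KEY' is PKCS#1; 'PRIVATE KEY' is PKCS#8.
    return (pem_label(public_pem) == "PUBLIC KEY"
            and pem_label(private_pem) in ("RSA PRIVATE KEY", "PRIVATE KEY"))
```

Typical use is to read "./public.pem" and "./private.pem" with `open(...).read()` and call `looks_like_key_pair` on the two strings.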

Tutorial

This section provides step-by-step instructions on how to use the end-to-end encryption feature in the Python client.

Prerequisite

  • Pulsar Python client 2.7.1 or later

Steps

  1. Create a public/private key pair.

    Input

        openssl genrsa -out private.pem 2048
        openssl rsa -in private.pem -pubout -out public.pem
  2. Create a producer to send encrypted messages.

    Input

        import pulsar

        publicKeyPath = "./public.pem"
        privateKeyPath = "./private.pem"
        crypto_key_reader = pulsar.CryptoKeyReader(publicKeyPath, privateKeyPath)

        client = pulsar.Client('pulsar://localhost:6650')
        producer = client.create_producer(topic='encryption', encryption_key='encryption', crypto_key_reader=crypto_key_reader)
        producer.send('encryption message'.encode('utf8'))
        print('sent message')

        producer.close()
        client.close()
  3. Create a consumer to receive encrypted messages.

    Input

        import pulsar

        publicKeyPath = "./public.pem"
        privateKeyPath = "./private.pem"
        crypto_key_reader = pulsar.CryptoKeyReader(publicKeyPath, privateKeyPath)

        client = pulsar.Client('pulsar://localhost:6650')
        consumer = client.subscribe(topic='encryption', subscription_name='encryption-sub', crypto_key_reader=crypto_key_reader)
        msg = consumer.receive()
        print("Received msg '{}' id = '{}'".format(msg.data(), msg.message_id()))

        consumer.close()
        client.close()
  4. Run the consumer to receive encrypted messages.

    Input

        python consumer.py
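  5. In a new terminal window, run the producer to produce encrypted messages.

    Input

        python producer.py

    Now you can see that the producer sends the message and the consumer receives it successfully.

    Output

    Producer side:

        sent message

    Consumer side:

        Received msg 'b'encryption message'' id = '(0,0,-1,-1)'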
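The consumer prints the message ID as four comma-separated integers. If you want to inspect these fields in a script, a small parser is enough. This is a sketch, and it rests on an assumption the text above does not state: that the C++ client prints the components in the order ledger ID, entry ID, partition index, batch index. Under that assumption, a partition of -1 would indicate a non-partitioned topic and a batch index of -1 a message that is not part of a batch. `PrintedMessageId` and `parse_printed_message_id` are hypothetical names.

```python
from collections import namedtuple

# Assumed component order of the printed message ID.
PrintedMessageId = namedtuple(
    "PrintedMessageId", ["ledger_id", "entry_id", "partition", "batch_index"])


def parse_printed_message_id(text):
    """Parse text such as '(0,0,-1,-1)' into its four integer components."""
    inner = text.strip().strip("()")
    ledger_id, entry_id, partition, batch_index = (int(p) for p in inner.split(","))
    return PrintedMessageId(ledger_id, entry_id, partition, batch_index)


mid = parse_printed_message_id("(0,0,-1,-1)")
print(mid)  # PrintedMessageId(ledger_id=0, entry_id=0, partition=-1, batch_index=-1)
```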