对于围绕消息总线如pulsar搭建的应用来说,类型安全非常的重要。 生产者和消费者需要某种机制来协调topic层面的类型, 以免出现各种各样的潜在问题 (例如序列化和反序列化问题)。 应用通常采用两种基本方法之一,在消息传递中确保类型安全:

  1. “客户端” 方法, 消息生产者和消费者不仅负责序列化和反序列处理消息 (包括原始字节), 而且还 “知道” 哪些类型是通过哪些topic传输的。 如果生产者往主题 topic-1发送温度感应数据,而这个topic的consumer按照读取湿度感应的方式来转换数据,将会遇到问题。
  2. “服务器端” 方法, 生产者和消费者告知系统哪些数据类型可以通过topic传输。 通过这种方法,消息系统强制执行类型安全, 并确保生产者和消费者保持同步。

这两种方法都被Pulsar支持,你可以在topic的基础上,自由选择采用哪一种,或者混用。

  1. “客户端” 方法中, 生产者和消费者可以发送和接收由原始字节数组组成的消息, 并保留所有的类型安全在应用中,以“带外”基础上强制执行。
  2. For the “server-side” approach, Pulsar has a built-in schema registry that enables clients to upload data schemas on a per-topic basis. 这些schema显示了,topic可以识别哪些数据类型为有效。

Note

Currently, the Pulsar schema registry is only available for the Java client, CGo client, Python client, and C++ client.

基础架构

当你使用Schema创建带类型的producer时,schema将会被自动上传。 此外,可以通过Pulsar的 REST API 手动上载、提取和更新schema。

其他schema注册的后端

开箱即用,Pulsar使用 Apache BookKeeper log存储系统来保存schema。 不过你也可以按照你的需要使用不同的后端。 客制化的schema存储逻辑文档即将推出。

Schema是如何工作的

Pulsar schemas are applied and enforced at the topic level (schemas cannot be applied at the namespace or tenant level). Producers and consumers upload schemas to Pulsar brokers.

Pulsar schema是非常简单的数据结构,它的构成如下:

  • A name. In Pulsar, a schema’s name is the topic to which the schema is applied.
  • A payload, which is a binary representation of the schema
  • A schema type
  • User-defined properties as a string/string map. Properties如何使用,完全由程序指定。 Properties可能是关联到schema的Git hash,例如dev或者prod等环境信息。

Schema版本

我们通过一个例子,来讲解schema版本如何工作。 假设使用以下代码创建Pulsar Java 客户端,尝试连接到Pulsar,并开始发送消息:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650")
  3. .build();
  4. Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
  5. .topic("sensor-data")
  6. .sendTimeout(3, TimeUnit.SECONDS)
  7. .create();

下表列出了尝试连接时可能出现的场景, 以及每种场景下发生了什么:

场景发生了什么
该Topic不存在schema使用给定的schema创建了producer。 Schema传输到broker并被存储 (因为没有现成的schema与 SensorReading schema “兼容”)。 任何使用同样schema/topic的consumer可以消费sensor-data topic中的消息。
Schema已经存在;producer使用已经被保存过的schema进行连接Schema被传输到Pulsar broker。 Broker确认此schema是兼容的。 Broker尝试在BookKeeper存储schema,但是发现它已经存在了,所以用它来标记生产的消息。
Schema已经存在;producer使用兼容的新schema进行连接。The producer transmits the schema to the broker. The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number).

Schema版本是连续的。 Schema被处理关联topic的broker所保存,便于进行版本的分配。 一旦分配/提供了schema的版本,后续由该生产者生产的所有消息,都将被恰当的版本所标记。

支持的schema格式

以下格式被Pulsar schema注册所支持:

  • None. If no schema is specified for a topic, producers and consumers will handle raw bytes.
  • String (用于 UTF-8-encoded 字符串)
  • JSON
  • Protobuf
  • Avro

使用介绍,请参考你选择的客户端类库文档。

未来的Pulsar release将会支持其他更多schema格式。

管理 Schema

你可以使用Pulsar的admin tools来管理topic的schema