Understand schema

本章解释了 Pulsar schema 的基本概念,重点讨论了一些重要话题,并介绍了相关背景知识。

SchemaInfo

SchemaInfo 的数据结构中定义了 Pulsar schema 。

SchemaInfo 在 topic 上存储和执行,不能存储在命名空间或租户中。

SchemaInfo 由以下字段组成:

字段

说明

name

Schema 名称(字符串)。

type

Schema 类型,决定如何解释 schema 数据。

  • 预定义 schema:详情可参考 here

  • 自定义 schema:空字符串。

schemapayload)

Schema 数据是一个由 8 位无符号字节和特定 schema 类型组成的序列。

properties

用户定义的 string/stirng map 属性。

应用可以使用这个包裹传递程序指定的逻辑。

属性可能是与 schema 相关联的 Git 哈希(环境字符串,如 devprod )。

示例

如下是一个字符串的 SchemaInfo

  1. {
  2. "name": "test-string-schema",
  3. "type": "STRING",
  4. "schema": "",
  5. "properties": {}
  6. }

Schema 类型

Pulsar 支持多种 schema 类型,主要可分为两类:

  • 基础类型

  • 复杂类型

基础类型

目前,Pulsar 支持以下基础类型:

基础类型说明
BOOLEAN二进制值
INT88 位带符号整数
INT1616 位带符号整数
INT3232 位带符号整数
INT6464 位带符号整数
FLOAT单精度(32位)IEEE 754 浮点数
DOUBLE双精度(64位)IEEE 754 浮点数
BYTES8 位无符号字节序列
STRINGUnicode 字符序列
TIMESTAMP (DATE, TIME)表示特定时间点的逻辑类型(精度为毫秒)。 以 INT64
存储自 1970年1月1日,00:00:00 GMT 起的毫秒数。
INSTANT时间线上的单个瞬时点,精度为纳秒
LOCAL_DATE表示日期的不可变对象,通常为“年-月-日”的格式
LOCAL_TIME表示时间的不可变对象,通常为“时-分-秒”的格式。 时间可精确到纳秒级精度。
LOCAL_DATE_TIME用来表示日期及时间的不可变对象,通常为“年-月-日-时-分-秒”格式

对于基础类型,Pulsar 不在 SchemaInfo 中存储任何 schema 数据。 SchemaInfo 中的 type 用于决定如何序列化和反序列化数据。

在一些基础 schema 的实现中,可以使用 properties 来存储单个实现的调节设置。 例如,string schema 可以将编码集存储在 properties 中,用于序列化和反序列化字符串。

The conversions between Pulsar schema types and language-specific primitive types are as below.

Schema 类型Java 类型Python 类型Go 类型
BOOLEANbooleanboolbool
INT8byteint8
INT16shortint16
INT32intint32
INT64longint64
FLOATfloatfloatfloat32
DOUBLEdoublefloatfloat64
BYTESbyte[], ByteBuffer, ByteBufbytes[]byte
STRINGstringstrstring
TIMESTAMPjava.sql.Timestamp
TIMEjava.sql.Time
DATEjava.util.Date
INSTANTjava.time.Instant
LOCAL_DATEjava.time.LocalDate
LOCAL_TIMEjava.time.LocalDateTime
LOCAL_DATE_TIMEjava.time.LocalTime

示例

此示例展示了如何使用字符串 schema。

  1. 创建一个带有字符串 schema 的 producer 并发送消息。

    1. Producer<String> producer = client.newProducer(Schema.STRING).create();
    2. producer.newMessage().value("Hello Pulsar!").send();
  2. 创建一个带有字符串 schema 的 consumer 并接收消息。

    1. Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
    2. consumer.receive();

复杂类型

目前,Pulsar 支持以下复杂类型:

复杂类型说明
keyvalue复杂类型的键/值对。
struct结构体对象。 支持AvroBaseStructSchema以及ProtobufNativeSchema

keyvalue

Keyvalue schema 帮助应用程序定义 schema 的键和值。

对于键值 schema 的 schemaInfo,Pulsar 同时存储键 schema 的 SchemaInfo 和值 schema 的 SchemaInfo

Pulsar提供如下两种方式将键/值对编码到消息中:

  • 嵌入

  • 分离

你可以在构建键/值 schema 时选择编码类型。

INLINE

SEPARATED

将键/值对一起编码到消息体中。

键单独编码到消息键中而值编码到消息体中。

示例

本示例展示了如何创建一个键/值 schema,并使用它来 produce 和 consume 消息。

  1. 创建具有嵌入编码类型的键/值 schema。

    1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.INLINE);
  2. 可选,创建一个包含分离编码类型的键/值 schema。

    1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.SEPARATED);
  3. 使用键/值 schema produce 消息。

    1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.SEPARATED);Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema) .topic(TOPIC) .create();final int key = 100;final String value = "value-100";// send the key/value messageproducer.newMessage().value(new KeyValue<>(key, value)).send();
  4. 使用键/值 schema consume 消息。

    1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.SEPARATED);Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema) ... .topic(TOPIC) .subscriptionName(SubscriptionName).subscribe();// receive key/value pairMessage<KeyValue<Integer, String>> msg = consumer.receive();KeyValue<Integer, String> kv = msg.getValue();

struct

这一节将详细展示structschema类型及用法

类型

structschema目前支持AvroBaseStructSchemaProtobufNativeSchema

类型说明
AvroBaseStructSchemaPulsar使用Avro 规范来声明AvroBaseStructSchema的schema定义,并且支持AvroSchemaJsonSchema,以及 ProtobufSchema

允许Pulsar:
-使用同一个工具来管理schema的定义
-使用不同的序列化或反序列化方法来处理数据
ProtobufNativeSchemaProtobufNativeSchema是基于protobuf原生解释器的。

允许Pulsar:
-使用Protobuf3序列化或反序列化数据
-使用AutoConsume来反序列化数据。
用法

Pulsar提供如下方式来使用structschema:

  • 静态

  • 通用

  • schema定义

static

generic

SchemaDefinition

你可以使用Java的POJO类、Go语言的struct以及Avro或Protobuf等工具生成的类来预定义结构schema.

示例

Pulsar 使用 Avro 库从预定义结构中获取 schema 定义。 Schema 定义是 schema 数据,存储在 SchemaInfo 中。

  1. Create the User class to define the messages sent to Pulsar topics.

    1. @Builder@AllArgsConstructor@NoArgsConstructorpublic static class User { String name; int age;}
  2. 结构 schema 创建 producer 并发送消息。

    1. Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
  3. 结构 schema 创建 consumer 并接收消息。

    1. Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();User user = consumer.receive();

有时应用程序中不包含预定义 schema,则可以使用此方法来定义 schema 并访问数据。

可以使用 GenericSchemaBuilder 来定义结构 schema,使用 GenericRecordBuilder 来生成通用结构,并 consume 消息到 GenericRecord

示例

  1. 使用 RecordSchemaBuilder 创建 schema。

    1. RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");recordSchemaBuilder.field("intField").type(SchemaType.INT32);SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo)).create();
  2. 使用 RecordBuilder 创建结构记录。

    1. producer.newMessage().value(schema.newRecordBuilder() .set("intField", 32) .build()).send();

你可以通过定义一个Schema定义来生成structschema。

示例

  1. Create the User class to define the messages sent to Pulsar topics.

    1. @Builder@AllArgsConstructor@NoArgsConstructorpublic static class User { String name; int age;}
  2. 使用Schema定义创建一个producer并发送消息。

    1. SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();Producer<User> producer = client.newProducer(Schema.AVRO(schemaDefinition)).create();producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
  3. 使用Schema定义创建一个consumer并接收消息。

    1. SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();Consumer<User> consumer = client.newConsumer(Schema.AVRO(schemaDefinition)).subscribe();User user = consumer.receive().getValue();

自动 Schema

如果事先不知道 Pulsar topic 的 schema 类型,则可以使用 AUTO schema 向 broker 中 produce 或从 broker 中 consume 通用记录。

Auto Schema 类型说明
AUTO_PRODUCEThis is useful for transferring data from a producer to a Pulsar topic that has a schema.
AUTO_CONSUMEThis is useful for transferring data from a Pulsar topic that has a schema to a consumer.

AUTO_PRODUCE

AUTO_PRODUCE schema 帮助 producer 验证其发送的字节是否与 topic 的 schema 兼容。

示例

假设:

  • You have a producer processing messages from a Kafka topic K.

  • You have a Pulsar topic P, and you do not know its schema type.

  • Your application reads the messages from K and writes the messages to P.

In this case, you can use AUTO_PRODUCE to verify whether the bytes produced by K can be sent to P or not.

  1. Produce<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE())
  2. .create();
  3. byte[] kafkaMessageBytes = ;
  4. pulsarProducer.produce(kafkaMessageBytes);

AUTO_CONSUME

AUTO_CONSUME schema 帮助 Pulsar topic 验证其发送的字节是否与 consumer 兼容,即 Pulsar topic 使用从 broker 端检索到的 SchemaInfo 将消息反序列化为特定语言的对象。

目前AUTO_CONSUME 仅支持AVRO、JSON 以及ProtobufNativeSchema这几种schema格式 将消息反序列化为通用记录

示例

假设:

  • You have a Pulsar topic P.

  • You have a consumer (for example, MySQL) receiving messages from the topic P.

  • You application reads the messages from P and writes the messages to MySQL.

In this case, you can use AUTO_CONSUME to verify whether the bytes produced by P can be sent to MySQL or not.

  1. Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
  2. .subscribe();
  3. Message<GenericRecord> msg = consumer.receive() ;
  4. GenericRecord record = msg.getValue();

Schema 版本

每个用 topic 存储的 SchemaInfo 都有一个版本。 Schema 版本管理单个 topic 中 schema 的更改。

将给定 SchemaInfo 产生的消息标记为 schema 版本,这样当消息被 Pulsar 客户端 consume 时,Pulsar 客户端可以使用 schema 版本检索相应的 SchemaInfo,并使用 SchemaInfo 反序列化数据。

Schema版本是连续的。 Schema 存储在负责处理相关 topic 的 broker 中,以进行版本分配。

一旦分配/提供了schema的版本,后续由该生产者生产的所有消息,都将被恰当的版本所标记。

示例

以下示例展示了 schema 版本如何工作。

假设通过下面的代码创建 Pulsar Java 客户端,尝试连接到 Pulsar 并开始发送消息:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650")
  3. .build();
  4. Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
  5. .topic("sensor-data")
  6. .sendTimeout(3, TimeUnit.SECONDS)
  7. .create();

下表列出了连接尝试时可能出现的情况以及发生的原因:

场景发生了什么
  • 此 topic 中无 schema。

(1)使用特定 schema 创建 producer。

(2)由于现有 schema 与“SensorReading”schema 不兼容,将当前 schema 传输到并存储在 broker 中。

(3)任何使用相同 schema 或 topic 创建的 consumer 都可以 consume 来自 sensor-data topic 的消息。

  • schema 已存在。

  • Producer 使用已存在的 schema 连接。

(1)将 schema 传输到 Pulsar broker 中。

(2)Broker 确定此 schema 兼容。

(3)Broker 尝试将此 schema 存储在 BookKeeper 中,但又认为其已存储成功,所以此 schema 被用于标记生成的消息。

  • schema 已存在。

  • Producer 使用兼容的新 schema 连接。

(1)将 schema 传输到 Pulsar broker 中。

(2)Broker 确定此 schema 兼容,并将新 schema 储存为当前版本(带有新版本号)。

Schema 工作方式

Pulsar schemas are applied and enforced at the topic level (schemas cannot be applied at the namespace or tenant level).

Producer 和 consumer 将 schema 上传到 broker,因此 Pulsar schema 同时工作在 producer 端和 consumer 端。

Producer 端

此图表展示了在 producer 中 schema 如何工作。

Schema 在 producer 端的工作

  1. 应用程序使用 schema 实例构造 producer 实例。

    Schema 实例定义了使用 producer 实例生成的数据的 schema 模式。

    以 AVRO 为例,Pulsar 从 POJO 类中提取 schema 定义,并构建 producer 连接时需要传递给 broker 的 SchemaInfo

  2. Producer 通过从已通过 schema 实例中提取的 SchemaInfo 连接到 broker。

  3. Broker 在 schema 存储中查找 schema 以确认其是否已注册。

  4. 如已注册,broker 会跳过 schema 验证(因为已注册 schema 是已知 schema),并将 schema 版本返回给 producer。

  5. 如未注册,则 broker 需验证是否可以在此命名空间中自动创建 schema:

  • If isAllowAutoUpdateSchema sets to true, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic.

  • If isAllowAutoUpdateSchema sets to false, then a schema can not be created, and the producer is rejected to connect to the broker.

Tip:

isAllowAutoUpdateSchema can be set via Pulsar admin API or REST API.

如何通过 Pulsar admin API 设置 isAllowAutoUpdateSchema,请参阅自动更新策略管理

  1. 如果方案被允许更新,则进行兼容策略检查。
  • If the schema is compatible, the broker stores it and returns the schema version to the producer.

    All the messages produced by this producer are tagged with the schema version.

  • If the schema is incompatible, the broker rejects it.

Consumer 端

This diagram illustrates how does Schema work on the consumer side.

Schema works at the consumer side

  1. The application uses a schema instance to construct a consumer instance.

    The schema instance defines the schema that the consumer uses for decoding messages received from a broker.

  2. The consumer connects to the broker with the SchemaInfo extracted from the passed-in schema instance.

  3. Broker 决定该 topic 是否中是否包含 schema / 数据 / 本地 consumer 和本地 producer 三者中的一个。

  4. Topic 中不同时包含 schema / 数据 / 本地 consumer 和本地 producer:

    • If isAllowAutoUpdateSchema sets to true, then the consumer registers a schema and it is connected to a broker.

    • If isAllowAutoUpdateSchema sets to false, then the consumer is rejected to connect to a broker.

  5. Topic 中包含 schema / 数据 / 本地 consumer 和本地 producer 中的一个,则执行 schema 兼容性检查。

    • Schema 通过兼容性检查,则 consumer 连接到 broker。

    • Schema 未通过兼容性检查,则拒绝 consumer 连接到 broker。

  6. Consumer 收到 broker 的信息。

    如果 consumer 所使用的 schema 支持 schema 版本化(例如,AVRO schema),则 consumer 获取在消息中加标签 schema 的信息 SchemaInfo,并使用已通过的 schema 和在消息中加标签的 schema 来解码消息。