Understand schema

本章解释了 Pulsar schema 的基本概念,重点讨论了一些重要话题,并介绍了相关背景知识。

SchemaInfo

SchemaInfo 的数据结构中定义 Pulsar schema 。

SchemaInfo 在 topic 上存储和执行,不能存储在命名空间或租户中。

SchemaInfo 由以下字段组成:

字段

说明

name

Schema 名称(字符串)。

type

Schema 类型,决定如何解释 schema 数据。

  • 预定义 schema:详情参考 here.

  • 自定义模式:空字符串。

schemapayload)

Schema 数据是一个由 8 位无符号字节和特定 schema 类型组成的序列。

properties

用户定义的 string/stirng map 属性。

应用可以使用这个包裹传递程序指定的逻辑。

属性可能是与 schema 相关联的 Git 哈希(环境字符串,如 devprod )。

示例

字符串的 SchemaInfo

  1. {
  2. "name": "test-string-schema",
  3. "type": "STRING",
  4. "schema": "",
  5. "properties": {}
  6. }

Schema 类型

Pulsar 支持多种 schema 类型,主要可分为两类:

  • 原始类型

  • 复杂类型

原始类型

目前,Pulsar 支持以下原始类型:

原始类型说明
BOOLEAN二进制值
INT88 位带符号整数
INT1616 位带符号整数
INT3232 位带符号整数
INT6464 位带符号整数
FLOAT单精度(32位)IEEE 754 浮点数
DOUBLE双精度(64位)IEEE 754 浮点数
BYTES8 位无符号字节序列
STRINGUnicode 字符序列
TIMESTAMP (DATE, TIME)表示特定时间点的逻辑类型(精度为毫秒)。 以 INT64
存储自 1970年1月1日,00:00:00 GMT 起的毫秒数。
INSTANTA single instantaneous point on the time-line with nanoseconds precision
LOCAL_DATEAn immutable date-time object that represents a date, often viewed as year-month-day
LOCAL_TIMEAn immutable date-time object that represents a time, often viewed as hour-minute-second. Time is represented to nanosecond precision.
LOCAL_DATE_TIMEAn immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second

对于原始类型,Pulsar 不在 SchemaInfo 中存储任何 schema 数据。 SchemaInfo 中的类型用于决定如何序列化和反序列化数据。

在一些原始 schema 的实现中,可以使用属性来存储单个实现的调节设置。 例如,字符串 schema 可以将编码集存储在属性中,用于序列化和反序列化字符串。

The conversions between Pulsar schema types and language-specific primitive types are as below.

Schema 类型Java 类型Python 类型Go 类型
BOOLEANbooleanboolbool
INT8byteint8
INT16shortint16
INT32intint32
INT64longint64
FLOATfloatfloatfloat32
DOUBLEdoublefloatfloat64
BYTESbyte[], ByteBuffer, ByteBufbytes[]字节
STRINGstringstrstring
TIMESTAMPjava.sql.Timestamp
TIMEjava.sql.Time
DATEjava.util.Date
INSTANTjava.time.Instant
LOCAL_DATEjava.time.LocalDate
LOCAL_TIMEjava.time.LocalDateTime
LOCAL_DATE_TIMEjava.time.LocalTime

示例

此示例展示了如何使用字符串 schema。

  1. 创建一个带有字符串 schema 的 producer 并发送消息。

    1. Producer<String> producer = client.newProducer(Schema.STRING).create();
    2. producer.newMessage().value("Hello Pulsar!").send();
  2. 创建一个带有字符串 schema 的 consumer 并接收消息。

    1. Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
    2. consumer.receive();

复杂类型

目前,Pulsar 支持以下复杂类型:

复杂类型说明
keyvalue复杂类型的键/值对。
structHandles structured data. It supports AvroBaseStructSchema and ProtobufNativeSchema.

keyvalue

键值 schema 定义应用程序中 schema 的键和值。

对于键值 schema 的 schemaInfo,Pulsar 同时存储键 schema 的 SchemaInfo 和值 schema 的 SchemaInfo

Pulsar provides the following methods to encode a key/value pair in messages:

  • 嵌入

  • 分离

You can choose the encoding type when constructing the key/value schema.

INLINE

SEPARATED

Key/value pairs are encoded together in the message payload.

Key is encoded in the message key and the value is encoded in the message payload.

示例

本示例展示了如何创建一个键/值 schema,并使用它来 produce 和 consume 消息。

  1. 创建具有嵌入编码类型的键/值 schema。

    1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.INLINE);
  2. 可选,创建一个包含分离编码类型的键/值 schema。

    1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.SEPARATED);
  3. 使用键/值 schema produce 消息。

    1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.SEPARATED);Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema) .topic(TOPIC) .create();final int key = 100;final String value = "value-100";// send the key/value messageproducer.newMessage().value(new KeyValue<>(key, value)).send();
  4. 使用键/值 schema consume 消息。

    1. Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.SEPARATED);Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema) ... .topic(TOPIC) .subscriptionName(SubscriptionName).subscribe();// receive key/value pairMessage<KeyValue<Integer, String>> msg = consumer.receive();KeyValue<Integer, String> kv = msg.getValue();

struct

This section describes the details of type and usage of the struct schema.

类型

struct schema supports AvroBaseStructSchema and ProtobufNativeSchema.

类型说明
AvroBaseStructSchemaPulsar uses Avro Specification to declare the schema definition for AvroBaseStructSchema, which supports AvroSchema, JsonSchema, and ProtobufSchema.

This allows Pulsar:
- to use the same tools to manage schema definitions
- to use different serialization or deserialization methods to handle data
ProtobufNativeSchemaProtobufNativeSchema is based on protobuf native Descriptor.

This allows Pulsar:
- to use native protobuf-v3 to serialize or deserialize data
- to use AutoConsume to deserialize data.
用法

Pulsar provides the following methods to use the struct schema:

  • 静态

  • 通用

  • SchemaDefinition

static

generic

SchemaDefinition

You can predefine the struct schema, which can be a POJO in Java, a struct in Go, or classes generated by Avro or Protobuf tools.

示例

Pulsar 使用 Avro 库从预定义结构中获取 schema 定义。 Schema 定义是 schema 数据,存储在 SchemaInfo 中。

  1. Create the User class to define the messages sent to Pulsar topics.

    1. public class User { String name; int age;}
  2. 结构 schema 创建 producer 并发送消息。

    1. Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send();
  3. 结构 schema 创建 consumer 并接收消息。

    1. Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();User user = consumer.receive();

有时应用程序中不包含预定义 schema,则可以使用此方法来定义 schema 并访问数据。

可以使用 GenericSchemaBuilder 来定义结构 schema,使用 GenericRecordBuilder 来生成通用结构,并 consume 消息到 GenericRecord

示例

  1. 使用 RecordSchemaBuilder 创建 schema。

    1. RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");recordSchemaBuilder.field("intField").type(SchemaType.INT32);SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo)).create();
  2. 使用 RecordBuilder 创建结构记录。

    1. producer.newMessage().value(schema.newRecordBuilder() .set("intField", 32) .build()).send();

You can define the schemaDefinition to generate a struct schema.

示例

  1. Create the User class to define the messages sent to Pulsar topics.

    1. public class User { String name; int age;}
  2. Create a producer with a SchemaDefinition and send messages.

    1. SchemaDefinition<User> schemaDefinition = SchemaDefinition.builder().withPojo(User.class).build();Producer<User> producer = client.newProducer(schemaDefinition).create();producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send();
  3. Create a consumer with a SchemaDefinition schema and receive messages

    1. SchemaDefinition<User> schemaDefinition = SchemaDefinition.builder().withPojo(User.class).build();Consumer<User> consumer = client.newConsumer(schemaDefinition).subscribe();User user = consumer.receive();

自动 Schema

如果事先不知道 Pulsar topic 的 schema 类型,则可以使用 AUTO schema 向 broker 中 produce 或从 broker 中 consume 通用记录。

Auto Schema 类型说明
AUTO_PRODUCEThis is useful for transferring data from a producer to a Pulsar topic that has a schema.
AUTO_CONSUMEThis is useful for transferring data from a Pulsar topic that has a schema to a consumer.

AUTO_PRODUCE

AUTO_PRODUCE schema 帮助 producer 验证其发送的字节是否与 topic 的 schema 兼容。

示例

假设:

  • You have a producer processing messages from a Kafka topic K.

  • You have a Pulsar topic P, and you do not know its schema type.

  • Your application reads the messages from K and writes the messages to P.

In this case, you can use AUTO_PRODUCE to verify whether the bytes produced by K can be sent to P or not.

  1. Produce<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE())
  2. .create();
  3. byte[] kafkaMessageBytes = ;
  4. pulsarProducer.produce(kafkaMessageBytes);

AUTO_CONSUME

AUTO_CONSUME schema 帮助 Pulsar topic 验证其发送的字节是否与 consumer 兼容,即 Pulsar topic 使用从 broker 端检索到的 SchemaInfo 将消息反序列化为特定语言的对象。

Currently, AUTO_CONSUME supports AVRO, JSON and ProtobufNativeSchema schemas. 将消息反序列化为通用记录

示例

假设:

  • You have a Pulsar topic P.

  • You have a consumer (for example, MySQL) receiving messages from the topic P.

  • You application reads the messages from P and writes the messages to MySQL.

In this case, you can use AUTO_CONSUME to verify whether the bytes produced by P can be sent to MySQL or not.

  1. Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
  2. .subscribe();
  3. Message<GenericRecord> msg = consumer.receive() ;
  4. GenericRecord record = msg.getValue();

Schema 版本

每个用 topic 存储的 SchemaInfo 都有一个版本。 Schema 版本管理单个 topic 中 schema 的更改。

将给定 SchemaInfo 产生的消息标记为 schema 版本,这样当消息被 Pulsar 客户端 consume 时,Pulsar 客户端可以使用 schema 版本检索相应的 SchemaInfo,并使用 SchemaInfo 反序列化数据。

Schema版本是连续的。 Schema 存储在负责处理相关 topic 的 broker 中,以进行版本分配。

一旦分配/提供了schema的版本,后续由该生产者生产的所有消息,都将被恰当的版本所标记。

示例

以下示例展示了 schema 版本如何工作。

假设通过下面的代码创建 Pulsar Java 客户端,尝试连接到 Pulsar 并开始发送消息:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650")
  3. .build();
  4. Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
  5. .topic("sensor-data")
  6. .sendTimeout(3, TimeUnit.SECONDS)
  7. .create();

下表列出了连接尝试时可能出现的情况以及发生的原因:

场景发生了什么
  • 此 topic 中无 schema。

(1)使用特定 schema 创建 producer。

(2)由于现有 schema 与“SensorReading”schema 不兼容,将当前 schema 传输到并存储在 broker 中。

(3)任何使用相同 schema 或 topic 创建的 consumer 都可以 consume 来自 sensor-data topic 的消息。

  • schema 已存在。

  • Producer 使用已存在的 schema 连接。

(1)将 schema 传输到 Pulsar broker 中。

(2)Broker 确定此 schema 兼容。

(3)Broker 尝试将此 schema 存储在 BookKeeper 中,但又认为其已存储成功,所以此 schema 被用于标记生成的消息。

  • schema 已存在。

  • Producer 使用兼容的新 schema 连接。

(1)将 schema 传输到 Pulsar broker 中。

(2)Broker 确定此 schema 兼容,并将新 schema 储存为当前版本(带有新版本号)。

Schema 工作方式

Pulsar schemas are applied and enforced at the topic level (schemas cannot be applied at the namespace or tenant level).

Producer 和 consumer 将 schema 上传到 broker,因此 Pulsar schema 同时工作在 producer 端和 consumer 端。

Producer 端

此图表展示了在 producer 中 schema 如何工作。

Schema 在 producer 端的工作

  1. 应用程序使用 schema 实例构造 producer 实例。

    Schema 实例定义了使用 producer 实例生成的数据的 schema 模式。

    以 AVRO 为例,Pulsar 从 POJO 类中提取 schema 定义,并构建 producer 连接时需要传递给 broker 的 SchemaInfo

  2. Producer 通过从已通过 schema 实例中提取的 SchemaInfo 连接到 broker。

  3. Broker 在 schema 存储中查找 schema 以确认其是否已注册。

  4. 如已注册,broker 会跳过 schema 验证(因为已注册 schema 是已知 schema),并将 schema 版本返回给 producer。

  5. 如未注册,则 broker 需验证是否可以在此命名空间中自动创建 schema:

  • If isAllowAutoUpdateSchema sets to true, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic.

  • If isAllowAutoUpdateSchema sets to false, then a schema can not be created, and the producer is rejected to connect to the broker.

Tip:

isAllowAutoUpdateSchema can be set via Pulsar admin API or REST API.

如何通过 Pulsar admin API 设置 isAllowAutoUpdateSchema,请参阅自动更新策略管理

  1. 如果方案被允许更新,则进行兼容策略检查。
  • If the schema is compatible, the broker stores it and returns the schema version to the producer.

    All the messages produced by this producer are tagged with the schema version.

  • If the schema is incompatible, the broker rejects it.

Consumer 端

This diagram illustrates how does Schema work on the consumer side.

Schema works at the consumer side

  1. The application uses a schema instance to construct a consumer instance.

    The schema instance defines the schema that the consumer uses for decoding messages received from a broker.

  2. The consumer connects to the broker with the SchemaInfo extracted from the passed-in schema instance.

  3. Broker 决定该 topic 是否中是否包含 schema / 数据 / 本地 consumer 和本地 producer 三者中的一个。

  4. Topic 中不同时包含 schema / 数据 / 本地 consumer 和本地 producer:

    • If isAllowAutoUpdateSchema sets to true, then the consumer registers a schema and it is connected to a broker.

    • If isAllowAutoUpdateSchema sets to false, then the consumer is rejected to connect to a broker.

  5. Topic 中包含 schema / 数据 / 本地 consumer 和本地 producer 中的一个,则执行 schema 兼容性检查。

    • Schema 通过兼容性检查,则 consumer 连接到 broker。

    • Schema 未通过兼容性检查,则拒绝 consumer 连接到 broker。

  6. Consumer 收到 broker 的信息。

    如果 consumer 所使用的 schema 支持 schema 版本化(例如,AVRO schema),则 consumer 获取在消息中加标签 schema 的信息 SchemaInfo,并使用已通过的 schema 和在消息中加标签的 schema 来解码消息。