Manage schema

本教程介绍了管理 Schema 的方法:

Schema 自动更新

如果某个 schema 通过了 schema 兼容性检查,Pulsar producer 就会自动将此 schema 更新为 topic 默认创建的 schema。

Producer 的自动更新

Producer 会在以下情况中执行AutoUpdate(自动更新):

  • If a topic doesn’t have a schema, Pulsar registers a schema automatically.

  • If a topic has a schema:

    • If a producer doesn’t carry a schema:

      • If isSchemaValidationEnforced or schemaValidationEnforced is disabled in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data.

      • If isSchemaValidationEnforced or schemaValidationEnforced is enabled in the namespace to which the topic belongs, the producer is rejected and disconnected.

    • If a producer carries a schema:

      Broker 根据 topic 所属命名空间中已配置的兼容性检查策略执行兼容性检查。

      • 已注册 schema,producer 将连接到 broker。

      • 未注册 schema:

        • If isAllowAutoUpdateSchema sets to false, the producer is rejected to connect to a broker.

        • If isAllowAutoUpdateSchema sets to true:

          • Schema 通过了兼容性检查,则 broker 主动为该 topic 注册一个新的 schema,并连接到 producer。

          • Schema 未通过兼容性检查,则 broker 不会注册 schema,并且 producer 连接到 broker 的请求被拒。

Producer 自动更新

Consumer 的自动更新

Consumer 会在以下情况中AutoUpdate(自动更新):

  • If a consumer connects to a topic without a schema (which means the consumer receiving raw bytes), the consumer can connect to the topic successfully without doing any compatibility check.

  • If a consumer connects to a topic with a schema.

    • Topic 中不同时包含 schema / 数据 / 本地 consumer 和本地 producer:

      • If isAllowAutoUpdateSchema sets to true, then the consumer registers a schema and it is connected to a broker.

      • If isAllowAutoUpdateSchema sets to false, then the consumer is rejected to connect to a broker.

    • Topic 中包含 schema / 数据 / 本地 consumer 和本地 producer 中的一个,则执行 schema 兼容性检查。

      • Schema 通过兼容性检查,则 consumer 连接到 broker。

      • Schema 未通过兼容性检查,则拒绝 consumer 连接到 broker。

Consumer 自动更新

管理自动更新策略

可以使用 pulsar-admin 命令来管理 自动更新 策略,如下所示:

启用自动更新

可以使用 pulsar-admin命令在命名空间上启用 AutoUpdate

  1. bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace

禁用自动更新

要在名称空间上禁用AutoUpdate,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace

禁用AutoUpdate后,你只能使用pulsar-admin命令注册新的架构。

调整兼容性

要调整名称空间上的架构兼容性级别,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace

Schema 验证

By default, schemaValidationEnforced is disabled for producers:

  • 没有 schema 的 producer 可以使用 schema 向 topic 中生成任何类型的消息,这可能会导致 topic 中有垃圾数据。

  • 这样,不支持 schema 的非 Java 语言客户端就可以使用 schema 向 topic 生成消息。

但是,如果你希望对具有架构的主题有更强的保证,则可以在整个群集中或在每个命名空间的基础上启用schemaValidationEnforced

启用 Schema 验证

要在名称空间上启用schemaValidationEnforced,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace

禁用 Schema 验证

要在名称空间上禁用schemaValidationEnforced,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace

Schema 手动管理

若要管理架构,可以使用以下方法之一。

Method说明

Admin CLI

可以使用pulsar-admin工具来管理 Pulsar schema、broker、集群、source、sink、topic、租户等。

更多如何使用 pulsar-admin 的信息,可查阅 此处

REST API

Pulsar 在 admin RESTful API 中暴露了与 schema 相关的管理 API。 可以直接访问 admin RESTful 端点来管理 schema。

关于如何使用 Pulsar REST API 的更多信息,可参阅 此处

Java Admin API

Pulsar 提供 Java 管理库。

上传 Schema

To upload (register) a new schema for a topic, you can use one of the following methods.

Admin CLI

REST API

Java Admin API

Use the upload subcommand.

  1. $ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>

The schema-definition-file is in JSON format.

  1. { "type": "<schema-type>", "schema": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The schema-definition-file includes the following fields:

字段说明

type

Schema 类型。

schema

Schema 定义数据,编码格式为 UTF 8 字符集。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。

Here are examples of the schema-definition-file for a JSON schema.

示例 1

  1. { "type": "JSON", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}", "properties": {}}

示例 2

  1. { "type": "STRING", "schema": "", "properties": { "key1": "value1" }}

向此端点发送POST请求:POST /admin/v2/schemas/:tenant/:namespace/:topic/schema

The post payload is in JSON format.

  1. { "type": "<schema-type>", "schema": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The post payload includes the following fields:

字段说明

type

Schema 类型。

schema

Schema 定义数据,编码格式为 UTF 8 字符集。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。
  1. void createSchema(String topic, PostSchemaPayload schemaPayload)

The PostSchemaPayload includes the following fields:

字段说明

type

Schema 类型。

schema

Schema 定义数据,编码格式为 UTF 8 字符集。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。

Here is an example of PostSchemaPayload:

  1. PulsarAdmin admin = …;PostSchemaPayload payload = new PostSchemaPayload();payload.setType("INT8");payload.setSchema("");admin.createSchema("my-tenant/my-ns/my-topic", payload);

获取 Schema(最新版本)

To get the latest schema for a topic, you can use one of the following methods.

Admin CLI

REST API

Java Admin API

使用 get 子命令。

  1. $ pulsar-admin schemas get <topic-name>{ "version": 0, "type": "String", "timestamp": 0, "data": "string", "properties": { "property1": "string", "property2": "string" }}

向此端点发送GET请求:GET /admin/v2/schemas/:tenant/:namespace/:topic/schema

Here is an example of a response, which is returned in JSON format.

  1. { "version": "<the-version-number-of-the-schema>", "type": "<the-schema-type>", "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>", "data": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The response includes the following fields:

字段说明

version

Schema 版本,是一个长数。

type

Schema 类型。

timestamp

创建此版本 schema 的时间戳。

data

Schema 定义数据,编码格式为 UTF 8 字符集。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。
  1. SchemaInfo createSchema(String topic)

The SchemaInfo includes the following fields:

字段说明

name

Schema 名称。

type

Schema 类型。

schema

Schema 定义数据的字节数组,以 UTF 8 字符集编码。

如果该 schema 是“primitive” schema,则此字节数组应为空。

如果该 schema 是 struct ** schema,则此字段应为由 Avro schema 定义的 JSON 字符串转换成的字节数组。

  1. properties
    </td>
    <td> schema 相关的其他属性。</td>

Here is an example of SchemaInfo:

  1. PulsarAdmin admin = …;SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic");

获取 schema(详细信息)

To get a specific version of a schema, you can use one of the following methods.

Admin CLI

REST API

Java Admin API

使用 get 子命令。

  1. $ pulsar-admin schemas get <topic-name> --version=<version>

向 Schema 端点发送GET请求: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version

Here is an example of a response, which is returned in JSON format.

  1. { "version": "<the-version-number-of-the-schema>", "type": "<the-schema-type>", "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>", "data": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The response includes the following fields:

字段说明

version

Schema 版本,是一个长数。

type

Schema 类型。

timestamp

创建此版本 schema 的时间戳。

data

Schema 定义数据,编码格式为 UTF 8 字符集。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。
  1. SchemaInfo createSchema(String topic, long version)

The SchemaInfo includes the following fields:

字段说明

name

Schema 名称。

type

Schema 类型。

schema

Schema 定义数据的字节数组以 UTF 8 字符集编码。

如果该 schema 是“primitive” schema,则此字节数组应为空。

如果该 schema 是 struct ** schema,则此字段应为由 Avro schema 定义的 JSON 字符串转换成的字节数组。

  1. properties
    </td>
    <td> schema 相关的其他属性。</td>

Here is an example of SchemaInfo:

  1. PulsarAdmin admin = …;SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L);

解压缩 schema

To provide a schema via a topic, you can use the following method.

Admin CLI

Use the extract subcommand.

  1. $ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>

删除 schema

To delete a schema for a topic, you can use one of the following methods.

Note

In any case, the delete action deletes all versions of a schema registered for a topic.

Admin CLI

REST API

Java Admin API

使用 delete 子命令。

  1. $ pulsar-admin schemas delete <topic-name>

向 Schema 端点发送DELETE请求:DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema

Here is an example of a response, which is returned in JSON format.

  1. { "version": "<the-latest-version-number-of-the-schema>",}

The response includes the following field:

字段说明
versionSchema 版本,是一个长数。
  1. void deleteSchema(String topic)

Here is an example of deleting a schema.

  1. PulsarAdmin admin = …;admin.deleteSchema("my-tenant/my-ns/my-topic");

自定义schema存储

By default, Pulsar stores various data types of schemas in Apache BookKeeper deployed alongside Pulsar.

However, you can use another storage system if needed.

实现

To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:

SchemaStorage 接口

SchemaStorage接口包含以下方法:

  1. public interface SchemaStorage {
  2. // 如何更新schema
  3. CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
  4. // 如何从存储中获取schema
  5. CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
  6. // 如何删除schema
  7. CompletableFuture<SchemaVersion> delete(String key);
  8. // 用户将schema字节数据转换为SchemaVersion 对象的工具方法
  9. SchemaVersion versionFromBytes(byte[] version);
  10. // 启动schema存储客户端
  11. void start() throws Exception;
  12. // 关闭schema存储客户端
  13. void close() throws Exception;
  14. }

提示

For a complete example of schema storage implementation, see BookKeeperSchemaStorage class.

SchemaStorageFactory接口

The SchemaStorageFactory interface has the following method:

  1. public interface SchemaStorageFactory {
  2. @NotNull
  3. SchemaStorage create(PulsarService pulsar) throws Exception;
  4. }

提示

For a complete example of schema storage factory implementation, see BookKeeperSchemaStorageFactory class.

部署

To use your custom schema storage implementation, perform the following steps.

  1. 将实现打包到 JAR 文件中。

  2. 将 JAR 文件添加到 Pulsar 二进制包或源码中的 lib 文件夹。

  3. broker.conf 中的 schemaRegistryStorageClassName 配置更改为自定义工厂类。

  4. 启动 Pulsar。

设置 schema 兼容性检查策略

You can set schema compatibility check strategy at namespace or broker level.

  • If you set schema compatibility check strategy at both namespace or broker level, it uses the strategy set for the namespace level.

  • If you do not set schema compatibility check strategy at both namespace or broker level, it uses the FULL strategy.

  • If you set schema compatibility check strategy at broker level rather than namespace level, it uses the strategy set for the broker level.

  • If you set schema compatibility check strategy at namespace level rather than broker level, it uses the strategy set for the namespace level.

命名空间(Namespace)

You can set schema compatibility check strategy at namespace level using one of the following methods.

pulsar-admin

REST API

Java

Use the pulsar-admin namespaces set-schema-compatibility-strategy command.

  1. pulsar-admin namespaces set-schema-compatibility-strategy options

Send a PUT request to this endpoint: PUT /admin/v2/namespaces/:tenant/:namespace

Use the setSchemaCompatibilityStrategymethod.

  1. admin.namespaces().setSchemaCompatibilityStrategy("test", SchemaCompatibilityStrategy.FULL);

Broker

You can set schema compatibility check strategy at broker level by setting schemaCompatibilityStrategy in broker.conf or standalone.conf file.

示例

  1. schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE