Manage schema

本教程介绍了管理 Schema 的方法:

Schema 自动更新

如果某个 schema 通过了 schema 兼容性检查,Pulsar producer 就会自动将此 schema 更新为 topic 默认创建的 schema。

Producer 的自动更新

Producer 会在以下情况中自动更新

  • If a topic doesn’t have a schema, Pulsar registers a schema automatically.

  • If a topic has a schema:

    • If a producer doesn’t carry a schema:

      • If isSchemaValidationEnforced or schemaValidationEnforced is disabled in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data.

      • If isSchemaValidationEnforced or schemaValidationEnforced is enabled in the namespace to which the topic belongs, the producer is rejected and disconnected.

    • If a producer carries a schema:

      Broker 根据 topic 所属命名空间中已配置的兼容性检查策略执行兼容性检查。

      • 已注册 schema,producer 将连接到 broker。

      • 未注册 schema:

        • If isAllowAutoUpdateSchema sets to false, the producer is rejected to connect to a broker.

        • If isAllowAutoUpdateSchema sets to true:

          • Schema 通过了兼容性检查,则 broker 主动为该 topic 注册一个新的 schema,并连接到 producer。

          • Schema 未通过兼容性检查,则 broker 不会注册 schema,并且 producer 连接到 broker 的请求被拒。

Producer 自动更新

Consumer 的自动更新

Consumer 会在以下情况中自动更新

  • If a consumer connects to a topic without a schema (which means the consumer receiving raw bytes), the consumer can connect to the topic successfully without doing any compatibility check.

  • If a consumer connects to a topic with a schema.

    • Topic 中不同时包含 schema / 数据 / 本地 consumer 和本地 producer:

      • If isAllowAutoUpdateSchema sets to true, then the consumer registers a schema and it is connected to a broker.

      • If isAllowAutoUpdateSchema sets to false, then the consumer is rejected to connect to a broker.

    • Topic 中包含 schema / 数据 / 本地 consumer 和本地 producer 中的一个,则执行 schema 兼容性检查。

      • Schema 通过兼容性检查,则 consumer 连接到 broker。

      • Schema 未通过兼容性检查,则拒绝 consumer 连接到 broker。

Consumer 自动更新

管理自动更新策略

可以使用 pulsar-admin 命令来管理 自动更新 策略,如下所示:

启用自动更新

可以使用 pulsar-admin命令在命名空间上启用 自动更新

  1. bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace

禁用自动更新

要在名称空间上禁用AutoUpdate,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace

禁用AutoUpdate后,你只能使用pulsar-admin命令注册新的架构。

调整兼容性

要调整名称空间上的架构兼容性级别,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace

Schema 验证

By default, schemaValidationEnforced is disabled for producers:

  • 没有 schema 的 producer 可以使用 schema 向 topic 中生成任何类型的消息,这可能会导致 topic 中有垃圾数据。

  • 这样,不支持 schema 的非 Java 语言客户端就可以使用 schema 向 topic 生成消息。

但是,如果你希望对具有架构的主题有更强的保证,则可以在整个群集中或在每个命名空间的基础上启用schemaValidationEnforced

启用 Schema 验证

要在名称空间上启用schemaValidationEnforced,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace

禁用 Schema 验证

要在名称空间上禁用schemaValidationEnforced,可以使用pulsar-admin命令。

  1. bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace

Schema 手动管理

若要管理架构,可以使用以下方法之一。

Method说明

Admin CLI

可以使用pulsar-admin工具来管理 Pulsar schema、broker、集群、source、sink、topic、租户等。

关于如何使用 pulsar-admin 的更多信息,查阅 here

REST API

Pulsar 在 admin RESTful API 中显示与 schema 相关的管理 API。 可以直接访问 admin RESTfull 端来管理 schema。

关于如何使用 Pulsar REST API 的更多信息,参阅 here

Java Admin API

Pulsar 提供 Java 管理库。

更新 Schema

To upload (register) a new schema for a topic, you can use one of the following methods.

Admin CLI

REST API

Java Admin API

Use the upload subcommand.

  1. $ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>

The schema-definition-file is in JSON format.

  1. { "type": "<schema-type>", "schema": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The schema-definition-file includes the following fields:

字段说明

type

Schema 类型。

schema

Schema 定义数据以 UTF 8 字符集编码。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。

Here are examples of the schema-definition-file for a JSON schema.

示例 1

  1. { "type": "JSON", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}", "properties": {}}

示例 2

  1. { "type": "STRING", "schema": "", "properties": { "key1": "value1" }}

Send a POST request to this endpoint: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema

The post payload is in JSON format.

  1. { "type": "<schema-type>", "schema": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The post payload includes the following fields:

字段说明

type

Schema 类型。

schema

Schema 定义数据以 UTF 8 字符集编码。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。
  1. void createSchema(String topic, PostSchemaPayload schemaPayload)

The PostSchemaPayload includes the following fields:

字段说明

type

Schema 类型。

schema

Schema 定义数据以 UTF 8 字符集编码。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。

Here is an example of PostSchemaPayload:

  1. PulsarAdmin admin = …;PostSchemaPayload payload = new PostSchemaPayload();payload.setType("INT8");payload.setSchema("");admin.createSchema("my-tenant/my-ns/my-topic", payload);

获取 Schema(最新版本)

To get the latest schema for a topic, you can use one of the following methods.

Admin CLI

REST API

Java Admin API

使用 获得 子命令。

  1. $ pulsar-admin schemas get <topic-name>{ "version": 0, "type": "String", "timestamp": 0, "data": "string", "properties": { "property1": "string", "property2": "string" }}

Send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema

Here is an example of a response, which is returned in JSON format.

  1. { "version": "<the-version-number-of-the-schema>", "type": "<the-schema-type>", "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>", "data": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The response includes the following fields:

字段说明

version

Schema 版本,是一个长数。

type

Schema 类型。

timestamp

创建此版本 schema 的时间戳。

data

Schema 定义数据以 UTF 8 字符集编码。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。
  1. SchemaInfo createSchema(String topic)

The SchemaInfo includes the following fields:

字段说明

name

Schema 名称。

type

Schema 类型。

schema

Schema 定义数据的字节数组以 UTF 8 字符集编码。

如果该 schema 是“primitive” schema,则此字节数组应为空。

如果该 schema 是 struct ** schema,则此字段应为由 Avro schema 定义的 JSON 字符串转换成的字节数组。

  1. properties
    </td>
    <td> schema 相关的其他属性。</td>

Here is an example of SchemaInfo:

  1. PulsarAdmin admin = …;SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic");

获取 schema(详细信息)

To get a specific version of a schema, you can use one of the following methods.

Admin CLI

REST API

Java Admin API

使用 获得 子命令。

  1. $ pulsar-admin schemas get <topic-name> --version=<version>

Send a GET request to a schema endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version

Here is an example of a response, which is returned in JSON format.

  1. { "version": "<the-version-number-of-the-schema>", "type": "<the-schema-type>", "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>", "data": "<an-utf8-encoded-string-of-schema-definition-data>", "properties": {} // the properties associated with the schema}

The response includes the following fields:

字段说明

version

Schema 版本,是一个长数。

type

Schema 类型。

timestamp

创建此版本 schema 的时间戳。

data

Schema 定义数据以 UTF 8 字符集编码。

  • 如果 schema 是 privitive schema,则此字段应为空。

  • 如果 schema 为 struct schema,则此字段应为 Avro schema 定义的 JSON 字符串。

properties

与 schema 相关的其他属性。
  1. SchemaInfo createSchema(String topic, long version)

The SchemaInfo includes the following fields:

字段说明

name

Schema 名称。

type

Schema 类型。

schema

Schema 定义数据的字节数组以 UTF 8 字符集编码。

如果该 schema 是“primitive” schema,则此字节数组应为空。

如果该 schema 是 struct ** schema,则此字段应为由 Avro schema 定义的 JSON 字符串转换成的字节数组。

  1. properties
    </td>
    <td> schema 相关的其他属性。</td>

Here is an example of SchemaInfo:

  1. PulsarAdmin admin = …;SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L);

解压缩 schema

To provide a schema via a topic, you can use the following method.

Admin CLI

Use the extract subcommand.

  1. $ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>

删除 schema

To delete a schema for a topic, you can use one of the following methods.

Note

In any case, the delete action deletes all versions of a schema registered for a topic.

Admin CLI

REST API

Java Admin API

使用 删除 子命令。

  1. $ pulsar-admin schemas delete <topic-name>

Send a DELETE request to a schema endpoint: DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema

Here is an example of a response, which is returned in JSON format.

  1. { "version": "<the-latest-version-number-of-the-schema>",}

The response includes the following field:

字段说明
versionSchema 版本,是一个长数。
  1. void deleteSchema(String topic)

Here is an example of deleting a schema.

  1. PulsarAdmin admin = …;admin.deleteSchema("my-tenant/my-ns/my-topic");

自定义schema存储

By default, Pulsar stores various data types of schemas in Apache BookKeeper deployed alongside Pulsar.

However, you can use another storage system if needed.

实现

To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:

SchemaStorage 接口

SchemaStorage接口包含以下方法:

  1. public interface SchemaStorage {
  2. // 如何更新schema
  3. CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
  4. // 如何从存储中获取schema
  5. CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
  6. // 如何删除schema
  7. CompletableFuture<SchemaVersion> delete(String key);
  8. // 用户将schema字节数据转换为SchemaVersion 对象的工具方法
  9. SchemaVersion versionFromBytes(byte[] version);
  10. // 启动schema存储客户端
  11. void start() throws Exception;
  12. // 关闭schema存储客户端
  13. void close() throws Exception;
  14. }

提示

For a complete example of schema storage implementation, see BookKeeperSchemaStorage class.

SchemaStorageFactory接口

The SchemaStorageFactory interface has the following method:

  1. public interface SchemaStorageFactory {
  2. @NotNull
  3. SchemaStorage create(PulsarService pulsar) throws Exception;
  4. }

提示

For a complete example of schema storage factory implementation, see BookKeeperSchemaStorageFactory class.

部署

To use your custom schema storage implementation, perform the following steps.

  1. 将实现打包到 JAR 文件中。

  2. 将 JAR 文件添加到 Pulsar 二进制或 source 分布中的 lib 文件夹中。

  3. broker.conf 中的 schemaRegistryStorageClassName 配置更改为自定义工厂类。

  4. 启动 Pulsar。