如何实现可插拔组件

学习如何编写和实现可插拔组件

在本指南中,您将了解为什么以及如何实现 可插拔组件。 要了解如何配置和注册可插拔组件,请参阅如何:注册可插入组件

实现一个可插拔组件

为了实现可插拔组件,您需要在组件中实现一个 gRPC 服务。 实现 gRPC 服务需要三个步骤:

查找 proto 定义文件

为每个支持的服务接口(状态存储、发布/订阅、绑定、密钥存储)提供了Proto定义。

目前支持以下组件API:

  • 状态存储
  • Pub/sub
  • 绑定
  • Secret stores(密钥存储)
Component类型gRPC 定义内置参考实现文档
状态存储statestate.protoRedis概念如何操作API规范
Pub/subpubsubpubsub.protoRedis概念如何操作API规范
绑定bindingsbindings.protoKafka概念输入如何输出如何API规范
密钥存储secretstoressecretstore.protoHashicorp/Vault概念如何操作-机密API规范

以下是可插拔组件状态存储的 gRPC 服务定义片段([state.proto]):

  1. // StateStore service provides a gRPC interface for state store components.
  2. service StateStore {
  3. // Initializes the state store component with the given metadata.
  4. rpc Init(InitRequest) returns (InitResponse) {}
  5. // Returns a list of implemented state store features.
  6. rpc Features(FeaturesRequest) returns (FeaturesResponse) {}
  7. // Ping the state store. Used for liveness purposes.
  8. rpc Ping(PingRequest) returns (PingResponse) {}
  9. // Deletes the specified key from the state store.
  10. rpc Delete(DeleteRequest) returns (DeleteResponse) {}
  11. // Get data from the given key.
  12. rpc Get(GetRequest) returns (GetResponse) {}
  13. // Sets the value of the specified key.
  14. rpc Set(SetRequest) returns (SetResponse) {}
  15. // Deletes many keys at once.
  16. rpc BulkDelete(BulkDeleteRequest) returns (BulkDeleteResponse) {}
  17. // Retrieves many keys at once.
  18. rpc BulkGet(BulkGetRequest) returns (BulkGetResponse) {}
  19. // Set the value of many keys at once.
  20. rpc BulkSet(BulkSetRequest) returns (BulkSetResponse) {}
  21. }

StateStore 服务的接口公开了总共 9 个方法:

  • 2 种初始化方法和组件功能通告(Init 和 Features)
  • 1种用于健康性或活动性检查的方法 (Ping)
  • CRUD 的 3 种方法(获取、设置、删除)
  • 批量 CRUD 操作的 3 种方法(BulkGet、BulkSet、BulkDelete)

创建服务脚手架

使用协议缓冲区和gRPC工具为服务创建必要的基架。 通过gRPC 概念文档了解更多关于这些工具的信息。

这些工具生成针对任何支持gRPC的语言编写的代码。 此代码作为您的服务器的基础,并提供:

  • 处理客户端调用的功能
  • 基础设施包括:
    • 解码传入请求
    • 执行服务方法
    • 编码服务响应

生成的代码不完整。 它缺少:

  • 您的目标服务定义的方法的具体实现(可插拔组件的核心)。
  • 有关如何处理 Unix 套接字域集成的代码,这是特定于 Dapr 的。
  • 处理与您的下游服务集成的代码。

在下一步中详细了解如何填补这些空白。

定义服务

提供所需服务的具体实现。 每个组件都有一个 gRPC 服务定义,用于其核心功能,与核心组件接口相同。 例如:

  • 状态存储

    可插拔的状态存储必须提供StateStore服务接口的实现。

    除了这个核心功能之外,一些组件还可能在其他可选服务下公开功能。 例如,您可以通过定义 QueriableStateStore 服务和 TransactionalStateStore 服务的实现来添加额外功能。

  • 发布/订阅

    可插拔的发布/订阅组件只定义了一个单一的核心服务接口 pubsub.proto。 它们没有可选的服务接口。

  • 绑定

    可插拔的输入和输出绑定在 bindings.proto 上只有一个单一的核心服务定义。 它们没有可选的服务接口。

  • 密钥存储

    可插拔的密钥存储在 secretstore.proto 上只有一个单一的核心服务定义。 它们没有可选的服务接口。

在使用 gRPC 和协议缓冲区工具生成上述状态存储示例的服务脚手架代码后,您可以为service StateStore下定义的 9 个方法定义具体实现,以及用于初始化和与您的依赖项通信的代码。

这个具体的实现和辅助代码是您可插拔组件的核心。 它们定义了组件在处理来自 Dapr 的 gRPC 请求时的行为方式。

返回语义错误

返回语义错误也是可插拔组件协议的一部分。 组件必须返回具有语义意义的特定 gRPC 代码,这些错误用于从并发要求到仅供信息使用的各种情况。

错误gRPC错误代码源组件说明
Etag不匹配codes.FailedPrecondition状态存储映射错误,无法满足并发需求
ETag 无效codes.InvalidArgument状态存储
批量删除行不匹配codes.Internal状态存储

状态管理概述中了解更多关于并发要求的信息。

以下示例演示如何在您自己的可插拔组件中返回错误,更改消息以满足您的需求。

重要: 为了使用.NET进行错误映射,请首先安装Google.Api.CommonProtos NuGet包

Etag不匹配

  1. var badRequest = new BadRequest();
  2. var des = "The ETag field provided does not match the one in the store";
  3. badRequest.FieldViolations.Add(
  4. new Google.Rpc.BadRequest.Types.FieldViolation
  5. {
  6. Field = "etag",
  7. Description = des
  8. });
  9. var baseStatusCode = Grpc.Core.StatusCode.FailedPrecondition;
  10. var status = new Google.Rpc.Status{
  11. Code = (int)baseStatusCode
  12. };
  13. status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(badRequest));
  14. var metadata = new Metadata();
  15. metadata.Add("grpc-status-details-bin", status.ToByteArray());
  16. throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

Etag 无效

  1. var badRequest = new BadRequest();
  2. var des = "The ETag field must only contain alphanumeric characters";
  3. badRequest.FieldViolations.Add(
  4. new Google.Rpc.BadRequest.Types.FieldViolation
  5. {
  6. Field = "etag",
  7. Description = des
  8. });
  9. var baseStatusCode = Grpc.Core.StatusCode.InvalidArgument;
  10. var status = new Google.Rpc.Status
  11. {
  12. Code = (int)baseStatusCode
  13. };
  14. status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(badRequest));
  15. var metadata = new Metadata();
  16. metadata.Add("grpc-status-details-bin", status.ToByteArray());
  17. throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

批量删除行不匹配

  1. var errorInfo = new Google.Rpc.ErrorInfo();
  2. errorInfo.Metadata.Add("expected", "100");
  3. errorInfo.Metadata.Add("affected", "99");
  4. var baseStatusCode = Grpc.Core.StatusCode.Internal;
  5. var status = new Google.Rpc.Status{
  6. Code = (int)baseStatusCode
  7. };
  8. status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(errorInfo));
  9. var metadata = new Metadata();
  10. metadata.Add("grpc-status-details-bin", status.ToByteArray());
  11. throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

就像Dapr Java SDK一样,Java Pluggable Components SDK使用Project Reactor,为Java提供了异步API。

错误可以直接返回:

  1. 在您的方法返回的MonoFlux中调用.error()方法
  2. 提供适当的异常作为参数。

只要它被捕获并反馈到您的结果MonoFlux中,您也可以引发异常。

Etag不匹配

  1. final Status status = Status.newBuilder()
  2. .setCode(io.grpc.Status.Code.FAILED_PRECONDITION.value())
  3. .setMessage("fake-err-msg-for-etag-mismatch")
  4. .addDetails(Any.pack(BadRequest.FieldViolation.newBuilder()
  5. .setField("etag")
  6. .setDescription("The ETag field provided does not match the one in the store")
  7. .build()))
  8. .build();
  9. return Mono.error(StatusProto.toStatusException(status));

ETag 无效

  1. final Status status = Status.newBuilder()
  2. .setCode(io.grpc.Status.Code.INVALID_ARGUMENT.value())
  3. .setMessage("fake-err-msg-for-invalid-etag")
  4. .addDetails(Any.pack(BadRequest.FieldViolation.newBuilder()
  5. .setField("etag")
  6. .setDescription("The ETag field must only contain alphanumeric characters")
  7. .build()))
  8. .build();
  9. return Mono.error(StatusProto.toStatusException(status));

批量删除行不匹配

  1. final Status status = Status.newBuilder()
  2. .setCode(io.grpc.Status.Code.INTERNAL.value())
  3. .setMessage("fake-err-msg-for-bulk-delete-row-mismatch")
  4. .addDetails(Any.pack(ErrorInfo.newBuilder()
  5. .putAllMetadata(Map.ofEntries(
  6. Map.entry("affected", "99"),
  7. Map.entry("expected", "100")
  8. ))
  9. .build()))
  10. .build();
  11. return Mono.error(StatusProto.toStatusException(status));

Etag不匹配

  1. st := status.New(codes.FailedPrecondition, "fake-err-msg")
  2. desc := "The ETag field provided does not match the one in the store"
  3. v := &errdetails.BadRequest_FieldViolation{
  4. Field: etagField,
  5. Description: desc,
  6. }
  7. br := &errdetails.BadRequest{}
  8. br.FieldViolations = append(br.FieldViolations, v)
  9. st, err := st.WithDetails(br)

ETag 无效

  1. st := status.New(codes.InvalidArgument, "fake-err-msg")
  2. desc := "The ETag field must only contain alphanumeric characters"
  3. v := &errdetails.BadRequest_FieldViolation{
  4. Field: etagField,
  5. Description: desc,
  6. }
  7. br := &errdetails.BadRequest{}
  8. br.FieldViolations = append(br.FieldViolations, v)
  9. st, err := st.WithDetails(br)

批量删除行不匹配

  1. st := status.New(codes.Internal, "fake-err-msg")
  2. br := &errdetails.ErrorInfo{}
  3. br.Metadata = map[string]string{
  4. affected: "99",
  5. expected: "100",
  6. }
  7. st, err := st.WithDetails(br)

下一步