状态存储

创建一个状态存储组件只需要几个基本步骤。

添加状态存储命名空间

添加 using 语句来引用与状态存储相关的命名空间。

  1. using Dapr.PluggableComponents.Components;
  2. using Dapr.PluggableComponents.Components.StateStore;

实现 IStateStore

创建一个实现IStateStore接口的类。

  1. internal sealed class MyStateStore : IStateStore
  2. {
  3. public Task DeleteAsync(StateStoreDeleteRequest request, CancellationToken cancellationToken = default)
  4. {
  5. // Delete the requested key from the state store...
  6. }
  7. public Task<StateStoreGetResponse?> GetAsync(StateStoreGetRequest request, CancellationToken cancellationToken = default)
  8. {
  9. // Get the requested key value from from the state store, else return null...
  10. }
  11. public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
  12. {
  13. // Called to initialize the component with its configured metadata...
  14. }
  15. public Task SetAsync(StateStoreSetRequest request, CancellationToken cancellationToken = default)
  16. {
  17. // Set the requested key to the specified value in the state store...
  18. }
  19. }

注册状态存储组件

在主程序文件中(例如,Program.cs),将状态存储注册到应用程序服务中。

  1. using Dapr.PluggableComponents;
  2. var app = DaprPluggableComponentsApplication.Create();
  3. app.RegisterService(
  4. "<socket name>",
  5. serviceBuilder =>
  6. {
  7. serviceBuilder.RegisterStateStore<MyStateStore>();
  8. });
  9. app.Run();

批量状态存储

打算支持批量操作的状态存储应该实现可选的 IBulkStateStore 接口。 它的方法与基础IStateStore接口的方法相同,但包括多个请求的值。

Note

Dapr 运行时将通过单独调用其操作来模拟状态存储的批量状态存储操作,这些状态存储_不_实现“IBulkStateStore”。

  1. internal sealed class MyStateStore : IStateStore, IBulkStateStore
  2. {
  3. // ...
  4. public Task BulkDeleteAsync(StateStoreDeleteRequest[] requests, CancellationToken cancellationToken = default)
  5. {
  6. // Delete all of the requested values from the state store...
  7. }
  8. public Task<StateStoreBulkStateItem[]> BulkGetAsync(StateStoreGetRequest[] requests, CancellationToken cancellationToken = default)
  9. {
  10. // Return the values of all of the requested values from the state store...
  11. }
  12. public Task BulkSetAsync(StateStoreSetRequest[] requests, CancellationToken cancellationToken = default)
  13. {
  14. // Set all of the values of the requested keys in the state store...
  15. }
  16. }

事务性状态存储

打算支持事务的状态存储应该实现可选的ITransactionalStateStore接口。 它的TransactAsync()方法接收一个请求,其中包含要在事务中执行的一系列delete和/或set操作。 状态存储应该遍历序列并调用每个操作的Visit()方法,传递表示每种操作类型所采取的动作的回调函数。

  1. internal sealed class MyStateStore : IStateStore, ITransactionalStateStore
  2. {
  3. // ...
  4. public async Task TransactAsync(StateStoreTransactRequest request, CancellationToken cancellationToken = default)
  5. {
  6. // Start transaction...
  7. try
  8. {
  9. foreach (var operation in request.Operations)
  10. {
  11. await operation.Visit(
  12. async deleteRequest =>
  13. {
  14. // Process delete request...
  15. },
  16. async setRequest =>
  17. {
  18. // Process set request...
  19. });
  20. }
  21. }
  22. catch
  23. {
  24. // Rollback transaction...
  25. throw;
  26. }
  27. // Commit transaction...
  28. }
  29. }

可查询的状态存储

打算支持查询的状态存储应该实现可选的IQueryableStateStore接口。 它的QueryAsync()方法会传递有关查询的详细信息,例如过滤器、结果限制和分页,以及结果的排序顺序。 状态存储应该使用这些详细信息来生成一组值,作为其响应的一部分返回。

  1. internal sealed class MyStateStore : IStateStore, IQueryableStateStore
  2. {
  3. // ...
  4. public Task<StateStoreQueryResponse> QueryAsync(StateStoreQueryRequest request, CancellationToken cancellationToken = default)
  5. {
  6. // Generate and return results...
  7. }
  8. }

ETag 和其他语义错误处理

Dapr 运行时对某些状态存储操作的特定错误条件有额外处理。 状态存储可以通过从其操作逻辑中抛出特定的异常来指示这些条件:

异常适用操作说明
ETagInvalidExceptionDelete, Set, Bulk Delete, Bulk Set当 ETag 无效时
ETagMismatchExceptionDelete, Set, Bulk Delete, Bulk Set当 ETag 与预期值不匹配时
BulkDeleteRowMismatchExceptionBulk Delete当受影响的行数与预期的行数不匹配时