状态存储

创建一个状态存储组件只需要几个基本步骤。

导入状态存储包

创建文件components/statestore.go并添加import语句,用于状态存储相关的包。

  1. package components
  2. import (
  3. "context"
  4. "github.com/dapr/components-contrib/state"
  5. )

实现 Store 接口

创建一个实现Store接口的类型。

  1. type MyStateStore struct {
  2. }
  3. func (store *MyStateStore) Init(metadata state.Metadata) error {
  4. // Called to initialize the component with its configured metadata...
  5. }
  6. func (store *MyStateStore) GetComponentMetadata() map[string]string {
  7. // Not used with pluggable components...
  8. return map[string]string{}
  9. }
  10. func (store *MyStateStore) Features() []state.Feature {
  11. // Return a list of features supported by the state store...
  12. }
  13. func (store *MyStateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
  14. // Delete the requested key from the state store...
  15. }
  16. func (store *MyStateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
  17. // Get the requested key value from the state store, else return an empty response...
  18. }
  19. func (store *MyStateStore) Set(ctx context.Context, req *state.SetRequest) error {
  20. // Set the requested key to the specified value in the state store...
  21. }
  22. func (store *MyStateStore) BulkGet(ctx context.Context, req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
  23. // Get the requested key values from the state store...
  24. }
  25. func (store *MyStateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
  26. // Delete the requested keys from the state store...
  27. }
  28. func (store *MyStateStore) BulkSet(ctx context.Context, req []state.SetRequest) error {
  29. // Set the requested keys to their specified values in the state store...
  30. }

注册状态存储组件

在主应用程序文件中(例如,main.go),将状态存储注册到应用程序服务中。

  1. package main
  2. import (
  3. "example/components"
  4. dapr "github.com/dapr-sandbox/components-go-sdk"
  5. "github.com/dapr-sandbox/components-go-sdk/state/v1"
  6. )
  7. func main() {
  8. dapr.Register("<socket name>", dapr.WithStateStore(func() state.Store {
  9. return &components.MyStateStoreComponent{}
  10. }))
  11. dapr.MustRun()
  12. }

批量状态存储

虽然需要状态存储来支持 批量操作,它们的实现按顺序委托给各个操作方法。

事务性状态存储

打算支持事务的状态存储应该实现可选的TransactionalStore接口。 它的Multi()方法接收一个请求,其中包含要在事务中执行的一系列delete和/或set操作。 状态存储应该遍历序列并应用每个操作。

  1. func (store *MyStateStoreComponent) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
  2. // Start transaction...
  3. for _, operation := range request.Operations {
  4. switch operation.Operation {
  5. case state.Delete:
  6. deleteRequest := operation.Request.(state.DeleteRequest)
  7. // Process delete request...
  8. case state.Upsert:
  9. setRequest := operation.Request.(state.SetRequest)
  10. // Process set request...
  11. }
  12. }
  13. // End (or rollback) transaction...
  14. return nil
  15. }

可查询的状态存储

打算支持查询的状态存储应该实现可选的Querier接口。 它的Query()方法会传递有关查询的详细信息,例如过滤器、结果限制、分页和结果的排序顺序。 状态存储使用这些详细信息来生成一组值,作为其响应的一部分返回。

  1. func (store *MyStateStoreComponent) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
  2. // Generate and return results...
  3. }

ETag 和其他语义错误处理

Dapr 运行时对某些状态存储操作的特定错误条件有额外处理。 状态存储可以通过从其操作逻辑中返回特定错误来指示这些条件:

错误适用操作说明
NewETagError(state.ETagInvalid, …)Delete, Set, Bulk Delete, Bulk Set当 ETag 无效时
NewETagError(state.ETagMismatch, …)Delete, Set, Bulk Delete, Bulk Set当 ETag 与预期值不匹配时
NewBulkDeleteRowMismatchError(…)Bulk Delete当受影响的行数与预期的行数不匹配时

下一步