指南:如何保存和获取状态

使用键值对来持久化状态

介绍

状态管理是任何应用程序最常见的需求之一:无论是新是旧,是单体还是微服务。 与不同的数据库库打交道,进行测试,处理重试和故障是很费时费力的。

Dapr提供的状态管理功能包括一致性和并发选项。 在本指南中,我们将从基础知识开始。使用键/值状态API来允许应用程序保存,获取和删除状态。

前提

第一步:设置状态存储

状态存储组件代表Dapr用来与数据库进行通信的资源。

在本指南中,我们将使用 Redis 作为状态存储引擎,但在 支持列表中的任何状态存储引擎都是可以使用的。

当在单机模式下使用dapr init时,Dapr CLI会自动提供一个状态存储(Redis),并在components目录中创建相关的YAML,在Linux/MacOS上位于$HOME/.dapr/components,在Windows上位于%USERPROFILE%/.dapr/components

如果需要切换使用的状态存储引擎,用你选择的文件替换/components下的YAML文件statestore.yaml

要将其部署到 Kubernetes 集群中,请在下面的 yaml 中填写你的所需statestore 组件metadata 连接详情,保存为 statestore.yaml,并执行命令 kubectl apply -f statestore.yaml

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: statestore
  5. namespace: default
  6. spec:
  7. type: state.redis
  8. version: v1
  9. metadata:
  10. - name: redisHost
  11. value: localhost:6379
  12. - name: redisPassword
  13. value: ""

请参阅 这里的说明,了解如何在 Kubernetes 上设置不同的状态存储引擎。

第二步:保存和检索单个状态

下面的例子显示了如何使用Dapr状态构件的单个键/值对。

Note

设置一个app-id是很重要的,因为状态键是以这个值为前缀的。 如果你不设置,就会在运行期间为你自动生成一个值,而到下次运行命令时又会生成一个新的值,你将因此无法再访问以前保存的状态。

首先启动一个Dapr sidecar:

  1. dapr run --app-id myapp --dapr-http-port 3500

然后在一个单独的终端中保存一个键/值对到你的statestore中:

  1. curl -X POST -H "Content-Type: application/json" -d '[{ "key": "key1", "value": "value1"}]' http://localhost:3500/v1.0/state/statestore

现在获取你刚才保存的状态:

  1. curl http://localhost:3500/v1.0/state/statestore/key1

你也可以重启你的sidecar,然后再次尝试检索状态,看看存储的状态是否与应用状态保持一致。

首先启动Dapr sidecar:

  1. dapr --app-id myapp --port 3500 run

然后在一个单独的终端中保存一个键/值对到你的statestore中:

  1. Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{"key": "key1", "value": "value1"}]' -Uri 'http://localhost:3500/v1.0/state/statestore'

现在获取你刚才保存的状态:

  1. Invoke-RestMethod -Uri 'http://localhost:3500/v1.0/state/statestore/key1'

你也可以重启你的sidecar,然后再次尝试检索状态,看看存储的状态是否与应用状态保持一致。

将以下内容保存到名为pythonState.py的文件中:

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. d.save_state(store_name="statestore", key="myFirstKey", value="myFirstValue" )
  4. print("State has been stored")
  5. data = d.get_state(store_name="statestore", key="myFirstKey").data
  6. print(f"Got value: {data}")

保存后执行以下命令启动Dapr sidecar并运行Python应用程序:

  1. dapr --app-id myapp run python pythonState.py

你应该会得到一个类似于下面的输出,它将同时显示Dapr和应用程序的日志:

  1. == DAPR == time="2021-01-06T21:34:33.7970377-08:00" level=info msg="starting Dapr Runtime -- version 0.11.3 -- commit a1a8e11" app_id=Braidbald-Boot scope=dapr.runtime type=log ver=0.11.3
  2. == DAPR == time="2021-01-06T21:34:33.8040378-08:00" level=info msg="standalone mode configured" app_id=Braidbald-Boot scope=dapr.runtime type=log ver=0.11.3
  3. == DAPR == time="2021-01-06T21:34:33.8040378-08:00" level=info msg="app id: Braidbald-Boot" app_id=Braidbald-Boot scope=dapr.runtime type=log ver=0.11.3
  4. == DAPR == time="2021-01-06T21:34:33.9750400-08:00" level=info msg="component loaded. name: statestore, type: state.redis" app_id=Braidbald-Boot scope=dapr.runtime type=log ver=0.11.3
  5. == DAPR == time="2021-01-06T21:34:33.9760387-08:00" level=info msg="API gRPC server is running on port 51656" app_id=Braidbald-Boot scope=dapr.runtime type=log ver=0.11.3
  6. == DAPR == time="2021-01-06T21:34:33.9770372-08:00" level=info msg="dapr initialized. Status: Running. Init Elapsed 172.9994ms" app_id=Braidbald-Boot scope=dapr.
  7. Checking if Dapr sidecar is listening on GRPC port 51656
  8. Dapr sidecar is up and running.
  9. Updating metadata for app command: python pythonState.py
  10. You are up and running! Both Dapr and your app logs will appear here.
  11. == APP == State has been stored
  12. == APP == Got value: b'myFirstValue' Status: Running. Init Elapsed 172.9994ms" app_id=Braidbald-Boot scope=dapr.

state-example.php中保存以下内容:

  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. $app = \Dapr\App::create();
  4. $app->run(function(\Dapr\State\StateManager $stateManager, \Psr\Log\LoggerInterface $logger) {
  5. $stateManager->save_state(store_name: 'statestore', item: new \Dapr\State\StateItem(
  6. key: 'myFirstKey',
  7. value: 'myFirstValue'
  8. ));
  9. $logger->alert('State has been stored');
  10. $data = $stateManager->load_state(store_name: 'statestore', key: 'myFirstKey')->value;
  11. $logger->alert("Got value: {data}", ['data' => $data]);
  12. });

保存后,执行以下命令启动Dapr sidecar并运行PHP应用程序:

  1. dapr --app-id myapp run -- php state-example.php

你应该会得到一个类似于下面的输出,它将同时显示Dapr和应用程序的日志:

  1. You're up and running! Both Dapr and your app logs will appear here.
  2. == APP == [2021-02-12T16:30:11.078777+01:00] APP.ALERT: State has been stored [] []
  3. == APP == [2021-02-12T16:30:11.082620+01:00] APP.ALERT: Got value: myFirstValue {"data":"myFirstValue"} []

第三步:删除状态

下面的例子显示了如何通过给状态管理API传递一个键来删除一个对象:

用上面运行的同一个dapr实例执行:

  1. curl -X DELETE 'http://localhost:3500/v1.0/state/statestore/key1'

再尝试获取状态,注意没有返回任何值。

用上面运行的同一个dapr实例执行:

  1. Invoke-RestMethod -Method Delete -Uri 'http://localhost:3500/v1.0/state/statestore/key1'

再尝试获取状态,注意没有返回任何值。

修改pythonState.py如下:

  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. d.save_state(store_name="statestore", key="key1", value="value1" )
  4. print("State has been stored")
  5. data = d.get_state(store_name="statestore", key="key1").data
  6. print(f"Got value: {data}")
  7. d.delete_state(store_name="statestore", key="key1")
  8. data = d.get_state(store_name="statestore", key="key1").data
  9. print(f"Got value after delete: {data}")

现在通过以下命令运行你的程序:

  1. dapr --app-id myapp run python pythonState.py

你应该会看到一个类似于下面的输出:

  1. Starting Dapr with id Yakchocolate-Lord. HTTP Port: 59457. gRPC Port: 59458
  2. == DAPR == time="2021-01-06T22:55:36.5570696-08:00" level=info msg="starting Dapr Runtime -- version 0.11.3 -- commit a1a8e11" app_id=Yakchocolate-Lord scope=dapr.runtime type=log ver=0.11.3
  3. == DAPR == time="2021-01-06T22:55:36.5690367-08:00" level=info msg="standalone mode configured" app_id=Yakchocolate-Lord scope=dapr.runtime type=log ver=0.11.3
  4. == DAPR == time="2021-01-06T22:55:36.7220140-08:00" level=info msg="component loaded. Status: Running. Init Elapsed 154.984ms" app_id=Yakchocolate-Lord scope=dapr.runtime type=log ver=0.11.3
  5. Checking if Dapr sidecar is listening on GRPC port 59458
  6. Dapr sidecar is up and running.
  7. Updating metadata for app command: python pythonState.py
  8. You're up and running!
  9. == APP == State has been stored
  10. == APP == Got value: b'value1'
  11. == APP == Got value after delete: b''

修改state-example.php,内容如下:

  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. $app = \Dapr\App::create();
  4. $app->run(function(\Dapr\State\StateManager $stateManager, \Psr\Log\LoggerInterface $logger) {
  5. $stateManager->save_state(store_name: 'statestore', item: new \Dapr\State\StateItem(
  6. key: 'myFirstKey',
  7. value: 'myFirstValue'
  8. ));
  9. $logger->alert('State has been stored');
  10. $data = $stateManager->load_state(store_name: 'statestore', key: 'myFirstKey')->value;
  11. $logger->alert("Got value: {data}", ['data' => $data]);
  12. $stateManager->delete_keys(store_name: 'statestore', keys: ['myFirstKey']);
  13. $data = $stateManager->load_state(store_name: 'statestore', key: 'myFirstKey')->value;
  14. $logger->alert("Got value after delete: {data}", ['data' => $data]);
  15. });

现在使用以下命令运行它:

  1. dapr --app-id myapp run -- php state-example.php

你应该会看到类似下面的输出:

  1. You're up and running!
  2. == APP == [2021-02-12T16:38:00.839201+01:00] APP.ALERT: State has been stored [] []
  3. == APP == [2021-02-12T16:38:00.841997+01:00] APP.ALERT: Got value: myFirstValue {"data":"myFirstValue"} []
  4. == APP == [2021-02-12T16:38:00.845721+01:00] APP.ALERT: Got value after delete: {"data":null} []

第四步:保存和检索多个状态

Dapr还允许你在同一个调用中保存和检索多个状态:

在上面运行的同一个dapr实例中,将两个键/值对保存到你的statetore中:

  1. curl -X POST -H "Content-Type: application/json" -d '[{ "key": "key1", "value": "value1"}, { "key": "key2", "value": "value2"}]' http://localhost:3500/v1.0/state/statestore

现在获取你刚才保存的状态:

  1. curl -X POST -H "Content-Type: application/json" -d '{"keys":["key1", "key2"]}' http://localhost:3500/v1.0/state/statestore/bulk

在上面运行的同一个dapr实例中,将两个键/值对保存到你的statetore中:

  1. Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{ "key": "key1", "value": "value1"}, { "key": "key2", "value": "value2"}]' -Uri 'http://localhost:3500/v1.0/state/statestore'

现在获取你刚才保存的状态:

  1. Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"keys":["key1", "key2"]}' -Uri 'http://localhost:3500/v1.0/state/statestore/bulk'

StateItem对象可以使用save_statesget_states方法来存储多个Dapr状态。

用以下代码更新你的pythonState.py文件:

  1. from dapr.clients import DaprClient
  2. from dapr.clients.grpc._state import StateItem
  3. with DaprClient() as d:
  4. s1 = StateItem(key="key1", value="value1")
  5. s2 = StateItem(key="key2", value="value2")
  6. d.save_bulk_state(store_name="statestore", states=[s1,s2])
  7. print("States have been stored")
  8. items = d.get_bulk_state(store_name="statestore", keys=["key1", "key2"]).items
  9. print(f"Got items: {[i.data for i in items]}")

现在通过以下命令运行你的程序:

  1. dapr --app-id myapp run python pythonState.py

你应该会看到一个类似于下面的输出:

  1. == DAPR == time="2021-01-06T21:54:56.7262358-08:00" level=info msg="starting Dapr Runtime -- version 0.11.3 -- commit a1a8e11" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
  2. == DAPR == time="2021-01-06T21:54:56.7401933-08:00" level=info msg="standalone mode configured" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
  3. == DAPR == time="2021-01-06T21:54:56.8754240-08:00" level=info msg="Initialized name resolution to standalone" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
  4. == DAPR == time="2021-01-06T21:54:56.8844248-08:00" level=info msg="component loaded. name: statestore, type: state.redis" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
  5. == DAPR == time="2021-01-06T21:54:56.8854273-08:00" level=info msg="API gRPC server is running on port 60614" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
  6. == DAPR == time="2021-01-06T21:54:56.8854273-08:00" level=info msg="dapr initialized. Status: Running. Init Elapsed 145.234ms" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
  7. Checking if Dapr sidecar is listening on GRPC port 60614
  8. Dapr sidecar is up and running.
  9. Updating metadata for app command: python pythonState.py
  10. You're up and running!
  11. == APP == States have been stored
  12. == APP == Got items: [b'value1', b'value2']

要用PHP批量加载和保存状态,只需创建一个 “Plain Ole’ PHP对象”(POPO),并用 StateStore注解进行声明。

更新state-example.php文件:

  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. #[\Dapr\State\Attributes\StateStore('statestore', \Dapr\consistency\EventualLastWrite::class)]
  4. class MyState {
  5. public string $key1 = 'value1';
  6. public string $key2 = 'value2';
  7. }
  8. $app = \Dapr\App::create();
  9. $app->run(function(\Dapr\State\StateManager $stateManager, \Psr\Log\LoggerInterface $logger) {
  10. $obj = new MyState();
  11. $stateManager->save_object(item: $obj);
  12. $logger->alert('States have been stored');
  13. $stateManager->load_object(into: $obj);
  14. $logger->alert("Got value: {data}", ['data' => $obj]);
  15. });

运行该应用:

  1. dapr --app-id myapp run -- php state-example.php

并看到以下输出:

  1. You're up and running!
  2. == APP == [2021-02-12T16:55:02.913801+01:00] APP.ALERT: States have been stored [] []
  3. == APP == [2021-02-12T16:55:02.917850+01:00] APP.ALERT: Got value: [object MyState] {"data":{"MyState":{"key1":"value1","key2":"value2"}}} []

第五步:执行状态事务性操作

Note

状态事务性操作需要一个支持multi-item transactions的状态存储引擎。 请访问 支持的状态存储引擎页面查看完整列表。 请注意,在自托管环境中创建的默认Redis容器是支持的。

用上面运行的同一个dapr实例执行两个状态事务操作:

  1. curl -X POST -H "Content-Type: application/json" -d '{"operations": [{"operation":"upsert", "request": {"key": "key1", "value": "newValue1"}}, {"operation":"delete", "request": {"key": "key2"}}]}' http://localhost:3500/v1.0/state/statestore/transaction

现在可以看到你的状态事务操作的结果:

  1. curl -X POST -H "Content-Type: application/json" -d '{"keys":["key1", "key2"]}' http://localhost:3500/v1.0/state/statestore/bulk

在上面运行的同一个dapr实例中,将两个键/值对保存到你的statetore中:

  1. Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"operations": [{"operation":"upsert", "request": {"key": "key1", "value": "newValue1"}}, {"operation":"delete", "request": {"key": "key2"}}]}' -Uri 'http://localhost:3500/v1.0/state/statestore'

现在可以看到你的状态事务操作的结果:

  1. Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"keys":["key1", "key2"]}' -Uri 'http://localhost:3500/v1.0/state/statestore/bulk'

如果你的状态存储需要事务支持,可以考虑使用TransactionalStateOperation

用以下代码更新你的pythonState.py文件:

  1. from dapr.clients import DaprClient
  2. from dapr.clients.grpc._state import StateItem
  3. from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType
  4. with DaprClient() as d:
  5. s1 = StateItem(key="key1", value="value1")
  6. s2 = StateItem(key="key2", value="value2")
  7. d.save_bulk_state(store_name="statestore", states=[s1,s2])
  8. print("States have been stored")
  9. d.execute_state_transaction(
  10. store_name="statestore",
  11. operations=[
  12. TransactionalStateOperation(key="key1", data="newValue1", operation_type=TransactionOperationType.upsert),
  13. TransactionalStateOperation(key="key2", data="value2", operation_type=TransactionOperationType.delete)
  14. ]
  15. )
  16. print("State transactions have been completed")
  17. items = d.get_bulk_state(store_name="statestore", keys=["key1", "key2"]).items
  18. print(f"Got items: {[i.data for i in items]}")

现在通过以下命令运行你的程序:

  1. dapr run python pythonState.py

你应该会看到一个类似于下面的输出:

  1. Starting Dapr with id Singerchecker-Player. HTTP Port: 59533. gRPC Port: 59534
  2. == DAPR == time="2021-01-06T22:18:14.1246721-08:00" level=info msg="starting Dapr Runtime -- version 0.11.3 -- commit a1a8e11" app_id=Singerchecker-Player scope=dapr.runtime type=log ver=0.11.3
  3. == DAPR == time="2021-01-06T22:18:14.1346254-08:00" level=info msg="standalone mode configured" app_id=Singerchecker-Player scope=dapr.runtime type=log ver=0.11.3
  4. == DAPR == time="2021-01-06T22:18:14.2747063-08:00" level=info msg="component loaded. name: statestore, type: state.redis" app_id=Singerchecker-Player scope=dapr.runtime type=log ver=0.11.3
  5. == DAPR == time="2021-01-06T22:18:14.2757062-08:00" level=info msg="API gRPC server is running on port 59534" app_id=Singerchecker-Player scope=dapr.runtime type=log ver=0.11.3
  6. == DAPR == time="2021-01-06T22:18:14.2767059-08:00" level=info msg="dapr initialized. Status: Running. Init Elapsed 142.0805ms" app_id=Singerchecker-Player scope=dapr.runtime type=log ver=0.11.3
  7. Checking if Dapr sidecar is listening on GRPC port 59534
  8. Dapr sidecar is up and running.
  9. Updating metadata for app command: python pythonState.py
  10. You're up and running!
  11. == APP == State transactions have been completed
  12. == APP == Got items: [b'value1', b'']

事务性状态通过扩展TransactionalState基础对象来支持,它挂接到你的 对象,然后通过setters和getters来提供事务。 而你可能会希望依赖注入框架来替你创建一个事务对象:

再次修改state-example.php文件:

  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. #[\Dapr\State\Attributes\StateStore('statestore', \Dapr\consistency\EventualLastWrite::class)]
  4. class MyState extends \Dapr\State\TransactionalState {
  5. public string $key1 = 'value1';
  6. public string $key2 = 'value2';
  7. }
  8. $app = \Dapr\App::create();
  9. $app->run(function(MyState $obj, \Psr\Log\LoggerInterface $logger, \Dapr\State\StateManager $stateManager) {
  10. $obj->begin();
  11. $obj->key1 = 'hello world';
  12. $obj->key2 = 'value3';
  13. $obj->commit();
  14. $logger->alert('Transaction committed!');
  15. // begin a new transaction which reloads from the store
  16. $obj->begin();
  17. $logger->alert("Got value: {key1}, {key2}", ['key1' => $obj->key1, 'key2' => $obj->key2]);
  18. });

运行程序:

  1. dapr --app-id myapp run -- php state-example.php

观察到以下输出:

  1. You're up and running!
  2. == APP == [2021-02-12T17:10:06.837110+01:00] APP.ALERT: Transaction committed! [] []
  3. == APP == [2021-02-12T17:10:06.840857+01:00] APP.ALERT: Got value: hello world, value3 {"key1":"hello world","key2":"value3"} []

下一步