操作方法:从存储管理配置

了解如何获取应用程序配置并订阅更改

本示例使用 Redis 配置存储组件演示如何检索配置项。

Diagram showing get configuration of example service

注意

如果你还没有,请尝试使用配置快速入门快速了解如何使用配置 API。

在存储中创建配置项目

在支持的配置存储区中创建一个配置项。 这可以是一个简单的键值项,键值可任意选择。 如前所述,本示例使用了 Redis 配置存储组件。

使用 Docker 运行 Redis

  1. docker run --name my-redis -p 6379:6379 -d redis:6

保存项目

使用Redis CLI连接到Redis实例:

  1. redis-cli -p 6379

保存配置项:

  1. MSET orderId1 "101||1" orderId2 "102||1"

配置 Dapr 配置存储

将以下组件文件保存到您的机器上的默认组件文件夹中。 您可以将其用作 Dapr 组件 YAML:

  • 对于使用 kubectl 的 Kubernetes。
  • 使用 Dapr CLI 运行时。

注意

由于Redis配置组件与Redis statestore.yaml组件具有相同的元数据,如果您已经有一个Redis statestore.yaml,您可以简单地复制/更改Redis状态存储组件类型。

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: configstore
  5. spec:
  6. type: configuration.redis
  7. metadata:
  8. - name: redisHost
  9. value: localhost:6379
  10. - name: redisPassword
  11. value: <PASSWORD>

检索配置项目

获取配置

下面的示例展示了如何使用 Dapr 配置 API 获取已保存的配置项。

  1. //dependencies
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Threading.Tasks;
  5. using Dapr.Client;
  6. //code
  7. namespace ConfigurationApi
  8. {
  9. public class Program
  10. {
  11. private static readonly string CONFIG_STORE_NAME = "configstore";
  12. public static async Task Main(string[] args)
  13. {
  14. using var client = new DaprClientBuilder().Build();
  15. var configuration = await client.GetConfiguration(CONFIG_STORE_NAME, new List<string>() { "orderId1", "orderId2" });
  16. Console.WriteLine($"Got key=\n{configuration[0].Key} -> {configuration[0].Value}\n{configuration[1].Key} -> {configuration[1].Value}");
  17. }
  18. }
  19. }
  1. //dependencies
  2. import io.dapr.client.DaprClientBuilder;
  3. import io.dapr.client.DaprClient;
  4. import io.dapr.client.domain.ConfigurationItem;
  5. import io.dapr.client.domain.GetConfigurationRequest;
  6. import io.dapr.client.domain.SubscribeConfigurationRequest;
  7. import reactor.core.publisher.Flux;
  8. import reactor.core.publisher.Mono;
  9. //code
  10. private static final String CONFIG_STORE_NAME = "configstore";
  11. public static void main(String[] args) throws Exception {
  12. try (DaprClient client = (new DaprClientBuilder()).build()) {
  13. List<String> keys = new ArrayList<>();
  14. keys.add("orderId1");
  15. keys.add("orderId2");
  16. GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, keys);
  17. try {
  18. Mono<List<ConfigurationItem>> items = client.getConfiguration(req);
  19. items.block().forEach(ConfigurationClient::print);
  20. } catch (Exception ex) {
  21. System.out.println(ex.getMessage());
  22. }
  23. }
  24. }
  1. #dependencies
  2. from dapr.clients import DaprClient
  3. #code
  4. with DaprClient() as d:
  5. CONFIG_STORE_NAME = 'configstore'
  6. keys = ['orderId1', 'orderId2']
  7. #Startup time for dapr
  8. d.wait(20)
  9. configuration = d.get_configuration(store_name=CONFIG_STORE_NAME, keys=[keys], config_metadata={})
  10. print(f"Got key={configuration.items[0].key} value={configuration.items[0].value} version={configuration.items[0].version}")
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. dapr "github.com/dapr/go-sdk/client"
  6. )
  7. func main() {
  8. ctx := context.Background()
  9. client, err := dapr.NewClient()
  10. if err != nil {
  11. panic(err)
  12. }
  13. items, err := client.GetConfigurationItems(ctx, "configstore", ["orderId1","orderId2"])
  14. if err != nil {
  15. panic(err)
  16. }
  17. for key, item := range items {
  18. fmt.Printf("get config: key = %s value = %s version = %s",key,(*item).Value, (*item).Version)
  19. }
  20. }
  1. import { CommunicationProtocolEnum, DaprClient } from "@dapr/dapr";
  2. // JS SDK does not support Configuration API over HTTP protocol yet
  3. const protocol = CommunicationProtocolEnum.GRPC;
  4. const host = process.env.DAPR_HOST ?? "localhost";
  5. const port = process.env.DAPR_GRPC_PORT ?? 3500;
  6. const DAPR_CONFIGURATION_STORE = "configstore";
  7. const CONFIGURATION_ITEMS = ["orderId1", "orderId2"];
  8. async function main() {
  9. const client = new DaprClient(host, port, protocol);
  10. // Get config items from the config store
  11. try {
  12. const config = await client.configuration.get(DAPR_CONFIGURATION_STORE, CONFIGURATION_ITEMS);
  13. Object.keys(config.items).forEach((key) => {
  14. console.log("Configuration for " + key + ":", JSON.stringify(config.items[key]));
  15. });
  16. } catch (error) {
  17. console.log("Could not get config item, err:" + error);
  18. process.exit(1);
  19. }
  20. }
  21. main().catch((e) => console.error(e));

启动 Dapr Sidecar:

  1. dapr run --app-id orderprocessing --dapr-http-port 3601

在另一个终端中,获取之前保存的配置项:

  1. curl http://localhost:3601/v1.0/configuration/configstore?key=orderId1

启动 Dapr Sidecar:

  1. dapr run --app-id orderprocessing --dapr-http-port 3601

在另一个终端中,获取之前保存的配置项:

  1. Invoke-RestMethod -Uri 'http://localhost:3601/v1.0/configuration/configstore?key=orderId1'

订阅配置项更新

以下是利用 SDK 订阅 [orderId1, orderId2] 的代码示例,使用的是 configstore 商店组件。

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using Dapr.Client;
  5. const string DAPR_CONFIGURATION_STORE = "configstore";
  6. var CONFIGURATION_KEYS = new List<string> { "orderId1", "orderId2" };
  7. var client = new DaprClientBuilder().Build();
  8. // Subscribe for configuration changes
  9. SubscribeConfigurationResponse subscribe = await client.SubscribeConfiguration(DAPR_CONFIGURATION_STORE, CONFIGURATION_ITEMS);
  10. // Print configuration changes
  11. await foreach (var items in subscribe.Source)
  12. {
  13. // First invocation when app subscribes to config changes only returns subscription id
  14. if (items.Keys.Count == 0)
  15. {
  16. Console.WriteLine("App subscribed to config changes with subscription id: " + subscribe.Id);
  17. subscriptionId = subscribe.Id;
  18. continue;
  19. }
  20. var cfg = System.Text.Json.JsonSerializer.Serialize(items);
  21. Console.WriteLine("Configuration update " + cfg);
  22. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id orderprocessing -- dotnet run
  1. using System;
  2. using Microsoft.AspNetCore.Hosting;
  3. using Microsoft.Extensions.Hosting;
  4. using Dapr.Client;
  5. using Dapr.Extensions.Configuration;
  6. using System.Collections.Generic;
  7. using System.Threading;
  8. namespace ConfigurationApi
  9. {
  10. public class Program
  11. {
  12. public static void Main(string[] args)
  13. {
  14. Console.WriteLine("Starting application.");
  15. CreateHostBuilder(args).Build().Run();
  16. Console.WriteLine("Closing application.");
  17. }
  18. /// <summary>
  19. /// Creates WebHost Builder.
  20. /// </summary>
  21. /// <param name="args">Arguments.</param>
  22. /// <returns>Returns IHostbuilder.</returns>
  23. public static IHostBuilder CreateHostBuilder(string[] args)
  24. {
  25. var client = new DaprClientBuilder().Build();
  26. return Host.CreateDefaultBuilder(args)
  27. .ConfigureAppConfiguration(config =>
  28. {
  29. // Get the initial value and continue to watch it for changes.
  30. config.AddDaprConfigurationStore("configstore", new List<string>() { "orderId1","orderId2" }, client, TimeSpan.FromSeconds(20));
  31. config.AddStreamingDaprConfigurationStore("configstore", new List<string>() { "orderId1","orderId2" }, client, TimeSpan.FromSeconds(20));
  32. })
  33. .ConfigureWebHostDefaults(webBuilder =>
  34. {
  35. webBuilder.UseStartup<Startup>();
  36. });
  37. }
  38. }
  39. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id orderprocessing -- dotnet run
  1. import io.dapr.client.DaprClientBuilder;
  2. import io.dapr.client.DaprClient;
  3. import io.dapr.client.domain.ConfigurationItem;
  4. import io.dapr.client.domain.GetConfigurationRequest;
  5. import io.dapr.client.domain.SubscribeConfigurationRequest;
  6. import reactor.core.publisher.Flux;
  7. import reactor.core.publisher.Mono;
  8. //code
  9. private static final String CONFIG_STORE_NAME = "configstore";
  10. private static String subscriptionId = null;
  11. public static void main(String[] args) throws Exception {
  12. try (DaprClient client = (new DaprClientBuilder()).build()) {
  13. // Subscribe for config changes
  14. List<String> keys = new ArrayList<>();
  15. keys.add("orderId1");
  16. keys.add("orderId2");
  17. Flux<SubscribeConfigurationResponse> subscription = client.subscribeConfiguration(DAPR_CONFIGURATON_STORE,keys);
  18. // Read config changes for 20 seconds
  19. subscription.subscribe((response) -> {
  20. // First ever response contains the subscription id
  21. if (response.getItems() == null || response.getItems().isEmpty()) {
  22. subscriptionId = response.getSubscriptionId();
  23. System.out.println("App subscribed to config changes with subscription id: " + subscriptionId);
  24. } else {
  25. response.getItems().forEach((k, v) -> {
  26. System.out.println("Configuration update for " + k + ": {'value':'" + v.getValue() + "'}");
  27. });
  28. }
  29. });
  30. Thread.sleep(20000);
  31. }
  32. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id orderprocessing -- -- mvn spring-boot:run
  1. #dependencies
  2. from dapr.clients import DaprClient
  3. #code
  4. def handler(id: str, resp: ConfigurationResponse):
  5. for key in resp.items:
  6. print(f"Subscribed item received key={key} value={resp.items[key].value} "
  7. f"version={resp.items[key].version} "
  8. f"metadata={resp.items[key].metadata}", flush=True)
  9. def executeConfiguration():
  10. with DaprClient() as d:
  11. storeName = 'configurationstore'
  12. keys = ['orderId1', 'orderId2']
  13. id = d.subscribe_configuration(store_name=storeName, keys=keys,
  14. handler=handler, config_metadata={})
  15. print("Subscription ID is", id, flush=True)
  16. sleep(20)
  17. executeConfiguration()

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id orderprocessing -- python3 OrderProcessingService.py
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. dapr "github.com/dapr/go-sdk/client"
  7. )
  8. func main() {
  9. ctx := context.Background()
  10. client, err := dapr.NewClient()
  11. if err != nil {
  12. panic(err)
  13. }
  14. subscribeID, err := client.SubscribeConfigurationItems(ctx, "configstore", []string{"orderId1", "orderId2"}, func(id string, items map[string]*dapr.ConfigurationItem) {
  15. for k, v := range items {
  16. fmt.Printf("get updated config key = %s, value = %s version = %s \n", k, v.Value, v.Version)
  17. }
  18. })
  19. if err != nil {
  20. panic(err)
  21. }
  22. time.Sleep(20*time.Second)
  23. }

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id orderprocessing -- go run main.go
  1. import { CommunicationProtocolEnum, DaprClient } from "@dapr/dapr";
  2. // JS SDK does not support Configuration API over HTTP protocol yet
  3. const protocol = CommunicationProtocolEnum.GRPC;
  4. const host = process.env.DAPR_HOST ?? "localhost";
  5. const port = process.env.DAPR_GRPC_PORT ?? 3500;
  6. const DAPR_CONFIGURATION_STORE = "configstore";
  7. const CONFIGURATION_ITEMS = ["orderId1", "orderId2"];
  8. async function main() {
  9. const client = new DaprClient(host, port, protocol);
  10. // Subscribe to config updates
  11. try {
  12. const stream = await client.configuration.subscribeWithKeys(
  13. DAPR_CONFIGURATION_STORE,
  14. CONFIGURATION_ITEMS,
  15. (config) => {
  16. console.log("Configuration update", JSON.stringify(config.items));
  17. }
  18. );
  19. // Unsubscribe to config updates and exit app after 20 seconds
  20. setTimeout(() => {
  21. stream.stop();
  22. console.log("App unsubscribed to config changes");
  23. process.exit(0);
  24. }, 20000);
  25. } catch (error) {
  26. console.log("Error subscribing to config updates, err:" + error);
  27. process.exit(1);
  28. }
  29. }
  30. main().catch((e) => console.error(e));

导航到包含上述代码的目录,然后运行以下命令启动 Dapr sidecar 和订阅程序:

  1. dapr run --app-id orderprocessing --app-protocol grpc --dapr-grpc-port 3500 -- node index.js

退订配置项更新

订阅监视配置项目后,您将收到所有订阅密钥的更新。 要停止接收更新,您需要明确调用取消订阅 API。

以下代码示例展示了如何使用取消订阅 API 取消订阅配置更新。

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using Dapr.Client;
  5. const string DAPR_CONFIGURATION_STORE = "configstore";
  6. var client = new DaprClientBuilder().Build();
  7. // Unsubscribe to config updates and exit the app
  8. async Task unsubscribe(string subscriptionId)
  9. {
  10. try
  11. {
  12. await client.UnsubscribeConfiguration(DAPR_CONFIGURATION_STORE, subscriptionId);
  13. Console.WriteLine("App unsubscribed from config changes");
  14. Environment.Exit(0);
  15. }
  16. catch (Exception ex)
  17. {
  18. Console.WriteLine("Error unsubscribing from config updates: " + ex.Message);
  19. }
  20. }
  1. import io.dapr.client.DaprClientBuilder;
  2. import io.dapr.client.DaprClient;
  3. import io.dapr.client.domain.ConfigurationItem;
  4. import io.dapr.client.domain.GetConfigurationRequest;
  5. import io.dapr.client.domain.SubscribeConfigurationRequest;
  6. import reactor.core.publisher.Flux;
  7. import reactor.core.publisher.Mono;
  8. //code
  9. private static final String CONFIG_STORE_NAME = "configstore";
  10. private static String subscriptionId = null;
  11. public static void main(String[] args) throws Exception {
  12. try (DaprClient client = (new DaprClientBuilder()).build()) {
  13. // Unsubscribe from config changes
  14. UnsubscribeConfigurationResponse unsubscribe = client
  15. .unsubscribeConfiguration(subscriptionId, DAPR_CONFIGURATON_STORE).block();
  16. if (unsubscribe.getIsUnsubscribed()) {
  17. System.out.println("App unsubscribed to config changes");
  18. } else {
  19. System.out.println("Error unsubscribing to config updates, err:" + unsubscribe.getMessage());
  20. }
  21. } catch (Exception e) {
  22. System.out.println("Error unsubscribing to config updates," + e.getMessage());
  23. System.exit(1);
  24. }
  25. }
  1. import asyncio
  2. import time
  3. import logging
  4. from dapr.clients import DaprClient
  5. subscriptionID = ""
  6. with DaprClient() as d:
  7. isSuccess = d.unsubscribe_configuration(store_name='configstore', id=subscriptionID)
  8. print(f"Unsubscribed successfully? {isSuccess}", flush=True)
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "os"
  8. "time"
  9. dapr "github.com/dapr/go-sdk/client"
  10. )
  11. var DAPR_CONFIGURATION_STORE = "configstore"
  12. var subscriptionID = ""
  13. func main() {
  14. client, err := dapr.NewClient()
  15. if err != nil {
  16. log.Panic(err)
  17. }
  18. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  19. defer cancel()
  20. if err := client.UnsubscribeConfigurationItems(ctx, DAPR_CONFIGURATION_STORE , subscriptionID); err != nil {
  21. panic(err)
  22. }
  23. }
  1. import { CommunicationProtocolEnum, DaprClient } from "@dapr/dapr";
  2. // JS SDK does not support Configuration API over HTTP protocol yet
  3. const protocol = CommunicationProtocolEnum.GRPC;
  4. const host = process.env.DAPR_HOST ?? "localhost";
  5. const port = process.env.DAPR_GRPC_PORT ?? 3500;
  6. const DAPR_CONFIGURATION_STORE = "configstore";
  7. const CONFIGURATION_ITEMS = ["orderId1", "orderId2"];
  8. async function main() {
  9. const client = new DaprClient(host, port, protocol);
  10. try {
  11. const stream = await client.configuration.subscribeWithKeys(
  12. DAPR_CONFIGURATION_STORE,
  13. CONFIGURATION_ITEMS,
  14. (config) => {
  15. console.log("Configuration update", JSON.stringify(config.items));
  16. }
  17. );
  18. setTimeout(() => {
  19. // Unsubscribe to config updates
  20. stream.stop();
  21. console.log("App unsubscribed to config changes");
  22. process.exit(0);
  23. }, 20000);
  24. } catch (error) {
  25. console.log("Error subscribing to config updates, err:" + error);
  26. process.exit(1);
  27. }
  28. }
  29. main().catch((e) => console.error(e));
  1. curl 'http://localhost:<DAPR_HTTP_PORT>/v1.0/configuration/configstore/<subscription-id>/unsubscribe'
  1. Invoke-RestMethod -Uri 'http://localhost:<DAPR_HTTP_PORT>/v1.0/configuration/configstore/<subscription-id>/unsubscribe'

下一步