Dapr的 gRPC 接口

在应用程序中使用 Dapr gRPC API

Dapr 和 gRPC

Dapr 为本地调用实现 HTTP 和 gRPC API 。 gRPC适用于低延迟、高性能的场景,并且使用原生客户端进行语言集成。

您可以在这里找到 自动生成的客户端 的列表。

Dapr 运行时实现 服务 ,应用程序可以通过 gRPC 进行通信。

除了通过 gRPC 调用 Dapr , Dapr 还可以通过 gRPC 与应用程序通信。 要做到这一点,应用程序需要托管一个gRPC服务器,并实现Dapr appcallback服务

配置 dapr 以通过 gRPC 与应用程序通信

自托管

当在自己托管模式下运行时,使用 --app-protocol 标志告诉Dapr 使用 gRPC 来与应用程序对话:

  1. dapr run --app-protocol grpc --app-port 5005 node app.js

这将告诉Dapr通过gRPC与您的应用程序通过5005端口进行通信。

Kubernetes

在Kubernetes上,在你的deployment YAML中设置以下注解:

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: myapp
  5. namespace: default
  6. labels:
  7. app: myapp
  8. spec:
  9. replicas: 1
  10. selector:
  11. matchLabels:
  12. app: myapp
  13. template:
  14. metadata:
  15. labels:
  16. app: myapp
  17. annotations:
  18. dapr.io/enabled: "true"
  19. dapr.io/app-id: "myapp"
  20. dapr.io/app-protocol: "grpc"
  21. dapr.io/app-port: "5005"
  22. ...

使用 gRPC 调用 dapr - 执行示例

下面的步骤显示了如何创建 Dapr 客户端并调用 保存状态数据 操作:

    1. 导入包
  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "os"
  6. dapr "github.com/dapr/go-sdk/client"
  7. )
  1. 创建客户端
  1. // just for this demo
  2. ctx := context.Background()
  3. data := []byte("ping")
  4. // create the client
  5. client, err := dapr.NewClient()
  6. if err != nil {
  7. log.Panic(err)
  8. }
  9. defer client.Close()
  1. 调用 “ 保存状态 “ 方法
  1. // save state with the key key1
  2. err = client.SaveState(ctx, "statestore", "key1", data)
  3. if err != nil {
  4. log.Panic(err)
  5. }
  6. log.Println("data saved")

好耶!

现在你可以探索Dapr客户端上的所有不同方法。

使用 Dapr 创建 gRPC 应用程序

以下步骤将向您显示如何创建一个让Dapr服务器与之通信的应用程序。

    1. 导入包
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "net"
  7. "github.com/golang/protobuf/ptypes/any"
  8. "github.com/golang/protobuf/ptypes/empty"
  9. commonv1pb "github.com/dapr/go-sdk/dapr/proto/common/v1"
  10. pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
  11. "google.golang.org/grpc"
  12. )
  1. 实现接口
  1. // server is our user app
  2. type server struct {
  3. pb.UnimplementedAppCallbackServer
  4. }
  5. // EchoMethod is a simple demo method to invoke
  6. func (s *server) EchoMethod() string {
  7. return "pong"
  8. }
  9. // This method gets invoked when a remote service has called the app through Dapr
  10. // The payload carries a Method to identify the method, a set of metadata properties and an optional payload
  11. func (s *server) OnInvoke(ctx context.Context, in *commonv1pb.InvokeRequest) (*commonv1pb.InvokeResponse, error) {
  12. var response string
  13. switch in.Method {
  14. case "EchoMethod":
  15. response = s.EchoMethod()
  16. }
  17. return &commonv1pb.InvokeResponse{
  18. ContentType: "text/plain; charset=UTF-8",
  19. Data: &any.Any{Value: []byte(response)},
  20. }, nil
  21. }
  22. // Dapr will call this method to get the list of topics the app wants to subscribe to. In this example, we are telling Dapr
  23. // To subscribe to a topic named TopicA
  24. func (s *server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*pb.ListTopicSubscriptionsResponse, error) {
  25. return &pb.ListTopicSubscriptionsResponse{
  26. Subscriptions: []*pb.TopicSubscription{
  27. {Topic: "TopicA"},
  28. },
  29. }, nil
  30. }
  31. // Dapr will call this method to get the list of bindings the app will get invoked by. In this example, we are telling Dapr
  32. // To invoke our app with a binding named storage
  33. func (s *server) ListInputBindings(ctx context.Context, in *empty.Empty) (*pb.ListInputBindingsResponse, error) {
  34. return &pb.ListInputBindingsResponse{
  35. Bindings: []string{"storage"},
  36. }, nil
  37. }
  38. // This method gets invoked every time a new event is fired from a registerd binding. The message carries the binding name, a payload and optional metadata
  39. func (s *server) OnBindingEvent(ctx context.Context, in *pb.BindingEventRequest) (*pb.BindingEventResponse, error) {
  40. fmt.Println("Invoked from binding")
  41. return &pb.BindingEventResponse{}, nil
  42. }
  43. // This method is fired whenever a message has been published to a topic that has been subscribed. Dapr sends published messages in a CloudEvents 0.3 envelope.
  44. func (s *server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*pb.TopicEventResponse, error) {
  45. fmt.Println("Topic message arrived")
  46. return &pb.TopicEventResponse{}, nil
  47. }
  1. 创建服务器
  1. func main() {
  2. // create listener
  3. lis, err := net.Listen("tcp", ":50001")
  4. if err != nil {
  5. log.Fatalf("failed to listen: %v", err)
  6. }
  7. // create grpc server
  8. s := grpc.NewServer()
  9. pb.RegisterAppCallbackServer(s, &server{})
  10. fmt.Println("Client starting...")
  11. // and start...
  12. if err := s.Serve(lis); err != nil {
  13. log.Fatalf("failed to serve: %v", err)
  14. }
  15. }

这将在端口 4000 上为应用程序创建一个 gRPC 服务器。

  1. 运行你的应用

使用 Dapr CLI在本地运行:

  1. dapr run --app-id goapp --app-port 4000 --app-protocol grpc go run main.go

在 Kubernetes 上,设置所需的 dapr.io/app-protocol: "grpc"dapr.io/app-port: " 4000 注释在您的 Pod 规范模板中如上所述。

其他语言

您可以将 Dapr 与 Protobuf 支持的任何语言一起使用,而不只是使用当前可用的生成 SDK。 使用 原型 工具,您可以为 Ruby, C++, Rust 等其他语言生成 Dapr 客户机。

相关主题