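How-To: Use output bindings to interface with external resources

Invoke external systems with output bindings

With output bindings, you can invoke external resources. The invocation request can carry an optional payload and metadata.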

Diagram showing bindings of example service

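This guide uses a Kafka binding as an example. You can find your preferred binding spec in the list of bindings components. In this guide:

  1. The example invokes the /v1.0/bindings/checkout endpoint, where checkout is the name of the binding to invoke.
  2. The payload goes inside the mandatory data field and can be any JSON serializable value.
  3. The operation field tells the binding which action to take. For example, the Kafka binding supports the create operation.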
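
The three points above can be sketched as a small helper that assembles the request body. This is a minimal illustration rather than part of any Dapr SDK: the function name build_binding_request and the metadata key traceLabel are hypothetical, and converting metadata values to strings is an assumption of this sketch.

```python
import json


def build_binding_request(data, operation="create", metadata=None):
    """Return the JSON body for an output binding invocation.

    `data` is the mandatory payload, `operation` names the action the
    binding should take, and `metadata` is optional.
    """
    body = {"data": data, "operation": operation}
    if metadata:
        # Assumption of this sketch: metadata is sent as string key/value pairs.
        body["metadata"] = {str(k): str(v) for k, v in metadata.items()}
    # json.dumps raises TypeError when `data` is not JSON serializable.
    return json.dumps(body)


print(build_binding_request(100))
# traceLabel is a hypothetical metadata key used only for illustration.
print(build_binding_request({"orderId": 100}, metadata={"traceLabel": "demo"}))
```

The first call produces the same body that the curl example later in this guide sends to the checkout binding.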

Note

If you haven't already, try out the bindings quickstart for a quick walk-through on how to use the bindings API.

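Create a binding

Create a binding.yaml file and save it to a components sub-folder in your application directory.

Create a new binding component named checkout. Within the metadata section, configure the following Kafka-related properties:

  • The topic to which you'll publish the message
  • The broker

When creating the binding component, specify the supported direction of the binding.

Use the --resources-path flag with the dapr run command to point to your custom resources directory.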

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: checkout
    spec:
      type: bindings.kafka
      version: v1
      metadata:
      # Kafka broker connection setting
      - name: brokers
        value: localhost:9092
      # consumer configuration: topic and consumer group
      - name: topics
        value: sample
      - name: consumerGroup
        value: group1
      # publisher configuration: topic
      - name: publishTopic
        value: sample
      - name: authRequired
        value: false
      - name: direction
        value: output
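
As a rough sanity check before starting the sidecar, the component above can be mirrored as a Python dict and inspected. This is only a sketch: the helper names are hypothetical, the list of required fields reflects what this guide asks you to configure (broker, publish topic, direction) rather than an official schema, and treating direction as a comma-separated list is an assumption.

```python
# The component definition from this guide, reduced to the fields an
# output binding needs and written as a plain dict.
component = {
    "apiVersion": "dapr.io/v1alpha1",
    "kind": "Component",
    "metadata": {"name": "checkout"},
    "spec": {
        "type": "bindings.kafka",
        "version": "v1",
        "metadata": [
            {"name": "brokers", "value": "localhost:9092"},
            {"name": "publishTopic", "value": "sample"},
            {"name": "direction", "value": "output"},
        ],
    },
}


def missing_output_fields(component, required=("brokers", "publishTopic", "direction")):
    """Return the required metadata names that the component does not define."""
    names = {item["name"] for item in component["spec"]["metadata"]}
    return [name for name in required if name not in names]


def supports_output(component):
    """Return True when the direction metadata includes "output"."""
    for item in component["spec"]["metadata"]:
        if item["name"] == "direction":
            # Assumption: several directions may be listed, separated by commas.
            directions = [d.strip() for d in str(item["value"]).split(",")]
            return "output" in directions
    return False


print(missing_output_fields(component))  # []
print(supports_output(component))        # True
```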

To deploy the following binding.yaml file into a Kubernetes cluster, run kubectl apply -f binding.yaml.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: checkout
    spec:
      type: bindings.kafka
      version: v1
      metadata:
      # Kafka broker connection setting
      - name: brokers
        value: localhost:9092
      # consumer configuration: topic and consumer group
      - name: topics
        value: sample
      - name: consumerGroup
        value: group1
      # publisher configuration: topic
      - name: publishTopic
        value: sample
      - name: authRequired
        value: false
      - name: direction
        value: output

Send an event (output binding)

The code examples below use the Dapr SDKs to invoke the output bindings endpoint on a running Dapr instance.

    // C#
    //dependencies
    using System;
    using System.Threading.Tasks;
    using Dapr.Client;

    //code
    namespace EventService
    {
        class Program
        {
            static async Task Main(string[] args)
            {
                string BINDING_NAME = "checkout";
                string BINDING_OPERATION = "create";
                var random = new Random();
                // Create the client once and reuse it for every invocation
                using var client = new DaprClientBuilder().Build();
                while (true)
                {
                    // Wait 5 seconds without blocking the thread
                    await Task.Delay(5000);
                    int orderId = random.Next(1, 1000);
                    //Using Dapr SDK to invoke output binding
                    await client.InvokeBindingAsync(BINDING_NAME, BINDING_OPERATION, orderId);
                    Console.WriteLine("Sending message: " + orderId);
                }
            }
        }
    }

    // Java
    //dependencies
    import io.dapr.client.DaprClient;
    import io.dapr.client.DaprClientBuilder;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;

    //code
    @SpringBootApplication
    public class OrderProcessingServiceApplication {

        private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

        public static void main(String[] args) throws Exception {
            String BINDING_NAME = "checkout";
            String BINDING_OPERATION = "create";
            Random random = new Random();
            // Create the client once; try-with-resources closes it on exit
            try (DaprClient client = new DaprClientBuilder().build()) {
                while (true) {
                    TimeUnit.MILLISECONDS.sleep(5000);
                    int orderId = random.nextInt(1000 - 1) + 1;
                    //Using Dapr SDK to invoke output binding
                    client.invokeBinding(BINDING_NAME, BINDING_OPERATION, orderId).block();
                    log.info("Sending message: " + orderId);
                }
            }
        }
    }

    # Python
    #dependencies
    import random
    from time import sleep
    import logging
    import json
    from dapr.clients import DaprClient

    #code
    logging.basicConfig(level = logging.INFO)
    BINDING_NAME = 'checkout'
    BINDING_OPERATION = 'create'
    while True:
        sleep(random.randrange(50, 5000) / 1000)
        orderId = random.randint(1, 1000)
        with DaprClient() as client:
            #Using Dapr SDK to invoke output binding
            resp = client.invoke_binding(BINDING_NAME, BINDING_OPERATION, json.dumps(orderId))
        logging.info('Sending message: ' + str(orderId))

    // Go
    package main

    //dependencies
    import (
        "context"
        "log"
        "math/rand"
        "strconv"
        "time"

        dapr "github.com/dapr/go-sdk/client"
    )

    //code
    func main() {
        BINDING_NAME := "checkout"
        BINDING_OPERATION := "create"
        // Create the client once and close it when main returns
        client, err := dapr.NewClient()
        if err != nil {
            panic(err)
        }
        defer client.Close()
        ctx := context.Background()
        for i := 0; i < 10; i++ {
            // time.Sleep takes a time.Duration in nanoseconds, so scale by time.Second
            time.Sleep(5 * time.Second)
            orderId := rand.Intn(1000-1) + 1
            //Using Dapr SDK to invoke output binding
            in := &dapr.InvokeBindingRequest{Name: BINDING_NAME, Operation: BINDING_OPERATION, Data: []byte(strconv.Itoa(orderId))}
            if err := client.InvokeOutputBinding(ctx, in); err != nil {
                panic(err)
            }
            log.Println("Sending message: " + strconv.Itoa(orderId))
        }
    }

    // JavaScript
    //dependencies
    import { DaprClient, CommunicationProtocolEnum } from "@dapr/dapr";

    //code
    const daprHost = "127.0.0.1";

    (async function () {
        for (var i = 0; i < 10; i++) {
            await sleep(2000);
            const orderId = Math.floor(Math.random() * (1000 - 1) + 1);
            try {
                await sendOrder(orderId)
            } catch (err) {
                // Log the caught error (the original referenced an undefined variable)
                console.error(err);
                process.exit(1);
            }
        }
    })();

    async function sendOrder(orderId) {
        const BINDING_NAME = "checkout";
        const BINDING_OPERATION = "create";
        const client = new DaprClient({
            daprHost,
            daprPort: process.env.DAPR_HTTP_PORT,
            communicationProtocol: CommunicationProtocolEnum.HTTP,
        });
        //Using Dapr SDK to invoke output binding
        const result = await client.binding.send(BINDING_NAME, BINDING_OPERATION, orderId);
        console.log("Sending message: " + orderId);
    }

    function sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }

You can also invoke the output bindings endpoint using HTTP:

    curl -X POST -H 'Content-Type: application/json' http://localhost:3601/v1.0/bindings/checkout -d '{ "data": 100, "operation": "create" }'
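
The same HTTP call can be made without an SDK. The sketch below uses only the Python standard library; the helper names make_binding_request and send are hypothetical, and port 3601 is taken from the curl command above, so adjust it to your sidecar's HTTP port.

```python
import json
import urllib.request


def make_binding_request(binding_name, data, operation="create", dapr_port=3601):
    """Build (but do not send) the POST request for an output binding."""
    url = f"http://localhost:{dapr_port}/v1.0/bindings/{binding_name}"
    body = json.dumps({"data": data, "operation": operation}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request, timeout=5.0):
    """Send the request to the sidecar and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


request = make_binding_request("checkout", 100)
print(request.get_method(), request.full_url)
# With a Dapr sidecar listening on port 3601, call: send(request)
```

Building the request separately from sending it makes it possible to inspect the URL and body without a running sidecar.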

Watch this video on how to use bi-directional output bindings.

References