Pub/sub

创建一个Pub/sub组件只需要几个基本步骤。

导入pub/sub包

创建文件components/pubsub.go并添加import语句,用于Pub/sub相关的包。

  1. package components
  2. import (
  3. "context"
  4. "github.com/dapr/components-contrib/pubsub"
  5. )

实现 PubSub 接口

创建一个实现PubSub接口的类型。

  1. type MyPubSubComponent struct {
  2. }
  3. func (component *MyPubSubComponent) Init(metadata pubsub.Metadata) error {
  4. // Called to initialize the component with its configured metadata...
  5. }
  6. func (component *MyPubSubComponent) Close() error {
  7. // Not used with pluggable components...
  8. return nil
  9. }
  10. func (component *MyPubSubComponent) Features() []pubsub.Feature {
  11. // Return a list of features supported by the component...
  12. }
  13. func (component *MyPubSubComponent) Publish(req *pubsub.PublishRequest) error {
  14. // Send the message to the "topic"...
  15. }
  16. func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
  17. // Until canceled, check the topic for messages and deliver them to the Dapr runtime...
  18. }

调用Subscribe()方法应该设置一个长期存在的机制来获取消息,但立即返回nil(或错误,如果无法设置该机制)。 当被取消时(例如,通过ctx.Done()ctx.Err() != nil),机制应该结束。 “topic”从中拉取消息的方式是通过req参数传递,而将消息传递给Dapr运行时是通过handler回调函数执行的。 回调函数在应用程序(由 Dapr 运行时提供服务)确认处理消息后才返回。

  1. func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
  2. go func() {
  3. for {
  4. err := ctx.Err()
  5. if err != nil {
  6. return
  7. }
  8. messages := // Poll for messages...
  9. for _, message := range messages {
  10. handler(ctx, &pubsub.NewMessage{
  11. // Set the message content...
  12. })
  13. }
  14. select {
  15. case <-ctx.Done():
  16. case <-time.After(5 * time.Second):
  17. }
  18. }
  19. }()
  20. return nil
  21. }

注册pub/sub组件

在主应用程序文件中(例如,main.go),注册 Pub/sub 组件到应用程序。

  1. package main
  2. import (
  3. "example/components"
  4. dapr "github.com/dapr-sandbox/components-go-sdk"
  5. "github.com/dapr-sandbox/components-go-sdk/pubsub/v1"
  6. )
  7. func main() {
  8. dapr.Register("<socket name>", dapr.WithPubSub(func() pubsub.PubSub {
  9. return &components.MyPubSubComponent{}
  10. }))
  11. dapr.MustRun()
  12. }

下一步