4.6 调用RocketMQ

4.6.1 简介

client/rocketmq 包是对github.com/apache/rocketmq-client-go/v2 进行二次封装

4.6.2 配置规范

配置说明4.6 调用RocketMQ - 图1 (opens new window)

4.6.3 用法

访问rocketmq示例4.6 调用RocketMQ - 图2 (opens new window)

  1. // run: go run main.go --config=config.toml
  2. package main
  3. import (
  4. "context"
  5. "fmt"
  6. "strconv"
  7. "github.com/apache/rocketmq-client-go/v2/primitive"
  8. "github.com/douyu/jupiter"
  9. "github.com/douyu/jupiter/pkg/client/rocketmq"
  10. "github.com/douyu/jupiter/pkg/xlog"
  11. )
  12. // run: go run main.go -config=config.toml
  13. type Engine struct {
  14. jupiter.Application
  15. }
  16. func NewEngine() *Engine {
  17. eng := &Engine{}
  18. if err := eng.Startup(
  19. eng.exampleRocketMQProducer,
  20. eng.exampleRocketMQConsumer,
  21. ); err != nil {
  22. xlog.Panic("startup", xlog.Any("err", err))
  23. }
  24. return eng
  25. }
  26. func main() {
  27. app := NewEngine()
  28. if err := app.Run(); err != nil {
  29. panic(err)
  30. }
  31. }
  32. func (eng *Engine) exampleRocketMQConsumer() (err error) {
  33. consumerClient := rocketmq.StdPushConsumerConfig("configName").Build()
  34. defer func() {
  35. if consumerClient.Enable {
  36. _ = consumerClient.Close()
  37. }
  38. }()
  39. consumerClient.Subscribe(consumerClient.ConsumerConfig.Topic, func(ctx context.Context, ext *primitive.MessageExt) error {
  40. fmt.Println("msg...", string(ext.Message.Body))
  41. fmt.Println("msg topic...", string(ext.Message.Topic))
  42. fmt.Println("msg topic tag...", string(ext.Message.GetTags()))
  43. return nil
  44. })
  45. err = consumerClient.Start()
  46. return
  47. }
  48. func (eng *Engine) exampleRocketMQProducer() (err error) {
  49. producerClient := rocketmq.StdProducerConfig("configName").Build()
  50. defer func() {
  51. _ = producerClient.Close()
  52. }()
  53. err = producerClient.Start()
  54. if err != nil {
  55. return
  56. }
  57. for i := 0; i < 10; i++ {
  58. msg := "a" + strconv.Itoa(i)
  59. err = producerClient.Send([]byte(msg))
  60. }
  61. return
  62. }

执行 go run main.go —config=config.toml,可以看到如下图结果 image