1. Routing模式(路由模式,一个消息被多个消费者获取,并且消息的目标队列可被生产者指定)

Routing模式 - 图1

  • 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
  • 根据业务功能定义路由字符串
  • 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

目录结构

Routing模式 - 图2

kuteng-RabbitMQ

-RabbitMQ

—rabitmq.go //这个是RabbitMQ的封装

-publish

—mainpublish.go //Publish 先启动

-recieve1

—mainrecieve.go

-recieve2

—mainrecieve.go

rabitmq.go代码

  1. package RabbitMQ
  2. import (
  3. "fmt"
  4. "log"
  5. "github.com/streadway/amqp"
  6. )
  7. //连接信息
  8. const MQURL = "amqp://kuteng:kuteng@127.0.0.1:5672/kuteng"
  9. //rabbitMQ结构体
  10. type RabbitMQ struct {
  11. conn *amqp.Connection
  12. channel *amqp.Channel
  13. //队列名称
  14. QueueName string
  15. //交换机名称
  16. Exchange string
  17. //bind Key 名称
  18. Key string
  19. //连接信息
  20. Mqurl string
  21. }
  22. //创建结构体实例
  23. func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
  24. return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
  25. }
  26. //断开channel 和 connection
  27. func (r *RabbitMQ) Destory() {
  28. r.channel.Close()
  29. r.conn.Close()
  30. }
  31. //错误处理函数
  32. func (r *RabbitMQ) failOnErr(err error, message string) {
  33. if err != nil {
  34. log.Fatalf("%s:%s", message, err)
  35. panic(fmt.Sprintf("%s:%s", message, err))
  36. }
  37. }
  38. //路由模式
  39. //创建RabbitMQ实例
  40. func NewRabbitMQRouting(exchangeName string, routingKey string) *RabbitMQ {
  41. //创建RabbitMQ实例
  42. rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
  43. var err error
  44. //获取connection
  45. rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
  46. rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
  47. //获取channel
  48. rabbitmq.channel, err = rabbitmq.conn.Channel()
  49. rabbitmq.failOnErr(err, "failed to open a channel")
  50. return rabbitmq
  51. }
  52. //路由模式发送消息
  53. func (r *RabbitMQ) PublishRouting(message string) {
  54. //1.尝试创建交换机
  55. err := r.channel.ExchangeDeclare(
  56. r.Exchange,
  57. //要改成direct
  58. "direct",
  59. true,
  60. false,
  61. false,
  62. false,
  63. nil,
  64. )
  65. r.failOnErr(err, "Failed to declare an excha"+
  66. "nge")
  67. //2.发送消息
  68. err = r.channel.Publish(
  69. r.Exchange,
  70. //要设置
  71. r.Key,
  72. false,
  73. false,
  74. amqp.Publishing{
  75. ContentType: "text/plain",
  76. Body: []byte(message),
  77. })
  78. }
  79. //路由模式接受消息
  80. func (r *RabbitMQ) RecieveRouting() {
  81. //1.试探性创建交换机
  82. err := r.channel.ExchangeDeclare(
  83. r.Exchange,
  84. //交换机类型
  85. "direct",
  86. true,
  87. false,
  88. false,
  89. false,
  90. nil,
  91. )
  92. r.failOnErr(err, "Failed to declare an exch"+
  93. "ange")
  94. //2.试探性创建队列,这里注意队列名称不要写
  95. q, err := r.channel.QueueDeclare(
  96. "", //随机生产队列名称
  97. false,
  98. false,
  99. true,
  100. false,
  101. nil,
  102. )
  103. r.failOnErr(err, "Failed to declare a queue")
  104. //绑定队列到 exchange 中
  105. err = r.channel.QueueBind(
  106. q.Name,
  107. //需要绑定key
  108. r.Key,
  109. r.Exchange,
  110. false,
  111. nil)
  112. //消费消息
  113. messges, err := r.channel.Consume(
  114. q.Name,
  115. "",
  116. true,
  117. false,
  118. false,
  119. false,
  120. nil,
  121. )
  122. forever := make(chan bool)
  123. go func() {
  124. for d := range messges {
  125. log.Printf("Received a message: %s", d.Body)
  126. }
  127. }()
  128. fmt.Println("退出请按 CTRL+C\n")
  129. <-forever
  130. }

mainpublish.go代码

  1. package main
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  7. )
  8. func main() {
  9. kutengone := RabbitMQ.NewRabbitMQRouting("kuteng", "kuteng_one")
  10. kutengtwo := RabbitMQ.NewRabbitMQRouting("kuteng", "kuteng_two")
  11. for i := 0; i <= 100; i++ {
  12. kutengone.PublishRouting("Hello kuteng one!" + strconv.Itoa(i))
  13. kutengtwo.PublishRouting("Hello kuteng Two!" + strconv.Itoa(i))
  14. time.Sleep(1 * time.Second)
  15. fmt.Println(i)
  16. }
  17. }

recieve1/mainrecieve.go代码

  1. package main
  2. import "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  3. func main() {
  4. kutengone := RabbitMQ.NewRabbitMQRouting("kuteng", "kuteng_one")
  5. kutengone.RecieveRouting()
  6. }

recieve2/mainrecieve.go代码

  1. package main
  2. import "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  3. func main() {
  4. kutengtwo := RabbitMQ.NewRabbitMQRouting("kuteng", "kuteng_two")
  5. kutengtwo.RecieveRouting()
  6. }