1. Simple模式

Simple模式 - 图1

  • 消息产生着§将消息放入队列
  • 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)

做simple简单模式之前首先我们新建一个Virtual Host并且给他分配一个用户名,用来隔离数据,根据自己需要自行创建

第一步

Simple模式 - 图2

第二步

Simple模式 - 图3

第三步

Simple模式 - 图4

第四步

Simple模式 - 图5

第五步

Simple模式 - 图6

1.1.1. 代码层面

目录结构

Simple模式 - 图7

kuteng-RabbitMQ

-RabbitMQ

—rabitmq.go //这个是RabbitMQ的封装

-SimlpePublish

—mainSimlpePublish.go //Publish 先启动

-SimpleRecieve

—mainSimpleRecieve.go

rabitmq.go代码

  1. package RabbitMQ
  2. import (
  3. "fmt"
  4. "log"
  5. "github.com/streadway/amqp"
  6. )
  7. //连接信息amqp://kuteng:kuteng@127.0.0.1:5672/kuteng这个信息是固定不变的amqp://事固定参数后面两个是用户名密码ip地址端口号Virtual Host
  8. const MQURL = "amqp://kuteng:kuteng@127.0.0.1:5672/kuteng"
  9. //rabbitMQ结构体
  10. type RabbitMQ struct {
  11. conn *amqp.Connection
  12. channel *amqp.Channel
  13. //队列名称
  14. QueueName string
  15. //交换机名称
  16. Exchange string
  17. //bind Key 名称
  18. Key string
  19. //连接信息
  20. Mqurl string
  21. }
  22. //创建结构体实例
  23. func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
  24. return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
  25. }
  26. //断开channel 和 connection
  27. func (r *RabbitMQ) Destory() {
  28. r.channel.Close()
  29. r.conn.Close()
  30. }
  31. //错误处理函数
  32. func (r *RabbitMQ) failOnErr(err error, message string) {
  33. if err != nil {
  34. log.Fatalf("%s:%s", message, err)
  35. panic(fmt.Sprintf("%s:%s", message, err))
  36. }
  37. }
  38. //创建简单模式下RabbitMQ实例
  39. func NewRabbitMQSimple(queueName string) *RabbitMQ {
  40. //创建RabbitMQ实例
  41. rabbitmq := NewRabbitMQ(queueName, "", "")
  42. var err error
  43. //获取connection
  44. rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
  45. rabbitmq.failOnErr(err, "failed to connect rabb"+
  46. "itmq!")
  47. //获取channel
  48. rabbitmq.channel, err = rabbitmq.conn.Channel()
  49. rabbitmq.failOnErr(err, "failed to open a channel")
  50. return rabbitmq
  51. }
  52. //直接模式队列生产
  53. func (r *RabbitMQ) PublishSimple(message string) {
  54. //1.申请队列,如果队列不存在会自动创建,存在则跳过创建
  55. _, err := r.channel.QueueDeclare(
  56. r.QueueName,
  57. //是否持久化
  58. false,
  59. //是否自动删除
  60. false,
  61. //是否具有排他性
  62. false,
  63. //是否阻塞处理
  64. false,
  65. //额外的属性
  66. nil,
  67. )
  68. if err != nil {
  69. fmt.Println(err)
  70. }
  71. //调用channel 发送消息到队列中
  72. r.channel.Publish(
  73. r.Exchange,
  74. r.QueueName,
  75. //如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
  76. false,
  77. //如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
  78. false,
  79. amqp.Publishing{
  80. ContentType: "text/plain",
  81. Body: []byte(message),
  82. })
  83. }
  84. //simple 模式下消费者
  85. func (r *RabbitMQ) ConsumeSimple() {
  86. //1.申请队列,如果队列不存在会自动创建,存在则跳过创建
  87. q, err := r.channel.QueueDeclare(
  88. r.QueueName,
  89. //是否持久化
  90. false,
  91. //是否自动删除
  92. false,
  93. //是否具有排他性
  94. false,
  95. //是否阻塞处理
  96. false,
  97. //额外的属性
  98. nil,
  99. )
  100. if err != nil {
  101. fmt.Println(err)
  102. }
  103. //接收消息
  104. msgs, err := r.channel.Consume(
  105. q.Name, // queue
  106. //用来区分多个消费者
  107. "", // consumer
  108. //是否自动应答
  109. true, // auto-ack
  110. //是否独有
  111. false, // exclusive
  112. //设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
  113. false, // no-local
  114. //列是否阻塞
  115. false, // no-wait
  116. nil, // args
  117. )
  118. if err != nil {
  119. fmt.Println(err)
  120. }
  121. forever := make(chan bool)
  122. //启用协程处理消息
  123. go func() {
  124. for d := range msgs {
  125. //消息逻辑处理,可以自行设计逻辑
  126. log.Printf("Received a message: %s", d.Body)
  127. }
  128. }()
  129. log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  130. <-forever
  131. }

mainSimlpePublish.go代码

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  5. )
  6. func main() {
  7. rabbitmq := RabbitMQ.NewRabbitMQSimple("" +
  8. "kuteng")
  9. rabbitmq.PublishSimple("Hello kuteng222!")
  10. fmt.Println("发送成功!")
  11. }

mainSimpleRecieve.go代码

  1. package main
  2. import "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  3. func main() {
  4. rabbitmq := RabbitMQ.NewRabbitMQSimple("" +
  5. "kuteng")
  6. rabbitmq.ConsumeSimple()
  7. }