1. 消息队列的简介

什么是消息队列呢?队列就是排队,就像在银行办理业务排队一样,排在最前面的先处理,后面的后处理,按照顺序来,先进先出。这个队列可以是程序,可以是数据,也可以是任务,是任何你可以存储的东西。消息队列就是给队列传递消息。这么来说,打个比方,我们在一个网站上注册了账号,系统可能会给你发送一封注册邮件,同时在页面上提示你”稍等几分钟后会收到一封邮件”,发邮件这个事是通过操作系统的调用,例如linux的sendmail,或者接口来发送的,发邮件是通过排队来发的,先到的先发,假如很多邮件等着发,那就得像银行那样排队了,所以未必就能实时,总有延迟。总结来说,发邮件这个事是有延迟的,是需要等待之后用户才能收到邮件的。然而,这种延迟对用户的体验还有操作并不影响啊。在网站上的其他应用他还是照样用,没有任何影响。对这种对用户没直接影响或者有延迟的任务就可以扔到消息队列处理。所以,发邮件,发短信,捕获异常等任务都可以扔到消息队列。也就是消息队列是独立于web进程的另一个进程,因为它有可能耗时很长的,所以要另开一个进程来处理,对web进程没有任务影响,用户还是照常访问网站。这么来说,假如网站有一个需要扔的消息队列叫A,但用户触发了A,就把A扔到消息队列,这时给用户感觉是这个A任务是一瞬间完成,其实它是给消息队列那个进程发送了消息,可能跟它说,我要发短信,就把发短信这个指令,加上短信的内容一并传给消息队列的进程,消息队列收到消息后,就把这个任务放到队列中进行排队,因为前面还有一堆任务没处理,所以要慢慢处理,轮到A的时候才处理A,由于A是耗时的动作所以就慢慢处理就好,反正对用户不太影响。

前面说到,消息队列的进程要把任务放入队列中,由于有很多任务,需要排队,所以这些任务是需要存储起来的。在ruby中,有很多gem可以实现后台的消息队列,但它们的存储方式有区别,比如delayed_job就是用数据库(MySQL,Sqlite,PostgreSQL等)来存的,它会先让你创建表,如果有任务进来,就会插入到表中作为一条记录,要处理的时候就会取出这条记录。像resquesidekiq就是用redis来存储数据的,redis是存储在计算机的内存中的。比较一下,就知道resque和sidekiq在存储方式上比delayed_job有优势,而delayed_job的好处是能直接利用数据库,不用额外安装redis。

消息队列是另外的一个进程,任务进入消息队列中,一个接一个的处理,也就是说,A进程在被处理时,必须等前面的任务被处理完才能轮到它。这种方式体现在delayed_job和resque中。sidekiq的处理方式是多线程的,它是基于celluloid的,用Actor作为并发模型,它能同时处理多个任务。

值得一提的是Ruby on Rails从4.2开始加上了active_job。因为有各种各样的消息队列的解决方案,active_job就是提供了统一的接口和调用,要用到消息队列还是会用到上面提到的几个。这个东西就像activerecord一样,要指定数据库那也是很简单的,只要换相应的gem和改配置文件就行了,而active_job也正是这样。

不过,这一篇文章不会详说上面的三种消息队列的实现,只会说到特用于PostgreSQL的消息队列queue_classic

2. PostgreSQL的listen/notify

queue_classic是基于PostgreSQL的listen/notify来实现的,列队在等任务进来就是用的listen,把任务放入队列就是notify。

PostgreSQL的listen/notify,也就是一种消息的订阅/发布模式,也就是类似那种生产者/消费者模式。这种模式很常见,例如redis的pub/sub模式、rails的Notifications组件。

懂了PostgreSQL的listen/notify,也就等于懂了其他的订阅/发布模式。

它很简单,就相当于一种广播机制,比如,你定阅杂志,还有其他人也订阅了,这个过程就叫listen,也就是监听,等杂志有更新了,或者有新的杂志出来,它就会广播,就会送一份给订阅杂志的人,这个过程就是notify,也就是通知。

listen/notify的使用很简单。

首先是listen(监听),只接监听的通道的名称,这个名称自己定义。

  1. rails365_pro=# LISTEN virtual;
  2. LISTEN

这个时候可以直接执行notify。

  1. rails365_pro=# NOTIFY virtual;
  2. NOTIFY
  3. Asynchronous notification "virtual" received from server process with PID 4996.

表示监听的通道已知收到消息了。

还可以传参数。

  1. rails365_pro=# NOTIFY virtual, 'This is the payload';
  2. NOTIFY
  3. Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 4996.

也可以结合sql语句来使用。

  1. LISTEN foo;
  2. SELECT pg_notify('fo' || 'o', 'pay' || 'load');

只要连接到同一个数据库的所有session都会接到监听通道传过来的信息。

可以尝试另开一个psql进程。然后notify,再回到之前的psql执行listen就可以测试的,如果显示正确的pid就成功的。

完结。