示例:带有阻塞功能的消息队列

在构建应用程序的时候,我们有时候会遇到一些非常耗时的操作,比如发送邮件、将一条新微博同步给上百万个用户、对硬盘进行大量读写、执行庞大的计算等等。因为这些操作是如此耗时,所以如果我们直接在响应用户请求的过程中执行它们的话,那么用户就需要等待非常长时间。

比如说,为了验证用户身份的有效性,有些网站在注册新用户的时候,会向用户给定的邮件地址发送一封激活邮件,用户只有在点击了验证邮件里面的激活链接之后,新注册的帐号才能够正常使用。

下面这段伪代码展示了一个带有邮件验证功能的帐号注册函数,这个函数不仅会为用户输入的用户名和密码创建新帐号,还会向用户给定的邮件地址发送一封激活:

  1. def register(username, password, email):
  2. # 创建新帐号
  3. create_new_account(username, password)
  4. # 发送激活邮件
  5. send_validate_email(email)
  6. # 向用户返回注册结果
  7. ui_print("帐号注册成功,请访问你的邮箱并激活帐号。")

因为邮件发送操作需要进行复杂的网络信息交换,所以它并不是一个快速的操作,如果我们直接在 send_valid_email() 函数里面执行邮件发送操作的话,那么用户可能就需要等待一段较长的时间才能看到 ui_print() 函数打印出的反馈信息。

为了解决这个问题,在执行 send_validate_email() 函数的时候,我们可以不立即执行邮件发送操作,而是将邮件发送任务放入到一个队列里面,然后由后台的线程负责实际执行。这样的话,程序只需要执行一个入队操作,然后就可以直接向用户反馈注册结果了,这比实际地发送邮件之后再向用户反馈结果要快得多。

代码清单 4-4 展示了一个使用 Redis 实现的消息队列,它使用 RPUSH 命令将消息推入队列,并使用 BLPOP 命令从队列里面取出待处理的消息。


代码清单 4-4 使用列表实现的消息队列:/list/message_queue.py

  1. class MessageQueue:
  2.  
  3. def __init__(self, client, queue_name):
  4. self.client = client
  5. self.queue_name = queue_name
  6.  
  7. def add_message(self, message):
  8. """
  9. 将一条消息放入到队列里面。
  10. """
  11. self.client.rpush(self.queue_name, message)
  12.  
  13. def get_message(self, timeout=0):
  14. """
  15. 从队列里面获取一条消息,
  16. 如果暂时没有消息可用,那么就在 timeout 参数指定的时限内阻塞并等待可用消息出现。
  17.  
  18. timeout 参数的默认值为 0 ,表示一直等待直到消息出现为止。
  19. """
  20. # blpop 的结果可以是 None ,也可以是一个包含两个元素的元组
  21. # 元组的第一个元素是弹出元素的来源队列,而第二个元素则是被弹出的元素
  22. result = self.client.blpop(self.queue_name, timeout)
  23. if result is not None:
  24. source_queue, poped_item = result
  25. return poped_item
  26.  
  27. def len(self):
  28. """
  29. 返回队列目前包含的消息数量。
  30. """
  31. return self.client.llen(self.queue_name)

为了使用这个消息队列,我们通常需要用到两个客户端:

  • 一个客户端作为消息的发送者(sender),它需要将待处理的消息推入到队列里面;

  • 而另一个客户端则作为消息的接收者(receiver)和消费者(consumer),它负责从队列里面取出消息,并根据消息内容进行相应的处理工作。

下面的这段代码展示了一个简单的消息接收者,在没有消息的时候,这个程序将阻塞在 mq.get_message() 调用上面;当有消息(邮件地址)出现时,程序就会打印出该消息并发送邮件:

  1. >>> from redis import Redis
  2. >>> from message_queue import MessageQueue
  3. >>> client = Redis(decode_responses=True)
  4. >>> mq = MessageQueue(client, 'validate user email queue')
  5. >>> while True:
  6. ... email_address = mq.get_message() # 阻塞直到消息出现
  7. ... send_email(email_address) # 打印出邮件地址并发送邮件
  8. ...
  9. peter@exampl.com
  10. jack@spam.com
  11. tom@blahblah.com

而以下代码则展示了消息发送者是如何将消息推入到队列里面的:

  1. >>> from redis import Redis
  2. >>> from message_queue import MessageQueue
  3. >>> client = Redis(decode_responses=True)
  4. >>> mq = MessageQueue(client, 'validate user email queue')
  5. >>> mq.add_message("peter@exampl.com")
  6. >>> mq.add_message("jack@spam.com")
  7. >>> mq.add_message("tom@blahblah.com")

阻塞弹出操作的应用

本节展示的消息队列之所以使用 BLPOP 命令而不是 LPOP 命令来实现出队操作,是因为阻塞弹出操作可以让消息接收者在队列为空的时候自动阻塞,而不必手动进行休眠,从而使得消息处理程序的编写变得更为简单直接,并且还可以有效地节约系统资源。

作为对比,以下代码展示了在使用 LPOP 命令实现出队操作的情况下,如何实现类似上面展示的消息处理程序:

  1. while True:
  2. # 尝试获取消息,如果没有消息,那么返回 None
  3. email_address = mq.get_message()
  4. if email_address is not None:
  5. # 有消息,发送邮件
  6. send_email(email_address)
  7. else:
  8. # 没有消息可用,休眠一百毫秒之后再试
  9. sleep(0.1)

因为缺少自动的阻塞操作,所以这个程序在没有取得消息的情况下,只能以一百毫秒一次的频率去尝试获取消息,如果队列为空的时间比较长,那么这个程序就会发送很多多余的 LPOP 命令,并因此浪费很多 CPU 资源和网络资源。

使用消息队列实现实时提醒

消息队列除了可以在应用程序的内部中使用,还可以用于实现面向用户的实时提醒系统。

比如说,如果我们在构建一个社交网站的话,那么可以使用 JavaScript 脚本,让客户端以异步的方式调用 MessageQueue 类的 get_message() 方法,然后程序就可以在用户被关注的时候、收到了新回复的时候又或者收到新私信的时候,通过调用 add_message() 方法来向用户发送提醒信息。