示例:限速器

为了保障系统的安全性和性能,并保证系统的重要资源不被滥用,应用程序常常会对用户的某些行为进行限制,比如说:

  • 为了防止网站内容被网络爬虫抓取,网站管理者通常会限制每个 IP 地址在固定时间段内能够访问的页面数量—— 比如一分钟之内最多只能访问 30 个页面 ——超过这一限制的用户将被要求进行身份验证,确认本人并非网络爬虫,又或者等到限制解除了之后再进行访问。

  • 为了防止用户的账号遭到暴力破解,网上银行通常会对访客的密码试错次数进行限制,如果一个访客在尝试登录某个账号的过程中,连续好几次输入了错误的密码,那么这个账号将被冻结,只能等到第二天再尝试登录,有的银行还会向账号持有者的手机发送通知来汇报这一情况。

实现这些限制机制的其中一种方法是使用限速器,它可以限制用户在指定时间段之内能够执行某项操作的次数。

代码清单 2-8 展示了一个使用字符串键实现的限速器,这个限速器程序会把操作的最大可执行次数储存在一个字符串键里面,然后在用户每次尝试执行被限制的操作之前,使用 DECR 命令将操作的可执行次数减去一,最后通过检查可执行次数的值来判断是否执行该操作。


代码清单 2-8 倒计时式的限速器:/string/limiter.py

  1. class Limiter:
  2.  
  3. def __init__(self, client, key):
  4. self.client = client
  5. self.key = key
  6.  
  7. def set_max_execute_times(self, max_execute_times):
  8. """
  9. 设置操作的最大可执行次数。
  10. """
  11. self.client.set(self.key, max_execute_times)
  12.  
  13. def still_valid_to_execute(self):
  14. """
  15. 检查是否可以继续执行被限制的操作。
  16. 是的话返回 True ,否则返回 False 。
  17. """
  18. num = self.client.decr(self.key)
  19. return (num >= 0)
  20.  
  21. def remaining_execute_times(self):
  22. """
  23. 返回操作的剩余可执行次数。
  24. """
  25. num = int(self.client.get(self.key))
  26. if num < 0:
  27. return 0
  28. else:
  29. return num

这个限速器的关键在于 set_max_execute_times() 方法和 still_valid_to_execute() 方法:前者用于将最大可执行次数储存在一个字符串键里面,而后者则会在每次被调用时对可执行次数执行减一操作,并检查目前剩余的可执行次数是否已经变为负数:如果为负数则表示可执行次数已经耗尽,不为负数则表示操作可以继续执行。

以下代码展示了这个限制器的使用方法:

  1. >>> from redis import Redis
  2. >>> from limiter import Limiter
  3. >>> client = Redis(decode_responses=True)
  4. >>> limiter = Limiter(client, 'wrong_password_limiter') # 密码错误限制器
  5. >>> limiter.set_max_execute_times(3) # 最多只能输入错三次密码
  6. >>> limiter.still_valid_to_execute() # 前三次操作能够顺利执行
  7. True
  8. >>> limiter.still_valid_to_execute()
  9. True
  10. >>> limiter.still_valid_to_execute()
  11. True
  12. >>> limiter.still_valid_to_execute() # 从第四次开始,操作将被拒绝执行
  13. False
  14. >>> limiter.still_valid_to_execute()
  15. False

而以下伪代码则展示了如何使用这个限速器去限制密码的错误次数:

  1. # 试错次数未超过限制
  2. while limiter.still_valid_to_execute():
  3. # 获取访客输入的账号和密码
  4. account, password = get_user_input_account_and_password()
  5. # 验证账号和密码是否匹配
  6. if password_match(account, password):
  7. ui_print("密码验证成功")
  8. else:
  9. ui_print("密码验证失败,请重新输入")
  10. # 试错次数已超过限制
  11. else:
  12. # 锁定账号
  13. lock_account(account)
  14. ui_print("连续尝试登录失败,账号已被锁定,请明天再来尝试登录。")