ioloop分析

咱们先来简单说一下ioloop的源码
  1. while True:
  2. poll_timeout = 3600.0
  3. # Prevent IO event starvation by delaying new callbacks
  4. # to the next iteration of the event loop.
  5. with self._callback_lock:
  6. #上次循环的回调列表
  7. callbacks = self._callbacks
  8. self._callbacks = []
  9. for callback in callbacks:
  10. #执行遗留回调
  11. self._run_callback(callback)
  12. if self._timeouts:
  13. now = self.time()
  14. while self._timeouts:
  15. #超时回调
  16. if self._timeouts[0].callback is None:
  17. # 最小堆维护超时事件
  18. heapq.heappop(self._timeouts)
  19. elif self._timeouts[0].deadline <= now:
  20. timeout = heapq.heappop(self._timeouts)
  21. self._run_callback(timeout.callback)
  22. else:
  23. seconds = self._timeouts[0].deadline - now
  24. poll_timeout = min(seconds, poll_timeout)
  25. break
  26. if self._callbacks:
  27. # If any callbacks or timeouts called add_callback,
  28. # we don't want to wait in poll() before we run them.
  29. poll_timeout = 0.0
  30. if not self._running:
  31. break
  32. if self._blocking_signal_threshold is not None:
  33. # clear alarm so it doesn't fire while poll is waiting for
  34. # events.
  35. signal.setitimer(signal.ITIMER_REAL, 0, 0)
  36. try:
  37. #这里的poll就是epoll,当有事件发生,就会返回,详情参照tornadoepoll的代码
  38. event_pairs = self._impl.poll(poll_timeout)
  39. except Exception as e:
  40. if (getattr(e, 'errno', None) == errno.EINTR or
  41. (isinstance(getattr(e, 'args', None), tuple) and
  42. len(e.args) == 2 and e.args[0] == errno.EINTR)):
  43. continue
  44. else:
  45. raise
  46. if self._blocking_signal_threshold is not None:
  47. signal.setitimer(signal.ITIMER_REAL,
  48. self._blocking_signal_threshold, 0)
  49. #如果有事件发生,添加事件,
  50. self._events.update(event_pairs)
  51. while self._events:
  52. fd, events = self._events.popitem()
  53. try:
  54. #根据fd找到对应的回调函数,
  55. self._handlers[fd](fd, events)
  56. except (OSError, IOError) as e:
  57. if e.args[0] == errno.EPIPE:
  58. # Happens when the client closes the connection
  59. pass
  60. else:
  61. app_log.error("Exception in I/O handler for fd %s",
  62. fd, exc_info=True)
  63. except Exception:
  64. app_log.error("Exception in I/O handler for fd %s",
  65. fd, exc_info=True)
简单来说一下流程,首先执行上次循环的回调列表,然后调用epoll等待事件的发生,根据fd取出对应的回调函数,然后执行。IOLoop基本是个事件循环,因此它总是被其它模块所调用。
咱们来看一下ioloop的start方法,start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。

1.超时的处理,是用一个最小堆来维护每个回调函数的超时时间,如果超时,取出对应的回调,如果没有则重新设置poll_timeout的值

2.通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs,这个上面也说过了。