Tornado代码阅读笔记 IOLoop
Jun 14 2016准备用Tornado + greenlet + Django ORM搭一个框架,大体上有个思路,在开始前再次阅读下Tornado的代码。目的是在学习Torndao使用的同时,了解下原理,以便在使用过程中少踩点坑。
准备
学习IO多路复用: epoll
http://scotdoyle.com/python-epoll-howto.html
https://fukun.org/archives/10051470.htmlReactor模型
http://blog.csdn.net/u013074465/article/details/46276967
去年大概这个时候也硬着头皮读过Tornado的代码,当时没有经验,还不知道编程模型,在程序的理解上都是顺序执行的思路,所以看到Tornado的代码只会觉得特么的牛B,各种类,各种方法的调来调去。现在理解了Reactor模型以后,再回过头看Tornado,就比较容易理解了,所以在阅读代码前如果能在构架上先理解,读起来会快很多。
Tornado的代码基于2.0版本,最新的4.3版本读起来比较绕,抽象的更加细化,阅读代码的目的在于学习编程思路,不求新。
Hello World
#!/usr/bin/env python
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options
define("port", default=8888, help="run on the given port", type=int)
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
def main():
tornado.options.parse_command_line()
application = tornado.web.Application([
(r"/", MainHandler),
])
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(options.port)
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
main()
重点看main
函数,创建了一个HTTPServer的对象,然后就启动IOLoop了。基于对Reactor模型的理解,阅读的过程中,先不看HTTPServer
,直接看IOLoop。
IOLoop
Tornado中IOLoop类实现的功能对应于Reactor模型中的 反应堆, IO多路复用即epoll, 定时器集合
https://github.com/tornadoweb/tornado/blob/branch2.0/tornado/ioloop.py
instacnce方法
Tornado是单进程单线程的,所以在同一个进程中只能有一个运行中的IOLoop,所以要实例化IOLoop是通过instance类方法实现了单例模式,在实例化过程中创建了一些实例变量,比较重要的有这些:
- _handlers
保存add_handler方法添加进来的socket描述符与回调的event handler的字典 - _callbacks
保存add_callback方法添加的待回调的函数 - _timeouts
保存add_timeout方法添加的定时任务 - _events
epoll_wait 执行后的待处理事件字典,key是scoket描述符
反应堆
对应于Reactor模型中的反应堆模块,IOLoop实现了一下方法用来注册/删除/更新event handler
def add_handler(self, fd, handler, events):
"""Registers the given handler to receive the given events for fd."""
self._handlers[fd] = stack_context.wrap(handler) # 加入到回调handler字典中
self._impl.register(fd, events | self.ERROR) # 注册epoll 事件监听
添加监听的socket描述符,事件以及事件handler到_handlers字典中,并且注册监听事件到epoll上
def update_handler(self, fd, events):
"""Changes the events we listen for fd."""
self._impl.modify(fd, events | self.ERROR)
更新socket描述符监听的事件
def remove_handler(self, fd):
"""Stop listening for events on fd."""
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
self._impl.unregister(fd)
except (OSError, IOError):
logging.debug("Error deleting fd from IOLoop", exc_info=True)
移除监听的socket描述符
IO多路复用
start方法开启了一个无限循环,每一次循环都会做3件事:
- 遍历_callbacks中的回调函数
- 遍历_timeouts到时间需要处理的定时任务
- 调用epoll_wait获取需要处理的socket事件,调用_handlers中对应socket的事件回调handler
在上面的3步中,真正的IO多路复用只是在第三步,依赖于epoll高效的事件通知机制。
定时器集合
def add_timeout(self, deadline, callback):
"""Calls the given callback at the time deadline from the I/O loop.
Returns a handle that may be passed to remove_timeout to cancel.
"""
timeout = _Timeout(deadline, stack_context.wrap(callback))
heapq.heappush(self._timeouts, timeout)
return timeout
def remove_timeout(self, timeout):
"""Cancels a pending timeout.
The argument is a handle as returned by add_timeout.
"""
# Removing from a heap is complicated, so just leave the defunct
# timeout object in the queue (see discussion in
# http://docs.python.org/library/heapq.html).
# If this turns out to be a problem, we could add a garbage
# collection pass whenever there are too many dead timeouts.
timeout.callback = None
以上2个方法分别是添加定时任务,以及移除定时任务。这里要注意,通过一个_Timeout类的包装,然后通过堆加入定时任务,为的是在start方法中遍历定时任务时优先处理最早的已经到时间的定时任务,在start方法里面是通过heapq。heappop方法来输出最小时间的定时任务。
回调函数列表
除了Reactor模型中的反应堆,IO多路复用,定时器集合,IOLoop还实现了一个回调函数的处理机制。
def add_callback(self, callback):
"""Calls the given callback on the next I/O loop iteration.
It is safe to call this method from any thread at any time.
Note that this is the *only* method in IOLoop that makes this
guarantee; all other interaction with the IOLoop must be done
from that IOLoop's thread. add_callback() may be used to transfer
control from other threads to the IOLoop's thread.
"""
if not self._callbacks:
self._wake()
self._callbacks.append(stack_context.wrap(callback))
基本上了解了这些方法,以及IO多路复用的过程,然后在看一下IOLoop docstring中的例子,整个Reactor模型中调度过程就完全实现了,然后再来看一下HTTPServer类,看看Reactor模型中concrete event handler的实现。
HTTPServer
在HTTPServer实例化过程中保存了处理请求的Application对象。调用listen方法,listen方法调用了2个方法:
- bind
创建了一个非阻塞的socket,并bind,listen
- start
if not self.io_loop:
self.io_loop = ioloop.IOLoop.instance() # 创建 ioloop 对象
for fd in self._sockets.keys(): # 把socket字典中所有的socket加入到ioloop handler列表中,并监听可读事件,回调函数为_handle_events
self.io_loop.add_handler(fd, self._handle_events,
ioloop.IOLoop.READ)
只看这一段,创建了IOLoop实例,调用add_handler注册soket监听可读事件,event handler回调函数是_handle_events方法。
_handle_events
def _handle_events(self, fd, events): # 当接受到服务端socket的可读事件时,建立连接
while True:
try:
connection, address = self._sockets[fd].accept() # 接受socket连接
except socket.error, e:
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
return
raise
stream = iostream.IOStream(connection, io_loop=self.io_loop) # 创建IOStream对象,封装子socket
HTTPConnection(stream, address, self.request_callback,
self.no_keep_alive, self.xheaders) # 创建 HTTPonnection连接对象,处理http请求
省略了大部分,只看关键的地方。_handle_events方法实际上就是Reactor模型概率中的concrete event handler。
在epoll_wait的过程中,如果主socket有可读事件则会回调_handle_events方法,调用accept方法接受子socket,子socket封装为IOStream对象,封装了socket读写相关的操作,使用IOStream对象实例化HTTPConnection对象就完成了整个concrete event handler的过程。
总结
以上就是我在阅读IOLoop代码的一些理解,接下来将重点阅读IOStream相关的代码,以及Tornado协程相关的代码。