Tornado Code Reading - tornado.concurrent
##Future背景
Future 代表一个函数的调用结果. 函数返回一个值或者抛出一个异常, 所以future 包含一个值或者一个异常(可以通过future.result()获得这个结果,异常和返回值). Fuures存在与对应的函数结束前. 在 一个 多线程场景下,简单的调用 future.result()等待另外一个线程或者进程完成。在 异步场景下,你可以附带一个callback给future为了当调用结束可以得到通知。(with future.add_done_callback or io_loop.add_future)
##了解 FutureFutures
已经在Python3.2应用了 concurrent.futures , 如果在python3.2之前版本用 可以 (pip install futures). 那在 tornado 中如果可以将用就用python包,否则将会使用一个兼容的类 tornado.concurrent.Future
class tornado.concurrent.Future
这个类封装了异步操作的结果。在同步程序中, Futures被用来等待一个线程或者进程池的结果。在Tornado一般用在 IOLoop.add_future 或者 在一个 gen.coroutine
1 | """ |
_DummyFuture
- result
1 | """ |
- exception
这个函数先与result唯一的不同是 无异常的时候 return None ;
- add_done_callback(fn)
1 | """ |
TracebackFuture
存储 异常的追踪
DummyExecutor
1 | class DummyExecutor(object): |
run_on_executor
1 | def run_on_executor(fn): |
这里有关于run_on_executor
的应用例子 https://gist.github.com/zs1621/7921770
return_future
- what use: 让函数通过回调返回一个
Future
- how use: @return_future
看 tornado 源码的 test文件 concurrent_test.py;
1 | class ReturnFutureTest(AsyncTestCase): |
联合 concurrent.py -> return_future
1 | def return_future(f): |
###分几种情况 1. 同步无回调 2.同步有回调 3.异步无回调 4.异步有回调
- 同步无回调LOG
f(args, kwargs): future = sync_process()
- a – (<test_return_future.ReturnFutureTest testMethod=test_no_callback>,) {} argsssssssss
- b – None (<test_return_future.ReturnFutureTest testMethod=test_no_callback>,) {‘callback’: <function
at 0xb6ba8f0c>} callback - c – None result——————
- d – (function
at 0xb6ba8f0c>) +++++++++++++
从 a-b 可以理解 replacer.replace 的作用: 提取callback 的值, 并将callback 放入kwargs; 由c 可以知道 f() 函数是不会return 的; f()的结果只能由 future.result() 得到, 只要知道reuturn_future 是返回Future 本身是不会return 的, 如果return 就会报错; 由d 可知匿名函数
fuction <lambda> at 0xb6ba8f0c
赋值给了callback,而这个匿名函数的作用就是set_result
。
- 同步有回调LOG
f(args, kwargs, callback): future = sync_process() callback(future)
- a – (<test_return_future.ReturnFutureTest testMethod=test_callback_kw>,) {‘callback’:<bound method ReturnFutureTest.stop of <test_return_future.ReturnFutureTest testMethod=test_callback_kw>>} argsssssssss
- b – <bound method ReturnFutureTest.stop of <test_return_future.ReturnFutureTest testMethod=test_callback_kw>> (<test_return_future.ReturnFutureTest testMethod=test_callback_kw>,) {‘callback’: <function
at 0xb6bd8f44>} callback - c – None result——————
- d – 额额 function
at 0xb6bd8f44 +++++++++++++ - e – 额额 Future at 0xb6b9cc8cL state=finished returned int +_++++_
与无回调相比; 明显多出 e log; run_callback 就是将 future.result()作为callback的参数运行;
- 异步无回调LOG
f(args, kwargs): future = async_process()
- a – (<test_return_future.ReturnFutureTest testMethod=test_async_future>,) {} argsssssssss
- b – None (<test_return_future.ReturnFutureTest testMethod=test_async_future>,) {‘callback’: <function
at 0x8518f44>} callback - c – None result——————
- d – (function
at 0x8518f44) ++++++++++++++
与同步无回调相比; 看
test_async_future(self)
, 将 f() 获得的 future -> self.io_loop.add_future(future, self.stop) -> self.stop(future) -> 最后通过 self.wait()获得的结果 就是 例子中42 . 现实中一般将 return_future 与 gen.engine 联合使用 , 通过在 gen.engine -> yield f() 获得结果