以定时器为例研究一手 Python asyncio 的协程事件循环调度

date
Aug 10, 2024
slug
python-asyncio-eventloop
status
Published
tags
Python
summary
事件循环和协程不能不说的那些故事
type
Post
在使用 Python 的 asyncio 库实现异步编程的过程中,协程与事件循环这两个概念可以说有着千丝万缕的联系,常常是形影不离的出现,如胶似漆般的存在,asyncio 库到底是如何调度协程的? 下面以 Python 3.8 中的 asyncio.sleep 定时器为例研究一手 asyncio 的源码实现。

几个主要的概念

首先需要对 asyncio 中的几个主要函数和模块做一个初步认识:
  • asyncio.run 是启动事件循环的入口,接收一个协程作为参数。
  • asyncio.BaseEventLoop 就是事件循环基类了,子类常用的是 _UnixSelectorEventLoop,但核心调度逻辑都在基类中,其中最主要的是 run_forever 函数用来启动事件循环;另一个主要的函数是 create_task ,用来创建一个 Task 对象并放到事件循环中,准备在下一次循环时执行。
  • asyncio.events.Handleasyncio.events.TimerHandle 是放到 loop 中的处理对象,其中 _callback 属性保存的是一个回调函数,处理对象执行时调用的就是这个函数,回调函数参数放在_args 属性中。
  • asyncio.futures.Future 作为一个事件在未来完成的占位符,当事件完成后可通过 Future.set_result 方法将事件的结果设置进去。
  • asyncio.tasks.TaskFuture 类的子类,可以理解为是对协程的包装,在 Future 基础上增加了启动协程和恢复协程的能力,主要逻辑在 Task.__step 函数中。
 

从简单例子开始

先从最简单的一段代码开始
这段代码启动一个 main 协程,协程输出两行内容后完成结束,这里先不加入任何 await 异步操作,主要看一下事件循环是怎样初始化和启动的,只保留了关键代码。
 

loop 的初始化

首先看 asyncio.run 函数,内容比较简单,初始化一个事件循环 loop,然后调用 loop.run_until_complete(main) 启动并传入 main 协程。
 

Task 的初始化

接着来到 asyncio.base_events.BaseEventLoop.run_until_complete,首先调用了 asyncio.tasks.ensure_future 函数,目的是将传入的 main 协程转换成一个 Task 对象,在创建 Task 的过程中会将 Task 对象加入到 loop 的队列中,之后调用 self.run_forever 启动事件循环。
确切的说应该是将 Task.__step 函数包装到 Handle 对象中,之后加入到 loop 的队列中,稍后会看到这个细节。
再看一下 Task.__init__,其中 _coro 保存了传入的协程 coro 对象,实际上可以将 Task 视为一个协程的包装,在初始化的后面调用了 loop.call_soon(self.__step, context=self._context) 函数,将 Task 对象自己的 __step 函数加入到 loop 队列,当 loop 启动后便会执行这个函数。
再看一下 loop.call_soon 做了什么,接受一个 callback 参数,在这里就代表 Task.__step,接着会调用 _call_soon 函数,在 _call_soon 函数中初始化了 events.Handle 对象,然后将 handle 对象加入到 loop._ready 队列中。
在看一眼 Handle 的初始化,主要就是将 callback 保存下来,并且用 args 表示 callback 的参数。
Handle 的一个主要的函数是 _run,当 loop 启动后会从 loop._ready 队列中取出 Handle 执行,执行的就是 _run 函数,_run 函数中 self._context.run(self._callback, *self._args) 其实就是在原有 context 环境下执行回调函数并传入 args 参数。
到这里先总结一下,通过 asyncio.run(main()) 添加了一个协程,然后将协程 main 包装成 Task,并将 Task.__step 包装成 Handle 放到 loop._ready 队列中,接下来就是真正启动 loop 了。
 

loop 的启动

asyncio.base_events.BaseEventLoop.run_until_complete,在封装完 main 协程后会先添加一个回调函数 _run_until_complete_cb,回调函数会在 main 协程执行完后执行,内容就是将 loop 设置成关闭。
接着的 run_forever 函数就是启动 loop 了。
run_forever 中做了一些初始检查和设置,然后进入 while 循环并在循环中调用 _run_once_run_once 就是一次事件循环的核心调度逻辑了。
 

loop 调度的核心逻辑

核心调度逻辑在 _run_once 中。loop 主要有两个队列存放协程任务对应的 Handle,一个是 _scheduled 用来存放定时类协程,它是一个最小堆实现的优先队列,例如使用 asyncio.sleep 就会存进去一个 TimerHandle 对象;另一个是 _ready 用来存放准备好执行的协程,而 _scheduled 中有准备好的协程会取出来放入 _ready 中,loop 最终执行 Handle 都是从 _ready 中取出的。
_run_once 中做的事情分四个部分,第一部分是清理 _scheduled;第二部分是调用多路复用 IO 并处理就绪的描述符;第三部分是将到期的 _scheduled 转移到 _ready;第四部分遍历 _ready 并逐一启动处理函数 handle._run;
Handle._run 没啥说的,直接调用 Handle._callback,并且将 Handle._args 作为参数传进去。
还记得 loop 是怎么启动的吗?将 main 协程包装成 Task,在创建 Task 时将 Task.__step 作为 callback 生成了一个 Handle 并放到了 loop._ready 中,所以这里 Handle._run 其实执行的就是根据 main 协程生成出来的 Task.__stepTask.__step 是协程启动和协程暂停恢复的关键
 

协程的启动

Task._coro 属性保存了协程,通过 result = coro.send(None) 启动协程,由此进入到 main 协程中,打印出 main startmain end
之后 main 协程结束,抛出 StopIteration 异常,调用 super().set_result(exc.value)Task._result 设置结果并将 _state 标记成 _FINISHED,之后调用 __schedule_callbacks 触发 Task 上注册的回调函数,在这里 mian 协程注册的就是 _run_until_complete_cb 用来结束 loop 的,将回调函数放在传给 loop.call_soon 等待下一轮事件循环来触发。
到这里就可能看到一个协程是如何传给 loop 并启动的了,也知道了 loop 的大概流程。下面在 main 中加入 asyncio.sleep 看看定时器是如何调度的。
 

asyncio.sleep 如何定时

main 中加入一个 asyncio.sleep 看看定时是如何实现的
loop 的初始化和启动还是一样的,直接看看 Task.__step 是如何调度的,其中调用 result = coro.send(None) 会启动协程,首先输出 main start,然后调用 asyncio.sleep(3)
 

协程的挂起

首先常见一个空的 Future 对象 future,然后调用的 loop.call_later(delay, futures._set_result_unless_cancelled, future, result),然后一路向下调用 loop.call_at,最后生成了一个 TimerHandle 对象 push 进 loop._scheduled 堆中。
TimerHandle 其实就比 Handle 多了个 _when 属性表示何时可以恢复运行,当时间到了会调用 TimerHandle._run 执行 TimerHandlecallback,也就是 _set_result_unless_cancelled(future, result) 用来给 future 设置结果。
asyncio.sleep 的函数签名是 asyncio.sleep(delay, result=None),一般不传第二个参数所以结果是 None,如果传的话之后会将结果设置到 future 对象里面。
asyncio.sleep 函数的最后将 future 返回并挂起自己,控制权又交还给 Task.__stepresult = coro.send(None) 的位置,result 接到的就是 future 对象。
result 接到 future 后向下执行到 result.add_done_callback(self.__wakeup, context=self._context)future 设置一个回调函数 Task.__wakeup,到这里本轮循环就结束了。
到目前为止 loop 的状态是 _scheduled 堆中有一个 TimerHandle 对象,对象的 _when 表示剩余启动的秒数,对象的 _callback 指向的是 futures._set_result_unless_cancelled 参数是一个 future,这个 futurecallbacks 回调列表中有一个 main 协程生成的 Task.__wakeup
 

协程的恢复

本轮循环结束,下一轮循环时会检查 loop._scheduled 发现 TimerHandle 已经到期,将其放到 loop._ready 队列中,紧接着就取出执行 TimerHandle._run,也就是执行 futures._set_result_unless_cancelled(future, None),其实就是给 future 设置结果、标记完成、执行 future 的回调函数。
还记得 future 是怎么来的以及 future 里面是啥吗?future 是在 asyncio.sleep 时生成并通过 await 返回的,返回给 Task.__step 后通过 add_done_callback(self.__wakeup) 为其添加了一个回调函数。
所以到此为止干的事儿就是遍历 futurecallbacks 逐一通过 loop.call_soon() 添加到 loop 中,等待下一轮事件循环执行,这里添加的就是 main Task__wakeup 函数。
进入下一轮循环,loop._ready 中有一个 Handle,其内部的 _coro 代表的是 main Task__wakeup,取出来执行 Handle._run 实际上就是执行 main Task.__wakeup
__wakeup 也很简单就是确认 future 是已完成状态并调用 __step,控制权有交给了之前挂起的 main Task
Task.__step 再一次执行到 result = coro.send(None) 时,便会恢复之前的 sleep 协程接着执行 return,回到了 main 函数中,继续执行并输出 main end最后完成,抛出 StopIteration 异常,被 Task.__step 捕获,整个协程结束,之后事件循环做收尾工作也关闭,事件循环也关闭,到这里整个程序就结束了。
 

总结

asyncio 中的定时通过 asyncio.sleep 实现,原理是在事件循环中维护一个最小堆实现的优先队列 _scheduled,其中保存的都是定时任务处理对象 Handle,越早到期 Handle 就会越早被取出来并加入到 loop._ready 队列,在下一轮循环时取出并从挂起的位置恢复执行。
由于协程代码在执行时会切换控制权导致代码逻辑跳来跳去,有时会被绕晕,借助定时器的调度可以让整个事件循环的逻辑更加清晰。
 

© 菜皮 2020 - 2025