问题:异步实际上是如何工作的?
这个问题是由我的另一个问题引起的:如何在cdef中等待?
网路上有关于的大量文章和网志文章asyncio
,但它们都是非常肤浅的。我找不到任何有关如何asyncio
实际实现以及使I / O异步的信息。我正在尝试阅读源代码,但是它是数千行,不是最高等级的C代码,其中很多处理辅助对象,但是最关键的是,很难在Python语法和它将翻译的C代码之间进行连接入。
Asycnio自己的文档甚至没有帮助。那里没有关于它如何工作的信息,只有一些有关如何使用它的指南,有时也会引起误解/写得很差。
我熟悉Go的协程实现,并希望Python做同样的事情。如果是这样的话,我在上面链接的帖子中出现的代码将奏效。既然没有,我现在想找出原因。到目前为止,我最好的猜测如下,请纠正我错的地方:
- 形式的过程定义
async def foo(): ...
实际上被解释为类继承的方法coroutine
。 - 也许
async def
实际上是通过await
语句分为多个方法,在这些方法上被调用的对象能够跟踪到目前为止执行所取得的进展。 - 如果上述条件成立,那么从本质上讲,协程的执行归结为某个全局管理器调用循环对象的方法(循环?)。
- 全局管理器以某种方式(如何?)知道何时由Python代码执行I / O操作(仅?),并且能够选择当前执行方法放弃控制后执行的待处理协程方法之一(命中该
await
语句) )。
换句话说,这是我尝试将某些asyncio
语法“简化”为更易于理解的内容:
async def coro(name):
print('before', name)
await asyncio.sleep()
print('after', name)
asyncio.gather(coro('first'), coro('second'))
# translated from async def coro(name)
class Coro(coroutine):
def before(self, name):
print('before', name)
def after(self, name):
print('after', name)
def __init__(self, name):
self.name = name
self.parts = self.before, self.after
self.pos = 0
def __call__():
self.parts[self.pos](self.name)
self.pos += 1
def done(self):
return self.pos == len(self.parts)
# translated from asyncio.gather()
class AsyncIOManager:
def gather(*coros):
while not every(c.done() for c in coros):
coro = random.choice(coros)
coro()
如果我的猜测证明是正确的:那么我有一个问题。在这种情况下,I / O实际如何发生?在单独的线程中?整个解释器是否已暂停并且I / O在解释器外部进行?I / O到底是什么意思?如果我的python过程称为C open()
过程,然后它又向内核发送了中断,放弃了对它的控制,那么Python解释器如何知道这一点并能够继续运行其他代码,而内核代码则执行实际的I / O,直到它唤醒了最初发送中断的Python过程?原则上,Python解释器如何知道这种情况?
回答 0
asyncio如何工作?
在回答这个问题之前,我们需要了解一些基本术语,如果您已经知道一些基本术语,请跳过这些基本术语。
生成器
生成器是使我们能够暂停python函数执行的对象。用户策划的生成器使用关键字实现yield
。通过创建包含yield
关键字的普通函数,我们将该函数转换为生成器:
>>> def test():
... yield 1
... yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
如您所见,调用next()
生成器会导致解释器加载测试的帧,并返回yield
ed值。next()
再次调用,使框架再次加载到解释器堆栈中,并继续yield
输入另一个值。
到第三次next()
调用时,我们的生成器完成了StopIteration
并被抛出。
与生成器通讯
生成器的鲜为人知的特点是,你可以与他们使用两种方法进行通信的事实:send()
和throw()
。
>>> def test():
... val = yield 1
... print(val)
... yield 2
... yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in test
Exception
调用时gen.send()
,该值作为yield
关键字的返回值传递。
gen.throw()
另一方面,允许在生成器中引发Exception,但在同一位置引发了异常yield
。
从生成器返回值
从生成器返回一个值,结果将该值放入StopIteration
异常中。稍后我们可以从异常中恢复值,并根据需要使用它。
>>> def test():
... yield 1
... return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
... next(gen)
... except StopIteration as exc:
... print(exc.value)
...
abc
看,一个新的关键字: yield from
Python 3.4附带了一个新关键字:yield from
。什么是关键字允许我们做的,是通过对任何next()
,send()
并throw()
成为最内嵌套的生成器。如果内部生成器返回一个值,则它也是的返回值yield from
:
>>> def inner():
... inner_result = yield 2
... print('inner', inner_result)
... return 3
...
>>> def outer():
... yield 1
... val = yield from inner()
... print('outer', val)
... yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4
我写了一篇文章进一步阐述这个话题。
放在一起
yield from
在Python 3.4中引入了new关键字之后,我们现在能够在生成器内部创建生成器,就像隧道一样,将数据从最内层生成器来回传递到最外层生成器。这为生成器- 协程产生了新的含义。
协程是可以在运行时停止和恢复的功能。在Python中,它们是使用async def
关键字定义的。就像生成器一样,它们也使用自己的形式,yield from
即await
。之前async
和await
被在Python 3.5推出,我们创建了创建完全相同的方式生成协同程序(带yield from
代替await
)。
async def inner():
return 1
async def outer():
await inner()
像实现该__iter__()
方法的每个迭代器或生成器一样,协程实现__await__()
也允许它们每次都继续执行await coro
。
在异步中,除了协程功能外,我们还有两个重要的对象:任务和期货。
期货
期货是已__await__()
实现该方法的对象,其任务是保持某种状态和结果。状态可以是以下之一:
- 待处理-未来未设置任何结果或exceptions。
- 已取消-将来已使用取消
fut.cancel()
- 完成-将来通过使用的结果集
fut.set_result()
或使用的异常集完成fut.set_exception()
就像您猜到的那样,结果可能是将返回的Python对象,也可能是引发异常的对象。
对象的另一个重要特征future
是它们包含一个称为的方法add_done_callback()
。此方法允许在任务完成后立即调用函数-无论是引发异常还是完成。
任务
任务对象是特殊的期货,它们围绕着协程,并与最内部和最外部的协程进行通信。每当协程成为await
未来时,未来都会一直传递到任务中(就像中的一样yield from
),任务会接收它。
接下来,任务将自己绑定到未来。它通过呼吁add_done_callback()
未来来做到这一点。从现在开始,如果将来能够实现,通过取消,传递异常或传递Python对象作为结果,任务的回调将被调用,并将恢复为存在。
异步
我们必须回答的最后一个亟待解决的问题是-IO如何实现?
在异步内部,我们有一个事件循环。任务的事件循环。事件循环的工作是在每次准备就绪时调用任务,并将所有工作协调到一台工作机中。
事件循环的IO部分建立在一个称为的关键功能上select
。Select是一种阻止功能,由下面的操作系统实现,它允许在套接字上等待传入或传出数据。接收到数据后,它将唤醒,并返回接收到数据的套接字或准备写入的套接字。
当您尝试通过asyncio通过套接字接收或发送数据时,下面实际发生的情况是,首先检查套接字是否有任何可以立即读取或发送的数据。如果其.send()
缓冲区已满,或者.recv()
缓冲区为空,则将套接字注册到该select
函数(只需将其添加到rlist
for recv
和wlist
for 列表之一send
)中,并将适当的函数(await
新创建的future
对象)绑定到该套接字。
当所有可用任务都在等待将来时,事件循环将调用select
并等待。当其中一个套接字有传入数据,或者其send
缓冲区耗尽时,asyncio会检查与该套接字绑定的将来对象,并将其设置为完成。
现在所有的魔术都发生了。未来已经完成,之前添加的任务又恢复了活力add_done_callback()
,并调用.send()
协程以恢复最内部的协程(由于该await
链),并且您从附近的缓冲区读取了新接收到的数据被溅到了。
在以下情况下,再次使用方法链recv()
:
select.select
等待。- 准备好套接字,其中包含数据。
- 来自套接字的数据被移入缓冲区。
future.set_result()
叫做。- 添加自己的任务
add_done_callback()
现在被唤醒。 - Task调用
.send()
协程,协程将一直进入最内层的协程并唤醒它。 - 数据正在从缓冲区中读取,并返回给我们谦虚的用户。
总而言之,asyncio使用生成器功能,该功能允许暂停和恢复功能。它使用的yield from
功能允许将数据从最内层生成器来回传递到最外层。它使用所有这些命令,以便在等待IO完成(通过使用OS select
功能)时停止功能执行。
而最好的呢?当一种功能暂停时,另一种功能可能会运行并与精致的结构(即异步)交错。
回答 1
谈论async/await
和asyncio
不是一回事。第一个是基本的低级构造(协程),而第二个是使用这些构造的库。相反,没有单一的最终答案。
下面是如何的一般说明async/await
和asyncio
样库的工作。也就是说,可能还有其他的技巧(有…),但是除非您自己构建它们,否则它们是无关紧要的。除非您已经足够知道不必提出这样的问题,否则差异应该可以忽略不计。
1.坚果壳中的协程与子程序
就像子例程(函数,过程,…)一样,协程(生成器,…)是调用堆栈和指令指针的抽象:有执行代码段的堆栈,每个执行段都是特定的指令。
def
vs 的区别async def
只是为了清楚起见。实际的差别是return
对yield
。从此,await
或yield from
从单个调用到整个堆栈取不同。
1.1。子程序
子例程表示一个新的堆栈级别,用于保存局部变量,并且单次遍历其指令即可到达末尾。考虑这样的子例程:
def subfoo(bar):
qux = 3
return qux * bar
当您运行它时,这意味着
- 为
bar
和分配堆栈空间qux
- 递归执行第一个语句并跳转到下一个语句
- 一次
return
,将其值推入调用堆栈 - 清除堆栈(1.)和指令指针(2.)
值得注意的是,4.表示子例程始终以相同的状态开始。该功能本身专有的所有内容在完成后都会丢失。即使后面有说明,也无法恢复功能return
。
root -\
: \- subfoo --\
:/--<---return --/
|
V
1.2。协程作为持久子例程
协程就像一个子例程,但是可以在不破坏其状态的情况下退出。考虑这样的协程:
def cofoo(bar):
qux = yield bar # yield marks a break point
return qux
当您运行它时,这意味着
- 为
bar
和分配堆栈空间qux
- 递归执行第一个语句并跳转到下一个语句
- 一次
yield
,将其值压入调用堆栈,但存储堆栈和指令指针 - 一旦调用
yield
,恢复堆栈和指令指针并将参数推入qux
- 一次
- 一次
return
,将其值推入调用堆栈 - 清除堆栈(1.)和指令指针(2.)
请注意,添加了2.1和2.2-协程可以在预定的位置挂起并恢复。这类似于在调用另一个子例程期间暂停子例程的方式。区别在于活动协程并不严格绑定到其调用堆栈。相反,悬挂的协程是单独的隔离堆栈的一部分。
root -\
: \- cofoo --\
:/--<+--yield --/
| :
V :
这意味着悬浮的协程可以在堆栈之间自由存储或移动。任何有权访问协程的调用堆栈都可以决定恢复它。
1.3。遍历调用栈
到目前为止,我们的协程仅在调用堆栈中yield
。子程序可以去和高达调用堆栈return
和()
。为了完整性,协程还需要一种机制来提升调用堆栈。考虑这样的协程:
def wrap():
yield 'before'
yield from cofoo()
yield 'after'
当您运行它时,这意味着它仍然像子例程一样分配堆栈和指令指针。当它挂起时,仍然就像存储一个子例程。
然而,yield from
确实两者。它挂起堆栈wrap
并运行指令cofoo
。请注意,它将wrap
保持挂起状态,直到cofoo
完全完成。每当cofoo
挂起或发送任何内容时,cofoo
都直接连接到调用堆栈。
1.4。协程一直向下
如建立的那样,yield from
允许将两个示波器连接到另一个中间示波器。递归应用时,这意味着堆栈的顶部可以连接到堆栈的底部。
root -\
: \-> coro_a -yield-from-> coro_b --\
:/ <-+------------------------yield ---/
| :
:\ --+-- coro_a.send----------yield ---\
: coro_b <-/
请注意,root
和coro_b
不知道对方。这使得协程比回调更干净:协程仍然像子例程一样建立在1:1关系上。协程将暂停并恢复其整个现有执行堆栈,直到常规调用点为止。
值得注意的是,root
可以恢复任意数量的协程。但是,它永远不能同时恢复多个。同一根的协程是并发的,但不是并行的!
1.5。Python的async
和await
到目前为止,该解释已明确使用生成器的yield
和yield from
词汇-基本功能相同。新的Python3.5语法async
并await
主要是为了清楚起见。
def foo(): # subroutine?
return None
def foo(): # coroutine?
yield from foofoo() # generator? coroutine?
async def foo(): # coroutine!
await foofoo() # coroutine!
return None
需要使用async for
and async with
语句,因为您将yield from/await
使用裸露的for
and with
语句断开链接。
2.简单事件循环的剖析
就一个协程本身而言,没有控制其他协程的概念。它只能对协程堆栈底部的调用者产生控制权。然后,此调用者可以切换到另一个协程并运行它。
几个协程的根节点通常是一个事件循环:在挂起时,协程会产生一个事件,并在该事件上恢复。反过来,事件循环能够有效地等待这些事件发生。这使它可以决定接下来要运行哪个协程,或在恢复之前如何等待。
这种设计意味着循环可以理解一组预定义的事件。几个协程await
相互配合,直到最终完成一个事件await
。该事件可以通过控制直接与事件循环通信yield
。
loop -\
: \-> coroutine --await--> event --\
:/ <-+----------------------- yield --/
| :
| : # loop waits for event to happen
| :
:\ --+-- send(reply) -------- yield --\
: coroutine <--yield-- event <-/
关键是协程暂停允许事件循环和事件直接通信。中间协程堆栈不需要任何有关运行哪个循环或事件如何工作的知识。
2.1.1。及时事件
要处理的最简单事件是到达某个时间点。这也是线程代码的基本块:线程重复sleep
s直到条件成立。但是,常规规则sleep
本身会阻止执行-我们希望其他协程不被阻止。相反,我们想告诉事件循环何时应恢复当前协程堆栈。
2.1.2。定义事件
事件只是我们可以识别的值-通过枚举,类型或其他标识。我们可以使用存储目标时间的简单类来定义它。除了存储事件信息之外,我们还可以await
直接允许一个类。
class AsyncSleep:
"""Event to sleep until a point in time"""
def __init__(self, until: float):
self.until = until
# used whenever someone ``await``s an instance of this Event
def __await__(self):
# yield this Event to the loop
yield self
def __repr__(self):
return '%s(until=%.1f)' % (self.__class__.__name__, self.until)
此类仅存储事件-它没有说明如何实际处理它。
唯一的特殊功能是__await__
– await
关键字寻找的内容。实际上,它是一个迭代器,但不适用于常规迭代机制。
2.2.1。等待事件
现在我们有了一个事件,协程对此有何反应?我们应该能够表达相当于sleep
由await
荷兰国际集团我们的活动。为了更好地了解发生了什么,我们将等待一半的时间两次:
import time
async def asleep(duration: float):
"""await that ``duration`` seconds pass"""
await AsyncSleep(time.time() + duration / 2)
await AsyncSleep(time.time() + duration / 2)
我们可以直接实例化并运行此协程。类似于生成器,使用coroutine.send
运行协程直到得到yield
结果。
coroutine = asleep(100)
while True:
print(coroutine.send(None))
time.sleep(0.1)
这给了我们两个AsyncSleep
事件,然后是StopIteration
协程完成的一个事件。请注意,唯一的延迟来自time.sleep
循环!每个AsyncSleep
仅存储当前时间的偏移量。
2.2.2。活动+睡眠
目前,我们有两种独立的机制可供使用:
AsyncSleep
可以从协程内部产生的事件time.sleep
可以等待而不会影响协程
值得注意的是,这两个是正交的:一个都不影响或触发另一个。结果,我们可以提出自己的策略sleep
来应对延迟AsyncSleep
。
2.3。天真的事件循环
如果我们有几个协程,每个协程可以告诉我们何时要唤醒它。然后,我们可以等到第一个恢复之前,然后再恢复,依此类推。值得注意的是,在每一点上我们只关心下一个。
这样可以进行简单的调度:
- 按照所需的唤醒时间对协程进行排序
- 选择第一个想要唤醒的人
- 等到这个时间点
- 运行这个协程
- 从1开始重复。
一个简单的实现不需要任何高级概念。A list
允许按日期对协程进行排序。等待是有规律的time.sleep
。运行协程的工作方式与之前一样coroutine.send
。
def run(*coroutines):
"""Cooperatively run all ``coroutines`` until completion"""
# store wake-up-time and coroutines
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting:
# 2. pick the first coroutine that wants to wake up
until, coroutine = waiting.pop(0)
# 3. wait until this point in time
time.sleep(max(0.0, until - time.time()))
# 4. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
当然,这还有很大的改进空间。我们可以将堆用于等待队列,或者将调度表用于事件。我们还可以从中获取返回值,StopIteration
并将其分配给协程。但是,基本原理保持不变。
2.4。合作等待
该AsyncSleep
事件和run
事件循环是定时事件的工作完全实现。
async def sleepy(identifier: str = "coroutine", count=5):
for i in range(count):
print(identifier, 'step', i + 1, 'at %.2f' % time.time())
await asleep(0.1)
run(*(sleepy("coroutine %d" % j) for j in range(5)))
这将在五个协程中的每个协程之间进行协作切换,每个协程暂停0.1秒。即使事件循环是同步的,它仍然可以在0.5秒而不是2.5秒内执行工作。每个协程保持状态并独立运行。
3. I / O事件循环
支持的事件循环sleep
适用于轮询。但是,等待文件句柄上的I / O可以更有效地完成:操作系统实现I / O,因此知道哪些句柄已准备就绪。理想情况下,事件循环应支持显式的“ Ready for I / O”事件。
3.1。该select
呼叫
Python已经有一个接口可以查询OS的读取I / O句柄。当调用带有读取或写入的句柄时,它返回准备读取或写入的句柄:
readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)
例如,我们可以open
写入文件并等待其准备就绪:
write_target = open('/tmp/foo')
readable, writeable, _ = select.select([], [write_target], [])
select返回后,writeable
包含我们的打开文件。
3.2。基本I / O事件
与AsyncSleep
请求类似,我们需要为I / O定义一个事件。使用底层select
逻辑,事件必须引用可读对象-例如open
文件。另外,我们存储要读取的数据量。
class AsyncRead:
def __init__(self, file, amount=1):
self.file = file
self.amount = amount
self._buffer = ''
def __await__(self):
while len(self._buffer) < self.amount:
yield self
# we only get here if ``read`` should not block
self._buffer += self.file.read(1)
return self._buffer
def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.file, self.amount, len(self._buffer)
)
与AsyncSleep
我们一样,我们大多只是存储底层系统调用所需的数据。这次__await__
可以恢复多次-直到我们的需求amount
被阅读为止。另外,我们return
的I / O结果不只是恢复。
3.3。使用读取的I / O增强事件循环
事件循环的基础仍然是run
先前定义的。首先,我们需要跟踪读取请求。这不再是排序的时间表,我们仅将读取请求映射到协程。
# new
waiting_read = {} # type: Dict[file, coroutine]
由于select.select
采用了超时参数,因此可以代替time.sleep
。
# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])
这将为我们提供所有可读文件-如果有的话,我们将运行相应的协程。如果没有,我们已经等待了足够长的时间来运行当前的协程。
# new - reschedule waiting coroutine, run readable coroutine
if readable:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read[readable[0]]
最后,我们必须实际侦听读取请求。
# new
if isinstance(command, AsyncSleep):
...
elif isinstance(command, AsyncRead):
...
3.4。把它放在一起
上面有点简化。如果我们总是可以阅读的话,我们需要做一些切换,以免饿死协程。我们需要处理没有阅读或等待的东西。但是,最终结果仍适合30 LOC。
def run(*coroutines):
"""Cooperatively run all ``coroutines`` until completion"""
waiting_read = {} # type: Dict[file, coroutine]
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting or waiting_read:
# 2. wait until the next coroutine may run or read ...
try:
until, coroutine = waiting.pop(0)
except IndexError:
until, coroutine = float('inf'), None
readable, _, _ = select.select(list(waiting_read), [], [])
else:
readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
# ... and select the appropriate one
if readable and time.time() < until:
if until and coroutine:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read.pop(readable[0])
# 3. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension ...
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
# ... or register reads
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
3.5。协同I / O
的AsyncSleep
,AsyncRead
并且run
实现已全功能的睡眠和/或读取。与相同sleepy
,我们可以定义一个帮助程序来测试阅读:
async def ready(path, amount=1024*32):
print('read', path, 'at', '%d' % time.time())
with open(path, 'rb') as file:
result = return await AsyncRead(file, amount)
print('done', path, 'at', '%d' % time.time())
print('got', len(result), 'B')
run(sleepy('background', 5), ready('/dev/urandom'))
运行此命令,我们可以看到我们的I / O与等待的任务交错:
id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B
4.非阻塞I / O
虽然文件上的I / O可以理解这个概念,但它实际上并不适合于像这样的库asyncio
:select
调用总是返回文件,并且两者都调用,open
并且read
可能无限期地阻塞。这阻止了事件循环的所有协程-这很糟糕。诸如此类的库aiofiles
使用线程和同步来伪造文件中的非阻塞I / O和事件。
但是,套接字确实允许无阻塞的I / O-并且它们固有的延迟使其变得更加关键。在事件循环中使用时,可以包装等待数据和重试而不会阻塞任何内容。
4.1。非阻塞I / O事件
与我们类似AsyncRead
,我们可以为套接字定义一个暂停和读取事件。我们不使用文件,而是使用套接字-该套接字必须是非阻塞的。另外,我们__await__
使用socket.recv
代替file.read
。
class AsyncRecv:
def __init__(self, connection, amount=1, read_buffer=1024):
assert not connection.getblocking(), 'connection must be non-blocking for async recv'
self.connection = connection
self.amount = amount
self.read_buffer = read_buffer
self._buffer = b''
def __await__(self):
while len(self._buffer) < self.amount:
try:
self._buffer += self.connection.recv(self.read_buffer)
except BlockingIOError:
yield self
return self._buffer
def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.connection, self.amount, len(self._buffer)
)
与相比AsyncRead
,__await__
执行真正的非阻塞I / O。当有数据时,它总是读取。如果没有可用数据,它将始终挂起。这意味着仅在我们执行有用的工作时才阻止事件循环。
4.2。解除阻塞事件循环
就事件循环而言,没有什么变化。要监听的事件仍然与文件相同-由标记为ready的文件描述符select
。
# old
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
waiting_read[command.connection] = coroutine
在这一点上,显然与AsyncRead
和AsyncRecv
是同一种事件。我们可以轻松地将它们重构为一个具有可交换I / O组件的事件。实际上,事件循环,协程和事件将调度程序,任意中间代码和实际I / O 清晰地分开。
4.3。非阻塞I / O的丑陋一面
原则上,你应该在这一点上做的是复制的逻辑read
作为recv
对AsyncRecv
。但是,这现在变得更加丑陋-当函数在内核内部阻塞时,您必须处理早期返回,但要对您产生控制权。例如,打开连接与打开文件的时间更长:
# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
connection.connect((url, port))
except BlockingIOError:
pass
长话短说,剩下的就是几十行异常处理。此时事件和事件循环已经起作用。
id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5
附录
回答 2
从coro
概念上讲,您的退货是正确的,但略微不完整。
await
不会无条件地挂起,只有在遇到阻塞调用时才挂起。它如何知道呼叫正在阻塞?这由等待的代码决定。例如,可以将套接字读取的等待实现改为:
def read(sock, n):
# sock must be in non-blocking mode
try:
return sock.recv(n)
except EWOULDBLOCK:
event_loop.add_reader(sock.fileno, current_task())
return SUSPEND
在实际的异步中,等效代码修改a的状态,Future
而不返回魔术值,但是概念是相同的。当适当地适合于类似生成器的对象时,可以await
编辑以上代码。
在呼叫方,当协程包含:
data = await read(sock, 1024)
它减少了接近:
data = read(sock, 1024)
if data is SUSPEND:
return SUSPEND
self.pos += 1
self.parts[self.pos](...)
熟悉生成器的人往往会根据yield from
悬浮液自动进行描述。
挂起链一直持续到事件循环,该循环注意到协程已挂起,将其从可运行集合中删除,然后继续执行可运行的协程(如果有)。如果没有协程可运行,则循环等待,select()
直到协程感兴趣的文件描述符中的任何一个都准备好进行IO。(事件循环维护文件描述符到协程的映射。)
在上面的示例中,一旦select()
告知事件循环sock
可读,它将重新添加coro
到可运行集,因此将从暂停点继续执行。
换一种说法:
默认情况下,所有操作都在同一线程中发生。
事件循环负责安排协程,并在协程等待(通常会阻塞或超时的IO调用)准备就绪时将其唤醒。
为了深入了解协程驱动事件循环,我推荐Dave Beazley的演讲,他在现场观众面前演示了从头开始编写事件循环的过程。
回答 3
归结为异步解决的两个主要挑战:
- 如何在单个线程中执行多个I / O?
- 如何实现协作式多任务处理?
关于第一点的答案已经存在了很长一段时间,被称为选择循环。在python中,它是在选择器模块中实现的。
第二个问题与协程的概念有关,即协程可以停止执行并在以后恢复。在python中,协程是使用生成器和yield from语句实现的。这就是隐藏在async / await语法后面的东西。
此答案中的更多资源。
编辑:解决您对goroutines的评论:
在asyncio中,与goroutine最接近的等效项实际上不是协程,而是任务(请参见文档中的区别)。在python中,协程(或生成器)对事件循环或I / O的概念一无所知。它只是一个可以yield
在保持其当前状态的同时停止使用其执行的功能,因此可以在以后还原。该yield from
语法允许以透明方式链接它们。
现在,在异步任务中,位于链最底部的协程始终最终产生了未来。然后,这种未来会上升到事件循环,并集成到内部机制中。当将来通过其他内部回调设置为完成时,事件循环可以通过将将来发送回协程链来恢复任务。
编辑:解决您帖子中的一些问题:
在这种情况下,I / O实际如何发生?在单独的线程中?整个解释器是否已暂停并且I / O在解释器外部进行?
不,线程中什么也没有发生。I / O始终由事件循环管理,主要是通过文件描述符进行。但是,这些文件描述符的注册通常被高级协同程序隐藏,这使您的工作变得很脏。
I / O到底是什么意思?如果我的python过程称为C open()过程,然后它向内核发送了中断,放弃了对它的控制,那么Python解释器如何知道这一点并能够继续运行其他代码,而内核代码则执行实际的I / O,直到唤醒原来发送中断的Python过程?原则上,Python解释器如何知道这种情况?
I / O是任何阻塞调用。在asyncio中,所有I / O操作都应经过事件循环,因为正如您所说,事件循环无法知道某个同步代码中正在执行阻塞调用。这意味着您不应该open
在协程的上下文中使用同步。相反,请使用aiofiles这样的专用库,该库提供的异步版本open
。