内容 隐藏

问题:异步实际上是如何工作的?

这个问题是由我的另一个问题引起的:如何在cdef中等待?

网路上有关于的大量文章和网志文章asyncio,但它们都是非常肤浅的。我找不到任何有关如何asyncio实际实现以及使I / O异步的信息。我正在尝试阅读源代码,但是它是数千行,不是最高等级的C代码,其中很多处理辅助对象,但是最关键的是,很难在Python语法和它将翻译的C代码之间进行连接入。

Asycnio自己的文档甚至没有帮助。那里没有关于它如何工作的信息,只有一些有关如何使用它的指南,有时也会引起误解/写得很差。

我熟悉Go的协程实现,并希望Python做同样的事情。如果是这样的话,我在上面链接的帖子中出现的代码将奏效。既然没有,我现在想找出原因。到目前为止,我最好的猜测如下,请纠正我错的地方:

  1. 形式的过程定义async def foo(): ...实际上被解释为类继承的方法coroutine
  2. 也许async def实际上是通过await语句分为多个方法,在这些方法上被调用的对象能够跟踪到目前为止执行所取得的进展。
  3. 如果上述条件成立,那么从本质上讲,协程的执行归结为某个全局管理器调用循环对象的方法(循环?)。
  4. 全局管理器以某种方式(如何?)知道何时由Python代码执行I / O操作(仅?),并且能够选择当前执行方法放弃控制后执行的待处理协程方法之一(命中该await语句) )。

换句话说,这是我尝试将某些asyncio语法“简化”为更易于理解的内容:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

如果我的猜测证明是正确的:那么我有一个问题。在这种情况下,I / O实际如何发生?在单独的线程中?整个解释器是否已暂停并且I / O在解释器外部进行?I / O到底是什么意思?如果我的python过程称为C open()过程,然后它又向内核发送了中断,放弃了对它的控制,那么Python解释器如何知道这一点并能够继续运行其他代码,而内核代码则执行实际的I / O,直到它唤醒了最初发送中断的Python过程?原则上,Python解释器如何知道这种情况?

This question is motivated by my another question: How to await in cdef?

There are tons of articles and blog posts on the web about asyncio, but they are all very superficial. I couldn’t find any information about how asyncio is actually implemented, and what makes I/O asynchronous. I was trying to read the source code, but it’s thousands of lines of not the highest grade C code, a lot of which deals with auxiliary objects, but most crucially, it is hard to connect between Python syntax and what C code it would translate into.

Asycnio’s own documentation is even less helpful. There’s no information there about how it works, only some guidelines about how to use it, which are also sometimes misleading / very poorly written.

I’m familiar with Go’s implementation of coroutines, and was kind of hoping that Python did the same thing. If that was the case, the code I came up in the post linked above would have worked. Since it didn’t, I’m now trying to figure out why. My best guess so far is as follows, please correct me where I’m wrong:

  1. Procedure definitions of the form async def foo(): ... are actually interpreted as methods of a class inheriting coroutine.
  2. Perhaps, async def is actually split into multiple methods by await statements, where the object, on which these methods are called is able to keep track of the progress it made through the execution so far.
  3. If the above is true, then, essentially, execution of a coroutine boils down to calling methods of coroutine object by some global manager (loop?).
  4. The global manager is somehow (how?) aware of when I/O operations are performed by Python (only?) code and is able to choose one of the pending coroutine methods to execute after the current executing method relinquished control (hit on the await statement).

In other words, here’s my attempt at “desugaring” of some asyncio syntax into something more understandable:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Should my guess prove correct: then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter? What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?


回答 0

asyncio如何工作?

在回答这个问题之前,我们需要了解一些基本术语,如果您已经知道一些基本术语,请跳过这些基本术语。

生成器

生成器是使我们能够暂停python函数执行的对象。用户策划的生成器使用关键字实现。通过创建包含yield关键字的普通函数,我们将该函数转换为生成器:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

如您所见,调用生成器会导致解释器加载测试的帧,并返回yielded值。next()再次调用,使框架再次加载到解释器堆栈中,并继续yield输入另一个值。

到第三次next()调用时,我们的生成器完成了StopIteration并被抛出。

与生成器通讯

生成器的鲜为人知的特点是,你可以与他们使用两种方法进行通信的事实:send()throw()

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

调用时gen.send(),该值作为yield关键字的返回值传递。

gen.throw()另一方面,允许在生成器中引发Exception,但在同一位置引发了异常yield

从生成器返回值

从生成器返回一个值,结果将该值放入StopIteration异常中。稍后我们可以从异常中恢复值,并根据需要使用它。

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

看,一个新的关键字: yield from

Python 3.4附带了一个新关键字:。什么是关键字允许我们做的,是通过对任何next()send()throw()成为最内嵌套的生成器。如果内部生成器返回一个值,则它也是的返回值yield from

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

我写了一篇文章进一步阐述这个话题。

放在一起

yield from在Python 3.4中引入了new关键字之后,我们现在能够在生成器内部创建生成器,就像隧道一样,将数据从最内层生成器来回传递到最外层生成器。这为生成器- 协程产生了新的含义。

协程是可以在运行时停止和恢复的功能。在Python中,它们是使用关键字定义的。就像生成器一样,它们也使用自己的形式,yield fromawait。之前asyncawait被在Python 3.5推出,我们创建了创建完全相同的方式生成协同程序(带yield from代替await)。

async def inner():
    return 1

async def outer():
    await inner()

像实现该__iter__()方法的每个迭代器或生成器一样,协程实现__await__()也允许它们每次都继续执行await coro

有一个很好的序列图里面Python文档,你应该看看。

在异步中,除了协程功能外,我们还有两个重要的对象:任务期货

期货

期货是已__await__()实现该方法的对象,其任务是保持某种状态和结果。状态可以是以下之一:

  1. 待处理-未来未设置任何结果或exceptions。
  2. 已取消-将来已使用取消 fut.cancel()
  3. 完成-将来通过使用的结果集fut.set_result()或使用的异常集完成fut.set_exception()

就像您猜到的那样,结果可能是将返回的Python对象,也可能是引发异常的对象。

对象的另一个重要特征future是它们包含一个称为的方法add_done_callback()。此方法允许在任务完成后立即调用函数-无论是引发异常还是完成。

任务

任务对象是特殊的期货,它们围绕着协程,并与最内部和最外部的协程进行通信。每当协程成为await未来时,未来都会一直传递到任务中(就像中的一样yield from),任务会接收它。

接下来,任务将自己绑定到未来。它通过呼吁add_done_callback()未来来做到这一点。从现在开始,如果将来能够实现,通过取消,传递异常或传递Python对象作为结果,任务的回调将被调用,并将恢复为存在。

异步

我们必须回答的最后一个亟待解决的问题是-IO如何实现?

在异步内部,我们有一个事件循环。任务的事件循环。事件循环的工作是在每次准备就绪时调用任务,并将所有工作协调到一台工作机中。

事件循环的IO部分建立在一个称为的关键功能上select。Select是一种阻止功能,由下面的操作系统实现,它允许在套接字上等待传入或传出数据。接收到数据后,它将唤醒,并返回接收到数据的套接字或准备写入的套接字。

当您尝试通过asyncio通过套接字接收或发送数据时,下面实际发生的情况是,首先检查套接字是否有任何可以立即读取或发送的数据。如果其.send()缓冲区已满,或者.recv()缓冲区为空,则将套接字注册到该select函数(只需将其添加到rlistfor recvwlistfor 列表之一send)中,并将适当的函数(await新创建的future对象)绑定到该套接字。

当所有可用任务都在等待将来时,事件循环将调用select并等待。当其中一个套接字有传入数据,或者其send缓冲区耗尽时,asyncio会检查与该套接字绑定的将来对象,并将其设置为完成。

现在所有的魔术都发生了。未来已经完成,之前添加的任务又恢复了活力add_done_callback(),并调用.send()协程以恢复最内部的协程(由于该await链),并且您从附近的缓冲区读取了新接收到的数据被溅到了。

在以下情况下,再次使用方法链recv()

  1. select.select 等待。
  2. 准备好套接字,其中包含数据。
  3. 来自套接字的数据被移入缓冲区。
  4. future.set_result() 叫做。
  5. 添加自己的任务add_done_callback()现在被唤醒。
  6. Task调用.send()协程,协程将一直进入最内层的协程并唤醒它。
  7. 数据正在从缓冲区中读取,并返回给我们谦虚的用户。

总而言之,asyncio使用生成器功能,该功能允许暂停和恢复功能。它使用的yield from功能允许将数据从最内层生成器来回传递到最外层。它使用所有这些命令,以便在等待IO完成(通过使用OS select功能)时停止功能执行。

而最好的呢?当一种功能暂停时,另一种功能可能会运行并与精致的结构(即异步)交错。

How does asyncio work?

Before answering this question we need to understand a few base terms, skip these if you already know any of them.

Generators

Generators are objects that allow us to suspend the execution of a python function. User curated generators are implement using the keyword . By creating a normal function containing the yield keyword, we turn that function into a generator:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

As you can see, calling on the generator causes the interpreter to load test’s frame, and return the yielded value. Calling next() again, cause the frame to load again into the interpreter stack, and continue on yielding another value.

By the third time next() is called, our generator was finished, and StopIteration was thrown.

Communicating with a generator

A less-known feature of generators, is the fact that you can communicate with them using two methods: send() and throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

Upon calling gen.send(), the value is passed as a return value from the yield keyword.

gen.throw() on the other hand, allows throwing Exceptions inside generators, with the exception raised at the same spot yield was called.

Returning values from generators

Returning a value from a generator, results in the value being put inside the StopIteration exception. We can later on recover the value from the exception and use it to our need.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

Behold, a new keyword: yield from

Python 3.4 came with the addition of a new keyword: . What that keyword allows us to do, is pass on any next(), send() and throw() into an inner-most nested generator. If the inner generator returns a value, it is also the return value of yield from:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

I’ve written an article to further elaborate on this topic.

Putting it all together

Upon introducing the new keyword yield from in Python 3.4, we were now able to create generators inside generators that just like a tunnel, pass the data back and forth from the inner-most to the outer-most generators. This has spawned a new meaning for generators – coroutines.

Coroutines are functions that can be stopped and resumed while being run. In Python, they are defined using the keyword. Much like generators, they too use their own form of yield from which is await. Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await).

async def inner():
    return 1

async def outer():
    await inner()

Like every iterator or generator that implement the __iter__() method, coroutines implement __await__() which allows them to continue on every time await coro is called.

There’s a nice sequence diagram inside the Python docs that you should check out.

In asyncio, apart from coroutine functions, we have 2 important objects: tasks and futures.

Futures

Futures are objects that have the __await__() method implemented, and their job is to hold a certain state and result. The state can be one of the following:

  1. PENDING – future does not have any result or exception set.
  2. CANCELLED – future was cancelled using fut.cancel()
  3. FINISHED – future was finished, either by a result set using fut.set_result() or by an exception set using fut.set_exception()

The result, just like you have guessed, can either be a Python object, that will be returned, or an exception which may be raised.

Another important feature of future objects, is that they contain a method called add_done_callback(). This method allows functions to be called as soon as the task is done – whether it raised an exception or finished.

Tasks

Task objects are special futures, which wrap around coroutines, and communicate with the inner-most and outer-most coroutines. Every time a coroutine awaits a future, the future is passed all the way back to the task (just like in yield from), and the task receives it.

Next, the task binds itself to the future. It does so by calling add_done_callback() on the future. From now on, if the future will ever be done, by either being cancelled, passed an exception or passed a Python object as a result, the task’s callback will be called, and it will rise back up to existence.

Asyncio

The final burning question we must answer is – how is the IO implemented?

Deep inside asyncio, we have an event loop. An event loop of tasks. The event loop’s job is to call tasks every time they are ready and coordinate all that effort into one single working machine.

The IO part of the event loop is built upon a single crucial function called select. Select is a blocking function, implemented by the operating system underneath, that allows waiting on sockets for incoming or outgoing data. Upon data being received it wakes up, and returns the sockets which received data, or the sockets whom are ready for writing.

When you try to receive or send data over a socket through asyncio, what actually happens below is that the socket is first checked if it has any data that can be immediately read or sent. If its .send() buffer is full, or the .recv() buffer is empty, the socket is registered to the select function (by simply adding it to one of the lists, rlist for recv and wlist for send) and the appropriate function awaits a newly created future object, tied to that socket.

When all available tasks are waiting for futures, the event loop calls select and waits. When the one of the sockets has incoming data, or its send buffer drained up, asyncio checks for the future object tied to that socket, and sets it to done.

Now all the magic happens. The future is set to done, the task that added itself before with add_done_callback() rises up back to life, and calls .send() on the coroutine which resumes the inner-most coroutine (because of the await chain) and you read the newly received data from a nearby buffer it was spilled unto.

Method chain again, in case of recv():

  1. select.select waits.
  2. A ready socket, with data is returned.
  3. Data from the socket is moved into a buffer.
  4. future.set_result() is called.
  5. Task that added itself with add_done_callback() is now woken up.
  6. Task calls .send() on the coroutine which goes all the way into the inner-most coroutine and wakes it up.
  7. Data is being read from the buffer and returned to our humble user.

In summary, asyncio uses generator capabilities, that allow pausing and resuming functions. It uses yield from capabilities that allow passing data back and forth from the inner-most generator to the outer-most. It uses all of those in order to halt function execution while it’s waiting for IO to complete (by using the OS select function).

And the best of all? While one function is paused, another may run and interleave with the delicate fabric, which is asyncio.


回答 1

谈论async/awaitasyncio不是一回事。第一个是基本的低级构造(协程),而第二个是使用这些构造的库。相反,没有单一的最终答案。

下面是如何的一般说明async/awaitasyncio样库的工作。也就是说,可能还有其他的技巧(有…),但是除非您自己构建它们,否则它们是无关紧要的。除非您已经足够知道不必提出这样的问题,否则差异应该可以忽略不计。

1.坚果壳中的协程与子程序

就像子例程(函数,过程,…)一样,协程(生成器,…)是调用堆栈和指令指针的抽象:有执行代码段的堆栈,每个执行段都是特定的指令。

defvs 的区别async def只是为了清楚起见。实际的差别是returnyield。从此,awaityield from从单个调用到整个堆栈取不同。

1.1。子程序

子例程表示一个新的堆栈级别,用于保存局部变量,并且单次遍历其指令即可到达末尾。考虑这样的子例程:

def subfoo(bar):
     qux = 3
     return qux * bar

当您运行它时,这意味着

  1. bar和分配堆栈空间qux
  2. 递归执行第一个语句并跳转到下一个语句
  3. 一次return,将其值推入调用堆栈
  4. 清除堆栈(1.)和指令指针(2.)

值得注意的是,4.表示子例程始终以相同的状态开始。该功能本身专有的所有内容在完成后都会丢失。即使后面有说明,也无法恢复功能return

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2。协程作为持久子例程

协程就像一个子例程,但是可以在破坏其状态的情况下退出。考虑这样的协程:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

当您运行它时,这意味着

  1. bar和分配堆栈空间qux
  2. 递归执行第一个语句并跳转到下一个语句
    1. 一次yield,将其值压入调用堆栈,但存储堆栈和指令指针
    2. 一旦调用yield,恢复堆栈和指令指针并将参数推入qux
  3. 一次return,将其值推入调用堆栈
  4. 清除堆栈(1.)和指令指针(2.)

请注意,添加了2.1和2.2-协程可以在预定的位置挂起并恢复。这类似于在调用另一个子例程期间暂停子例程的方式。区别在于活动协程并不严格绑定到其调用堆栈。相反,悬挂的协程是单独的隔离堆栈的一部分。

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

这意味着悬浮的协程可以在堆栈之间自由存储或移动。任何有权访问协程的调用堆栈都可以决定恢复它。

1.3。遍历调用栈

到目前为止,我们的协程仅在调用堆栈中yield。子程序可以去和高达调用堆栈return()。为了完整性,协程还需要一种机制来提升调用堆栈。考虑这样的协程:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

当您运行它时,这意味着它仍然像子例程一样分配堆栈和指令指针。当它挂起时,仍然就像存储一个子例程。

然而,yield from确实两者。它挂起堆栈wrap 运行指令cofoo。请注意,它将wrap保持挂起状态,直到cofoo完全完成。每当cofoo挂起或发送任何内容时,cofoo都直接连接到调用堆栈。

1.4。协程一直向下

如建立的那样,yield from允许将两个示波器连接到另一个中间示波器。递归应用时,这意味着堆栈的顶部可以连接到堆栈的底部

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

请注意,rootcoro_b不知道对方。这使得协程比回调更干净:协程仍然像子例程一样建立在1:1关系上。协程将暂停并恢复其整个现有执行堆栈,直到常规调用点为止。

值得注意的是,root可以恢复任意数量的协程。但是,它永远不能同时恢复多个。同一根的协程是并发的,但不是并行的!

1.5。Python的asyncawait

到目前为止,该解释已明确使用生成器的yieldyield from词汇-基本功能相同。新的Python3.5语法asyncawait主要是为了清楚起见。

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

需要使用async forand async with语句,因为您将yield from/await使用裸露的forand with语句断开链接。

2.简单事件循环的剖析

就一个协程本身而言,没有控制其他协程的概念。它只能对协程堆栈底部的调用者产生控制权。然后,此调用者可以切换到另一个协程并运行它。

几个协程的根节点通常是一个事件循环:在挂起时,协程会产生一个事件,并在该事件上恢复。反过来,事件循环能够有效地等待这些事件发生。这使它可以决定接下来要运行哪个协程,或在恢复之前如何等待。

这种设计意味着循环可以理解一组预定义的事件。几个协程await相互配合,直到最终完成一个事件await。该事件可以通过控制直接与事件循环通信yield

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

关键是协程暂停允许事件循环和事件直接通信。中间协程堆栈不需要任何有关运行哪个循环或事件如何工作的知识。

2.1.1。及时事件

要处理的最简单事件是到达某个时间点。这也是线程代码的基本块:线程重复sleeps直到条件成立。但是,常规规则sleep本身会阻止执行-我们希望其他协程不被阻止。相反,我们想告诉事件循环何时应恢复当前协程堆栈。

2.1.2。定义事件

事件只是我们可以识别的值-通过枚举,类型或其他标识。我们可以使用存储目标时间的简单类来定义它。除了存储事件信息之外,我们还可以await直接允许一个类。

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self

    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

此类仅存储事件-它没有说明如何实际处理它。

唯一的特殊功能是__await__await关键字寻找的内容。实际上,它是一个迭代器,但不适用于常规迭代机制。

2.2.1。等待事件

现在我们有了一个事件,协程对此有何反应?我们应该能够表达相当于sleepawait荷兰国际集团我们的活动。为了更好地了解发生了什么,我们将等待一半的时间两次:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

我们可以直接实例化并运行此协程。类似于生成器,使用coroutine.send运行协程直到得到yield结果。

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

这给了我们两个AsyncSleep事件,然后是StopIteration协程完成的一个事件。请注意,唯一的延迟来自time.sleep循环!每个AsyncSleep仅存储当前时间的偏移量。

2.2.2。活动+睡眠

目前,我们有两种独立的机制可供使用:

  • AsyncSleep 可以从协程内部产生的事件
  • time.sleep 可以等待而不会影响协程

值得注意的是,这两个是正交的:一个都不影响或触发另一个。结果,我们可以提出自己的策略sleep来应对延迟AsyncSleep

2.3。天真的事件循环

如果我们有几个协程,每个协程可以告诉我们何时要唤醒它。然后,我们可以等到第一个恢复之前,然后再恢复,依此类推。值得注意的是,在每一点上我们只关心下一个

这样可以进行简单的调度:

  1. 按照所需的唤醒时间对协程进行排序
  2. 选择第一个想要唤醒的人
  3. 等到这个时间点
  4. 运行这个协程
  5. 从1开始重复。

一个简单的实现不需要任何高级概念。A list允许按日期对协程进行排序。等待是有规律的time.sleep。运行协程的工作方式与之前一样coroutine.send

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

当然,这还有很大的改进空间。我们可以将堆用于等待队列,或者将调度表用于事件。我们还可以从中获取返回值,StopIteration并将其分配给协程。但是,基本原理保持不变。

2.4。合作等待

AsyncSleep事件和run事件循环是定时事件的工作完全实现。

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

这将在五个协程中的每个协程之间进行协作切换,每个协程暂停0.1秒。即使事件循环是同步的,它仍然可以在0.5秒而不是2.5秒内执行工作。每个协程保持状态并独立运行。

3. I / O事件循环

支持的事件循环sleep适用于轮询。但是,等待文件句柄上的I / O可以更有效地完成:操作系统实现I / O,因此知道哪些句柄已准备就绪。理想情况下,事件循环应支持显式的“ Ready for I / O”事件。

3.1。该select呼叫

Python已经有一个接口可以查询OS的读取I / O句柄。当调用带有读取或写入的句柄时,它返回准备读取或写入的句柄:

readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

例如,我们可以open写入文件并等待其准备就绪:

write_target = open('/tmp/foo')
readable, writeable, _ = select.select([], [write_target], [])

select返回后,writeable包含我们的打开文件。

3.2。基本I / O事件

AsyncSleep请求类似,我们需要为I / O定义一个事件。使用底层select逻辑,事件必须引用可读对象-例如open文件。另外,我们存储要读取的数据量。

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

AsyncSleep我们一样,我们大多只是存储底层系统调用所需的数据。这次__await__可以恢复多次-直到我们的需求amount被阅读为止。另外,我们return的I / O结果不只是恢复。

3.3。使用读取的I / O增强事件循环

事件循环的基础仍然是run先前定义的。首先,我们需要跟踪读取请求。这不再是排序的时间表,我们仅将读取请求映射到协程。

# new
waiting_read = {}  # type: Dict[file, coroutine]

由于select.select采用了超时参数,因此可以代替time.sleep

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])

这将为我们提供所有可读文件-如果有的话,我们将运行相应的协程。如果没有,我们已经等待了足够长的时间来运行当前的协程。

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

最后,我们必须实际侦听读取请求。

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4。把它放在一起

上面有点简化。如果我们总是可以阅读的话,我们需要做一些切换,以免饿死协程。我们需要处理没有阅读或等待的东西。但是,最终结果仍适合30 LOC。

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5。协同I / O

AsyncSleepAsyncRead并且run实现已全功能的睡眠和/或读取。与相同sleepy,我们可以定义一个帮助程序来测试阅读:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = return await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

运行此命令,我们可以看到我们的I / O与等待的任务交错:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4.非阻塞I / O

虽然文件上的I / O可以理解这个概念,但它实际上并不适合于像这样的库asyncioselect调用总是返回文件,并且两者都调用,open并且read可能无限期地阻塞。这阻止了事件循环的所有协程-这很糟糕。诸如此类的库aiofiles使用线程和同步来伪造文件中的非阻塞I / O和事件。

但是,套接字确实允许无阻塞的I / O-并且它们固有的延迟使其变得更加关键。在事件循环中使用时,可以包装等待数据和重试而不会阻塞任何内容。

4.1。非阻塞I / O事件

与我们类似AsyncRead,我们可以为套接字定义一个暂停和读取事件。我们不使用文件,而是使用套接字-该套接字必须是非阻塞的。另外,我们__await__使用socket.recv代替file.read

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

与相比AsyncRead__await__执行真正的非阻塞I / O。当有数据时,它总是读取。如果没有可用数据,它将始终挂起。这意味着仅在我们执行有用的工作时才阻止事件循环。

4.2。解除阻塞事件循环

就事件循环而言,没有什么变化。要监听的事件仍然与文件相同-由标记为ready的文件描述符select

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

在这一点上,显然与AsyncReadAsyncRecv是同一种事件。我们可以轻松地将它们重构为一个具有可交换I / O组件的事件。实际上,事件循环,协程和事件调度程序,任意中间代码和实际I / O 清晰地分开

4.3。非阻塞I / O的丑陋一面

原则上,你应该在这一点上做的是复制的逻辑read作为recvAsyncRecv。但是,这现在变得更加丑陋-当函数在内核内部阻塞时,您必须处理早期返回,但要对您产生控制权。例如,打开连接与打开文件的时间更长:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

长话短说,剩下的就是几十行异常处理。此时事件和事件循环已经起作用。

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

附录

github上的示例代码

Talking about async/await and asyncio is not the same thing. The first is a fundamental, low-level construct (coroutines) while the later is a library using these constructs. Conversely, there is no single ultimate answer.

The following is a general description of how async/await and asyncio-like libraries work. That is, there may be other tricks on top (there are…) but they are inconsequential unless you build them yourself. The difference should be negligible unless you already know enough to not have to ask such a question.

1. Coroutines versus subroutines in a nut shell

Just like subroutines (functions, procedures, …), coroutines (generators, …) are an abstraction of call stack and instruction pointer: there is a stack of executing code pieces, and each is at a specific instruction.

The distinction of def versus async def is merely for clarity. The actual difference is return versus yield. From this, await or yield from take the difference from individual calls to entire stacks.

1.1. Subroutines

A subroutine represents a new stack level to hold local variables, and a single traversal of its instructions to reach an end. Consider a subroutine like this:

def subfoo(bar):
     qux = 3
     return qux * bar

When you run it, that means

  1. allocate stack space for bar and qux
  2. recursively execute the first statement and jump to the next statement
  3. once at a return, push its value to the calling stack
  4. clear the stack (1.) and instruction pointer (2.)

Notably, 4. means that a subroutine always starts at the same state. Everything exclusive to the function itself is lost upon completion. A function cannot be resumed, even if there are instructions after return.

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. Coroutines as persistent subroutines

A coroutine is like a subroutine, but can exit without destroying its state. Consider a coroutine like this:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

When you run it, that means

  1. allocate stack space for bar and qux
  2. recursively execute the first statement and jump to the next statement
    1. once at a yield, push its value to the calling stack but store the stack and instruction pointer
    2. once calling into yield, restore stack and instruction pointer and push arguments to qux
  3. once at a return, push its value to the calling stack
  4. clear the stack (1.) and instruction pointer (2.)

Note the addition of 2.1 and 2.2 – a coroutine can be suspended and resumed at predefined points. This is similar to how a subroutine is suspended during calling another subroutine. The difference is that the active coroutine is not strictly bound to its calling stack. Instead, a suspended coroutine is part of a separate, isolated stack.

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

This means that suspended coroutines can be freely stored or moved between stacks. Any call stack that has access to a coroutine can decide to resume it.

1.3. Traversing the call stack

So far, our coroutine only goes down the call stack with yield. A subroutine can go down and up the call stack with return and (). For completeness, coroutines also need a mechanism to go up the call stack. Consider a coroutine like this:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

When you run it, that means it still allocates the stack and instruction pointer like a subroutine. When it suspends, that still is like storing a subroutine.

However, yield from does both. It suspends stack and instruction pointer of wrap and runs cofoo. Note that wrap stays suspended until cofoo finishes completely. Whenever cofoo suspends or something is sent, cofoo is directly connected to the calling stack.

1.4. Coroutines all the way down

As established, yield from allows to connect two scopes across another intermediate one. When applied recursively, that means the top of the stack can be connected to the bottom of the stack.

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

Note that root and coro_b do not know about each other. This makes coroutines much cleaner than callbacks: coroutines still built on a 1:1 relation like subroutines. Coroutines suspend and resume their entire existing execution stack up until a regular call point.

Notably, root could have an arbitrary number of coroutines to resume. Yet, it can never resume more than one at the same time. Coroutines of the same root are concurrent but not parallel!

1.5. Python’s async and await

The explanation has so far explicitly used the yield and yield from vocabulary of generators – the underlying functionality is the same. The new Python3.5 syntax async and await exists mainly for clarity.

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

The async for and async with statements are needed because you would break the yield from/await chain with the bare for and with statements.

2. Anatomy of a simple event loop

By itself, a coroutine has no concept of yielding control to another coroutine. It can only yield control to the caller at the bottom of a coroutine stack. This caller can then switch to another coroutine and run it.

This root node of several coroutines is commonly an event loop: on suspension, a coroutine yields an event on which it wants resume. In turn, the event loop is capable of efficiently waiting for these events to occur. This allows it to decide which coroutine to run next, or how to wait before resuming.

Such a design implies that there is a set of pre-defined events that the loop understands. Several coroutines await each other, until finally an event is awaited. This event can communicate directly with the event loop by yielding control.

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

The key is that coroutine suspension allows the event loop and events to directly communicate. The intermediate coroutine stack does not require any knowledge about which loop is running it, nor how events work.

2.1.1. Events in time

The simplest event to handle is reaching a point in time. This is a fundamental block of threaded code as well: a thread repeatedly sleeps until a condition is true. However, a regular sleep blocks execution by itself – we want other coroutines to not be blocked. Instead, we want tell the event loop when it should resume the current coroutine stack.

2.1.2. Defining an Event

An event is simply a value we can identify – be it via an enum, a type or other identity. We can define this with a simple class that stores our target time. In addition to storing the event information, we can allow to await a class directly.

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self
    
    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

This class only stores the event – it does not say how to actually handle it.

The only special feature is __await__ – it is what the await keyword looks for. Practically, it is an iterator but not available for the regular iteration machinery.

2.2.1. Awaiting an event

Now that we have an event, how do coroutines react to it? We should be able to express the equivalent of sleep by awaiting our event. To better see what is going on, we wait twice for half the time:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

We can directly instantiate and run this coroutine. Similar to a generator, using coroutine.send runs the coroutine until it yields a result.

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

This gives us two AsyncSleep events and then a StopIteration when the coroutine is done. Notice that the only delay is from time.sleep in the loop! Each AsyncSleep only stores an offset from the current time.

2.2.2. Event + Sleep

At this point, we have two separate mechanisms at our disposal:

  • AsyncSleep Events that can be yielded from inside a coroutine
  • time.sleep that can wait without impacting coroutines

Notably, these two are orthogonal: neither one affects or triggers the other. As a result, we can come up with our own strategy to sleep to meet the delay of an AsyncSleep.

2.3. A naive event loop

If we have several coroutines, each can tell us when it wants to be woken up. We can then wait until the first of them wants to be resumed, then for the one after, and so on. Notably, at each point we only care about which one is next.

This makes for a straightforward scheduling:

  1. sort coroutines by their desired wake up time
  2. pick the first that wants to wake up
  3. wait until this point in time
  4. run this coroutine
  5. repeat from 1.

A trivial implementation does not need any advanced concepts. A list allows to sort coroutines by date. Waiting is a regular time.sleep. Running coroutines works just like before with coroutine.send.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

Of course, this has ample room for improvement. We can use a heap for the wait queue or a dispatch table for events. We could also fetch return values from the StopIteration and assign them to the coroutine. However, the fundamental principle remains the same.

2.4. Cooperative Waiting

The AsyncSleep event and run event loop are a fully working implementation of timed events.

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

This cooperatively switches between each of the five coroutines, suspending each for 0.1 seconds. Even though the event loop is synchronous, it still executes the work in 0.5 seconds instead of 2.5 seconds. Each coroutine holds state and acts independently.

3. I/O event loop

An event loop that supports sleep is suitable for polling. However, waiting for I/O on a file handle can be done more efficiently: the operating system implements I/O and thus knows which handles are ready. Ideally, an event loop should support an explicit “ready for I/O” event.

3.1. The select call

Python already has an interface to query the OS for read I/O handles. When called with handles to read or write, it returns the handles ready to read or write:

readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

For example, we can open a file for writing and wait for it to be ready:

write_target = open('/tmp/foo')
readable, writeable, _ = select.select([], [write_target], [])

Once select returns, writeable contains our open file.

3.2. Basic I/O event

Similar to the AsyncSleep request, we need to define an event for I/O. With the underlying select logic, the event must refer to a readable object – say an open file. In addition, we store how much data to read.

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

As with AsyncSleep we mostly just store the data required for the underlying system call. This time, __await__ is capable of being resumed multiple times – until our desired amount has been read. In addition, we return the I/O result instead of just resuming.

3.3. Augmenting an event loop with read I/O

The basis for our event loop is still the run defined previously. First, we need to track the read requests. This is no longer a sorted schedule, we only map read requests to coroutines.

# new
waiting_read = {}  # type: Dict[file, coroutine]

Since select.select takes a timeout parameter, we can use it in place of time.sleep.

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])

This gives us all readable files – if there are any, we run the corresponding coroutine. If there are none, we have waited long enough for our current coroutine to run.

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

Finally, we have to actually listen for read requests.

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. Putting it together

The above was a bit of a simplification. We need to do some switching to not starve sleeping coroutines if we can always read. We need to handle having nothing to read or nothing to wait for. However, the end result still fits into 30 LOC.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. Cooperative I/O

The AsyncSleep, AsyncRead and run implementations are now fully functional to sleep and/or read. Same as for sleepy, we can define a helper to test reading:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

Running this, we can see that our I/O is interleaved with the waiting task:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. Non-Blocking I/O

While I/O on files gets the concept across, it is not really suitable for a library like asyncio: the select call always returns for files, and both open and read may block indefinitely. This blocks all coroutines of an event loop – which is bad. Libraries like aiofiles use threads and synchronization to fake non-blocking I/O and events on file.

However, sockets do allow for non-blocking I/O – and their inherent latency makes it much more critical. When used in an event loop, waiting for data and retrying can be wrapped without blocking anything.

4.1. Non-Blocking I/O event

Similar to our AsyncRead, we can define a suspend-and-read event for sockets. Instead of taking a file, we take a socket – which must be non-blocking. Also, our __await__ uses socket.recv instead of file.read.

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

In contrast to AsyncRead, __await__ performs truly non-blocking I/O. When data is available, it always reads. When no data is available, it always suspends. That means the event loop is only blocked while we perform useful work.

4.2. Un-Blocking the event loop

As far as the event loop is concerned, nothing changes much. The event to listen for is still the same as for files – a file descriptor marked ready by select.

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

At this point, it should be obvious that AsyncRead and AsyncRecv are the same kind of event. We could easily refactor them to be one event with an exchangeable I/O component. In effect, the event loop, coroutines and events cleanly separate a scheduler, arbitrary intermediate code and the actual I/O.

4.3. The ugly side of non-blocking I/O

In principle, what you should do at this point is replicate the logic of read as a recv for AsyncRecv. However, this is much more ugly now – you have to handle early returns when functions block inside the kernel, but yield control to you. For example, opening a connection versus opening a file is much longer:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

Long story short, what remains is a few dozen lines of Exception handling. The events and event loop already work at this point.

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

Addendum

Example code at github


回答 2

coro概念上讲,您的退货是正确的,但略微不完整。

await不会无条件地挂起,只有在遇到阻塞调用时才挂起。它如何知道呼叫正在阻塞?这由等待的代码决定。例如,可以将套接字读取的等待实现改为:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

在实际的异步中,等效代码修改a的状态,Future而不返回魔术值,但是概念是相同的。当适当地适合于类似生成器的对象时,可以await编辑以上代码。

在呼叫方,当协程包含:

data = await read(sock, 1024)

它减少了接近:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

熟悉生成器的人往往会根据yield from悬浮液自动进行描述。

挂起链一直持续到事件循环,该循环注意到协程已挂起,将其从可运行集合中删除,然后继续执行可运行的协程(如果有)。如果没有协程可运行,则循环等待,select()直到协程感兴趣的文件描述符中的任何一个都准备好进行IO。(事件循环维护文件描述符到协程的映射。)

在上面的示例中,一旦select()告知事件循环sock可读,它将重新添加coro到可运行集,因此将从暂停点继续执行。

换一种说法:

  1. 默认情况下,所有操作都在同一线程中发生。

  2. 事件循环负责安排协程,并在协程等待(通常会阻塞或超时的IO调用)准备就绪时将其唤醒。

为了深入了解协程驱动事件循环,我推荐Dave Beazley的演讲,他在现场观众面前演示了从头开始编写事件循环的过程。

Your coro desugaring is conceptually correct, but slightly incomplete.

await doesn’t suspend unconditionally, but only if it encounters a blocking call. How does it know that a call is blocking? This is decided by the code being awaited. For example, an awaitable implementation of socket read could be desugared to:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

In real asyncio the equivalent code modifies the state of a Future instead of returning magic values, but the concept is the same. When appropriately adapted to a generator-like object, the above code can be awaited.

On the caller side, when your coroutine contains:

data = await read(sock, 1024)

It desugars into something close to:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

People familiar with generators tend to describe the above in terms of yield from which does the suspension automatically.

The suspension chain continues all the way up to the event loop, which notices that the coroutine is suspended, removes it from the runnable set, and goes on to execute coroutines that are runnable, if any. If no coroutines are runnable, the loop waits in select() until either a file descriptor a coroutine is interested in becomes ready for IO. (The event loop maintains a file-descriptor-to-coroutine mapping.)

In the above example, once select() tells the event loop that sock is readable, it will re-add coro to the runnable set, so it will be continued from the point of suspension.

In other words:

  1. Everything happens in the same thread by default.

  2. The event loop is responsible for scheduling the coroutines and waking them up when whatever they were waiting for (typically an IO call that would normally block, or a timeout) becomes ready.

For insight on coroutine-driving event loops, I recommend this talk by Dave Beazley, where he demonstrates coding an event loop from scratch in front of live audience.


回答 3

归结为异步解决的两个主要挑战:

  • 如何在单个线程中执行多个I / O?
  • 如何实现协作式多任务处理?

关于第一点的答案已经存在了很长一段时间,被称为选择循环。在python中,它是在选择器模块中实现的。

第二个问题与协程的概念有关,即协程可以停止执行并在以后恢复。在python中,协程是使用生成器yield from语句实现的。这就是隐藏在async / await语法后面的东西。

答案中的更多资源。


编辑:解决您对goroutines的评论:

在asyncio中,与goroutine最接近的等效项实际上不是协程,而是任务(请参见文档中的区别)。在python中,协程(或生成器)对事件循环或I / O的概念一无所知。它只是一个可以yield在保持其当前状态的同时停止使用其执行的功能,因此可以在以后还原。该yield from语法允许以透明方式链接它们。

现在,在异步任务中,位于链最底部的协程始终最终产生了未来。然后,这种未来会上升到事件循环,并集成到内部机制中。当将来通过其他内部回调设置为完成时,事件循环可以通过将将来发送回协程链来恢复任务。


编辑:解决您帖子中的一些问题:

在这种情况下,I / O实际如何发生?在单独的线程中?整个解释器是否已暂停并且I / O在解释器外部进行?

不,线程中什么也没有发生。I / O始终由事件循环管理,主要是通过文件描述符进行。但是,这些文件描述符的注册通常被高级协同程序隐藏,这使您的工作变得很脏。

I / O到底是什么意思?如果我的python过程称为C open()过程,然后它向内核发送了中断,放弃了对它的控制,那么Python解释器如何知道这一点并能够继续运行其他代码,而内核代码则执行实际的I / O,直到唤醒原来发送中断的Python过程?原则上,Python解释器如何知道这种情况?

I / O是任何阻塞调用。在asyncio中,所有I / O操作都应经过事件循环,因为正如您所说,事件循环无法知道某个同步代码中正在执行阻塞调用。这意味着您不应该open在协程的上下文中使用同步。相反,请使用aiofiles这样的专用库,该库提供的异步版本open

It all boils down to the two main challenges that asyncio is addressing:

  • How to perform multiple I/O in a single thread?
  • How to implement cooperative multitasking?

The answer to the first point has been around for a long while and is called a select loop. In python, it is implemented in the selectors module.

The second question is related to the concept of coroutine, i.e. functions that can stop their execution and be restored later on. In python, coroutines are implemented using generators and the yield from statement. That’s what is hiding behind the async/await syntax.

More resources in this answer.


EDIT: Addressing your comment about goroutines:

The closest equivalent to a goroutine in asyncio is actually not a coroutine but a task (see the difference in the documentation). In python, a coroutine (or a generator) knows nothing about the concepts of event loop or I/O. It simply is a function that can stop its execution using yield while keeping its current state, so it can be restored later on. The yield from syntax allows for chaining them in a transparent way.

Now, within an asyncio task, the coroutine at the very bottom of the chain always ends up yielding a future. This future then bubbles up to the event loop, and gets integrated into the inner machinery. When the future is set to done by some other inner callback, the event loop can restore the task by sending the future back into the coroutine chain.


EDIT: Addressing some of the questions in your post:

How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter?

No, nothing happens in a thread. I/O is always managed by the event loop, mostly through file descriptors. However the registration of those file descriptors is usually hidden by high-level coroutines, making the dirty work for you.

What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?

An I/O is any blocking call. In asyncio, all the I/O operations should go through the event loop, because as you said, the event loop has no way to be aware that a blocking call is being performed in some synchronous code. That means you’re not supposed to use a synchronous open within the context of a coroutine. Instead, use a dedicated library such aiofiles which provides an asynchronous version of open.


声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。