Tag Archives: python-asyncio

How does asyncio actually work?

Question: How does asyncio actually work?

This question is motivated by another question of mine: How to await in cdef?

There are tons of articles and blog posts on the web about asyncio, but they are all very superficial. I couldn’t find any information about how asyncio is actually implemented and what makes I/O asynchronous. I was trying to read the source code, but it’s thousands of lines of not the highest grade C code, a lot of which deals with auxiliary objects, but most crucially, it is hard to connect the Python syntax to the C code it would translate into.

Asyncio’s own documentation is even less helpful. There’s no information there about how it works, only some guidelines about how to use it, which are also sometimes misleading / very poorly written.

I’m familiar with Go’s implementation of coroutines, and was kind of hoping that Python did the same thing. If that was the case, the code I came up with in the post linked above would have worked. Since it didn’t, I’m now trying to figure out why. My best guess so far is as follows, please correct me where I’m wrong:

  1. Procedure definitions of the form async def foo(): ... are actually interpreted as methods of a class inheriting coroutine.
  2. Perhaps, async def is actually split into multiple methods by await statements, where the object on which these methods are called is able to keep track of the progress it made through the execution so far.
  3. If the above is true, then, essentially, execution of a coroutine boils down to calling methods of coroutine object by some global manager (loop?).
  4. The global manager is somehow (how?) aware of when I/O operations are performed by Python (only?) code and is able to choose one of the pending coroutine methods to execute after the current executing method relinquished control (hit on the await statement).

In other words, here’s my attempt at “desugaring” of some asyncio syntax into something more understandable:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__(self):
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not all(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Should my guess prove correct, then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended while the I/O happens outside the interpreter? What exactly is meant by I/O? If my Python procedure calls the C open() procedure, which in turn sends an interrupt to the kernel and relinquishes control to it, how does the Python interpreter know about this and how is it able to continue running some other code while the kernel code does the actual I/O, up until it wakes up the Python procedure which sent the interrupt originally? How can the Python interpreter, in principle, be aware of this happening?


Answer 0

How does asyncio work?

Before answering this question we need to understand a few base terms, skip these if you already know any of them.

Generators

Generators are objects that allow us to suspend the execution of a python function. User-defined generators are implemented using the keyword yield. By creating a normal function containing the yield keyword, we turn that function into a generator:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

As you can see, calling next() on the generator causes the interpreter to load test’s frame and return the yielded value. Calling next() again causes the frame to be loaded into the interpreter stack again and execution to continue to the next yield, producing another value.

By the third call to next(), our generator is finished, and StopIteration is raised.

Communicating with a generator

A lesser-known feature of generators is the fact that you can communicate with them using two methods: send() and throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

Upon calling gen.send(), the value is passed into the generator and becomes the result of the yield expression at which it is paused.

gen.throw(), on the other hand, allows raising exceptions inside the generator, with the exception raised at the spot where the generator is paused on yield.

Returning values from generators

Returning a value from a generator results in the value being put inside the StopIteration exception. We can later recover the value from the exception and use it as we need.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

Behold, a new keyword: yield from

Python 3.3 came with the addition of a new keyword: yield from (PEP 380). What that keyword allows us to do is pass on any next(), send() and throw() into an inner-most nested generator. If the inner generator returns a value, it is also the return value of yield from:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

I’ve written an article to further elaborate on this topic.

Putting it all together

With the introduction of yield from in Python 3.3, we were now able to create generators inside generators that, just like a tunnel, pass data back and forth from the inner-most to the outer-most generators. This has spawned a new meaning for generators – coroutines.

Coroutines are functions that can be stopped and resumed while being run. In Python, they are defined using the async def keyword. Much like generators, they too use their own form of yield from which is await. Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await).

async def inner():
    return 1

async def outer():
    await inner()
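
For comparison, here is a rough sketch of how the same pair looked as generator-based coroutines before Python 3.5. The asyncio.coroutine decorator shown here is deprecated and was removed in Python 3.11, so treat this purely as a historical illustration:

import asyncio

@asyncio.coroutine
def inner():
    return 1

@asyncio.coroutine
def outer():
    # yield from plays the role that await plays today
    result = yield from inner()
    return result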

Like every iterator or generator that implements the __iter__() method, coroutines implement __await__(), which allows them to continue every time await coro is called.

There’s a nice sequence diagram inside the Python docs that you should check out.

In asyncio, apart from coroutine functions, we have 2 important objects: tasks and futures.

Futures

Futures are objects that have the __await__() method implemented, and their job is to hold a certain state and result. The state can be one of the following:

  1. PENDING – future does not have any result or exception set.
  2. CANCELLED – future was cancelled using fut.cancel()
  3. FINISHED – future was finished, either by a result set using fut.set_result() or by an exception set using fut.set_exception()

The result, just as you may have guessed, can be either a Python object that will be returned, or an exception that may be raised.

Another important feature of future objects is that they contain a method called add_done_callback(). This method allows functions to be called as soon as the future is done – whether it finished with an exception or with a result.
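
A minimal sketch of that callback mechanism on a bare asyncio.Future; the names and the 0.1-second delay are only for illustration:

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # called with the future as its argument as soon as it is done
    fut.add_done_callback(lambda f: print('callback saw:', f.result()))

    loop.call_later(0.1, fut.set_result, 42)  # complete the future a bit later
    print('awaited result:', await fut)

asyncio.run(main())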

Tasks

Task objects are special futures, which wrap around coroutines, and communicate with the inner-most and outer-most coroutines. Every time a coroutine awaits a future, the future is passed all the way back to the task (just like in yield from), and the task receives it.

Next, the task binds itself to the future. It does so by calling add_done_callback() on the future. From now on, if the future is ever done, whether by being cancelled, set with an exception, or set with a Python object as a result, the task’s callback will be called, and the task will rise back up to life.

Asyncio

The final burning question we must answer is – how is the IO implemented?

Deep inside asyncio, we have an event loop. An event loop of tasks. The event loop’s job is to call tasks every time they are ready and coordinate all that effort into one single working machine.

The IO part of the event loop is built upon a single crucial function called select. Select is a blocking function, implemented by the operating system underneath, that allows waiting on sockets for incoming or outgoing data. When data is received it wakes up, and returns the sockets which received data, or the sockets that are ready for writing.
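
For illustration, a minimal stand-alone use of that call; the socketpair exists only to give select something readable, it is not how asyncio itself sets things up:

import select
import socket

a, b = socket.socketpair()         # two connected sockets
b.sendall(b'hello')                # makes `a` readable

# block (for at most 1 second) until a socket in the read list is ready
readable, writable, _ = select.select([a], [], [], 1.0)
if readable:
    print(readable[0].recv(1024))  # b'hello'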

When you try to receive or send data over a socket through asyncio, what actually happens below is that the socket is first checked to see whether it has any data that can be immediately read or sent. If its .send() buffer is full, or the .recv() buffer is empty, the socket is registered with the select function (by simply adding it to one of the lists, rlist for recv and wlist for send) and the appropriate function awaits a newly created future object tied to that socket.

When all available tasks are waiting for futures, the event loop calls select and waits. When one of the sockets has incoming data, or its send buffer has drained, asyncio checks for the future object tied to that socket, and sets it to done.

Now all the magic happens. The future is set to done, the task that added itself before with add_done_callback() rises back to life, and calls .send() on the coroutine, which resumes the inner-most coroutine (because of the await chain), and you read the newly received data from the nearby buffer it was spilled into.

Method chain again, in case of recv():

  1. select.select waits.
  2. A ready socket, with data, is returned.
  3. Data from the socket is moved into a buffer.
  4. future.set_result() is called.
  5. Task that added itself with add_done_callback() is now woken up.
  6. Task calls .send() on the coroutine which goes all the way into the inner-most coroutine and wakes it up.
  7. Data is being read from the buffer and returned to our humble user.

In summary, asyncio uses generator capabilities, that allow pausing and resuming functions. It uses yield from capabilities that allow passing data back and forth from the inner-most generator to the outer-most. It uses all of those in order to halt function execution while it’s waiting for IO to complete (by using the OS select function).

And the best of all? While one function is paused, another may run and interleave with the delicate fabric, which is asyncio.


Answer 1

Talking about async/await and asyncio is not the same thing. The first is a fundamental, low-level construct (coroutines) while the latter is a library using these constructs. Conversely, there is no single ultimate answer.

The following is a general description of how async/await and asyncio-like libraries work. That is, there may be other tricks on top (there are…) but they are inconsequential unless you build them yourself. The difference should be negligible unless you already know enough to not have to ask such a question.

1. Coroutines versus subroutines in a nut shell

Just like subroutines (functions, procedures, …), coroutines (generators, …) are an abstraction of call stack and instruction pointer: there is a stack of executing code pieces, and each is at a specific instruction.

The distinction of def versus async def is merely for clarity. The actual difference is return versus yield. From this, await or yield from take the difference from individual calls to entire stacks.

1.1. Subroutines

A subroutine represents a new stack level to hold local variables, and a single traversal of its instructions to reach an end. Consider a subroutine like this:

def subfoo(bar):
     qux = 3
     return qux * bar

When you run it, that means

  1. allocate stack space for bar and qux
  2. recursively execute the first statement and jump to the next statement
  3. once at a return, push its value to the calling stack
  4. clear the stack (1.) and instruction pointer (2.)

Notably, 4. means that a subroutine always starts at the same state. Everything exclusive to the function itself is lost upon completion. A function cannot be resumed, even if there are instructions after return.

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. Coroutines as persistent subroutines

A coroutine is like a subroutine, but can exit without destroying its state. Consider a coroutine like this:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

When you run it, that means

  1. allocate stack space for bar and qux
  2. recursively execute the first statement and jump to the next statement
    1. once at a yield, push its value to the calling stack but store the stack and instruction pointer
    2. once calling into yield, restore stack and instruction pointer and push arguments to qux
  3. once at a return, push its value to the calling stack
  4. clear the stack (1.) and instruction pointer (2.)

Note the addition of 2.1 and 2.2 – a coroutine can be suspended and resumed at predefined points. This is similar to how a subroutine is suspended during calling another subroutine. The difference is that the active coroutine is not strictly bound to its calling stack. Instead, a suspended coroutine is part of a separate, isolated stack.

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

This means that suspended coroutines can be freely stored or moved between stacks. Any call stack that has access to a coroutine can decide to resume it.
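
A small sketch of that idea: whichever piece of code holds a reference to the suspended generator may resume it, regardless of who created it or first ran it:

def cofoo(bar):
    qux = yield bar   # suspension point
    return qux

gen = cofoo(2)
print(next(gen))      # runs up to the yield and prints 2; gen is now suspended

def somewhere_else(suspended):
    # any call stack with access to the suspended coroutine can resume it
    try:
        suspended.send(5)
    except StopIteration as exc:
        print('returned', exc.value)   # returned 5

somewhere_else(gen)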

1.3. Traversing the call stack

So far, our coroutine only goes down the call stack with yield. A subroutine can go down and up the call stack with return and () (a call). For completeness, coroutines also need a mechanism to go up the call stack. Consider a coroutine like this:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

When you run it, that means it still allocates the stack and instruction pointer like a subroutine. When it suspends, that still is like storing a subroutine.

However, yield from does both. It suspends stack and instruction pointer of wrap and runs cofoo. Note that wrap stays suspended until cofoo finishes completely. Whenever cofoo suspends or something is sent, cofoo is directly connected to the calling stack.

1.4. Coroutines all the way down

As established, yield from allows connecting two scopes across another intermediate one. When applied recursively, that means the top of the stack can be connected to the bottom of the stack.

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

Note that root and coro_b do not know about each other. This makes coroutines much cleaner than callbacks: coroutines are still built on a 1:1 relation like subroutines. Coroutines suspend and resume their entire existing execution stack up until a regular call point.

Notably, root could have an arbitrary number of coroutines to resume. Yet, it can never resume more than one at the same time. Coroutines of the same root are concurrent but not parallel!
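
As a toy illustration of “concurrent but not parallel”: the root decides the interleaving, but it only ever advances one coroutine at a time:

def coro(name):
    for step in range(2):
        yield '%s step %d' % (name, step)

a, b = coro('a'), coro('b')
for gen in (a, b, a, b):   # root interleaves, never runs two at once
    print(next(gen))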

1.5. Python’s async and await

The explanation has so far explicitly used the yield and yield from vocabulary of generators – the underlying functionality is the same. The new Python 3.5 syntax async and await exists mainly for clarity.

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

The async for and async with statements are needed because you would break the yield from/await chain with the bare for and with statements.

2. Anatomy of a simple event loop

By itself, a coroutine has no concept of yielding control to another coroutine. It can only yield control to the caller at the bottom of a coroutine stack. This caller can then switch to another coroutine and run it.

This root node of several coroutines is commonly an event loop: on suspension, a coroutine yields an event on which it wants resume. In turn, the event loop is capable of efficiently waiting for these events to occur. This allows it to decide which coroutine to run next, or how to wait before resuming.

Such a design implies that there is a set of pre-defined events that the loop understands. Several coroutines await each other, until finally an event is awaited. This event can communicate directly with the event loop by yielding control.

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

The key is that coroutine suspension allows the event loop and events to directly communicate. The intermediate coroutine stack does not require any knowledge about which loop is running it, nor how events work.

2.1.1. Events in time

The simplest event to handle is reaching a point in time. This is a fundamental block of threaded code as well: a thread repeatedly sleeps until a condition is true. However, a regular sleep blocks execution by itself – we want other coroutines to not be blocked. Instead, we want to tell the event loop when it should resume the current coroutine stack.

2.1.2. Defining an Event

An event is simply a value we can identify – be it via an enum, a type or other identity. We can define this with a simple class that stores our target time. In addition to storing the event information, we can allow awaiting the class directly.

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self
    
    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

This class only stores the event – it does not say how to actually handle it.

The only special feature is __await__ – it is what the await keyword looks for. Practically, it is an iterator, but one not meant for the regular iteration machinery.
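
To see that, we can drive the awaitable by hand, which is roughly what the await machinery does underneath:

event = AsyncSleep(until=0.0)
stepper = event.__await__()   # the iterator that ``await event`` drives
print(next(stepper))          # yields the AsyncSleep event itself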

2.2.1. Awaiting an event

Now that we have an event, how do coroutines react to it? We should be able to express the equivalent of sleep by awaiting our event. To better see what is going on, we wait twice for half the time:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

We can directly instantiate and run this coroutine. Similar to a generator, using coroutine.send runs the coroutine until it yields a result.

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

This gives us two AsyncSleep events and then a StopIteration when the coroutine is done. Notice that the only delay is from time.sleep in the loop! Each AsyncSleep only stores an offset from the current time.

2.2.2. Event + Sleep

At this point, we have two separate mechanisms at our disposal:

  • AsyncSleep Events that can be yielded from inside a coroutine
  • time.sleep that can wait without impacting coroutines

Notably, these two are orthogonal: neither one affects or triggers the other. As a result, we can come up with our own strategy to sleep to meet the delay of an AsyncSleep.

2.3. A naive event loop

If we have several coroutines, each can tell us when it wants to be woken up. We can then wait until the first of them wants to be resumed, then for the one after, and so on. Notably, at each point we only care about which one is next.

This makes for a straightforward scheduling:

  1. sort coroutines by their desired wake up time
  2. pick the first that wants to wake up
  3. wait until this point in time
  4. run this coroutine
  5. repeat from 1.

A trivial implementation does not need any advanced concepts. A list allows sorting coroutines by date. Waiting is a regular time.sleep. Running coroutines works just like before with coroutine.send.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

Of course, this has ample room for improvement. We can use a heap for the wait queue or a dispatch table for events. We could also fetch return values from the StopIteration and assign them to the coroutine. However, the fundamental principle remains the same.
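
As a sketch of the heap suggestion, here is the same loop with heapq instead of repeated sorting; the counter exists only as a tie-breaker so that coroutines never get compared directly:

import heapq
import time

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion (heap variant)"""
    counter = 0
    waiting = []
    for coroutine in coroutines:
        heapq.heappush(waiting, (0, counter, coroutine))
        counter += 1
    while waiting:
        until, _, coroutine = heapq.heappop(waiting)
        time.sleep(max(0.0, until - time.time()))
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        if isinstance(command, AsyncSleep):
            counter += 1
            heapq.heappush(waiting, (command.until, counter, coroutine))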

2.4. Cooperative Waiting

The AsyncSleep event and run event loop are a fully working implementation of timed events.

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

This cooperatively switches between each of the five coroutines, suspending each for 0.1 seconds. Even though the event loop is synchronous, it still executes the work in 0.5 seconds instead of 2.5 seconds. Each coroutine holds state and acts independently.

3. I/O event loop

An event loop that supports sleep is suitable for polling. However, waiting for I/O on a file handle can be done more efficiently: the operating system implements I/O and thus knows which handles are ready. Ideally, an event loop should support an explicit “ready for I/O” event.

3.1. The select call

Python already has an interface to query the OS for read I/O handles. When called with handles to read or write, it returns the handles ready to read or write:

readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

For example, we can open a file for writing and wait for it to be ready:

write_target = open('/tmp/foo', 'a')
readable, writeable, _ = select.select([], [write_target], [])

Once select returns, writeable contains our open file.

3.2. Basic I/O event

Similar to the AsyncSleep request, we need to define an event for I/O. With the underlying select logic, the event must refer to a readable object – say an open file. In addition, we store how much data to read.

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

As with AsyncSleep we mostly just store the data required for the underlying system call. This time, __await__ is capable of being resumed multiple times – until our desired amount has been read. In addition, we return the I/O result instead of just resuming.

3.3. Augmenting an event loop with read I/O

The basis for our event loop is still the run defined previously. First, we need to track the read requests. This is no longer a sorted schedule; we only map read requests to coroutines.

# new
waiting_read = {}  # type: Dict[file, coroutine]

Since select.select takes a timeout parameter, we can use it in place of time.sleep.

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(waiting_read), [], [])

This gives us all readable files – if there are any, we run the corresponding coroutine. If there are none, we have waited long enough for our current coroutine to run.

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

Finally, we have to actually listen for read requests.

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. Putting it together

The above was a bit of a simplification. We need to do some switching to not starve sleeping coroutines if we can always read. We need to handle having nothing to read or nothing to wait for. However, the end result still fits into 30 LOC.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. Cooperative I/O

The AsyncSleep, AsyncRead and run implementations are now fully functional to sleep and/or read. Same as for sleepy, we can define a helper to test reading:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

Running this, we can see that our I/O is interleaved with the waiting task:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. Non-Blocking I/O

While I/O on files gets the concept across, it is not really suitable for a library like asyncio: the select call always returns for files, and both open and read may block indefinitely. This blocks all coroutines of an event loop – which is bad. Libraries like aiofiles use threads and synchronization to fake non-blocking I/O and events on file.

However, sockets do allow for non-blocking I/O – and their inherent latency makes it much more critical. When used in an event loop, waiting for data and retrying can be wrapped without blocking anything.

4.1. Non-Blocking I/O event

Similar to our AsyncRead, we can define a suspend-and-read event for sockets. Instead of taking a file, we take a socket – which must be non-blocking. Also, our __await__ uses socket.recv instead of file.read.

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

In contrast to AsyncRead, __await__ performs truly non-blocking I/O. When data is available, it always reads. When no data is available, it always suspends. That means the event loop is only blocked while we perform useful work.

4.2. Un-Blocking the event loop

As far as the event loop is concerned, nothing changes much. The event to listen for is still the same as for files – a file descriptor marked ready by select.

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

At this point, it should be obvious that AsyncRead and AsyncRecv are the same kind of event. We could easily refactor them to be one event with an exchangeable I/O component. In effect, the event loop, coroutines and events cleanly separate a scheduler, arbitrary intermediate code and the actual I/O.

4.3. The ugly side of non-blocking I/O

In principle, what you should do at this point is replicate the logic of read as a recv for AsyncRecv. However, this is much more ugly now – you have to handle early returns when functions block inside the kernel, but yield control to you. For example, opening a connection versus opening a file is much longer:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

Long story short, what remains is a few dozen lines of Exception handling. The events and event loop already work at this point.

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

Addendum

Example code at github


Answer 2

Your coro desugaring is conceptually correct, but slightly incomplete.

await doesn’t suspend unconditionally, but only if it encounters a blocking call. How does it know that a call is blocking? This is decided by the code being awaited. For example, an awaitable implementation of socket read could be desugared to:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

In real asyncio the equivalent code modifies the state of a Future instead of returning magic values, but the concept is the same. When appropriately adapted to a generator-like object, the above code can be awaited.

On the caller side, when your coroutine contains:

data = await read(sock, 1024)

It desugars into something close to:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

People familiar with generators tend to describe the above in terms of yield from which does the suspension automatically.

The suspension chain continues all the way up to the event loop, which notices that the coroutine is suspended, removes it from the runnable set, and goes on to execute coroutines that are runnable, if any. If no coroutines are runnable, the loop waits in select() until a file descriptor that a coroutine is interested in becomes ready for IO. (The event loop maintains a file-descriptor-to-coroutine mapping.)

In the above example, once select() tells the event loop that sock is readable, it will re-add coro to the runnable set, so it will be continued from the point of suspension.
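
That file-descriptor-to-coroutine bookkeeping can also be observed directly through asyncio’s loop.add_reader(). A minimal sketch, where the socketpair only exists to provide a readable descriptor (note that add_reader is not available on every loop implementation, e.g. the Windows proactor loop):

import asyncio
import socket

async def main():
    loop = asyncio.get_running_loop()
    a, b = socket.socketpair()
    a.setblocking(False)

    fut = loop.create_future()
    # ask the loop to complete the future as soon as `a` becomes readable
    loop.add_reader(a.fileno(), lambda: fut.set_result(a.recv(1024)))
    loop.call_later(0.1, b.sendall, b'ping')

    print(await fut)               # b'ping'
    loop.remove_reader(a.fileno())

asyncio.run(main())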

In other words:

  1. Everything happens in the same thread by default.

  2. The event loop is responsible for scheduling the coroutines and waking them up when whatever they were waiting for (typically an IO call that would normally block, or a timeout) becomes ready.

For insight into coroutine-driving event loops, I recommend this talk by Dave Beazley, where he demonstrates coding an event loop from scratch in front of a live audience.


Answer 3

It all boils down to the two main challenges that asyncio is addressing:

  • How to perform multiple I/O in a single thread?
  • How to implement cooperative multitasking?

The answer to the first point has been around for a long while and is called a select loop. In python, it is implemented in the selectors module.
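
A minimal sketch of such a select loop using the selectors module; the socketpair is only there to provide a readable file object:

import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
b.sendall(b'ping')                 # makes `a` readable

sel.register(a, selectors.EVENT_READ)
# blocks (for at most 1 second) until a registered file object is ready
for key, events in sel.select(timeout=1.0):
    print(key.fileobj.recv(1024))  # b'ping'
sel.unregister(a)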

The second question is related to the concept of coroutine, i.e. functions that can stop their execution and be restored later on. In python, coroutines are implemented using generators and the yield from statement. That’s what is hiding behind the async/await syntax.

More resources in this answer.


EDIT: Addressing your comment about goroutines:

The closest equivalent to a goroutine in asyncio is actually not a coroutine but a task (see the difference in the documentation). In python, a coroutine (or a generator) knows nothing about the concepts of event loop or I/O. It simply is a function that can stop its execution using yield while keeping its current state, so it can be restored later on. The yield from syntax allows for chaining them in a transparent way.

Now, within an asyncio task, the coroutine at the very bottom of the chain always ends up yielding a future. This future then bubbles up to the event loop, and gets integrated into the inner machinery. When the future is set to done by some other inner callback, the event loop can restore the task by sending the future back into the coroutine chain.


EDIT: Addressing some of the questions in your post:

How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter?

No, nothing happens in a thread. I/O is always managed by the event loop, mostly through file descriptors. However, the registration of those file descriptors is usually hidden by high-level coroutines, which do the dirty work for you.

What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?

An I/O is any blocking call. In asyncio, all the I/O operations should go through the event loop, because as you said, the event loop has no way to be aware that a blocking call is being performed in some synchronous code. That means you’re not supposed to use a synchronous open within the context of a coroutine. Instead, use a dedicated library such as aiofiles, which provides an asynchronous version of open.
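
For example, a minimal sketch with aiofiles (a third-party package; the path here is just a placeholder), which hands the blocking open/read off to a worker thread behind an async interface:

import asyncio
import aiofiles   # third-party: pip install aiofiles

async def read_file(path):
    # the blocking open/read run in a thread pool, so the event loop stays free
    async with aiofiles.open(path, 'rb') as f:
        return await f.read()

data = asyncio.run(read_file('/etc/hostname'))
print(len(data), 'bytes')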


“Fire and forget” Python async/await

Question: “Fire and forget” Python async/await

Sometimes there is some non-critical asynchronous operation that needs to happen but I don’t want to wait for it to complete. In Tornado’s coroutine implementation you can “fire & forget” an asynchronous function by simply omitting the yield keyword.

I’ve been trying to figure out how to “fire & forget” with the new async/await syntax released in Python 3.5. E.g., a simplified code snippet:

async def async_foo():
    print("Do some stuff asynchronously here...")

def bar():
    async_foo()  # fire and forget "async_foo()"

bar()

What happens though is that bar() never executes and instead we get a runtime warning:

RuntimeWarning: coroutine 'async_foo' was never awaited
  async_foo()  # fire and forget "async_foo()"

Answer 0

Update:

If you’re using Python >= 3.7, replace asyncio.ensure_future with asyncio.create_task everywhere. It’s the newer, nicer way to spawn a task.
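
For reference, a minimal sketch of the same fire-and-forget pattern on Python 3.7+ using asyncio.create_task and asyncio.run:

import asyncio

async def async_foo():
    await asyncio.sleep(1)
    print("async_foo done")

async def main():
    asyncio.create_task(async_foo())   # fire and forget (Python 3.7+)
    print('Do some actions')
    await asyncio.sleep(1.5)           # keep the loop alive long enough

asyncio.run(main())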


asyncio.Task to “fire and forget”

According to the python docs for asyncio.Task it is possible to start a coroutine to execute “in the background”. The task created by the asyncio.ensure_future function won’t block the execution (therefore the function will return immediately!). This looks like a way to “fire and forget” as you requested.

import asyncio


async def async_foo():
    print("async_foo started")
    await asyncio.sleep(1)
    print("async_foo done")


async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()

    # btw, you can also create tasks inside non-async funcs

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

Do some actions 1
async_foo started
Do some actions 2
async_foo done
Do some actions 3

如果事件循环完成后正在执行任务怎么办?

请注意,asyncio期望任务将在事件循环完成时完成。因此,如果您更改main()为:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')

程序完成后,您会收到以下警告:

Task was destroyed but it is pending!
task: <Task pending coro=<async_foo() running at [...]

为防止这种情况,您可以在事件循环完成后等待所有待处理的任务

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also finish all running tasks:
    pending = asyncio.Task.all_tasks()
    loop.run_until_complete(asyncio.gather(*pending))

杀死任务而不是等待任务

有时,您不想等待任务完成(例如,某些任务可能创建为永久运行)。在这种情况下,您可以只取消()而不是等待它们:

import asyncio
from contextlib import suppress


async def echo_forever():
    while True:
        print("echo")
        await asyncio.sleep(1)


async def main():
    asyncio.ensure_future(echo_forever())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also cancel all running tasks:
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        # Now we should await task to execute it's cancellation.
        # Cancelled task raises asyncio.CancelledError that we can suppress:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)

输出:

Do some actions 1
echo
Do some actions 2
echo
Do some actions 3
echo

Upd:

Replace asyncio.ensure_future with asyncio.create_task everywhere if you’re using Python >= 3.7. It’s the newer, nicer way to spawn tasks.


asyncio.Task to “fire and forget”

According to the Python docs for asyncio.Task, it is possible to start a coroutine executing “in the background”. The task created by the asyncio.ensure_future function won’t block the execution (the function returns immediately!). This looks like the way to “fire and forget” that you requested.

import asyncio


async def async_foo():
    print("async_foo started")
    await asyncio.sleep(1)
    print("async_foo done")


async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()

    # btw, you can also create tasks inside non-async funcs

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

Do some actions 1
async_foo started
Do some actions 2
async_foo done
Do some actions 3

What if tasks are still executing after the event loop completes?

Note that asyncio expects tasks to be completed by the time the event loop completes. So if you change main() to:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')

You’ll get this warning after the program finishes:

Task was destroyed but it is pending!
task: <Task pending coro=<async_foo() running at [...]

To prevent that, you can just await all pending tasks after the event loop has completed:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also finish all running tasks:
    pending = asyncio.Task.all_tasks()
    loop.run_until_complete(asyncio.gather(*pending))

Kill tasks instead of awaiting them

Sometimes you don’t want to wait for tasks to finish (for example, some tasks may be created to run forever). In that case, you can just cancel() them instead of awaiting them:

import asyncio
from contextlib import suppress


async def echo_forever():
    while True:
        print("echo")
        await asyncio.sleep(1)


async def main():
    asyncio.ensure_future(echo_forever())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also cancel all running tasks:
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        # Now we should await the task so its cancellation can run.
        # Cancelled task raises asyncio.CancelledError that we can suppress:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)

Output:

Do some actions 1
echo
Do some actions 2
echo
Do some actions 3
echo

Answer 1

Thank you Sergey for the succinct answer. Here is a decorator version of the same.

import asyncio
import functools
import time

def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        # run_in_executor only forwards positional args, so bind everything with partial
        return asyncio.get_event_loop().run_in_executor(
            None, functools.partial(f, *args, **kwargs))

    return wrapped

@fire_and_forget
def foo():
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")

Produces

>>> Hello
>>> I didn't wait for foo()
>>> foo() completed

Note: Check my other answer which does the same using plain threads.


Answer 2

This is not entirely asynchronous execution, but maybe run_in_executor() is suitable for you.

import asyncio
import functools

def fire_and_forget(task, *args, **kwargs):
    loop = asyncio.get_event_loop()
    if callable(task):
        # run_in_executor only takes positional args, so bind kwargs with partial
        return loop.run_in_executor(None, functools.partial(task, *args, **kwargs))
    else:
        raise TypeError('Task must be a callable')

def foo():
    pass  # asynchronous stuff here


fire_and_forget(foo)

Answer 3

If for some reason you are unable to use asyncio, here is an implementation using plain threads. Check my other answer and Sergey’s answer too.

import threading
import time

def fire_and_forget(f):
    def wrapped():
        threading.Thread(target=f).start()

    return wrapped

@fire_and_forget
def foo():
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")

asyncio.ensure_future vs BaseEventLoop.create_task vs plain coroutine?

Question: asyncio.ensure_future vs BaseEventLoop.create_task vs plain coroutine?

I’ve seen several basic Python 3.5 tutorials on asyncio doing the same operation in various flavours. In this code:

import asyncio  

async def doit(i):
    print("Start %d" % i)
    await asyncio.sleep(3)
    print("End %d" % i)
    return i

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    #futures = [asyncio.ensure_future(doit(i), loop=loop) for i in range(10)]
    #futures = [loop.create_task(doit(i)) for i in range(10)]
    futures = [doit(i) for i in range(10)]
    result = loop.run_until_complete(asyncio.gather(*futures))
    print(result)

All three variants above that define the futures variable achieve the same result; the only difference I can see is that with the third variant the execution is out of order (which should not matter in most cases). Is there any other difference? Are there cases where I can’t just use the simplest variant (plain list of coroutines)?


Answer 0

Actual info:

Starting from Python 3.7, the high-level function asyncio.create_task(coro) was added for this purpose.

You should use it instead of other ways of creating tasks from coroutines. However, if you need to create a task from an arbitrary awaitable, you should use asyncio.ensure_future(obj).
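For example, on Python 3.7+ (a minimal sketch):

import asyncio

async def fetch(i):
    await asyncio.sleep(0.1)
    return i * 10

async def main():
    # Preferred since 3.7: spawn tasks from coroutines with create_task.
    tasks = [asyncio.create_task(fetch(i)) for i in range(3)]
    print(await asyncio.gather(*tasks))  # -> [0, 10, 20]

asyncio.run(main())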


Old info:

ensure_future vs create_task

ensure_future is a method to create a Task from a coroutine. It creates tasks in different ways depending on its argument (including using create_task for coroutines and future-like objects).

create_task is an abstract method of AbstractEventLoop. Different event loops can implement this function in different ways.

You should use ensure_future to create tasks. You’ll need create_task only if you’re going to implement your own event loop type.

Upd:

@bj0 pointed at Guido’s answer on this topic:

The point of ensure_future() is if you have something that could either be a coroutine or a Future (the latter includes a Task because that’s a subclass of Future), and you want to be able to call a method on it that is only defined on Future (probably about the only useful example being cancel()). When it is already a Future (or Task) this does nothing; when it is a coroutine it wraps it in a Task.

If you know that you have a coroutine and you want it to be scheduled, the correct API to use is create_task(). The only time when you should be calling ensure_future() is when you are providing an API (like most of asyncio’s own APIs) that accepts either a coroutine or a Future and you need to do something to it that requires you to have a Future.

and later:

In the end I still believe that ensure_future() is an appropriately obscure name for a rarely-needed piece of functionality. When creating a task from a coroutine you should use the appropriately-named loop.create_task(). Maybe there should be an alias for that asyncio.create_task()?

It’s surprising to me. My main motivation for using ensure_future all along was that it’s a higher-level function compared to the loop’s member create_task (the discussion contains some ideas like adding asyncio.spawn or asyncio.create_task).

I can also point out that in my opinion it’s pretty convenient to use a universal function that can handle any Awaitable rather than coroutines only.

However, Guido’s answer is clear: “When creating a task from a coroutine you should use the appropriately-named loop.create_task()”.

When should coroutines be wrapped in tasks?

Wrapping a coroutine in a Task is a way to start that coroutine “in the background”. Here’s an example:

import asyncio


async def msg(text):
    await asyncio.sleep(0.1)
    print(text)


async def long_operation():
    print('long_operation started')
    await asyncio.sleep(3)
    print('long_operation finished')


async def main():
    await msg('first')

    # Now you want to start long_operation, but you don't want to wait until it has finished:
    # long_operation should be started, but second msg should be printed immediately.
    # Create task to do so:
    task = asyncio.ensure_future(long_operation())

    await msg('second')

    # Now, whenever you want, you can await the task until it has finished:
    await task


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

first
long_operation started
second
long_operation finished

You can replace asyncio.ensure_future(long_operation()) with just await long_operation() to feel the difference.
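With a plain await long_operation() there, the second message could only be printed once the long operation had finished, so the output would become:

first
long_operation started
long_operation finished
second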


Answer 1

create_task()

  • accepts coroutines,
  • returns Task,
  • it is invoked in context of the loop.

ensure_future()

  • accepts Futures, coroutines, awaitable objects,
  • returns Task (or Future if Future passed).
  • if the given arg is a coroutine it uses create_task,
  • loop object can be passed.

As you can see, create_task is more specific.


async function without create_task or ensure_future

Simply invoking an async function returns a coroutine:

>>> async def doit(i):
...     await asyncio.sleep(3)
...     return i
>>> doit(4)   
<coroutine object doit at 0x7f91e8e80ba0>

And since gather under the hood ensures (via ensure_future) that the args are futures, an explicit ensure_future is redundant.

Similar question What’s the difference between loop.create_task, asyncio.async/ensure_future and Task?


Answer 2

Note: Only valid for Python 3.7 (for Python 3.5 refer to the earlier answer).

From the official docs:

asyncio.create_task (added in Python 3.7) is the preferred way to spawn new tasks, instead of ensure_future().


Detail:

So now, from Python 3.7 onwards, there are two top-level wrapper functions (similar but different): asyncio.create_task and asyncio.ensure_future.

Well, ultimately both of these wrapper functions help you call BaseEventLoop.create_task. The only difference is that ensure_future accepts any awaitable object and helps you convert it into a Future, and it also lets you provide your own event_loop parameter. Depending on whether you need those capabilities, you can simply choose which wrapper to use.
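A small sketch of that difference (Python 3.7+): ensure_future passes an existing Future straight through, while create_task only accepts coroutines:

import asyncio

async def coro():
    return 42

async def main():
    fut = asyncio.get_running_loop().create_future()
    fut.set_result("already done")

    print(await asyncio.ensure_future(fut))   # a Future is returned unchanged
    try:
        asyncio.create_task(fut)              # create_task expects a coroutine
    except TypeError as exc:
        print("create_task rejected it:", exc)

    print(await asyncio.create_task(coro()))  # -> 42

asyncio.run(main())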


Answer 3

For your example, all three variants execute asynchronously. The only difference is that, in the third variant, you pre-generate all 10 coroutines and submit them to the loop together, so only that one produces output in random order.


Multiprocessing vs multithreading vs asyncio in Python 3

Question: Multiprocessing vs multithreading vs asyncio in Python 3

I found that in Python 3.4 there are few different libraries for multiprocessing/threading: multiprocessing vs threading vs asyncio.

But I don’t know which one to use, or which is the “recommended” one. Do they do the same thing, or are they different? If so, which one is used for what? I want to write a program that uses multiple cores in my computer, but I don’t know which library I should learn.


Answer 0

They are intended for (slightly) different purposes and/or requirements. CPython (a typical, mainline Python implementation) still has the global interpreter lock, so a multi-threaded application (a standard way to implement parallel processing nowadays) is suboptimal. That’s why multiprocessing may be preferred over threading. But not every problem may be effectively split into [almost independent] pieces, so there may be a need for heavy interprocess communication. That’s why multiprocessing may not be preferred over threading in general.

asyncio (this technique is available not only in Python, other languages and/or frameworks also have it, e.g. Boost.ASIO) is a method to effectively handle a lot of I/O operations from many simultaneous sources w/o need of parallel code execution. So it’s just a solution (a good one indeed!) for a particular task, not for parallel processing in general.


Answer 1

[Quick Answer]

TL;DR

Making the Right Choice:

We have walked through the most popular forms of concurrency. But the question remains – when should we choose which one? It really depends on the use case. From my experience (and reading), I tend to follow this pseudo code:

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
        print("Use Threads")
else:
    print("Multi Processing")
  • CPU Bound => Multi Processing
  • I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading
  • I/O Bound, Slow I/O, Many connections => Asyncio

Reference


[NOTE]:

  • If you have long-running call methods (i.e. methods that involve sleeping or lazy I/O), the best choice is the asyncio, Twisted or Tornado approach (coroutine methods), which provides concurrency within a single thread.
  • asyncio works on Python 3.4 and later.
  • Tornado and Twisted have been available since Python 2.7.
  • uvloop is an ultra-fast asyncio event loop (uvloop makes asyncio 2-4x faster).

[UPDATE (2019)]:

  • Japranto (GitHub) is a very fast pipelining HTTP server based on uvloop.

Answer 2

This is the basic idea:

Is it IO-BOUND ? ———> USE asyncio

IS IT CPU-HEAVY ? —–> USE multiprocessing

ELSE ? ———————-> USE threading

So basically stick to threading unless you have IO/CPU problems.


Answer 3

In multiprocessing you leverage multiple CPUs to distribute your calculations. Since each of the CPUs runs in parallel, you’re effectively able to run multiple tasks simultaneously. You would want to use multiprocessing for CPU-bound tasks. An example would be trying to calculate the sum of all elements of a huge list. If your machine has 8 cores, you can “cut” the list into 8 smaller lists, calculate the sum of each of those lists separately on a separate core, and then just add up those numbers. You’ll get a ~8x speedup by doing that, roughly as sketched below.
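A rough sketch of that idea using multiprocessing.Pool (the list and chunk count are made up for illustration):

from multiprocessing import Pool

def chunk_sum(chunk):
    return sum(chunk)

if __name__ == "__main__":
    huge_list = list(range(1_000_000))
    n_chunks = 8                                    # e.g. one chunk per core
    size = len(huge_list) // n_chunks
    chunks = [huge_list[i * size:(i + 1) * size] for i in range(n_chunks)]

    with Pool(processes=n_chunks) as pool:
        partial_sums = pool.map(chunk_sum, chunks)  # each chunk summed in its own process

    print(sum(partial_sums) == sum(huge_list))      # -> True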

In (multi)threading you don’t need multiple CPUs. Imagine a program that sends lots of HTTP requests to the web. If you used a single-threaded program, it would stop the execution (block) at each request, wait for a response, and then continue once the response was received. The problem here is that your CPU isn’t really doing work while waiting for some external server to do the job; it could have actually done some useful work in the meantime! The fix is to use threads – you can create many of them, each responsible for requesting some content from the web. The nice thing about threads is that, even if they run on one CPU, the CPU from time to time “freezes” the execution of one thread and jumps to executing another one (this is called context switching, and it happens constantly at non-deterministic intervals). So if your task is I/O bound – use threading.

asyncio is essentially threading where not the CPU but you, as a programmer (or actually your application), decide where and when the context switch happens. In Python you use the await keyword to suspend the execution of your coroutine (defined using the async keyword).


Difference between a coroutine and a future/task in Python 3.5?

Question: Difference between a coroutine and a future/task in Python 3.5?

Let’s say we have a dummy function:

async def foo(arg):
    result = await some_remote_call(arg)
    return result.upper()

What’s the difference between:

import asyncio    

coros = []
for i in range(5):
    coros.append(foo(i))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(coros))

And:

import asyncio

futures = []
for i in range(5):
    futures.append(asyncio.ensure_future(foo(i)))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(futures))

Note: The example returns a result, but this isn’t the focus of the question. When return value matters, use gather() instead of wait().

Regardless of return value, I’m looking for clarity on ensure_future(). wait(coros) and wait(futures) both run the coroutines, so when and why should a coroutine be wrapped in ensure_future?

Basically, what’s the Right Way ™ to run a bunch of non-blocking operations using Python 3.5’s async?

For extra credit, what if I want to batch the calls? For example, I need to call some_remote_call(...) 1000 times, but I don’t want to crush the web server/database/etc with 1000 simultaneous connections. This is doable with a thread or process pool, but is there a way to do this with asyncio?

2020 update (Python 3.7+): Don’t use these snippets. Instead use:

import asyncio

async def do_something_async():
    tasks = []
    for i in range(5):
        tasks.append(asyncio.create_task(foo(i)))
    await asyncio.gather(*tasks)

def do_something():
    asyncio.run(do_something_async())

Also consider using Trio, a robust 3rd party alternative to asyncio.


Answer 0

A coroutine is a generator function that can both yield values and accept values from the outside. The benefit of using a coroutine is that we can pause the execution of a function and resume it later. In case of a network operation, it makes sense to pause the execution of a function while we’re waiting for the response. We can use the time to run some other functions.

A future is like the Promise objects from Javascript. It is like a placeholder for a value that will be materialized in the future. In the above-mentioned case, while waiting on network I/O, a function can give us a container, a promise that it will fill the container with the value when the operation completes. We hold on to the future object and when it’s fulfilled, we can call a method on it to retrieve the actual result.
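A toy illustration of that “container” idea (the 0.5-second delay stands in for a network operation):

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    container = loop.create_future()          # empty placeholder for a value

    # Some other callback fills it later, e.g. when the "network I/O" completes.
    loop.call_later(0.5, container.set_result, "actual result")

    print(container.done())   # False - not fulfilled yet
    print(await container)    # suspends until fulfilled -> actual result

asyncio.run(main())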

Direct answer: You don’t need ensure_future if you don’t need the results. Tasks/futures are useful if you need the results or want to retrieve any exceptions that occurred.

Extra credit: I would choose run_in_executor and pass an Executor instance to control the maximum number of workers.
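A minimal sketch of that suggestion, assuming some_remote_call is a blocking function and the worker count of 50 is arbitrary:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def some_remote_call(arg):                          # hypothetical blocking call
    return arg * 2

async def main():
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=50)   # at most 50 calls in flight
    futures = [loop.run_in_executor(executor, some_remote_call, i)
               for i in range(1000)]
    results = await asyncio.gather(*futures)
    print(len(results), results[:3])

asyncio.run(main())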

Explanations and Sample codes

In the first example, you are using coroutines. The wait function takes a bunch of coroutines and combines them together. So wait() finishes when all the coroutines are exhausted (completed/finished returning all the values).

loop = get_event_loop()
loop.run_until_complete(wait(coros))

The run_until_complete method would make sure that the loop is alive until the execution is finished. Please notice how you are not getting the results of the async execution in this case.

In the second example, you are using the ensure_future function to wrap a coroutine and return a Task object which is a kind of Future. The coroutine is scheduled to be executed in the main event loop when you call ensure_future. The returned future/task object doesn’t yet have a value but over time, when the network operations finish, the future object will hold the result of the operation.

from asyncio import ensure_future

futures = []
for i in range(5):
    futures.append(ensure_future(foo(i)))

loop = get_event_loop()
loop.run_until_complete(wait(futures))

So in this example, we’re doing the same thing except we’re using futures instead of just using coroutines.

Let’s look at an example of how to use asyncio/coroutines/futures:

import asyncio


async def slow_operation():
    await asyncio.sleep(1)
    return 'Future is done!'


def got_result(future):
    print(future.result())

    # We have result, so let's stop
    loop.stop()


loop = asyncio.get_event_loop()
task = loop.create_task(slow_operation())
task.add_done_callback(got_result)

# We run forever
loop.run_forever()

Here, we have used the create_task method on the loop object. (ensure_future would schedule the task on the main/current event loop, whereas create_task lets us schedule a coroutine on a loop of our choosing.)

We also see the concept of adding a callback using the add_done_callback method on the task object.

A Task is done when the coroutine returns a value, raises an exception, or gets cancelled. There are methods to check for each of these conditions.

I have written some blog posts on these topics which might help:

Of course, you can find more details on the official manual: https://docs.python.org/3/library/asyncio.html


Answer 1

Simple answer

  • Invoking a coroutine function (async def) does NOT run it. It returns a coroutine object, just like calling a generator function returns a generator object.
  • await retrieves the value from a coroutine, i.e. “calls” the coroutine.
  • ensure_future/create_task schedule the coroutine to run on the event loop during the next iteration (without waiting for it to finish, like a daemon thread).

Some code examples

Let’s first clarify some terms:

  • coroutine function, the one you define with async def;
  • coroutine object, what you get when you “call” a coroutine function;
  • task, an object that wraps a coroutine object so it can run on the event loop.

Case 1, await on a coroutine

We create two coroutines, await one, and use create_task to run the other one.

import asyncio
import time

# coroutine function
async def p(word):
    print(f'{time.time()} - {word}')


async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')  # coroutine
    task2 = loop.create_task(p('create_task'))  # <- runs in next iteration
    await coro  # <-- run directly
    await task2

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

You will get this result:

1539486251.7055213 - await
1539486251.7055705 - create_task

Explanation:

The awaited coroutine (coro) was executed directly, and task2 was executed in the following iteration.

Case 2, yielding control to event loop

If we replace the main function, we can see a different result:

async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')
    task2 = loop.create_task(p('create_task'))  # scheduled to next iteration
    await asyncio.sleep(1)  # loop got control, and runs task2
    await coro  # run coro
    await task2

You will get this result:

-> % python coro.py
1539486378.5244057 - create_task
1539486379.5252144 - await  # note the delay

Explanation:

When calling asyncio.sleep(1), the control was yielded back to the event loop, and the loop checks for tasks to run, then it runs the task created by create_task.

Note that we first invoke the coroutine function but do not await it, so we just create a coroutine object without running it. Then we call the coroutine function again and wrap it in a create_task call; create_task will actually schedule the coroutine to run on the next iteration. So, in the result, create_task is executed before await.

Actually, the point here is to give control back to the loop; you could use asyncio.sleep(0) to see the same result.

Under the hood

loop.create_task actually calls asyncio.tasks.Task(), which will call loop.call_soon. And loop.call_soon will put the task in loop._ready. During each iteration of the loop, it checks every callback in loop._ready and runs it.

asyncio.wait, asyncio.ensure_future and asyncio.gather actually call loop.create_task directly or indirectly.

Also note in the docs:

Callbacks are called in the order in which they are registered. Each callback will be called exactly once.


Answer 2

A comment by Vincent linked to https://github.com/python/asyncio/blob/master/asyncio/tasks.py#L346, which shows that wait() wraps the coroutines in ensure_future() for you!

In other words, we do need a future, and coroutines will be silently transformed into them.

I’ll update this answer when I find a definitive explanation of how to batch coroutines/futures.


Answer 3

From the BDFL [2013]

Tasks

  • It’s a coroutine wrapped in a Future
  • class Task is a subclass of class Future
  • So it works with await too!

  • How does it differ from a bare coroutine?
  • It can make progress without waiting for it
    • As long as you wait for something else, i.e.
      • await [something_else]

With this in mind, ensure_future makes sense as a name for creating a Task, since the Future’s result will be computed whether or not you await it (as long as you await something). This allows the event loop to complete your Task while you’re waiting on other things. Note that in Python 3.7, create_task is the preferred way to ensure a future.

Note: I changed “yield from” in Guido’s slides to “await” here for modernity.


Asyncio.gather vs asyncio.wait

Question: asyncio.gather vs asyncio.wait

asyncio.gather and asyncio.wait seem to have similar uses: I have a bunch of async things that I want to execute/wait for (not necessarily waiting for one to finish before the next one starts). They use a different syntax, and differ in some details, but it seems very un-pythonic to me to have 2 functions that have such a huge overlap in functionality. What am I missing?


Answer 0

Although similar in general cases (“run and get results for many tasks”), each function has some specific functionality for other cases:

asyncio.gather()

Returns a Future instance, allowing high level grouping of tasks:

import asyncio
from pprint import pprint

import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(1, 3))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])

all_groups = asyncio.gather(group1, group2, group3)

results = loop.run_until_complete(all_groups)

loop.close()

pprint(results)

All tasks in a group can be cancelled by calling group2.cancel() or even all_groups.cancel(). See also .gather(..., return_exceptions=True), demonstrated below.
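For example, with return_exceptions=True a failing coroutine shows up as an exception object in the results instead of cancelling the whole group (a minimal sketch):

import asyncio

async def boom():
    raise ValueError("boom")

async def ok():
    return "ok"

async def main():
    # The exception is returned as a result instead of being raised.
    results = await asyncio.gather(boom(), ok(), return_exceptions=True)
    print(results)  # [ValueError('boom'), 'ok']

asyncio.run(main())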

asyncio.wait()

Supports waiting to be stopped after the first task is done, or after a specified timeout, allowing a lower level of precision in operations:

import asyncio
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

tasks = [coro(i) for i in range(1, 11)]

print("Get first result:")
finished, unfinished = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))

for task in finished:
    print(task.result())
print("unfinished:", len(unfinished))

print("Get more results in 2 seconds:")
finished2, unfinished2 = loop.run_until_complete(
    asyncio.wait(unfinished, timeout=2))

for task in finished2:
    print(task.result())
print("unfinished2:", len(unfinished2))

print("Get all other results:")
finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))

for task in finished3:
    print(task.result())

loop.close()

Answer 1

asyncio.wait is more low-level than asyncio.gather.

As the name suggests, asyncio.gather mainly focuses on gathering the results: it waits on a bunch of futures and returns their results in the given order.

asyncio.wait just waits on the futures. Instead of giving you the results directly, it gives you the sets of done and pending tasks, and you have to collect the values manually, as sketched below.

Moreover, with wait you can specify whether to wait for all futures to finish or just for the first one.
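A minimal sketch of collecting the values yourself from the done set returned by wait:

import asyncio

async def work(i):
    await asyncio.sleep(i / 10)
    return i

async def main():
    tasks = [asyncio.create_task(work(i)) for i in range(3)]
    done, pending = await asyncio.wait(tasks)   # two sets of tasks, no results
    print(sorted(t.result() for t in done))     # collect the values manually -> [0, 1, 2]
    print(len(pending))                         # 0 - everything finished

asyncio.run(main())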


Answer 2

I also noticed that you can provide a group of coroutines in wait() by simply specifying the list:

result=loop.run_until_complete(asyncio.wait([
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ]))

Whereas grouping in gather() is done by just specifying multiple coroutines:

result=loop.run_until_complete(asyncio.gather(
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ))
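Note that passing bare coroutine objects to asyncio.wait() has been deprecated since Python 3.8, so on newer versions wrap them in tasks/futures first, for example:

result=loop.run_until_complete(asyncio.wait([
        asyncio.ensure_future(say('first hello', 2)),
        asyncio.ensure_future(say('second hello', 1)),
        asyncio.ensure_future(say('third hello', 4))
    ]))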