What is the difference between coroutines and futures/tasks in Python 3.5?

Let’s say we have a dummy function:

async def foo(arg):
    result = await some_remote_call(arg)
    return result.upper()

What’s the difference between:

import asyncio    

coros = []
for i in range(5):
    coros.append(foo(i))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(coros))

And:

import asyncio

futures = []
for i in range(5):
    futures.append(asyncio.ensure_future(foo(i)))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(futures))

Note: The example returns a result, but this isn’t the focus of the question. When return value matters, use gather() instead of wait().

Regardless of return value, I’m looking for clarity on ensure_future(). wait(coros) and wait(futures) both run the coroutines, so when and why should a coroutine be wrapped in ensure_future?

Basically, what’s the Right Way ™ to run a bunch of non-blocking operations using Python 3.5’s async?

For extra credit, what if I want to batch the calls? For example, I need to call some_remote_call(...) 1000 times, but I don’t want to crush the web server/database/etc with 1000 simultaneous connections. This is doable with a thread or process pool, but is there a way to do this with asyncio?

2020 update (Python 3.7+): Don’t use these snippets. Instead use:

import asyncio

async def do_something_async():
    tasks = []
    for i in range(5):
        tasks.append(asyncio.create_task(foo(i)))
    await asyncio.gather(*tasks)

def do_something():
    asyncio.run(do_something_async())

Also consider using Trio, a robust 3rd party alternative to asyncio.


Answer 0

A coroutine is a generator function that can both yield values and accept values from the outside. The benefit of using a coroutine is that we can pause the execution of a function and resume it later. In case of a network operation, it makes sense to pause the execution of a function while we’re waiting for the response. We can use the time to run some other functions.

A future is like a Promise object in JavaScript. It is a placeholder for a value that will be materialized in the future. In the case above, while waiting on network I/O, a function can give us a container, a promise that it will fill the container with the value when the operation completes. We hold on to the future object and, when it is fulfilled, we can call a method on it to retrieve the actual result.

Direct Answer: You don’t need ensure_future if you don’t need the results. It is useful if you need the results or want to retrieve any exceptions that occurred.

Extra Credits: I would choose run_in_executor and pass an Executor instance to control the number of max workers.
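
As a rough sketch of the run_in_executor idea above, assuming Python 3.7+ and a blocking client: blocking_call is a hypothetical stand-in for the remote call, and max_workers=3 is an arbitrary limit chosen for illustration. The executor's pool size caps how many calls run at once.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_call(arg):
    # Hypothetical stand-in for a blocking remote call.
    time.sleep(0.01)
    return str(arg).upper()


async def run_batched(args, max_workers=3):
    loop = asyncio.get_running_loop()
    # At most max_workers calls run concurrently; the rest queue in the pool.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [loop.run_in_executor(pool, blocking_call, a) for a in args]
        # gather returns results in the same order as the inputs.
        return await asyncio.gather(*futures)


results = asyncio.run(run_batched(["a", "b", "c", "d", "e"]))
print(results)
```

This only helps when the call itself is blocking; for calls that are already coroutines, the executor adds nothing.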

Explanations and sample code

In the first example, you are using coroutines. The wait function takes a bunch of coroutines and combines them together. So wait() finishes when all the coroutines are exhausted (completed/finished returning all the values).

loop = get_event_loop()
loop.run_until_complete(wait(coros))

The run_until_complete method would make sure that the loop is alive until the execution is finished. Please notice how you are not getting the results of the async execution in this case.

In the second example, you are using the ensure_future function to wrap a coroutine and return a Task object which is a kind of Future. The coroutine is scheduled to be executed in the main event loop when you call ensure_future. The returned future/task object doesn’t yet have a value but over time, when the network operations finish, the future object will hold the result of the operation.

from asyncio import ensure_future, get_event_loop, wait

futures = []
for i in range(5):
    futures.append(ensure_future(foo(i)))

loop = get_event_loop()
loop.run_until_complete(wait(futures))

So in this example, we’re doing the same thing except we’re using futures instead of just using coroutines.
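
One practical difference is that keeping the Task objects lets you read the results afterwards. The sketch below assumes Python 3.7+; some_remote_call is a hypothetical stand-in for the function in the question. Note also that passing bare coroutine objects to asyncio.wait() was deprecated in Python 3.8 and removed in 3.11, so wrapping them first is the form that still works on current versions.

```python
import asyncio


async def some_remote_call(arg):
    # Hypothetical stand-in for a real network call.
    await asyncio.sleep(0)
    return "item-{}".format(arg)


async def foo(arg):
    result = await some_remote_call(arg)
    return result.upper()


async def main():
    # Wrapping each coroutine gives us Task objects we can inspect later.
    futures = [asyncio.ensure_future(foo(i)) for i in range(5)]
    done, pending = await asyncio.wait(futures)
    # The original list keeps submission order; the `done` set does not.
    return [f.result() for f in futures], len(pending)


results, pending_count = asyncio.run(main())
print(results)
```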

Let’s look at an example of how to use asyncio/coroutines/futures:

import asyncio


async def slow_operation():
    await asyncio.sleep(1)
    return 'Future is done!'


def got_result(future):
    print(future.result())

    # We have result, so let's stop
    loop.stop()


loop = asyncio.get_event_loop()
task = loop.create_task(slow_operation())
task.add_done_callback(got_result)

# We run forever
loop.run_forever()

Here, we used the create_task method on the loop object. ensure_future would schedule the task on the main event loop, whereas loop.create_task lets us schedule a coroutine on a loop we choose.

We also see the concept of adding a callback using the add_done_callback method on the task object.

A Task is done when the coroutine returns a value, raises an exception, or gets cancelled. There are methods to check for each of these outcomes.
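
A small sketch of those checks, assuming Python 3.7+. The three coroutines (ok, boom, forever) are made up for illustration; the methods used (done, cancelled, exception, result) are the ones asyncio provides on Task and Future.

```python
import asyncio


async def ok():
    return "value"


async def boom():
    raise ValueError("failed")


async def forever():
    await asyncio.sleep(3600)


async def inspect_tasks():
    t_ok = asyncio.ensure_future(ok())
    t_err = asyncio.ensure_future(boom())
    t_cancel = asyncio.ensure_future(forever())
    await asyncio.sleep(0)  # let every task take its first step
    t_cancel.cancel()
    await asyncio.wait([t_ok, t_err, t_cancel])
    return {
        "ok": (t_ok.done(), t_ok.cancelled(), t_ok.result()),
        "err": (t_err.done(), t_err.cancelled(), type(t_err.exception()).__name__),
        "cancel": (t_cancel.done(), t_cancel.cancelled()),
    }


states = asyncio.run(inspect_tasks())
print(states)
```

Calling result() on a task that failed re-raises its exception, and calling it on a cancelled task raises CancelledError, so checking cancelled() and exception() first avoids surprises.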

Of course, you can find more details on the official manual: https://docs.python.org/3/library/asyncio.html


Answer 1

Simple answer

  • Invoking a coroutine function (async def) does NOT run it. It returns a coroutine object, just as a generator function returns a generator object.
  • await retrieves the value from a coroutine, i.e. it “calls” the coroutine.
  • ensure_future/create_task schedules the coroutine to run on the event loop on the next iteration (without waiting for it to finish, like a daemon thread).
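
To make the first two points concrete, here is a minimal sketch assuming Python 3.7+. The record coroutine and the calls list are invented for the demonstration; the list shows when the body actually runs.

```python
import asyncio

calls = []


async def record(word):
    calls.append(word)
    return word


async def main():
    coro = record("first")   # creates a coroutine object; the body has not run
    before = list(calls)     # still empty at this point
    value = await coro       # now the body runs and we get its return value
    after = list(calls)
    return before, value, after


before, value, after = asyncio.run(main())
print(before, value, after)
```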

Some code examples

Let’s first clear up some terms:

  • coroutine function: the function you define with async def;
  • coroutine object: what you get when you “call” a coroutine function;
  • task: an object wrapped around a coroutine object to run it on the event loop.
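
The three terms can be told apart at runtime. This is only an illustrative sketch (Python 3.7+ assumed, greet is a made-up coroutine function) using the standard inspect helpers and isinstance checks.

```python
import asyncio
import inspect


async def greet():
    return "hi"


async def main():
    coro = greet()                                   # coroutine object
    is_func = inspect.iscoroutinefunction(greet)     # greet is a coroutine function
    is_coro = inspect.iscoroutine(coro)
    task = asyncio.ensure_future(coro)               # wraps the coroutine in a Task
    is_task = isinstance(task, asyncio.Task)
    is_future = isinstance(task, asyncio.Future)     # Task subclasses Future
    result = await task
    return is_func, is_coro, is_task, is_future, result


checks = asyncio.run(main())
print(checks)
```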

Case 1, await on a coroutine

We create two coroutines, await one, and use create_task to run the other one.

import asyncio
import time

# coroutine function
async def p(word):
    print(f'{time.time()} - {word}')


async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')  # coroutine
    task2 = loop.create_task(p('create_task'))  # <- runs in next iteration
    await coro  # <-- run directly
    await task2

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

You will get this result:

1539486251.7055213 - await
1539486251.7055705 - create_task

Explanation:

coro was awaited and ran directly, while task2 ran in the following iteration.

Case 2, yielding control to event loop

If we replace the main function, we can see a different result:

async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')
    task2 = loop.create_task(p('create_task'))  # scheduled to next iteration
    await asyncio.sleep(1)  # loop got control, and runs task2
    await coro  # run coro
    await task2

You will get this result:

-> % python coro.py
1539486378.5244057 - create_task
1539486379.5252144 - await  # note the delay

Explanation:

When asyncio.sleep(1) is awaited, control is yielded back to the event loop, which checks for tasks to run and then runs the task created by create_task.

Note that we first invoke the coroutine function without awaiting it, so we only create a coroutine object and do not start it. Then we call the coroutine function again and wrap the result in a create_task call, which actually schedules the coroutine to run on the next iteration. So, in the output, create_task is executed before await.

The point here is to give control back to the loop; you could use asyncio.sleep(0) and see the same ordering.
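
A variant of Case 2 with asyncio.sleep(0), written as a sketch for Python 3.7+. Instead of printing timestamps it records the order of execution in a list (the order list is an addition for the demonstration), which makes the effect easy to check without the one-second delay.

```python
import asyncio

order = []


async def p(word):
    order.append(word)


async def main():
    coro = p("await")                                # created, not started
    task2 = asyncio.ensure_future(p("create_task"))  # scheduled for the next iteration
    await asyncio.sleep(0)                           # yield control; the loop runs task2
    await coro                                       # only now does coro run
    await task2


asyncio.run(main())
print(order)
```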

Under the hood

loop.create_task actually calls asyncio.tasks.Task(), which in turn calls loop.call_soon. loop.call_soon puts the task’s first step in loop._ready. During each iteration, the loop goes through every callback in loop._ready and runs it.

asyncio.wait, asyncio.ensure_future and asyncio.gather all call loop.create_task, directly or indirectly.

Also note in the docs:

Callbacks are called in the order in which they are registered. Each callback will be called exactly once.
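
The ordering guarantee can be observed directly with loop.call_soon. This is a small sketch assuming Python 3.7+; the calls list is only there to record what ran and in which order.

```python
import asyncio

calls = []


async def main():
    loop = asyncio.get_running_loop()
    # Both callbacks are queued now and run on the next loop iteration.
    loop.call_soon(calls.append, "first")
    loop.call_soon(calls.append, "second")
    snapshot = list(calls)      # nothing has run yet
    await asyncio.sleep(0)      # give the loop one iteration
    return snapshot


snapshot = asyncio.run(main())
print(snapshot, calls)
```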


Answer 2

A comment by Vincent linked to https://github.com/python/asyncio/blob/master/asyncio/tasks.py#L346, which shows that wait() wraps the coroutines in ensure_future() for you!

In other words, we do need a future, and coroutines will be silently transformed into them.

I’ll update this answer when I find a definitive explanation of how to batch coroutines/futures.
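
One common way to batch within asyncio itself is to cap concurrency with asyncio.Semaphore. The following is a sketch under assumptions, not the answer author's method: Python 3.7+ is assumed, some_remote_call is a hypothetical stand-in, limit=10 is arbitrary, and the in_flight/peak counters exist only to show that the cap holds.

```python
import asyncio


async def some_remote_call(arg):
    # Hypothetical stand-in for a real network call.
    await asyncio.sleep(0.01)
    return str(arg)


async def run_limited(args, limit=10):
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def guarded(arg):
        nonlocal in_flight, peak
        async with semaphore:          # at most `limit` bodies run at once
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await some_remote_call(arg)
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(guarded(a) for a in args))
    return results, peak


results, peak = asyncio.run(run_limited(range(50), limit=10))
print(len(results), peak)
```

All 50 coroutines are scheduled immediately, but only 10 are inside the remote call at any moment; the others wait on the semaphore.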


Answer 3

From the BDFL [2013]

Tasks

  • It’s a coroutine wrapped in a Future
  • class Task is a subclass of class Future
  • So it works with await too!

  • How does it differ from a bare coroutine?
  • It can make progress without waiting for it
    • As long as you wait for something else, i.e.
      • await [something_else]

With this in mind, ensure_future makes sense as a name for creating a Task, since the Future’s result will be computed whether or not you await it (as long as you await something). This allows the event loop to complete your Task while you’re waiting on other things. Note that in Python 3.7 create_task is the preferred way to ensure a future.
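
A short sketch of that behaviour, assuming Python 3.7+. The background coroutine and the sleep durations are arbitrary choices for the demonstration: the task finishes while main is awaiting something else, before the task itself is ever awaited.

```python
import asyncio


async def background():
    await asyncio.sleep(0.01)
    return "done in background"


async def main():
    task = asyncio.ensure_future(background())
    done_at_start = task.done()     # just scheduled, not finished
    await asyncio.sleep(0.05)       # await something else, not the task
    done_later = task.done()        # the loop completed the task meanwhile
    return done_at_start, done_later, task.result()


outcome = asyncio.run(main())
print(outcome)
```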

Note: I changed “yield from” in Guido’s slides to “await” here for modernity.