标签归档:asynchronous

socket.shutdown与socket.close

问题:socket.shutdown与socket.close

我最近看到了一段类似这样的代码(其中 sock 当然是一个套接字对象):

sock.shutdown(socket.SHUT_RDWR)
sock.close()

在套接字上先调用 shutdown 再调用 close,目的究竟是什么?另外说明一下(如果有影响的话):这个套接字用于非阻塞 IO。

I recently saw a bit of code that looked like this (with sock being a socket object of course):

sock.shutdown(socket.SHUT_RDWR)
sock.close()

What exactly is the purpose of calling shutdown on the socket and then closing it? If it makes a difference, this socket is being used for non-blocking IO.


回答 0

这里有一个解释:

一旦不再需要套接字,调用程序就可以通过对套接字描述符调用 close 子例程来丢弃它。如果一个可靠交付的套接字在 close 时仍有与之关联的数据,系统会继续尝试传输这些数据;但如果数据最终仍未送达,系统会将其丢弃。如果应用程序不再需要任何未决数据,可以在 close 之前先对套接字调用 shutdown 子例程。

Here’s one explanation:

Once a socket is no longer required, the calling program can discard the socket by applying a close subroutine to the socket descriptor. If a reliable delivery socket has data associated with it when a close takes place, the system continues to attempt data transfer. However, if the data is still undelivered, the system discards the data. Should the application program have no use for any pending data, it can use the shutdown subroutine on the socket prior to closing it.


回答 1

调用 close 和 shutdown 对底层套接字有两种不同的影响。

首先要指出的是,套接字是底层操作系统中的一种资源,多个进程可以持有同一个底层套接字的句柄。

当你调用 close 时,句柄计数会减一;如果句柄计数降到零,套接字及其关联的连接就会走正常的关闭流程(实际上会向对端发送 FIN / EOF),套接字随之被释放。

这里要注意的是,如果因为另一个进程仍持有该套接字的句柄而导致句柄计数没有降到零,那么连接不会被关闭,套接字也不会被释放。

另一方面,对读和写都调用 shutdown 会关闭底层连接,并向对端发送 FIN / EOF,而不管有多少进程持有该套接字的句柄。不过它并不会释放套接字,事后你仍然需要调用 close。

Calling close and shutdown have two different effects on the underlying socket.

The first thing to point out is that the socket is a resource in the underlying OS and multiple processes can have a handle for the same underlying socket.

When you call close it decrements the handle count by one and if the handle count has reached zero then the socket and associated connection goes through the normal close procedure (effectively sending a FIN / EOF to the peer) and the socket is deallocated.

The thing to pay attention to here is that if the handle count does not reach zero because another process still has a handle to the socket then the connection is not closed and the socket is not deallocated.

On the other hand calling shutdown for reading and writing closes the underlying connection and sends a FIN / EOF to the peer regardless of how many processes have handles to the socket. However, it does not deallocate the socket and you still need to call close afterward.
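
A small illustration of the handle-count point above (a hedged sketch, POSIX semantics; socket.socketpair and os.dup are only used here to create two OS-level handles to the same connection):

import os
import select
import socket

a, b = socket.socketpair()           # a connected pair of sockets
dup_fd = os.dup(a.fileno())          # a second OS-level handle to the same socket as 'a'

a.close()                            # handle count drops, but not to zero...
print(select.select([b], [], [], 0.5)[0])   # [] -> no FIN/EOF has reached 'b' yet

os.close(dup_fd)                     # last handle closed: now the FIN is sent
print(select.select([b], [], [], 0.5)[0])   # 'b' becomes readable; recv() would return b''
b.close()

Calling a.shutdown(socket.SHUT_RDWR) before the first close would have sent the FIN immediately, regardless of the duplicate handle.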


回答 2

关于 shutdown 与 close 的说明:Graceful shutdown(msdn)

shutdown(在你的场景中)向连接的另一端表明:这边不再打算从该套接字读取或写入。随后 close 会释放与套接字关联的所有内存。

省略 shutdown 可能导致套接字在操作系统的协议栈中滞留,直到连接被优雅地关闭为止。

在我看来(IMO),"shutdown"和"close"这两个名字有误导性,叫"close"和"destroy"会更能凸显它们的差异。

Explanation of shutdown and close: Graceful shutdown (msdn)

Shutdown (in your case) indicates to the other end of the connection there is no further intention to read from or write to the socket. Then close frees up any memory associated with the socket.

Omitting shutdown may cause the socket to linger in the OSs stack until the connection has been closed gracefully.

IMO the names ‘shutdown’ and ‘close’ are misleading, ‘close’ and ‘destroy’ would emphasise their differences.


回答 3

Socket Programming HOWTO(py2 / py3)中已经提到了这一点:

断开连接

严格来说,你应该在 close 套接字之前先对它调用 shutdown。shutdown 是向连接另一端发出的一个通告:根据你传入的参数,它可以表示"我不再发送了,但我仍会继续接收",或者"我不再接收了,再见!"。不过,大多数套接字库已经习惯了程序员忽略这一礼节,所以通常 close 的效果等同于 shutdown(); close()。因此在大多数情况下,并不需要显式调用 shutdown。

it’s mentioned right in the Socket Programming HOWTO (py2/py3)

Disconnecting

Strictly speaking, you’re supposed to use shutdown on a socket before you close it. The shutdown is an advisory to the socket at the other end. Depending on the argument you pass it, it can mean “I’m not going to send anymore, but I’ll still listen”, or “I’m not listening, good riddance!”. Most socket libraries, however, are so used to programmers neglecting to use this piece of etiquette that normally a close is the same as shutdown(); close(). So in most situations, an explicit shutdown is not needed.
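
A minimal sketch of the half-close etiquette described above (the host and request are placeholders): SHUT_WR tells the peer "I'm done sending", then we keep reading until the peer is done too, and only then close.

import socket

sock = socket.create_connection(('example.com', 80))
sock.sendall(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
sock.shutdown(socket.SHUT_WR)        # "not sending anymore, but still listening"

chunks = []
while True:
    data = sock.recv(4096)
    if not data:                     # peer finished sending: recv returned b''
        break
    chunks.append(data)

sock.close()                         # finally release the descriptor
print(len(b''.join(chunks)), 'bytes received')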


回答 4

上面的代码难道不是错误的吗?

在shutdown调用之后直接执行close调用可能会使内核无论如何都丢弃所有传出缓冲区。

根据 http://blog.netherlabs.nl/articles/2009/01/18/the-ultimate-so_linger-page-or-why-is-my-tcp-not-reliable 的说法,需要在 shutdown 和 close 之间等待,直到 read 返回 0。

Isn’t this code above wrong?

The close call directly after the shutdown call might make the kernel discard all outgoing buffers anyway.

According to http://blog.netherlabs.nl/articles/2009/01/18/the-ultimate-so_linger-page-or-why-is-my-tcp-not-reliable one needs to wait between the shutdown and the close until read returns 0.


回答 5

关于 shutdown 有几种不同的方式,参见:http://msdn.microsoft.com/en-us/library/system.net.sockets.socket.shutdown.aspx。*nix 下与此类似。


回答 6

Shutdown(1) 会强制套接字不再发送任何数据。

这在以下场景中很有用:

1- 刷新缓冲区

2- 检测奇怪的错误

3- 安全防护

让我再解释一下:当你把数据从 A 发送到 B 时,并不能保证它已经送达 B,只能保证它进入了 A 的操作系统缓冲区,该缓冲区随后再把数据发送到 B 的操作系统缓冲区。

因此,在 A 上调用 shutdown(1) 会刷新 A 的缓冲区;如果缓冲区不为空(即数据尚未发送给对端),则会引发错误。

不过这个操作是不可逆的,所以应当在你已经发送完全部数据、并且希望确认数据至少已经到达对端操作系统缓冲区之后再这么做。

Shutdown(1) forces the socket not to send any more data.

This is useful in:

1- Buffer flushing

2- Strange error detection

3- Safe guarding

Let me explain more: when you send data from A to B, it's not guaranteed to be sent to B; it's only guaranteed to be placed in A's OS buffer, which in turn sends it on to B's OS buffer.

So by calling shutdown(1) on A, you flush A's buffer, and an error is raised if the buffer is not empty, i.e. the data has not been sent to the peer yet.

However, this is irreversible, so you should only do it after you have completely sent all your data and want to be sure that it is at least in the peer's OS buffer.


如何从Python异步运行外部命令?

问题:如何从Python异步运行外部命令?

我需要从 Python 脚本异步运行一个 shell 命令。我的意思是,我希望在外部命令自行运行、去做它该做的事情的同时,我的 Python 脚本能够继续执行。

我读了这篇文章:

在Python中调用外部命令

然后我去做了一些测试,看起来只要在命令末尾加上 &,os.system() 就能完成这项工作,这样我就不必等它返回。我想知道的是:这是否是完成此类任务的正确方式?我试过 commands.call(),但它对我不适用,因为它会在外部命令上阻塞。

请告诉我为此使用 os.system() 是否可取,还是说我应该尝试其他途径。

I need to run a shell command asynchronously from a Python script. By this I mean that I want my Python script to continue running while the external command goes off and does whatever it needs to do.

I read this post:

Calling an external command in Python

I then went off and did some testing, and it looks like os.system() will do the job provided that I use & at the end of the command so that I don’t have to wait for it to return. What I am wondering is if this is the proper way to accomplish such a thing? I tried commands.call() but it will not work for me because it blocks on the external command.

Please let me know if using os.system() for this is advisable or if I should try some other route.


回答 0

subprocess.Popen正是您想要的。

from subprocess import Popen
p = Popen(['watch', 'ls']) # something long running
# ... do other stuff while subprocess is running
p.terminate()

(根据评论进行编辑,补全答案)

Popen 实例还可以做各种其他事情,比如你可以用 poll() 查看它是否仍在运行,也可以用 communicate() 向它的 stdin 发送数据,并等待它终止。

subprocess.Popen does exactly what you want.

from subprocess import Popen
p = Popen(['watch', 'ls']) # something long running
# ... do other stuff while subprocess is running
p.terminate()

(Edit to complete the answer from comments)

The Popen instance can do various other things like you can poll() it to see if it is still running, and you can communicate() with it to send it data on stdin, and wait for it to terminate.
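
A brief sketch of the poll()/communicate() usage mentioned above (the command is just a placeholder for something long running):

from subprocess import Popen, PIPE

p = Popen(['sleep', '2'], stdout=PIPE)
print(p.poll())            # None -> still running, the call does not block
out, err = p.communicate() # blocks until the process exits and collects stdout
print(p.returncode)        # 0 on success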


回答 1

如果要并行运行许多进程,然后在它们产生结果时进行处理,则可以使用轮询,如下所示:

from subprocess import Popen, PIPE
import time

running_procs = [
    Popen(['/usr/bin/my_cmd', '-i %s' % path], stdout=PIPE, stderr=PIPE)
    for path in '/tmp/file0 /tmp/file1 /tmp/file2'.split()]

while running_procs:
    for proc in running_procs:
        retcode = proc.poll()
        if retcode is not None: # Process finished.
            running_procs.remove(proc)
            break
        else: # No process is done, wait a bit and check again.
            time.sleep(.1)
            continue

    # Here, `proc` has finished with return code `retcode`
    if retcode != 0:
        """Error handling."""
    handle_results(proc.stdout)

那里的控制流有点绕,因为我想把代码写得尽量短——你可以按自己的喜好重构。:-)

这样做的好处是可以先处理先完成的请求。如果你对第一个正在运行的进程调用 communicate,而它恰好运行得最久,那么其他进程早已结束、其结果本可以先被处理,却只能在那里闲置。

If you want to run many processes in parallel and then handle them when they yield results, you can use polling like in the following:

from subprocess import Popen, PIPE
import time

running_procs = [
    Popen(['/usr/bin/my_cmd', '-i %s' % path], stdout=PIPE, stderr=PIPE)
    for path in '/tmp/file0 /tmp/file1 /tmp/file2'.split()]

while running_procs:
    for proc in running_procs:
        retcode = proc.poll()
        if retcode is not None: # Process finished.
            running_procs.remove(proc)
            break
        else: # No process is done, wait a bit and check again.
            time.sleep(.1)
            continue

    # Here, `proc` has finished with return code `retcode`
    if retcode != 0:
        """Error handling."""
    handle_results(proc.stdout)

The control flow there is a little bit convoluted because I’m trying to make it small — you can refactor to your taste. :-)

This has the advantage of servicing the early-finishing requests first. If you call communicate on the first running process and that turns out to run the longest, the other running processes will have been sitting there idle when you could have been handling their results.


回答 2

我想知道的是[os.system()]是否是完成此类任务的正确方法?

不是。os.system() 并不是正确的方法,这正是大家都建议使用 subprocess 的原因。

更多信息请阅读 http://docs.python.org/library/os.html#os.system

subprocess 模块提供了更强大的功能来派生新进程并获取其结果;使用该模块比使用 os.system() 更可取。请使用 subprocess 模块,尤其要看一下其中 "Replacing Older Functions with the subprocess Module" 一节。

What I am wondering is if this [os.system()] is the proper way to accomplish such a thing?

No. os.system() is not the proper way. That’s why everyone says to use subprocess.

For more information, read http://docs.python.org/library/os.html#os.system

The subprocess module provides more powerful facilities for spawning new processes and retrieving their results; using that module is preferable to using this function. Use the subprocess module. Check especially the Replacing Older Functions with the subprocess Module section.
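
For reference, a minimal sketch (placeholder command) of the non-blocking subprocess equivalent of os.system("some_command &"):

import subprocess

proc = subprocess.Popen(['some_command', '--some-flag'])  # returns immediately, no shell needed
# ... the Python script keeps running while some_command does its work ...
# proc.poll() / proc.wait() can be used later if you care about the exit status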


回答 3

我使用 asyncproc 模块取得了不错的效果,它能很好地处理进程的输出。例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll is not None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

I’ve had good success with the asyncproc module, which deals nicely with the output from the processes. For example:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll is not None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

回答 4

将 pexpect 与非阻塞的 readline 结合使用是另一种方法。pexpect 解决了死锁问题,让你可以轻松地在后台运行进程,并在进程输出预定义字符串时方便地触发回调,总体上让与进程的交互变得容易得多。

Using pexpect with non-blocking readlines is another way to do this. Pexpect solves the deadlock problems, allows you to easily run the processes in the background, and gives easy ways to have callbacks when your process spits out predefined strings, and generally makes interacting with the process much easier.
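
A hedged sketch of the pexpect approach (spawning a Python interpreter here only as an example of an interactive child process):

import pexpect

child = pexpect.spawn('python3 -i')      # run the process in the background
child.expect('>>> ')                     # wait until the child prints a predefined string
child.sendline('print(21 * 2)')
child.expect('>>> ')
print(child.before.decode())             # output produced before the prompt reappeared
child.sendline('exit()')
child.close()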


回答 5

考虑到“我不必等待它返回”,最简单的解决方案之一就是:

subprocess.Popen( \
    [path_to_executable, arg1, arg2, ... argN],
    creationflags = subprocess.CREATE_NEW_CONSOLE,
).pid

但是……据我所读,由于 subprocess.CREATE_NEW_CONSOLE 标志会带来安全风险,这并不是"完成此类任务的正确方法"。

这里的关键在于:用 subprocess.CREATE_NEW_CONSOLE 创建一个新的控制台,并取 .pid(它返回进程 ID,以便之后需要时检查该程序),这样就不必等待程序完成它的工作。

Considering “I don’t have to wait for it to return”, one of the easiest solutions will be this:

subprocess.Popen( \
    [path_to_executable, arg1, arg2, ... argN],
    creationflags = subprocess.CREATE_NEW_CONSOLE,
).pid

But… from what I read, this is not "the proper way to accomplish such a thing" because of the security risks created by the subprocess.CREATE_NEW_CONSOLE flag.

The key things that happen here are the use of subprocess.CREATE_NEW_CONSOLE to create a new console and of .pid (it returns the process ID so that you can check on the program later if you want to), so that you don't have to wait for the program to finish its job.


回答 6

我在使用Python中的s3270脚本软件尝试连接到3270终端时遇到相同的问题。现在,我在这里找到的Process子类解决了这个问题:

http://code.activestate.com/recipes/440554/

这是从文件中获取的示例:

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    if tr < 1:
        tr = 1
    x = time.time()+t
    y = []
    r = ''
    pr = p.recv
    if stderr:
        pr = p.recv_err
    while time.time() < x or r:
        r = pr()
        if r is None:
            if e:
                raise Exception(message)
            else:
                break
        elif r:
            y.append(r)
        else:
            time.sleep(max((x-time.time())/tr, 0))
    return ''.join(y)

def send_all(p, data):
    while len(data):
        sent = p.send(data)
        if sent is None:
            raise Exception(message)
        data = buffer(data, sent)

if __name__ == '__main__':
    if sys.platform == 'win32':
        shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n')
    else:
        shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')

    a = Popen(shell, stdin=PIPE, stdout=PIPE)
    print recv_some(a),
    for cmd in commands:
        send_all(a, cmd + tail)
        print recv_some(a),
    send_all(a, 'exit' + tail)
    print recv_some(a, e=0)
    a.wait()

I have the same problem trying to connect to a 3270 terminal using the s3270 scripting software in Python. Now I'm solving the problem with a subclass of Process that I found here:

http://code.activestate.com/recipes/440554/

And here is the sample taken from file:

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    if tr < 1:
        tr = 1
    x = time.time()+t
    y = []
    r = ''
    pr = p.recv
    if stderr:
        pr = p.recv_err
    while time.time() < x or r:
        r = pr()
        if r is None:
            if e:
                raise Exception(message)
            else:
                break
        elif r:
            y.append(r)
        else:
            time.sleep(max((x-time.time())/tr, 0))
    return ''.join(y)

def send_all(p, data):
    while len(data):
        sent = p.send(data)
        if sent is None:
            raise Exception(message)
        data = buffer(data, sent)

if __name__ == '__main__':
    if sys.platform == 'win32':
        shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n')
    else:
        shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')

    a = Popen(shell, stdin=PIPE, stdout=PIPE)
    print recv_some(a),
    for cmd in commands:
        send_all(a, cmd + tail)
        print recv_some(a),
    send_all(a, 'exit' + tail)
    print recv_some(a, e=0)
    a.wait()

回答 7

被采纳的答案已经很旧了。

我在这里找到了一个更好的现代答案:

https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/

并进行了一些更改:

  1. 使它在Windows上工作
  2. 使它与多个命令一起工作

import sys
import asyncio

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


async def _read_stream(stream, cb):
    while True:
        line = await stream.readline()
        if line:
            cb(line)
        else:
            break


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )

        await asyncio.wait(
            [
                _read_stream(process.stdout, stdout_cb),
                _read_stream(process.stderr, stderr_cb),
            ]
        )
        rc = await process.wait()
        return process.pid, rc
    except OSError as e:
        # the program will hang if we let any exception propagate
        return e


def execute(*aws):
    """ run the given coroutines in an asyncio loop
    returns a list containing the values returned from each coroutine.
    """
    loop = asyncio.get_event_loop()
    rc = loop.run_until_complete(asyncio.gather(*aws))
    loop.close()
    return rc


def printer(label):
    def pr(*args, **kw):
        print(label, *args, **kw)

    return pr


def name_it(start=0, template="s{}"):
    """a simple generator for task names
    """
    while True:
        yield template.format(start)
        start += 1


def runners(cmds):
    """
    cmds is a list of commands to excecute as subprocesses
    each item is a list appropriate for use by subprocess.call
    """
    next_name = name_it().__next__
    for cmd in cmds:
        name = next_name()
        out = printer(f"{name}.stdout")
        err = printer(f"{name}.stderr")
        yield _stream_subprocess(cmd, out, err)


if __name__ == "__main__":
    cmds = (
        [
            "sh",
            "-c",
            """echo "$SHELL"-stdout && sleep 1 && echo stderr 1>&2 && sleep 1 && echo done""",
        ],
        [
            "bash",
            "-c",
            "echo 'hello, Dave.' && sleep 1 && echo dave_err 1>&2 && sleep 1 && echo done",
        ],
        [sys.executable, "-c", 'print("hello from python");import sys;sys.exit(2)'],
    )

    print(execute(*runners(cmds)))

示例命令未必能在你的系统上完美运行,这段代码也没有处理各种奇怪的错误,但它确实演示了一种使用 asyncio 运行多个子进程并流式输出其结果的方法。

The accepted answer is very old.

I found a better modern answer here:

https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/

and made some changes:

  1. make it work on windows
  2. make it work with multiple commands

import sys
import asyncio

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


async def _read_stream(stream, cb):
    while True:
        line = await stream.readline()
        if line:
            cb(line)
        else:
            break


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )

        await asyncio.wait(
            [
                _read_stream(process.stdout, stdout_cb),
                _read_stream(process.stderr, stderr_cb),
            ]
        )
        rc = await process.wait()
        return process.pid, rc
    except OSError as e:
        # the program will hang if we let any exception propagate
        return e


def execute(*aws):
    """ run the given coroutines in an asyncio loop
    returns a list containing the values returned from each coroutine.
    """
    loop = asyncio.get_event_loop()
    rc = loop.run_until_complete(asyncio.gather(*aws))
    loop.close()
    return rc


def printer(label):
    def pr(*args, **kw):
        print(label, *args, **kw)

    return pr


def name_it(start=0, template="s{}"):
    """a simple generator for task names
    """
    while True:
        yield template.format(start)
        start += 1


def runners(cmds):
    """
    cmds is a list of commands to excecute as subprocesses
    each item is a list appropriate for use by subprocess.call
    """
    next_name = name_it().__next__
    for cmd in cmds:
        name = next_name()
        out = printer(f"{name}.stdout")
        err = printer(f"{name}.stderr")
        yield _stream_subprocess(cmd, out, err)


if __name__ == "__main__":
    cmds = (
        [
            "sh",
            "-c",
            """echo "$SHELL"-stdout && sleep 1 && echo stderr 1>&2 && sleep 1 && echo done""",
        ],
        [
            "bash",
            "-c",
            "echo 'hello, Dave.' && sleep 1 && echo dave_err 1>&2 && sleep 1 && echo done",
        ],
        [sys.executable, "-c", 'print("hello from python");import sys;sys.exit(2)'],
    )

    print(execute(*runners(cmds)))

It is unlikely that the example commands will work perfectly on your system, and it doesn’t handle weird errors, but this code does demonstrate one way to run multiple subprocesses using asyncio and stream the output.


回答 8

这里有几个答案,但是没有一个满足我的以下要求:

  1. 我不想等待命令完成或用子进程输出污染我的终端。

  2. 我想使用重定向运行bash脚本。

  3. 我想在我的bash脚本中支持管道(例如find ... | tar ...)。

满足以上要求的唯一组合是:

subprocess.Popen(['./my_script.sh "arg1" > "redirect/path/to"'],
                 stdout=subprocess.PIPE, 
                 stderr=subprocess.PIPE,
                 shell=True)

There are several answers here but none of them satisfied my below requirements:

  1. I don’t want to wait for command to finish or pollute my terminal with subprocess outputs.

  2. I want to run bash script with redirects.

  3. I want to support piping within my bash script (for example find ... | tar ...).

The only combination that satisfies the above requirements is:

subprocess.Popen(['./my_script.sh "arg1" > "redirect/path/to"'],
                 stdout=subprocess.PIPE, 
                 stderr=subprocess.PIPE,
                 shell=True)

回答 9

Python 3 Subprocess Examples 在 "Wait for command to terminate asynchronously" 一节中介绍了这一点:

import asyncio

proc = await asyncio.create_subprocess_exec(
    'ls','-lha',
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE)

# do something else while ls is working

# if proc takes very long to complete, the CPUs are free to use cycles for 
# other processes
stdout, stderr = await proc.communicate()

await asyncio.create_subprocess_exec(...) 一执行完,子进程就会开始运行。如果在你调用 await proc.communicate() 时它还没有结束,communicate 会在那里等待,以便把输出和状态交给你;如果它已经结束,proc.communicate() 会立即返回。

其要点与 Terrel 的答案类似,但我认为 Terrel 的答案似乎把事情搞得有些复杂了。

请参阅asyncio.create_subprocess_exec以获取更多信息。

This is covered by Python 3 Subprocess Examples under “Wait for command to terminate asynchronously”:

import asyncio

proc = await asyncio.create_subprocess_exec(
    'ls','-lha',
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE)

# do something else while ls is working

# if proc takes very long to complete, the CPUs are free to use cycles for 
# other processes
stdout, stderr = await proc.communicate()

The process will start running as soon as the await asyncio.create_subprocess_exec(...) has completed. If it hasn’t finished by the time you call await proc.communicate(), it will wait there in order to give you your output status. If it has finished, proc.communicate() will return immediately.

The gist here is similar to Terrel's answer, but I think Terrel's answer appears to overcomplicate things.

See asyncio.create_subprocess_exec for more information.
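
For completeness, a small runnable wrapper around the snippet above (Python 3.7+; 'ls' is assumed to exist on the system):

import asyncio

async def main():
    proc = await asyncio.create_subprocess_exec(
        'ls', '-lha',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # do something else while ls is working
    stdout, stderr = await proc.communicate()
    print(proc.returncode, len(stdout), 'bytes of output')

asyncio.run(main())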


在Flask中进行异步任务

问题:在Flask中进行异步任务

我正在用 Flask 写一个应用,除了 WSGI 是同步且阻塞的之外,一切都运行得很好。我有一个特别的任务,它会调用第三方 API,可能需要几分钟才能完成。我想发起这个调用(实际上是一系列调用)并让它自己去跑,与此同时把控制权交还给 Flask。

我的看法如下:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

现在,我想要的是让下面这行代码

final_file = audio_class.render_audio()

在后台运行,并提供一个在该方法返回时执行的回调,而 Flask 可以继续处理请求。这是我唯一需要 Flask 异步运行的任务,我想就如何最好地实现它征求一些建议。

我看过 Twisted 和 Klein,但不确定它们是不是杀鸡用牛刀,也许用线程(Threading)就足够了。又或者,Celery 是个不错的选择?

I am writing an application in Flask, which works really well except that WSGI is synchronous and blocking. I have one task in particular which calls out to a third party API and that task can take several minutes to complete. I would like to make that call (it’s actually a series of calls) and let it run. while control is returned to Flask.

My view looks like:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

Now, what I want to do is have the line

final_file = audio_class.render_audio()

run and provide a callback to be executed when the method returns, whilst Flask can continue to process requests. This is the only task which I need Flask to run asynchronously, and I would like some advice on how best to implement this.

I have looked at Twisted and Klein, but I’m not sure they are overkill, as maybe Threading would suffice. Or maybe Celery is a good choice for this?


回答 0

我将使用Celery为您处理异步任务。您需要安装一个代理作为您的任务队列(建议使用RabbitMQ和Redis)。

app.py

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

运行你的 Flask 应用,然后另外启动一个进程来运行你的 Celery worker。

$ celery worker -A app.celery --loglevel=debug

我还建议参考 Miguel Grinberg 的这篇文章,以获得在 Flask 中使用 Celery 的更深入指南。

I would use Celery to handle the asynchronous task for you. You’ll need to install a broker to serve as your task queue (RabbitMQ and Redis are recommended).

app.py:

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

Run your Flask app, and start another process to run your celery worker.

$ celery worker -A app.celery --loglevel=debug

I would also refer to Miguel Grinberg's write-up for a more in-depth guide to using Celery with Flask.
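
If you also need the task's outcome later, .delay() returns an AsyncResult you can store and poll (a hedged sketch; this assumes a result backend is configured in celeryconfig.py, e.g. result_backend = 'redis://localhost:6379/0'):

result = some_long_task.delay(x, y)
task_id = result.id              # store this, e.g. return it in the HTTP response
# later, in another request/handler:
# res = some_long_task.AsyncResult(task_id)
# res.state                      # 'PENDING', 'STARTED', 'SUCCESS', ...
# res.get(timeout=60)            # blocks until the task finishes and returns its value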


回答 1

线程化是另一种可能的解决方案。尽管基于Celery的解决方案对于大规模应用程序更好,但是如果您不希望在所讨论的端点上有太多流量,则线程化是一种可行的选择。

该解决方案基于 Miguel Grinberg 在 PyCon 2016 上的 Flask at Scale 演讲,特别是其幻灯片(slide deck)中的第 41 张。对原始出处感兴趣的人也可以在 GitHub 上找到他的代码。

从用户的角度来看,代码的工作方式如下:

  1. 你调用执行长时间运行任务的那个端点。
  2. 该端点返回 202 Accepted,并附带一个用于查询任务状态的链接。
  3. 在任务仍在运行时,访问状态链接会返回 202;任务完成后则返回 200(以及任务结果)。

要将api调用转换为后台任务,只需添加@async_api装饰器。

这是一个完整的示例:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)

Threading is another possible solution. Although the Celery based solution is better for applications at scale, if you are not expecting too much traffic on the endpoint in question, threading is a viable alternative.

This solution is based on Miguel Grinberg’s PyCon 2016 Flask at Scale presentation, specifically slide 41 in his slide deck. His code is also available on github for those interested in the original source.

From a user perspective the code works as follows:

  1. You make a call to the endpoint that performs the long running task.
  2. This endpoint returns 202 Accepted with a link to check on the task status.
  3. Calls to the status link return 202 while the task is still running, and return 200 (and the result) when the task is complete.

To convert an api call to a background task, simply add the @async_api decorator.

Here is a fully contained example:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)
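
From the client side, the 202/Location flow above can be exercised roughly like this (a hedged sketch assuming the example app is running on localhost:5000 and that the requests library is installed):

import time
import requests

base = 'http://localhost:5000'
r = requests.get(base + '/some/long/task')       # returns 202 plus a Location header
status_url = base + r.headers['Location']

while r.status_code == 202:                      # keep polling until the task finishes
    time.sleep(1)
    r = requests.get(status_url)

print(r.status_code, r.text)                     # 200 and "The answer is: ..."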


回答 2

你也可以尝试使用 multiprocessing.Process 并设置 daemon=True;process.start() 方法不会阻塞,这样你就可以在昂贵的函数于后台执行的同时,立即向调用方返回响应/状态。

我在使用 falcon 框架时遇到过类似的问题,使用 daemon 进程帮我解决了它。

您需要执行以下操作:

import time
from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

您应该立即得到响应,十秒钟后,您应该在控制台中看到打印的消息。

注意:请记住,daemon(守护)进程不允许派生任何子进程。

You can also try using multiprocessing.Process with daemon=True; the process.start() method does not block and you can return a response/status immediately to the caller while your expensive function executes in the background.

I experienced similar problem while working with falcon framework and using daemon process helped.

You’d need to do the following:

import time
from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

You should get a response immediately and, after 10s you should see a printed message in the console.

NOTE: Keep in mind that daemonic processes are not allowed to spawn any child processes.


带有Python请求的异步请求

问题:带有Python请求的异步请求

我尝试了python 请求库文档中提供的示例。

使用async.map(rs),我得到了响应代码,但是我想获得所请求的每个页面的内容。例如,这不起作用:

out = async.map(rs)
print out[0].content

I tried the sample provided within the documentation of the requests library for python.

With async.map(rs), I get the response codes, but I want to get the content of each page requested. This, for example, does not work:

out = async.map(rs)
print out[0].content

回答 0

注意

下面的回答不适用于 requests v0.13.0+。在这个问题提出之后,异步功能被移到了 grequests 中。不过,你只需把下面的 requests 替换成 grequests,它应该就能工作。

我保留这个回答不动,以反映最初的问题——它针对的是 requests < v0.13.0。


要用 async.map 异步执行多个任务,你必须:

  1. 为每个对象定义一个要执行的函数(即你的任务)
  2. 把该函数作为事件钩子(event hook)添加到请求中
  3. 对包含所有请求/操作的列表调用 async.map

例:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    # 
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks = {'response' : do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

Note

The below answer is not applicable to requests v0.13.0+. The asynchronous functionality was moved to grequests after this question was written. However, you could just replace requests with grequests below and it should work.

I’ve left this answer as is to reflect the original question which was about using requests < v0.13.0.


To do multiple tasks with async.map asynchronously you have to:

  1. Define a function for what you want to do with each object (your task)
  2. Add that function as an event hook in your request
  3. Call async.map on a list of all the requests / actions

Example:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    # 
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks = {'response' : do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

回答 1

async现在是一个独立的模块:grequests

参见:https://github.com/kennethreitz/grequests

以及这里:Ideal method for sending multiple HTTP requests over Python?

安装:

$ pip install grequests

用法:

建立一个堆栈:

import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

rs = (grequests.get(u) for u in urls)

发送堆栈

grequests.map(rs)

结果看起来像

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

grequests似乎没有为并发请求设置限制,即当多个请求发送到同一服务器时。

async is now an independent module : grequests.

See here : https://github.com/kennethreitz/grequests

And there: Ideal method for sending multiple HTTP requests over Python?

installation:

$ pip install grequests

usage:

build a stack:

import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

rs = (grequests.get(u) for u in urls)

send the stack

grequests.map(rs)

result looks like

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

grequests don’t seem to set a limitation for concurrent requests, ie when multiple requests are sent to the same server.


回答 2

我同时测试了 requests-futures 和 grequests。grequests 更快,但会带来 monkey patch 以及依赖方面的额外问题;requests-futures 则比 grequests 慢好几倍。于是我决定自己写一个,简单地把 requests 包进 ThreadPoolExecutor,结果几乎和 grequests 一样快,而且没有外部依赖。

import requests
import concurrent.futures

def get_urls():
    return ["url1", "url2"]

def load_url(url, timeout):
    return requests.get(url, timeout=timeout)

resp_ok = 0    # counters must exist before the loop below increments them
resp_err = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

    future_to_url = {executor.submit(load_url, url, 10): url for url in get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1

I tested both requests-futures and grequests. Grequests is faster but brings monkey patching and additional problems with dependencies. requests-futures is several times slower than grequests. I decided to write my own and simply wrapped requests into ThreadPoolExecutor and it was almost as fast as grequests, but without external dependencies.

import requests
import concurrent.futures

def get_urls():
    return ["url1", "url2"]

def load_url(url, timeout):
    return requests.get(url, timeout=timeout)

resp_ok = 0    # counters must exist before the loop below increments them
resp_err = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

    future_to_url = {executor.submit(load_url, url, 10): url for url in get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1

回答 3

也许 requests-futures 是另一个选择。

from requests_futures.sessions import FuturesSession

session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)

官方文档中也推荐了它。如果你不想引入 gevent,这是一个不错的选择。

maybe requests-futures is another choice.

from requests_futures.sessions import FuturesSession

session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)

It is also recommended in the official documentation. If you don't want to involve gevent, it's a good option.


回答 4

这里发布的大多数答案都让我遇到不少问题——它们要么使用功能受限、早已过时的移植库,要么在请求的执行上加了太多"魔法",导致难以进行错误处理。如果不属于上述情况,那它们要么是第三方库,要么已经被弃用。

有些方案在纯 HTTP 请求的场景下没问题,但对于其他任何类型的请求就力不从心了,这就很荒谬。这里并不需要高度定制化的解决方案。

只需使用 Python 内置的 asyncio 库,就足以执行任意类型的异步请求,同时也为复杂的、针对特定用例的错误处理提供了足够的灵活性。

import asyncio

loop = asyncio.get_event_loop()

def do_thing(params):
    async def get_rpc_info_and_do_chores(id):
        # do things
        response = perform_grpc_call(id)
        do_chores(response)

    async def get_httpapi_info_and_do_chores(id):
        # do things
        response = requests.get(URL)
        do_chores(response)

    async_tasks = []
    for element in list(params.list_of_things):
       async_tasks.append(loop.create_task(get_rpc_info_and_do_chores(element)))
       async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(element)))

    loop.run_until_complete(asyncio.gather(*async_tasks))

它的工作方式很简单:你创建一系列希望异步执行的任务,然后让事件循环执行这些任务,并在全部完成后退出。不会引入缺乏维护的额外库,也不缺少所需的功能。

I have a lot of issues with most of the answers posted – they either use deprecated libraries that have been ported over with limited features, or provide a solution with too much magic on the execution of the request, making it difficult to error handle. If they do not fall into one of the above categories, they’re 3rd party libraries or deprecated.

Some of the solutions works alright purely in http requests, but the solutions fall short for any other kind of request, which is ludicrous. A highly customized solution is not necessary here.

Simply using the python built-in library asyncio is sufficient enough to perform asynchronous requests of any type, as well as providing enough fluidity for complex and usecase specific error handling.

import asyncio

loop = asyncio.get_event_loop()

def do_thing(params):
    async def get_rpc_info_and_do_chores(id):
        # do things
        response = perform_grpc_call(id)
        do_chores(response)

    async def get_httpapi_info_and_do_chores(id):
        # do things
        response = requests.get(URL)
        do_chores(response)

    async_tasks = []
    for element in list(params.list_of_things):
       async_tasks.append(loop.create_task(get_rpc_info_and_do_chores(element)))
       async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(element)))

    loop.run_until_complete(asyncio.gather(*async_tasks))

How it works is simple. You’re creating a series of tasks you’d like to occur asynchronously, and then asking a loop to execute those tasks and exit upon completion. No extra libraries subject to lack of maintenance, no lack of functionality required.
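
Note that a plain requests.get inside a coroutine still blocks the event loop; on Python 3.9+ one way to keep it non-blocking is to push the call into a worker thread (a hedged sketch, the URLs are placeholders):

import asyncio
import requests

async def fetch(url):
    # run the blocking requests call in a thread so the event loop stays free
    return await asyncio.to_thread(requests.get, url)

async def main(urls):
    return await asyncio.gather(*(fetch(u) for u in urls))

responses = asyncio.run(main(['http://httpbin.org/get', 'http://example.com']))
print([r.status_code for r in responses])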


回答 5

我知道这个问题已经关闭一段时间了,但我觉得推荐另一个基于 requests 库构建的异步方案可能会有用。

list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print response.content

文档在这里:http://pythonhosted.org/simple-requests/

I know this has been closed for a while, but I thought it might be useful to promote another async solution built on the requests library.

list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print response.content

The docs are here: http://pythonhosted.org/simple-requests/


回答 6

from threading import Thread

threads = list()

for requestURI in requests:
    t = Thread(target=self.openURL, args=(requestURI,))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

...

def openURL(self, requestURI):
    o = urllib2.urlopen(requestURI, timeout = 600)
    o...

回答 7

如果你想使用 asyncio,那么 requests-async 为 requests 提供了 async/await 功能:https://github.com/encode/requests-async

If you want to use asyncio, then requests-async provides async/await functionality for requests: https://github.com/encode/requests-async


回答 8

一段时间以来,我一直在用 python requests 对 github 的 gist API 进行异步调用。

有关示例,请参见此处的代码:

https://github.com/davidthewatson/flasgist/blob/master/views.py#L60-72

这种风格的 Python 可能不是最清晰的示例,但我可以向你保证这段代码是能工作的。如果它让你困惑,请告诉我,我会补充文档说明。

I have been using python requests for async calls against github’s gist API for some time.

For an example, see the code here:

https://github.com/davidthewatson/flasgist/blob/master/views.py#L60-72

This style of python may not be the clearest example, but I can assure you that the code works. Let me know if this is confusing to you and I will document it.


回答 9

你可以用 httpx 来做这件事。

import asyncio
import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))

如果你想要函数式的语法,gamla 这个库把它封装成了 get_async。

然后你可以这样写:


await gamla.map(gamla.get_async(10), ["http://google.com", "http://wikipedia.org"])

其中的 10 是以秒为单位的超时时间。

(免责声明:我是它的作者)

You can use httpx for that.

import asyncio
import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))

if you want a functional syntax, the gamla lib wraps this into get_async.

Then you can do


await gamla.map(gamla.get_async(10), ["http://google.com", "http://wikipedia.org"])

The 10 is the timeout in seconds.

(disclaimer: I am its author)


回答 10

我也尝试过用 Python 中的一些异步方法来做这类事情,不过用 twisted 做异步编程的运气要好得多:它的问题更少,文档也很完善。下面这个链接里的内容和你正在尝试做的事情类似。

http://pythonquirks.blogspot.com/2011/04/twisted-asynchronous-http-request.html

I have also tried some things using the asynchronous methods in python; however, I have had much better luck using twisted for asynchronous programming. It has fewer problems and is well documented. Here is a link to something similar to what you are trying, in twisted.

http://pythonquirks.blogspot.com/2011/04/twisted-asynchronous-http-request.html


干净、轻量的 Python Twisted 替代品?[关闭]

问题:干净、轻量的 Python Twisted 替代品?[关闭]

很久以前,我写过一个网络爬虫,并对它做了多线程处理,让并发请求可以同时进行。那还是我的 Python"青涩岁月",在我了解 GIL 及其给多线程代码带来的种种麻烦之前(也就是说,大多数时候这些东西最终还是被串行化了!)……

我想重写这段代码,让它更健壮、性能更好。大体上有两种做法:使用 2.6+ 中新的 multiprocessing 模块,或者采用某种基于 reactor/事件的模型。我更倾向于后者,因为它简单得多,也更不容易出错。

所以问题在于:哪种框架最适合我的需求?以下是我目前知道的选项:

  • Twisted:Python reactor 框架的鼻祖:不过它看起来复杂,还有点臃肿。对一个小任务来说,学习曲线太陡峭。
  • Eventlet:来自 lindenlab 的团队。基于 greenlet 的框架,适合这类任务。不过我看了一下代码,不太好看:不符合 pep8,到处散落着 print(为什么有人要在框架里这么干!?),API 似乎也有点不一致。
  • PyEv:还不成熟,目前似乎没什么人在用;不过它基于 libevent,所以后端是可靠的。
  • asyncore:来自标准库:非常底层,光是把东西跑起来似乎就得做一大堆体力活。
  • tornado:虽然这是一个面向服务器端、为动态网站提供服务的产品,但它确实带有异步 HTTP 客户端和一个简单的 ioloop。看起来它能把活儿干了,只是这并非它的设计初衷。[编辑:可惜它不能在 Windows 上运行,这一点直接把它排除了——支持这个蹩脚的平台是我的硬性要求]

我还漏掉了什么吗?肯定存在一个正好落在"简化异步网络库"这个甜蜜点上的库吧!

[编辑:非常感谢 intgr 指出这个页面。滚动到页面底部,你会看到一份非常不错的项目列表,它们都以各自的方式试图解决这个问题。实际上,自 Twisted 诞生以来,情况确实已经变了:现在人们似乎更青睐基于协程的方案,而不是传统的 reactor/回调式方案。这种方式的好处是代码更清晰直接:我过去确实有过体会,尤其是在 C++ 中使用 boost.asio 时,基于回调的代码容易导致难以跟踪的设计,对外行来说也相对晦涩。使用协程则可以写出至少看上去更接近同步的代码。我想我现在的任务就是从这一大堆库里挑出我看着顺眼的那个,试一试!很高兴我问了这个问题……]

[编辑:任何关注过或偶然看到这个问题、或在某种意义上关心这个话题的人可能会感兴趣:我找到了一篇非常出色的文章,介绍了可用于这项工作的工具的现状]

A (long) while ago I wrote a web-spider that I multithreaded to enable concurrent requests to occur at the same time. That was in my Python youth, in the days before I knew about the GIL and the associated woes it creates for multithreaded code (IE, most of the time stuff just ends up serialized!)…

I’d like to rework this code to make it more robust and perform better. There are basically two ways I could do this: I could use the new multiprocessing module in 2.6+ or I could go for a reactor / event-based model of some sort. I would rather do the later since it’s far simpler and less error-prone.

So the question relates to what framework would be best suited to my needs. The following is a list of the options I know about so far:

  • Twisted: The granddaddy of Python reactor frameworks: seems complex and a bit bloated however. Steep learning curve for a small task.
  • Eventlet: From the guys at lindenlab. Greenlet based framework that’s geared towards these kinds of tasks. I had a look at the code though and it’s not too pretty: non-pep8 compliant, scattered with prints (why do people do this in a framework!?), API seems a little inconsistent.
  • PyEv: Immature, doesn’t seem to be anyone using it right now though it is based on libevent so it’s got a solid backend.
  • asyncore: From the stdlib: über low-level, seems like a lot of legwork involved just to get something off the ground.
  • tornado: Though this is a server oriented product designed to server dynamic websites it does feature an async HTTP client and a simple ioloop. Looks like it could get the job done but not what it was intended for. [edit: doesn’t run on Windows unfortunately, which counts it out for me – its a requirement for me to support this lame platform]

Is there anything I have missed at all? Surely there must be a library out there that fits the sweet-spot of a simplified async networking library!

[edit: big thanks to intgr for his pointer to this page. If you scroll to the bottom you will see there is a really nice list of projects that aim to tackle this task in one way or another. It seems actually that things have indeed moved on since the inception of Twisted: people now seem to favour a co-routine based solution rather than a traditional reactor / callback oriented one. The benefits of this approach are clearer more direct code: I’ve certainly found in the past, especially when working with boost.asio in C++ that callback based code can lead to designs that can be hard-to-follow and are relatively obscure to the untrained eye. Using co-routines allows you to write code that looks a little more synchronous at least. I guess now my task is to work out which one of these many libraries I like the look of and give it a go! Glad I asked now…]

[edit: perhaps of interest to anyone who followed or stumbled on this this question or cares about this topic in any sense: I found a really great writeup of the current state of the available tools for this job]


回答 0

我喜欢 concurrence 这个 Python 模块,它依赖 Stackless Python 微线程或 greenlet 来实现轻量级线程。所有阻塞的网络 I/O 都会通过单个 libevent 循环被透明地变为异步,因此它的效率应当和真正的异步服务器相差无几。

我想在这方面它与 Eventlet 类似。

缺点是它的 API 与 Python 的 socket/threading 模块差别很大;你需要重写应用程序中相当一部分代码(或者编写一个兼容层)。

编辑:似乎还有一个 cogen,它与之类似,但其协程使用的是 Python 2.5 的增强型生成器,而不是 greenlet。这让它比 concurrence 及其他替代方案更具可移植性。网络 I/O 直接通过 epoll/kqueue/iocp 完成。

I liked the concurrence Python module which relies on either Stackless Python microthreads or Greenlets for light-weight threading. All blocking network I/O is transparently made asynchronous through a single libevent loop, so it should be nearly as efficient as an real asynchronous server.

I suppose it’s similar to Eventlet in this way.

The downside is that its API is quite different from Python’s sockets/threading modules; you need to rewrite a fair bit of your application (or write a compatibility shim layer)

Edit: It seems that there’s also cogen, which is similar, but uses Python 2.5’s enhanced generators for its coroutines, instead of Greenlets. This makes it more portable than concurrence and other alternatives. Network I/O is done directly with epoll/kqueue/iocp.


回答 1

Twisted 确实复杂,这一点你说得对。但 Twisted 并不臃肿。

如果你到这里看看:http://twistedmatrix.com/trac/browser/trunk/twisted,你会发现一套组织良好、内容全面且经过充分测试的代码,涵盖了互联网的众多协议,还有用于编写和部署非常复杂的网络应用的辅助代码。我不会把臃肿和全面混为一谈。

众所周知,Twisted 的文档乍一看并不算友好,我相信这让数量可观的人望而却步,实在可惜。但只要你肯花时间,Twisted 是非常棒的(恕我直言)。我花了这个时间,事实证明很值得,我也建议其他人试一试。

Twisted is complex, you’re right about that. Twisted is not bloated.

If you take a look here: http://twistedmatrix.com/trac/browser/trunk/twisted you’ll find an organized, comprehensive, and very well tested suite of many protocols of the internet, as well as helper code to write and deploy very sophisticated network applications. I wouldn’t confuse bloat with comprehensiveness.

It’s well known that the Twisted documentation isn’t the most user-friendly from first glance, and I believe this turns away an unfortunate number of people. But Twisted is amazing (IMHO) if you put in the time. I did and it proved to be worth it, and I’d recommend to others to try the same.


回答 2

gevent 是清理过的 eventlet。

在 API 上,凡是说得通的地方,它都遵循与标准库(特别是 threading 和 multiprocessing 模块)相同的约定。因此你可以使用 Queue、Event 这类熟悉的东西。

它只支持 libevent(更新:1.0 起改为 libev)作为 reactor 实现,但把它的优势发挥到了极致:内置基于 libevent-http 的快速 WSGI 服务器,并通过 libevent-dns 解析 DNS 查询,而不是像大多数其他库那样使用线程池。(更新:1.0 起改用 c-ares 进行异步 DNS 查询;线程池也是一个可选项。)

与 eventlet 一样,它通过使用 greenlet 让回调和 Deferred 变得不再必要。

看看这些示例:并发下载多个 URL、长轮询 webchat。

gevent is eventlet cleaned up.

API-wise it follows the same conventions as the standard library (in particular, threading and multiprocessing modules) where it makes sense. So you have familiar things like Queue and Event to work with.

It only supports libevent (update: libev since 1.0) as reactor implementation but takes full advantage of it, featuring a fast WSGI server based on libevent-http and resolving DNS queries through libevent-dns as opposed to using a thread pool like most other libraries do. (update: since 1.0 c-ares is used to make async DNS queries; threadpool is also an option.)

Like eventlet, it makes the callbacks and Deferreds unnecessary by using greenlets.

Check out the examples: concurrent download of multiple urls, long polling webchat.
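
For reference, here is a minimal sketch of the "concurrent download of multiple urls" pattern mentioned above. It assumes gevent is installed; the fetch helper and the URLs are just placeholders:

import gevent
from gevent import monkey
monkey.patch_all()          # make the stdlib socket/ssl modules cooperative

from urllib.request import urlopen

def fetch(url):
    # Each call runs in its own greenlet; a blocking read only suspends that greenlet.
    return url, len(urlopen(url).read())

urls = ["http://example.com", "http://example.org"]   # placeholder URLs
jobs = [gevent.spawn(fetch, url) for url in urls]
gevent.joinall(jobs, timeout=10)
for job in jobs:
    if job.successful():
        print(job.value)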


回答 3

Nicholas Piël 在他的博客上对这类框架做了一个非常有趣的比较:非常值得一读!

A really interesting comparison of such frameworks was compiled by Nicholas Piël on his blog: it’s well worth a read!


回答 4

这些方案都无法回避 GIL 阻碍 CPU 并行这一事实;它们只是比线程更好地获得你原本就能得到的 IO 并行。如果你认为自己能把 IO 做得更好,尽管去尝试其中任何一种;但如果瓶颈在于处理结果,那么除了 multiprocessing 模块之外,这里没有什么能帮上忙。

None of these solutions will avoid that fact that the GIL prevents CPU parallelism – they are just better ways of getting IO parallelism that you already have with threads. If you think you can do better IO, by all means pursue one of these, but if your bottleneck is in processing the results nothing here will help except for the multiprocessing module.
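
A tiny illustration of that point, using nothing beyond the standard library: CPU-bound work gains nothing from greenlets or threads under the GIL, but the multiprocessing module spreads it over cores:

from multiprocessing import Pool

def cpu_bound(n):
    # pure computation; a thread or greenlet version would still serialize on the GIL
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with Pool() as pool:                        # one worker per CPU core by default
        results = pool.map(cpu_bound, [10**6] * 4)
    print(results)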


回答 5

我不会说 Twisted 臃肿,但它确实不太容易弄明白。很长一段时间我都没有真正沉下心去学它,因为我总希望能为“小任务”找个更轻松一点的东西。

但是,既然现在我已经更多地用过它,我不得不说“电池全包”(batteries included)真的非常好。

我用过的所有其他异步库,最终都远不如它们看起来那么成熟。Twisted 的事件循环非常稳定。

我不太确定该如何解决 Twisted 陡峭的学习曲线。如果有人把它 fork 出来清理一番,比如删掉所有向后兼容的历史包袱和已经废弃的子项目,也许会有帮助。但我想这就是成熟软件的本质吧。

I wouldn’t go as far as to call Twisted bloated, but it is difficult to wrap your head around. I avoided really settling in and learning it for quite a while, as I always wanted something a little easier for ‘small tasks’.

However, now that I have worked with it some more I have to say having all the batteries included is VERY nice.

All the other async libraries I’ve worked with end up being way less mature than they even appear. Twisted’s event loop is solid.

I’m not quite sure how to solve the steep Twisted learning curve. It might help if someone would fork it and clean a few things up, like removing all the backwards compatibility cruft and the dead projects. But that’s the nature of mature software I guess.


回答 6

还没有人提到 Kamaelia。它的并发模型基于把组件连接在一起,在收件箱(inbox)和发件箱(outbox)之间传递消息。这里有一个简短的概述。

Kamaelia hasn’t been mentioned yet. Its concurrency model is based on wiring together components with message passing between inboxes and outboxes. Here’s a brief overview.


回答 7

我已经开始在一些事情上使用 Twisted 了。它的美妙之处几乎就在于它的“臃肿”:几乎所有主流协议都有现成的连接器。你可以写一个 jabber 机器人,让它接收命令并发布到 irc 服务器、把内容用电子邮件发给某人、执行命令、从 NNTP 服务器读取内容,还能监视网页的变化。坏消息是,它虽然什么都能做,却会让 OP 所说的那类简单任务变得过于复杂。好在 Python 的优点是你只需引入自己需要的部分,所以尽管下载包可能有 20MB,你实际用到的库也许只有 2MB(不过这仍然不少)。我对 Twisted 最大的抱怨是:虽然它们提供了示例,但凡是超出基础 TCP 服务器之外的东西,你就只能靠自己了。

虽然这不是 Python 方案,但最近我看到 node.js 获得了越来越多的关注。实际上我也考虑过在较小的项目里试试它,但一听到 javascript 我就忍不住打个哆嗦 :)

I’ve started to use Twisted for some things. The beauty of it almost is because it’s “bloated”: there are connectors for just about any of the main protocols out there. You can have a jabber bot that will take commands and post to an irc server, email them to someone, run a command, read from an NNTP server, and monitor a web page for changes. The bad news is it can do all of that and can make things overly complex for simple tasks like the OP explained. The advantage of Python, though, is that you only include what you need. So while the download may be 20 MB, you may only include 2 MB of libraries (which is still a lot). My biggest complaint with Twisted is that although they include examples, for anything beyond a basic TCP server you’re on your own.

While not a python solution, I’ve seen node.js gain a lot more traction as of late. In fact I’ve considered looking into it for smaller projects but I just cringe when I hear javascript :)


回答 8

关于这个主题有一本好书:Abe Fettig 写的《Twisted Network Programming Essentials》。书中的示例展示了如何编写非常 Pythonic 的代码;就我个人而言,它们并没有给我一种建立在臃肿框架之上的感觉。看看书里的解决方案,如果那些还不算干净,那我就不知道什么才叫干净了。

我唯一的顾虑和我对 Ruby 等其他框架的顾虑一样:我担心它能不能扩展(scale up)?我不愿意让客户绑定在一个将来会出现可伸缩性问题的框架上。

There is a good book on the subject: “Twisted Network Programming Essentials”, by Abe Fettig. The examples show how to write very Pythonic code and, to me personally, do not come across as being built on a bloated framework. Look at the solutions in the book; if they aren’t clean, then I don’t know what clean means.

My only qualm is the same one I have with other frameworks, like Ruby: I worry, does it scale up? I would hate to commit a client to a framework that is going to have scalability problems.


回答 9

Whizzer 是一个使用 pyev 的微型异步套接字框架。它非常快,这主要归功于 pyev。它试图提供一个与 Twisted 类似的接口,只是有一些细微的改动。

Whizzer is a tiny asynchronous socket framework that uses pyev. It’s very fast, primarily because of pyev. It attempts to provide a similar interface to Twisted, with some slight changes.


回答 10

也可以试试 Syncless。它基于协程(因此与 Concurrence、Eventlet 和 gevent 类似)。它为 socket.socket、socket.gethostbyname(等等)、ssl.SSLSocket、time.sleep 和 select.select 实现了可直接替换的非阻塞版本。它很快。它需要 Stackless Python 和 libevent,并且包含一个必须编译的、用 C 编写的 Python 扩展(Pyrex/Cython)。

Also try Syncless. It’s coroutine-based (so it’s similar to Concurrence, Eventlet and gevent). It implements drop-in non-blocking replacements for socket.socket, socket.gethostbyname (etc.), ssl.SSLSocket, time.sleep and select.select. It’s fast. It needs Stackless Python and libevent. It contains a mandatory Python extension written in C (Pyrex/Cython).


回答 11

我可以证实 Syncless 确实不错。它可以使用 libev(libevent 的更新、更干净、性能更好的替代品)。前一阵子它的支持还不如 libevent 完善,但现在开发进展得更深入了,Syncless 非常有用。

I confirm the goodness of Syncless. It can use libev (the newer, cleaner and better-performing version of libevent). A while ago it didn’t have as much support as libevent, but now the development process is more advanced and Syncless is very useful.


回答 12

如果你只是想要一个简化的、轻量级的 HTTP 请求库,那么我觉得 Unirest 真的很不错。

If you just want a simplified, lightweight HTTP request library, then I find Unirest really good.


回答 13

欢迎你看看 PyWorks,它采用了相当不同的方法:让对象实例运行在各自的线程中,并把对该对象的函数调用变成异步的。

只需让一个类继承自 Task 而不是 object,它就是异步的了,所有方法调用都是代理(Proxy)。返回值(如果你需要的话)则是 Future 代理。

res = obj.method( args )
# code continues here without waiting for method to finish
do_something_else( )
print "Result = %d" % res # Code will block here, if res not calculated yet

可以在http://bitbucket.org/raindog/pyworks上找到PyWorks。

You are welcome to have a look at PyWorks, which takes a quite different approach. It lets object instances run in their own thread and makes function calls to that object async.

Just let a class inherit from Task instead of object and it is async; all method calls are Proxies. Return values (if you need them) are Future proxies.

res = obj.method( args )
# code continues here without waiting for method to finish
do_something_else( )
print "Result = %d" % res # Code will block here, if res not calculated yet

PyWorks can be found on http://bitbucket.org/raindog/pyworks


Python中的异步方法调用?

问题:Python中的异步方法调用?

我想知道 Python 中是否有用于异步方法调用的库。如果能做下面这样的事情,那就太好了:

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

或者异步地调用一个非异步的例程:

def longComputation():
    <code>

token = asynccall(longComputation())

如果语言核心里能原生提供更完善的策略,那就太好了。有人考虑过这一点吗?

I was wondering if there’s any library for asynchronous method calls in Python. It would be great if you could do something like

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

Or to call a non-async routine asynchronously

def longComputation():
    <code>

token = asynccall(longComputation())

It would be great to have a more refined strategy as native in the language core. Was this considered?


回答 0

你可以使用 Python 2.6 中新增的 multiprocessing 模块。你可以使用进程池,然后通过下面的方法异步获取结果:

apply_async(func[, args[, kwds[, callback]]])

例如:

from multiprocessing import Pool

def f(x):
    return x*x

def callback(result):
    # runs in the parent process once f(10) has finished
    print(result)

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start one worker process.
    result = pool.apply_async(f, [10], callback=callback)  # Evaluate "f(10)" asynchronously; run callback when done.
    pool.close()
    pool.join()

这只是其中一种选择。这个模块提供了很多工具来实现你想要的效果。而且,基于它来写一个装饰器也非常容易。

You can use the multiprocessing module added in Python 2.6. You can use pools of processes and then get results asynchronously with:

apply_async(func[, args[, kwds[, callback]]])

E.g.:

from multiprocessing import Pool

def f(x):
    return x*x

def callback(result):
    # runs in the parent process once f(10) has finished
    print(result)

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start one worker process.
    result = pool.apply_async(f, [10], callback=callback)  # Evaluate "f(10)" asynchronously; run callback when done.
    pool.close()
    pool.join()

This is only one alternative. This module provides lots of facilities to achieve what you want. Also it will be really easy to make a decorator from this.
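
Expanding on that last remark: handing the wrapped function to apply_async from inside a decorator runs into pickling limits (the worker must be able to import the undecorated function by name), so a small helper in the spirit of the question's asynccall is the easier route. The sketch below is illustrative only; async_call and long_computation are made-up names:

from multiprocessing import Pool

def long_computation(x):
    return x * x              # stand-in for the question's <code>

def async_call(pool, func, *args, **kwargs):
    # Returns an AsyncResult, which plays the role of the question's "token".
    return pool.apply_async(func, args, kwargs)

if __name__ == '__main__':
    pool = Pool(processes=2)
    token = async_call(pool, long_computation, 10)
    while not token.ready():      # polling, as in the question
        pass                      # doSomethingElse() would go here
    print(token.get())            # fetch the result once it is ready
    pool.close()
    pool.join()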


Tornado-Tornado是一个Python Web框架和异步网络库

Tornado 是一个 Python Web 框架和异步网络库,最初由 FriendFeed 开发。通过使用非阻塞网络 I/O,Tornado 可以扩展到数万个打开的连接,这使它非常适合 long polling、WebSockets 以及其他需要为每个用户维持长连接的应用程序。

你好,世界

下面是一个简单的 Tornado “Hello, world” 示例 Web 应用程序:

import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

这个示例没有用到 Tornado 的任何异步特性;要了解异步特性,请参阅 simple chat room 示例。
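
作为补充,下面是一个粗略的示意(并非来自官方文档),展示在较新的 Tornado 版本(5.0 及以上)中异步处理器大致的写法;这里用到了 Tornado 自带的 AsyncHTTPClient,URL 仅作演示:

import tornado.ioloop
import tornado.web
from tornado.httpclient import AsyncHTTPClient

class ProxyHandler(tornado.web.RequestHandler):
    async def get(self):
        client = AsyncHTTPClient()
        # await 会把控制权交还给 IOLoop,在等待期间可以继续处理其他请求
        response = await client.fetch("http://example.com/")
        self.write("Fetched %d bytes" % len(response.body))

def make_app():
    return tornado.web.Application([
        (r"/proxy", ProxyHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()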

文档

有关其他资源的文档和链接,请访问https://www.tornadoweb.org