What exactly is the purpose of calling shutdown on the socket and then closing it? If it makes a difference, this socket is being used for non-blocking IO.

Here’s one explanation:

Once a socket is no longer required, the calling program can discard the socket by applying a close subroutine to the socket descriptor. If a reliable delivery socket has data associated with it when a close takes place, the system continues to attempt data transfer. However, if the data is still undelivered, the system discards the data. Should the application program have no use for any pending data, it can use the shutdown subroutine on the socket prior to closing it.


Calling close and shutdown have two different effects on the underlying socket.

The first thing to point out is that the socket is a resource in the underlying OS and multiple processes can have a handle for the same underlying socket.

When you call close it decrements the handle count by one and if the handle count has reached zero then the socket and associated connection goes through the normal close procedure (effectively sending a FIN / EOF to the peer) and the socket is deallocated.

The thing to pay attention to here is that if the handle count does not reach zero because another process still has a handle to the socket then the connection is not closed and the socket is not deallocated.

On the other hand, calling shutdown for reading and writing closes the underlying connection and sends a FIN / EOF to the peer regardless of how many processes have handles to the socket. However, it does not deallocate the socket, and you still need to call close afterward.
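Here is a small sketch of that difference. It assumes socket.socketpair() is available (any Unix, or Python 3.5+ on Windows). socket.dup() stands in for "another process still holds a handle", because both descriptors refer to the same underlying socket. The function name demo_close_vs_shutdown is just for illustration.

```python
import socket


def demo_close_vs_shutdown(wait=0.5):
    """Return (peer_saw_eof_after_close, peer_saw_eof_after_shutdown)."""
    a, b = socket.socketpair()        # two connected sockets
    a_dup = a.dup()                   # a second handle to the same socket as `a`
    b.settimeout(wait)                # don't block forever if nothing arrives

    a.close()                         # one handle gone, one left: connection stays open
    try:
        eof_after_close = b.recv(1) == b""
    except socket.timeout:
        eof_after_close = False       # nothing arrived: peer still sees a live connection

    a_dup.shutdown(socket.SHUT_RDWR)  # ends the connection although a handle remains
    eof_after_shutdown = b.recv(1) == b""

    a_dup.close()                     # shutdown() does not free the descriptor
    b.close()
    return eof_after_close, eof_after_shutdown


print(demo_close_vs_shutdown())       # (False, True)
```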

Explanation of shutdown and close: Graceful shutdown (msdn)

Shutdown (in your case) indicates to the other end of the connection that there is no further intention to read from or write to the socket. Then close frees up any memory associated with the socket.

Omitting shutdown may cause the socket to linger in the OS's stack until the connection has been closed gracefully.

IMO the names ‘shutdown’ and ‘close’ are misleading; ‘close’ and ‘destroy’ would emphasise their differences.

It’s mentioned right in the Socket Programming HOWTO (py2/py3):


Strictly speaking, you’re supposed to use shutdown on a socket before you close it. The shutdown is an advisory to the socket at the other end. Depending on the argument you pass it, it can mean “I’m not going to send anymore, but I’ll still listen”, or “I’m not listening, good riddance!”. Most socket libraries, however, are so used to programmers neglecting to use this piece of etiquette that normally a close is the same as shutdown(); close(). So in most situations, an explicit shutdown is not needed.

Isn’t this code above wrong?

The close call directly after the shutdown call might make the kernel discard all outgoing buffers anyway.

According to http://blog.netherlabs.nl/articles/2009/01/18/the-ultimate-so_linger-page-or-why-is-my-tcp-not-reliable one needs to wait between the shutdown and the close until read returns 0.
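A minimal sketch of that shutdown, read-until-0, close sequence. graceful_close is a hypothetical helper name and the 5-second default is arbitrary. Note that settimeout() takes a non-blocking socket out of non-blocking mode. That is harmless here because the socket is about to be closed, but an event-loop program would instead wait for readability inside its own loop.

```python
import socket


def graceful_close(sock, timeout=5.0):
    """shutdown(SHUT_WR), read until recv() returns b"" (peer closed), then close().

    Returns whatever the peer sent after we stopped writing.
    """
    sock.shutdown(socket.SHUT_WR)        # send FIN: we are done writing
    sock.settimeout(timeout)             # don't hang forever on a peer that never closes
    leftover = bytearray()
    try:
        while True:
            chunk = sock.recv(4096)
            if not chunk:                # 0 bytes: the peer has closed its side too
                break
            leftover += chunk
    except socket.timeout:
        pass                             # give up waiting and close anyway
    finally:
        sock.close()
    return bytes(leftover)
```

Draining until recv() returns 0 means the peer has seen our FIN and finished sending. At that point close() no longer has unread incoming data that could cause the kernel to reset the connection.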

There are several ways to shut down a socket: http://msdn.microsoft.com/en-us/library/system.net.sockets.socket.shutdown.aspx. *nix is similar.

shutdown(1) forces the socket not to send any more data.

This is useful for:

1- Buffer flushing

2- Strange error detection

3- Safeguarding

Let me explain more: when you send data from A to B, it’s not guaranteed to be sent to B; it’s only guaranteed to be sent to A’s OS buffer, which in turn sends it to B’s OS buffer.

So by calling shutdown(1) on A, you flush A’s buffer, and an error is raised if the buffer is not empty, i.e. data has not been sent to the peer yet.

However, this is irreversible, so do it only after you have completely sent all your data and want to be sure that it’s at least in the peer’s OS buffer.
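To see what shutdown(1) does on a live connection, here is a sketch using socket.SHUT_WR (which has the value 1) on a local socketpair(). half_close_demo is an illustrative name. It shows three things. A can no longer send. B still receives everything A sent earlier, followed by EOF. The B-to-A direction keeps working.

```python
import socket


def half_close_demo():
    a, b = socket.socketpair()
    a.sendall(b"all my data")
    a.shutdown(socket.SHUT_WR)            # same as shutdown(1): A stops sending

    try:
        a.send(b"more")
        send_failed = False
    except OSError:                       # e.g. BrokenPipeError on Linux
        send_failed = True

    received = b""
    while chunk := b.recv(1024):          # b"" means A has shut down its write side
        received += chunk

    b.sendall(b"ack")                     # B -> A still works after A's half-close
    reply = a.recv(1024)

    a.close()
    b.close()
    return send_failed, received, reply


print(half_close_demo())                  # (True, b'all my data', b'ack')
```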








I need to run a shell command asynchronously from a Python script. By this I mean that I want my Python script to continue running while the external command goes off and does whatever it needs to do.

I read this post:

Calling an external command in Python

I then went off and did some testing, and it looks like os.system() will do the job provided that I use & at the end of the command so that I don’t have to wait for it to return. What I am wondering is if this is the proper way to accomplish such a thing? I tried commands.call() but it will not work for me because it blocks on the external command.

Please let me know if using os.system() for this is advisable or if I should try some other route.

subprocess.Popen does exactly what you want.

from subprocess import Popen
p = Popen(['watch', 'ls']) # something long running
# ... do other stuff while subprocess is running

(Edit to complete the answer from comments)

The Popen instance can do various other things like you can poll() it to see if it is still running, and you can communicate() with it to send it data on stdin, and wait for it to terminate.
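A minimal runnable sketch of poll() and communicate(). It uses a short Python child (via sys.executable) instead of a real long-running command so it works on any platform.

```python
import subprocess
import sys

# Child that takes about half a second and then prints a line.
child_code = "import time; time.sleep(0.5); print('done')"

proc = subprocess.Popen([sys.executable, "-c", child_code],
                        stdout=subprocess.PIPE, text=True)

running = proc.poll() is None     # poll() returns None while the child is still running
# ... the parent is free to do other work here ...

out, _ = proc.communicate()       # waits for the child to exit and collects stdout
print(running, proc.returncode, out.strip())   # True 0 done
```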

If you want to run many processes in parallel and then handle them when they yield results, you can use polling like in the following:

from subprocess import Popen, PIPE
import time

running_procs = [
    Popen(['/usr/bin/my_cmd', '-i %s' % path], stdout=PIPE, stderr=PIPE)
    for path in '/tmp/file0 /tmp/file1 /tmp/file2'.split()]

while running_procs:
    for proc in running_procs:
        retcode = proc.poll()
        if retcode is not None:  # Process finished.
            running_procs.remove(proc)
            break
    else:  # No process is done, wait a bit and check again.
        time.sleep(.1)
        continue

    # Here, `proc` has finished with return code `retcode`
    stdout, stderr = proc.communicate()  # collect its remaining output
    if retcode != 0:
        """Error handling."""

The control flow there is a little bit convoluted because I’m trying to make it small — you can refactor to your taste. :-)

This has the advantage of servicing the early-finishing requests first. If you call communicate on the first running process and that turns out to run the longest, the other running processes will have been sitting there idle when you could have been handling their results.

What I am wondering is if this [os.system()] is the proper way to accomplish such a thing?

No. os.system() is not the proper way. That’s why everyone says to use subprocess.

For more information, read http://docs.python.org/library/os.html#os.system

The subprocess module provides more powerful facilities for spawning new processes and retrieving their results; using that module is preferable to using this function. Use the subprocess module. Check especially the Replacing Older Functions with the subprocess Module section.
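For the os.system("cmd &") case specifically, a rough subprocess equivalent is sketched below. run_in_background is a hypothetical helper. The DEVNULL redirections keep the child's output off your terminal. start_new_session (POSIX only) also puts the child in its own session, so it isn't hit by, say, a Ctrl-C aimed at your script.

```python
import subprocess
import sys


def run_in_background(args):
    """Start `args` and return immediately with the Popen object (no shell involved)."""
    return subprocess.Popen(
        args,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        # setsid() in the child on POSIX; not used on Windows
        start_new_session=(sys.platform != "win32"),
    )


proc = run_in_background([sys.executable, "-c", "import time; time.sleep(1)"])
print("script keeps running; child pid is", proc.pid)
# Later, if you care about the result:
print("exit code:", proc.wait())
```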

I’ve had good success with the asyncproc module, which deals nicely with the output from the processes. For example:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll is not None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print(out)

Using pexpect with non-blocking readlines is another way to do this. Pexpect solves the deadlock problems, allows you to easily run the processes in the background, and gives easy ways to have callbacks when your process spits out predefined strings, and generally makes interacting with the process much easier.

Considering “I don’t have to wait for it to return”, one of the easiest solutions will be this:

subprocess.Popen(
    [path_to_executable, arg1, arg2, ... argN],
    creationflags=subprocess.CREATE_NEW_CONSOLE,
).pid

But… from what I read, this is not “the proper way to accomplish such a thing” because of security risks created by the subprocess.CREATE_NEW_CONSOLE flag.

The key things here are the use of subprocess.CREATE_NEW_CONSOLE (a Windows-only flag) to create a new console, and .pid, which returns the process ID so that you can check on the program later if you want to, without waiting for it to finish its job.

I have the same problem trying to connect to a 3270 terminal using the s3270 scripting software in Python. Now I’m solving the problem with a subclass of Process that I found here:


And here is the sample taken from the file:

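# Python 2 sample from the recipe. `Popen` is the recipe's asynchronous Popen
# subclass (it adds recv(), recv_err() and send()); it is not reproduced here.
import sys
import time
from subprocess import PIPE

message = "Other end disconnected!"  # error text for the exceptions below

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    if tr < 1:
        tr = 1
    x = time.time()+t
    y = []
    r = ''
    pr = p.recv
    if stderr:
        pr = p.recv_err
    while time.time() < x or r:
        r = pr()
        if r is None:          # the other end has closed the pipe
            if e:
                raise Exception(message)
            else:
                break
        elif r:
            y.append(r)        # got some output; keep reading
        else:
            time.sleep(max((x-time.time())/tr, 0))  # nothing yet; back off briefly
    return ''.join(y)

def send_all(p, data):
    while len(data):
        sent = p.send(data)
        if sent is None:
            raise Exception(message)
        data = buffer(data, sent)  # drop the part that was already written

if __name__ == '__main__':
    if sys.platform == 'win32':
        shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n')
    else:
        shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')

    a = Popen(shell, stdin=PIPE, stdout=PIPE)
    print recv_some(a),
    for cmd in commands:
        send_all(a, cmd + tail)
        print recv_some(a),
    send_all(a, 'exit' + tail)
    print recv_some(a, e=0)
    a.wait()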
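On Python 3, without the recipe, a common way to get the same "read whatever is available, don't block" behaviour is a reader thread that feeds a queue.Queue. This is a swapped-in technique, not what the recipe does. start_reader and recv_available are illustrative names, and the child here is a tiny Python process that upper-cases each input line.

```python
import queue
import subprocess
import sys
import threading


def start_reader(stream, q):
    """Copy lines from `stream` into queue `q` on a daemon thread until EOF."""
    def pump():
        for line in iter(stream.readline, ""):
            q.put(line)
        stream.close()
    t = threading.Thread(target=pump, daemon=True)
    t.start()
    return t


def recv_available(q, timeout=0.1):
    """Wait up to `timeout` for the first line, then drain what is already queued."""
    lines = []
    try:
        lines.append(q.get(timeout=timeout))
        while True:
            lines.append(q.get_nowait())
    except queue.Empty:
        pass
    return "".join(lines)


# Child that echoes each input line in upper case.
child = "import sys\nfor line in sys.stdin:\n    print(line.strip().upper(), flush=True)"
proc = subprocess.Popen([sys.executable, "-u", "-c", child],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
out_q = queue.Queue()
reader = start_reader(proc.stdout, out_q)

proc.stdin.write("hello world\n")
proc.stdin.flush()
reply = recv_available(out_q, timeout=5)   # "HELLO WORLD\n"

proc.stdin.close()                         # child sees EOF and exits
proc.wait()
reader.join()
```

The blocking readline() happens on the helper thread, so the main thread only ever waits as long as the timeout it chooses.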

The accepted answer is very old.

I found a better modern answer here:


and made some changes:

  1. make it work on Windows
  2. make it work with multiple commands
import sys
import asyncio

if sys.platform == "win32":
    # Subprocesses need the Proactor event loop on Windows (the default since Python 3.8).
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


async def _read_stream(stream, cb):
    while True:
        line = await stream.readline()
        if line:
            cb(line.decode(errors="replace").rstrip())
        else:
            break  # EOF


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )

        await asyncio.gather(
            _read_stream(process.stdout, stdout_cb),
            _read_stream(process.stderr, stderr_cb),
        )
        rc = await process.wait()
        return process.pid, rc
    except OSError as e:
        # the program will hang if we let any exception propagate
        return e


def execute(*aws):
    """ run the given coroutines in an asyncio loop
    returns a list containing the values returned from each coroutine.
    """
    async def gather_all():
        return await asyncio.gather(*aws)

    return asyncio.run(gather_all())


def printer(label):
    def pr(*args, **kw):
        print(label, *args, **kw)

    return pr


def name_it(start=0, template="s{}"):
    """a simple generator for task names
    """
    while True:
        yield template.format(start)
        start += 1


def runners(cmds):
    """
    cmds is a list of commands to execute as subprocesses
    each item is a list appropriate for use by subprocess.call
    """
    next_name = name_it().__next__
    for cmd in cmds:
        name = next_name()
        out = printer(f"{name}.stdout")
        err = printer(f"{name}.stderr")
        yield _stream_subprocess(cmd, out, err)


if __name__ == "__main__":
    cmds = (
        ["sh", "-c", """echo "$SHELL"-stdout && sleep 1 && echo stderr 1>&2 && sleep 1 && echo done"""],
        ["sh", "-c", "echo 'hello, Dave.' && sleep 1 && echo dave_err 1>&2 && sleep 1 && echo done"],
        [sys.executable, "-c", 'print("hello from python");import sys;sys.exit(2)'],
    )
    print(execute(*runners(cmds)))


It is unlikely that the example commands will work perfectly on your system, and it doesn’t handle weird errors, but this code does demonstrate one way to run multiple subprocesses using asyncio and stream the output.

There are several answers here, but none of them satisfied my requirements below:

  1. I don’t want to wait for command to finish or pollute my terminal with subprocess outputs.

  2. I want to run bash script with redirects.

  3. I want to support piping within my bash script (for example find ... | tar ...).

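The only combination that satisfies the above requirements is:

import subprocess

subprocess.Popen(['./my_script.sh "arg1" > "redirect/path/to"'],
                 shell=True,  # the shell handles the redirect and any pipes
                 stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)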

This is covered by Python 3 Subprocess Examples under “Wait for command to terminate asynchronously”:

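import asyncio

async def main():
    proc = await asyncio.create_subprocess_exec(
        'ls',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    # do something else while ls is working

    # if proc takes very long to complete, the CPUs are free to use cycles for
    # other processes
    stdout, stderr = await proc.communicate()

asyncio.run(main())  # `await` is only valid inside a coroutine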

The process will start running as soon as the await asyncio.create_subprocess_exec(...) has completed. If it hasn’t finished by the time you call await proc.communicate(), it will wait there in order to give you your output status. If it has finished, proc.communicate() will return immediately.
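To make the "do something else while ls is working" part concrete, here is a sketch in which the coroutine does a few other steps on the event loop before collecting the child's output. It swaps ls for a short Python child (sys.executable) so it runs anywhere. The counter is only a stand-in for real work.

```python
import asyncio
import sys


async def main():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(0.5); print('child done')",
        stdout=asyncio.subprocess.PIPE)

    # The child is already running; meanwhile do other work on the event loop.
    steps = 0
    for _ in range(3):
        await asyncio.sleep(0.05)     # stand-in for other async work
        steps += 1

    stdout, _ = await proc.communicate()   # waits only if the child hasn't finished yet
    return steps, proc.returncode, stdout.decode().strip()


print(asyncio.run(main()))   # (3, 0, 'child done')
```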

The gist here is similar to Terrel’s answer, but I think Terrel’s answer overcomplicates things.

I am writing an application in Flask, which works really well except that WSGI is synchronous and blocking. I have one task in particular which calls out to a third party API, and that task can take several minutes to complete. I would like to make that call (it’s actually a series of calls) and let it run while control is returned to Flask.

My view looks like:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

Now, what I want to do is have the line

final_file = audio_class.render_audio()

run and provide a callback to be executed when the method returns, whilst Flask can continue to process requests. This is the only task which I need Flask to run asynchronously, and I would like some advice on how best to implement this.

I have looked at Twisted and Klein, but I’m not sure they are overkill, as maybe Threading would suffice. Or maybe Celery is a good choice for this?

Answer 0

I would use Celery to handle the asynchronous task for you. You’ll need to install a broker to serve as your task queue (RabbitMQ and Redis are recommended).


import json

from flask import Flask, Response, request
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)                        # registers the task so .delay() is available
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

Run your Flask app, and start another process to run your celery worker.

$ celery -A app.celery worker --loglevel=debug

I would also refer to Miguel Grinberg’s write-up for a more in-depth guide to using Celery with Flask.

Answer 1

Threading is another possible solution. Although the Celery based solution is better for applications at scale, if you are not expecting too much traffic on the endpoint in question, threading is a viable alternative.

This solution is based on Miguel Grinberg’s PyCon 2016 Flask at Scale presentation, specifically slide 41 in his slide deck. His code is also available on github for those interested in the original source.

From a user perspective the code works as follows:

  1. You make a call to the endpoint that performs the long running task.
  2. This endpoint returns 202 Accepted with a link to check on the task status.
  3. Calls to the status link return 202 while the task is still running, and return 200 (and the result) when the task is complete.
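Without Flask, the bookkeeping behind those three steps is a dict of task records plus one background thread per task. The sketch below is framework-free so it can run as-is. start_task and get_status are hypothetical names that return (status code, payload) tuples mimicking the HTTP responses. The /status/ URL shape follows the route used in this answer, and the lock is there because several threads touch the dict.

```python
import threading
import uuid

tasks = {}                      # task_id -> {"thread": Thread, "result": (code, body)}
tasks_lock = threading.Lock()


def start_task(func, *args):
    """Run func(*args) on a background thread; answer like '202 Accepted' + Location."""
    task_id = uuid.uuid4().hex

    def runner():
        try:
            result = (200, func(*args))
        except Exception as exc:          # a failed task becomes a 500
            result = (500, repr(exc))
        with tasks_lock:
            tasks[task_id]["result"] = result

    thread = threading.Thread(target=runner, daemon=True)
    with tasks_lock:
        tasks[task_id] = {"thread": thread}
    thread.start()
    return 202, f"/status/{task_id}"


def get_status(task_id):
    """404 for unknown ids, 202 while running, otherwise the stored (code, body)."""
    with tasks_lock:
        task = tasks.get(task_id)
        if task is None:
            return 404, None
        return task.get("result", (202, None))
```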

To convert an API call to a background task, simply add the @async_api decorator.

Here is a fully contained example:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


def start_cleanup_thread():
    """Start a background thread that cleans up old tasks.

    (@app.before_first_request was removed in Flask 2.3, so this is called explicitly.)
    """
    def clean_old_tasks():
        """This function cleans up old tasks from our in-memory data structure."""
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = time.time() - 5 * 60
            expired = [task_id for task_id, task in list(tasks.items())
                       if task.get('completion_timestamp', float('inf')) < five_min_ago]
            for task_id in expired:
                tasks.pop(task_id, None)
            time.sleep(60)

    if not app.config['TESTING']:
        threading.Thread(target=clean_old_tasks, daemon=True).start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = time.time()

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                                    request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')

if __name__ == '__main__':
    start_cleanup_thread()
    app.run(debug=True)

You can also try using multiprocessing.Process with daemon=True; the process.start() method does not block and you can return a response/status immediately to the caller while your expensive function executes in the background.

I experienced a similar problem while working with the Falcon framework, and using a daemon process helped.

You’d need to do the following:

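import time
from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()     # returns immediately; my_func runs in the child process
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")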

You should get a response immediately, and after 10 s you should see a printed message in the console.

NOTE: Keep in mind that daemonic processes are not allowed to spawn any child processes.



I tried the sample provided within the documentation of the requests library for Python.

With async.map(rs), I get the response codes, but I want to get the content of each page requested. This, for example, does not work:

out = async.map(rs)
print out[0].content

The below answer is not applicable to requests v0.13.0+. The asynchronous functionality was moved to grequests after this question was written. However, you could just replace requests with grequests below and it should work.

I’ve left this answer as is to reflect the original question which was about using requests < v0.13.0.

To do multiple tasks with async.map asynchronously you have to:

  1. Define a function for what you want to do with each object (your task)
  2. Add that function as an event hook in your request
  3. Call async.map on a list of all the requests / actions


from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://example.com/a',  # placeholder URLs; substitute your own
    'http://example.com/b',
]

# A simple task to do to each response object
def do_something(response):
    print(response.url)

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks = {'response' : do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

async is now an independent module: grequests.

See here: https://github.com/kennethreitz/grequests

And here: Ideal method for sending multiple HTTP requests over Python?


$ pip install grequests


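Build a stack:

import grequests

urls = [
    'http://www.example.com/1',  # placeholder URLs; substitute your own
    'http://www.example.com/2',
    'http://www.example.com/3',
    'http://www.example.com/4',
    'http://www.example.com/5',
]

rs = (grequests.get(u) for u in urls)

Send the stack:

grequests.map(rs)

The result looks like: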

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

grequests doesn’t seem to limit the number of concurrent requests by default, i.e. when multiple requests are sent to the same server.
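If you need to cap concurrency yourself, one common pattern is a semaphore. Below is a minimal sketch using Python 3.7+ asyncio; fake_request and its sleep are hypothetical stand-ins for real HTTP calls and are not part of grequests. (grequests.map itself also accepts a size argument that limits how many requests are in flight at once.)

```python
import asyncio

async def fake_request(i, sem, state):
    # Hypothetical stand-in for one HTTP request
    async with sem:  # at most `limit` coroutines run this block at once
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated network round trip
        state["active"] -= 1
        return i

async def main(n, limit):
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(fake_request(i, sem, state) for i in range(n)))
    return results, state["peak"]

results, peak = asyncio.run(main(10, 3))
print(results, "peak concurrency:", peak)
```

All ten requests are started together, but the semaphore keeps the peak number of simultaneous "requests" at or below the limit.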

I tested both requests-futures and grequests. grequests is faster but brings monkey patching and additional problems with dependencies; requests-futures is several times slower than grequests. I decided to write my own: I simply wrapped requests in a ThreadPoolExecutor, and it was almost as fast as grequests, but without the extra dependencies.

import requests
import concurrent.futures

def get_urls():
    return ["url1", "url2"]

def load_url(url, timeout):
    return requests.get(url, timeout=timeout)

resp_ok = 0
resp_err = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # Map each future back to the URL it is fetching
    future_to_url = {executor.submit(load_url, url, 10): url for url in get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1

Maybe requests-futures is another choice.

from requests_futures.sessions import FuturesSession

session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second request is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))

It is also recommended in the official documentation. If you don’t want to involve gevent, it’s a good option.

I have a lot of issues with most of the answers posted: they either use deprecated libraries that have been ported over with limited features, or provide a solution with too much magic in the execution of the request, making it difficult to handle errors. If they do not fall into one of the above categories, they’re third-party libraries or deprecated.

Some of the solutions work all right for plain HTTP requests, but fall short for any other kind of request, which is ludicrous. A highly customized solution is not necessary here.

Simply using Python’s built-in asyncio library is sufficient to perform asynchronous requests of any type, and it provides enough flexibility for complex, use-case-specific error handling.

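import asyncio

import requests

def do_thing(params):
    async def get_rpc_info_and_do_chores(id):
        loop = asyncio.get_running_loop()
        # do things; perform_grpc_call (a placeholder) blocks, so run it in
        # the default thread pool instead of stalling the event loop
        response = await loop.run_in_executor(None, perform_grpc_call, id)

    async def get_httpapi_info_and_do_chores(id):
        loop = asyncio.get_running_loop()
        # do things; requests.get blocks too (URL is a placeholder)
        response = await loop.run_in_executor(None, requests.get, URL)

    async def run_all():
        async_tasks = []
        for element in list(params.list_of_things):
            async_tasks.append(get_rpc_info_and_do_chores(element))
            async_tasks.append(get_httpapi_info_and_do_chores(element))
        # Run every task concurrently; return once all of them have completed
        await asyncio.gather(*async_tasks)

    asyncio.run(run_all())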


How it works is simple. You create a series of tasks you’d like to run asynchronously, then ask an event loop to execute those tasks and exit upon completion. No extra libraries at risk of going unmaintained, and no missing functionality.
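To illustrate the error-handling point, here is a minimal sketch (Python 3.7+, not the author’s code) in which fetch is a hypothetical coroutine standing in for any kind of request. With return_exceptions=True, asyncio.gather returns each task’s result or exception in input order, so every failure can be handled case by case.

```python
import asyncio

async def fetch(item):
    # Hypothetical stand-in for any awaitable request (HTTP, RPC, database, ...)
    await asyncio.sleep(0.01)
    if item < 0:
        raise ValueError(f"bad item {item}")
    return item * 10

async def main(items):
    # return_exceptions=True hands failures back as values instead of
    # raising the first exception at this await
    results = await asyncio.gather(*(fetch(i) for i in items), return_exceptions=True)
    ok = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    return ok, errors

ok, errors = asyncio.run(main([1, -2, 3]))
print("ok:", ok)
print("errors:", errors)
```

Without return_exceptions=True, the first exception would propagate out of the await and the caller would have to dig the remaining results out of the individual tasks.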

I know this has been closed for a while, but I thought it might be useful to promote another async solution built on the requests library.

list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print(response.content)

The docs are here: http://pythonhosted.org/simple-requests/

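from threading import Thread
from urllib.request import urlopen  # Python 2: from urllib2 import urlopen

def open_url(request_uri):
    o = urlopen(request_uri, timeout=600)
    # ... use the response `o` here ...

threads = []
for request_uri in request_uris:  # request_uris: your list of URLs
    t = Thread(target=open_url, args=(request_uri,))
    t.start()
    threads.append(t)

# Block until every request has finished
for thread in threads:
    thread.join()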

If you want to use asyncio, then requests-async provides async/await functionality for requests: https://github.com/encode/requests-async

Answer 8

I have been using Python requests for async calls against GitHub’s gist API for some time.

For an example, see the code here:


This style of Python may not be the clearest example, but I can assure you that the code works. Let me know if this is confusing to you and I will document it.

You can use httpx for that.

import asyncio

import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

async def main():
    # `await` is only valid inside an async context, such as this coroutine
    return await asyncio.gather(*map(get_async, urls))

responses = asyncio.run(main())

If you want a functional syntax, the gamla lib wraps this into get_async.

Then you can do:

await gamla.map(gamla.get_async(10), ["http://google.com", "http://wikipedia.org"])

The 10 is the timeout in seconds.

(disclaimer: I am its author)

I have also tried some things using the asynchronous methods in Python; however, I have had much better luck using Twisted for asynchronous programming. It has fewer problems and is well documented. Here is a link to something similar to what you are trying in Twisted.







A (long) while ago I wrote a web spider that I multithreaded to enable concurrent requests to occur at the same time. That was in my Python youth, in the days before I knew about the GIL and the associated woes it creates for multithreaded code (i.e., most of the time stuff just ends up serialized!)…

I’d like to rework this code to make it more robust and perform better. There are basically two ways I could do this: I could use the new multiprocessing module in 2.6+ or I could go for a reactor / event-based model of some sort. I would rather do the latter since it’s far simpler and less error-prone.

So the question relates to what framework would be best suited to my needs. The following is a list of the options I know about so far:

  • Twisted: The granddaddy of Python reactor frameworks: seems complex and a bit bloated, however. Steep learning curve for a small task.
  • Eventlet: From the guys at lindenlab. Greenlet-based framework that’s geared towards these kinds of tasks. I had a look at the code though and it’s not too pretty: non-PEP 8 compliant, scattered with prints (why do people do this in a framework!?), API seems a little inconsistent.
  • PyEv: Immature; there doesn’t seem to be anyone using it right now, though it is based on libevent so it’s got a solid backend.
  • asyncore: From the stdlib: über low-level, seems like a lot of legwork involved just to get something off the ground.
  • tornado: Though this is a server-oriented product designed to serve dynamic websites, it does feature an async HTTP client and a simple ioloop. Looks like it could get the job done, but that’s not what it was intended for. [edit: doesn’t run on Windows unfortunately, which counts it out for me; it’s a requirement for me to support this lame platform]

Is there anything I have missed at all? Surely there must be a library out there that fits the sweet-spot of a simplified async networking library!

[edit: big thanks to intgr for his pointer to this page. If you scroll to the bottom you will see there is a really nice list of projects that aim to tackle this task in one way or another. It seems that things have indeed moved on since the inception of Twisted: people now seem to favour a co-routine based solution rather than a traditional reactor / callback oriented one. The benefit of this approach is clearer, more direct code: I’ve certainly found in the past, especially when working with boost.asio in C++, that callback-based code can lead to designs that are hard to follow and relatively obscure to the untrained eye. Using co-routines allows you to write code that looks a little more synchronous, at least. I guess now my task is to work out which of these many libraries I like the look of and give it a go! Glad I asked now…]
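To make the callback-versus-coroutine contrast concrete, here is a minimal sketch using Python 3.7+ asyncio (which post-dates this question and is not one of the libraries listed). fetch is a hypothetical stand-in for a network call, and both functions perform the same two dependent steps.

```python
import asyncio

async def fetch(x):
    # Hypothetical stand-in for a network round trip
    await asyncio.sleep(0.01)
    return x * 2

async def callback_style():
    # Each step is a separate function, chained to the previous one via callbacks
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    tasks = []  # keep references so pending tasks are not garbage-collected

    def after_second(task):
        done.set_result(task.result())

    def after_first(task):
        second = loop.create_task(fetch(task.result()))
        tasks.append(second)
        second.add_done_callback(after_second)

    first = loop.create_task(fetch(1))
    tasks.append(first)
    first.add_done_callback(after_first)
    return await done

async def coroutine_style():
    # The same two steps read top to bottom, like synchronous code
    first = await fetch(1)
    return await fetch(first)

print(asyncio.run(callback_style()), asyncio.run(coroutine_style()))
```

Both print the same value, but the coroutine version keeps the control flow in one place instead of scattering it across nested callbacks.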

[edit: perhaps of interest to anyone who followed or stumbled on this question or cares about this topic in any sense: I found a really great writeup of the current state of the available tools for this job]

I liked the concurrence Python module, which relies on either Stackless Python microthreads or Greenlets for lightweight threading. All blocking network I/O is transparently made asynchronous through a single libevent loop, so it should be nearly as efficient as a real asynchronous server.

I suppose it’s similar to Eventlet in this way.

The downside is that its API is quite different from Python’s sockets/threading modules; you need to rewrite a fair bit of your application (or write a compatibility shim layer).

Edit: It seems that there’s also cogen, which is similar, but uses Python 2.5’s enhanced generators for its coroutines, instead of Greenlets. This makes it more portable than concurrence and other alternatives. Network I/O is done directly with epoll/kqueue/iocp.
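To show the idea behind generator-based coroutines, here is a minimal round-robin scheduler sketch. It uses Python 3.3+ generators (send() from PEP 342 plus generator return values); fake_io is a hypothetical stand-in for non-blocking I/O, and none of this is cogen’s actual API.

```python
from collections import deque

def fake_io(request):
    # Hypothetical stand-in for a non-blocking I/O operation (e.g. a socket read)
    return request.upper()

def run(tasks):
    """Round-robin scheduler for generator-based coroutines."""
    ready = deque((task, None) for task in tasks)
    finished = []
    while ready:
        task, value = ready.popleft()
        try:
            request = task.send(value)  # resume the coroutine until its next `yield`
        except StopIteration as stop:
            finished.append(stop.value)  # the generator's `return` value
            continue
        # "Perform" the I/O and reschedule the coroutine with its result
        ready.append((task, fake_io(request)))
    return finished

def client(name, words):
    replies = []
    for word in words:
        reply = yield word  # reads like a blocking call; the scheduler resumes us
        replies.append(reply)
    return f"{name}: {' '.join(replies)}"

print(run([client("a", ["x", "y"]), client("b", ["z"])]))
```

The two clients interleave, so "b" finishes first even though it was started second; a real library would replace fake_io with an epoll/kqueue/IOCP-driven event loop.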

Answer 1


Twisted is complex, you’re right about that. Twisted is not bloated.

If you take a look here: http://twistedmatrix.com/trac/browser/trunk/twisted you’ll find an organized, comprehensive, and very well tested suite of many protocols of the internet, as well as helper code to write and deploy very sophisticated network applications. I wouldn’t confuse bloat with comprehensiveness.

It’s well known that the Twisted documentation isn’t the most user-friendly at first glance, and I believe this turns away an unfortunate number of people. But Twisted is amazing (IMHO) if you put in the time. I did and it proved to be worth it, and I’d recommend that others try the same.

gevent is eventlet cleaned up.

API-wise it follows the same conventions as the standard library (in particular, threading and multiprocessing modules) where it makes sense. So you have familiar things like Queue and Event to work with.

It only supports libevent (update: libev since 1.0) as reactor implementation but takes full advantage of it, featuring a fast WSGI server based on libevent-http and resolving DNS queries through libevent-dns as opposed to using a thread pool like most other libraries do. (update: since 1.0 c-ares is used to make async DNS queries; threadpool is also an option.)

Like eventlet, it makes the callbacks and Deferreds unnecessary by using greenlets.

Check out the examples: concurrent download of multiple urls, long polling webchat.

Answer 5

I wouldn’t go as far as to call Twisted bloated, but it is difficult to wrap your head around. I avoided really settling in and learning it for quite a while, as I always wanted something a little easier for ‘small tasks’.

However, now that I have worked with it some more I have to say having all the batteries included is VERY nice.

All the other async libraries I’ve worked with end up being way less mature than they even appear. Twisted’s event loop is solid.

I’m not quite sure how to solve the steep Twisted learning curve. It might help if someone would fork it and clean a few things up, like removing all the backwards-compatibility cruft and the dead projects. But that’s the nature of mature software, I guess.

Answer 6


Kamaelia hasn’t been mentioned yet. Its concurrency model is based on wiring together components with message passing between inboxes and outboxes. Here’s a brief overview.
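As a rough illustration of that inbox/outbox model (this is not Kamaelia’s API), here is a minimal Python sketch in which each hypothetical Component is a thread that reads messages from its inbox queue and writes results to an outbox that is wired to the next component’s inbox.

```python
import queue
import threading

class Component(threading.Thread):
    """Hypothetical component: applies `transform` to each message in its inbox."""

    def __init__(self, transform):
        super().__init__(daemon=True)
        self.inbox = queue.Queue()
        self.outbox = None  # wired up by link()
        self.transform = transform

    def run(self):
        while True:
            msg = self.inbox.get()
            if msg is None:  # shutdown sentinel: pass it downstream and stop
                if self.outbox is not None:
                    self.outbox.put(None)
                return
            if self.outbox is not None:
                self.outbox.put(self.transform(msg))

def link(source, destination_inbox):
    # Wire one component's outbox to another queue (usually a downstream inbox)
    source.outbox = destination_inbox

def run_pipeline(items):
    double = Component(lambda x: x * 2)
    add_one = Component(lambda x: x + 1)
    results = queue.Queue()
    link(double, add_one.inbox)
    link(add_one, results)
    double.start()
    add_one.start()
    for item in items:
        double.inbox.put(item)
    double.inbox.put(None)
    out = []
    while True:
        msg = results.get()
        if msg is None:
            break
        out.append(msg)
    return out

print(run_pipeline([1, 2, 3]))
```

Components never call each other directly; they only know about queues, so the same component can be rewired into a different pipeline without changing its code.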

Answer 7

I’ve started to use Twisted for some things. Its beauty is almost that it’s “bloated”: there are connectors for just about any of the main protocols out there. You can have a Jabber bot that will take commands and post to an IRC server, email them to someone, run a command, read from an NNTP server, and monitor a web page for changes. The bad news is that it can do all of that, which can make things overly complex for simple tasks like the one the OP explained. The advantage of Python, though, is that you only include what you need. So while the download may be 20 MB, you may only include 2 MB of libraries (which is still a lot). My biggest complaint with Twisted is that although they include examples, for anything beyond a basic TCP server you’re on your own.

While not a Python solution, I’ve seen Node.js gain a lot more traction as of late. In fact I’ve considered looking into it for smaller projects, but I just cringe when I hear JavaScript :)

Answer 8

There is a good book on the subject: “Twisted Network Programming Essentials”, by Abe Fettig. The examples show how to write very Pythonic code, and to me personally, they do not look like they are built on a bloated framework. Look at the solutions in the book; if they aren’t clean, then I don’t know what clean means.

My only concern is the same one I have with other frameworks, like those for Ruby: does it scale up? I would hate to commit a client to a framework that is going to have scalability problems.

Answer 10

Also try Syncless. It’s coroutine-based (so it’s similar to Concurrence, Eventlet and gevent). It implements drop-in non-blocking replacements for socket.socket, socket.gethostbyname (etc.), ssl.SSLSocket, time.sleep and select.select. It’s fast. It needs Stackless Python and libevent. It contains a mandatory Python extension written in C (Pyrex/Cython).

Answer 12


You are welcome to have a look at PyWorks, which takes a quite different approach. It lets object instances run in their own thread and makes function calls to that object async.

Just let a class inherit from Task instead of object and it becomes async: all method calls are proxies. Return values (if you need them) are Future proxies.
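To illustrate the idea (this is not PyWorks’ actual API), here is a minimal Python 3 sketch of a hypothetical Task base class: every public method call is submitted to the instance’s own single worker thread via concurrent.futures and returns a Future immediately. Unlike the transparent Future proxies described above, the caller has to call .result() explicitly.

```python
import concurrent.futures
import functools

class Task:
    """Hypothetical base class: public method calls run on this object's own thread."""

    def __init__(self):
        # One worker per instance, so calls on the same object run in order
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def __getattribute__(self, name):
        attr = object.__getattribute__(self, name)
        if name.startswith("_") or not callable(attr):
            return attr
        executor = object.__getattribute__(self, "_executor")

        @functools.wraps(attr)
        def proxy(*args, **kwargs):
            # Queue the call and hand back a Future instead of the result
            return executor.submit(attr, *args, **kwargs)

        return proxy

class Calculator(Task):
    def square(self, x):
        return x * x

calc = Calculator()
future = calc.square(12)  # returns immediately
print(future.result())    # blocks until the worker thread has finished
```

One caveat of this design: a method that calls another public method on self also gets a Future back, and waiting on it from inside the single worker thread would deadlock.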

res = obj.method( args )
# code continues here without waiting for method to finish
do_something_else( )
print("Result = %d" % res) # Code will block here if res is not calculated yet

I was wondering if there’s any library for asynchronous method calls in Python. It would be great if you could do something like

def longComputation():

token = longComputation()
# alternative, polling
while not token.finished():
    if token.finished():
        result = token.result()

Or to call a non-async routine asynchronously

def longComputation()

token = asynccall(longComputation())

It would be great to have a more refined strategy built natively into the language core. Was this considered?
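For reference, the standard library later gained almost exactly this: concurrent.futures (Python 3.2+) returns a Future whose done() and result() methods play the roles of finished() and result() in the question. A minimal sketch; long_computation is a hypothetical stand-in for real work.

```python
import concurrent.futures
import time

def long_computation(n):
    time.sleep(0.1)  # stand-in for real work
    return n * n

with concurrent.futures.ThreadPoolExecutor() as executor:
    token = executor.submit(long_computation, 7)  # returns a Future immediately
    while not token.done():  # the polling style from the question
        time.sleep(0.01)     # ... do something else here ...
    result = token.result()

print(result)
```

For CPU-bound work, ProcessPoolExecutor offers the same submit()/Future interface using separate processes.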

Answer 0

You can use the multiprocessing module added in Python 2.6. You can use pools of processes and then get results asynchronously with:

apply_async(func[, args[, kwds[, callback]]])


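from multiprocessing import Pool

def f(x):
    return x*x

def callback(value):
    # Runs in the parent process with f's return value once it is ready
    print("Result:", value)

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker process.
    result = pool.apply_async(f, [10], callback=callback) # Evaluate "f(10)" asynchronously, calling callback when finished.
    pool.close()
    pool.join()                           # Wait for the worker to finish.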

This is only one alternative. This module provides lots of facilities to achieve what you want, and it would be really easy to make a decorator from this.
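As a minimal sketch of such a decorator (the name run_async is hypothetical), the version below uses multiprocessing.pool.ThreadPool, which has the same apply_async interface as Pool but runs in threads. A process Pool would not work directly here, because a function replaced by its decorator can no longer be pickled by name for the worker processes.

```python
import functools
from multiprocessing.pool import ThreadPool

_pool = ThreadPool(processes=4)

def run_async(func):
    """Hypothetical decorator: calls return an AsyncResult instead of a value."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return _pool.apply_async(func, args, kwargs)
    return wrapper

@run_async
def f(x):
    return x * x

if __name__ == "__main__":
    result = f(10)                # returns immediately
    print(result.get(timeout=5))  # blocks until f(10) has finished
```

Threads suit I/O-bound calls; for CPU-bound work you would keep an undecorated function at module level and submit it to a process Pool instead.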

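Tornado: Tornado is a Python web framework and asynchronous networking library

Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed. By using non-blocking network I/O, Tornado can scale to tens of thousands of open connections, making it ideal for long polling, WebSockets, and other applications that require a long-lived connection to each user.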



import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)  # port taken from Tornado's own hello-world example
    tornado.ioloop.IOLoop.current().start()

This example does not use any of Tornado’s asynchronous features; for that, see the simple chat room example.


