问题:在Flask中进行异步任务
我正在Flask中编写一个应用程序,除了WSGI同步和阻塞之外,它的运行情况非常好。我特别有一项任务,该任务调出第三方API,该任务可能需要几分钟才能完成。我想拨打该电话(实际上是一系列电话)并使其运行。同时控制权返回给Flask。
我的看法如下:
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )
现在,我要做的就是
final_file = audio_class.render_audio()运行并提供在方法返回时要执行的回调,而Flask可以继续处理请求。这是我需要Flask异步运行的唯一任务,并且我想就如何最好地实现这一点提供一些建议。
我看过Twisted和Klein,但我不确定它们是否过大,因为Threading就足够了。或者也许Celery是一个不错的选择?
回答 0
我将使用Celery为您处理异步任务。您需要安装一个代理作为您的任务队列(建议使用RabbitMQ和Redis)。
app.py:
from flask import Flask
from celery import Celery
broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue
app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py
@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )
运行您的Flask应用,然后启动另一个过程来运行您的Celery工作者。
$ celery worker -A app.celery --loglevel=debug我还将参考Miguel Gringberg的文章,以获取更深入的将Celery与Flask结合使用的指南。
回答 1
线程化是另一种可能的解决方案。尽管基于Celery的解决方案对于大规模应用程序更好,但是如果您不希望在所讨论的端点上有太多流量,则线程化是一种可行的选择。
该解决方案基于Miguel Grinberg的PyCon 2016 Flask Scale演示文稿,特别是其幻灯片平台中的第41张幻灯片。对于原始源感兴趣的人,他的代码也可以在github上找到。
从用户的角度来看,代码的工作方式如下:
- 您调用执行长时间运行任务的端点。
- 该端点返回202接受的链接,以检查任务状态。
- 在taks仍在运行时,对状态链接的调用返回202,在任务完成时返回200(及其结果)。
要将api调用转换为后台任务,只需添加@async_api装饰器。
这是一个完整的示例:
from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid
tasks = {}
app = Flask(__name__)
api = Api(app)
@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)
    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()
def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())
                    # close the database session (if any)
        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex
        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()
        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function
class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']
class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'
api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')
if __name__ == '__main__':
    app.run(debug=True)
回答 2
您也可以尝试使用multiprocessing.Process带daemon=True; 该process.start()方法不会阻止,您可以在后台执行昂贵的函数时立即将响应/状态返回给调用方。
在使用falcon框架并使用daemon过程帮助时,我遇到了类似的问题。
您需要执行以下操作:
from multiprocessing import Process
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )
# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")
您应该立即得到响应,十秒钟后,您应该在控制台中看到打印的消息。
注意:请记住,daemonic不允许进程产生任何子进程。

