标签归档:flask

如何在单元测试中使用JSON发送请求

问题:如何在单元测试中使用JSON发送请求

我在Flask应用程序中包含在请求中使用JSON的代码,并且可以像这样获取JSON对象:

Request = request.get_json()

一切正常,但是我试图使用Python的unittest模块创建单元测试,并且很难找到一种发送带有请求的JSON的方法。

response=self.app.post('/test_function', 
                       data=json.dumps(dict(foo = 'bar')))

这给了我:

>>> request.get_data()
'{"foo": "bar"}'
>>> request.get_json()
None

Flask似乎有一个JSON参数,您可以在其中发布请求中设置json = dict(foo =’bar’),但我不知道如何使用unittest模块来做到这一点。

I have code within a Flask application that uses JSONs in the request, and I can get the JSON object like so:

Request = request.get_json()

This has been working fine, however I am trying to create unit tests using Python’s unittest module and I’m having difficulty finding a way to send a JSON with the request.

response=self.app.post('/test_function', 
                       data=json.dumps(dict(foo = 'bar')))

This gives me:

>>> request.get_data()
'{"foo": "bar"}'
>>> request.get_json()
None

Flask seems to have a JSON argument where you can set json=dict(foo=’bar’) within the post request, but I don’t know how to do that with the unittest module.


回答 0

将帖子更改为

response=self.app.post('/test_function', 
                       data=json.dumps(dict(foo='bar')),
                       content_type='application/json')

解决它。

感谢user3012759。

Changing the post to

response=self.app.post('/test_function', 
                       data=json.dumps(dict(foo='bar')),
                       content_type='application/json')

fixed it.

Thanks to user3012759.


回答 1

更新:由于Flask 1.0发布的flask.testing.FlaskClient方法接受json参数和Response.get_json添加的方法,请参见example

对于Flask 0.x,您可以使用以下收据:

from flask import Flask, Response as BaseResponse, json
from flask.testing import FlaskClient
from werkzeug.utils import cached_property


class Response(BaseResponse):
    @cached_property
    def json(self):
        return json.loads(self.data)


class TestClient(FlaskClient):
    def open(self, *args, **kwargs):
        if 'json' in kwargs:
            kwargs['data'] = json.dumps(kwargs.pop('json'))
            kwargs['content_type'] = 'application/json'
        return super(TestClient, self).open(*args, **kwargs)


app = Flask(__name__)
app.response_class = Response
app.test_client_class = TestClient
app.testing = True

UPDATE: Since Flask 1.0 released flask.testing.FlaskClient methods accepts json argument and Response.get_json method added, see example.

for Flask 0.x you may use receipt below:

from flask import Flask, Response as BaseResponse, json
from flask.testing import FlaskClient
from werkzeug.utils import cached_property


class Response(BaseResponse):
    @cached_property
    def json(self):
        return json.loads(self.data)


class TestClient(FlaskClient):
    def open(self, *args, **kwargs):
        if 'json' in kwargs:
            kwargs['data'] = json.dumps(kwargs.pop('json'))
            kwargs['content_type'] = 'application/json'
        return super(TestClient, self).open(*args, **kwargs)


app = Flask(__name__)
app.response_class = Response
app.test_client_class = TestClient
app.testing = True

在Flask中进行异步任务

问题:在Flask中进行异步任务

我正在Flask中编写一个应用程序,除了WSGI同步和阻塞之外,它的运行情况非常好。我特别有一项任务,该任务调出第三方API,该任务可能需要几分钟才能完成。我想拨打该电话(实际上是一系列电话)并使其运行。同时控制权返回给Flask。

我的看法如下:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

现在,我要做的就是

final_file = audio_class.render_audio()

运行并提供在方法返回时要执行的回调,而Flask可以继续处理请求。这是我需要Flask异步运行的唯一任务,并且我想就如何最好地实现这一点提供一些建议。

我看过Twisted和Klein,但我不确定它们是否过大,因为Threading就足够了。或者也许Celery是一个不错的选择?

I am writing an application in Flask, which works really well except that WSGI is synchronous and blocking. I have one task in particular which calls out to a third party API and that task can take several minutes to complete. I would like to make that call (it’s actually a series of calls) and let it run. while control is returned to Flask.

My view looks like:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

Now, what I want to do is have the line

final_file = audio_class.render_audio()

run and provide a callback to be executed when the method returns, whilst Flask can continue to process requests. This is the only task which I need Flask to run asynchronously, and I would like some advice on how best to implement this.

I have looked at Twisted and Klein, but I’m not sure they are overkill, as maybe Threading would suffice. Or maybe Celery is a good choice for this?


回答 0

我将使用Celery为您处理异步任务。您需要安装一个代理作为您的任务队列(建议使用RabbitMQ和Redis)。

app.py

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

运行您的Flask应用,然后启动另一个过程来运行您的Celery工作者。

$ celery worker -A app.celery --loglevel=debug

我还将参考Miguel Gringberg的文章以获取更深入的将Celery与Flask结合使用的指南。

I would use Celery to handle the asynchronous task for you. You’ll need to install a broker to serve as your task queue (RabbitMQ and Redis are recommended).

app.py:

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

Run your Flask app, and start another process to run your celery worker.

$ celery worker -A app.celery --loglevel=debug

I would also refer to Miguel Gringberg’s write up for a more in depth guide to using Celery with Flask.


回答 1

线程化是另一种可能的解决方案。尽管基于Celery的解决方案对于大规模应用程序更好,但是如果您不希望在所讨论的端点上有太多流量,则线程化是一种可行的选择。

该解决方案基于Miguel Grinberg的PyCon 2016 Flask Scale演示文稿,特别是其幻灯片平台中的第41张幻灯片。对于原始源感兴趣的人,他的代码也可以在github上找到。

从用户的角度来看,代码的工作方式如下:

  1. 您调用执行长时间运行任务的端点。
  2. 该端点返回202接受的链接,以检查任务状态。
  3. 在taks仍在运行时,对状态链接的调用返回202,在任务完成时返回200(及其结果)。

要将api调用转换为后台任务,只需添加@async_api装饰器。

这是一个完整的示例:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)

Threading is another possible solution. Although the Celery based solution is better for applications at scale, if you are not expecting too much traffic on the endpoint in question, threading is a viable alternative.

This solution is based on Miguel Grinberg’s PyCon 2016 Flask at Scale presentation, specifically slide 41 in his slide deck. His code is also available on github for those interested in the original source.

From a user perspective the code works as follows:

  1. You make a call to the endpoint that performs the long running task.
  2. This endpoint returns 202 Accepted with a link to check on the task status.
  3. Calls to the status link returns 202 while the taks is still running, and returns 200 (and the result) when the task is complete.

To convert an api call to a background task, simply add the @async_api decorator.

Here is a fully contained example:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)


回答 2

您也可以尝试使用multiprocessing.Processdaemon=True; 该process.start()方法不会阻止,您可以在后台执行昂贵的函数时立即将响应/状态返回给调用方。

在使用falcon框架并使用daemon过程帮助时,我遇到了类似的问题。

您需要执行以下操作:

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

您应该立即得到响应,十秒钟后,您应该在控制台中看到打印的消息。

注意:请记住,daemonic不允许进程产生任何子进程。

You can also try using multiprocessing.Process with daemon=True; the process.start() method does not block and you can return a response/status immediately to the caller while your expensive function executes in the background.

I experienced similar problem while working with falcon framework and using daemon process helped.

You’d need to do the following:

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

You should get a response immediately and, after 10s you should see a printed message in the console.

NOTE: Keep in mind that daemonic processes are not allowed to spawn any child processes.


全局变量在烧瓶中是线程安全的吗?如何在请求之间共享数据?

问题:全局变量在烧瓶中是线程安全的吗?如何在请求之间共享数据?

在我的应用程序中,公共对象的状态通过发出请求来更改,并且响应取决于状态。

class SomeObj():
    def __init__(self, param):
        self.param = param
    def query(self):
        self.param += 1
        return self.param

global_obj = SomeObj(0)

@app.route('/')
def home():
    flash(global_obj.query())
    render_template('index.html')

如果我在开发服务器上运行它,我希望得到1、2、3,依此类推。如果同时从100个不同的客户发出请求,会出问题吗?预期结果将是100个不同的客户端各自看到一个从1到100的唯一数字。或者会发生以下情况:

  1. 客户端1查询。self.param增加1。
  2. 在执行return语句之前,线程将切换到客户端2。self.param再次增加。
  3. 线程切换回客户端1,并向客户端返回数字2,例如。
  4. 现在,该线程移至客户端2,并向其返回数字3。

由于只有两个客户,因此预期结果是1和2,而不是2和3。跳过了一个数字。

当我扩展应用程序时,这是否真的会发生?我应该考虑使用什么替代全局变量?

In my application, the state of a common object is changed by making requests, and the response depends on the state.

class SomeObj():
    def __init__(self, param):
        self.param = param
    def query(self):
        self.param += 1
        return self.param

global_obj = SomeObj(0)

@app.route('/')
def home():
    flash(global_obj.query())
    render_template('index.html')

If I run this on my development server, I expect to get 1, 2, 3 and so on. If requests are made from 100 different clients simultaneously, can something go wrong? The expected result would be that the 100 different clients each see a unique number from 1 to 100. Or will something like this happen:

  1. Client 1 queries. self.param is incremented by 1.
  2. Before the return statement can be executed, the thread switches over to client 2. self.param is incremented again.
  3. The thread switches back to client 1, and the client is returned the number 2, say.
  4. Now the thread moves to client 2 and returns him/her the number 3.

Since there were only two clients, the expected results were 1 and 2, not 2 and 3. A number was skipped.

Will this actually happen as I scale up my application? What alternatives to a global variable should I look at?


回答 0

您不能使用全局变量来保存此类数据。它不仅不是线程安全的,也不是进程安全的,并且生产中的WSGI服务器产生了多个进程。如果您使用线程来处理请求,不仅计数会错误,而且根据处理该请求的进程的不同,计数也会有所不同。

使用Flask外部的数据源来保存全局数据。数据库,内存缓存或Redis都是适合的单独存储区域,具体取决于您的需求。如果您需要加载和访问Python数据,请考虑multiprocessing.Manager。您还可以将会话用于每个用户的简单数据。


开发服务器可以在单线程和进程中运行。您将看不到您描述的行为,因为每个请求都将被同步处理。启用线程或进程,您将看到它。app.run(threaded=True)app.run(processes=10)。(在1.0中,服务器默认为线程化。)


某些WSGI服务器可能支持gevent或其他异步工作器。全局变量仍然不是线程安全的,因为仍然没有针对大多数竞争条件的保护措施。您仍然可以设想这样一个场景,其中一个工作人员获取了一个值,产生了收益,另一个工作人员对其进行了修改,产生了收益,然后第一个工作人员也对其进行了修改。


如果请求期间需要存储一些全局数据,则可以使用Flask的gobject。另一个常见的情况是管理数据库连接的某些顶级对象。这种“全局”类型的区别在于,它对每个请求都是唯一的,请求之间不使用,并且有一些东西可以管理资源的建立和拆除。

You can’t use global variables to hold this sort of data. Not only is it not thread safe, it’s not process safe, and WSGI servers in production spawn multiple processes. Not only would your counts be wrong if you were using threads to handle requests, they would also vary depending on which process handled the request.

Use a data source outside of Flask to hold global data. A database, memcached, or redis are all appropriate separate storage areas, depending on your needs. If you need to load and access Python data, consider multiprocessing.Manager. You could also use the session for simple data that is per-user.


The development server may run in single thread and process. You won’t see the behavior you describe since each request will be handled synchronously. Enable threads or processes and you will see it. app.run(threaded=True) or app.run(processes=10). (In 1.0 the server is threaded by default.)


Some WSGI servers may support gevent or another async worker. Global variables are still not thread safe because there’s still no protection against most race conditions. You can still have a scenario where one worker gets a value, yields, another modifies it, yields, then the first worker also modifies it.


If you need to store some global data during a request, you may use Flask’s g object. Another common case is some top-level object that manages database connections. The distinction for this type of “global” is that it’s unique to each request, not used between requests, and there’s something managing the set up and teardown of the resource.


回答 1

这并不是对全局变量线程安全的真正答案。

但是我认为在这里提到会议很重要。您正在寻找一种存储特定于客户的数据的方法。每个连接都应该以线程安全的方式访问其自己的数据池。

服务器端会话可以做到这一点,它们可以在非常整齐的烧瓶插件中找到:https//pythonhosted.org/Flask-Session/

如果设置会话,session则所有路径中都存在一个变量,其行为类似于字典。对于每个连接的客户端,此词典中存储的数据都是单独的。

这是一个简短的演示:

from flask import Flask, session
from flask_session import Session

app = Flask(__name__)
# Check Configuration section for more details
SESSION_TYPE = 'filesystem'
app.config.from_object(__name__)
Session(app)

@app.route('/')
def reset():
    session["counter"]=0

    return "counter was reset"

@app.route('/inc')
def routeA():
    if not "counter" in session:
        session["counter"]=0

    session["counter"]+=1

    return "counter is {}".format(session["counter"])

@app.route('/dec')
def routeB():
    if not "counter" in session:
        session["counter"] = 0

    session["counter"] -= 1

    return "counter is {}".format(session["counter"])


if __name__ == '__main__':
    app.run()

之后pip install Flask-Session,您应该可以运行它。尝试从不同的浏览器访问它,您会发现计数器未在它们之间共享。

This is not really an answer to thread safety of globals.

But I think it is important to mention sessions here. You are looking for a way to store client-specific data. Every connection should have access to its own pool of data, in a threadsafe way.

This is possible with server-side sessions, and they are available in a very neat flask plugin: https://pythonhosted.org/Flask-Session/

If you set up sessions, a session variable is available in all your routes and it behaves like a dictionary. The data stored in this dictionary is individual for each connecting client.

Here is a short demo:

from flask import Flask, session
from flask_session import Session

app = Flask(__name__)
# Check Configuration section for more details
SESSION_TYPE = 'filesystem'
app.config.from_object(__name__)
Session(app)

@app.route('/')
def reset():
    session["counter"]=0

    return "counter was reset"

@app.route('/inc')
def routeA():
    if not "counter" in session:
        session["counter"]=0

    session["counter"]+=1

    return "counter is {}".format(session["counter"])

@app.route('/dec')
def routeB():
    if not "counter" in session:
        session["counter"] = 0

    session["counter"] -= 1

    return "counter is {}".format(session["counter"])


if __name__ == '__main__':
    app.run()

After pip install Flask-Session, you should be able to run this. Try accessing it from different browsers, you’ll see that the counter is not shared between them.


回答 2

虽然完全接受上述建议的答案,并且不鼓励将全局变量用于生产和可扩展的烧瓶存储,但出于对原型“非常简单的服务器”的目的,在烧瓶“开发服务器”下运行…

… python内置数据类型,我dict根据python docs(https://docs.python.org/3/glossary.html#term-global-interpreter-lock)亲自使用并测试了global 线程安全。不处理安全。

从开发服务器下运行的每个(可能是并发的)flask会话中,可以从(服务器全局)字典中进行插入,查找和读取。

当使用唯一的flask会话密钥为此类全局字典输入密钥时,它对于会话特定数据的服务器端存储非常有用,否则该会话不适合cookie(最大大小为4k)。

当然,应该谨慎地防止此类服务器全局指令在内存中增长得太大。可以在请求处理期间对某种过期的“旧”键/值对进行编码。

同样,不建议将其用于生产或可伸缩部署,但对于面向本地任务的服务器来说可能不错,因为对于给定任务而言,单独的db过多

While totally accepting the previous upvoted answers, and discouraging use of global variables for production and scalable Flask storage, for the purpose of prototyping or really simple servers, running under the flask ‘development server’…

The Python built-in data types, and I personally used and tested the global dict, as per Python documentation are thread safe. Not process safe.

The insertions, lookups, and reads from such a (server global) dict will be OK from each (possibly concurrent) Flask session running under the development server.

When such a global dict is keyed with a unique Flask session key, it can be rather useful for server-side storage of session specific data otherwise not fitting into the cookie (max size 4 kB).

Of course, such a server global dict should be carefully guarded for growing too large, being in-memory. Some sort of expiring the ‘old’ key/value pairs can be coded during request processing.

Again, it is not recommended for production or scalable deployments, but it is possibly OK for local task-oriented servers where a separate database is too much for the given task.


模板文件更改时重新加载Flask应用

问题:模板文件更改时重新加载Flask应用

默认情况下,使用内置服务器(Flask.run)运行Flask应用程序时,它将监视其Python文件并在代码更改时自动重新加载该应用程序:

* Detected change in '/home/xion/hello-world/app.py', reloading
* Restarting with reloader

不幸的是,这似乎仅适用于* .py文件,而且我似乎没有找到任何将此功能扩展到其他文件的方法。最值得注意的是,当模板更改时,让Flask重新启动应用程序将非常有用。我已经迷失了多少次我不喜欢模板中的标记,却因为看不到任何更改而感到困惑,只是发现该应用程序仍在使用旧版本的Jinja模板。

因此,有没有办法在模板目录中包含Flask监视文件,还是需要深入研究框架的源代码?

编辑:我正在使用Ubuntu 10.10。尚未在任何其他平台上尝试过。


经过进一步的查询,我发现模板中的更改确实实时更新的,而无需重新加载应用程序本身。但是,这似乎仅适用于传递给的模板flask.render_template

但是碰巧的是,在我的应用程序中,我有很多在Jinja模板中使用的可重用的,参数化的组件。它们实现为{% macro %}s,驻留在专用的“模块”中,并{% import %}编入实际页面。很好,很干…除了那些导入的模板显然从未检查过是否修改,因为它们根本没有通过render_template

(奇怪的是,对于通过调用的模板,这种情况不会发生{% extends %}。至于{% include %},由于不真正使用它们,我不知道。)

因此,总结起来,这种现象的根源似乎在Jinja和Flask或Werkzeug之间。我想对于其中的任何一个项目,都可能需要进行bug跟踪:)同时,我已经接受了jd。的答案,因为这是我实际使用的解决方案-而且它的工作原理很像魅力。

By default, when running Flask application using the built-in server (Flask.run), it monitors its Python files and automatically reloads the app if its code changes:

* Detected change in '/home/xion/hello-world/app.py', reloading
* Restarting with reloader

Unfortunately, this seems to work for *.py files only, and I don’t seem to find any way to extend this functionality to other files. Most notably, it would be extremely useful to have Flask restart the app when a template changes. I’ve lost count on how many times I was fiddling with markup in templates and getting confused by not seeing any changes, only to find out that the app was still using the old version of Jinja template.

So, is there a way to have Flask monitor files in templates directory, or does it require diving into the framework’s source?

Edit: I’m using Ubuntu 10.10. Haven’t tried that on any other platforms really.


After further inquiry, I have discovered that changes in templates indeed are updated in real time, without reloading the app itself. However, this seems to apply only to those templates that are passed to flask.render_template.

But it so happens that in my app, I have quite a lot of reusable, parametrized components which I use in Jinja templates. They are implemented as {% macro %}s, reside in dedicated “modules” and are {% import %}ed into actual pages. All nice and DRY… except that those imported templates are apparently never checked for modifications, as they don’t pass through render_template at all.

(Curiously, this doesn’t happen for templates invoked through {% extends %}. As for {% include %}, I have no idea as I don’t really use them.)

So to wrap up, the roots of this phenomenon seems to lie somewhere between Jinja and Flask or Werkzeug. I guess it may warrant a trip to bug tracker for either of those projects :) Meanwhile, I’ve accepted the jd.‘s answer because that’s the solution I actually used – and it works like a charm.


回答 0

以我的经验,模板甚至不需要重新启动应用程序即可刷新,因为每次render_template()调用时都应从磁盘加载模板。也许您的模板使用方式有所不同。

要在模板更改(或其他文件)时重新加载应用程序,可以将extra_files参数传递给Flask().run(),以观察文件名的集合:这些文件上的任何更改都会触发重新加载器。

例:

from os import path, walk

extra_dirs = ['directory/to/watch',]
extra_files = extra_dirs[:]
for extra_dir in extra_dirs:
    for dirname, dirs, files in walk(extra_dir):
        for filename in files:
            filename = path.join(dirname, filename)
            if path.isfile(filename):
                extra_files.append(filename)
app.run(extra_files=extra_files)

参见此处:http : //werkzeug.pocoo.org/docs/0.10/serving/?highlight=run_simple#werkzeug.serving.run_simple

In my experience, templates don’t even need the application to restart to be refreshed, as they should be loaded from disk everytime render_template() is called. Maybe your templates are used differently though.

To reload your application when the templates change (or any other file), you can pass the extra_files argument to Flask().run(), a collection of filenames to watch: any change on those files will trigger the reloader.

Example:

from os import path, walk

extra_dirs = ['directory/to/watch',]
extra_files = extra_dirs[:]
for extra_dir in extra_dirs:
    for dirname, dirs, files in walk(extra_dir):
        for filename in files:
            filename = path.join(dirname, filename)
            if path.isfile(filename):
                extra_files.append(filename)
app.run(extra_files=extra_files)

See here: http://werkzeug.pocoo.org/docs/0.10/serving/?highlight=run_simple#werkzeug.serving.run_simple


回答 1

您可以使用

TEMPLATES_AUTO_RELOAD = True

http://flask.pocoo.org/docs/1.0/config/

是否检查模板源的修改并自动重新加载。默认情况下,该值为“无”,这意味着Flask仅在调试模式下检查原始文件。

you can use

TEMPLATES_AUTO_RELOAD = True

From http://flask.pocoo.org/docs/1.0/config/

Whether to check for modifications of the template source and reload it automatically. By default the value is None which means that Flask checks original file only in debug mode.


回答 2

使用jinja模板时,需要设置一些参数。就python3而言,我使用以下代码解决了它:

if __name__ == '__main__':
    app.jinja_env.auto_reload = True
    app.config['TEMPLATES_AUTO_RELOAD'] = True
    app.run(debug=True, host='0.0.0.0')

When you are working with jinja templates, you need to set some parameters. In my case with python3, I solved it with the following code:

if __name__ == '__main__':
    app.jinja_env.auto_reload = True
    app.config['TEMPLATES_AUTO_RELOAD'] = True
    app.run(debug=True, host='0.0.0.0')

回答 3

对我来说效果很好:

 from flask import Flask, render_template, request, url_for, redirect
 app = Flask(__name__)
 app.config["TEMPLATES_AUTO_RELOAD"] = True

请参阅http://flask.pocoo.org/docs/1.0/config/

For me works just fine:

 from flask import Flask, render_template, request, url_for, redirect
 app = Flask(__name__)
 app.config["TEMPLATES_AUTO_RELOAD"] = True

See more on http://flask.pocoo.org/docs/1.0/config/


回答 4

实际上对我来说TEMPLATES_AUTO_RELOAD = True不起作用(0.12版本)。我使用jinja2和我所做的事情:

  1. 创建功能 before_request

    def before_request():
        app.jinja_env.cache = {}
  2. 在应用程序中注册

    app.before_request(before_request)
  3. 而已。

Actually for me TEMPLATES_AUTO_RELOAD = True does not work (0.12 version). I use jinja2 and what i have done:

  1. Create function before_request

    def before_request():
        app.jinja_env.cache = {}
    
  2. Register it in application

    app.before_request(before_request)
    
  3. That’s it.


回答 5

对我有用的只是添加以下内容:

@app.before_request
def before_request():
    # When you import jinja2 macros, they get cached which is annoying for local
    # development, so wipe the cache every request.
    if 'localhost' in request.host_url or '0.0.0.0' in request.host_url:
        app.jinja_env.cache = {}

摘自@dikkini的回答

What worked for me is just adding this:

@app.before_request
def before_request():
    # When you import jinja2 macros, they get cached which is annoying for local
    # development, so wipe the cache every request.
    if 'localhost' in request.host_url or '0.0.0.0' in request.host_url:
        app.jinja_env.cache = {}

(taken from @dikkini’s answer)


回答 6

在Windows上使用最新版本的Flask,并将run命令和debug设置为true;无需重置Flask即可生效对模板的更改。尝试按Shift + F5(或Shift加重新加载按钮)以确保没有任何内容被缓存。

Using the latest version of Flask on Windows, using the run command and debug set to true; Flask doesn’t need to be reset for changes to templates to be brought in to effect. Try Shift+F5 (or Shift plus the reload button) to make sure nothing it being cached.


回答 7

参见http://flask.pocoo.org/docs/1.0/quickstart/ 并使用FLASK_ENV=development


回答 8

截至2019年6月更新:

建议在app.run()上使用烧瓶CLI,以运行开发服务器,因此,如果我们要使用CLI,则不能使用接受的解决方案。

在撰写本文时,使用Flask(1.1)的开发版本允许我们设置环境变量FLASK_RUN_EXTRA_FILES,该变量有效地执行与接受的答案相同的操作。

请参阅此github问题

用法示例:

export FLASK_RUN_EXTRA_FILES="app/templates/index.html"
flask run

在Linux中。要指定多个其他文件,请使用冒号分隔文件路径。,例如

export FLASK_RUN_EXTRA_FILES="app/templates/index.html:app/templates/other.html"

CLI还支持--extra-filesFlask 1.1以后的参数。

Updated as of June 2019:

The flask CLI is recommended over app.run() for running a dev server, so if we want to use the CLI then the accepted solution can’t be used.

Using the development version of Flask (1.1) as of this writing allows us to set an environment variable FLASK_RUN_EXTRA_FILES which effectively does the same thing as the accepted answer.

See this github issue.

Example usage:

export FLASK_RUN_EXTRA_FILES="app/templates/index.html"
flask run

in Linux. To specify multiple extra files, separate file paths with colons., e.g.

export FLASK_RUN_EXTRA_FILES="app/templates/index.html:app/templates/other.html"

The CLI also supports an --extra-files argument as of Flask 1.1.


回答 9

模板会自动重新加载,为什么不ctrl+f5刷新网页,导致Web浏览器通常保存缓存。

Templates are reloaded automatically, why not doing ctrl+f5 to refresh the webpage, cause web-browsers usually save cache.


Flask中的静态文件-robot.txt,sitemap.xml(mod_wsgi)

问题:Flask中的静态文件-robot.txt,sitemap.xml(mod_wsgi)

是否有任何聪明的解决方案将静态文件存储在Flask的应用程序根目录中。robots.txt和sitemap.xml有望在/中找到,所以我的想法是为它们创建路由:

@app.route('/sitemap.xml', methods=['GET'])
def sitemap():
  response = make_response(open('sitemap.xml').read())
  response.headers["Content-type"] = "text/plain"
  return response

必须有一些更方便的方法:)

Is there any clever solution to store static files in Flask’s application root directory. robots.txt and sitemap.xml are expected to be found in /, so my idea was to create routes for them:

@app.route('/sitemap.xml', methods=['GET'])
def sitemap():
  response = make_response(open('sitemap.xml').read())
  response.headers["Content-type"] = "text/plain"
  return response

There must be something more convenient :)


回答 0

最好的方法是将static_url_path设置为root url

from flask import Flask

app = Flask(__name__, static_folder='static', static_url_path='')

The best way is to set static_url_path to root url

from flask import Flask

app = Flask(__name__, static_folder='static', static_url_path='')

回答 1

@vonPetrushev是正确的,在生产中,您将希望通过nginx或apache提供静态文件,但是对于开发人员来说,最好让开发环境简单,让python应用程序也提供静态内容,因此您不必担心有关更改配置和多个项目的信息。为此,您将要使用SharedDataMiddleware

from flask import Flask
app = Flask(__name__)
'''
Your app setup and code
'''
if app.config['DEBUG']:
    from werkzeug import SharedDataMiddleware
    import os
    app.wsgi_app = SharedDataMiddleware(app.wsgi_app, {
      '/': os.path.join(os.path.dirname(__file__), 'static')
    })

本示例假定您的静态文件位于文件夹“ static”中,并根据您的环境进行调整。

@vonPetrushev is right, in production you’ll want to serve static files via nginx or apache, but for development it’s nice to have your dev environment simple having your python app serving up the static content as well so you don’t have to worry about changing configurations and multiple projects. To do that, you’ll want to use the SharedDataMiddleware.

from flask import Flask
app = Flask(__name__)
'''
Your app setup and code
'''
if app.config['DEBUG']:
    from werkzeug import SharedDataMiddleware
    import os
    app.wsgi_app = SharedDataMiddleware(app.wsgi_app, {
      '/': os.path.join(os.path.dirname(__file__), 'static')
    })

This example assumes your static files are in the folder “static”, adjust to whatever fits your environment.


回答 2

这个问题的最干净答案是这个(相同)问题答案

from flask import Flask, request, send_from_directory
app = Flask(__name__, static_folder='static')    

@app.route('/robots.txt')
@app.route('/sitemap.xml')
def static_from_root():
    return send_from_directory(app.static_folder, request.path[1:])

总结一下:

  • 正如David指出的那样,通过正确的配置,可以通过prod提供一些静态文件
  • 查找/robots.txt不应导致重定向到/static/robots.txt。(在肖恩斯回答中,目前尚不清楚如何实现。)
  • 将静态文件添加到应用程序根文件夹中并不干净
  • 最后,所提出的解决方案看起来比添加中间件方法更干净:

The cleanest answer to this question is the answer to this (identical) question:

from flask import Flask, request, send_from_directory
app = Flask(__name__, static_folder='static')    

@app.route('/robots.txt')
@app.route('/sitemap.xml')
def static_from_root():
    return send_from_directory(app.static_folder, request.path[1:])

To summarize:

  • as David pointed out, with the right config it’s ok to serve a few static files through prod
  • looking for /robots.txt shouldn’t result in a redirect to /static/robots.txt. (In Seans answer it’s not immediately clear how that’s achieved.)
  • it’s not clean to add static files into the app root folder
  • finally, the proposed solution looks much cleaner than the adding middleware approach:

回答 3

尽管这是一个古老的答案问题,但我正在回答这个问题,因为此帖子在Google搜索结果中的排名很高。尽管文档中未涉及它,但是如果您阅读了Flask Application对象构造函数的API文档,则将涉及它。通过static_folder像这样传递命名参数:

from flask import Flask
app = Flask(__name__,
            static_folder="/path/to/static",
            template_folder="/path/to/templates")

…您可以定义从何处提供静态文件。同样,您可以定义a template_folder,即您的名字static_url_path

Even though this is an old answered question, I’m answering this because this post comes up pretty high in the Google results. While it’s not covered in the documentation, if you read the API docs for the Flask Application object constructor it’s covered. By passing the named parameter static_folder like so:

from flask import Flask
app = Flask(__name__,
            static_folder="/path/to/static",
            template_folder="/path/to/templates")

…you can define where static files are served from. Similarly, you can define a template_folder, the name of you static_url_path.


回答 4

提供静态文件与旨在传递动态内容的应用程序无关。提供静态文件的正确方法取决于您使用的服务器。毕竟,当您启动并运行应用程序时,需要将其绑定到Web服务器。我只能说Apache httpd,因此在通过mod-wsgi绑定到应用程序的虚拟主机中定义了提供静态文件的方式。以下指南将向您展示如何提供站点地图,robots.txt或任何静态内容:http : //code.google.com/p/modwsgi/wiki/QuickConfigurationGuide#Mounting_At_Root_Of_Site

Serving static files has nothing to do with application that is meant to deliver dynamic content. The correct way of serving static files is dependent of what server you’re using. After all, when you get your app up and running, you will need to bind it to a web server. I can speak only for apache httpd, so the way of serving static files is defined in the virtual host that you are binding to your application through mod-wsgi. Here is the guide that will show you how to serve sitemaps, robots.txt or any static content: http://code.google.com/p/modwsgi/wiki/QuickConfigurationGuide#Mounting_At_Root_Of_Site


回答 5

发送静态文件的另一种方法是使用全部规则,如下所示:

@app.route('/<path:path>')
def catch_all(path):
    if not app.debug:
        flask.abort(404)
    try:
        f = open(path)
    except IOError, e:
        flask.abort(404)
        return
    return f.read()

我用它来尽量减少开发时的设置。我从http://flask.pocoo.org/snippets/57/获得了这个想法

此外,我正在单机上使用flask开发,但在生产服务器中使用Apache进行了部署。我用:

file_suffix_to_mimetype = {
    '.css': 'text/css',
    '.jpg': 'image/jpeg',
    '.html': 'text/html',
    '.ico': 'image/x-icon',
    '.png': 'image/png',
    '.js': 'application/javascript'
}
def static_file(path):
    try:
        f = open(path)
    except IOError, e:
        flask.abort(404)
        return
    root, ext = os.path.splitext(path)
    if ext in file_suffix_to_mimetype:
        return flask.Response(f.read(), mimetype=file_suffix_to_mimetype[ext])
    return f.read()

[...]

if __name__ == '__main__':
    parser = optparse.OptionParser()
    parser.add_option('-d', '--debug', dest='debug', default=False,
                      help='turn on Flask debugging', action='store_true')

    options, args = parser.parse_args()

    if options.debug:
        app.debug = True
        # set up flask to serve static content
        app.add_url_rule('/<path:path>', 'static_file', static_file)
    app.run()

Another way to send static files is to use a catch-all rule like this:

@app.route('/<path:path>')
def catch_all(path):
    if not app.debug:
        flask.abort(404)
    try:
        f = open(path)
    except IOError, e:
        flask.abort(404)
        return
    return f.read()

I use this to try to minimise the set-up when developing. I got the idea from http://flask.pocoo.org/snippets/57/

Further, I’m developing using flask on my standalone machine but deploying with Apache in production server. I use:

file_suffix_to_mimetype = {
    '.css': 'text/css',
    '.jpg': 'image/jpeg',
    '.html': 'text/html',
    '.ico': 'image/x-icon',
    '.png': 'image/png',
    '.js': 'application/javascript'
}
def static_file(path):
    try:
        f = open(path)
    except IOError, e:
        flask.abort(404)
        return
    root, ext = os.path.splitext(path)
    if ext in file_suffix_to_mimetype:
        return flask.Response(f.read(), mimetype=file_suffix_to_mimetype[ext])
    return f.read()

[...]

if __name__ == '__main__':
    parser = optparse.OptionParser()
    parser.add_option('-d', '--debug', dest='debug', default=False,
                      help='turn on Flask debugging', action='store_true')

    options, args = parser.parse_args()

    if options.debug:
        app.debug = True
        # set up flask to serve static content
        app.add_url_rule('/<path:path>', 'static_file', static_file)
    app.run()

回答 6

自从问了这个问题以来,可能已经添加了它,但是我正在查看flask的“ helpers.py”,然后发现flask.send_from_directory:

send_from_directory(directory, filename, **options)
'''
  send_from_directory(directory, filename, **options)
  Send a file from a given directory with send_file.  This
  is a secure way to quickly expose static files from an upload folder
  or something similar.
'''

…引用flask.send_file:

send_file(filename_or_fp, mimetype=None, as_attachment=False, attachment_filename=None, add_etags=True, cache_timeout=43200, conditional=False)

…尽管对send_from_directory直接传递** options到send_file,但似乎可以更好地进行控制。

This might have been added since this question was asked, but I was looking through flask’s “helpers.py” and I found flask.send_from_directory:

send_from_directory(directory, filename, **options)
'''
  send_from_directory(directory, filename, **options)
  Send a file from a given directory with send_file.  This
  is a secure way to quickly expose static files from an upload folder
  or something similar.
'''

… which references flask.send_file:

send_file(filename_or_fp, mimetype=None, as_attachment=False, attachment_filename=None, add_etags=True, cache_timeout=43200, conditional=False)

… which seems better for more control, although send_from_directory passes **options directly through to send_file.


回答 7

从此处的文档中:http : //flask.pocoo.org/docs/quickstart/#static-files

动态Web应用程序也需要静态文件。通常这就是CSS和JavaScript文件的来源。理想情况下,将Web服务器配置为为您提供服务,但是在开发过程中Flask也可以做到。只需在包中或模块旁边创建一个名为static的文件夹,该文件夹将在应用程序的/ static中可用。

要生成指向该URL部分的URL,请使用特殊的“静态” URL名称:

url_for(’静态’,filename =’style.css’)

该文件必须以static / style.css的形式存储在文件系统中。

From the documentation here: http://flask.pocoo.org/docs/quickstart/#static-files

Dynamic web applications need static files as well. That’s usually where the CSS and JavaScript files are coming from. Ideally your web server is configured to serve them for you, but during development Flask can do that as well. Just create a folder called static in your package or next to your module and it will be available at /static on the application.

To generate URLs to that part of the URL, use the special ‘static’ URL name:

url_for(‘static’, filename=’style.css’)

The file has to be stored on the filesystem as static/style.css.


回答 8

我也有同样的困境。做了一些搜索,找到了我的答案(MHO):

也可以从文档中引用

动态Web应用程序也需要静态文件。通常这就是CSS和JavaScript文件的来源。理想情况下,您的Web服务器配置为可以为您提供服务,但是在开发期间Flask也可以做到。只需在包中或模块旁边创建一个名为static的文件夹,该文件夹将在应用程序的/ static中可用。

恕我直言:当您的应用程序投入生产时,应在Web服务器(nginx,apache)上配置(或理想情况下)静态文件服务;但是在开发过程中,Flask使其可用于提供静态文件。这是为了帮助您快速发展-无需设置网络服务器等。

希望能帮助到你。

I’m having the same dilemma as well. Did some search and found my answer(MHO):

Might as well quote from the documentation

Dynamic web applications need static files as well. That’s usually where the CSS and JavaScript files are coming from. Ideally your web server is configured to serve them for you, but during development Flask can do that as well. Just create a folder called static in your package or next to your module and it will be available at /static on the application.

IMHO: When your application is up for production, static file serving should be (or is ideally) configured on the webserver (nginx, apache); but during development, Flask made it available to serve static files. This is to help you develop rapidly – no need to setup webservers and such.

Hope it helps.


回答 9

试试这个:

@app.route("/ProtectedFolder/<path:filename>")
@CheckUserSecurityAccessConditions
def Protect_Content(filename):
  return send_from_directory((os.path.join(os.path.dirname(__file__), 'ProtectedFolder')),filename)

Try this:

@app.route("/ProtectedFolder/<path:filename>")
@CheckUserSecurityAccessConditions
def Protect_Content(filename):
  return send_from_directory((os.path.join(os.path.dirname(__file__), 'ProtectedFolder')),filename)

TypeError:ObjectId(”)不可序列化JSON

问题:TypeError:ObjectId(”)不可序列化JSON

在使用Python查询文档上的聚合函数后,我从MongoDB返回了响应,它返回有效响应,我可以打印该响应但不能返回它。

错误:

TypeError: ObjectId('51948e86c25f4b1d1c0d303c') is not JSON serializable

打印:

{'result': [{'_id': ObjectId('51948e86c25f4b1d1c0d303c'), 'api_calls_with_key': 4, 'api_calls_per_day': 0.375, 'api_calls_total': 6, 'api_calls_without_key': 2}], 'ok': 1.0}

但是当我尝试返回时:

TypeError: ObjectId('51948e86c25f4b1d1c0d303c') is not JSON serializable

这是RESTfull调用:

@appv1.route('/v1/analytics')
def get_api_analytics():
    # get handle to collections in MongoDB
    statistics = sldb.statistics

    objectid = ObjectId("51948e86c25f4b1d1c0d303c")

    analytics = statistics.aggregate([
    {'$match': {'owner': objectid}},
    {'$project': {'owner': "$owner",
    'api_calls_with_key': {'$cond': [{'$eq': ["$apikey", None]}, 0, 1]},
    'api_calls_without_key': {'$cond': [{'$ne': ["$apikey", None]}, 0, 1]}
    }},
    {'$group': {'_id': "$owner",
    'api_calls_with_key': {'$sum': "$api_calls_with_key"},
    'api_calls_without_key': {'$sum': "$api_calls_without_key"}
    }},
    {'$project': {'api_calls_with_key': "$api_calls_with_key",
    'api_calls_without_key': "$api_calls_without_key",
    'api_calls_total': {'$add': ["$api_calls_with_key", "$api_calls_without_key"]},
    'api_calls_per_day': {'$divide': [{'$add': ["$api_calls_with_key", "$api_calls_without_key"]}, {'$dayOfMonth': datetime.now()}]},
    }}
    ])


    print(analytics)

    return analytics

数据库连接良好,集合也在那里,我得到了有效的预期结果,但是当我尝试返回时,它给了我Json错误。任何想法如何将响应转换回JSON。谢谢

My response back from MongoDB after querying an aggregated function on document using Python, It returns valid response and i can print it but can not return it.

Error:

TypeError: ObjectId('51948e86c25f4b1d1c0d303c') is not JSON serializable

Print:

{'result': [{'_id': ObjectId('51948e86c25f4b1d1c0d303c'), 'api_calls_with_key': 4, 'api_calls_per_day': 0.375, 'api_calls_total': 6, 'api_calls_without_key': 2}], 'ok': 1.0}

But When i try to return:

TypeError: ObjectId('51948e86c25f4b1d1c0d303c') is not JSON serializable

It is RESTfull call:

@appv1.route('/v1/analytics')
def get_api_analytics():
    # get handle to collections in MongoDB
    statistics = sldb.statistics

    objectid = ObjectId("51948e86c25f4b1d1c0d303c")

    analytics = statistics.aggregate([
    {'$match': {'owner': objectid}},
    {'$project': {'owner': "$owner",
    'api_calls_with_key': {'$cond': [{'$eq': ["$apikey", None]}, 0, 1]},
    'api_calls_without_key': {'$cond': [{'$ne': ["$apikey", None]}, 0, 1]}
    }},
    {'$group': {'_id': "$owner",
    'api_calls_with_key': {'$sum': "$api_calls_with_key"},
    'api_calls_without_key': {'$sum': "$api_calls_without_key"}
    }},
    {'$project': {'api_calls_with_key': "$api_calls_with_key",
    'api_calls_without_key': "$api_calls_without_key",
    'api_calls_total': {'$add': ["$api_calls_with_key", "$api_calls_without_key"]},
    'api_calls_per_day': {'$divide': [{'$add': ["$api_calls_with_key", "$api_calls_without_key"]}, {'$dayOfMonth': datetime.now()}]},
    }}
    ])


    print(analytics)

    return analytics

db is well connected and collection is there too and I got back valid expected result but when i try to return it gives me Json error. Any idea how to convert the response back into JSON. Thanks


回答 0

您应该定义自己JSONEncoder并使用它:

import json
from bson import ObjectId

class JSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, ObjectId):
            return str(o)
        return json.JSONEncoder.default(self, o)

JSONEncoder().encode(analytics)

也可以通过以下方式使用它。

json.encode(analytics, cls=JSONEncoder)

You should define you own JSONEncoder and using it:

import json
from bson import ObjectId

class JSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, ObjectId):
            return str(o)
        return json.JSONEncoder.default(self, o)

JSONEncoder().encode(analytics)

It’s also possible to use it in the following way.

json.encode(analytics, cls=JSONEncoder)

回答 1

Pymongo提供json_util-您可以改用它来处理BSON类型

Pymongo provides json_util – you can use that one instead to handle BSON types

def parse_json(data):
    return json.loads(json_util.dumps(data))

回答 2

>>> from bson import Binary, Code
>>> from bson.json_util import dumps
>>> dumps([{'foo': [1, 2]},
...        {'bar': {'hello': 'world'}},
...        {'code': Code("function x() { return 1; }")},
...        {'bin': Binary("")}])
'[{"foo": [1, 2]}, {"bar": {"hello": "world"}}, {"code": {"$code": "function x() { return 1; }", "$scope": {}}}, {"bin": {"$binary": "AQIDBA==", "$type": "00"}}]'

来自json_util的实际示例。

与Flask的jsonify不同,“转储”将返回一个字符串,因此不能用作Flask的jsonify的1:1替换。

但是这个问题表明,我们可以使用json_util.dumps()进行序列化,使用json.loads()转换回dict,最后对其调用Flask的jsonify。

示例(取自上一个问题的答案):

from bson import json_util, ObjectId
import json

#Lets create some dummy document to prove it will work
page = {'foo': ObjectId(), 'bar': [ObjectId(), ObjectId()]}

#Dump loaded BSON to valid JSON string and reload it as dict
page_sanitized = json.loads(json_util.dumps(page))
return page_sanitized

此解决方案会将ObjectId和其他代码(例如Binary,Code等)转换为等效的字符串,例如“ $ oid”。

JSON输出如下所示:

{
  "_id": {
    "$oid": "abc123"
  }
}
>>> from bson import Binary, Code
>>> from bson.json_util import dumps
>>> dumps([{'foo': [1, 2]},
...        {'bar': {'hello': 'world'}},
...        {'code': Code("function x() { return 1; }")},
...        {'bin': Binary("")}])
'[{"foo": [1, 2]}, {"bar": {"hello": "world"}}, {"code": {"$code": "function x() { return 1; }", "$scope": {}}}, {"bin": {"$binary": "AQIDBA==", "$type": "00"}}]'

Actual example from json_util.

Unlike Flask’s jsonify, “dumps” will return a string, so it cannot be used as a 1:1 replacement of Flask’s jsonify.

But this question shows that we can serialize using json_util.dumps(), convert back to dict using json.loads() and finally call Flask’s jsonify on it.

Example (derived from previous question’s answer):

from bson import json_util, ObjectId
import json

#Lets create some dummy document to prove it will work
page = {'foo': ObjectId(), 'bar': [ObjectId(), ObjectId()]}

#Dump loaded BSON to valid JSON string and reload it as dict
page_sanitized = json.loads(json_util.dumps(page))
return page_sanitized

This solution will convert ObjectId and others (ie Binary, Code, etc) to a string equivalent such as “$oid.”

JSON output would look like this:

{
  "_id": {
    "$oid": "abc123"
  }
}

回答 3

收到“无法JSON序列化”错误的大多数用户只需default=str使用即可指定json.dumps。例如:

json.dumps(my_obj, default=str)

这将强制转换为str,从而避免错误。当然,然后查看生成的输出以确认这就是您所需要的。

Most users who receive the “not JSON serializable” error simply need to specify default=str when using json.dumps. For example:

json.dumps(my_obj, default=str)

This will force a conversion to str, preventing the error. Of course then look at the generated output to confirm that it is what you need.


回答 4

from bson import json_util
import json

@app.route('/')
def index():
    for _ in "collection_name".find():
        return json.dumps(i, indent=4, default=json_util.default)

这是将BSON转换为JSON对象的示例示例。你可以试试看

from bson import json_util
import json

@app.route('/')
def index():
    for _ in "collection_name".find():
        return json.dumps(i, indent=4, default=json_util.default)

This is the sample example for converting BSON into JSON object. You can try this.


回答 5

作为快速替代品,您可以更改{'owner': objectid}{'owner': str(objectid)}

但是定义自己的方法JSONEncoder是更好的解决方案,它取决于您的要求。

As a quick replacement, you can change {'owner': objectid} to {'owner': str(objectid)}.

But defining your own JSONEncoder is a better solution, it depends on your requirements.


回答 6

张贴在这里,因为我认为它可能是使用的人有用的Flaskpymongo。这是我当前的“最佳实践”设置,用于允许flask封送pymongo bson数据类型。

mongoflask.py

from datetime import datetime, date

import isodate as iso
from bson import ObjectId
from flask.json import JSONEncoder
from werkzeug.routing import BaseConverter


class MongoJSONEncoder(JSONEncoder):
    def default(self, o):
        if isinstance(o, (datetime, date)):
            return iso.datetime_isoformat(o)
        if isinstance(o, ObjectId):
            return str(o)
        else:
            return super().default(o)


class ObjectIdConverter(BaseConverter):
    def to_python(self, value):
        return ObjectId(value)

    def to_url(self, value):
        return str(value)

app.py

from .mongoflask import MongoJSONEncoder, ObjectIdConverter

def create_app():
    app = Flask(__name__)
    app.json_encoder = MongoJSONEncoder
    app.url_map.converters['objectid'] = ObjectIdConverter

    # Client sends their string, we interpret it as an ObjectId
    @app.route('/users/<objectid:user_id>')
    def show_user(user_id):
        # setup not shown, pretend this gets us a pymongo db object
        db = get_db()

        # user_id is a bson.ObjectId ready to use with pymongo!
        result = db.users.find_one({'_id': user_id})

        # And jsonify returns normal looking json!
        # {"_id": "5b6b6959828619572d48a9da",
        #  "name": "Will",
        #  "birthday": "1990-03-17T00:00:00Z"}
        return jsonify(result)


    return app

为什么这样做而不是提供BSON或mongod扩展JSON

我认为提供mongo特殊JSON给客户端应用程序带来了负担。大多数客户端应用程序都不会以任何复杂的方式使用mongo对象。如果我提供扩展的json,现在必须在服务器端和客户端使用它。ObjectId并且Timestamp更容易作为字符串使用,这使所有mongo编组的疯狂都隔离在服务器上。

{
  "_id": "5b6b6959828619572d48a9da",
  "created_at": "2018-08-08T22:06:17Z"
}

我认为,与大多数应用程序相比,这要轻得多。

{
  "_id": {"$oid": "5b6b6959828619572d48a9da"},
  "created_at": {"$date": 1533837843000}
}

Posting here as I think it may be useful for people using Flask with pymongo. This is my current “best practice” setup for allowing flask to marshall pymongo bson data types.

mongoflask.py

from datetime import datetime, date

import isodate as iso
from bson import ObjectId
from flask.json import JSONEncoder
from werkzeug.routing import BaseConverter


class MongoJSONEncoder(JSONEncoder):
    def default(self, o):
        if isinstance(o, (datetime, date)):
            return iso.datetime_isoformat(o)
        if isinstance(o, ObjectId):
            return str(o)
        else:
            return super().default(o)


class ObjectIdConverter(BaseConverter):
    def to_python(self, value):
        return ObjectId(value)

    def to_url(self, value):
        return str(value)

app.py

from .mongoflask import MongoJSONEncoder, ObjectIdConverter

def create_app():
    app = Flask(__name__)
    app.json_encoder = MongoJSONEncoder
    app.url_map.converters['objectid'] = ObjectIdConverter

    # Client sends their string, we interpret it as an ObjectId
    @app.route('/users/<objectid:user_id>')
    def show_user(user_id):
        # setup not shown, pretend this gets us a pymongo db object
        db = get_db()

        # user_id is a bson.ObjectId ready to use with pymongo!
        result = db.users.find_one({'_id': user_id})

        # And jsonify returns normal looking json!
        # {"_id": "5b6b6959828619572d48a9da",
        #  "name": "Will",
        #  "birthday": "1990-03-17T00:00:00Z"}
        return jsonify(result)


    return app

Why do this instead of serving BSON or mongod extended JSON?

I think serving mongo special JSON puts a burden on client applications. Most client apps will not care using mongo objects in any complex way. If I serve extended json, now I have to use it server side, and the client side. ObjectId and Timestamp are easier to work with as strings and this keeps all this mongo marshalling madness quarantined to the server.

{
  "_id": "5b6b6959828619572d48a9da",
  "created_at": "2018-08-08T22:06:17Z"
}

I think this is less onerous to work with for most applications than.

{
  "_id": {"$oid": "5b6b6959828619572d48a9da"},
  "created_at": {"$date": 1533837843000}
}

回答 7

这就是我最近修复错误的方式

    @app.route('/')
    def home():
        docs = []
        for doc in db.person.find():
            doc.pop('_id') 
            docs.append(doc)
        return jsonify(docs)

This is how I’ve recently fixed the error

    @app.route('/')
    def home():
        docs = []
        for doc in db.person.find():
            doc.pop('_id') 
            docs.append(doc)
        return jsonify(docs)

回答 8

我知道我发帖太晚了,但我认为这至少可以帮助到一些人!

tim和defuz提到的两个示例(都获得最高投票)都可以很好地工作。但是,有时会有细微的差别,这有时可能很重要。

  1. 以下方法添加了一个额外的字段,该字段是多余的,在所有情况下可能都不理想

Pymongo提供了json_util-您可以改用那个来处理BSON类型

输出:{“ _id”:{“ $ oid”:“ abc123”}}

  1. 在JsonEncoder类以所需的字符串格式给出相同输出的情况下,我们还需要使用json.loads(output)。但这导致

输出:{“ _id”:“ abc123”}

即使第一种方法看起来很简单,但这两种方法都需要非常少的精力。

I know I’m posting late but thought it would help at least a few folks!

Both the examples mentioned by tim and defuz(which are top voted) works perfectly fine. However, there is a minute difference which could be significant at times.

  1. The following method adds one extra field which is redundant and may not be ideal in all the cases

Pymongo provides json_util – you can use that one instead to handle BSON types

Output: { “_id”: { “$oid”: “abc123” } }

  1. Where as the JsonEncoder class gives the same output in the string format as we need and we need to use json.loads(output) in addition. But it leads to

Output: { “_id”: “abc123” }

Even though, the first method looks simple, both the method need very minimal effort.


回答 9

就我而言,我需要这样的东西:

class JsonEncoder():
    def encode(self, o):
        if '_id' in o:
            o['_id'] = str(o['_id'])
        return o

in my case I needed something like this:

class JsonEncoder():
    def encode(self, o):
        if '_id' in o:
            o['_id'] = str(o['_id'])
        return o

回答 10

Flask的jsonify提供了JSON Security中描述的安全性增强功能。如果自定义编码器与Flask一起使用,则最好考虑JSON安全性中讨论的要点

Flask’s jsonify provides security enhancement as described in JSON Security. If custom encoder is used with Flask, its better to consider the points discussed in the JSON Security


回答 11

我想提供一个附加的解决方案,以改善公认的答案。我以前在这里的另一个线程中提供了答案。

from flask import Flask
from flask.json import JSONEncoder

from bson import json_util

from . import resources

# define a custom encoder point to the json_util provided by pymongo (or its dependency bson)
class CustomJSONEncoder(JSONEncoder):
    def default(self, obj): return json_util.default(obj)

application = Flask(__name__)
application.json_encoder = CustomJSONEncoder

if __name__ == "__main__":
    application.run()

I would like to provide an additional solution that improves the accepted answer. I have previously provided the answers in another thread here.

from flask import Flask
from flask.json import JSONEncoder

from bson import json_util

from . import resources

# define a custom encoder point to the json_util provided by pymongo (or its dependency bson)
class CustomJSONEncoder(JSONEncoder):
    def default(self, obj): return json_util.default(obj)

application = Flask(__name__)
application.json_encoder = CustomJSONEncoder

if __name__ == "__main__":
    application.run()

回答 12

如果您不需要记录的_id,我建议在查询数据库时将其取消设置,这将使您可以直接打印返回的记录,例如

要在查询时取消设置_id,然后在循环中打印数据,您可以编写如下代码

records = mycollection.find(query, {'_id': 0}) #second argument {'_id':0} unsets the id from the query
for record in records:
    print(record)

If you will not be needing the _id of the records I will recommend unsetting it when querying the DB which will enable you to print the returned records directly e.g

To unset the _id when querying and then print data in a loop you write something like this

records = mycollection.find(query, {'_id': 0}) #second argument {'_id':0} unsets the id from the query
for record in records:
    print(record)

回答 13

解决方案:mongoengine +棉花糖

如果您使用mongoenginemarshamallow则此解决方案可能适用于您。

基本上,我是String从棉花糖导入字段的,而我覆盖了默认值Schema id进行String编码。

from marshmallow import Schema
from marshmallow.fields import String

class FrontendUserSchema(Schema):

    id = String()

    class Meta:
        fields = ("id", "email")

SOLUTION for: mongoengine + marshmallow

If you use mongoengine and marshamallow then this solution might be applicable for you.

Basically, I imported String field from marshmallow, and I overwritten default Schema id to be String encoded.

from marshmallow import Schema
from marshmallow.fields import String

class FrontendUserSchema(Schema):

    id = String()

    class Meta:
        fields = ("id", "email")

回答 14

from bson.objectid import ObjectId
from core.services.db_connection import DbConnectionService

class DbExecutionService:
     def __init__(self):
        self.db = DbConnectionService()

     def list(self, collection, search):
        session = self.db.create_connection(collection)
        return list(map(lambda row: {i: str(row[i]) if isinstance(row[i], ObjectId) else row[i] for i in row}, session.find(search))
from bson.objectid import ObjectId
from core.services.db_connection import DbConnectionService

class DbExecutionService:
     def __init__(self):
        self.db = DbConnectionService()

     def list(self, collection, search):
        session = self.db.create_connection(collection)
        return list(map(lambda row: {i: str(row[i]) if isinstance(row[i], ObjectId) else row[i] for i in row}, session.find(search))

回答 15

如果您不想_id回应,可以重构代码,如下所示:

jsonResponse = getResponse(mock_data)
del jsonResponse['_id'] # removes '_id' from the final response
return jsonResponse

这将消除TypeError: ObjectId('') is not JSON serializable错误。

If you don’t want _id in response, you can refactor your code something like this:

jsonResponse = getResponse(mock_data)
del jsonResponse['_id'] # removes '_id' from the final response
return jsonResponse

This will remove the TypeError: ObjectId('') is not JSON serializable error.


在Docker中部署最小化Flask应用-服务器连接问题

问题:在Docker中部署最小化Flask应用-服务器连接问题

我有一个唯一依赖的应用程序是flask,它可以在docker外部正常运行并绑定到默认端口5000。这是完整的源代码:

from flask import Flask

app = Flask(__name__)
app.debug = True

@app.route('/')
def main():
    return 'hi'

if __name__ == '__main__':
    app.run()

问题是,当我在docker中部署此服务器时,服务器正在运行,但无法从容器外部访问。

以下是我的Dockerfile。该图像是装有烧瓶的ubuntu。焦油仅包含index.py上面列出的内容;

# Dockerfile
FROM dreen/flask
MAINTAINER dreen
WORKDIR /srv

# Get source
RUN mkdir -p /srv
COPY perfektimprezy.tar.gz /srv/perfektimprezy.tar.gz
RUN tar x -f perfektimprezy.tar.gz
RUN rm perfektimprezy.tar.gz

# Run server
EXPOSE 5000
CMD ["python", "index.py"]

这是我正在部署的步骤

$> sudo docker build -t perfektimprezy .

据我所知,上面的代码运行良好,图像中包含tar的内容/srv。现在,让我们在容器中启动服务器:

$> sudo docker run -i -p 5000:5000 -d perfektimprezy
1c50b67d45b1a4feade72276394811c8399b1b95692e0914ee72b103ff54c769

它真的在运行吗?

$> sudo docker ps
CONTAINER ID        IMAGE                   COMMAND             CREATED             STATUS              PORTS                    NAMES
1c50b67d45b1        perfektimprezy:latest   "python index.py"   5 seconds ago       Up 5 seconds        0.0.0.0:5000->5000/tcp   loving_wozniak

$> sudo docker logs 1c50b67d45b1
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with stat

是的,好像flask服务器正在运行。这是奇怪的地方。让我们向服务器发出请求:

 $> curl 127.0.0.1:5000 -v
 * Rebuilt URL to: 127.0.0.1:5000/
 * Hostname was NOT found in DNS cache
 *   Trying 127.0.0.1...
 * Connected to 127.0.0.1 (127.0.0.1) port 5000 (#0)
 > GET / HTTP/1.1
 > User-Agent: curl/7.35.0
 > Host: 127.0.0.1:5000
 > Accept: */*
 >
 * Empty reply from server
 * Connection #0 to host 127.0.0.1 left intact
 curl: (52) Empty reply from server

空回复…但是该流程正在运行吗?

$> sudo docker top 1c50b67d45b1
UID                 PID                 PPID                C                   STIME               TTY                 TIME                CMD
root                2084                812                 0                   10:26               ?                   00:00:00            python index.py
root                2117                2084                0                   10:26               ?                   00:00:00            /usr/bin/python index.py

现在,让我们进入服务器并检查…

$> sudo docker exec -it 1c50b67d45b1 bash
root@1c50b67d45b1:/srv# netstat -an
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State
tcp        0      0 127.0.0.1:5000          0.0.0.0:*               LISTEN
tcp        0      0 127.0.0.1:47677         127.0.0.1:5000          TIME_WAIT
Active UNIX domain sockets (servers and established)
Proto RefCnt Flags       Type       State         I-Node   Path
root@1c50b67d45b1:/srv# curl -I 127.0.0.1:5000
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 5447
Server: Werkzeug/0.10.4 Python/2.7.6
Date: Tue, 19 May 2015 12:18:14 GMT

很好…但是不是从外面来的:(我做错了什么?

I have an app who’s only dependency is flask, which runs fine outside docker and binds to the default port 5000. Here is the full source:

from flask import Flask

app = Flask(__name__)
app.debug = True

@app.route('/')
def main():
    return 'hi'

if __name__ == '__main__':
    app.run()

The problem is that when I deploy this in docker, the server is running but is unreachable from outside the container.

Below is my Dockerfile. The image is ubuntu with flask installed. The tar just contains the index.py listed above;

# Dockerfile
FROM dreen/flask
MAINTAINER dreen
WORKDIR /srv

# Get source
RUN mkdir -p /srv
COPY perfektimprezy.tar.gz /srv/perfektimprezy.tar.gz
RUN tar x -f perfektimprezy.tar.gz
RUN rm perfektimprezy.tar.gz

# Run server
EXPOSE 5000
CMD ["python", "index.py"]

Here are the steps I am doing to deploy

$> sudo docker build -t perfektimprezy .

As far as I know the above runs fine, the image has the contents of the tar in /srv. Now, let’s start the server in a container:

$> sudo docker run -i -p 5000:5000 -d perfektimprezy
1c50b67d45b1a4feade72276394811c8399b1b95692e0914ee72b103ff54c769

Is it actually running?

$> sudo docker ps
CONTAINER ID        IMAGE                   COMMAND             CREATED             STATUS              PORTS                    NAMES
1c50b67d45b1        perfektimprezy:latest   "python index.py"   5 seconds ago       Up 5 seconds        0.0.0.0:5000->5000/tcp   loving_wozniak

$> sudo docker logs 1c50b67d45b1
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with stat

Yep, seems like the flask server is running. Here is where it gets weird. Lets make a request to the server:

 $> curl 127.0.0.1:5000 -v
 * Rebuilt URL to: 127.0.0.1:5000/
 * Hostname was NOT found in DNS cache
 *   Trying 127.0.0.1...
 * Connected to 127.0.0.1 (127.0.0.1) port 5000 (#0)
 > GET / HTTP/1.1
 > User-Agent: curl/7.35.0
 > Host: 127.0.0.1:5000
 > Accept: */*
 >
 * Empty reply from server
 * Connection #0 to host 127.0.0.1 left intact
 curl: (52) Empty reply from server

Empty reply… But is the process running?

$> sudo docker top 1c50b67d45b1
UID                 PID                 PPID                C                   STIME               TTY                 TIME                CMD
root                2084                812                 0                   10:26               ?                   00:00:00            python index.py
root                2117                2084                0                   10:26               ?                   00:00:00            /usr/bin/python index.py

Now let’s ssh into the server and check…

$> sudo docker exec -it 1c50b67d45b1 bash
root@1c50b67d45b1:/srv# netstat -an
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State
tcp        0      0 127.0.0.1:5000          0.0.0.0:*               LISTEN
tcp        0      0 127.0.0.1:47677         127.0.0.1:5000          TIME_WAIT
Active UNIX domain sockets (servers and established)
Proto RefCnt Flags       Type       State         I-Node   Path
root@1c50b67d45b1:/srv# curl -I 127.0.0.1:5000
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 5447
Server: Werkzeug/0.10.4 Python/2.7.6
Date: Tue, 19 May 2015 12:18:14 GMT

It’s fine… but not from the outside :( What am I doing wrong?


回答 0

问题是您只绑定到localhost接口,0.0.0.0如果要从外部访问容器,则应该绑定到。如果您更改:

if __name__ == '__main__':
    app.run()

if __name__ == '__main__':
    app.run(host='0.0.0.0')

它应该工作。

The problem is you are only binding to the localhost interface, you should be binding to 0.0.0.0 if you want the container to be accessible from outside. If you change:

if __name__ == '__main__':
    app.run()

to

if __name__ == '__main__':
    app.run(host='0.0.0.0')

It should work.


回答 1

当使用flask命令代替时app.run,您可以传递--host选项来更改主机。Docker中的行将是:

CMD ["flask", "run", "--host", "0.0.0.0"]

要么

CMD flask run --host 0.0.0.0

When using the flask command instead of app.run, you can pass the --host option to change the host. The line in Docker would be:

CMD ["flask", "run", "--host", "0.0.0.0"]

or

CMD flask run --host 0.0.0.0

回答 2

您的Docker容器具有多个网络接口。例如,我的容器具有以下内容:

$ ip addr
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
32: eth0@if33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default 
    link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
    inet 172.17.0.2/16 brd 172.17.255.255 scope global eth0
       valid_lft forever preferred_lft forever

如果运行docker network inspect bridge,您可以在上面的输出中看到容器通过第二个接口连接到该网桥。该默认网桥还连接到主机上的Docker进程。

因此,您将必须运行以下命令:

CMD flask run --host 172.17.0.2

从主机访问在Docker容器中运行的Flask应用。用172.17.0.2您的容器的特定IP地址替换。

Your Docker container has more than one network interface. For example, my container has the following:

$ ip addr
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
32: eth0@if33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default 
    link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
    inet 172.17.0.2/16 brd 172.17.255.255 scope global eth0
       valid_lft forever preferred_lft forever

if you run docker network inspect bridge, you can see that your container is connected to that bridge with the second interface in the above output. This default bridge is also connected to the Docker process on your host.

Therefore you would have to run the command:

CMD flask run --host 172.17.0.2

To access your Flask app running in a Docker container from your host machine. Replace 172.17.0.2 with whatever the particular IP address is of your container.


回答 3

以其他答案为基础:

假设您有两台计算机。每台计算机都有一个网络接口(例如WiFi),这是它的公共IP。每台计算机都有一个回送/本地主机接口,位于127.0.0.1。这意味着“仅这台计算机”。

如果在计算机A上列出了127.0.0.1,则在计算机B上运行时,您将无法通过127.0.0.1进行连接。毕竟,您要求侦听计算机A的本地专用地址。

Docker的设置与此类似。从技术上讲,它是同一台计算机,但是Linux内核允许每个容器使用其自己的隔离网络堆栈运行。因此,容器中的127.0.0.1与主机以外的其他计算机上的127.0.0.1相同-您无法连接到它。

较长的版本,带有图表:https : //pythonspeed.com/articles/docker-connection-refused/

To build on other answers:

Imagine you have two computers. Each computer has a network interface (WiFi, say), which is its public IP. Each computer has a loopback/localhost interface, at 127.0.0.1. This means “just this computer.”

If you listed on 127.0.0.1 on computer A, you would not expect to be able to connect to that via 127.0.0.1 when running on computer B. After all, you asked to listen on computer A’s local, private address.

Docker is similar setup; technically it’s the same computer, but the Linux kernel is allowing each container to run with its own isolated network stack. So 127.0.0.1 in a container is the same as 127.0.0.1 on a different computer than your host—you can’t connect to it.

Longer version, with diagrams: https://pythonspeed.com/articles/docker-connection-refused/


回答 4

首先,您需要在python脚本中更改以下代码:

app.run()

app.run(host="0.0.0.0")

其次,在您的docker文件中,最后一行应类似于

CMD ["flask", "run", "-h", "0.0.0.0", "-p", "5000"]

而在主机上,如果0.0.0.0:5000不起作用,那么您应该尝试localhost:5000

注-CMD命令必须正确。因为CMD命令提供了执行容器的默认值。

First of all in your python script you need to change code from

app.run()

to

app.run(host="0.0.0.0")

Second, In your docker file, last line should be like

CMD ["flask", "run", "-h", "0.0.0.0", "-p", "5000"]

And on host machine if 0.0.0.0:5000 doesn’t work then you should try with localhost:5000

Note – The CMD command has to be proper. Because CMD command provide defaults for executing container.


如何使用Flask获取用户代理?

问题:如何使用Flask获取用户代理?

我试图通过Flask访问用户代理,但是我找不到有关它的文档,或者它没有告诉我。

I’m trying to get access to the user agent with Flask, but I either can’t find the documentation on it, or it doesn’t tell me.


回答 0

from flask import request
request.headers.get('User-Agent')

您还可以使用request.user_agent包含以下属性的对象,这些属性是基于useragent字符串创建的:

  • 平台(Windows,Linux,MacOS等)
  • 浏览器(chrome,firefox,msie等)
  • 语言
  • 字串(== request.headers.get('User-Agent')
from flask import request
request.headers.get('User-Agent')

You can also use the request.user_agent object which contains the following attributes which are created based on the useragent string:

  • platform (windows, linux, macos, etc.)
  • browser (chrome, firefox, msie, etc.)
  • version
  • language
  • string (== request.headers.get('User-Agent'))

回答 1

flask.request.user_agent.string
flask.request.user_agent.string

回答 2

如果您使用

request.headers.get('User-Agent')

您可能会得到:Mozilla / 5.0(Windows NT 6.1; WOW64)AppleWebKit / 537.36(KHTML,例如Gecko)Chrome / 45.0.2454.101 Safari / 537.36

如果您使用

request.user_agent

您可能会像这样:

  • user_agent.platform:Windows
  • user_agent.browser:chrome
  • user_agent.version:45.0.2454.101
  • user_agent.language:无
  • user_agent.string:Mozilla / 5.0(Windows NT 6.1; WOW64)AppleWebKit / 537.36(KHTML,例如Gecko)Chrome / 45.0.2454.101 Safari / 537.36

If you use

request.headers.get('User-Agent')

you may get: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36

If you use

request.user_agent

you may get like this:

  • user_agent.platform: windows
  • user_agent.browser: chrome
  • user_agent.version: 45.0.2454.101
  • user_agent.language: None
  • user_agent.string: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36

回答 3

UA通常不包含语言。如果要在浏览器中设置语言,可以使用

request.accept_languages

它会给您语言列表。例如

LanguageAccept([('en-US', 1), ('en', 0.5)])

要访问第一个值,您可以使用

request.accept_languages[0][0]

这将导致字符串

'en-US'

有关“ accept_language”标头的详细信息:https ://www.w3.org/International/questions/qa-lang-priorities

UA usually does not contain language. If you want to get the language set in browser, you may use

request.accept_languages

It’ll give you list of languages. E.g.

LanguageAccept([('en-US', 1), ('en', 0.5)])

To access the first value, you may use

request.accept_languages[0][0]

which will result in string

'en-US'

Detailed information about ‘accept_language” header: https://www.w3.org/International/questions/qa-lang-priorities


回答 4

这个问题请求更多信息。该库似乎适合从烧瓶中收集大量信息的清单,并具有示例调用以从应用程序上下文中获取此信息。

https://pythonhosted.org/Flask-Track-Usage/

使用情况将以以下格式存储:

[
    {
            'url': str,
            'user_agent': {
                'browser': str,
                'language': str,
                'platform': str,
                'version': str,
            },
            'blueprint': str,
            'view_args': dict or None
            'status': int,
            'remote_addr': str,
            'xforwardedfor': str,
            'authorization': bool
            'ip_info': str or None,
            'path': str,
            'speed': float,
            'date': datetime,
    },
    {
        ....
    }
]

这是图书馆中收集数据的地方之一:

https://github.com/ashcrow/flask-track-usage/blob/master/src/flask_track_usage/ init .py在第158行附近

    data = {
        'url': ctx.request.url,
        'user_agent': ctx.request.user_agent,
        'server_name': ctx.app.name,
        'blueprint': ctx.request.blueprint,
        'view_args': ctx.request.view_args,
        'status': response.status_code,
        'remote_addr': ctx.request.remote_addr,
        'xforwardedfor': ctx.request.headers.get(
            'X-Forwarded-For', None),
        'authorization': bool(ctx.request.authorization),
        'ip_info': None,
        'path': ctx.request.path,
        'speed': float(speed),
        'date': int(time.mktime(current_time.timetuple())),
        'content_length': response.content_length,
        'request': "{} {} {}".format(
            ctx.request.method,
            ctx.request.url,
            ctx.request.environ.get('SERVER_PROTOCOL')
        ),
        'url_args': dict(
            [(k, ctx.request.args[k]) for k in ctx.request.args]
        ),
        'username': None,
        'track_var': g.track_var
    }

The question begs for a lot more information. This library seems to fit the bill of collecting a lot of information out of flask, and has example calls to getting this information out of the application context.

https://pythonhosted.org/Flask-Track-Usage/

Usage gets stored in this format:

[
    {
            'url': str,
            'user_agent': {
                'browser': str,
                'language': str,
                'platform': str,
                'version': str,
            },
            'blueprint': str,
            'view_args': dict or None
            'status': int,
            'remote_addr': str,
            'xforwardedfor': str,
            'authorization': bool
            'ip_info': str or None,
            'path': str,
            'speed': float,
            'date': datetime,
    },
    {
        ....
    }
]

Here is one of the places in the library where the data is collected:

https://github.com/ashcrow/flask-track-usage/blob/master/src/flask_track_usage/init.py around line 158

    data = {
        'url': ctx.request.url,
        'user_agent': ctx.request.user_agent,
        'server_name': ctx.app.name,
        'blueprint': ctx.request.blueprint,
        'view_args': ctx.request.view_args,
        'status': response.status_code,
        'remote_addr': ctx.request.remote_addr,
        'xforwardedfor': ctx.request.headers.get(
            'X-Forwarded-For', None),
        'authorization': bool(ctx.request.authorization),
        'ip_info': None,
        'path': ctx.request.path,
        'speed': float(speed),
        'date': int(time.mktime(current_time.timetuple())),
        'content_length': response.content_length,
        'request': "{} {} {}".format(
            ctx.request.method,
            ctx.request.url,
            ctx.request.environ.get('SERVER_PROTOCOL')
        ),
        'url_args': dict(
            [(k, ctx.request.args[k]) for k in ctx.request.args]
        ),
        'username': None,
        'track_var': g.track_var
    }

即使模板文件存在,Flask也会引发TemplateNotFound错误

问题:即使模板文件存在,Flask也会引发TemplateNotFound错误

我正在尝试渲染文件home.html。该文件存在于我的项目中,但是jinja2.exceptions.TemplateNotFound: home.html当我尝试渲染它时,我一直在获取它。Flask为什么找不到我的模板?

from flask import Flask, render_template

app = Flask(__name__)

@app.route('/')
def home():
    return render_template('home.html')
/myproject
    app.py
    home.html

I am trying to render the file home.html. The file exists in my project, but I keep getting jinja2.exceptions.TemplateNotFound: home.html when I try to render it. Why can’t Flask find my template?

from flask import Flask, render_template

app = Flask(__name__)

@app.route('/')
def home():
    return render_template('home.html')
/myproject
    app.py
    home.html

回答 0

您必须在正确的位置创建模板文件。在templatespython模块旁边的子目录中。

该错误表明目录中没有home.html文件templates/。确保在与python模块相同的目录中创建了该目录,并且确实将home.html文件放在该子目录中。如果您的应用是软件包,则应在软件包创建模板文件夹。

myproject/
    app.py
    templates/
        home.html
myproject/
    mypackage/
        __init__.py
        templates/
            home.html

另外,如果您将模板文件夹命名为其他名称,templates并且不想将其重命名为默认名称,则可以告诉Flask使用该其他目录。

app = Flask(__name__, template_folder='template')  # still relative to module

你可以问烧瓶解释它是如何试图找到一个给定的模板,通过设置EXPLAIN_TEMPLATE_LOADING选项True。对于每个加载的模板,您将获得一个报告记录到Flaskapp.logger的级别INFO

搜索成功后的样子:在此示例中,foo/bar.html模板扩展了base.html模板,因此有两个搜索:

[2019-06-15 16:03:39,197] INFO in debughelpers: Locating template "foo/bar.html":
    1: trying loader of application "flaskpackagename"
       class: jinja2.loaders.FileSystemLoader
       encoding: 'utf-8'
       followlinks: False
       searchpath:
         - /.../project/flaskpackagename/templates
       -> found ('/.../project/flaskpackagename/templates/foo/bar.html')
[2019-06-15 16:03:39,203] INFO in debughelpers: Locating template "base.html":
    1: trying loader of application "flaskpackagename"
       class: jinja2.loaders.FileSystemLoader
       encoding: 'utf-8'
       followlinks: False
       searchpath:
         - /.../project/flaskpackagename/templates
       -> found ('/.../project/flaskpackagename/templates/base.html')

蓝图也可以注册自己的模板目录,但这不是必需的,如果您使用蓝图可以更轻松地在逻辑单元之间拆分较大的项目。即使在每个蓝图中使用其他路径,也始终会首先搜索Flask应用程序主模板目录。

You must create your template files in the correct location; in the templates subdirectory next to your python module.

The error indicates that there is no home.html file in the templates/ directory. Make sure you created that directory in the same directory as your python module, and that you did in fact put a home.html file in that subdirectory. If your app is a package, the templates folder should be created inside the package.

myproject/
    app.py
    templates/
        home.html
myproject/
    mypackage/
        __init__.py
        templates/
            home.html

Alternatively, if you named your templates folder something other than templates and don’t want to rename it to the default, you can tell Flask to use that other directory.

app = Flask(__name__, template_folder='template')  # still relative to module

You can ask Flask to explain how it tried to find a given template, by setting the EXPLAIN_TEMPLATE_LOADING option to True. For every template loaded, you’ll get a report logged to the Flask app.logger, at level INFO.

This is what it looks like when a search is successful; in this example the foo/bar.html template extends the base.html template, so there are two searches:

[2019-06-15 16:03:39,197] INFO in debughelpers: Locating template "foo/bar.html":
    1: trying loader of application "flaskpackagename"
       class: jinja2.loaders.FileSystemLoader
       encoding: 'utf-8'
       followlinks: False
       searchpath:
         - /.../project/flaskpackagename/templates
       -> found ('/.../project/flaskpackagename/templates/foo/bar.html')
[2019-06-15 16:03:39,203] INFO in debughelpers: Locating template "base.html":
    1: trying loader of application "flaskpackagename"
       class: jinja2.loaders.FileSystemLoader
       encoding: 'utf-8'
       followlinks: False
       searchpath:
         - /.../project/flaskpackagename/templates
       -> found ('/.../project/flaskpackagename/templates/base.html')

Blueprints can register their own template directories too, but this is not a requirement if you are using blueprints to make it easier to split a larger project across logical units. The main Flask app template directory is always searched first even when using additional paths per blueprint.


回答 1

我认为Flask默认使用目录模板。因此,您的代码应该假设这是您的hello.py

from flask import Flask,render_template

app=Flask(__name__,template_folder='template')


@app.route("/")
def home():
    return render_template('home.html')

@app.route("/about/")
def about():
    return render_template('about.html')

if __name__=="__main__":
    app.run(debug=True)

你的工作空间结构就像

project/
    hello.py        
    template/
         home.html
         about.html    
    static/
           js/
             main.js
           css/
               main.css

您还创建了两个HTML文件,名称分别为home.html和about.html,并将它们放在模板文件夹中。

I think Flask uses the directory templates by default. So your code should be suppose this is your hello.py

from flask import Flask,render_template

app=Flask(__name__,template_folder='template')


@app.route("/")
def home():
    return render_template('home.html')

@app.route("/about/")
def about():
    return render_template('about.html')

if __name__=="__main__":
    app.run(debug=True)

And you work space structure like

project/
    hello.py        
    template/
         home.html
         about.html    
    static/
           js/
             main.js
           css/
               main.css

also you have create two html files with name of home.html and about.html and put those files in template folder.


回答 2

(请注意,上面为文件/项目结构提供的已接受答案是绝对正确的。)

也..

除了正确设置项目文件结构外,我们还必须告诉flask查找目录层次结构的适当级别。

例如..

    app = Flask(__name__, template_folder='../templates')
    app = Flask(__name__, template_folder='../templates', static_folder='../static')

从开始../向后移动一个目录,然后从那里开始。

从开始../../向后移动两个目录,然后从那里开始(依此类推…)。

希望这可以帮助

(Please note that the above accepted Answer provided for file/project structure is absolutely correct.)

Also..

In addition to properly setting up the project file structure, we have to tell flask to look in the appropriate level of the directory hierarchy.

for example..

    app = Flask(__name__, template_folder='../templates')
    app = Flask(__name__, template_folder='../templates', static_folder='../static')

Starting with ../ moves one directory backwards and starts there.

Starting with ../../ moves two directories backwards and starts there (and so on…).

Hope this helps


回答 3

我不知道为什么,但是我不得不使用以下文件夹结构。我将“模板”提高了一层。

project/
    app/
        hello.py
        static/
            main.css
    templates/
        home.html
    venv/

这可能表明在其他地方配置错误,但是我无法弄清楚那是什么,并且这可行。

I don’t know why, but I had to use the following folder structure instead. I put “templates” one level up.

project/
    app/
        hello.py
        static/
            main.css
    templates/
        home.html
    venv/

This probably indicates a misconfiguration elsewhere, but I couldn’t figure out what that was and this worked.


回答 4

检查:

  1. 模板文件具有正确的名称
  2. 模板文件位于一个名为 templates
  3. 您传递给的名称render_template是相对于模板目录的(index.html直接位于模板目录中,位于模板目录中auth/login.html的auth目录下。)
  4. 您可能没有与您的应用同名的子目录,或者模板目录位于该子目录中。

如果这不起作用,请打开调试(app.debug = True),这可能有助于找出问题所在。

Check that:

  1. the template file has the right name
  2. the template file is in a subdirectory called templates
  3. the name you pass to render_template is relative to the template directory (index.html would be directly in the templates directory, auth/login.html would be under the auth directory in the templates directory.)
  4. you either do not have a subdirectory with the same name as your app, or the templates directory is inside that subdir.

If that doesn’t work, turn on debugging (app.debug = True) which might help figure out what’s wrong.


回答 5

如果从已安装的程序包运行代码,请确保目录中存在模板文件<python root>/lib/site-packages/your-package/templates


一些细节:

就我而言,我试图运行项目flask_simple_ui的示例,并且jinja总是会说

jinja2.exceptions.TemplateNotFound:form.html

诀窍是示例程序将导入已安装的软件包flask_simple_ui。而ninja从内部的包装使用的根目录查找包路径,在我的情况下被使用...python/lib/site-packages/flask_simple_ui,而不是os.getcwd() 作为一个期望的那样。

不幸的是,它setup.py存在一个错误,并且不会复制任何html文件,包括missing form.html。一旦修复setup.pyTemplateNotFound的问题就消失了。

希望对您有所帮助。

If you run your code from an installed package, make sure template files are present in directory <python root>/lib/site-packages/your-package/templates.


Some details:

In my case I was trying to run examples of project flask_simple_ui and jinja would always say

jinja2.exceptions.TemplateNotFound: form.html

The trick was that sample program would import installed package flask_simple_ui. And ninja being used from inside that package is using as root directory for lookup the package path, in my case ...python/lib/site-packages/flask_simple_ui, instead of os.getcwd() as one would expect.

To my bad luck, setup.py has a bug and doesn’t copy any html files, including the missing form.html. Once I fixed setup.py, the problem with TemplateNotFound vanished.

I hope it helps someone.


回答 6

我遇到了同样的错误,事实证明,我唯一做错的就是将我的“模板”文件夹命名为“模板”而不命名为“ s”。更改它正常工作后,不知道为什么它是东西,但是它是。

I had the same error turns out the only thing i did wrong was to name my ‘templates’ folder,’template’ without ‘s’. After changing that it worked fine,dont know why its a thing but it is.


回答 7

当使用render_template()函数时,它将尝试在名为template的文件夹中搜索模板,并在以下情况下引发jinja2.exceptions.TemplateNotFound错误:

  1. html文件不存在或
  2. 当模板文件夹不存在时

解决问题:

在python文件所在的目录中创建一个带有名称模板的文件夹,并将创建的html文件放置在模板文件夹中。

When render_template() function is used it tries to search for template in the folder called templates and it throws error jinja2.exceptions.TemplateNotFound when :

  1. the html file do not exist or
  2. when templates folder do not exist

To solve the problem :

create a folder with name templates in the same directory where the python file is located and place the html file created in the templates folder.


回答 8

您需要将所有.html文件放在python模块旁边的模板文件夹中。并且,如果您的html文件中使用了任何图像,则需要将所有文件放在名为static的文件夹中

在以下结构中

project/
    hello.py
    static/
        image.jpg
        style.css
    templates/
        homepage.html
    virtual/
        filename.json

You need to put all you .html files in the template folder next to your python module. And if there are any images that you are using in your html files then you need put all your files in the folder named static

In the following Structure

project/
    hello.py
    static/
        image.jpg
        style.css
    templates/
        homepage.html
    virtual/
        filename.json

回答 9

另一种选择是设置,root_path它可以解决模板和静态文件夹的问题。

root_path = Path(sys.executable).parent if getattr(sys, 'frozen', False) else Path(__file__).parent
app = Flask(__name__.split('.')[0], root_path=root_path)

如果您直接通过渲染模板Jinja2,则可以这样写:

ENV = jinja2.Environment(loader=jinja2.FileSystemLoader(str(root_path / 'templates')))
template = ENV.get_template(your_template_name)

Another alternative is to set the root_path which fixes the problem both for templates and static folders.

root_path = Path(sys.executable).parent if getattr(sys, 'frozen', False) else Path(__file__).parent
app = Flask(__name__.split('.')[0], root_path=root_path)

If you render templates directly via Jinja2, then you write:

ENV = jinja2.Environment(loader=jinja2.FileSystemLoader(str(root_path / 'templates')))
template = ENV.get_template(your_template_name)

目标数据库不是最新的

问题:目标数据库不是最新的

我想迁移一个Flask应用程序。我正在使用Alembic。

但是,我收到以下错误。

Target database is not up to date.

在网上,我读到它与此有关。 http://alembic.zzzcomputing.com/zh-CN/latest/cookbook.html#building-an-up-to-date-database-from-scratch

不幸的是,我不太了解如何使数据库保持最新状态,以及在何处/如何编写链接中给出的代码。如果您有迁移的经验,能否请您为我解释一下

谢谢

I’d like to make a migration for a Flask app. I am using Alembic.

However, I receive the following error.

Target database is not up to date.

Online, I read that it has something to do with this. http://alembic.zzzcomputing.com/en/latest/cookbook.html#building-an-up-to-date-database-from-scratch

Unfortunately, I don’t quite understand how to get the database up to date and where/how I should write the code given in the link. If you have experience with migrations, can you please explain this for me

Thanks


回答 0

创建迁移后,无论是手动还是as --autogenerate,都必须使用进行应用alembic upgrade head。如果db.create_all()从外壳程序使用alembic stamp head,则可以用来表示数据库的当前状态代表所有迁移的应用程序。

After creating a migration, either manually or as --autogenerate, you must apply it with alembic upgrade head. If you used db.create_all() from a shell, you can use alembic stamp head to indicate that the current state of the database represents the application of all migrations.


回答 1

这对我有用

$ flask db stamp head
$ flask db migrate
$ flask db upgrade

This Worked For me

$ flask db stamp head
$ flask db migrate
$ flask db upgrade

回答 2

我的想法是这样的,当我执行“ ./manage.py db migration -m’添加关系’”时,出现的错误是这样的:“ alembic.util.exc.CommandError:目标数据库不是最新的。”

因此,我检查了迁移状态:

(venv) ]#./manage.py db heads
d996b44eca57 (head)
(venv) ]#./manage.py db current
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
715f79abbd75

并发现磁头和电流不同!

我通过执行以下步骤对其进行了修复:

(venv)]#./manage.py db stamp heads
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision 715f79abbd75 -> d996b44eca57

而现在的头脑是一样的

(venv) ]#./manage.py db current
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
d996b44eca57 (head)

现在,我可以再次进行迁移了。

My stuation is like this question, When I execute “./manage.py db migrate -m ‘Add relationship'”, the error occused like this ” alembic.util.exc.CommandError: Target database is not up to date.”

So I checked the status of my migrate:

(venv) ]#./manage.py db heads
d996b44eca57 (head)
(venv) ]#./manage.py db current
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
715f79abbd75

and found that the heads and the current are different!

I fixed it by doing this steps:

(venv)]#./manage.py db stamp heads
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision 715f79abbd75 -> d996b44eca57

And now the current is same to the head

(venv) ]#./manage.py db current
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
d996b44eca57 (head)

And now I can do the migrate again.


回答 3

可以用多种方法解决bby:

1要解决此错误,请删除最新的迁移文件(python文件),然后尝试重新执行迁移。

如果问题仍然存在,请尝试以下命令:

$ flask db stamp head  # To set the revision in the database to the head, without performing any migrations. You can change head to the required change you want.
$ flask db migrate     # To detect automatically all the changes.
$ flask db upgrade     # To apply all the changes.

This can be solved bby many ways :

1 To fix this error, delete the latest migration file ( a python file) then try to perform a migration afresh.

If issue still persists try these commands :

$ flask db stamp head  # To set the revision in the database to the head, without performing any migrations. You can change head to the required change you want.
$ flask db migrate     # To detect automatically all the changes.
$ flask db upgrade     # To apply all the changes.

回答 4

由于某种原因,我不得不删除一些迁移文件。不知道为什么。但这解决了问题。

一个问题是数据库最终会正确更新,包括所有新表等,但是当我使用自动迁移时,迁移文件本身不会显示任何更改。

如果有人有更好的解决方案,请让我知道,因为目前我的解决方案有点怪异。

I had to delete some of my migration files for some reason. Not sure why. But that fixed the problem, kind of.

One issue is that the database ends up getting updated properly, with all the new tables, etc, but the migration files themselves don’t show any changes when I use automigrate.

If someone has a better solution, please let me know, as right now my solution is kind of hacky.


回答 5

$ flask db stamp head  # To set the revision in the database to the head, without performing any migrations. You can change head to the required change you want.
$ flask db migrate  # To detect automatically all the changes.
$ flask db upgrade  # To apply all the changes.

您可以在文档https://flask-migrate.readthedocs.io/en/latest/中找到更多信息。

$ flask db stamp head  # To set the revision in the database to the head, without performing any migrations. You can change head to the required change you want.
$ flask db migrate  # To detect automatically all the changes.
$ flask db upgrade  # To apply all the changes.

You can find more info at the documentation https://flask-migrate.readthedocs.io/en/latest/


回答 6

我也遇到了不同的想法,我想将其中一个字段从字符串更改为整数,因此首先运行:

$ flask db stamp head # to make the current the same
$ flask db migrate
$ flask db upgrade

现在解决了!

I did too run into different heads and I wanted to change one of the fields from string to integer, so first run:

$ flask db stamp head # to make the current the same
$ flask db migrate
$ flask db upgrade

It’s solved now!


回答 7

如果您和我一样刚刚开始一个新项目,并且正在使用内存中的SQLite数据库(sqlite:///:memory:),也会发生这种情况。如果您在这样的数据库上应用迁移,那么很明显,下次您要说自动生成修订时,数据库仍将保持其原始状态(空),因此,Alembic会抱怨目标数据库未达到要求日期。解决方案是切换到持久数据库。

This can also happen if you, like myself, have just started a new project and you are using in-memory SQLite database (sqlite:///:memory:). If you apply a migration on such a database, obviously the next time you want to say auto-generate a revision, the database will still be in its original state (empty), so alembic will be complaining that the target database is not up to date. The solution is to switch to a persisted database.


回答 8

要解决此错误,请删除最新的迁移文件(python文件),然后尝试重新执行迁移。

To fix this error, delete the latest migration file ( a python file) then try to perform a migration afresh.


回答 9

在执行db upgrade命令之前,请尝试删除所有表。

Try to drop all tables before execute the db upgrade command.


回答 10

为了解决这个问题,我删除(删除)了迁移中的表并运行以下命令

flask db migrate

flask db upgrade

To solve this, I drop(delete) the tables in migration and run these commands

flask db migrate

and

flask db upgrade