Question: How can I schedule a function to run every hour on Flask?

I have a Flask web hosting account with no access to the cron command.

How can I execute a Python function every hour?


Answer 0

You can use BackgroundScheduler() from the APScheduler package (v3.5.3):

import time
import atexit

from apscheduler.schedulers.background import BackgroundScheduler


def print_date_time():
    print(time.strftime("%A, %d. %B %Y %I:%M:%S %p"))


scheduler = BackgroundScheduler()
scheduler.add_job(func=print_date_time, trigger="interval", seconds=3)
scheduler.start()

# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())

Note that two of these schedulers will be launched when Flask is in debug mode, because the reloader runs the application in a second process. For more information, check out this question.
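One way to avoid the duplicate scheduler is to start it only in the process that actually serves requests. The sketch below assumes the second process comes from the Werkzeug reloader, which sets the WERKZEUG_RUN_MAIN environment variable to "true" in the reloaded child. The helper name should_start_scheduler is hypothetical and is not part of Flask or APScheduler.

```python
import os


def should_start_scheduler(debug, environ=os.environ):
    """Return True only in the process that should own the scheduler.

    Hypothetical helper: `debug` is the app's debug flag, `environ` is
    injectable so the check can be exercised without touching os.environ.
    """
    # Without debug mode there is no reloader, so there is a single process.
    if not debug:
        return True
    # Assumption: the reloader is active whenever debug is on. Werkzeug marks
    # the reloaded child (the one serving requests) with this variable.
    return environ.get("WERKZEUG_RUN_MAIN") == "true"
```

With this in place, the call to scheduler.start() could be wrapped in a check such as `if should_start_scheduler(app.debug):`, where app is the Flask application object. For the hourly schedule the question asks about, the interval trigger accepts hours=1 in place of seconds=3.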


Answer 1

You could make use of APScheduler in your Flask application and run your jobs via its interface:

import atexit

# v2.x version - see https://stackoverflow.com/a/38501429/135978
# for the 3.x version
from apscheduler.scheduler import Scheduler
from flask import Flask

app = Flask(__name__)

cron = Scheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()

@cron.interval_schedule(hours=1)
def job_function():
    # Do your work here
    pass


# Shutdown your cron thread if the web process is stopped
atexit.register(lambda: cron.shutdown(wait=False))

if __name__ == '__main__':
    app.run()

Answer 2

I’m fairly new to the concept of application schedulers, but what I found here for APScheduler v3.3.1 is a little different. I believe the package structure, class names, etc. have changed in the newest versions, so here is a fresh solution I put together recently, integrated with a basic Flask application:

#!/usr/bin/python3
""" Demonstrating Flask, using APScheduler. """

from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask

def sensor():
    """ Function for test purposes. """
    print("Scheduler is alive!")

sched = BackgroundScheduler(daemon=True)
sched.add_job(sensor,'interval',minutes=60)
sched.start()

app = Flask(__name__)

@app.route("/home")
def home():
    """ Function for test purposes. """
    return "Welcome Home :) !"

if __name__ == "__main__":
    app.run()

I’m also leaving this Gist here, in case anyone is interested in updates to this example.


Answer 3

You could try using APScheduler’s BackgroundScheduler to integrate an interval job into your Flask app. Below is an example that uses a blueprint and an app factory (__init__.py):

import atexit
from datetime import datetime

# import BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask

from webapp.models.main import db
from webapp.controllers.main import main_blueprint

# define the job
def hello_job():
    print('Hello Job! The time is: %s' % datetime.now())

def create_app(object_name):
    app = Flask(__name__)
    app.config.from_object(object_name)
    db.init_app(app)
    app.register_blueprint(main_blueprint)
    # init BackgroundScheduler job
    scheduler = BackgroundScheduler()
    # in your case you could change seconds to hours
    scheduler.add_job(hello_job, trigger='interval', seconds=3)
    scheduler.start()

    # shut down the scheduler when the process exits
    atexit.register(scheduler.shutdown)
    return app

Hope it helps :)

Ref:

  1. https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/background.py

Answer 4

For a simple solution, you could add a route such as:

@app.route("/cron/do_the_thing", methods=['POST'])
def do_the_thing():
    logging.info("Did the thing")
    return "OK", 200

Then add a Unix cron job that POSTs to this endpoint periodically. For example, to run it once a minute, type crontab -e in a terminal and add this line:

* * * * * /opt/local/bin/curl -X POST https://YOUR_APP/cron/do_the_thing

(Note that the path to curl has to be complete, because the job will not have your PATH when it runs. You can find the full path to curl on your system with: which curl)

I like this because it’s easy to test the job manually, it has no extra dependencies, and since nothing special is going on, it is easy to understand.

Security

If you’d like to password-protect your cron job, you can pip install Flask-BasicAuth and then add the credentials to your app configuration:

app = Flask(__name__)
app.config['BASIC_AUTH_REALM'] = 'realm'
app.config['BASIC_AUTH_USERNAME'] = 'falken'
app.config['BASIC_AUTH_PASSWORD'] = 'joshua'

To password protect the job endpoint:

from flask_basicauth import BasicAuth
basic_auth = BasicAuth(app)

@app.route("/cron/do_the_thing", methods=['POST'])
@basic_auth.required
def do_the_thing():
    logging.info("Did the thing a bit more securely")
    return "OK", 200

Then call it from your cron job:

* * * * * /opt/local/bin/curl -X POST https://falken:joshua@YOUR_APP/cron/do_the_thing
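The curl call above can also be made from Python if that is more convenient on the machine that triggers the job. This is a minimal sketch using only the standard library, assuming the endpoint and the falken/joshua credentials from the example. The helper name build_cron_request is hypothetical, and sending the credentials in an Authorization header rather than in the URL is a choice made here, not something the answer prescribes.

```python
import base64
import urllib.request


def build_cron_request(url, username, password):
    """Build a POST request that carries HTTP Basic credentials."""
    # HTTP Basic auth is "username:password", base64-encoded.
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return urllib.request.Request(
        url,
        data=b"",  # empty request body
        method="POST",
        headers={"Authorization": f"Basic {token}"},
    )
```

Passing the returned object to urllib.request.urlopen() sends the request. The response status can then be checked to confirm that the job endpoint answered with 200.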

Answer 5

Another alternative might be to use Flask-APScheduler, which plays nicely with Flask, e.g.:

  • Loads scheduler configuration from Flask configuration,
  • Loads job definitions from Flask configuration

More information here:

https://pypi.python.org/pypi/Flask-APScheduler
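As a rough sketch of what loading job definitions from the Flask configuration could look like, the block below defines an hourly job in a config class and wires it up in an app factory. It assumes Flask-APScheduler's JOBS config key and its APScheduler class with init_app() and start(). The names hourly_job, Config and create_app are illustrative, and the package documentation should be checked for the version in use.

```python
def hourly_job():
    """Illustrative job; replace the body with real work."""
    print("hourly job ran")


class Config:
    # Job definitions read by Flask-APScheduler (assumed JOBS key).
    # "func" is a "module:function" reference to the job defined above.
    JOBS = [
        {
            "id": "hourly_job",
            "func": f"{__name__}:hourly_job",
            "trigger": "interval",
            "hours": 1,
        }
    ]


def create_app():
    # Imported here so the config above can be inspected even when the
    # optional packages are not installed.
    from flask import Flask
    from flask_apscheduler import APScheduler

    app = Flask(__name__)
    app.config.from_object(Config)

    scheduler = APScheduler()
    scheduler.init_app(app)  # picks up JOBS from app.config
    scheduler.start()
    return app
```

Keeping the schedule in configuration means the interval can differ between environments without touching the job code.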


Answer 6

A complete example using schedule and multiprocessing, with on/off control and a parameter passed to run_job(). The return codes are simplified and the interval is set to 10 seconds; change it to every(2).hours.do() for 2 hours. Schedule is quite impressive: it does not drift, and I’ve never seen it more than 100 ms off when scheduling. Multiprocessing is used instead of threading because it has a termination method.

#!/usr/bin/env python3

import schedule
import time
import uuid

from flask import Flask
from multiprocessing import Process

app = Flask(__name__)
t = None
job_timer = None

def run_job(id):
    """ sample job with parameter """
    global job_timer
    print("timer job id={}".format(id))
    print("timer: {:.4f}sec".format(time.time() - job_timer))
    job_timer = time.time()

def run_schedule():
    """ infinite loop for schedule """
    global job_timer
    job_timer = time.time()
    while True:
        schedule.run_pending()
        time.sleep(1)

@app.route('/timer/<string:status>')
def mytimer(status, nsec=10):
    global t, job_timer
    if status=='on' and not t:
        schedule.every(nsec).seconds.do(run_job, str(uuid.uuid4()))
        t = Process(target=run_schedule)
        t.start()
        return "timer on with interval:{}sec\n".format(nsec)
    elif status=='off' and t:
        t.terminate()
        t = None
        schedule.clear()
        return "timer off\n"
    return "timer status not changed\n"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

You can test this by issuing:

$ curl http://127.0.0.1:5000/timer/on
timer on with interval:10sec
$ curl http://127.0.0.1:5000/timer/on
timer status not changed
$ curl http://127.0.0.1:5000/timer/off
timer off
$ curl http://127.0.0.1:5000/timer/off
timer status not changed

While the timer is on, it prints a timer message to the console every 10 seconds:

127.0.0.1 - - [18/Sep/2018 21:20:14] "GET /timer/on HTTP/1.1" 200 -
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0117sec
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0102sec
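The measured intervals above are a few milliseconds over 10 seconds each. If strict alignment to a fixed grid matters, one option, independent of the schedule package, is to compute each deadline from a fixed start time rather than from the end of the previous run. This is a minimal sketch of that arithmetic; next_deadline is a hypothetical helper, not part of any library mentioned here.

```python
import math


def next_deadline(start, interval, now):
    """Return the first deadline on the grid start + k * interval that is
    strictly later than `now` (all values in seconds)."""
    # Number of whole intervals that have already elapsed since `start`.
    elapsed_intervals = math.floor((now - start) / interval)
    return start + (elapsed_intervals + 1) * interval
```

A loop that sleeps until next_deadline(start, interval, time.time()) after each run stays on the same grid even when a run starts late, because the lateness of one run is not carried into the next deadline.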

Answer 7

You might want to use a queue mechanism with a scheduler, such as RQ Scheduler, or something heavier like Celery (most probably overkill).

