Tag archive: scheduled-tasks

Pros and cons of using Celery vs. RQ [closed]

Question: Pros and cons of using Celery vs. RQ [closed]


Currently I'm working on a Python project that requires implementing some background jobs (mostly for email sending and heavy database updates). I use Redis as the task broker. So at this point I have two candidates: Celery and RQ. I have some experience with these job queues, but I want to ask you guys to share your experience of using these tools. So:

  1. What are the pros and cons of using Celery vs. RQ?
  2. Any examples of projects/tasks where Celery or RQ is the better fit?

Celery looks pretty complicated, but it's a full-featured solution. Actually, I don't think I need all these features. On the other hand, RQ is very simple (e.g. configuration, integration), but it seems to lack some useful features (e.g. task revoking, code auto-reloading).


Answer 0


Here is what I have found while trying to answer this exact same question. It’s probably not comprehensive, and may even be inaccurate on some points.

In short, RQ is designed to be simpler all around. Celery is designed to be more robust. They are both excellent.

  • Documentation. RQ's documentation is comprehensive without being complex, and mirrors the project's overall simplicity – you never feel lost or confused. Celery's documentation is also comprehensive, but expect to be revisiting it quite a lot when you're first setting things up, as there are too many options to internalize.
  • Monitoring. Celery's Flower and the RQ dashboard are both very simple to set up and give you at least 90% of all the information you would ever want.

  • Broker support. Celery is the clear winner, RQ only supports Redis. This means less documentation on “what is a broker”, but also means you cannot switch brokers in the future if Redis no longer works for you. For example, Instagram considered both Redis and RabbitMQ with Celery. This is important because different brokers have different guarantees e.g. Redis cannot (as of writing) guarantee 100% that your messages are delivered.

  • Priority queues. RQ's priority queue model is simple and effective – workers read from queues in order. Celery requires spinning up multiple workers to consume from different queues. Both approaches work.

  • OS Support. Celery is the clear winner here, as RQ only runs on systems that support fork, e.g. Unix systems.

  • Language support. RQ only supports Python, whereas Celery lets you send tasks from one language to a different language

  • API. Celery is extremely flexible (multiple result backends, nice config format, workflow canvas support), but naturally this power can be confusing. By contrast, the RQ API is simple.

  • Subtask support. Celery supports subtasks (e.g. creating new tasks from within existing tasks). I don’t know if RQ does

  • Community and Stability. Celery is probably more established, but they are both active projects. As of writing, Celery has ~3500 stars on GitHub while RQ has ~2000, and both projects show active development.

In my opinion, Celery is not as complex as its reputation might lead you to believe, but you will have to RTFM.

So, why would anyone be willing to trade the (arguably more full-featured) Celery for RQ? In my mind, it all comes down to simplicity. By restricting itself to Redis + Unix, RQ provides simpler documentation, a simpler codebase, and a simpler API. This means you (and potential contributors to your project) can focus on the code you care about, instead of having to keep the details of the task queue system in your working memory. We all have a limit on how many details can be in our heads at once, and by removing the need to keep task-queue details in there, RQ lets you get back to the code you care about. That simplicity comes at the expense of features like inter-language task queues, wide OS support, 100% reliable message guarantees, and the ability to switch message brokers easily.
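For a feel of the API simplicity described above, here is a minimal, hedged sketch of enqueuing a job with RQ; the example function and arguments are illustrative assumptions, not part of the original answer.

# rq_sketch.py -- assumes a local Redis instance and a worker started with `rq worker`
from redis import Redis
from rq import Queue

def count_words(text):
    # Placeholder job; any importable function works.
    return len(text.split())

q = Queue(connection=Redis())                     # default queue
job = q.enqueue(count_words, "simple is better than complex")
# job.result holds the return value once a worker has processed the job.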


Answer 1


Celery is not that complicated. At its core, you do the step-by-step configuration from the tutorials, create a Celery instance, decorate your function with @celery.task, then run the task with my_task.delay(*args, **kwargs).

Judging from your own assessment, it seems you have to choose between lacking (key) features or having some excess hanging around. That is not too hard of a choice in my book.
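To make that workflow concrete, here is a minimal, hedged sketch; the broker URL, module name, and task body are illustrative assumptions rather than anything from the original answer.

# tasks.py -- minimal Celery sketch
from celery import Celery

celery = Celery("tasks", broker="redis://localhost:6379/0")  # assumed Redis broker

@celery.task
def send_welcome_email(user_id):
    # Placeholder for the real email-sending logic.
    print("Sending welcome email to user", user_id)

# Elsewhere in the code, enqueue it asynchronously:
#   send_welcome_email.delay(42)
# and run a worker with:
#   celery -A tasks worker --loglevel=info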


How do I get a cron-like scheduler in Python? [closed]

Question: How do I get a cron-like scheduler in Python? [closed]


I'm looking for a library in Python which will provide at- and cron-like functionality.

I'd quite like to have a pure Python solution, rather than relying on tools installed on the box; this way I can run on machines with no cron.

For those unfamiliar with cron: you can schedule tasks based upon an expression like:

 0 2 * * 7 /usr/bin/run-backup # run the backups at 0200 on Every Sunday
 0 9-17/2 * * 1-5 /usr/bin/purge-temps # run the purge temps command, every 2 hours between 9am and 5pm on Mondays to Fridays.

The cron time expression syntax is less important, but I would like to have something with this sort of flexibility.

If there isn't something that does this for me out of the box, any suggestions for the building blocks to make something like this would be gratefully received.

Edit: I'm not interested in launching processes, just "jobs" also written in Python – Python functions. Of necessity I think this would be in a different thread, but not in a different process.

To this end, I’m looking for the expressivity of the cron time expression, but in Python.

Cron has been around for years, but I’m trying to be as portable as possible. I cannot rely on its presence.


Answer 0


If you're looking for something lightweight, check out schedule:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)

while 1:
    schedule.run_pending()
    time.sleep(1)

Disclosure: I’m the author of that library.


Answer 1


You could just use normal Python argument passing syntax to specify your crontab. For example, suppose we define an Event class as below:

from datetime import datetime, timedelta
import time

# Some utility classes / functions first
class AllMatch(set):
    """Universal set - match everything"""
    def __contains__(self, item): return True

allMatch = AllMatch()

def conv_to_set(obj):  # Allow single integer to be provided
    if isinstance(obj, (int,long)):
        return set([obj])  # Single item
    if not isinstance(obj, set):
        obj = set(obj)
    return obj

# The actual Event class
class Event(object):
    def __init__(self, action, min=allMatch, hour=allMatch, 
                       day=allMatch, month=allMatch, dow=allMatch, 
                       args=(), kwargs={}):
        self.mins = conv_to_set(min)
        self.hours= conv_to_set(hour)
        self.days = conv_to_set(day)
        self.months = conv_to_set(month)
        self.dow = conv_to_set(dow)
        self.action = action
        self.args = args
        self.kwargs = kwargs

    def matchtime(self, t):
        """Return True if this event should trigger at the specified datetime"""
        return ((t.minute     in self.mins) and
                (t.hour       in self.hours) and
                (t.day        in self.days) and
                (t.month      in self.months) and
                (t.weekday()  in self.dow))

    def check(self, t):
        if self.matchtime(t):
            self.action(*self.args, **self.kwargs)

(Note: Not thoroughly tested)

Then your CronTab can be specified in normal python syntax as:

c = CronTab(
  Event(perform_backup, 0, 2, dow=6 ),
  Event(purge_temps, 0, range(9,18,2), dow=range(0,5))
)

This way you get the full power of Python's argument mechanics (mixing positional and keyword args, and you can use symbolic names for weekdays and months).

The CronTab class would be defined as simply sleeping in minute increments, and calling check() on each event. (There are probably some subtleties with daylight savings time / timezones to be wary of though). Here’s a quick implementation:

class CronTab(object):
    def __init__(self, *events):
        self.events = events

    def run(self):
        t=datetime(*datetime.now().timetuple()[:5])
        while 1:
            for e in self.events:
                e.check(t)

            t += timedelta(minutes=1)
            while datetime.now() < t:
                time.sleep((t - datetime.now()).seconds)

A few things to note: Python's weekdays/months are zero-indexed (unlike cron), and ranges exclude the last element, hence syntax like “1-5” becomes range(0,5) – i.e. [0,1,2,3,4]. If you prefer cron syntax, however, parsing it shouldn't be too difficult.


Answer 2


Maybe this has come up only after the question was asked; I thought I'd just mention it for completeness' sake: https://apscheduler.readthedocs.org/en/latest/


Answer 3


One thing I've seen in my searches is Python's sched module, which might be the kind of thing you're looking for.
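As a rough illustration, here is a minimal sketch of a recurring job with the standard-library sched module; the 60-second interval and the job body are illustrative assumptions.

import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)

def job():
    print("job ran at", time.ctime())
    scheduler.enter(60, 1, job)   # re-schedule ourselves for a simple recurring task

scheduler.enter(60, 1, job)       # (delay in seconds, priority, callable)
scheduler.run()                   # blocks, running events as they come due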


Answer 4


“… Crontab module for reading and writing crontab files and accessing the system cron automatically and simply using a direct API. …”

http://pypi.python.org/pypi/python-crontab
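A hedged sketch of python-crontab usage, based on the library's documented pattern; the command string and schedule are illustrative assumptions, and note that this manages the system crontab rather than providing a pure-Python scheduler.

from crontab import CronTab

cron = CronTab(user=True)                          # current user's crontab
job = cron.new(command="python /path/to/task.py",
               comment="nightly-task")             # comment makes the entry easy to find later
job.setall("0 2 * * *")                            # 02:00 every day
cron.write()                                       # persist the change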

There is also APScheduler, a Python package, already written and debugged.

http://packages.python.org/APScheduler/cronschedule.html


Answer 5


More or less the same as above, but concurrent using gevent :)

"""Gevent based crontab implementation"""

from datetime import datetime, timedelta
import gevent

# Some utility classes / functions first
def conv_to_set(obj):
    """Converts to set allowing single integer to be provided"""

    if isinstance(obj, (int, long)):
        return set([obj])  # Single item
    if not isinstance(obj, set):
        obj = set(obj)
    return obj

class AllMatch(set):
    """Universal set - match everything"""
    def __contains__(self, item): 
        return True

allMatch = AllMatch()

class Event(object):
    """The Actual Event Class"""

    def __init__(self, action, minute=allMatch, hour=allMatch, 
                       day=allMatch, month=allMatch, daysofweek=allMatch, 
                       args=(), kwargs={}):
        self.mins = conv_to_set(minute)
        self.hours = conv_to_set(hour)
        self.days = conv_to_set(day)
        self.months = conv_to_set(month)
        self.daysofweek = conv_to_set(daysofweek)
        self.action = action
        self.args = args
        self.kwargs = kwargs

    def matchtime(self, t1):
        """Return True if this event should trigger at the specified datetime"""
        return ((t1.minute     in self.mins) and
                (t1.hour       in self.hours) and
                (t1.day        in self.days) and
                (t1.month      in self.months) and
                (t1.weekday()  in self.daysofweek))

    def check(self, t):
        """Check and run action if needed"""

        if self.matchtime(t):
            self.action(*self.args, **self.kwargs)

class CronTab(object):
    """The crontab implementation"""

    def __init__(self, *events):
        self.events = events

    def _check(self):
        """Check all events in separate greenlets"""

        t1 = datetime(*datetime.now().timetuple()[:5])
        for event in self.events:
            gevent.spawn(event.check, t1)

        t1 += timedelta(minutes=1)
        s1 = (t1 - datetime.now()).seconds + 1
        print "Checking again in %s seconds" % s1
        job = gevent.spawn_later(s1, self._check)

    def run(self):
        """Run the cron forever"""

        self._check()
        while True:
            gevent.sleep(60)

import os 
def test_task():
    """Just an example that sends a bell and asd to all terminals"""

    os.system('echo asd | wall')  

cron = CronTab(
  Event(test_task, 22, 1 ),
  Event(test_task, 0, range(9,18,2), daysofweek=range(0,5)),
)
cron.run()

Answer 6


None of the listed solutions even attempt to parse a complex cron schedule string. So, here is my version, using croniter. Basic gist:

schedule = "*/5 * * * *" # Run every five minutes

nextRunTime = getNextCronRunTime(schedule)
while True:
     roundedDownTime = roundDownTime()
     if (roundedDownTime == nextRunTime):
         ####################################
         ### Do your periodic thing here. ###
         ####################################
         nextRunTime = getNextCronRunTime(schedule)
     elif (roundedDownTime > nextRunTime):
         # We missed an execution. Error. Re initialize.
         nextRunTime = getNextCronRunTime(schedule)
     sleepTillTopOfNextMinute()

Helper routines:

from croniter import croniter
from datetime import datetime, timedelta
import time  # needed by sleepTillTopOfNextMinute below

# Round time down to the top of the previous minute
def roundDownTime(dt=None, dateDelta=timedelta(minutes=1)):
    roundTo = dateDelta.total_seconds()
    if dt == None : dt = datetime.now()
    seconds = (dt - dt.min).seconds
    rounding = (seconds+roundTo/2) // roundTo * roundTo
    return dt + timedelta(0,rounding-seconds,-dt.microsecond)

# Get next run time from now, based on schedule specified by cron string
def getNextCronRunTime(schedule):
    return croniter(schedule, datetime.now()).get_next(datetime)

# Sleep till the top of the next minute
def sleepTillTopOfNextMinute():
    t = datetime.utcnow()
    sleeptime = 60 - (t.second + t.microsecond/1000000.0)
    time.sleep(sleeptime)

Answer 7


I have modified the script.

  1. Easy to use:

    cron = Cron()
    cron.add('* * * * *'   , minute_task) # every minute
    cron.add('33 * * * *'  , day_task)    # every hour
    cron.add('34 18 * * *' , day_task)    # every day
    cron.run()
    
  2. It tries to start tasks in the first second of a minute.

Code on Github


Answer 8


I have a minor fix for the CronTab class run method suggested by Brian.

The timing was out by one second, leading to a one-second hard loop at the end of each minute.

class CronTab(object):
    def __init__(self, *events):
        self.events = events

    def run(self):
        t=datetime(*datetime.now().timetuple()[:5])
        while 1:
            for e in self.events:
                e.check(t)

            t += timedelta(minutes=1)
            n = datetime.now()
            while n < t:
                s = (t - n).seconds + 1
                time.sleep(s)
                n = datetime.now()

Answer 9


There isn't a "pure Python" way to do this, because some other process would have to launch Python in order to run your solution. Every platform will have one or twenty different ways to launch processes and monitor their progress. On Unix platforms, cron is the old standard. On Mac OS X there is also launchd, which combines cron-like launching with watchdog functionality that can keep your process alive if that's what you want. Once Python is running, you can use the sched module to schedule tasks.


Answer 10


I know there are a lot of answers, but another solution could be to go with decorators. This is an example that repeats a function every day at a specific time. The cool thing about this approach is that you only need to add the syntactic sugar to the function you want to schedule:

@repeatEveryDay(hour=6, minutes=30)
def sayHello(name):
    print(f"Hello {name}")

sayHello("Bob") # Now this function will be invoked every day at 6.30 a.m

And the decorator will look like:

# Imports were implicit in the original answer.
import datetime
import functools
import time

def repeatEveryDay(hour, minutes=0, seconds=0):
    """
    Decorator that will run the decorated function everyday at that hour, minutes and seconds.
    :param hour: 0-24
    :param minutes: 0-60 (Optional)
    :param seconds: 0-60 (Optional)
    """
    def decoratorRepeat(func):

        @functools.wraps(func)
        def wrapperRepeat(*args, **kwargs):

            def getLocalTime():
                return datetime.datetime.fromtimestamp(time.mktime(time.localtime()))

            # Get the datetime of the first function call
            td = datetime.timedelta(days=1)  # one-day interval between runs
            if wrapperRepeat.nextSent == None:
                now = getLocalTime()
                wrapperRepeat.nextSent = datetime.datetime(now.year, now.month, now.day, hour, minutes, seconds)
                if wrapperRepeat.nextSent < now:
                    wrapperRepeat.nextSent += td

            # Waiting till next day
            while getLocalTime() < wrapperRepeat.nextSent:
                time.sleep(1)

            # Call the function
            func(*args, **kwargs)

            # Get the datetime of the next function call
            wrapperRepeat.nextSent += td
            wrapperRepeat(*args, **kwargs)

        wrapperRepeat.nextSent = None
        return wrapperRepeat

    return decoratorRepeat

Answer 11


Brian's solution is working quite well. However, as others have pointed out, there is a subtle bug in the run code. I also found it overly complicated for my needs.

Here is my simpler, functional alternative for the run code, in case anybody needs it:

def run(self):
    while 1:
        t = datetime.now()
        for e in self.events:
            e.check(t)

        time.sleep(60 - t.second - t.microsecond / 1000000.0)

Answer 12


Another trivial solution would be:

from aqcron import At
from time import sleep
from datetime import datetime

# Event scheduling
event_1 = At( second=5 )
event_2 = At( second=[0,20,40] )

while True:
    now = datetime.now()

    # Event check
    if now in event_1: print "event_1"
    if now in event_2: print "event_2"

    sleep(1)

And the class aqcron.At is:

# aqcron.py

class At(object):
    def __init__(self, year=None,    month=None,
                 day=None,     weekday=None,
                 hour=None,    minute=None,
                 second=None):
        loc = locals()
        loc.pop("self")
        self.at = dict((k, v) for k, v in loc.iteritems() if v != None)

    def __contains__(self, now):
        for k in self.at.keys():
            try:
                if not getattr(now, k) in self.at[k]: return False
            except TypeError:
                if self.at[k] != getattr(now, k): return False
        return True

Answer 13


If you are looking for a distributed scheduler, you can check out https://github.com/sherinkurian/mani – it does need Redis, though, so it might not be what you are looking for. (Note that I am the author.) It was built to ensure fault tolerance by having the clock run on more than one node.


Answer 14


I don't know if something like that already exists. It would be easy to write your own with the time, datetime and/or calendar modules; see http://docs.python.org/library/time.html

The only concern with a Python solution is that your job needs to be running at all times and possibly be automatically "resurrected" after a reboot, something for which you do need to rely on system-dependent solutions.


Answer 15


You can check out PiCloud’s [1] Crons [2], but do note that your jobs won’t be running on your own machine. It’s also a service that you’ll need to pay for if you use more than 20 hours of compute time a month.

[1] http://www.picloud.com

[2] http://docs.picloud.com/cron.html


Answer 16


Method: crontab on the server.

Python file name: hello.py

Step 1: Create a shell script, say s.sh:

python3 /home/ubuntu/Shaurya/Folder/hello.py > /home/ubuntu/Shaurya/Folder/log.txt 2>&1

Step 2: Open the crontab editor:

crontab -e

Step 3: Add the schedule,

using crontab formatting:

2 * * * * sudo sh /home/ubuntu/Shaurya/Folder/s.sh

This cron will run at minute 2 of every hour.


Answer 17


I like how the pycron package solves this problem.

import pycron
import time

while True:
    if pycron.is_now('0 2 * * 0'):   # True Every Sunday at 02:00
        print('running backup')
    time.sleep(60)

Set up a scheduled job?

Question: Set up a scheduled job?


I’ve been working on a web app using Django, and I’m curious if there is a way to schedule a job to run periodically.

Basically I just want to run through the database and make some calculations/updates on an automatic, regular basis, but I can’t seem to find any documentation on doing this.

Does anyone know how to set this up?

To clarify: I know I can set up a cron job to do this, but I’m curious if there is some feature in Django that provides this functionality. I’d like people to be able to deploy this app themselves without having to do much config (preferably zero).

I’ve considered triggering these actions “retroactively” by simply checking if a job should have been run since the last time a request was sent to the site, but I’m hoping for something a bit cleaner.


Answer 0


One solution that I have employed is to do this:

1) Create a custom management command, e.g.

python manage.py my_cool_command

2) Use cron (on Linux) or at (on Windows) to run my command at the required times.

This is a simple solution that doesn't require installing a heavy AMQP stack. However, there are nice advantages to using something like Celery, mentioned in the other answers. In particular, with Celery it is nice not to have to spread your application logic out into crontab files. That said, the cron solution works quite nicely for a small to medium-sized application where you don't want a lot of external dependencies.

EDIT:

In later versions of Windows the at command is deprecated (Windows 8, Server 2012 and above). You can use schtasks.exe for the same purpose.

**** UPDATE **** This is the new link to the Django docs for writing custom management commands.
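As a rough illustration of step 1, a minimal custom management command might look like the sketch below; the app name, file path, and job body are assumptions for illustration only.

# myapp/management/commands/my_cool_command.py
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    help = "Runs the periodic database calculations/updates."

    def handle(self, *args, **options):
        # Placeholder for the real work, e.g. iterating over models
        # and updating computed fields.
        self.stdout.write("Running scheduled updates...")

Step 2 is then a crontab entry such as (illustrative path):

0 * * * * /path/to/venv/bin/python /path/to/project/manage.py my_cool_command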


Answer 1


Celery is a distributed task queue, built on AMQP (RabbitMQ). It also handles periodic tasks in a cron-like fashion (see periodic tasks). Depending on your app, it might be worth a gander.

Celery is pretty easy to set up with Django (docs), and periodic tasks will actually skip missed tasks in case of downtime. Celery also has built-in retry mechanisms, in case a task fails.


Answer 2


We've open-sourced what I think is a structured app that Brian's solution above alludes to. We would love any/all feedback!

https://github.com/tivix/django-cron

It comes with one management command:

./manage.py runcrons

That does the job. Each cron is modeled as a class (so it's all OO), each cron runs at a different frequency, and we make sure the same cron type doesn't run in parallel (in case a cron itself takes longer to run than its frequency!). A sketch of what a job class might look like follows below.
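A hedged sketch, based on django-cron's documented pattern; the class name, schedule, and job body are illustrative assumptions.

# myapp/cron.py
from django_cron import CronJobBase, Schedule

class SendReportsCronJob(CronJobBase):
    RUN_EVERY_MINS = 60                       # assumed frequency
    schedule = Schedule(run_every_mins=RUN_EVERY_MINS)
    code = "myapp.send_reports_cron_job"      # unique identifier for this job

    def do(self):
        # Placeholder for the actual periodic work.
        print("Sending reports...")

Register the class in the CRON_CLASSES setting and drive everything from a single cron entry running ./manage.py runcrons.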


Answer 3


If you’re using a standard POSIX OS, you use cron.

If you’re using Windows, you use at.

Write a Django management command to

  1. Figure out what platform they’re on.

  2. Either execute the appropriate “AT” command for your users, or update the crontab for your users. (A rough sketch follows below.)
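A hedged sketch of such a command; the command name, schedule, paths, and the crontab/schtasks invocations are assumptions for illustration and would need adapting to a real deployment.

# myapp/management/commands/install_schedule.py
import platform
import subprocess

from django.core.management.base import BaseCommand

CRON_LINE = "0 * * * * /usr/bin/python /path/to/manage.py my_cool_command"

class Command(BaseCommand):
    help = "Installs the periodic job using the platform's native scheduler."

    def handle(self, *args, **options):
        if platform.system() == "Windows":
            # schtasks replaces the deprecated AT command on modern Windows.
            subprocess.run(
                ["schtasks", "/Create", "/SC", "HOURLY",
                 "/TN", "my_cool_command",
                 "/TR", r"python C:\path\to\manage.py my_cool_command"],
                check=True,
            )
        else:
            # Append our line to the user's existing crontab (empty if none yet).
            current = subprocess.run(["crontab", "-l"],
                                     capture_output=True, text=True).stdout
            subprocess.run(["crontab", "-"],
                           input=current + CRON_LINE + "\n",
                           text=True, check=True)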


Answer 4


Interesting new pluggable Django app: django-chronograph

You only have to add one cron entry, which acts as a timer, and you get a very nice Django admin interface to the scripts you want to run.


Answer 5


Look at Django Poor Man’s Cron, which is a Django app that makes use of spambots, search engine indexing robots and the like to run scheduled tasks at approximately regular intervals.

See: http://code.google.com/p/django-poormanscron/


Answer 6


Brian Neal’s suggestion of running management commands via cron works well, but if you’re looking for something a little more robust (yet not as elaborate as Celery) I’d look into a library like Kronos:

# app/cron.py

import kronos

@kronos.register('0 * * * *')
def task():
    pass

Answer 7


RabbitMQ and Celery have more features and task handling capabilities than Cron. If task failure isn’t an issue, and you think you will handle broken tasks in the next call, then Cron is sufficient.

Celery & AMQP will let you handle the broken task, and it will get executed again by another worker (Celery workers listen for the next task to work on), until the task’s max_retries attribute is reached. You can even invoke tasks on failure, like logging the failure, or sending an email to the admin once the max_retries has been reached.

And you can distribute Celery and AMQP servers when you need to scale your application.
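A hedged sketch of the retry behaviour described above; the task, broker URL, and failure mode are illustrative assumptions.

from celery import Celery

app = Celery("tasks", broker="amqp://localhost")   # assumed RabbitMQ broker URL

class TransientError(Exception):
    """Stand-in for a recoverable failure (network blip, locked row, ...)."""

def charge_customer(order_id):
    # Placeholder for real work that can fail transiently.
    raise TransientError("temporary failure for order %s" % order_id)

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_order(self, order_id):
    try:
        charge_customer(order_id)
    except TransientError as exc:
        # Celery re-queues the task; a worker retries it after
        # default_retry_delay seconds, up to max_retries times, after which
        # the exception propagates (log it or email the admin there).
        raise self.retry(exc=exc)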


Answer 8


I had exactly the same requirement a while ago, and ended up solving it using APScheduler (User Guide)

It makes scheduling jobs super simple, and keeps them independent from request-based execution of your code. The following is a simple example.

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
job = None

def tick():
    print('One tick!')

def start_job():
    global job
    job = scheduler.add_job(tick, 'interval', seconds=3600)
    try:
        scheduler.start()
    except:
        pass

Hope this helps somebody!


Answer 9


I personally use cron, but the jobs scheduling parts of django-extensions look interesting.


Answer 10


Although not part of Django, Airflow is a more recent project (as of 2016) that is useful for task management.

Airflow is a workflow automation and scheduling system that can be used to author and manage data pipelines. A web-based UI provides the developer with a range of options for managing and viewing these pipelines.

Airflow is written in Python and is built using Flask.

Airflow was created by Maxime Beauchemin at Airbnb and open sourced in the spring of 2015. It joined the Apache Software Foundation's incubation program in the winter of 2016. Here is the Git project page and some additional background information.


Answer 11


Put the following at the top of your cron.py file:

#!/usr/bin/python
import os, sys
sys.path.append('/path/to/') # the parent directory of the project
sys.path.append('/path/to/project') # these lines only needed if not on path
os.environ['DJANGO_SETTINGS_MODULE'] = 'myproj.settings'

# imports and code below

Answer 12


I just thought about this rather simple solution:

  1. Define a view function do_work(req, param) like you would with any other view, with URL mapping, returning an HttpResponse, and so on.
  2. Set up a cron job with your timing preferences (or using AT or Scheduled Tasks in Windows) which runs curl http://localhost/your/mapped/url?param=value.

You can add parameters just by adding them to the URL.

Tell me what you guys think.

[Update] I'm now using the runjob command from django-extensions instead of curl.

My cron looks something like this:

@hourly python /path/to/project/manage.py runjobs hourly

… and so on for daily, monthly, etc. You can also set it up to run a specific job.

I find it more manageable and cleaner. It doesn't require mapping a URL to a view. Just define your job class and crontab entry and you're set (a rough sketch of a job class follows below).
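A hedged sketch of a django-extensions job class as picked up by runjobs, following the library's documented layout; the app path, class body, and schedule are illustrative assumptions.

# myapp/jobs/hourly/update_stats.py  -- run by `./manage.py runjobs hourly`
from django_extensions.management.jobs import HourlyJob

class Job(HourlyJob):
    help = "Recompute cached statistics."

    def execute(self):
        # Placeholder for the periodic work.
        print("Updating statistics...")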


Answer 13


After this part of the code, I can write anything, just like in my views.py :)

#######################################
import os,sys
sys.path.append('/home/administrator/development/store')
os.environ['DJANGO_SETTINGS_MODULE']='store.settings'
from django.core.management import setup_environ
from store import settings
setup_environ(settings)
#######################################

from http://www.cotellese.net/2007/09/27/running-external-scripts-against-django-models/


Answer 14


You should definitely check out django-q! It requires no additional configuration and has quite possibly everything needed to handle any production issues on commercial projects.

It's actively developed and integrates very well with Django, the Django ORM, Mongo, and Redis. Here is my configuration:

# django-q
# -------------------------------------------------------------------------
# See: http://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
    # Match recommended settings from docs.
    'name': 'DjangoORM',
    'workers': 4,
    'queue_limit': 50,
    'bulk': 10,
    'orm': 'default',

    # Custom Settings
    # ---------------
    # Limit the amount of successful tasks saved to Django.
    'save_limit': 10000,

    # See https://github.com/Koed00/django-q/issues/110.
    'catch_up': False,

    # Number of seconds a worker can spend on a task before it's terminated.
    'timeout': 60 * 5,

    # Number of seconds a broker will wait for a cluster to finish a task before presenting it again. This needs to be
    # longer than `timeout`, otherwise the same task will be processed multiple times.
    'retry': 60 * 6,

    # Whether to force all async() calls to be run with sync=True (making them synchronous).
    'sync': False,

    # Redirect worker exceptions directly to Sentry error reporter.
    'error_reporter': {
        'sentry': RAVEN_CONFIG,
    },
}
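A hedged sketch of how tasks are then enqueued and scheduled with django-q; the dotted function paths are illustrative assumptions, and older django-q releases expose async() instead of async_task().

from django_q.tasks import async_task, schedule
from django_q.models import Schedule

# Fire-and-forget background task, executed by the qcluster workers.
async_task("myapp.services.rebuild_search_index")

# Recurring task, stored via the ORM broker configured above.
schedule("myapp.services.send_daily_digest",
         schedule_type=Schedule.DAILY)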

Answer 15


Django APScheduler for Scheduler Jobs. Advanced Python Scheduler (APScheduler) is a Python library that lets you schedule your Python code to be executed later, either just once or periodically. You can add new jobs or remove old ones on the fly as you please.

Note: I'm the author of this library.

Install APScheduler

pip install apscheduler

The function to be called (file name: scheduler_jobs.py):

def FirstCronTest():
    print("")
    print("I am executed..!")

Configuring the scheduler

Make an execute.py file and add the code below:

from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()

Here, hook up the scheduler to the functions written in scheduler_jobs:

import scheduler_jobs 

scheduler.add_job(scheduler_jobs.FirstCronTest, 'interval', seconds=10)
scheduler.start()

Link the File for Execution

Now, add the line below at the bottom of your URLs file:

import execute

Answer 16


I had something similar to your problem today.

I didn't want to have it handled by the server through cron (and most of the libs were just cron helpers in the end).

So I've created a scheduling module and attached it to the init.

It's not the best approach, but it helps me to have all the code in a single place, with its execution tied to the main app.


Answer 17


Yes, the methods above are great, and I tried some of them. In the end, I found a method like this:

    from threading import Timer

    INTERVAL = 60  # seconds between runs (the original used self.interval inside a class)

    def sync():

        # do something...

        sync_timer = Timer(INTERVAL, sync, ())
        sync_timer.start()

It works just like recursion.

OK, I hope this method can meet your requirements. :)


Answer 18


A more modern solution (compared to Celery) is Django Q: https://django-q.readthedocs.io/en/latest/index.html

It has great documentation and is easy to grok. Windows support is lacking, because Windows does not support process forking. But it works fine if you create your dev environment using the Windows Subsystem for Linux.


Answer 19


I use Celery to create my periodic tasks. First you need to install it as follows:

pip install django-celery

Don't forget to register django-celery in your settings, and then you can do something like this:

from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.utils.log import get_task_logger
@periodic_task(run_every=crontab(minute="0", hour="23"))
def do_every_midnight():
    # your code
    pass

Answer 20


I am not sure whether this will be useful for anyone. Since I had to let other users of the system schedule jobs without giving them access to the actual server's (Windows) Task Scheduler, I created this reusable app.

Please note that users have access to one shared folder on the server where they can create the required command/task/.bat file. This task can then be scheduled using this app.

The app name is Django_Windows_Scheduler.



Answer 21


If you want something more reliable than Celery, try TaskHawk, which is built on top of AWS SQS/SNS.

See: http://taskhawk.readthedocs.io


Answer 22


For simple dockerized projects, I could not really see any existing answer that fit.

So I wrote a very bare-bones solution that needs no external libraries or triggers and runs on its own. No external OS cron is needed, and it should work in every environment.

It works by adding a middleware: middleware.py

import threading

def should_run(name, seconds_interval):
    from application.models import CronJob
    from django.utils.timezone import now

    try:
        c = CronJob.objects.get(name=name)
    except CronJob.DoesNotExist:
        CronJob(name=name, last_ran=now()).save()
        return True

    if (now() - c.last_ran).total_seconds() >= seconds_interval:
        c.last_ran = now()
        c.save()
        return True

    return False


class CronTask:
    def __init__(self, name, seconds_interval, function):
        self.name = name
        self.seconds_interval = seconds_interval
        self.function = function


def cron_worker(*_):
    if not should_run("main", 60):
        return

    # customize this part:
    from application.models import Event
    tasks = [
        CronTask("events", 60 * 30, Event.clean_stale_objects),
        # ...
    ]

    for task in tasks:
        if should_run(task.name, task.seconds_interval):
            task.function()


def cron_middleware(get_response):

    def middleware(request):
        response = get_response(request)
        threading.Thread(target=cron_worker).start()
        return response

    return middleware

models/cron.py:

from django.db import models


class CronJob(models.Model):
    name = models.CharField(max_length=10, primary_key=True)
    last_ran = models.DateTimeField()

settings.py:

MIDDLEWARE = [
    ...
    'application.middleware.cron_middleware',
    ...
]

Answer 23


The simple way is to write a custom shell command (see the Django documentation) and execute it using a cron job on Linux. However, I would highly recommend using a message broker like RabbitMQ coupled with Celery. Maybe you can have a look at this tutorial.