I’m currently working on a Python project that needs some background jobs (mostly for sending email and for heavy database updates). I use Redis as the task broker, so at this point I have two candidates: Celery and RQ. I have some experience with both job queues, but I’d like you to share your experience of using these tools. So:
What are the pros and cons of using Celery vs. RQ?
Are there examples of projects or tasks better suited to Celery or to RQ?
Celery looks pretty complicated, but it’s a full-featured solution. Actually, I don’t think I need all of its features. On the other hand, RQ is very simple (e.g. configuration, integration), but it seems to lack some useful features (e.g. task revoking, code auto-reloading).
Here is what I have found while trying to answer this exact same question. It’s probably not comprehensive, and may even be inaccurate on some points.
In short, RQ is designed to be simpler all around. Celery is designed to be more robust. They are both excellent.
Documentation. RQ’s documentation is comprehensive without being complex, and mirrors the project’s overall simplicity: you never feel lost or confused. Celery’s documentation is also comprehensive, but expect to revisit it quite a lot when you’re first setting things up, as there are too many options to internalize.
Monitoring. Celery’s Flower and the RQ dashboard are both very simple to set up and give you at least 90% of all the information you would ever want.
Broker support. Celery is the clear winner, RQ only supports Redis. This means less documentation on “what is a broker”, but also means you cannot switch brokers in the future if Redis no longer works for you. For example, Instagram considered both Redis and RabbitMQ with Celery. This is important because different brokers have different guarantees e.g. Redis cannot (as of writing) guarantee 100% that your messages are delivered.
Priority queues. RQ’s priority queue model is simple and effective: workers read from queues in order. Celery requires spinning up multiple workers to consume from different queues. Both approaches work.
OS support. Celery is the clear winner here, as RQ only runs on systems that support fork, e.g. Unix systems.
Language support. RQ only supports Python, whereas Celery lets you send tasks from one language to a different language
API. Celery is extremely flexible (multiple result backends, nice config format, workflow canvas support), but naturally this power can be confusing. By contrast, the RQ API is simple.
Subtask support. Celery supports subtasks (e.g. creating new tasks from within existing tasks). I don’t know if RQ does
Community and Stability. Celery is probably more established, but they are both active projects. As of writing, Celery has ~3500 stars on Github while RQ has ~2000 and both projects show active development
In my opinion, Celery is not as complex as its reputation might lead you to believe, but you will have to RTFM.
So, why would anyone be willing to trade the (arguably more full-featured) Celery for RQ? In my mind, it all comes down to simplicity. By restricting itself to Redis+Unix, RQ provides simpler documentation, a simpler codebase, and a simpler API. This means you (and potential contributors to your project) can focus on the code you care about, instead of having to keep details about the task queue system in your working memory. We all have a limit on how many details can be in our head at once, and by removing the need to keep task queue details in there, RQ lets you get back to the code you care about. That simplicity comes at the expense of features like inter-language task queues, wide OS support, 100% reliable message guarantees, and the ability to switch message brokers easily.
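As a rough illustration of the "workers read from queues in order" model mentioned under priority queues, here is a minimal plain-Python sketch. It uses no RQ code at all: the `queues` dict, the `next_job` helper and the job names are hypothetical stand-ins, and the only assumption is that a worker always takes work from earlier queues before later ones.

```python
from collections import deque

def next_job(queues, order):
    """Return (queue_name, job) from the first non-empty queue in `order`, or None."""
    for name in order:
        if queues[name]:
            return name, queues[name].popleft()
    return None

# Hypothetical queues and job names, purely for illustration.
queues = {
    "high": deque(["send_password_reset", "send_invoice"]),
    "default": deque(["send_newsletter"]),
    "low": deque(["rebuild_report"]),
}
order = ["high", "default", "low"]  # the order the worker was told to listen in

processed = []
while (item := next_job(queues, order)) is not None:
    processed.append(item)

print(processed)
```

Everything in "high" is handled before anything in "default", which is the whole priority mechanism: there are no per-job priorities, only the order of the queues.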
Celery is not that complicated. At its core, you do the step by step configuration from the tutorials, create a celery instance, decorate your function with @celery.task then run the task with my_task.delay(*args, **kwargs).
Judging from your own assessment, it seems you have to choose between lacking (key) features or having some excess hanging around. That is not too hard of a choice in my book.
I’m looking for a library in Python which will provide at and cron like functionality.
I’d quite like to have a pure Python solution, rather than relying on tools installed on the box; this way I can run on machines with no cron.
For those unfamiliar with cron: you can schedule tasks based upon an expression like:
0 2 * * 7 /usr/bin/run-backup # run the backups at 0200 on Every Sunday
0 9-17/2 * * 1-5 /usr/bin/purge-temps # run the purge temps command, every 2 hours between 9am and 5pm on Mondays to Fridays.
The cron time expression syntax is less important, but I would like to have something with this sort of flexibility.
If there isn’t something that does this for me out-the-box, any suggestions for the building blocks to make something like this would be gratefully received.
Edit
I’m not interested in launching processes, just “jobs” also written in Python – python functions. By necessity I think this would be a different thread, but not in a different process.
To this end, I’m looking for the expressivity of the cron time expression, but in Python.
Cron has been around for years, but I’m trying to be as portable as possible. I cannot rely on its presence.
You could just use normal Python argument passing syntax to specify your crontab. For example, suppose we define an Event class as below:
from datetime import datetime, timedelta
import time

# Some utility classes / functions first
class AllMatch(set):
    """Universal set - match everything"""
    def __contains__(self, item): return True

allMatch = AllMatch()

def conv_to_set(obj):  # Allow single integer to be provided
    if isinstance(obj, int):  # Python 3: int also covers the old long type
        return set([obj])  # Single item
    if not isinstance(obj, set):
        obj = set(obj)
    return obj

# The actual Event class
class Event(object):
    def __init__(self, action, min=allMatch, hour=allMatch,
                 day=allMatch, month=allMatch, dow=allMatch,
                 args=(), kwargs={}):
        self.mins = conv_to_set(min)
        self.hours = conv_to_set(hour)
        self.days = conv_to_set(day)
        self.months = conv_to_set(month)
        self.dow = conv_to_set(dow)
        self.action = action
        self.args = args
        self.kwargs = kwargs

    def matchtime(self, t):
        """Return True if this event should trigger at the specified datetime"""
        return ((t.minute in self.mins) and
                (t.hour in self.hours) and
                (t.day in self.days) and
                (t.month in self.months) and
                (t.weekday() in self.dow))

    def check(self, t):
        if self.matchtime(t):
            self.action(*self.args, **self.kwargs)
(Note: Not thoroughly tested)
Then your CronTab can be specified in normal Python syntax, as a sequence of Event(...) calls passed to CronTab.
This way you get the full power of Python’s argument mechanics (mixing positional and keyword args, and using symbolic names for weekdays and months).
The CronTab class would be defined as simply sleeping in minute increments, and calling check() on each event. (There are probably some subtleties with daylight savings time / timezones to be wary of though). Here’s a quick implementation:
class CronTab(object):
    def __init__(self, *events):
        self.events = events

    def run(self):
        t = datetime(*datetime.now().timetuple()[:5])
        while 1:
            for e in self.events:
                e.check(t)

            t += timedelta(minutes=1)
            while datetime.now() < t:
                time.sleep((t - datetime.now()).seconds)
A few things to note: Python’s weekday() is zero-indexed with Monday as 0 (in cron, 0 is Sunday), and range excludes its last element, hence cron syntax like “1-5” becomes range(0,5), i.e. [0,1,2,3,4]. Months in datetime run from 1 to 12, as in cron. If you prefer cron syntax, parsing it shouldn’t be too difficult, however.
More or less same as above but concurrent using gevent :)
"""Gevent based crontab implementation"""
from datetime import datetime, timedelta
import gevent

# Some utility classes / functions first
def conv_to_set(obj):
    """Converts to set allowing single integer to be provided"""
    if isinstance(obj, int):  # Python 3: int also covers the old long type
        return set([obj])  # Single item
    if not isinstance(obj, set):
        obj = set(obj)
    return obj

class AllMatch(set):
    """Universal set - match everything"""
    def __contains__(self, item):
        return True

allMatch = AllMatch()

class Event(object):
    """The Actual Event Class"""
    def __init__(self, action, minute=allMatch, hour=allMatch,
                 day=allMatch, month=allMatch, daysofweek=allMatch,
                 args=(), kwargs={}):
        self.mins = conv_to_set(minute)
        self.hours = conv_to_set(hour)
        self.days = conv_to_set(day)
        self.months = conv_to_set(month)
        self.daysofweek = conv_to_set(daysofweek)
        self.action = action
        self.args = args
        self.kwargs = kwargs

    def matchtime(self, t1):
        """Return True if this event should trigger at the specified datetime"""
        return ((t1.minute in self.mins) and
                (t1.hour in self.hours) and
                (t1.day in self.days) and
                (t1.month in self.months) and
                (t1.weekday() in self.daysofweek))

    def check(self, t):
        """Check and run action if needed"""
        if self.matchtime(t):
            self.action(*self.args, **self.kwargs)

class CronTab(object):
    """The crontab implementation"""
    def __init__(self, *events):
        self.events = events

    def _check(self):
        """Check all events in separate greenlets"""
        t1 = datetime(*datetime.now().timetuple()[:5])
        for event in self.events:
            gevent.spawn(event.check, t1)
        t1 += timedelta(minutes=1)
        s1 = (t1 - datetime.now()).seconds + 1
        print("Checking again in %s seconds" % s1)
        job = gevent.spawn_later(s1, self._check)

    def run(self):
        """Run the cron forever"""
        self._check()
        while True:
            gevent.sleep(60)

import os

def test_task():
    """Just an example that sends a bell and asd to all terminals"""
    os.system('echo asd | wall')

cron = CronTab(
    Event(test_task, 22, 1),
    Event(test_task, 0, range(9, 18, 2), daysofweek=range(0, 5)),
)
cron.run()
None of the listed solutions even attempt to parse a complex cron schedule string. So, here is my version, using croniter. Basic gist:
schedule = "*/5 * * * *"  # Run every five minutes
nextRunTime = getNextCronRunTime(schedule)
while True:
    roundedDownTime = roundDownTime()
    if (roundedDownTime == nextRunTime):
        ####################################
        ### Do your periodic thing here. ###
        ####################################
        nextRunTime = getNextCronRunTime(schedule)
    elif (roundedDownTime > nextRunTime):
        # We missed an execution. Error. Re initialize.
        nextRunTime = getNextCronRunTime(schedule)
    sleepTillTopOfNextMinute()
Helper routines:
import time
from croniter import croniter
from datetime import datetime, timedelta

# Round time to the nearest minute boundary (the previous one, when called just after it)
def roundDownTime(dt=None, dateDelta=timedelta(minutes=1)):
    roundTo = dateDelta.total_seconds()
    if dt is None:
        dt = datetime.now()
    seconds = (dt - dt.min).seconds
    rounding = (seconds + roundTo / 2) // roundTo * roundTo
    return dt + timedelta(0, rounding - seconds, -dt.microsecond)

# Get next run time from now, based on schedule specified by cron string
def getNextCronRunTime(schedule):
    return croniter(schedule, datetime.now()).get_next(datetime)

# Sleep till the top of the next minute
def sleepTillTopOfNextMinute():
    t = datetime.now()
    sleeptime = 60 - (t.second + t.microsecond / 1000000.0)
    time.sleep(sleeptime)
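For readers curious what parsing such a schedule string involves, below is a small standard-library sketch. It is not how croniter works internally, and `parse_field` and `matches` are hypothetical names. It assumes plain five-field expressions made of `*`, single values, ranges, comma lists and `/step`, and it simply ANDs all five fields, whereas real cron treats day-of-month and day-of-week specially when both are restricted.

```python
from datetime import datetime

def parse_field(field, lo, hi):
    """Expand one cron field (e.g. '*', '5', '9-17/2', '1,3,5') into a set of ints."""
    values = set()
    for part in field.split(","):
        rng, _, step = part.partition("/")
        step = int(step) if step else 1
        if rng == "*":
            start, end = lo, hi
        elif "-" in rng:
            a, b = rng.split("-")
            start, end = int(a), int(b)
        else:
            start = end = int(rng)
        values.update(range(start, end + 1, step))
    return values

def matches(expr, t):
    """Return True if datetime `t` satisfies the five-field cron expression `expr`."""
    minute, hour, day, month, dow = expr.split()
    # Python's weekday() has Monday=0; cron has Sunday=0 (and also accepts 7 for Sunday).
    cron_dow = (t.weekday() + 1) % 7
    dows = {d % 7 for d in parse_field(dow, 0, 7)}
    return (t.minute in parse_field(minute, 0, 59)
            and t.hour in parse_field(hour, 0, 23)
            and t.day in parse_field(day, 1, 31)
            and t.month in parse_field(month, 1, 12)
            and cron_dow in dows)

# 7 January 2024 was a Sunday, 8 January 2024 a Monday.
print(matches("0 2 * * 7", datetime(2024, 1, 7, 2, 0)))
print(matches("0 9-17/2 * * 1-5", datetime(2024, 1, 8, 11, 0)))
```

A loop like the one above could call `matches(schedule, now)` once per minute instead of computing the next run time, at the cost of supporting far less syntax than croniter does.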
I have modified the script.
Easy to use:
cron = Cron()
cron.add('* * * * *', minute_task)  # every minute
cron.add('33 * * * *', day_task)  # every hour
cron.add('34 18 * * *', day_task)  # every day
cron.run()
The timing was out by one second leading to a one-second, hard loop at the end of each minute.
class CronTab(object):
    def __init__(self, *events):
        self.events = events

    def run(self):
        t = datetime(*datetime.now().timetuple()[:5])
        while 1:
            for e in self.events:
                e.check(t)

            t += timedelta(minutes=1)
            n = datetime.now()
            while n < t:
                s = (t - n).seconds + 1
                time.sleep(s)
                n = datetime.now()
There isn’t a “pure python” way to do this because some other process would have to launch python in order to run your solution. Every platform will have one or twenty different ways to launch processes and monitor their progress. On unix platforms, cron is the old standard. On Mac OS X there is also launchd, which combines cron-like launching with watchdog functionality that can keep your process alive if that’s what you want. Once python is running, then you can use the sched module to schedule tasks.
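To make the last point concrete, here is a minimal sketch of the standard-library `sched` module running a job a few times at a fixed interval. The `tick` and `repeat` functions are hypothetical helpers written for this example, and the 0.01 second interval is only there to keep the demo fast; `sched` itself has no built-in notion of repetition, so the job re-schedules itself.

```python
import sched
import time

scheduler = sched.scheduler(time.monotonic, time.sleep)
ticks = []

def tick():
    """The job itself: record when it ran."""
    ticks.append(time.monotonic())

def repeat(interval, action, remaining):
    """Run `action`, then re-schedule itself until `remaining` runs are used up."""
    action()
    if remaining > 1:
        scheduler.enter(interval, 1, repeat,
                        argument=(interval, action, remaining - 1))

scheduler.enter(0, 1, repeat, argument=(0.01, tick, 3))
scheduler.run()  # blocks until the event queue is empty

print(len(ticks))
```

Note that `scheduler.run()` blocks the calling thread, so in a long-running program it would typically live in its own thread or process.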
I know there are a lot of answers, but another solution could be to go with decorators. This is an example that repeats a function every day at a specific time. The cool thing about this approach is that you only need to add the syntactic sugar to the function you want to schedule:
@repeatEveryDay(hour=6, minutes=30)
def sayHello(name):
    print(f"Hello {name}")

sayHello("Bob")  # Now this function will be invoked every day at 6.30 a.m
And the decorator will look like:
import datetime
import functools
import time

def repeatEveryDay(hour, minutes=0, seconds=0):
    """
    Decorator that will run the decorated function everyday at that hour, minutes and seconds.
    :param hour: 0-23
    :param minutes: 0-59 (Optional)
    :param seconds: 0-59 (Optional)
    """
    def decoratorRepeat(func):

        @functools.wraps(func)
        def wrapperRepeat(*args, **kwargs):

            def getLocalTime():
                return datetime.datetime.fromtimestamp(time.mktime(time.localtime()))

            # Get the datetime of the first function call
            # (one day between runs; the original used seconds=15, which looks like a test leftover)
            td = datetime.timedelta(days=1)
            if wrapperRepeat.nextSent is None:
                now = getLocalTime()
                wrapperRepeat.nextSent = datetime.datetime(now.year, now.month, now.day, hour, minutes, seconds)
                if wrapperRepeat.nextSent < now:
                    wrapperRepeat.nextSent += td

            # Waiting till next day
            while getLocalTime() < wrapperRepeat.nextSent:
                time.sleep(1)

            # Call the function
            func(*args, **kwargs)

            # Get the datetime of the next function call
            wrapperRepeat.nextSent += td
            wrapperRepeat(*args, **kwargs)

        wrapperRepeat.nextSent = None
        return wrapperRepeat

    return decoratorRepeat
Brian’s solution works quite well. However, as others have pointed out, there is a subtle bug in the run code. I also found it overly complicated for my needs.
Here is my simpler and functional alternative for the run code, in case anybody needs it:
def run(self):
    while 1:
        t = datetime.now()
        for e in self.events:
            e.check(t)

        time.sleep(60 - t.second - t.microsecond / 1000000.0)
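The sleep arithmetic is where the earlier versions went wrong, so a small sketch may help. `timedelta.seconds` is an integer and drops the microseconds, so in the last fraction of a minute it evaluates to 0 and `time.sleep(0)` spins in a tight loop. The helper below (`seconds_until_next_minute` is a hypothetical name, not part of any answer above) uses `total_seconds()` instead, which keeps the fraction.

```python
from datetime import datetime, timedelta

def seconds_until_next_minute(now):
    """Return the exact number of seconds from `now` to the next minute boundary."""
    next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    return (next_minute - now).total_seconds()

late = datetime(2024, 1, 1, 12, 0, 59, 500000)  # half a second before 12:01

# The truncating attribute reports 0 whole seconds left...
print((datetime(2024, 1, 1, 12, 1) - late).seconds)
# ...while total_seconds() reports the remaining half second.
print(seconds_until_next_minute(late))
```

In the `run` loop above, `time.sleep(seconds_until_next_minute(datetime.now()))` would be an equivalent way to write the final line.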
Another simple solution would be:
from aqcron import At
from time import sleep
from datetime import datetime

# Event scheduling
event_1 = At(second=5)
event_2 = At(second=[0, 20, 40])

while True:
    now = datetime.now()

    # Event check
    if now in event_1: print("event_1")
    if now in event_2: print("event_2")

    sleep(1)
And the class aqcron.At is:
# aqcron.py
class At(object):
    def __init__(self, year=None, month=None,
                 day=None, weekday=None,
                 hour=None, minute=None,
                 second=None):
        loc = locals()
        loc.pop("self")
        self.at = dict((k, v) for k, v in loc.items() if v is not None)

    def __contains__(self, now):
        for k in self.at.keys():
            value = getattr(now, k)
            if callable(value):  # datetime.weekday is a method, not an attribute
                value = value()
            try:
                if value not in self.at[k]: return False
            except TypeError:
                if self.at[k] != value: return False
        return True
If you are looking for a distributed scheduler, you can check out https://github.com/sherinkurian/mani. It does need Redis though, so it might not be what you are looking for. (Note that I am the author.)
It was built to ensure fault tolerance by having the clock run on more than one node.
I don’t know if something like that already exists. It would be easy to write your own with time, datetime and/or calendar modules, see http://docs.python.org/library/time.html
The only concern for a python solution is that your job needs to be always running and possibly be automatically “resurrected” after a reboot, something for which you do need to rely on system dependent solutions.
You can check out PiCloud’s [1] Crons [2], but do note that your jobs won’t be running on your own machine. It’s also a service that you’ll need to pay for if you use more than 20 hours of compute time a month.
I’ve been working on a web app using Django, and I’m curious if there is a way to schedule a job to run periodically.
Basically I just want to run through the database and make some calculations/updates on an automatic, regular basis, but I can’t seem to find any documentation on doing this.
Does anyone know how to set this up?
To clarify: I know I can set up a cron job to do this, but I’m curious if there is some feature in Django that provides this functionality. I’d like people to be able to deploy this app themselves without having to do much config (preferably zero).
I’ve considered triggering these actions “retroactively” by simply checking if a job should have been run since the last time a request was sent to the site, but I’m hoping for something a bit cleaner.
1) Create a custom management command.
2) Use cron (on Linux) or at (on Windows) to run my command at the required times.
This is a simple solution that doesn’t require installing a heavy AMQP stack. However there are nice advantages to using something like Celery, mentioned in the other answers. In particular, with Celery it is nice to not have to spread your application logic out into crontab files. However the cron solution works quite nicely for a small to medium sized application and where you don’t want a lot of external dependencies.
EDIT:
In later versions of Windows (Windows 8, Server 2012 and above) the at command is deprecated. You can use schtasks.exe for the same purpose.
**** UPDATE ****
This is the new link to the Django docs for writing a custom management command
Celery is a distributed task queue, built on AMQP (RabbitMQ). It also handles periodic tasks in a cron-like fashion (see periodic tasks). Depending on your app, it might be worth a gander.
Celery is pretty easy to set up with django (docs), and periodic tasks will actually skip missed tasks in case of a downtime. Celery also has built-in retry mechanisms, in case a task fails.
That does the job. Each cron is modeled as a class (so it’s all OO), each cron runs at a different frequency, and we make sure the same cron type doesn’t run in parallel (in case crons themselves take longer to run than their frequency!).
Look at Django Poor Man’s Cron, a Django app that makes use of spambots, search engine indexing robots and the like to run scheduled tasks at approximately regular intervals.
Brian Neal’s suggestion of running management commands via cron works well, but if you’re looking for something a little more robust (yet not as elaborate as Celery) I’d look into a library like Kronos:
RabbitMQ and Celery have more features and task handling capabilities than Cron. If task failure isn’t an issue, and you think you will handle broken tasks in the next call, then Cron is sufficient.
Celery & AMQP will let you handle the broken task, and it will get executed again by another worker (Celery workers listen for the next task to work on), until the task’s max_retries attribute is reached. You can even invoke tasks on failure, like logging the failure, or sending an email to the admin once the max_retries has been reached.
And you can distribute Celery and AMQP servers when you need to scale your application.
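The retry behaviour described above can be pictured with a plain-Python sketch. This is not Celery’s API or implementation: `run_with_retries`, `on_give_up` and the `flaky` task are hypothetical names, and the sketch assumes that `max_retries` counts the extra attempts allowed after the first one, with a callback fired once the limit is exhausted.

```python
def run_with_retries(task, max_retries, on_give_up):
    """Call `task`; on an exception, retry up to `max_retries` more times.

    `on_give_up(exc)` is called once when the last allowed retry has failed,
    e.g. to log the failure or email an admin, and the exception is re-raised.
    """
    retries = 0
    while True:
        try:
            return task()
        except Exception as exc:
            if retries >= max_retries:
                on_give_up(exc)
                raise
            retries += 1

calls = {"flaky": 0}

def flaky():
    """Hypothetical task that fails twice and then succeeds."""
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"

alerts = []
result = run_with_retries(flaky, max_retries=3, on_give_up=alerts.append)
print(result, calls["flaky"], alerts)
```

With cron alone, the equivalent of this loop has to be written inside each job, or the job simply waits for its next scheduled run.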
Although not part of Django, Airflow is a more recent project (as of 2016) that is useful for task management.
Airflow is a workflow automation and scheduling system that can be used to author and manage data pipelines. A web-based UI provides the developer with a range of options for managing and viewing these pipelines.
Airflow is written in Python and is built using Flask.
Airflow was created by Maxime Beauchemin at Airbnb and open sourced in the spring of 2015. It joined the Apache Software Foundation’s incubation program in the winter of 2016. Here is the Git project page and some additional background information.
Put the following at the top of your cron.py file:
#!/usr/bin/python
import os, sys
sys.path.append('/path/to/') # the parent directory of the project
sys.path.append('/path/to/project') # these lines only needed if not on path
os.environ['DJANGO_SETTINGS_MODULE'] = 'myproj.settings'
# imports and code below
After this part of the code, I can write anything, just like in my views.py :)
#######################################
import os,sys
sys.path.append('/home/administrator/development/store')
os.environ['DJANGO_SETTINGS_MODULE']='store.settings'
from django.core.management import setup_environ
from store import settings
setup_environ(settings)  # removed in Django 1.6; newer versions use django.setup()
#######################################
You should definitely check out django-q!
It requires no additional configuration and has quite possibly everything needed to handle any production issues on commercial projects.
It’s actively developed and integrates very well with django, django ORM, mongo, redis. Here is my configuration:
# django-q
# -------------------------------------------------------------------------
# See: http://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
    # Match recommended settings from docs.
    'name': 'DjangoORM',
    'workers': 4,
    'queue_limit': 50,
    'bulk': 10,
    'orm': 'default',

    # Custom Settings
    # ---------------
    # Limit the amount of successful tasks saved to Django.
    'save_limit': 10000,

    # See https://github.com/Koed00/django-q/issues/110.
    'catch_up': False,

    # Number of seconds a worker can spend on a task before it's terminated.
    'timeout': 60 * 5,

    # Number of seconds a broker will wait for a cluster to finish a task before presenting it again. This needs to be
    # longer than `timeout`, otherwise the same task will be processed multiple times.
    'retry': 60 * 6,

    # Whether to force all async() calls to be run with sync=True (making them synchronous).
    'sync': False,

    # Redirect worker exceptions directly to Sentry error reporter.
    'error_reporter': {
        'sentry': RAVEN_CONFIG,
    },
}
Django APScheduler for Scheduler Jobs. Advanced Python Scheduler (APScheduler) is a Python library that lets you schedule your Python code to be executed later, either just once or periodically. You can add new jobs or remove old ones on the fly as you please.
note: I’m the author of this library
Install APScheduler
pip install apscheduler
View file function to call
file name: scheduler_jobs.py
def FirstCronTest():
    print("")
    print("I am executed..!")
Configuring the scheduler
Create an execute.py file and add the code below:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
Your own functions: here, the scheduler functions are the ones written in scheduler_jobs.
It has great documentation and is easy to grok. Windows support is lacking, because Windows does not support process forking. But it works fine if you create your dev environment using the Windows Subsystem for Linux.
I use Celery for my periodic tasks. First, you need to install it as follows:
pip install django-celery
Don’t forget to register django-celery in your settings, and then you can do something like this:
from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.utils.log import get_task_logger

@periodic_task(run_every=crontab(minute="0", hour="23"))
def do_every_midnight():
    pass  # your code
I am not sure whether this will be useful to anyone, but since I had to let other users of the system schedule jobs without giving them access to the actual server (Windows) Task Scheduler, I created this reusable app.
Please note that users have access to one shared folder on the server where they can create the required command/task/.bat file. This task can then be scheduled using this app.
For simple dockerized projects, I could not really see any existing answer fit.
So I wrote a very barebones solution without the need of external libraries or triggers, which runs on its own. No external os-cron needed, should work in every environment.
It works by adding a middleware: middleware.py
import threading

def should_run(name, seconds_interval):
    from application.models import CronJob
    from django.utils.timezone import now

    try:
        c = CronJob.objects.get(name=name)
    except CronJob.DoesNotExist:
        CronJob(name=name, last_ran=now()).save()
        return True

    if (now() - c.last_ran).total_seconds() >= seconds_interval:
        c.last_ran = now()
        c.save()
        return True

    return False

class CronTask:
    def __init__(self, name, seconds_interval, function):
        self.name = name
        self.seconds_interval = seconds_interval
        self.function = function

def cron_worker(*_):
    if not should_run("main", 60):
        return

    # customize this part:
    from application.models import Event
    tasks = [
        CronTask("events", 60 * 30, Event.clean_stale_objects),
        # ...
    ]

    for task in tasks:
        if should_run(task.name, task.seconds_interval):
            task.function()

def cron_middleware(get_response):

    def middleware(request):
        response = get_response(request)
        threading.Thread(target=cron_worker).start()
        return response

    return middleware
models/cron.py:
from django.db import models

class CronJob(models.Model):
    name = models.CharField(max_length=10, primary_key=True)
    last_ran = models.DateTimeField()
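The interval check at the heart of this approach can be tried without Django. The sketch below is a framework-free illustration only: the in-memory `last_ran` dict is an assumption standing in for the `CronJob` table, and the current time is passed in explicitly so the behaviour is easy to follow.

```python
from datetime import datetime, timedelta

# Stand-in for the CronJob table: job name -> time of the last run.
last_ran = {}

def should_run(name, seconds_interval, now):
    """Return True (and record `now`) if `name` has never run or its interval has elapsed."""
    previous = last_ran.get(name)
    if previous is None or (now - previous).total_seconds() >= seconds_interval:
        last_ran[name] = now
        return True
    return False

t0 = datetime(2024, 1, 1, 12, 0, 0)
first = should_run("events", 60 * 30, t0)                           # never ran before
too_soon = should_run("events", 60 * 30, t0 + timedelta(minutes=10))
due_again = should_run("events", 60 * 30, t0 + timedelta(minutes=30))
print(first, too_soon, due_again)
```

Because the check only happens when a request arrives, a job is run at the first request after its interval has elapsed, not at an exact time; on a site with no traffic nothing runs at all.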
A simple way is to write a custom shell command (see the Django documentation) and execute it using a cron job on Linux. However, I would highly recommend using a message broker like RabbitMQ coupled with Celery. Maybe you can have a look at this tutorial.