I have Python script bgservice.py and I want it to run all the time, because it is part of the web service I build. How can I make it run continuously even after I logout SSH?
You might consider turning your python script into a proper python daemon, as described here.
python-daemon is a good tool that can be used to run python scripts as a background daemon process rather than a forever running script. You will need to modify existing code a bit but its plain and simple.
If you are facing problems with python-daemon, there is another utility supervisor that will do the same for you, but in this case you wont have to write any code (or modify existing) as this is a out of the box solution for daemonizing processes.
import os, timedef daemon(func):def wrapper(*args,**kwargs):if os.fork():return
func(*args,**kwargs)
os._exit(os.EX_OK)return wrapper@daemondef my_func(count=10):for i in range(0,count):print('parent pid: %d'% os.getppid())
time.sleep(1)
my_func(count=10)#still in parent thread
time.sleep(2)#after 2 seconds the function my_func lives on is own
Here is a simple solution inside python using a decorator:
import os, time
def daemon(func):
def wrapper(*args, **kwargs):
if os.fork(): return
func(*args, **kwargs)
os._exit(os.EX_OK)
return wrapper
@daemon
def my_func(count=10):
for i in range(0,count):
print('parent pid: %d' % os.getppid())
time.sleep(1)
my_func(count=10)
#still in parent thread
time.sleep(2)
#after 2 seconds the function my_func lives on is own
You can of course replace the content of your bgservice.py file in place of my_func.
If what you need is that the process should run forever no matter whether you are logged in or not, consider running the process as a daemon.
supervisord is a great out of the box solution that can be used to daemonize any process. It has another controlling utility supervisorctl that can be used to monitor processes that are being run by supervisor.
You don’t have to write any extra code or modify existing scripts to make this work. Moreover, verbose documentation makes this process much simpler.
After scratching my head for hours around python-daemon, supervisor is the solution that worked for me in minutes.
Hope this helps someone trying to make python-daemon work
回答 8
您也可以使用Yapdi:
基本用法:
import yapdi
daemon = yapdi.Daemon()
retcode = daemon.daemonize()# This would run in daemon mode; output is not visibleif retcode == yapdi.OPERATION_SUCCESSFUL:print('Hello Daemon')
import yapdi
daemon = yapdi.Daemon()
retcode = daemon.daemonize()
# This would run in daemon mode; output is not visible
if retcode == yapdi.OPERATION_SUCCESSFUL:
print('Hello Daemon')
However, even though syslog shows an entry when the task should have started, this task never actually runs (the log file for the script is empty). If I run the line manually from the shell, it works as expected.
The only way I can currently get the command to run via cron, is to break the commands up and put them in a dumb bash wrapper script:
#!/bin/sh
source /home/user/project/env/bin/activate
cd /home/user/project/
./manage.py command arg
EDIT:
ars came up with a working combination of commands:
Running source from a cronfile won’t work as cron uses /bin/sh as its default shell, which doesn’t support source. You need to set the SHELL environment variable to be /bin/bash:
It’s tricky to spot why this fails as /var/log/syslog doesn’t log the error details. Best to alias yourself to root so you get emailed with any cron errors. Simply add yourself to /etc/aliases and run sendmail -bi.
The only correct way to run python cron jobs when using a virtualenv is to activate the environment and then execute the environment’s python to run your code.
Another solution is echoing the complete command including activating the environment and piping it into /bin/bash. Consider this for your /etc/crontab:
Python3.3.2+(default,Feb282014,00:52:16)[GCC 4.8.1] on linux
Type"help","copyright","credits"or"license"for more information.>>>import sys
>>> sys.path
['','/usr/lib/python3.3','/usr/lib/python3.3/plat-x86_64-linux-gnu','/usr/lib/python3.3/lib-dynload']>>>import requests
Traceback(most recent call last):File"<stdin>", line 1,in<module>ImportError:No module named 'requests'>>> sys.path.insert(0,'/path/to/venv/modules/');>>>import requests
>>>
set the python path
to include the venv modules directory.
man python mentions modifying the path in shell at $PYTHONPATH or in python with sys.path
Other answers mention ideas for doing this using the shell. From python, adding the following lines to my script allows me to successfully run it directly from cron.
Python 3.3.2+ (default, Feb 28 2014, 00:52:16)
[GCC 4.8.1] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.path
['', '/usr/lib/python3.3', '/usr/lib/python3.3/plat-x86_64-linux-gnu', '/usr/lib/python3.3/lib-dynload']
>>> import requests
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named 'requests'
>>> sys.path.insert(0,'/path/to/venv/modules/');
>>> import requests
>>>
I’d like to add this because I spent some time solving the issue and did not find an answer here for combination of variables usage in cron and virtualenv. So maybe it’ll help someone.
PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
DIR_SMTH="cd /smth"
VENV=". venv/bin/activate"
CMD="some_python_bin do_something"
# m h dom mon dow command
0 * * * * $DIR_SMTH && $VENV && $CMD -k2 some_target >> /tmp/crontest.log 2>&1
It did not work well when it was configured like
DIR_SMTH=”cd /smth && . venv/bin/activate”
Thanks @davidwinterbottom, @reed-sandberg and @mkb for giving the right direction. The accepted answer actually works fine until your python need to run a script which have to run another python binary from venv/bin directory.
I am using miniconda with Conda version 4.7.12 on a Ubuntu 18.04.3 LTS.
I am able to place the above inside a script and run it via crontab as well without any trouble.
回答 8
python脚本
from datetime import datetime
import boto # check wheather its taking the virtualenv or not import sys
param1=sys.argv[1]#Param
myFile = open('appendtxt.txt','a')
myFile.write('\nAccessed on '+ param1+str(datetime.now()))
Cron命令
*/1**** cd /Workspace/testcron/&&/Workspace/testcron/venvcron/bin/python3 /Workspace/testcron/testcronwithparam.py param
from datetime import datetime
import boto # check wheather its taking the virtualenv or not
import sys
param1=sys.argv[1] #Param
myFile = open('appendtxt.txt', 'a')
myFile.write('\nAccessed on ' + param1+str(datetime.now()))
02**7/usr/bin/run-backup # run the backups at 0200 on Every Sunday09-17/2**1-5/usr/bin/purge-temps # run the purge temps command, every 2 hours between 9am and 5pm on Mondays to Fridays.
I’m looking for a library in Python which will provide at and cron like functionality.
I’d quite like have a pure Python solution, rather than relying on tools installed on the box; this way I run on machines with no cron.
For those unfamiliar with cron: you can schedule tasks based upon an expression like:
0 2 * * 7 /usr/bin/run-backup # run the backups at 0200 on Every Sunday
0 9-17/2 * * 1-5 /usr/bin/purge-temps # run the purge temps command, every 2 hours between 9am and 5pm on Mondays to Fridays.
The cron time expression syntax is less important, but I would like to have something with this sort of flexibility.
If there isn’t something that does this for me out-the-box, any suggestions for the building blocks to make something like this would be gratefully received.
Edit
I’m not interested in launching processes, just “jobs” also written in Python – python functions. By necessity I think this would be a different thread, but not in a different process.
To this end, I’m looking for the expressivity of the cron time expression, but in Python.
Cron has been around for years, but I’m trying to be as portable as possible. I cannot rely on its presence.
You could just use normal Python argument passing syntax to specify your crontab. For example, suppose we define an Event class as below:
from datetime import datetime, timedelta
import time
# Some utility classes / functions first
class AllMatch(set):
"""Universal set - match everything"""
def __contains__(self, item): return True
allMatch = AllMatch()
def conv_to_set(obj): # Allow single integer to be provided
if isinstance(obj, (int,long)):
return set([obj]) # Single item
if not isinstance(obj, set):
obj = set(obj)
return obj
# The actual Event class
class Event(object):
def __init__(self, action, min=allMatch, hour=allMatch,
day=allMatch, month=allMatch, dow=allMatch,
args=(), kwargs={}):
self.mins = conv_to_set(min)
self.hours= conv_to_set(hour)
self.days = conv_to_set(day)
self.months = conv_to_set(month)
self.dow = conv_to_set(dow)
self.action = action
self.args = args
self.kwargs = kwargs
def matchtime(self, t):
"""Return True if this event should trigger at the specified datetime"""
return ((t.minute in self.mins) and
(t.hour in self.hours) and
(t.day in self.days) and
(t.month in self.months) and
(t.weekday() in self.dow))
def check(self, t):
if self.matchtime(t):
self.action(*self.args, **self.kwargs)
(Note: Not thoroughly tested)
Then your CronTab can be specified in normal python syntax as:
This way you get the full power of Python’s argument mechanics (mixing positional and keyword args, and can use symbolic names for names of weeks and months)
The CronTab class would be defined as simply sleeping in minute increments, and calling check() on each event. (There are probably some subtleties with daylight savings time / timezones to be wary of though). Here’s a quick implementation:
class CronTab(object):
def __init__(self, *events):
self.events = events
def run(self):
t=datetime(*datetime.now().timetuple()[:5])
while 1:
for e in self.events:
e.check(t)
t += timedelta(minutes=1)
while datetime.now() < t:
time.sleep((t - datetime.now()).seconds)
A few things to note: Python’s weekdays / months are zero indexed (unlike cron), and that range excludes the last element, hence syntax like “1-5” becomes range(0,5) – ie [0,1,2,3,4]. If you prefer cron syntax, parsing it shouldn’t be too difficult however.
"""Gevent based crontab implementation"""from datetime import datetime, timedelta
import gevent
# Some utility classes / functions firstdef conv_to_set(obj):"""Converts to set allowing single integer to be provided"""if isinstance(obj,(int, long)):return set([obj])# Single itemifnot isinstance(obj, set):
obj = set(obj)return obj
classAllMatch(set):"""Universal set - match everything"""def __contains__(self, item):returnTrue
allMatch =AllMatch()classEvent(object):"""The Actual Event Class"""def __init__(self, action, minute=allMatch, hour=allMatch,
day=allMatch, month=allMatch, daysofweek=allMatch,
args=(), kwargs={}):
self.mins = conv_to_set(minute)
self.hours = conv_to_set(hour)
self.days = conv_to_set(day)
self.months = conv_to_set(month)
self.daysofweek = conv_to_set(daysofweek)
self.action = action
self.args = args
self.kwargs = kwargs
def matchtime(self, t1):"""Return True if this event should trigger at the specified datetime"""return((t1.minute in self.mins)and(t1.hour in self.hours)and(t1.day in self.days)and(t1.month in self.months)and(t1.weekday()in self.daysofweek))def check(self, t):"""Check and run action if needed"""if self.matchtime(t):
self.action(*self.args,**self.kwargs)classCronTab(object):"""The crontab implementation"""def __init__(self,*events):
self.events = events
def _check(self):"""Check all events in separate greenlets"""
t1 = datetime(*datetime.now().timetuple()[:5])for event in self.events:
gevent.spawn(event.check, t1)
t1 += timedelta(minutes=1)
s1 =(t1 - datetime.now()).seconds +1print"Checking again in %s seconds"% s1
job = gevent.spawn_later(s1, self._check)def run(self):"""Run the cron forever"""
self._check()whileTrue:
gevent.sleep(60)import os
def test_task():"""Just an example that sends a bell and asd to all terminals"""
os.system('echo asd | wall')
cron =CronTab(Event(test_task,22,1),Event(test_task,0, range(9,18,2), daysofweek=range(0,5)),)
cron.run()
More or less same as above but concurrent using gevent :)
"""Gevent based crontab implementation"""
from datetime import datetime, timedelta
import gevent
# Some utility classes / functions first
def conv_to_set(obj):
"""Converts to set allowing single integer to be provided"""
if isinstance(obj, (int, long)):
return set([obj]) # Single item
if not isinstance(obj, set):
obj = set(obj)
return obj
class AllMatch(set):
"""Universal set - match everything"""
def __contains__(self, item):
return True
allMatch = AllMatch()
class Event(object):
"""The Actual Event Class"""
def __init__(self, action, minute=allMatch, hour=allMatch,
day=allMatch, month=allMatch, daysofweek=allMatch,
args=(), kwargs={}):
self.mins = conv_to_set(minute)
self.hours = conv_to_set(hour)
self.days = conv_to_set(day)
self.months = conv_to_set(month)
self.daysofweek = conv_to_set(daysofweek)
self.action = action
self.args = args
self.kwargs = kwargs
def matchtime(self, t1):
"""Return True if this event should trigger at the specified datetime"""
return ((t1.minute in self.mins) and
(t1.hour in self.hours) and
(t1.day in self.days) and
(t1.month in self.months) and
(t1.weekday() in self.daysofweek))
def check(self, t):
"""Check and run action if needed"""
if self.matchtime(t):
self.action(*self.args, **self.kwargs)
class CronTab(object):
"""The crontab implementation"""
def __init__(self, *events):
self.events = events
def _check(self):
"""Check all events in separate greenlets"""
t1 = datetime(*datetime.now().timetuple()[:5])
for event in self.events:
gevent.spawn(event.check, t1)
t1 += timedelta(minutes=1)
s1 = (t1 - datetime.now()).seconds + 1
print "Checking again in %s seconds" % s1
job = gevent.spawn_later(s1, self._check)
def run(self):
"""Run the cron forever"""
self._check()
while True:
gevent.sleep(60)
import os
def test_task():
"""Just an example that sends a bell and asd to all terminals"""
os.system('echo asd | wall')
cron = CronTab(
Event(test_task, 22, 1 ),
Event(test_task, 0, range(9,18,2), daysofweek=range(0,5)),
)
cron.run()
schedule ="*/5 * * * *"# Run every five minutes
nextRunTime = getNextCronRunTime(schedule)whileTrue:
roundedDownTime = roundDownTime()if(roundedDownTime == nextRunTime):####################################### Do your periodic thing here. #######################################
nextRunTime = getNextCronRunTime(schedule)elif(roundedDownTime > nextRunTime):# We missed an execution. Error. Re initialize.
nextRunTime = getNextCronRunTime(schedule)
sleepTillTopOfNextMinute()
辅助程序:
from croniter import croniter
from datetime import datetime, timedelta
# Round time down to the top of the previous minutedef roundDownTime(dt=None, dateDelta=timedelta(minutes=1)):
roundTo = dateDelta.total_seconds()if dt ==None: dt = datetime.now()
seconds =(dt - dt.min).seconds
rounding =(seconds+roundTo/2)// roundTo * roundTo
return dt + timedelta(0,rounding-seconds,-dt.microsecond)# Get next run time from now, based on schedule specified by cron stringdef getNextCronRunTime(schedule):return croniter(schedule, datetime.now()).get_next(datetime)# Sleep till the top of the next minutedef sleepTillTopOfNextMinute():
t = datetime.utcnow()
sleeptime =60-(t.second + t.microsecond/1000000.0)
time.sleep(sleeptime)
None of the listed solutions even attempt to parse a complex cron schedule string. So, here is my version, using croniter. Basic gist:
schedule = "*/5 * * * *" # Run every five minutes
nextRunTime = getNextCronRunTime(schedule)
while True:
roundedDownTime = roundDownTime()
if (roundedDownTime == nextRunTime):
####################################
### Do your periodic thing here. ###
####################################
nextRunTime = getNextCronRunTime(schedule)
elif (roundedDownTime > nextRunTime):
# We missed an execution. Error. Re initialize.
nextRunTime = getNextCronRunTime(schedule)
sleepTillTopOfNextMinute()
Helper routines:
from croniter import croniter
from datetime import datetime, timedelta
# Round time down to the top of the previous minute
def roundDownTime(dt=None, dateDelta=timedelta(minutes=1)):
roundTo = dateDelta.total_seconds()
if dt == None : dt = datetime.now()
seconds = (dt - dt.min).seconds
rounding = (seconds+roundTo/2) // roundTo * roundTo
return dt + timedelta(0,rounding-seconds,-dt.microsecond)
# Get next run time from now, based on schedule specified by cron string
def getNextCronRunTime(schedule):
return croniter(schedule, datetime.now()).get_next(datetime)
# Sleep till the top of the next minute
def sleepTillTopOfNextMinute():
t = datetime.utcnow()
sleeptime = 60 - (t.second + t.microsecond/1000000.0)
time.sleep(sleeptime)
回答 7
我已经修改了脚本。
易于使用:
cron =Cron()
cron.add('* * * * *', minute_task)# every minute
cron.add('33 * * * *', day_task)# every hour
cron.add('34 18 * * *', day_task)# every day
cron.run()
classCronTab(object):def __init__(self,*events):
self.events = events
def run(self):
t=datetime(*datetime.now().timetuple()[:5])while1:for e in self.events:
e.check(t)
t += timedelta(minutes=1)
n = datetime.now()while n < t:
s =(t - n).seconds +1
time.sleep(s)
n = datetime.now()
The timing was out by one second leading to a one-second, hard loop at the end of each minute.
class CronTab(object):
def __init__(self, *events):
self.events = events
def run(self):
t=datetime(*datetime.now().timetuple()[:5])
while 1:
for e in self.events:
e.check(t)
t += timedelta(minutes=1)
n = datetime.now()
while n < t:
s = (t - n).seconds + 1
time.sleep(s)
n = datetime.now()
回答 9
没有“纯python”方法可以执行此操作,因为某些其他过程将必须启动python才能运行您的解决方案。每个平台都有一种或二十种不同的方式来启动流程和监视其进度。在UNIX平台上,cron是旧标准。在Mac OS X上,还启动了该功能,它将类似cron的启动与看门狗功能结合在一起,如果需要的话,可以使您的进程保持活动状态。python运行后,即可使用sched模块安排任务。
There isn’t a “pure python” way to do this because some other process would have to launch python in order to run your solution. Every platform will have one or twenty different ways to launch processes and monitor their progress. On unix platforms, cron is the old standard. On Mac OS X there is also launchd, which combines cron-like launching with watchdog functionality that can keep your process alive if that’s what you want. Once python is running, then you can use the sched module to schedule tasks.
@repeatEveryDay(hour=6, minutes=30)def sayHello(name):print(f"Hello {name}")
sayHello("Bob")# Now this function will be invoked every day at 6.30 a.m
装饰器将如下所示:
def repeatEveryDay(hour, minutes=0, seconds=0):"""
Decorator that will run the decorated function everyday at that hour, minutes and seconds.
:param hour: 0-24
:param minutes: 0-60 (Optional)
:param seconds: 0-60 (Optional)
"""def decoratorRepeat(func):@functools.wraps(func)def wrapperRepeat(*args,**kwargs):def getLocalTime():return datetime.datetime.fromtimestamp(time.mktime(time.localtime()))# Get the datetime of the first function call
td = datetime.timedelta(seconds=15)if wrapperRepeat.nextSent ==None:
now = getLocalTime()
wrapperRepeat.nextSent = datetime.datetime(now.year, now.month, now.day, hour, minutes, seconds)if wrapperRepeat.nextSent < now:
wrapperRepeat.nextSent += td
# Waiting till next daywhile getLocalTime()< wrapperRepeat.nextSent:
time.sleep(1)# Call the function
func(*args,**kwargs)# Get the datetime of the next function call
wrapperRepeat.nextSent += td
wrapperRepeat(*args,**kwargs)
wrapperRepeat.nextSent =Nonereturn wrapperRepeat
return decoratorRepeat
I know there are a lot of answers, but another solution could be to go with decorators. This is an example to repeat a function everyday at a specific time. The cool think about using this way is that you only need to add the Syntactic Sugar to the function you want to schedule:
@repeatEveryDay(hour=6, minutes=30)
def sayHello(name):
print(f"Hello {name}")
sayHello("Bob") # Now this function will be invoked every day at 6.30 a.m
And the decorator will look like:
def repeatEveryDay(hour, minutes=0, seconds=0):
"""
Decorator that will run the decorated function everyday at that hour, minutes and seconds.
:param hour: 0-24
:param minutes: 0-60 (Optional)
:param seconds: 0-60 (Optional)
"""
def decoratorRepeat(func):
@functools.wraps(func)
def wrapperRepeat(*args, **kwargs):
def getLocalTime():
return datetime.datetime.fromtimestamp(time.mktime(time.localtime()))
# Get the datetime of the first function call
td = datetime.timedelta(seconds=15)
if wrapperRepeat.nextSent == None:
now = getLocalTime()
wrapperRepeat.nextSent = datetime.datetime(now.year, now.month, now.day, hour, minutes, seconds)
if wrapperRepeat.nextSent < now:
wrapperRepeat.nextSent += td
# Waiting till next day
while getLocalTime() < wrapperRepeat.nextSent:
time.sleep(1)
# Call the function
func(*args, **kwargs)
# Get the datetime of the next function call
wrapperRepeat.nextSent += td
wrapperRepeat(*args, **kwargs)
wrapperRepeat.nextSent = None
return wrapperRepeat
return decoratorRepeat
Brian’s solution is working quite well. However, as others have pointed out, there is a subtle bug in the run code. Also i found it overly complicated for the needs.
Here is my simpler and functional alternative for the run code in case anybody needs it:
def run(self):
while 1:
t = datetime.now()
for e in self.events:
e.check(t)
time.sleep(60 - t.second - t.microsecond / 1000000.0)
回答 12
另一个简单的解决方案是:
from aqcron importAtfrom time import sleep
from datetime import datetime
# Event scheduling
event_1 =At( second=5)
event_2 =At( second=[0,20,40])whileTrue:
now = datetime.now()# Event checkif now in event_1:print"event_1"if now in event_2:print"event_2"
sleep(1)
而类aqcron.At是:
# aqcron.pyclassAt(object):def __init__(self, year=None, month=None,
day=None, weekday=None,
hour=None, minute=None,
second=None):
loc = locals()
loc.pop("self")
self.at = dict((k, v)for k, v in loc.iteritems()if v !=None)def __contains__(self, now):for k in self.at.keys():try:ifnot getattr(now, k)in self.at[k]:returnFalseexceptTypeError:if self.at[k]!= getattr(now, k):returnFalsereturnTrue
from aqcron import At
from time import sleep
from datetime import datetime
# Event scheduling
event_1 = At( second=5 )
event_2 = At( second=[0,20,40] )
while True:
now = datetime.now()
# Event check
if now in event_1: print "event_1"
if now in event_2: print "event_2"
sleep(1)
And the class aqcron.At is:
# aqcron.py
class At(object):
def __init__(self, year=None, month=None,
day=None, weekday=None,
hour=None, minute=None,
second=None):
loc = locals()
loc.pop("self")
self.at = dict((k, v) for k, v in loc.iteritems() if v != None)
def __contains__(self, now):
for k in self.at.keys():
try:
if not getattr(now, k) in self.at[k]: return False
except TypeError:
if self.at[k] != getattr(now, k): return False
return True
If you are looking for a distributed scheduler, you can check out https://github.com/sherinkurian/mani – it does need redis though so might not be what you are looking for. (note that i am the author)
this was built to ensure fault-tolerance by having clock run on more than one node.
I don’t know if something like that already exists. It would be easy to write your own with time, datetime and/or calendar modules, see http://docs.python.org/library/time.html
The only concern for a python solution is that your job needs to be always running and possibly be automatically “resurrected” after a reboot, something for which you do need to rely on system dependent solutions.
You can check out PiCloud’s [1] Crons [2], but do note that your jobs won’t be running on your own machine. It’s also a service that you’ll need to pay for if you use more than 20 hours of compute time a month.