问题:每x秒重复执行一个函数的最佳方法是什么?

我想永远每60秒重复执行一次Python中的函数(就像Objective C中的NSTimer一样)。这段代码将作为守护程序运行,实际上就像使用cron每一分钟调用python脚本一样,但是不需要用户设置。

有关使用Python实现的cron的问题中,该解决方案似乎实际上只是将sleep()停留了x秒。我不需要这种高级功能,所以也许这样的事情会起作用

while True:
    # Code executed here
    time.sleep(60)

此代码是否存在任何可预见的问题?

I want to repeatedly execute a function in Python every 60 seconds forever (just like an NSTimer in Objective C). This code will run as a daemon and is effectively like calling the python script every minute using a cron, but without requiring that to be set up by the user.

In this question about a cron implemented in Python, the solution appears to effectively just sleep() for x seconds. I don’t need such advanced functionality so perhaps something like this would work

while True:
    # Code executed here
    time.sleep(60)

Are there any foreseeable problems with this code?


回答 0

如果您的程序还没有事件循环,请使用sched模块,该模块实现了通用事件调度程序。

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

如果你已经在使用一个事件循环库一样asynciotriotkinterPyQt5gobjectkivy,和许多其他人-只是用您现有的事件循环库的方法安排任务,来代替。

If your program doesn’t have a event loop already, use the sched module, which implements a general purpose event scheduler.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

If you’re already using an event loop library like asyncio, trio, tkinter, PyQt5, gobject, kivy, and many others – just schedule the task using your existing event loop library’s methods, instead.


回答 1

只需将您的时间循环锁定到系统时钟即可。简单。

import time
starttime=time.time()
while True:
  print "tick"
  time.sleep(60.0 - ((time.time() - starttime) % 60.0))

Lock your time loop to the system clock like this:

import time
starttime = time.time()
while True:
    print "tick"
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))

回答 2

您可能需要考虑Twisted,它是实现Reactor Pattern的Python网络库。

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

虽然“ while True:sleep(60)”可能会起作用Twisted可能已经实现了您最终将需要的许多功能(bobince指出的守护进程,日志记录或异常处理),并且可能是一种更强大的解决方案

You might want to consider Twisted which is a Python networking library that implements the Reactor Pattern.

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

While “while True: sleep(60)” will probably work Twisted probably already implements many of the features that you will eventually need (daemonization, logging or exception handling as pointed out by bobince) and will probably be a more robust solution


回答 3

如果您希望采用非阻塞方式定期执行函数,而不是阻塞无限循环,我会使用线程计时器。这样,您的代码可以继续运行并执行其他任务,并且仍然每n秒调用一次函数。我经常使用这种技术来打印长时间的,CPU /磁盘/网络密集型任务的进度信息。

这是我在类似问题中发布的代码,其中包含start()和stop()控件:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

特征:

  • 仅标准库,无外部依赖
  • start()并且stop()即使计时器已经启动/停止也可以安全地多次拨打
  • 要调用的函数可以具有位置和命名参数
  • 您可以interval随时更改,它将在下次运行后生效。同样argskwargs甚至function

If you want a non-blocking way to execute your function periodically, instead of a blocking infinite loop I’d use a threaded timer. This way your code can keep running and perform other tasks and still have your function called every n seconds. I use this technique a lot for printing progress info on long, CPU/Disk/Network intensive tasks.

Here’s the code I’ve posted in a similar question, with start() and stop() control:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Usage:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Features:

  • Standard library only, no external dependencies
  • start() and stop() are safe to call multiple times even if the timer has already started/stopped
  • function to be called can have positional and named arguments
  • You can change interval anytime, it will be effective after next run. Same for args, kwargs and even function!

回答 4

我认为更简单的方法是:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

这样,您的代码将被执行,然后等待60秒,然后再次执行,然后等待执行,等等。。。

The easier way I believe to be:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

This way your code is executed, then it waits 60 seconds then it executes again, waits, execute, etc… No need to complicate things :D


回答 5

import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

如果要执行此操作而又不阻塞其余代码,则可以使用它使它在自己的线程中运行:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

该解决方案结合了其他解决方案中很少发现的几个功能:

  • 异常处理:尽可能在此级别上正确处理异常,即在不中断程序的情况下记录日志以进行调试。
  • 没有链接:在许多答案中发现的常见链式实现(用于安排下一个事件的安排)在以下方面是脆弱的:如果在安排机制内出现任何问题(threading.Timer或其他任何原因)出了问题,它将终止链。即使问题的原因已经解决,也不会再执行任何操作。与之sleep()相比,简单的循环和简单的等待要健壮得多。
  • 无漂移:永无止境我的解决方案准确跟踪了预期运行时间。取决于执行时间,没有漂移(与许多其他解决方案一样)。
  • 跳过:如果一次执行花费了太多时间(例如,每五秒钟执行X次,但是X花费6秒执行一次),我的解决方案将跳过任务。这是标准的cron行为(并且有充分的理由)。然后,许多其他解决方案只需连续执行几次任务即可,而不会出现任何延迟。对于大多数情况(例如清理任务),这是不希望的。如果希望,简单地使用next_time += delay来代替。
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

If you want to do this without blocking your remaining code, you can use this to let it run in its own thread:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

This solution combines several features rarely found combined in the other solutions:

  • Exception handling: As far as possible on this level, exceptions are handled properly, i. e. get logged for debugging purposes without aborting our program.
  • No chaining: The common chain-like implementation (for scheduling the next event) you find in many answers is brittle in the aspect that if anything goes wrong within the scheduling mechanism (threading.Timer or whatever), this will terminate the chain. No further executions will happen then, even if the reason of the problem is already fixed. A simple loop and waiting with a simple sleep() is much more robust in comparison.
  • No drift: My solution keeps an exact track of the times it is supposed to run at. There is no drift depending on the execution time (as in many other solutions).
  • Skipping: My solution will skip tasks if one execution took too much time (e. g. do X every five seconds, but X took 6 seconds). This is the standard cron behavior (and for a good reason). Many other solutions then simply execute the task several times in a row without any delay. For most cases (e. g. cleanup tasks) this is not wished. If it is wished, simply use next_time += delay instead.

回答 6

这是对MestreLion的代码的更新,可以避免随着时间的推移而变坏。

在此,RepeatedTimer类按照OP的请求每隔“间隔”秒调用给定函数;时间表不取决于函数执行所需的时间。我喜欢此解决方案,因为它没有外部库依赖项;这只是纯python。

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

用法示例(从MestreLion的答案中复制):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Here’s an update to the code from MestreLion that avoids drifiting over time.

The RepeatedTimer class here calls the given function every “interval” seconds as requested by the OP; the schedule doesn’t depend on how long the function takes to execute. I like this solution since it doesn’t have external library dependencies; this is just pure python.

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

Sample usage (copied from MestreLion’s answer):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

回答 7

不久前,我也遇到了类似的问题。可能是http://cronus.readthedocs.org可能有帮助?

对于v0.2,以下代码段有效

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec

I faced a similar problem some time back. May be http://cronus.readthedocs.org might help?

For v0.2, the following snippet works

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec

回答 8

那和cron之间的主要区别是,异常将永久杀死守护程序。您可能需要包装异常捕获器和记录器。

The main difference between that and cron is that an exception will kill the daemon for good. You might want to wrap with an exception catcher and logger.


回答 9

一个可能的答案:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()

One possible answer:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()

回答 10

我最终使用了调度模块。该API很不错。

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

I ended up using the schedule module. The API is nice.

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

回答 11

我使用Tkinter after()方法,它不会“窃取游戏”(就像前面介绍的sched模块一样),即它允许其他事情并行运行:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1()do_something2()可以以任意速度间隔并行运行。在这里,第二个函数的执行速度是第二个函数的两倍。还要注意,我使用一个简单的计数器作为终止这两个函数的条件。您可以使用任何喜欢的其他条件,也可以不使用任何条件,直到在程序终止之前要运行什么功能(例如时钟)。

I use Tkinter after() method, which doesn’t “steal the game” (like the sched module that was presented earlier), i.e. it allows other things to run in parallel:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1() and do_something2() can run in parallel and in whatever interval speed. Here, the 2nd one will be executed twice as fast.Note also that I have used a simple counter as a condition to terminate either function. You can use whatever other contition you like or none if you what a function to run until the program terminates (e.g. a clock).


回答 12

这是MestreLion的代码的改编版本。除了原始功能外,此代码:

1)添加用于在特定时间触发计时器的first_interval(调用者需要计算first_interval并传入)

2)解决原始代码中的竞争条件。在原始代码中,如果控制线程未能取消正在运行的计时器(“停止计时器,并取消执行计时器的操作。这仅在计时器仍处于等待阶段时才有效。”引自https:// docs.python.org/2/library/threading.html),计时器将无休止地运行。

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False

Here’s an adapted version to the code from MestreLion. In addition to the original function, this code:

1) add first_interval used to fire the timer at a specific time(caller need to calculate the first_interval and pass in)

2) solve a race-condition in original code. In the original code, if control thread failed to cancel the running timer(“Stop the timer, and cancel the execution of the timer’s action. This will only work if the timer is still in its waiting stage.” quoted from https://docs.python.org/2/library/threading.html), the timer will run endlessly.

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False

回答 13

这似乎比公认的解决方案简单得多-是否有我没有考虑的缺点?来到这里寻找一些简单的复制面食,感到失望。

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)

thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

异步输出。

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

从某种意义上说,它确实存在偏差,如果正在运行的任务花费大量时间,则间隔变为2秒+任务时间,因此,如果您需要精确的计划,那么这不适合您。

请注意,该daemon=True标志表示该线程不会阻止应用程序关闭。例如,有一个问题,pytest在运行测试等待该主题停止运行后,会无限期挂起。

This seems much simpler than accepted solution – does it have shortcomings I’m not considering? Came here looking for some dead-simple copy pasta.

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)
    
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

Which asynchronously outputs.

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

This does have drift. If the task being run takes appreciable amount of time, then the interval becomes 2 seconds + task time, so if you need precise scheduling then this is not for you.**

Note the daemon=True flag means this thread won’t block the app from shutting down. For example, had issue where pytest would hang indefinitely after running tests waiting for this thead to cease.


回答 14

我用它来每小时导致60个事件,而大多数事件在整分钟后的相同秒数内发生:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

根据实际情况,您可能会发现一些问题:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

但在60分钟结束时,您将有60个滴答声;并且大多数会以您希望的分钟数正确偏移。

在我的系统上,我得到的典型漂移小于<1/20秒,直到需要校正为止。

这种方法的优点是可以解决时钟漂移问题。如果您正在做一些事情,例如每滴答声追加一项,而您希望每小时追加60项,则可能会引起问题。不能考虑漂移会导致诸如移动平均线之类的次要指示,以至于认为数据太过深了而导致输出错误。

I use this to cause 60 events per hour with most events occurring at the same number of seconds after the whole minute:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

Depending upon actual conditions you might get ticks of length:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

but at the end of 60 minutes you’ll have 60 ticks; and most of them will occur at the correct offset to the minute you prefer.

On my system I get typical drift of < 1/20th of a second until need for correction arises.

The advantage of this method is resolution of clock drift; which can cause issues if you’re doing things like appending one item per tick and you expect 60 items appended per hour. Failure to account for drift can cause secondary indications like moving averages to consider data too deep into the past resulting in faulty output.


回答 15

例如,显示当前本地时间

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)

e.g., Display current local time

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)

回答 16

    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()
    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()

回答 17

这是不使用任何额外库的另一种解决方案。

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()

Here is another solution without using any extra libaries.

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。