I want to fire off a function every 0.5 seconds and be able to start and stop and reset the timer. I'm not too knowledgeable of how Python threads work and am having difficulties with the python timer.
However, I keep getting `RuntimeError: threads can only be started once` when I call `start()` on the same `threading.Timer` object twice. Is there a workaround for this? I tried calling `cancel()` on the timer before each `start()`.
Pseudo code:
t = threading.Timer(0.5, function)
while True:
    # The second pass through this loop raises
    # "RuntimeError: threads can only be started once": cancel() does not
    # reset the Timer, so the same object cannot be started again.
    t.cancel()
    t.start()
This question is related to
python
python-3.x
python-2.7
I have implemented a class that works as a timer.
I leave the link here in case anyone needs it: https://github.com/ivanhalencp/python/tree/master/xTimer
Improving a little on Hans Then's answer, we can just subclass the Timer function. The following becomes our entire "repeat timer" code, and it can be used as a drop-in replacement for threading.Timer with all the same arguments:
from threading import Timer
class RepeatTimer(Timer):
    """Timer that calls its function repeatedly, once per ``interval``.

    Takes the same constructor arguments as ``threading.Timer``; call
    ``cancel()`` to stop the repetition.
    """

    def run(self):
        """Call the function after every ``interval`` until cancelled."""
        # ``finished`` is the Event that cancel() sets. wait() returns False
        # when the interval elapses (time for another call) and True once
        # the timer has been cancelled, which ends the loop.
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)
Usage example:
import time  # needed for time.sleep below; the snippet never imported it


def dummyfn(msg="foo"):
    """Print ``msg``; used as the demo callback."""
    print(msg)


# Fire dummyfn once per second for about five seconds, then stop.
timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()
produces the following output:
foo
foo
foo
foo
and
# Same demo, but passing a positional argument through to the callback.
timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
# Let the timer tick for about five seconds before cancelling it.
time.sleep(5)
timer.cancel()
produces
bar
bar
bar
bar
This is an alternate implementation using function instead of class. Inspired by @Andrew Wilkins above.
Because wait is more accurate than sleep ( it takes function runtime into account ):
import threading
from time import sleep  # sleep() is called below but was never imported

# Despite its name, setting PING_ON is what *stops* the ping loop.
PING_ON = threading.Event()


def ping():
    """Print this thread's ident once per second until PING_ON is set."""
    # wait(1) returns False on timeout (do another ping) and True once the
    # event has been set, which ends the loop.
    while not PING_ON.wait(1):
        print("my thread %s" % str(threading.current_thread().ident))


t = threading.Thread(target=ping)
t.start()
sleep(5)
PING_ON.set()
from threading import Timer
def TaskManager():
    """Do one unit of work, then schedule the next call one second later."""
    # do stuff
    # Each call arms a fresh one-shot Timer that calls this function again,
    # so the chain continues until the process ends.
    t = Timer(1, TaskManager)
    t.start()
# First call runs immediately; every later call is scheduled by TaskManager itself.
TaskManager()
Here is a small sample that should help in understanding how this works. At the end of each run, the function taskManager() schedules a delayed call to itself.
Try changing the "dalay" variable and you will be able to see the difference.
from threading import Timer
# threading._sleep is a private name that current Python 3 releases no
# longer provide; time.sleep is the public equivalent. The alias keeps the
# rest of the script unchanged.
from time import sleep as _sleep
# ------------------------------------------
DATA = []  # queue of pending items consumed by taskManager()
dalay = 0.25  # sec between taskManager() runs (name kept as spelled)
counter = 0  # number of taskManager() runs so far
allow_run = True  # set to False to stop taskManager() rescheduling itself
FIFO = True  # True: consume oldest item first; False: newest first
def taskManager():
    """Consume at most one item from DATA, then reschedule itself.

    Reads the module-level ``DATA``, ``FIFO``, ``dalay`` and ``allow_run``
    and increments the module-level ``counter`` on every run.
    """
    # Only ``counter`` is rebound here. The original statement also listed a
    # non-existent ``delay``; the interval variable is spelled ``dalay``.
    global counter
    counter += 1

    if DATA:
        # FIFO takes the oldest entry, otherwise the most recent one.
        item = DATA.pop(0) if FIFO else DATA.pop()
        print("[" + str(counter) + "] new data: [" + str(item) + "]")
    else:
        print("[" + str(counter) + "] no data")

    if allow_run:
        # delayed call to itself
        t = Timer(dalay, taskManager)
        t.start()
    else:
        print(" END task-manager: disabled")
# ------------------------------------------
def main():
    """Append two items to DATA, pausing two seconds after each."""
    DATA.append("data from main(): 0")
    _sleep(2)
    DATA.append("data from main(): 1")
    _sleep(2)
# ------------------------------------------
# Demo driver: start the self-rescheduling task manager, then feed it data
# from this thread at two-second intervals.
print(" START task-manager:")
taskManager()
_sleep(2)
DATA.append("first data")
_sleep(2)
DATA.append("second data")
print(" START main():")
main()
print(" END main():")
_sleep(2)
DATA.append("last data")
# taskManager() checks this flag at the end of each run and stops rescheduling.
allow_run = False
From Equivalent of setInterval in python:
import threading
def setInterval(interval):
    """Return a decorator that turns a function into a repeating-timer starter.

    Calling the decorated function starts a daemon thread that invokes the
    original function every ``interval`` seconds with the given arguments
    (first call after one interval). The call returns a ``threading.Event``;
    setting it stops the loop.
    """
    import functools

    def decorator(function):
        @functools.wraps(function)  # keep the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop():  # executed in another thread
                # wait() returns False on timeout and True once stopped is set
                while not stopped.wait(interval):  # until stopped
                    function(*args, **kwargs)

            t = threading.Thread(target=loop)
            t.daemon = True  # stop if the program exits
            t.start()
            return stopped
        return wrapper
    return decorator
Usage:
@setInterval(.5)
def function():
    "..."


stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set()
Or here's the same functionality but as a standalone function instead of a decorator:
# NOTE(review): call_repeatedly is not defined anywhere in this excerpt.
# Presumably it takes (interval_seconds, func, *args) and returns a callable
# that stops further calls; confirm against the original answer.
cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls()
In addition to the above great answers using Threads, in case you have to use your main thread or prefer an async approach - I wrapped a short class around aio_timers Timer class (to enable repeating)
import asyncio
from aio_timers import Timer
class RepeatingAsyncTimer():
    """Repeat a callback by re-arming an ``aio_timers.Timer`` after each call.

    The first timer is armed as soon as the object is constructed. Extra
    positional and keyword arguments are forwarded to the callback.
    """

    def __init__(self, interval, cb, *args, **kwargs):
        self.interval = interval
        self.cb = cb
        self.args = args
        self.kwargs = kwargs
        self.aio_timer = None
        self.start_timer()

    def start_timer(self):
        """Arm a new one-shot timer that will invoke ``cb_wrapper``."""
        # NOTE(review): presumably aio_timers.Timer needs an asyncio event
        # loop to fire; confirm against the aio_timers documentation.
        self.aio_timer = Timer(delay=self.interval,
                               callback=self.cb_wrapper,
                               callback_args=self.args,
                               callback_kwargs=self.kwargs
                               )

    def cb_wrapper(self, *args, **kwargs):
        """Run the user callback, then re-arm the timer for the next round."""
        self.cb(*args, **kwargs)
        # Re-armed only after the callback returns, so the next interval is
        # counted from the end of this call.
        self.start_timer()
from time import time
def cb(timer_name):
    """Print ``timer_name`` followed by the current epoch time."""
    print(timer_name, time())
# Demo: two repeating timers with 5 s and 10 s intervals. The timer_name
# keyword is forwarded to cb() on every tick.
print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')
clock starts at: 1602438840.9690785
timer_1 1602438845.980087
timer_2 1602438850.9806316
timer_1 1602438850.9808934
timer_1 1602438855.9863033
timer_2 1602438860.9868324
timer_1 1602438860.9876585
I have changed some code in swapnil-jariwala code to make a little console clock.
from threading import Timer, Thread, Event
from datetime import datetime
class PT():
    """Call ``hFunction`` every ``t`` seconds until ``cancel()`` is called.

    Each tick runs on a fresh ``threading.Timer``: once the callback
    returns, ``handle_function`` schedules the next one.
    """

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        # Set by cancel() and checked before every reschedule, so a cancel
        # issued while the callback is running is not lost.
        self._cancelled = Event()
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        """Run the callback once, then schedule the next tick."""
        self.hFunction()
        # Publish the next Timer before checking the flag. A concurrent
        # cancel() is then either seen here or cancels this new Timer, and
        # a Timer cancelled before it starts never calls its function.
        self.thread = Timer(self.t, self.handle_function)
        if self._cancelled.is_set():
            return
        self.thread.start()

    def start(self):
        """Start the first timer."""
        self.thread.start()

    def cancel(self):
        """Stop the repetition; a callback already in progress still finishes."""
        self._cancelled.set()
        self.thread.cancel()
def printer():
    """Print the current local time as ``hour:minute:second`` (no zero padding)."""
    tempo = datetime.today()
    h,m,s = tempo.hour, tempo.minute, tempo.second
    print(f"{h}:{m}:{s}")
# Console clock: run printer() once per second.
t = PT(1, printer)
t.start()
OUTPUT
>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...
This code puts the clock timer in a little window with tkinter
from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk
# Main window with a single label; printer() later overwrites the label text.
app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()
class perpetualTimer():
    """Call ``hFunction`` every ``t`` seconds until ``cancel()`` is called.

    Each tick runs on a fresh ``threading.Timer``: once the callback
    returns, ``handle_function`` schedules the next one.
    """

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        # Set by cancel() and checked before every reschedule. Without it,
        # a cancel() issued while the callback runs only cancels the Timer
        # that has already fired, and the chain keeps going.
        self._cancelled = Event()
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        """Run the callback once, then schedule the next tick."""
        self.hFunction()
        # Publish the next Timer before checking the flag. A concurrent
        # cancel() is then either seen here or cancels this new Timer, and
        # a Timer cancelled before it starts never calls its function.
        self.thread = Timer(self.t, self.handle_function)
        if self._cancelled.is_set():
            return
        self.thread.start()

    def start(self):
        """Start the first timer."""
        self.thread.start()

    def cancel(self):
        """Stop the repetition; a callback already in progress still finishes."""
        self._cancelled.set()
        self.thread.cancel()
def printer():
    """Write the current local time into the module-level label ``lab``."""
    tempo = datetime.today()
    clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
    try:
        lab['text'] = clock
    except RuntimeError:
        # NOTE(review): presumably raised once the window has been closed
        # while the timer is still ticking; confirm. This callback runs on
        # the Timer thread, not on the Tk main-loop thread.
        exit()
# Refresh the label once per second, then hand control to Tk's event loop.
t = perpetualTimer(1, printer)
t.start()
app.mainloop()
from threading import Timer, Thread, Event
from datetime import datetime
class perpetualTimer():
    """Call ``hFunction`` every ``t`` seconds until ``cancel()`` is called.

    Each tick runs on a fresh ``threading.Timer``: once the callback
    returns, ``handle_function`` schedules the next one.
    """

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        # Set by cancel() and checked before every reschedule. Without it,
        # a cancel() issued while the callback runs only cancels the Timer
        # that has already fired, and the chain keeps going.
        self._cancelled = Event()
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        """Run the callback once, then schedule the next tick."""
        self.hFunction()
        # Publish the next Timer before checking the flag. A concurrent
        # cancel() is then either seen here or cancels this new Timer, and
        # a Timer cancelled before it starts never calls its function.
        self.thread = Timer(self.t, self.handle_function)
        if self._cancelled.is_set():
            return
        self.thread.start()

    def start(self):
        """Start the first timer."""
        self.thread.start()

    def cancel(self):
        """Stop the repetition; a callback already in progress still finishes."""
        self._cancelled.set()
        self.thread.cancel()
# Seconds-of-minute (0-59) at load time; printer() compares against it.
x = datetime.today()
start = x.second
def printer():
    """Show the pending answer (if due), then print the next question.

    Reads the module-level ``questions``, ``answers`` and ``start`` and
    advances the module-level ``counter``, wrapping to 0 at the end.
    """
    # Only ``counter`` is rebound here; the other globals are just read.
    global counter
    x = datetime.today()
    tempo = x.second
    # NOTE(review): nesting reconstructed from the sample output, where only
    # the answer is conditional. Both operands are seconds-of-minute (0-59),
    # so this comparison misbehaves once the minute rolls over; confirm the
    # intended "skip the answer on the first tick" logic.
    if tempo - 3 > start:
        show_ans()
    print()
    print("-" + questions[counter])
    counter += 1
    # Wraps on len(answers); both sequences have the same length here.
    if counter == len(answers):
        counter = 0
def show_ans():
    """Print the answer at index ``c2`` and advance ``c2``, wrapping to 0."""
    global answers, c2
    print("It is {}".format(answers[c2]))
    c2 += 1
    if c2 == len(answers):
        c2 = 0
# Quiz content and cursors. answers[i] is the answer to questions[i].
c2 = 0  # index of the next answer show_ans() will print
counter = 0  # index of the next question printer() will print
answers = (
    "Rome",
    "Paris",
    "London",
    "Madrid",
)
questions = [
    "What is the capital of Italy?",
    "What is the capital of France?",
    "What is the capital of England?",
    "What is the capital of Spain?",
]
# Run printer() every three seconds.
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()
output:
Get ready to answer
>>>
-What is the capital of Italy?
It is Rome
-What is the capital of France?
It is Paris
-What is the capital of England?
...
In the interest of providing a correct answer using Timer as the OP requested, I'll improve upon swapnil jariwala's answer:
from threading import Timer
class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to.

    ``target`` is called every ``seconds`` seconds, each time on a fresh
    ``threading.Timer``, from ``start()`` until ``cancel()``.
    """

    def __init__(self, seconds, target):
        self._should_continue = False  # True between start() and cancel()
        self.is_running = False  # True only while target() is executing
        self.seconds = seconds
        self.target = target
        self.thread = None  # most recent Timer; None until start() is called

    def _handle_target(self):
        """Run the target once, then arm the next timer if still wanted."""
        self.is_running = True
        self.target()
        self.is_running = False
        self._start_timer()

    def _start_timer(self):
        """Arm a new one-shot Timer unless cancel() has been requested."""
        if self._should_continue:  # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        """Begin ticking; refuse with a message if already started or mid-call."""
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        """Stop ticking; print a message if the timer was never started."""
        if self.thread is not None:
            self._should_continue = False  # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")
def tick():
    """Demo callback: print a fixed line."""
    print('ipsem lorem')
# Example Usage: call tick() every half second until t.cancel() is called.
t = InfiniteTimer(0.5, tick)
t.start()
I had to do this for a project. What I ended up doing was start a separate thread for the function
# Run heartbeat(worker) on its own thread so its loop does not block the caller.
t = threading.Thread(target =heartbeat, args=(worker,))
t.start()
(Here `heartbeat` is my function and `worker` is one of its arguments.)
inside of my heartbeat function:
def heartbeat(worker):
    """Loop forever: wait five seconds, then run the periodic work."""
    while True:
        time.sleep(5)
        #all of my code
So when I start the thread the function will repeatedly wait 5 seconds, run all of my code, and do that indefinitely. If you want to kill the process just kill the thread.
Using timer threads-
from threading import Timer,Thread,Event
class perpetualTimer():
    """Call ``hFunction`` every ``t`` seconds until ``cancel()`` is called.

    Each tick runs on a fresh ``threading.Timer``: once the callback
    returns, ``handle_function`` schedules the next one.
    """

    def __init__(self,t,hFunction):
        self.t=t
        self.hFunction = hFunction
        # Set by cancel() and checked before every reschedule. Without it,
        # a cancel() issued while the callback runs only cancels the Timer
        # that has already fired, and the chain keeps going.
        self._cancelled = Event()
        self.thread = Timer(self.t,self.handle_function)

    def handle_function(self):
        """Run the callback once, then schedule the next tick."""
        self.hFunction()
        # Publish the next Timer before checking the flag. A concurrent
        # cancel() is then either seen here or cancels this new Timer, and
        # a Timer cancelled before it starts never calls its function.
        self.thread = Timer(self.t,self.handle_function)
        if self._cancelled.is_set():
            return
        self.thread.start()

    def start(self):
        """Start the first timer."""
        self.thread.start()

    def cancel(self):
        """Stop the repetition; a callback already in progress still finishes."""
        self._cancelled.set()
        self.thread.cancel()
def printer():
    """Demo callback: print a fixed line."""
    # Call form instead of the Python 2 print statement: it prints the same
    # text on Python 2 and is required on Python 3.
    print('ipsem lorem')
# Call printer() every five seconds.
t = perpetualTimer(5,printer)
t.start()
this can be stopped by t.cancel()
I like right2clicky's answer, especially in that it doesn't require a Thread to be torn down and a new one created every time the Timer ticks. In addition, it's an easy override to create a class with a timer callback that gets called periodically. That's my normal use case:
class MyClass(RepeatTimer):
    """RepeatTimer whose periodic callback is its own ``on_timer`` method."""

    def __init__(self, period):
        # Pass the bound method as the timer's function so subclasses only
        # need to override on_timer().
        super().__init__(period, self.on_timer)

    def on_timer(self):
        """Called once per period; override to do real work."""
        print("Tick")
if __name__ == "__main__":
    import time  # needed for time.sleep below; the snippet never imported it

    # Tick once per second for about five seconds, then stop.
    mc = MyClass(1)
    mc.start()
    time.sleep(5)
    mc.cancel()
I have come up with another solution using a singleton class. Please tell me if there is any memory leak here.
import time,threading
class Singleton:
    """Single-instance helper that runs ``foo`` periodically on a worker thread.

    Use ``getInstance()`` to obtain the instance; constructing a second
    one directly raises ``Exception``.
    """

    __instance = None  # the one instance, created on first use
    sleepTime = 1  # seconds to sleep between foo() calls
    executeThread = False  # the worker loop runs while this is True
    threadNew = None  # worker thread; None until startThread() is called

    def __init__(self):
        if Singleton.__instance is not None:
            raise Exception("This class is a singleton!")
        Singleton.__instance = self

    @staticmethod
    def getInstance():
        """Return the single instance, creating it on first use."""
        if Singleton.__instance is None:
            Singleton()
        return Singleton.__instance

    def startThread(self):
        """Start a new worker thread running ``foo_target``."""
        self.executeThread = True
        self.threadNew = threading.Thread(target=self.foo_target)
        self.threadNew.start()
        print('doing other things...')

    def stopThread(self):
        """Ask the worker loop to stop and wait for the thread to finish."""
        print("Killing Thread ")
        self.executeThread = False
        # Guard against stopThread() being called before startThread();
        # the original raised AttributeError in that case.
        if self.threadNew is not None:
            # May block for up to sleepTime while the worker finishes sleeping.
            self.threadNew.join()
        print(self.threadNew)

    def foo(self):
        """The periodic work: print a message mentioning the interval."""
        print("Hello in " + str(self.sleepTime) + " seconds")

    def foo_target(self):
        """Worker loop: call ``foo`` and sleep until ``executeThread`` is cleared."""
        # The loop condition re-checks the flag after each sleep, so the
        # original's extra trailing check-and-break was redundant.
        while self.executeThread:
            self.foo()
            print(self.threadNew)
            time.sleep(self.sleepTime)
# Demo: run the worker for about five seconds, stop it, change the interval
# to two seconds and start a new worker thread.
sClass = Singleton()
sClass.startThread()
time.sleep(5)
# getInstance() is a staticmethod, so calling it through the instance works too.
sClass.getInstance().stopThread()
sClass.getInstance().sleepTime = 2
sClass.startThread()
Source: Stackoverflow.com