Before I ask, Cron Jobs and Task Scheduler will be my last options, this script will be used across Windows and Linux and I'd prefer to have a coded out method of doing this than leaving this to the end user to complete.
Is there a library for Python that I can use to schedule tasks? I will need to run a function once every hour, however, over time if I run a script once every hour and use .sleep, "once every hour" will run at a different part of the hour from the previous day due to the delay inherent to executing/running the script and/or function.
What is the best way to schedule a function to run at a specific time of day (more than once) without using a Cron Job or scheduling it with Task Scheduler?
Or if this is not possible, I would like your input as well.
import datetime
import time
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.daemonic = False
sched.start()
def job_function():
print("Hello World")
print(datetime.datetime.now())
time.sleep(20)
# Schedules job_function to be run once each minute
sched.add_cron_job(job_function, minute='0-59')
out:
>Hello World
>2014-03-28 09:44:00.016.492
>Hello World
>2014-03-28 09:45:00.0.14110
(From Animesh Pandey's answer below)
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched.configure(options_from_ini_file)
sched.start()
This question is related to
python
python-3.x
cron
scheduled-tasks
cron-task
Run the script every 15 minutes of the hour. For example, you want to receive 15 minute stock price quotes, which are updated every 15 minutes.
while True:
print("Update data:", datetime.now())
sleep = 15 - datetime.now().minute % 15
if sleep == 15:
run_strategy()
time.sleep(sleep * 60)
else:
time.sleep(sleep * 60)
#For scheduling task execution
import schedule
import time
def job():
print("I'm working...")
schedule.every(1).minutes.do(job)
#schedule.every().hour.do(job)
#schedule.every().day.at("10:30").do(job)
#schedule.every(5).to(10).minutes.do(job)
#schedule.every().monday.do(job)
#schedule.every().wednesday.at("13:15").do(job)
#schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
On the version posted by sunshinekitty called "Version < 3.0" , you may need to specify apscheduler 2.1.2 . I accidentally had version 3 on my 2.7 install, so I went:
pip uninstall apscheduler
pip install apscheduler==2.1.2
It worked correctly after that. Hope that helps.
Probably you got the solution already @lukik, but if you wanna remove a scheduling, you should use:
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
or
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
if you need to use a explicit job ID
For more information, you should check: https://apscheduler.readthedocs.io/en/stable/userguide.html#removing-jobs
the simplest option I can suggest is using the schedule library.
In your question, you said "I will need to run a function once every hour" the code to do this is very simple:
import schedule
def thing_you_wanna_do():
...
...
return
schedule.every().hour.do(thing_you_wanna_do)
while True:
schedule.run_pending()
you also asked how to do something at a certain time of the day some examples of how to do this are:
import schedule
def thing_you_wanna_do():
...
...
return
schedule.every().day.at("10:30").do(thing_you_wanna_do)
schedule.every().monday.do(thing_you_wanna_do)
schedule.every().wednesday.at("13:15").do(thing_you_wanna_do)
# If you would like some randomness / variation you could also do something like this
schedule.every(1).to(2).hours.do(thing_you_wanna_do)
while True:
schedule.run_pending()
90% of the code used is the example code of the schedule library. Happy scheduling!
One option is to write a C/C++ wrapper that executes the python script on a regular basis. Your end-user would run the C/C++ executable, which would remain running in the background, and periodically execute the python script. This may not be the best solution, and may not work if you don't know C/C++ or want to keep this 100% python. But it does seem like the most user-friendly approach, since people are used to clicking on executables. All of this assumes that python is installed on your end user's computer.
Another option is to use cron job/Task Scheduler but to put it in the installer as a script so your end user doesn't have to do it.
To run something every 10 minutes past the hour.
from datetime import datetime, timedelta
while 1:
print 'Run something..'
dt = datetime.now() + timedelta(hours=1)
dt = dt.replace(minute=10)
while datetime.now() < dt:
time.sleep(1)
For apscheduler
< 3.0, see Unknown's answer.
For apscheduler
> 3.0
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched.configure(options_from_ini_file)
sched.start()
apscheduler
documentation.
This for apscheduler-3.3.1
on Python 3.6.2
.
"""
Following configurations are set for the scheduler:
- a MongoDBJobStore named “mongo”
- an SQLAlchemyJobStore named “default” (using SQLite)
- a ThreadPoolExecutor named “default”, with a worker count of 20
- a ProcessPoolExecutor named “processpool”, with a worker count of 5
- UTC as the scheduler’s timezone
- coalescing turned off for new jobs by default
- a default maximum instance limit of 3 for new jobs
"""
from pytz import utc
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
"""
Method 1:
"""
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
"""
Method 2 (ini format):
"""
gconfig = {
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
}
sched_method1 = BlockingScheduler() # uses overrides from Method1
sched_method2 = BlockingScheduler() # uses same overrides from Method2 but in an ini format
@sched_method1.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched_method2.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched_method1.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
sched_method1.start()
sched_method2.configure(gconfig=gconfig)
sched_method2.start()
Source: Stackoverflow.com