APScheduler:定时任务神器
一个定时任务框架,说白了就是让程序按照既定的时间计划去干活。APScheduler就是这么一个神器,它能帮你把任务安排得明明白白的。想每天早上8点发个邮件?想每隔5分钟检查一下服务器状态?APScheduler统统能搞定。
装这个库特别简单,一行命令就搞定:
pip install apscheduler
来看个最基础的例子:
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
print(f“我在 {datetime.now()} 被调用啦!”)
scheduler = BlockingScheduler()
scheduler.add_job(job, 'interval', seconds=3)
scheduler.start()
运行这段代码,你就能看到每3秒钟打印一次当前时间。
温馨提示: BlockingScheduler 是最简单的调度器,但它会阻塞当前程序。如果你的项目里还有其他代码要跑,建议用 BackgroundScheduler 。
APScheduler有三种经典的调度方式,我都给你整明白:
间隔调度(interval) :
scheduler.add_job(job, 'interval', hours=1) # 每小时执行
scheduler.add_job(job, 'interval', minutes=30) # 每30分钟执行
定时调度(cron) :
# 每天早上8点执行
scheduler.add_job(job, 'cron', hour=8)
# 工作日的下午5点执行
scheduler.add_job(job, 'cron', day_of_week='mon-fri', hour=17)
一次性调度(date) :
scheduler.add_job(job, 'date', run_date='2024-12-31 23:59:59')
有时候我们需要对正在运行的任务动点手脚:
from apscheduler.schedulers.background import BackgroundScheduler
def my_job(name):
print(f“任务{name}执行啦!”)
scheduler = BackgroundScheduler()
job = scheduler.add_job(my_job, 'interval', seconds=5, args=['测试'])
scheduler.start()
# 暂停任务
job.pause()
# 恢复任务
job.resume()
# 删除任务
job.remove()
啥事都得有个保底,任务执行出错了咋整?
from apscheduler.events import EVENT_JOB_ERROR
def my_listener(event):
if event.exception:
print(f“哎呀,出错了:{event.exception}”)
scheduler = BackgroundScheduler()
scheduler.add_listener(my_listener, EVENT_JOB_ERROR)
温馨提示:任务里最好加上try-except,不然一个任务崩了可能影响其他任务。
任务多了就得有个“备忘录”,APScheduler支持多种存储方式:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
scheduler = BackgroundScheduler(jobstores=jobstores)
这样即使程序重启,任务配置也不会丢失。
经过这一通操作,你已经掌握了APScheduler的核心用法。调度任务就像设置闹钟一样简单,关键是要记住三种调度方式的特点。写代码时记得加上错误处理,这样你的程序才能在生产环境里稳稳当当地跑。
看完这些内容,你随便找个需求练练手,没准很快就能写出一个靠谱的定时任务系统。