前言
在软件开发中,定时任务是一种常见的需求,比如定期备份数据、定时发送邮件、定时检查系统状态等。Python作为一种强大的编程语言,提供了多种方式来实现定时任务。
本篇文章将结合之前发布的,通过python来实现微信机器人的教程,打造一个定时推送微信消息到指定的用户或者群组的功能。
该机器人方案只能运行在win上,因为使用的是ui自动化来实现的,所以安全性较高,但是弊端也明显。
实现定时任务
本次将介绍多种的实现方式,大家可以了解一下,选择一个自己觉得好用的方式进行接入。
使用标准库中的sched模块
Python的标准库中包含了一个sched
模块,这是一个基于事件的定时调度器。使用sched
模块可以方便地实现简单的定时任务。
import sched
import time
scheduler = sched.scheduler(time.time, time.sleep)
def job():
print("执行定时任务")
# 将任务添加到调度器中,参数分别是:任务执行的时间,任务优先级,任务函数,任务函数的参数
scheduler.enter(10, 1, job) # 10秒后执行job函数
# 运行调度器
scheduler.run()
使用threading模块
对于需要并行执行的定时任务,可以使用threading
模块。通过创建一个线程来执行定时任务,可以避免阻塞主线程。
import threading
def timed_job():
print("执行定时任务")
# 可以在这里设置下一次执行的时间
threading.Timer(10, timed_job).start()
# 设置定时任务的初始执行时间
threading.Timer(10, timed_job).start()
使用第三方库APScheduler
对于更复杂的定时任务,可以使用APScheduler
这个强大的第三方库。它支持多种调度方式,包括定时执行、间隔执行等。
from apscheduler.schedulers.background import BackgroundScheduler
def my_job():
print("执行定时任务")
# 创建后台调度器
scheduler = BackgroundScheduler()
# 添加任务,参数分别是:触发器、执行时间间隔、任务函数
scheduler.add_job(my_job, 'interval', seconds=10)
# 启动调度器
scheduler.start()
# 为了防止脚本退出,通常需要在最后加入这行代码
try:
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
APScheduler 还可以使用cron
表达式来创建定时任务。
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
print("Hello! The time is: %s" % datetime.datetime.now())
# 创建调度器
scheduler = BlockingScheduler()
# 添加任务,使用cron表达式设置每天的13点30分执行任务
scheduler.add_job(my_job, 'cron', hour=13, minute=30)
# 启动调度器
scheduler.start()
实现微信消息的定时推送
假设,我们每天早上九点推送每日新闻到指定的群里。
这里我直接给出代码大家做一个参考:
from apscheduler.schedulers.blocking import BlockingScheduler
from wxauto import WeChat
import requests
# 初始化微信操作对象
wx = WeChat()
def save_image():
# 图片的URL地址
url = 'https://api.jun.la/60s.php?format=image'
try:
# 发送GET请求
response = requests.get(url, timeout=10) # 设置超时时间
# 检查请求是否成功
if response.status_code == 200:
# 保存图片到本地
with open('downloaded_image.png', 'wb') as file:
file.write(response.content)
print("图片已成功保存到本地。")
else:
print("请求图片失败,状态码:", response.status_code)
except requests.RequestException as e:
print(f"请求图片时发生错误:{e}")
def send_message():
save_image()
# 发送图片
files = ['downloaded_image.png']
who = 'xxx群聊'
wx.SendFiles(filepath=files, who=who)
def my_job():
send_message()
# 创建调度器
scheduler = BlockingScheduler()
# 添加任务,使用cron表达式设置每天的13点30分执行任务
scheduler.add_job(my_job, 'cron', hour=9, minute=0)
# 启动调度器
scheduler.start()