Python 实现定时任务功能,打造微信消息定时推送功能

文摘   2024-11-11 09:20   安徽  

前言

在软件开发中,定时任务是一种常见的需求,比如定期备份数据、定时发送邮件、定时检查系统状态等。Python作为一种强大的编程语言,提供了多种方式来实现定时任务。

本篇文章将结合之前发布的,通过python来实现微信机器人的教程,打造一个定时推送微信消息到指定的用户或者群组的功能。

利用Python控制微信收发消息,打造自己的微信通知机器人

该机器人方案只能运行在win上,因为使用的是ui自动化来实现的,所以安全性较高,但是弊端也明显。

实现定时任务

本次将介绍多种的实现方式,大家可以了解一下,选择一个自己觉得好用的方式进行接入。

使用标准库中的sched模块

Python的标准库中包含了一个sched模块,这是一个基于事件的定时调度器。使用sched模块可以方便地实现简单的定时任务。

import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)

def job():
print("执行定时任务")

# 将任务添加到调度器中,参数分别是:任务执行的时间,任务优先级,任务函数,任务函数的参数
scheduler.enter(10, 1, job) # 10秒后执行job函数

# 运行调度器
scheduler.run()

使用threading模块

对于需要并行执行的定时任务,可以使用threading模块。通过创建一个线程来执行定时任务,可以避免阻塞主线程。

import threading

def timed_job():
print("执行定时任务")
# 可以在这里设置下一次执行的时间
threading.Timer(10, timed_job).start()

# 设置定时任务的初始执行时间
threading.Timer(10, timed_job).start()

使用第三方库APScheduler

对于更复杂的定时任务,可以使用APScheduler这个强大的第三方库。它支持多种调度方式,包括定时执行、间隔执行等。

from apscheduler.schedulers.background import BackgroundScheduler

def my_job():
print("执行定时任务")

# 创建后台调度器
scheduler = BackgroundScheduler()
# 添加任务,参数分别是:触发器、执行时间间隔、任务函数
scheduler.add_job(my_job, 'interval', seconds=10)
# 启动调度器
scheduler.start()

# 为了防止脚本退出,通常需要在最后加入这行代码
try:
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()

APScheduler 还可以使用cron表达式来创建定时任务。

from apscheduler.schedulers.blocking import BlockingScheduler

def my_job():
print("Hello! The time is: %s" % datetime.datetime.now())

# 创建调度器
scheduler = BlockingScheduler()

# 添加任务,使用cron表达式设置每天的13点30分执行任务
scheduler.add_job(my_job, 'cron', hour=13, minute=30)

# 启动调度器
scheduler.start()

实现微信消息的定时推送

假设,我们每天早上九点推送每日新闻到指定的群里。

这里我直接给出代码大家做一个参考:

from apscheduler.schedulers.blocking import BlockingScheduler
from wxauto import WeChat
import requests

# 初始化微信操作对象
wx = WeChat()

def save_image():
# 图片的URL地址
url = 'https://api.jun.la/60s.php?format=image'

try:
# 发送GET请求
response = requests.get(url, timeout=10) # 设置超时时间

# 检查请求是否成功
if response.status_code == 200:
# 保存图片到本地
with open('downloaded_image.png', 'wb') as file:
file.write(response.content)
print("图片已成功保存到本地。")
else:
print("请求图片失败,状态码:", response.status_code)
except requests.RequestException as e:
print(f"请求图片时发生错误:{e}")

def send_message():
save_image()
# 发送图片
files = ['downloaded_image.png']
who = 'xxx群聊'
wx.SendFiles(filepath=files, who=who)


def my_job():
send_message()

# 创建调度器
scheduler = BlockingScheduler()

# 添加任务,使用cron表达式设置每天的13点30分执行任务
scheduler.add_job(my_job, 'cron', hour=9, minute=0)

# 启动调度器
scheduler.start()

往期推荐

家宽拥有IPv6如何突破访问限制,解析域名,远程访问

2024-11-08

WinSW:一个可以将Jar或exe注册成Windows系统服务的工具

2024-10-16

OpenVPN 快速搭建教程

2024-09-30

利用玩客云刷 CasaOS 打造出低成本的轻量级 NAS 服务

2024-07-17

Python项目打包成.exe可执行文件

2024-05-27

欢迎大家关注我的公众号,将会为大家推荐更优质的内容!

青檬小栈
科长技术小栈,分享各种技术文章和教程集合。欢迎大家的关注!
 最新文章