Celery:让你的代码异步起飞!
Celery是个强大的Python异步任务队列库,它能让你的程序轻松处理复杂的后台任务。不管是发邮件、处理大数据,还是定时执行任务,Celery 都能帮你搞定。今天咱们就来看看这个神奇的工具怎么用!
Celery是啥玩意儿?
简单来说,Celery就是个任务队列管理系统。它能把你的耗时操作扔到后台去执行,让主程序继续欢快地跑下去,不用傻等着。
想象一下,你在餐厅点了菜。服务员接单后,把订单交给厨房(这就是Celery的 worker),然后继续给其他客人点单。你不用等着厨师做完菜才能点下一道菜,是吧?Celery就是这个道理。
python
复制
fromceleryimportCelery
app=Celery('tasks',broker='redis://localhost:6379')
@app.task defadd(x,y):returnx+y
异步调用
result=add.delay(4,4)
这段代码就创建了一个简单的Celery应用,定义了个加法任务。add.delay(4,4)
就是异步调用,它会立即返回,不会等待任务完成。
Celery的核心组件
Celery主要有三个部分:
1.任务(Tasks):就是你想异步执行的那些函数。2.工人(Workers):负责执行任务的进程。3.消息中间件(Broker):存储任务队列的地方,常用Redis或RabbitMQ。
温馨提示:别把Broker和结果后端搞混了!Broker是存任务的,结果后端是存执行结果的。
定义任务
定义任务超级简单,就是给函数加个装饰器:
python
复制
@app.task defsend_email(to,subject,body):
发邮件的代码
pass
这样,send_email
就变成了一个Celery任务。你可以像普通函数一样调用它,也可以用delay()
或apply_async()
异步调用。
运行Worker
光定义任务不行,还得有人干活啊!启动worker就是这样:
bash
复制
celery-Ayour_moduleworker--loglevel=info
这条命令会启动worker进程,开始处理任务队列里的活儿。
异步调用任务
调用任务有两种主要方式:
python
复制
方式1:简单粗暴
result=send_email.delay('user@example.com','Hello','World')
方式2:更多控制
result=send_email.apply_async(args=['user@example.com','Hello','World'],countdown=10)
delay()
是apply_async()
的简化版。用apply_async()
你能设置更多参数,比如延迟执行时间。
处理结果
异步任务执行完了,结果怎么拿?Celery给了你两种选择:
python
复制
等待结果(不推荐,会阻塞)
result=task.delay(arg1,arg2) value=result.get(timeout=10)
异步回调(推荐)
@app.task defprocess_result(result):print(f“Taskresult:{result}”)
task.apply_async((arg1,arg2),link=process_result.s())
第一种方法会阻塞程序,除非你真的需要等结果,否则不推荐。第二种用回调函数,更符合异步的思想。
定时任务
Celery还能帮你搞定定时任务:
python
复制
fromcelery.schedulesimportcrontab
app.conf.beat_schedule={ 'add-every-30-seconds':{ 'task':'tasks.add', 'schedule':30.0, 'args':(16,16) }, 'backup-every-monday-morning':{ 'task':'tasks.backup', 'schedule':crontab(hour=7,minute=30,day_of_week=1), 'args':(), }, }
这样设置后,Celery会每30秒执行一次add
任务,每周一早上7:30执行一次backup
任务。
错误处理
任务执行出错了怎么办?Celery提供了重试机制:
python
复制
fromceleryimportTask
classRetryTask(Task):max_retries=3 default_retry_delay=60#60秒后重试
defretry(self,exc=None,throw=True,kwargs):print(f“Retryingtaskduetoerror:{exc}”) super().retry(exc=exc,throw=throw,kwargs)
@app.task(base=RetryTask) defmight_fail():
可能会失败的代码
ifrandom.choice([True,False]):raiseException(“Oops!”) return“Success!”
这个任务如果失败了,会自动重试3次,每次间隔60秒。
好了,今天的Celery入门就到这里。
用好Celery,你的Python 程序就能轻松应对各种复杂的异步场景。
记住,异步处理不仅能提高程序效率,还能让你的代码结构更清晰。
对了,如果你觉得Celery配置太复杂,可以试试 Django的Celery集成,能省不少事。