Celery:让你的代码异步起飞!

文摘   2024-10-18 18:55   黑龙江  

Celery:让你的代码异步起飞!

Celery是个强大的Python异步任务队列库,它能让你的程序轻松处理复杂的后台任务。不管是发邮件、处理大数据,还是定时执行任务,Celery 都能帮你搞定。今天咱们就来看看这个神奇的工具怎么用!


01

Celery是啥玩意儿?


简单来说,Celery就是个任务队列管理系统。它能把你的耗时操作扔到后台去执行,让主程序继续欢快地跑下去,不用傻等着。


想象一下,你在餐厅点了菜。服务员接单后,把订单交给厨房(这就是Celery的 worker),然后继续给其他客人点单。你不用等着厨师做完菜才能点下一道菜,是吧?Celery就是这个道理。


python


复制


fromceleryimportCelery


app=Celery('tasks',broker='redis://localhost:6379')


@app.task defadd(x,y):returnx+y


异步调用

result=add.delay(4,4)


这段代码就创建了一个简单的Celery应用,定义了个加法任务。add.delay(4,4)就是异步调用,它会立即返回,不会等待任务完成。


02

Celery的核心组件


Celery主要有三个部分:


1.任务(Tasks):就是你想异步执行的那些函数。2.工人(Workers):负责执行任务的进程。3.消息中间件(Broker):存储任务队列的地方,常用Redis或RabbitMQ。


温馨提示:别把Broker和结果后端搞混了!Broker是存任务的,结果后端是存执行结果的。


03

定义任务


定义任务超级简单,就是给函数加个装饰器:


python


复制


@app.task defsend_email(to,subject,body):


发邮件的代码

pass


这样,send_email就变成了一个Celery任务。你可以像普通函数一样调用它,也可以用delay()apply_async()异步调用。


04

运行Worker


光定义任务不行,还得有人干活啊!启动worker就是这样:


bash


复制


celery-Ayour_moduleworker--loglevel=info


这条命令会启动worker进程,开始处理任务队列里的活儿。


05

异步调用任务


调用任务有两种主要方式:


python


复制


方式1:简单粗暴

result=send_email.delay('user@example.com','Hello','World')


方式2:更多控制

result=send_email.apply_async(args=['user@example.com','Hello','World'],countdown=10)


delay()apply_async()的简化版。用apply_async()你能设置更多参数,比如延迟执行时间。


06

处理结果


异步任务执行完了,结果怎么拿?Celery给了你两种选择:


python


复制


等待结果(不推荐,会阻塞)

result=task.delay(arg1,arg2) value=result.get(timeout=10)


异步回调(推荐)

@app.task defprocess_result(result):print(f“Taskresult:{result}”)


task.apply_async((arg1,arg2),link=process_result.s())


第一种方法会阻塞程序,除非你真的需要等结果,否则不推荐。第二种用回调函数,更符合异步的思想。


07

定时任务


Celery还能帮你搞定定时任务:


python


复制


fromcelery.schedulesimportcrontab


app.conf.beat_schedule={ 'add-every-30-seconds':{ 'task':'tasks.add', 'schedule':30.0, 'args':(16,16) }, 'backup-every-monday-morning':{ 'task':'tasks.backup', 'schedule':crontab(hour=7,minute=30,day_of_week=1), 'args':(), }, }


这样设置后,Celery会每30秒执行一次add任务,每周一早上7:30执行一次backup任务。


08

错误处理


任务执行出错了怎么办?Celery提供了重试机制:


python


复制


fromceleryimportTask


classRetryTask(Task):max_retries=3 default_retry_delay=60#60秒后重试


defretry(self,exc=None,throw=True,kwargs):print(f“Retryingtaskduetoerror:{exc}”) super().retry(exc=exc,throw=throw,kwargs)


@app.task(base=RetryTask) defmight_fail():


可能会失败的代码

ifrandom.choice([True,False]):raiseException(“Oops!”) return“Success!”


这个任务如果失败了,会自动重试3次,每次间隔60秒。


好了,今天的Celery入门就到这里。


用好Celery,你的Python 程序就能轻松应对各种复杂的异步场景。


记住,异步处理不仅能提高程序效率,还能让你的代码结构更清晰。


对了,如果你觉得Celery配置太复杂,可以试试 Django的Celery集成,能省不少事。




乐意说事
阅他人故事,品百味人生,坚持日更,加油!
 最新文章