先来个简单的背景:什么是MQ?
那么,为什么要保证消息的可靠性呢?
1. 消息持久化
磁盘持久化:消息被写入磁盘,一旦写入成功,可以确保在系统崩溃时消息不丢失。比如RabbitMQ的“消息持久化”模式,确保队列中的消息被写入磁盘。 内存持久化:一些高性能的MQ,比如Kafka,可以通过内存+磁盘的方式来提高性能,但如果MQ宕机,数据可能会丢失。
# 连接RabbitMQ
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个持久化的队列
channel.queue_declare(queue='task_queue', durable=True)
# 发送一条持久化的消息
channel.basic_publish(
exchange='',
routing_key='task_queue',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # 使消息持久化
))
print(" [x] Sent 'Hello World!'")
connection.close()
delivery_mode=2
来使得消息持久化,这样即使RabbitMQ崩溃了,消息也不会丢失。2. 消息确认机制
生产者确认:确保消息已经成功写入队列。常见的机制是“消息确认”回调,确保消息被队列接收。 消费者确认:确保消息被消费者处理并且确认。如果消费者没有成功处理,消息会重新排队或者被转到死信队列(Dead Letter Queue,DLQ)。
ack
(acknowledgment)来进行,消费者需要显式确认消息已经被成功处理。# 消费者确认消息处理
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认消息
channel.basic_consume(queue='task_queue', on_message_callback=callback)
basic_ack
会显式确认消息已经被成功处理。如果消费者没有调用basic_ack
,消息将被重新处理。3. 消息重试机制与死信队列(DLQ)
# 设置死信队列
channel.queue_declare(queue='dlq', durable=True)
# 消费者处理失败的消息会被发送到死信队列
channel.basic_publish(
exchange='',
routing_key='dlq',
body='Failed Message',
properties=pika.BasicProperties(
delivery_mode=2, # 使消息持久化
))
4. 消息去重
Kafka Producer
中的acks
参数来控制消息的可靠性。# Kafka生产者设置
from kafka import KafkaProducer
producer = KafkaProducer(acks='all', bootstrap_servers='localhost:9092')
producer.send('my_topic', key=b'key1', value=b'Hello Kafka!')
acks='all'
表示所有的副本都必须确认消息已经写入,这样可以避免消息丢失。5. 集群与高可用
# Kafka集群中的副本数设置
bin/kafka-topics.sh --alter --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
-END-
ok,今天先说到这,老规矩,看完文章记得右下角给何老师点赞,
最后送给大家一个福利,我这里有一份搞副业的教程,这份教程里有100+个搞钱小项目:
网盘拉新核心玩法、公众号运营变现、小红书虚拟资料引流等,现在扫码加我微信,即可领取这份副业教程。
添加时备注:副业