一、幂等性实现
1.1 什么是幂等性?
幂等性是指同一条消息无论被消费多少次,业务结果都只生效一次,防止重复扣款、重复发货等问题。
RabbitMQ 的投递模式是“至少一次交付”(at-least-once delivery),如果消费者处理失败或者没有及时确认,消息会被多次投递。如果业务本身不具备幂等性,就可能导致重复扣款、重复发货等严重后果。
1.2 实现思路
RabbitMQ 只负责消息的可靠投递,而不会记录每条消息是否已经被成功消费。因此,需要由消费者端维护消费状态,常见做法是借助 Redis 实现去重逻辑。
消息在生产阶段应携带全局唯一的 message_id(例如订单号:order:10010)。在消费逻辑中,先通过 Redis 的原子命令 SETNX 尝试写入该 message_id:①如果 SETNX返回1,表示第一次消费,可以处理;②如果返回0,表示已消费,直接忽略
二、消息重放实现
在RabbitMQ中,ack和nack机制是保证可靠投递、实现重放的关键。
2.1 ack和nack
如果你的消费逻辑里既没有调用ack,也没有调用nack,消息状态会一直unacked。只要没确认,就永远不会删除消息。
(1) ack
确认消息已被消费成功。当消费者调用:
ch.basic_ack(delivery_tag=method.delivery_tag)
RabbitMQ就会把消息从队列里永久删除。只要你ack了,这条消息就不可能再来了。
(2) nack
告诉RabbitMQ“我没处理好”。有两种方式:
# 发送nack并重入队列
# RabbitMQ会立刻把消息放回队列,再投递给其他消费者。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# 发送nack不重入队列
# 消息就会被丢弃(或者,如果绑定了死信队列,就转入死信队列)。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
2.2 实现代码
下方代码实现了以下关键功能:
1. 消息通过 SETNX + EXPIRE 在 Redis 中写入幂等标记,确保同一消息只会被一个消费者处理。
2. 如果标记已存在,判断是“已完成”还是“正在处理”,分别选择直接确认或稍后重试。
3. 业务处理成功后将标记更新为 done 并延长过期,表示消费已完成。
4. 如果处理失败,删除标记以便下次重新消费,并根据重试次数决定是否放弃或重试。
生产者代码
import pika
import uuid
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)
message_id = str(uuid.uuid4())
body = "test message" # 可以通过推送body = "fail message" 模拟消费异常
properties = pika.BasicProperties(
delivery_mode=2,
message_id=message_id
)
channel.basic_publish(
exchange='',
routing_key='test_queue',
body=body,
properties=properties
)
print(f"[x] Sent '{body}' with message_id {message_id}")
connection.close()
消费者代码
import pika
import redis
import time
# Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)
MAX_RETRY = 5
def callback(ch, method, properties, body):
message_id = properties.message_id
if not message_id:
import hashlib
message_id = hashlib.md5(body).hexdigest()
redis_key = f"msg:{message_id}"
retry_key = f"retry:{message_id}"
# 尝试用SETNX写入幂等标记
result = r.setnx(redis_key, "processing")
if not result:
status = r.get(redis_key)
if status and status.decode() == "done":
# 已经处理过
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"[!] Duplicate message detected: {message_id}")
else:
# 正在处理,稍后重试
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
print(f"[!] Message {message_id} is being processed by another consumer.")
return
# SETNX成功,要设置过期时间,防止永久占用
r.expire(redis_key, 300) # 300秒
try:
# 获取重试次数
retry_count = r.get(retry_key)
if retry_count is None:
retry_count = 0
else:
retry_count = int(retry_count)
print(f"[x] Processing message: {body.decode()} (retry: {retry_count})")
# 模拟失败
if "fail" in body.decode():
raise Exception("Simulated failure")
# 业务逻辑
# ...
# 处理成功,改为done并延长过期
r.set(redis_key, "done")
r.expire(redis_key, 24*60*60)
r.delete(retry_key)
ch.basic_ack(delivery_tag=method.delivery_tag)
print("[+] Message processed successfully")
except Exception as e:
retry_count += 1
r.set(retry_key, retry_count)
r.expire(retry_key, 24*60*60)
print(f"[!] Error processing message (retry {retry_count}): {e}")
# 失败时删除幂等标记,下次可以继续处理
r.delete(redis_key)
if retry_count >= MAX_RETRY:
ch.basic_ack(delivery_tag=method.delivery_tag)
print("[!] Max retries reached, moving message to dead letter log.")
else:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='test_queue',
on_message_callback=callback,
auto_ack=False
)
print("[*] Waiting for messages. CTRL+C to exit")
channel.start_consuming()