本文共 2728 字,大约阅读时间需要 9 分钟。
本文基于 RabbitMQ 官方文档和实践经验编写,未引用外部链接。
生产者负责将消息发布到 RabbitMQ 中。以下是关键参数的说明:
durable 一起使用,可确保 RabbitMQ 服务重启后消息不会丢失。import pikaimport datetimeif __name__ == "__main__": connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='tom', durable=True) while True: message = str(datetime.datetime.now()) channel.basic_publish( exchange='', routing_key='tom', body=message, properties=pika.BasicProperties( delivery_mode=2 ) ) print("[x] sent %r" % message) time.sleep(1) connection.close() 消费者负责接收并处理消息。以下是关键代码说明:
call_back 函数接收消息并打印接收时间,time.sleep() 用于模拟处理延迟。basic_ack 确认消息已成功处理。import pikaimport timedef call_back(ch, method, properties, body): print('[*] recieved %r' % body.decode()) time.sleep(body.count(b'.') + 2) # 添加额外延迟 print('[*] done') ch.basic_ack(delivery_tag=method.delivery_tag)if __name__ == "__main__": connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='tom', durable=True) channel.basic_qos(prefetch_count=1) # 最多获取一个消息 channel.basic_consume(queue='tom', on_message_callback=call_back) channel.start_consuming() [x] sent 的日志。在 Windows 环境下,使用以下命令查看 RabbitMQ 队列状态:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
这样可以查看队列名称、已读消息数和未确认消息数。
在生产者代码中添加一个无限循环即可:
import pikaimport datetimeimport timeif __name__ == "__main__": connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='tom', durable=True) while True: message = str(datetime.datetime.now()) channel.basic_publish( exchange='', routing_key='tom', body=message, properties=pika.BasicProperties( delivery_mode=2 ) ) print("[x] sent %r" % message) time.sleep(1) connection.close() 这通常是由于消费者无法及时处理消息或网络延迟导致的。确保消费者代码中没有过长的 time.sleep 延迟或其他阻塞操作,及时 basic_ack 确认消息。
delivery_mode=2 确保消息持久化,但请注意磁盘空间限制。prefetch_count,避免过多占用内存。通过以上配置和优化,可以实现可靠、高效的消息生产与消费。
转载地址:http://kmkhz.baihongyu.com/