博客
关于我
RabbitMQ(三):Work Queues
阅读量:684 次
发布时间:2019-03-17

本文共 2728 字,大约阅读时间需要 9 分钟。

RabbitMQ 消息生产与消费实践指南

1. 参考链接

本文基于 RabbitMQ 官方文档和实践经验编写,未引用外部链接。

2. 代码实现

2.1 生产者定义

生产者负责将消息发布到 RabbitMQ 中。以下是关键参数的说明:

  • durable = True:表示消息持久化。如果生产者宕机,消息会保存在硬盘中(可在 RabbitMQ 管理界面指定存储路径)。
  • delivery_mode = 2:与 durable 一起使用,可确保 RabbitMQ 服务重启后消息不会丢失。
  • auto_ack = True:默认开启的应答参数,确保生产者在消费者未反馈时不会重新发送消息。
import pikaimport datetimeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)        while True:        message = str(datetime.datetime.now())        channel.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        print("[x] sent %r" % message)        time.sleep(1)    connection.close()

2.2 消费者定义

消费者负责接收并处理消息。以下是关键代码说明:

  • call_back 函数接收消息并打印接收时间,time.sleep() 用于模拟处理延迟。
  • basic_ack 确认消息已成功处理。
import pikaimport timedef call_back(ch, method, properties, body):    print('[*] recieved %r' % body.decode())    time.sleep(body.count(b'.') + 2)  # 添加额外延迟    print('[*] done')    ch.basic_ack(delivery_tag=method.delivery_tag)if __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)    channel.basic_qos(prefetch_count=1)  # 最多获取一个消息    channel.basic_consume(queue='tom', on_message_callback=call_back)    channel.start_consuming()

3. 代码运行结果

3.1 生产者行为

  • 生产者会持续循环发布消息,每隔 1 秒发送一条时间戳消息。
  • 每条消息都会打印 [x] sent 的日志。

3.2 消费者行为

  • 消费者会等待消息并处理。
  • 消息处理时间延迟取决于消息内容中点号数量加 2 秒。
  • 消费者会在处理完成后确认消息,确保消息可靠传输。

4. Windows 查看未处理消息队列

在 Windows 环境下,使用以下命令查看 RabbitMQ 队列状态:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

这样可以查看队列名称、已读消息数和未确认消息数。

5. 疑问解答

5.1 如何让生产者持续发送消息?

在生产者代码中添加一个无限循环即可:

import pikaimport datetimeimport timeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)        while True:        message = str(datetime.datetime.now())        channel.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        print("[x] sent %r" % message)        time.sleep(1)    connection.close()

5.2 为什么会有未处理消息?

这通常是由于消费者无法及时处理消息或网络延迟导致的。确保消费者代码中没有过长的 time.sleep 延迟或其他阻塞操作,及时 basic_ack 确认消息。

6. 注意事项

  • 持久化消息delivery_mode=2 确保消息持久化,但请注意磁盘空间限制。
  • 性能优化:根据消费者需求调整 prefetch_count,避免过多占用内存。
  • 异常处理:建议在生产者中添加异常捕获机制,防止连接中断导致消息丢失。

通过以上配置和优化,可以实现可靠、高效的消息生产与消费。

转载地址:http://kmkhz.baihongyu.com/

你可能感兴趣的文章
OceanBase详解及如何通过MySQL的lib库进行连接
查看>>
OfficeWeb365 SaveDraw 文件上传漏洞复现
查看>>
office中的所有content type
查看>>
office之Excel 你会用 Ctrl + E 吗?
查看>>
OGG初始化之使用数据库实用程序加载数据
查看>>
ogg参数解析
查看>>
ognl详解
查看>>
Oil Deposits
查看>>
OJ中处理超大数据的方法
查看>>
OJ中常见的一种presentation error解决方法
查看>>
OK335xS UART device registe hacking
查看>>
ok6410内存初始化
查看>>
Okhttp3添加拦截器后,报错,java.io.IOException: unexpected end of stream on okhttp3.Address
查看>>
OKR为什么到今天才突然火了?
查看>>
ol3 Demo2 ----地图搜索功能
查看>>
OLAP、OLTP的介绍和比较
查看>>
OLAP在大数据时代的挑战
查看>>
oldboy.16课
查看>>
OLEDB IMEX行数限制的问题
查看>>
ollama 如何删除本地模型文件?
查看>>