博客
关于我
RabbitMQ(三):Work Queues
阅读量:684 次
发布时间:2019-03-17

本文共 2728 字,大约阅读时间需要 9 分钟。

RabbitMQ 消息生产与消费实践指南

1. 参考链接

本文基于 RabbitMQ 官方文档和实践经验编写,未引用外部链接。

2. 代码实现

2.1 生产者定义

生产者负责将消息发布到 RabbitMQ 中。以下是关键参数的说明:

  • durable = True:表示消息持久化。如果生产者宕机,消息会保存在硬盘中(可在 RabbitMQ 管理界面指定存储路径)。
  • delivery_mode = 2:与 durable 一起使用,可确保 RabbitMQ 服务重启后消息不会丢失。
  • auto_ack = True:默认开启的应答参数,确保生产者在消费者未反馈时不会重新发送消息。
import pikaimport datetimeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)        while True:        message = str(datetime.datetime.now())        channel.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        print("[x] sent %r" % message)        time.sleep(1)    connection.close()

2.2 消费者定义

消费者负责接收并处理消息。以下是关键代码说明:

  • call_back 函数接收消息并打印接收时间,time.sleep() 用于模拟处理延迟。
  • basic_ack 确认消息已成功处理。
import pikaimport timedef call_back(ch, method, properties, body):    print('[*] recieved %r' % body.decode())    time.sleep(body.count(b'.') + 2)  # 添加额外延迟    print('[*] done')    ch.basic_ack(delivery_tag=method.delivery_tag)if __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)    channel.basic_qos(prefetch_count=1)  # 最多获取一个消息    channel.basic_consume(queue='tom', on_message_callback=call_back)    channel.start_consuming()

3. 代码运行结果

3.1 生产者行为

  • 生产者会持续循环发布消息,每隔 1 秒发送一条时间戳消息。
  • 每条消息都会打印 [x] sent 的日志。

3.2 消费者行为

  • 消费者会等待消息并处理。
  • 消息处理时间延迟取决于消息内容中点号数量加 2 秒。
  • 消费者会在处理完成后确认消息,确保消息可靠传输。

4. Windows 查看未处理消息队列

在 Windows 环境下,使用以下命令查看 RabbitMQ 队列状态:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

这样可以查看队列名称、已读消息数和未确认消息数。

5. 疑问解答

5.1 如何让生产者持续发送消息?

在生产者代码中添加一个无限循环即可:

import pikaimport datetimeimport timeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)        while True:        message = str(datetime.datetime.now())        channel.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        print("[x] sent %r" % message)        time.sleep(1)    connection.close()

5.2 为什么会有未处理消息?

这通常是由于消费者无法及时处理消息或网络延迟导致的。确保消费者代码中没有过长的 time.sleep 延迟或其他阻塞操作,及时 basic_ack 确认消息。

6. 注意事项

  • 持久化消息delivery_mode=2 确保消息持久化,但请注意磁盘空间限制。
  • 性能优化:根据消费者需求调整 prefetch_count,避免过多占用内存。
  • 异常处理:建议在生产者中添加异常捕获机制,防止连接中断导致消息丢失。

通过以上配置和优化,可以实现可靠、高效的消息生产与消费。

转载地址:http://kmkhz.baihongyu.com/

你可能感兴趣的文章
Pipenv 与 Conda?
查看>>
QVGA/HVGA/WVGA/FWVGA分辨率屏含义及大小//Android虚拟机分辨率
查看>>
pipreqs : 无法将“pipreqs”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写,如果包括路径,请确保路径 正确,然后再试一次。
查看>>
pipy国内镜像的网址
查看>>
quiver绘制python语言
查看>>
pip下载缓慢
查看>>
PIP使用SSH从BitBucket安装自定义软件包,无需输入SSH密码
查看>>
pip命令提示unknow or unsupported command install解决方法
查看>>
pip在安装模块时提示Read timed out
查看>>
pip更换源
查看>>
SpringBoot之Banner源码深度分解
查看>>
Pix2Pix如何工作?
查看>>
QuickBI助你成为分析师——搞定数据源
查看>>
pkl来存储python字典
查看>>
quick sort | 快速排序 C++ 实现
查看>>
pkpmbs 建设工程质量监督系统 Ajax_operaFile.aspx 文件读取漏洞复现
查看>>
pkpmbs 建设工程质量监督系统 文件上传漏洞复现
查看>>
pku 2400 Supervisor, Supervisee KM求最小权匹配+DFS回溯解集
查看>>
queue队列、deque双端队列和priority_queue优先队列
查看>>
PKUSC2018游记
查看>>