博客
关于我
RabbitMQ(三):Work Queues
阅读量:684 次
发布时间:2019-03-17

本文共 2728 字,大约阅读时间需要 9 分钟。

RabbitMQ 消息生产与消费实践指南

1. 参考链接

本文基于 RabbitMQ 官方文档和实践经验编写,未引用外部链接。

2. 代码实现

2.1 生产者定义

生产者负责将消息发布到 RabbitMQ 中。以下是关键参数的说明:

  • durable = True:表示消息持久化。如果生产者宕机,消息会保存在硬盘中(可在 RabbitMQ 管理界面指定存储路径)。
  • delivery_mode = 2:与 durable 一起使用,可确保 RabbitMQ 服务重启后消息不会丢失。
  • auto_ack = True:默认开启的应答参数,确保生产者在消费者未反馈时不会重新发送消息。
import pikaimport datetimeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)        while True:        message = str(datetime.datetime.now())        channel.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        print("[x] sent %r" % message)        time.sleep(1)    connection.close()

2.2 消费者定义

消费者负责接收并处理消息。以下是关键代码说明:

  • call_back 函数接收消息并打印接收时间,time.sleep() 用于模拟处理延迟。
  • basic_ack 确认消息已成功处理。
import pikaimport timedef call_back(ch, method, properties, body):    print('[*] recieved %r' % body.decode())    time.sleep(body.count(b'.') + 2)  # 添加额外延迟    print('[*] done')    ch.basic_ack(delivery_tag=method.delivery_tag)if __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)    channel.basic_qos(prefetch_count=1)  # 最多获取一个消息    channel.basic_consume(queue='tom', on_message_callback=call_back)    channel.start_consuming()

3. 代码运行结果

3.1 生产者行为

  • 生产者会持续循环发布消息,每隔 1 秒发送一条时间戳消息。
  • 每条消息都会打印 [x] sent 的日志。

3.2 消费者行为

  • 消费者会等待消息并处理。
  • 消息处理时间延迟取决于消息内容中点号数量加 2 秒。
  • 消费者会在处理完成后确认消息,确保消息可靠传输。

4. Windows 查看未处理消息队列

在 Windows 环境下,使用以下命令查看 RabbitMQ 队列状态:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

这样可以查看队列名称、已读消息数和未确认消息数。

5. 疑问解答

5.1 如何让生产者持续发送消息?

在生产者代码中添加一个无限循环即可:

import pikaimport datetimeimport timeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)        while True:        message = str(datetime.datetime.now())        channel.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        print("[x] sent %r" % message)        time.sleep(1)    connection.close()

5.2 为什么会有未处理消息?

这通常是由于消费者无法及时处理消息或网络延迟导致的。确保消费者代码中没有过长的 time.sleep 延迟或其他阻塞操作,及时 basic_ack 确认消息。

6. 注意事项

  • 持久化消息delivery_mode=2 确保消息持久化,但请注意磁盘空间限制。
  • 性能优化:根据消费者需求调整 prefetch_count,避免过多占用内存。
  • 异常处理:建议在生产者中添加异常捕获机制,防止连接中断导致消息丢失。

通过以上配置和优化,可以实现可靠、高效的消息生产与消费。

转载地址:http://kmkhz.baihongyu.com/

你可能感兴趣的文章
Objective-C实现currency converter货币换算算法(附完整源码)
查看>>
Objective-C实现cycle sort循环排序算法(附完整源码)
查看>>
Objective-C实现data transformations数据转换算法(附完整源码)
查看>>
Objective-C实现DBSCAN聚类算法(附完整源码)
查看>>
Objective-C实现DBSCAN聚类算法(附完整源码)
查看>>
Objective-C实现decision tree决策树算法(附完整源码)
查看>>
Objective-C实现degreeToRadian度到弧度算法(附完整源码)
查看>>
Objective-C实现depth first search深度优先搜索算法(附完整源码)
查看>>
Objective-C实现des文件加密算法(附完整源码)
查看>>
Objective-C实现deutsch jozsa算法(附完整源码)
查看>>
Objective-C实现DFS判断是否是二分图Bipartite算法(附完整源码)
查看>>
Objective-C实现Diffie-Hellman算法(附完整源码)
查看>>
Objective-C实现Dijkstra最小路径算法(附完整源码)
查看>>
Objective-C实现dijkstra迪杰斯特拉算法(附完整源码)
查看>>
Objective-C实现Dijkstra迪杰斯特拉算法(附完整源码)
查看>>
Objective-C实现dijkstra银行家算法(附完整源码)
查看>>
Objective-C实现Dinic算法(附完整源码)
查看>>
Objective-C实现disjoint set不相交集算法(附完整源码)
查看>>
Objective-C实现DisjointSet并查集的算法(附完整源码)
查看>>
Objective-C实现djb2哈希算法(附完整源码)
查看>>