博客
关于我
RabbitMQ(三):Work Queues
阅读量:684 次
发布时间:2019-03-17

本文共 2728 字,大约阅读时间需要 9 分钟。

RabbitMQ 消息生产与消费实践指南

1. 参考链接

本文基于 RabbitMQ 官方文档和实践经验编写,未引用外部链接。

2. 代码实现

2.1 生产者定义

生产者负责将消息发布到 RabbitMQ 中。以下是关键参数的说明:

  • durable = True:表示消息持久化。如果生产者宕机,消息会保存在硬盘中(可在 RabbitMQ 管理界面指定存储路径)。
  • delivery_mode = 2:与 durable 一起使用,可确保 RabbitMQ 服务重启后消息不会丢失。
  • auto_ack = True:默认开启的应答参数,确保生产者在消费者未反馈时不会重新发送消息。
import pikaimport datetimeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)        while True:        message = str(datetime.datetime.now())        channel.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        print("[x] sent %r" % message)        time.sleep(1)    connection.close()

2.2 消费者定义

消费者负责接收并处理消息。以下是关键代码说明:

  • call_back 函数接收消息并打印接收时间,time.sleep() 用于模拟处理延迟。
  • basic_ack 确认消息已成功处理。
import pikaimport timedef call_back(ch, method, properties, body):    print('[*] recieved %r' % body.decode())    time.sleep(body.count(b'.') + 2)  # 添加额外延迟    print('[*] done')    ch.basic_ack(delivery_tag=method.delivery_tag)if __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)    channel.basic_qos(prefetch_count=1)  # 最多获取一个消息    channel.basic_consume(queue='tom', on_message_callback=call_back)    channel.start_consuming()

3. 代码运行结果

3.1 生产者行为

  • 生产者会持续循环发布消息,每隔 1 秒发送一条时间戳消息。
  • 每条消息都会打印 [x] sent 的日志。

3.2 消费者行为

  • 消费者会等待消息并处理。
  • 消息处理时间延迟取决于消息内容中点号数量加 2 秒。
  • 消费者会在处理完成后确认消息,确保消息可靠传输。

4. Windows 查看未处理消息队列

在 Windows 环境下,使用以下命令查看 RabbitMQ 队列状态:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

这样可以查看队列名称、已读消息数和未确认消息数。

5. 疑问解答

5.1 如何让生产者持续发送消息?

在生产者代码中添加一个无限循环即可:

import pikaimport datetimeimport timeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)        while True:        message = str(datetime.datetime.now())        channel.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        print("[x] sent %r" % message)        time.sleep(1)    connection.close()

5.2 为什么会有未处理消息?

这通常是由于消费者无法及时处理消息或网络延迟导致的。确保消费者代码中没有过长的 time.sleep 延迟或其他阻塞操作,及时 basic_ack 确认消息。

6. 注意事项

  • 持久化消息delivery_mode=2 确保消息持久化,但请注意磁盘空间限制。
  • 性能优化:根据消费者需求调整 prefetch_count,避免过多占用内存。
  • 异常处理:建议在生产者中添加异常捕获机制,防止连接中断导致消息丢失。

通过以上配置和优化,可以实现可靠、高效的消息生产与消费。

转载地址:http://kmkhz.baihongyu.com/

你可能感兴趣的文章
Oracle10g安装了11g的ODAC后,PL/SQL连接提示TNS:无法解析指定的连接标识符
查看>>
oracle11g dataguard物理备库搭建(关闭主库cp数据文件到备库)
查看>>
Oracle11G基本操作
查看>>
Oracle11g服务详细介绍及哪些服务是必须开启的?
查看>>
Oracle11g静默安装dbca,netca报错处理--直接跟换操作系统
查看>>
oracle12安装软件后安装数据库,然后需要自己配置监听
查看>>
Oracle——08PL/SQL简介,基本程序结构和语句
查看>>
Oracle——distinct的用法
查看>>
Oracle、MySQL、SQL Server架构大对比
查看>>
oracle下的OVER(PARTITION BY)函数介绍
查看>>
Oracle中DATE数据相减问题
查看>>
Oracle中merge into的使用
查看>>
oracle中sql查询上月、本月、上周、本周、昨天、今天的数据!
查看>>
oracle中sql的case语句运用--根据不同条件去排序!
查看>>
Oracle中Transate函数的使用
查看>>
oracle中关于日期问题的汇总!
查看>>
Oracle中常用的语句
查看>>
Oracle中序列的操作以及使用前对序列的初始化
查看>>
oracle中新建用户和赋予权限
查看>>
Oracle中的NVL,NVL2,NULLIF以及COALESCE函数使用
查看>>