博客
关于我
RabbitMQ(三):Work Queues
阅读量:684 次
发布时间:2019-03-17

本文共 2728 字,大约阅读时间需要 9 分钟。

RabbitMQ 消息生产与消费实践指南

1. 参考链接

本文基于 RabbitMQ 官方文档和实践经验编写,未引用外部链接。

2. 代码实现

2.1 生产者定义

生产者负责将消息发布到 RabbitMQ 中。以下是关键参数的说明:

  • durable = True:表示消息持久化。如果生产者宕机,消息会保存在硬盘中(可在 RabbitMQ 管理界面指定存储路径)。
  • delivery_mode = 2:与 durable 一起使用,可确保 RabbitMQ 服务重启后消息不会丢失。
  • auto_ack = True:默认开启的应答参数,确保生产者在消费者未反馈时不会重新发送消息。
import pikaimport datetimeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)        while True:        message = str(datetime.datetime.now())        channel.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        print("[x] sent %r" % message)        time.sleep(1)    connection.close()

2.2 消费者定义

消费者负责接收并处理消息。以下是关键代码说明:

  • call_back 函数接收消息并打印接收时间,time.sleep() 用于模拟处理延迟。
  • basic_ack 确认消息已成功处理。
import pikaimport timedef call_back(ch, method, properties, body):    print('[*] recieved %r' % body.decode())    time.sleep(body.count(b'.') + 2)  # 添加额外延迟    print('[*] done')    ch.basic_ack(delivery_tag=method.delivery_tag)if __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)    channel.basic_qos(prefetch_count=1)  # 最多获取一个消息    channel.basic_consume(queue='tom', on_message_callback=call_back)    channel.start_consuming()

3. 代码运行结果

3.1 生产者行为

  • 生产者会持续循环发布消息,每隔 1 秒发送一条时间戳消息。
  • 每条消息都会打印 [x] sent 的日志。

3.2 消费者行为

  • 消费者会等待消息并处理。
  • 消息处理时间延迟取决于消息内容中点号数量加 2 秒。
  • 消费者会在处理完成后确认消息,确保消息可靠传输。

4. Windows 查看未处理消息队列

在 Windows 环境下,使用以下命令查看 RabbitMQ 队列状态:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

这样可以查看队列名称、已读消息数和未确认消息数。

5. 疑问解答

5.1 如何让生产者持续发送消息?

在生产者代码中添加一个无限循环即可:

import pikaimport datetimeimport timeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)        while True:        message = str(datetime.datetime.now())        channel.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        print("[x] sent %r" % message)        time.sleep(1)    connection.close()

5.2 为什么会有未处理消息?

这通常是由于消费者无法及时处理消息或网络延迟导致的。确保消费者代码中没有过长的 time.sleep 延迟或其他阻塞操作,及时 basic_ack 确认消息。

6. 注意事项

  • 持久化消息delivery_mode=2 确保消息持久化,但请注意磁盘空间限制。
  • 性能优化:根据消费者需求调整 prefetch_count,避免过多占用内存。
  • 异常处理:建议在生产者中添加异常捕获机制,防止连接中断导致消息丢失。

通过以上配置和优化,可以实现可靠、高效的消息生产与消费。

转载地址:http://kmkhz.baihongyu.com/

你可能感兴趣的文章
oracle 并集 时间_Oracle集合运算符 交集 并集 差集
查看>>
Oracle 序列sequence 开始于某个值(10)执行完nextval 发现查出的值比10还小的解释
查看>>
ORACLE 异常错误处理
查看>>
oracle 执行一条查询语句,把数据加载到页面或者前台发生的事情
查看>>
oracle 批量生成建同义词语句和付权语句
查看>>
oracle 抓包工具,shell 安装oracle和pfring(抓包) 及自动环境配置
查看>>
Oracle 拆分以逗号分隔的字符串为多行数据
查看>>
Oracle 排序中使用nulls first 或者nulls last 语法
查看>>
oracle 插入date日期类型的数据、插入从表中查出的数据,使用表中的默认数据
查看>>
Oracle 操作笔记
查看>>
oracle 数据库 安装 和优化
查看>>
oracle 数据库dg搭建规范1
查看>>
Oracle 数据库常用SQL语句(1)
查看>>
Oracle 数据库特殊查询总结
查看>>
Oracle 数据类型
查看>>
Oracle 数据自动备份 通过EXP备份
查看>>
oracle 数据迁移 怎么保证 和原表的数据顺序一致_一个比传统数据库快 1001000 倍的数据库,来看一看?...
查看>>
oracle 时间函数
查看>>
oracle 时间转化函数及常见函数 .
查看>>
Oracle 权限(grant、revoke)
查看>>