博客
关于我
RabbitMQ(三):Work Queues
阅读量:684 次
发布时间:2019-03-17

本文共 2399 字,大约阅读时间需要 7 分钟。

1. 参考链接:

2. 代码实现

定义生产者,其中有下列参数需要留意:

  • durable = True,此参数表示消息的持久化,如果生产者宕机了,设置为 True,可以保证即使宕机,消息会保存在硬盘里面(硬盘在RabbitMQ的管理界面可以指定);
  • delivery_mode = 2,此参数和durable一同使用,可以确保消息在RabbitMQ服务重启也不会丢失;
  • auto_ack = True, 应答参数,此参数默认打开,保证即使消费者宕机或无反馈,生产者不会重新发送这个消息;eg. 如果我们设置为False 并中断一个消费者consumer,生产者producer会重新发送一次消息;
    • 同时需要修改一串代码
    • ch.basic_asc(delivery_tag = method.delivery_tag)

 

import pikaimport datetimeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel3 = connection.channel()    channel3.queue_declare(queue='tom', durable=True)    # message = ''.join(sys.argv[1:]) or "tom and jerry"    message = str(datetime.datetime.now())    channel3.basic_publish(        exchange='',        routing_key='tom',        body=message,        properties=pika.BasicProperties(            delivery_mode=2        )    )    print("[x] sent %r"%message)    connection.close()

定义消费者

import pikaimport timedef call_back(ch, method, properties, body):    print('[*] recieved %r'%body.decode())    time.sleep(body.count(b'.'))    print('[*] done')    ch.basic_ack(delivery_tag = method.delivery_tag)if __name__ == '__main__':    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)    print('[*] waiting for message, to exit use ctrl + c')    channel.basic_qos(prefetch_count=1)    channel.basic_consume(queue='tom', on_message_callback=call_back)    channel.start_consuming()

3. 代码运行结果:

定义一个生产者:

两个消费者的接受消息:

4. Windows查看未处理的消息队列信息

因为如果消息一直未处理,将不断的存入内容,消息的消费速度赶不上生者的生产速度,将会导致消息内容不断增加,使用如下命令查看相关消息

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

 

5. 疑问

1. 如果想要生产者不断的发送消息,应该怎么处理,同第二篇文章问题

import pikaimport datetimeimport timeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel3 = connection.channel()    channel3.queue_declare(queue='tom', durable=True)    # message = ''.join(sys.argv[1:]) or "tom and jerry"    while True:        message = str(datetime.datetime.now())        channel3.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        time.sleep(1)        print("[x] sent %r"%message)    connection.close()

输出如下

 

转载地址:http://kmkhz.baihongyu.com/

你可能感兴趣的文章
Netty工作笔记0011---Channel应用案例2
查看>>
Netty工作笔记0012---Channel应用案例3
查看>>
Netty工作笔记0013---Channel应用案例4Copy图片
查看>>
Netty工作笔记0014---Buffer类型化和只读
查看>>
Netty工作笔记0015---MappedByteBuffer使用
查看>>
Netty工作笔记0019---Selector API介绍
查看>>
Netty工作笔记0020---Selectionkey在NIO体系
查看>>
Netty工作笔记0022---NIO快速入门--编写客户端
查看>>
Vue踩坑笔记 - 关于vue静态资源引入的问题
查看>>
Netty工作笔记0024---SelectionKey API
查看>>
Netty工作笔记0025---SocketChannel API
查看>>
Netty工作笔记0027---NIO 网络编程应用--群聊系统2--服务器编写2
查看>>
Netty工作笔记0028---NIO 网络编程应用--群聊系统3--客户端编写1
查看>>
Netty工作笔记0030---NIO与零拷贝原理剖析
查看>>
Netty工作笔记0034---Netty架构设计--线程模型
查看>>
Netty工作笔记0050---Netty核心模块1
查看>>
Netty工作笔记0057---Netty群聊系统服务端
查看>>
Netty工作笔记0060---Tcp长连接和短连接_Http长连接和短连接_UDP长连接和短连接
查看>>
Netty工作笔记0061---Netty心跳处理器编写
查看>>
Netty工作笔记0063---WebSocket长连接开发2
查看>>