博客
关于我
RabbitMQ(三):Work Queues
阅读量:684 次
发布时间:2019-03-17

本文共 2399 字,大约阅读时间需要 7 分钟。

1. 参考链接:

2. 代码实现

定义生产者,其中有下列参数需要留意:

  • durable = True,此参数表示消息的持久化,如果生产者宕机了,设置为 True,可以保证即使宕机,消息会保存在硬盘里面(硬盘在RabbitMQ的管理界面可以指定);
  • delivery_mode = 2,此参数和durable一同使用,可以确保消息在RabbitMQ服务重启也不会丢失;
  • auto_ack = True, 应答参数,此参数默认打开,保证即使消费者宕机或无反馈,生产者不会重新发送这个消息;eg. 如果我们设置为False 并中断一个消费者consumer,生产者producer会重新发送一次消息;
    • 同时需要修改一串代码
    • ch.basic_asc(delivery_tag = method.delivery_tag)

 

import pikaimport datetimeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel3 = connection.channel()    channel3.queue_declare(queue='tom', durable=True)    # message = ''.join(sys.argv[1:]) or "tom and jerry"    message = str(datetime.datetime.now())    channel3.basic_publish(        exchange='',        routing_key='tom',        body=message,        properties=pika.BasicProperties(            delivery_mode=2        )    )    print("[x] sent %r"%message)    connection.close()

定义消费者

import pikaimport timedef call_back(ch, method, properties, body):    print('[*] recieved %r'%body.decode())    time.sleep(body.count(b'.'))    print('[*] done')    ch.basic_ack(delivery_tag = method.delivery_tag)if __name__ == '__main__':    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='tom', durable=True)    print('[*] waiting for message, to exit use ctrl + c')    channel.basic_qos(prefetch_count=1)    channel.basic_consume(queue='tom', on_message_callback=call_back)    channel.start_consuming()

3. 代码运行结果:

定义一个生产者:

两个消费者的接受消息:

4. Windows查看未处理的消息队列信息

因为如果消息一直未处理,将不断的存入内容,消息的消费速度赶不上生者的生产速度,将会导致消息内容不断增加,使用如下命令查看相关消息

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

 

5. 疑问

1. 如果想要生产者不断的发送消息,应该怎么处理,同第二篇文章问题

import pikaimport datetimeimport timeif __name__ == "__main__":    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel3 = connection.channel()    channel3.queue_declare(queue='tom', durable=True)    # message = ''.join(sys.argv[1:]) or "tom and jerry"    while True:        message = str(datetime.datetime.now())        channel3.basic_publish(            exchange='',            routing_key='tom',            body=message,            properties=pika.BasicProperties(                delivery_mode=2            )        )        time.sleep(1)        print("[x] sent %r"%message)    connection.close()

输出如下

 

转载地址:http://kmkhz.baihongyu.com/

你可能感兴趣的文章
MySQL中B+Tree索引原理
查看>>
mysql中cast() 和convert()的用法讲解
查看>>
mysql中datetime与timestamp类型有什么区别
查看>>
MySQL中DQL语言的执行顺序
查看>>
mysql中floor函数的作用是什么?
查看>>
MySQL中group by 与 order by 一起使用排序问题
查看>>
mysql中having的用法
查看>>
MySQL中interactive_timeout和wait_timeout的区别
查看>>
mysql中int、bigint、smallint 和 tinyint的区别、char和varchar的区别详细介绍
查看>>
mysql中json_extract的使用方法
查看>>
mysql中json_extract的使用方法
查看>>
mysql中kill掉所有锁表的进程
查看>>
mysql中like % %模糊查询
查看>>
MySql中mvcc学习记录
查看>>
mysql中null和空字符串的区别与问题!
查看>>