本文共 2399 字,大约阅读时间需要 7 分钟。
定义生产者,其中有下列参数需要留意:
import pikaimport datetimeif __name__ == "__main__": connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel3 = connection.channel() channel3.queue_declare(queue='tom', durable=True) # message = ''.join(sys.argv[1:]) or "tom and jerry" message = str(datetime.datetime.now()) channel3.basic_publish( exchange='', routing_key='tom', body=message, properties=pika.BasicProperties( delivery_mode=2 ) ) print("[x] sent %r"%message) connection.close()
定义消费者
import pikaimport timedef call_back(ch, method, properties, body): print('[*] recieved %r'%body.decode()) time.sleep(body.count(b'.')) print('[*] done') ch.basic_ack(delivery_tag = method.delivery_tag)if __name__ == '__main__': connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='tom', durable=True) print('[*] waiting for message, to exit use ctrl + c') channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='tom', on_message_callback=call_back) channel.start_consuming()
定义一个生产者:
两个消费者的接受消息:
因为如果消息一直未处理,将不断的存入内容,消息的消费速度赶不上生者的生产速度,将会导致消息内容不断增加,使用如下命令查看相关消息
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
1. 如果想要生产者不断的发送消息,应该怎么处理,同第二篇文章问题
import pikaimport datetimeimport timeif __name__ == "__main__": connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel3 = connection.channel() channel3.queue_declare(queue='tom', durable=True) # message = ''.join(sys.argv[1:]) or "tom and jerry" while True: message = str(datetime.datetime.now()) channel3.basic_publish( exchange='', routing_key='tom', body=message, properties=pika.BasicProperties( delivery_mode=2 ) ) time.sleep(1) print("[x] sent %r"%message) connection.close()
输出如下
转载地址:http://kmkhz.baihongyu.com/