def compare_RabbitMQ(data):
    """Consume one message from every RabbitMQ queue named in ``data``.

    The connection settings are popped out of ``data`` (the caller's dict is
    modified in place, as before); every key that remains afterwards is
    treated as a queue name.

    Args:
        data: Mutable mapping holding the keys ``mq_host``, ``mq_user``,
            ``mq_password``, ``mq_port`` and ``virtual_host``, plus one key
            per queue to read from.

    Raises:
        KeyError: If one of the connection keys is missing from ``data``.
    """
    mq_host = data.pop("mq_host")
    mq_user = data.pop("mq_user")
    mq_password = data.pop("mq_password")
    mq_port = data.pop("mq_port")
    virtual_host = data.pop("virtual_host")

    # The user/password were previously read but never handed to pika, so the
    # connection silently fell back to the library's default credentials.
    credentials = pika.PlainCredentials(mq_user, mq_password)
    parameters = pika.ConnectionParameters(
        host=mq_host,
        port=mq_port,
        virtual_host=virtual_host,
        credentials=credentials,
    )

    # Snapshot the keys so the loop is independent of later changes to data.
    for queuename in list(data):
        connection = pika.BlockingConnection(parameters)
        try:
            channel = connection.channel()
            # NOTE(review): the consumed messages are not compared with
            # data[queuename] or returned here, matching the original code;
            # confirm whether a comparison step is missing.
            connectRabbitMQ(channel, queuename, 1)
        finally:
            # Always release the connection; it used to be left open.
            if connection.is_open:
                connection.close()
def connectRabbitMQ(channel, queuename, size):
    """Consume up to ``size`` messages from ``queuename`` and return them.

    Registers a consumer on ``channel``, enters ``start_consuming()`` and asks
    the channel to stop once ``size`` messages have been collected. Each
    message body is decoded as UTF-8, stored, and acknowledged.

    Args:
        channel: Object providing ``basic_consume``, ``basic_ack``,
            ``start_consuming`` and ``stop_consuming`` (presumably a pika
            blocking channel -- confirm against the caller).
        queuename: Name of the queue to consume from.
        size: Number of messages to collect. A value <= 0 returns an empty
            list without registering a consumer.

    Returns:
        list[str]: The decoded message bodies, in arrival order.
    """
    message_list = []

    def callback(ch, method, properties, body):
        """Handle one message taken from RabbitMQ."""
        message_list.append(body.decode('utf-8'))
        # Acknowledge so the broker knows the message has been processed.
        ch.basic_ack(delivery_tag=method.delivery_tag)
        # Count with the closure-local list instead of module globals, so
        # repeated or nested calls cannot interfere with each other.
        # TODO: stopping is purely count-based; a time-based stop condition
        # is still needed for queues holding fewer than ``size`` messages.
        if len(message_list) >= size:
            ch.stop_consuming()

    if size > 0:
        # Explicit acks are used on purpose: with no_ack, a message received
        # just before the machine crashes would be lost.
        channel.basic_consume(callback, queue=queuename)
        channel.start_consuming()  # start listening for messages
    print(message_list)
    return message_list