当前位置:脚本大全 > > 正文

java入坑rabbitmq(Python操作rabbitMQ的示例代码)

时间:2021-11-03 15:51:39类别:脚本大全

java入坑rabbitmq

Python操作rabbitMQ的示例代码

引入

rabbitmq 是一个由 erlang 语言开发的 amqp 的开源实现。

rabbitmq是一款基于amqp协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。

java入坑rabbitmq(Python操作rabbitMQ的示例代码)

中文文档

安装

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • # 安装配置epel源
  •   rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
  •  
  • # 安装erlang
  •   yum -y install erlang
  •  
  • # 安装rabbitmq
  •   yum -y install rabbitmq-server
  •  
  • # 启动/停止
  •   service rabbitmq-server start/stop
  • rabbitmq工作模型

    简单模式

    生产者

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • import pika
  • connection = pika.blockingconnection(pika.connectionparameters( host='localhost'))
  •  
  • channel = connection.channel()
  •  
  • channel.queue_declare(queue='hello')
  •  
  • channel.basic_publish(exchange='',
  •            routing_key='hello',
  •            body='hello world!')
  •  
  • print(" [x] sent 'hello world!'")
  • connection.close()
  • 消费者

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))
  • channel = connection.channel()
  •  
  • channel.queue_declare(queue='hello')
  •  
  • def callback(ch, method, properties, body):
  •   print(" [x] received %r" % body)
  •  
  • channel.basic_consume( callback,
  •             queue='hello',
  •             no_ack=true)
  •  
  • print(' [*] waiting for messages. to exit press ctrl+c')
  • channel.start_consuming()
  • 相关参数

    1,no-ack = false

    如果消费者遇到情况(its channel is closed, connection is closed, or tcp connection is lost)挂掉了,那么,rabbitmq会重新将该任务添加到队列中。

    接收消息端应该这么写:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • import pika
  •  
  • connection = pika.blockingconnection(pika.connectionparameters(
  •     host='10.211.55.4'))
  • channel = connection.channel()
  •  
  • channel.queue_declare(queue='hello')
  •  
  • def callback(ch, method, properties, body):
  •   print(" [x] received %r" % body)
  •   import time
  •   time.sleep(10)
  •   print 'ok'
  •   ch.basic_ack(delivery_tag = method.delivery_tag)
  •  
  • channel.basic_consume(callback,
  •            queue='hello',
  •            no_ack=false)
  •  
  • print(' [*] waiting for messages. to exit press ctrl+c')
  • channel.start_consuming()
  • 2,durable :消息不丢失

    生产者

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • import pika
  •  
  • connection = pika.blockingconnection(pika.connectionparameters(host='10.211.55.4'))
  • channel = connection.channel()
  •  
  • # make message persistent
  • channel.queue_declare(queue='hello', durable=true)
  •  
  • channel.basic_publish(exchange='',
  •            routing_key='hello',
  •            body='hello world!',
  •            properties=pika.basicproperties(
  •              delivery_mode=2, # make message persistent
  •            ))
  • print(" [x] sent 'hello world!'")
  • connection.close()
  • 3,消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • import pika
  •  
  • connection = pika.blockingconnection(pika.connectionparameters(host='10.211.55.4'))
  • channel = connection.channel()
  •  
  • # make message persistent
  • channel.queue_declare(queue='hello')
  •  
  •  
  • def callback(ch, method, properties, body):
  •   print(" [x] received %r" % body)
  •   import time
  •   time.sleep(10)
  •   print 'ok'
  •   ch.basic_ack(delivery_tag = method.delivery_tag)
  •  
  • channel.basic_qos(prefetch_count=1)
  •  
  • channel.basic_consume(callback,
  •            queue='hello',
  •            no_ack=false)
  •  
  • print(' [*] waiting for messages. to exit press ctrl+c')
  • channel.start_consuming()
  • exchange模型

    1,发布订阅

    java入坑rabbitmq(Python操作rabbitMQ的示例代码)

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,rabbitmq实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    exchange type = fanout

    生产者

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • import pika
  • import sys
  •  
  • connection = pika.blockingconnection(pika.connectionparameters(
  •     host='localhost'))
  • channel = connection.channel()
  •  
  • channel.exchange_declare(exchange='logs',
  •              type='fanout')
  •  
  • message = ' '.join(sys.argv[1:]) or "info: hello world!"
  • channel.basic_publish(exchange='logs',
  •            routing_key='',
  •            body=message)
  • print(" [x] sent %r" % message)
  • connection.close()
  • 消费者

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • import pika
  •  
  • connection = pika.blockingconnection(pika.connectionparameters(
  •     host='localhost'))
  • channel = connection.channel()
  •  
  • channel.exchange_declare(exchange='logs',
  •              type='fanout')
  •  
  • result = channel.queue_declare(exclusive=true)
  • queue_name = result.method.queue
  •  
  • channel.queue_bind(exchange='logs',
  •           queue=queue_name)
  •  
  • print(' [*] waiting for logs. to exit press ctrl+c')
  •  
  • def callback(ch, method, properties, body):
  •   print(" [x] %r" % body)
  •  
  • channel.basic_consume(callback,
  •            queue=queue_name,
  •            no_ack=true)
  •  
  • channel.start_consuming()
  • 2,关键字发送

    java入坑rabbitmq(Python操作rabbitMQ的示例代码)

    之前事例,发送消息时明确指定某个队列并向其中发送消息,rabbitmq还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    exchange type = direct

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • import pika
  • import sys
  •  
  • connection = pika.blockingconnection(pika.connectionparameters(
  •     host='localhost'))
  • channel = connection.channel()
  •  
  • channel.exchange_declare(exchange='direct_logs',
  •              type='direct')
  •  
  • result = channel.queue_declare(exclusive=true)
  • queue_name = result.method.queue
  •  
  • severities = sys.argv[1:]
  • if not severities:
  •   sys.stderr.write("usage: %s [info] [warning] [error]\n" % sys.argv[0])
  •   sys.exit(1)
  •  
  • for severity in severities:
  •   channel.queue_bind(exchange='direct_logs',
  •             queue=queue_name,
  •             routing_key=severity)
  •  
  • print(' [*] waiting for logs. to exit press ctrl+c')
  •  
  • def callback(ch, method, properties, body):
  •   print(" [x] %r:%r" % (method.routing_key, body))
  •  
  • channel.basic_consume(callback,
  •            queue=queue_name,
  •            no_ack=true)
  •  
  • channel.start_consuming()
  • 3,模糊匹配

    java入坑rabbitmq(Python操作rabbitMQ的示例代码)

    exchange type = topic

    发送者路由值 队列中
    old.boy.python old.* -- 不匹配
    old.boy.python old.# -- 匹配

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • import pika
  • import sys
  •  
  • connection = pika.blockingconnection(pika.connectionparameters(
  •     host='localhost'))
  • channel = connection.channel()
  •  
  • channel.exchange_declare(exchange='topic_logs',
  •              type='topic')
  •  
  • result = channel.queue_declare(exclusive=true)
  • queue_name = result.method.queue
  •  
  • binding_keys = sys.argv[1:]
  • if not binding_keys:
  •   sys.stderr.write("usage: %s [binding_key]...\n" % sys.argv[0])
  •   sys.exit(1)
  •  
  • for binding_key in binding_keys:
  •   channel.queue_bind(exchange='topic_logs',
  •             queue=queue_name,
  •             routing_key=binding_key)
  •  
  • print(' [*] waiting for logs. to exit press ctrl+c')
  •  
  • def callback(ch, method, properties, body):
  •   print(" [x] %r:%r" % (method.routing_key, body))
  •  
  • channel.basic_consume(callback,
  •            queue=queue_name,
  •            no_ack=true)
  •  
  • channel.start_consuming()
  • 基于rabbitmq的rpc

     callback queue 回调队列

    一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址 reply_to

    correlation id 关联标识

    一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有 correlation_id 属性,这样客户端在回调队列中根据 correlation_id 字段的值就可以分辨此响应属于哪个请求。

    客户端发送请求:

    某个应用将请求信息交给客户端,然后客户端发送rpc请求,在发送rpc请求到rpc请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息

    服务端工作流:

    等待接受客户端发来rpc请求,当请求出现的时候,服务器从rpc请求队列中取出请求,然后处理后,将响应发送到reply_to指定的回调队列中

    客户端接受处理结果:

    客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用

    服务者

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 标签:

    猜您喜欢