在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者。本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式。

为了阐述这个模式,我们将会搭建一个简单的日志系统,它包含两种程序:一种发送日志消息,另一种接收并打印日志消息。在这个日志系统里,每一个运行的消费者都可以获取到消息,在这种情况下,我们可以实现这种需求:一个消费者接收消息并写入磁盘,另一个消费者接收消息并打印在电脑屏幕上。简单来说,生产者发布的消息将会以广播的形式转发到所有的消费者。

1、交换器(Exchange)

在前两章节我们,我们往队列中发布消息或获取消息,然而,前面的讲解其实并不完整,接下来,是时候介绍完整的RabbitMq消息模型了。

回忆一下我们前两章指南中包含的内容:

rabbitmq消息模式的核心思想是:一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。

实际上,生产者只能把消息发送给一个exchange,exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,它们把接收到的消息推送给队列。一个exchage必须清楚地知道如何处理一条消息。

有四种类型的交换器,分别是:direct、topic、headers、fanout。本章主要讲解最后一种:fanous(广播模式)。下面创建一个fanout类型的交换器,我们称之为:logs:

1 channel.exchangeDeclare("logs", "fanout");

广播模式交换器很简单,从字面意思也能理解,它其实就是把接收到的消息推送给所有它知道的队列。在我们的日志系统中正好需要这种模式。

如果想查看当前系统中有多少个exchange,可以使用以下命令:

sudo rabbitmqctl list_exchanges

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(1)

或者通过控制台查看:

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(2)

可以看到有很多以amq.*开头的交换器,以及(AMQP default)默认交换器,这些是默认创建的交换器。

在前面两章的指南中,我们并不知道交换器的存在,但是依然可以将消息发送到队列中,那其实并不是因为我们可以不使用交换器,实际上是我们使用了默认的交换器(我们通过指定交换器为字字符串:""),回顾一下我们之前是如何发送消息的:

1 channel.basicPublish("", "hello", null, message.getBytes());

第一个参数是交换器的名字,空字符串表示它是一个默认或无命名的交换器,消息将会由指定的路由键(第二个参数,routingKey,后面会讲)转发到队列。

你可能会有疑问:既然exchange可以指定为空字符串(""),那么可否指定为null?

答案是:不能!

通过跟踪发布消息的代码,在AMQImpl类中的Publish()方面中,可以看到,不光是exchange不能为null,同时routingKey路由键也不能为null,否则会抛出异常:

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(3)

接着上面的讲解,我们创建一个命名的交换器:

1 channel.basicPublish( "logs", "", null, message.getBytes());

2、临时队列

在前两章的例子中,我们使用的队列都是有具体的队列名,创建命名队列是很必要的,因为我们需要将消费者指向同一名字的队列。因此,要想在生产者和消费者中间共享队列就必须要使用命名队列。

但是,本章讲解的日志系统也可以使用非命名队列(可以不手动命名),我们希望收到所有日志消息,而不是部分。并且我们希望总是接收到新的日志消息而不是旧的日志消息。为了解决这个问题,需要分两步走。

首先,无论何时我们的消费者连接到RabbitMq,我们都需要一个新的、空的队列来接收日志消息,因此,消费者在连接上RabbitMq之后需要创建一个任意名字的队列,或者让RabbitMq生成任意的队列名字。

其次,一旦该消费者断开了与RabbitMq的连接,队列也被自动删除。

通过JAVA客户端的无参方法:queueDeclare()来创建一个非持久化、专有的、自动删除的、名字随机生成的队列。

1 String queueName = channel.queueDeclare().getQueue();

3、绑定(Binding)

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(4)

前面广播模式的交换器和队列已经创建好了,接下来就是要告诉交换器向队列里发送消息。交换器与队列之间的关系称之为绑定关系。

1 channel.queueBind(queueName, "logs", "");

至此,交换器已经可以往队列中发送消息了。

可以通过下列命令来查看队列的绑定关系:

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(5)

4、完整的代码

EmitLog.java

1 import com.rabbitmq.client.BuiltinExchangeType; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.Connectionfactory; 5 6 public class EmitLog { 7 8 private static final String EXCHANGE_NAME = "logs"; 9 10 public static void main(String[] args) throws Exception { 11 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setHost("192.168.92.130"); 14 15 try (Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel();) { 17 18 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); 19 20 String message = "RabbitMq fanout。。。。。。"; 21 channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("utf-8")); 22 23 System.out.println(" [x] Sent '" message "'"); 24 } 25 } 26 }

正好你所看到的,Connection创建完成之后,定义了exchange,这一步是必要的,因为如果没有交换器将无法发送消息。

如此没有队列绑定到该交换器上,那么,交换器收到的消息将会丢失,但是对我们本章的日志系统来说没问题的,当没有消费者时,我们可以安全地放弃掉数据,我们只接收最新的日志消息。

ReceiveLogs.java

1 public class ReceiveLogs { 2 3 private static final String EXCHANGE_NAME = "logs"; 4 5 public static void main(String[] args) throws Exception { 6 7 ConnectionFactory factory = new ConnectionFactory(); 8 factory.setHost("192.168.92.130"); 9 10 Connection connection = factory.newConnection(); 11 Channel channel = connection.createChannel(); 12 13 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); 14 15 final String queue = channel.queueDeclare().getQueue(); 16 channel.queueBind(queue,EXCHANGE_NAME,""); 17 18 System.out.println(" [*] Waiting for messages. To exit press CTRL C"); 19 20 DeliverCallback deliverCallback = (consumerTa,delivery) -> { 21 22 String message = new String(delivery.getBody(), "UTF-8"); 23 System.out.println(" [x] Received '" message "'"); 24 25 }; 26 27 channel.basicConsume(queue,true,deliverCallback,consumerTag -> {}); 28 } 29 }

这里的autoAck设置为true,因为我们这里是广播模式,每个消费者都会收到一样的消息,并且这里给消费者生产的随机名称的队列相当于是独有的,所以在接收到消息之后立即发送确认回执是OK的。

但是这里先提出一个疑问:在这种模式下,每个队列收到的消息是否也会有Ready和Unacked状态?

5、测试结果

一、首先启动生产者,再启动两个消费者

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(6)

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(7)

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(8)

可以看到,生产者启动后发送的消息丢失了,两个消费者并没有消费到,此时再看控制台:

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(9)

可见RabbitMq为我们创建了两个随机命名的队列,其Exclusive是Owner,表示是专有的,Parameters为AD(auto delete),拥有该队列的消费者一占断开连接,队列将会被自动删除。

二、其次启动生产者发送一次消息

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(10)

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(11)

两个消费都都收到了消息。

三、关闭所有消费者,观察控制台变化

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(12)

两个专有随机队列自动删除了。

6、Springboot的实现

工程结构图:

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(13)

一、配置文件application.properties:

生产者:

#RabbitMq spring.rabbitmq.host=192.168.92.130 spring.rabbitmq.exchange=logs

消费者:

#RabbitMq spring.rabbitmq.host=192.168.92.130 spring.rabbitmq.exchange=logs ##队列--我们可以自己指定队列名称,也可以由RabbitMq自动生成,这里为了方便,我们自己命名(如果需要,我也可以写一个自动生成名称的方法) rqbbitmq.log.fanout.info=info rqbbitmq.log.fanout.error=error server.port=8090

二、生产者代码

这里为了让系统生产者启动时就自动发送一条消息,我加了一个EmitLogRunner类。

EmitLog.java

1 import org.springframework.amqp.core.AmqpTemplate; 2 import org.springframework.beans.factory.annotation.Autowired; 3 import org.springframework.beans.factory.annotation.Value; 4 import org.springframework.stereotype.Component; 5 6 @Component 7 public class EmitLog { 8 9 @Value("${spring.rabbitmq.exchange}") 10 private String exchange; 11 12 @Autowired 13 private AmqpTemplate amqpTemplate; 14 15 public void send(String msg) { 16 amqpTemplate.convertAndSend(exchange,"",msg); 17 } 18 }

EmitLogRunner.java

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; @Component public class EmitLogRunner implements ApplicationRunner { @Autowired private EmitLog emitLog; @Override public void run(ApplicationArguments args) throws Exception { System.out.println("生产者发布消息:" msg); emitLog.send("RabbitMq fanout test message"); } }

二、消费者代码

ReceiveInfoLogs.java

@Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${rqbbitmq.log.fanout.info}",autoDelete = "true"), exchange = @Exchange(value = "${spring.rabbitmq.exchange}",type = ExchangeTypes.FANOUT) ) ) public class ReceiveInfoLogs { @Autowired private AmqpTemplate amqpTemplate; @RabbitHandler public void receiveInfoLog (Object message) { System.out.println("接收到info级别的日志:" message); } }

ReceiveErrorLogs.java

import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${rqbbitmq.log.fanout.error}",autoDelete = "true"), exchange = @Exchange(value = "${spring.rabbitmq.exchange}",type = ExchangeTypes.FANOUT) ) ) public class ReceiveErrorLogs { @Autowired private AmqpTemplate amqpTemplate; @RabbitHandler public void receiveErrorLog(Object message) { System.out.println("接收到的error级别日志:" message); } }

注意看一下注解方式bindings里面都是以@开头并加上对应的要绑定的项,琢磨一下应该都能理解。

三、验证

启动消费者和生产者,查看控制台:

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(14)

rabbitmq什么消息会进入死信队列(RabbitMQ指南之三发布订阅模式)(15)

未完待续,关注我,后续更多干货奉上

,