本文共 6471 字,大约阅读时间需要 21 分钟。
在前两章的学习中,我们了解了消息生产者如何将消息发送到队列,以及消息消费者如何从队列中获取消息。然而,这些知识仅提供了RabbitMQ消息模型的表面。真正的核心是交换器(Exchange)的概念。在这一章中,我们将深入探讨交换器的功能,以及如何利用它来实现消息的广播传播。
在之前的指南中,我们提到的消息模型非常简单:
然而,RabbitMQ的真正核心思想是:生产者并不会直接将消息发送到特定的队列。实际上,生产者只会将消息发送到一个交换器(Exchange),而交换器负责决定消息的最终去向。生产者完全无需知道消息最终会被转发到哪些队列。
交换器的主要职责是接收消息并将其转发给指定的队列。RabbitMQ提供了四种类型的交换器:
本章将重点介绍Fanout Exchange(广播模式),因为它正是我们日志系统所需的核心功能。
为了实现日志系统的需求,我们需要一个能够将消息广播到多个消费者的系统。以下是实现这一目标的关键步骤:
首先,我们需要创建一个名为logs
的Fanout Exchange。RabbitMQ提供了多种交换器类型,我们可以通过代码明确指定:
channel.exchangeDeclare("logs", "fanout");
在本章中,我们采用非持久化、自动删除的方式来处理队列。这种方式特别适合广播模式,因为每个消费者都需要一个独立的队列。我们可以通过以下代码创建一个新的临时队列:
String queueName = channel.queueDeclare().getQueue();
接下来,我们需要将交换器与队列绑定。通过绑定,我们告诉交换器如何将消息转发到队列。以下是绑定的代码示例:
channel.queueBind(queueName, "logs", "");
通过上述代码,我们完成了交换器与队列的绑定关系。现在,交换器接收到的消息将被转发到指定的队列。
在生产者发送消息到交换器后,消息会被转发到所有绑定的队列。消费者需要监听这些队列,获取消息。以下是消费者的代码实现:
public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.92.130"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queue, true, deliverCallback, consumerTag -> {}); }}
在代码中,我们可以看到以下关键点:
true
,表示消费者接收消息后会立即发送确认回执。queueBind
方法将队列与交换器绑定。DeliverCallback
中定义了消息接收的处理逻辑。为了验证我们的实现是否正确,我们可以进行以下测试:
生产者负责发送消息到交换器。以下是生产者的代码:
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.92.130"); try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "RabbitMQ fanout test message"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8")); System.out.println(" [x] Sent '" + message + "'"); } }}
每个消费者都会 listening 到不同的队列。为了方便测试,我们可以手动启动两个消费者实例。通过运行以下代码,可以启动两个消费者:
public class ReceiveLogs { public static void main(String[] args) throws Exception { // 上述代码已省略 }}
在生产者发送消息后,两个消费者都会接收到消息。通过查看控制台输出,可以看到以下信息:
[*] Waiting for messages. To exit press CTRL+C [x] Received "RabbitMQ fanout test message" [x] Received "RabbitMQ fanout test message"
这表明我们的实现是正确的。
在实际项目中,手动配置代码可能不够简便。我们可以通过Spring Boot来简化配置和管理。
src/main/java/com/example/rabbitmq/├── EmitLog.java├── ReceiveInfoLogs.java├── ReceiveErrorLogs.java├── EmitLogRunner.java└── application.properties
# RabbitMQ配置spring.rabbitmq.host=192.168.92.130spring.rabbitmq.exchange=logs# 队列配置spring.rabbitmq.log.fanout.info=inforspring.rabbitmq.log.fanout.error=error
import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;@Componentpublic class EmitLog { @Value("${spring.rabbitmq.exchange}") private String exchange; @Autowired private AmqpTemplate amqpTemplate; public void send(String msg) { amqpTemplate.convertAndSend(exchange, "", msg); }}
import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Component@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${rqbbitmq.log.fanout.info}", autoDelete = "true"), exchange = @Exchange(value = "${spring.rabbitmq.exchange}", type = ExchangeTypes.FANOUT)))public class ReceiveInfoLogs { @Autowired private AmqpTemplate amqpTemplate; @RabbitHandler public void receiveInfoLog(Object message) { System.out.println("接收到info级别的日志:" + message); }}
import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Component@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${rqbbitmq.log.fanout.error}", autoDelete = "true"), exchange = @Exchange(value = "${spring.rabbitmq.exchange}", type = ExchangeTypes.FANOUT)))public class ReceiveErrorLogs { @Autowired private AmqpTemplate amqpTemplate; @RabbitHandler public void receiveErrorLog(Object message) { System.out.println("接收到的error级别日志:" + message); }}
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.ApplicationRunner;import org.springframework.stereotype.Component;@Componentpublic class EmitLogRunner implements ApplicationRunner { @Autowired private EmitLog emitLog; @Override public void run(ApplicationArguments args) throws Exception { System.out.println("生产者发布消息:" + "测试消息"); emitLog.send("测试消息"); }}
通过上述代码,我们可以轻松地在Spring Boot项目中配置RabbitMQ的发布-订阅模式。
通过本章的学习,我们掌握了RabbitMQ发布-订阅模式的核心原理,特别是广播模式的实现方法。我们了解了如何通过交换器和队列实现消息的广播传播,并成功地在Spring Boot项目中集成了RabbitMQ。
在下一章中,我们将深入探讨RabbitMQ的路由功能,学习如何根据消息的属性(如消息头)进行智能路由。
转载地址:http://pszbz.baihongyu.com/