博客
关于我
RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)
阅读量:458 次
发布时间:2019-03-06

本文共 6471 字,大约阅读时间需要 21 分钟。

RabbitMQ发布-订阅模式详解:广播模式与日志系统实现

1. 消息广播机制:交换器(Exchange)的核心作用

在前两章的学习中,我们了解了消息生产者如何将消息发送到队列,以及消息消费者如何从队列中获取消息。然而,这些知识仅提供了RabbitMQ消息模型的表面。真正的核心是交换器(Exchange)的概念。在这一章中,我们将深入探讨交换器的功能,以及如何利用它来实现消息的广播传播。

1.1 RabbitMQ消息模型的扩展

在之前的指南中,我们提到的消息模型非常简单:

  • 一个生产者用于发送消息
  • 一个队列用于缓存消息
  • 一个消费者用于从队列中获取消息

然而,RabbitMQ的真正核心思想是:生产者并不会直接将消息发送到特定的队列。实际上,生产者只会将消息发送到一个交换器(Exchange),而交换器负责决定消息的最终去向。生产者完全无需知道消息最终会被转发到哪些队列。

1.2 交换器的功能与类型

交换器的主要职责是接收消息并将其转发给指定的队列。RabbitMQ提供了四种类型的交换器:

  • Direct Exchange(直连交换器):消息按路由键(Routing Key)转发到指定的队列。
  • Topic Exchange(主题交换器):消息根据主题(Topic)进行分类转发。
  • Headers Exchange(头交换器):消息根据消息头中的属性进行转发。
  • Fanout Exchange(广播交换器):消息被转发给所有绑定的队列。

本章将重点介绍Fanout Exchange(广播模式),因为它正是我们日志系统所需的核心功能。

2. 广播模式的实现:创建交换器与队列

为了实现日志系统的需求,我们需要一个能够将消息广播到多个消费者的系统。以下是实现这一目标的关键步骤:

2.1 创建交换器

首先,我们需要创建一个名为logs的Fanout Exchange。RabbitMQ提供了多种交换器类型,我们可以通过代码明确指定:

channel.exchangeDeclare("logs", "fanout");

2.2 创建临时队列

在本章中,我们采用非持久化、自动删除的方式来处理队列。这种方式特别适合广播模式,因为每个消费者都需要一个独立的队列。我们可以通过以下代码创建一个新的临时队列:

String queueName = channel.queueDeclare().getQueue();

2.3 绑定交换器与队列

接下来,我们需要将交换器与队列绑定。通过绑定,我们告诉交换器如何将消息转发到队列。以下是绑定的代码示例:

channel.queueBind(queueName, "logs", "");

通过上述代码,我们完成了交换器与队列的绑定关系。现在,交换器接收到的消息将被转发到指定的队列。

3. 消费者接收消息:监听队列

在生产者发送消息到交换器后,消息会被转发到所有绑定的队列。消费者需要监听这些队列,获取消息。以下是消费者的代码实现:

public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.92.130");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queue, true, deliverCallback, consumerTag -> {});
}
}

在代码中,我们可以看到以下关键点:

  • 自动确认(autoAck):设置为true,表示消费者接收消息后会立即发送确认回执。
  • 队列绑定:通过queueBind方法将队列与交换器绑定。
  • 消息处理:在DeliverCallback中定义了消息接收的处理逻辑。

4. 测试与验证

为了验证我们的实现是否正确,我们可以进行以下测试:

4.1 启动生产者

生产者负责发送消息到交换器。以下是生产者的代码:

public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.92.130");
try (Connection connection = factory.newConnection()) {
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "RabbitMQ fanout test message";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}

4.2 启动两个消费者

每个消费者都会 listening 到不同的队列。为了方便测试,我们可以手动启动两个消费者实例。通过运行以下代码,可以启动两个消费者:

public class ReceiveLogs {
public static void main(String[] args) throws Exception {
// 上述代码已省略
}
}

4.3 查看测试结果

在生产者发送消息后,两个消费者都会接收到消息。通过查看控制台输出,可以看到以下信息:

[*] Waiting for messages. To exit press CTRL+C
[x] Received "RabbitMQ fanout test message"
[x] Received "RabbitMQ fanout test message"

这表明我们的实现是正确的。

5. RabbitMQ在Spring Boot中的集成

在实际项目中,手动配置代码可能不够简便。我们可以通过Spring Boot来简化配置和管理。

5.1 项目结构

src/main/java/com/example/rabbitmq/
├── EmitLog.java
├── ReceiveInfoLogs.java
├── ReceiveErrorLogs.java
├── EmitLogRunner.java
└── application.properties

5.2 配置文件

# RabbitMQ配置
spring.rabbitmq.host=192.168.92.130
spring.rabbitmq.exchange=logs
# 队列配置
spring.rabbitmq.log.fanout.info=infor
spring.rabbitmq.log.fanout.error=error

5.3 代码实现

  • 生产者(EmitLog.java)
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class EmitLog {
@Value("${spring.rabbitmq.exchange}")
private String exchange;
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String msg) {
amqpTemplate.convertAndSend(exchange, "", msg);
}
}
  • 消费者(ReceiveInfoLogs.java)
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${rqbbitmq.log.fanout.info}", autoDelete = "true"),
exchange = @Exchange(value = "${spring.rabbitmq.exchange}", type = ExchangeTypes.FANOUT)
))
public class ReceiveInfoLogs {
@Autowired
private AmqpTemplate amqpTemplate;
@RabbitHandler
public void receiveInfoLog(Object message) {
System.out.println("接收到info级别的日志:" + message);
}
}
  • 消费者(ReceiveErrorLogs.java)
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${rqbbitmq.log.fanout.error}", autoDelete = "true"),
exchange = @Exchange(value = "${spring.rabbitmq.exchange}", type = ExchangeTypes.FANOUT)
))
public class ReceiveErrorLogs {
@Autowired
private AmqpTemplate amqpTemplate;
@RabbitHandler
public void receiveErrorLog(Object message) {
System.out.println("接收到的error级别日志:" + message);
}
}
  • 启动类(EmitLogRunner.java)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Component
public class EmitLogRunner implements ApplicationRunner {
@Autowired
private EmitLog emitLog;
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("生产者发布消息:" + "测试消息");
emitLog.send("测试消息");
}
}

通过上述代码,我们可以轻松地在Spring Boot项目中配置RabbitMQ的发布-订阅模式。

6. 总结

通过本章的学习,我们掌握了RabbitMQ发布-订阅模式的核心原理,特别是广播模式的实现方法。我们了解了如何通过交换器和队列实现消息的广播传播,并成功地在Spring Boot项目中集成了RabbitMQ。

在下一章中,我们将深入探讨RabbitMQ的路由功能,学习如何根据消息的属性(如消息头)进行智能路由。

转载地址:http://pszbz.baihongyu.com/

你可能感兴趣的文章
mysql 创建表,不能包含关键字values 以及 表id自增问题
查看>>
mysql 删除日志文件详解
查看>>
mysql 判断表字段是否存在,然后修改
查看>>
MySQL 到底能不能放到 Docker 里跑?
查看>>
mysql 前缀索引 命令_11 | Mysql怎么给字符串字段加索引?
查看>>
MySQL 加锁处理分析
查看>>
mysql 协议的退出命令包及解析
查看>>
mysql 参数 innodb_flush_log_at_trx_commit
查看>>
mysql 取表中分组之后最新一条数据 分组最新数据 分组取最新数据 分组数据 获取每个分类的最新数据
查看>>
MySQL 命令和内置函数
查看>>
MySQL 和 PostgreSQL,我到底选择哪个?
查看>>
mysql 四种存储引擎
查看>>
MySQL 在并发场景下的问题及解决思路
查看>>
MySQL 在控制台插入数据时,中文乱码问题的解决
查看>>
MySQL 基础架构
查看>>
MySQL 基础模块的面试题总结
查看>>
MySQL 处理插入重主键唯一键重复值办法
查看>>
MySQL 备份 Xtrabackup
查看>>
mysql 复杂查询_mysql中复杂查询
查看>>
mYSQL 外键约束
查看>>