Spring Boot 2系列(三十五):集成 RabbitMQ 消息中间件
Spring AMQP 默认支持 RabbitMQ 作为 AMQP 协议的实现,因为…RabbitMQ 和 Spring 是同一家软件公司开发的。
Spring Boot 对 RabbitMQ 的支持也是基于 Spring AMQP。为 RabbitMQ 提供了自动配置,可以直接使用rabbitTemplate
,自动开启了消息监听注解@EnableRabbit
。
更多关于消息服务概念和支持的组件可阅读Spring Boot 2实践系列(三十三):JMS 和 AMQP 消息服务及支持的消息组件。
RabbitMQ 简介
RabbitMQ 是一个基于 AMQP 协议的轻量级,可靠,可扩展且可移植的消息代理; 支持多种消息传输协议、消息队列、传输确认、到队列的灵活路由、多种交换类型等。
支持多种语言开发的客户端,可集群部署以实现高可用和大吞吐量。是部署最广泛、最受欢迎的开源消息代理【- -译自官方描述】。
Spring AMQP 参考,RabbitMQ 官网,RabbitMQ Download,RabbitMQ Doc,RabbitMQ Tutorials, Management Plugin。
RabbitMQ 安装
以 Ubuntu 18.x 安装 RabbitMQ 为例,根据官方安装说明来执行安装操作。
下载
下载软件包前先在服务器添加用于签署 RabbitMQ 版本的密钥,否则会报警告
1
2apt-key adv --keyserver "hkps.pool.sks-keyservers.net" --recv-keys "0x6B73A36E6026DFCA"
wget -O - "https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc" | sudo apt-key add -RabbitMQ 依赖面向高并发的语言 Erlang,所以需要安装 Erlang 运行环境。
官方有提供安装 Erlang 运行环境的安装步骤。但个人不建议手动安装 Erlang 运行环境,因为后续安装完 RabbitMQ 后,各种依赖和版本冲突问题,也不好定位,rabbitmq-server 无法启动,所以建议下载 RabbitMQ 全量包(rabbitmq-server_3.7.8-1_all.deb),会自动下载 RabbitMQ 和所有依赖。下载 RabbitMQ 全量包,以rabbitmq-server_3.7.8-1_all.deb为例。
执行安装:
dpkg -i rabbitmq-server_3.7.8-1_all.deb会报缺少依赖,执行全量安装,会自动下载所有依赖并安装
apt-get install -f
安装
- 安装完后,rabbitmq-server 服务会自动运行, 查看 rabbitmq-server 进程和状态
ps -ef|grep rabbitmq
service rabbitmq-server status //查看状态,显示 Active 表示活动运行中。
- 服务启动、停止、重启、查看状态操作
service rabbitmq-server start
service rabbitmq-server status
service rabbitmq-server restart
service rabbitmq-server stop查看 rabbitmq 状态,包括当前的配置信息
rabbitmqctl status
- 安装 Rabbitmq 管理插件,提供基于 HTTP API 对 Rabbitmq 节点和集群进行管理和监控。管理插件包括基于浏览器的 UI 和命令行工具 rabbitmqadmin。
启用管理插件
rabbitmq-plugins enable rabbitmq_management
Web 管理
- 登录 Rabbitmq Web 管理
Web 管理的端口是 15672,为本地登录提供了guest/guest
账号密码,若要远程登录管理,需要添加账号。
- 添加个 admin 账号用于远程登录
添加账号密码
rabbitmqctl add_user admin 123456查看添加的账号
rabbitmqctl list_users添加权限为管理员,管理员账号可用于 Spring Boot 的连接
rabbitmqctl set_user_tags admin administrator分配权限(赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源
1 | rabbitmqctl set_permissions -p / admin '.*' '.*' '.*' |
- 关于 rabbitmqctl 命令的使用,可使用 man 命令来查看
man rabbitmqctl
- 也可以直接使用 Rabbitmq 的 Docker 镜像,做好端口映射;省略手动安装 Rabbitmq 服务。
RabbitMQ 集成
RabbitMQ 的具体使用可参考官方的RabbitMQ Tutorials。RabbitMQ 通信有六种模式: Simplest(一对一的简单队列模式)、Work queues(一对多的队列模式)、Publish/Subscribe(发布-订阅模式)、Routing(路由模式)、Topics(主题模式)、RPC(RPC调用模式)。
此集成演示最常见的Queue
队列模式和Topic
主题模式。RabbitMQ的功能非常强大,可以单独开个系列文章来分析和实践。
基本配置
前提是 RabbitMQ 服务器正常运行中。
pom.xml 添加依赖
1
2
3
4
5<!--AMQP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>application.properties 配置
1
2
3
4
5
6#---------RabbitMQ-------------------------
192.168.220.128 =
5672 =
admin =
123456 =
/ =Queues
添加 Queue Bean 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62/**
* @name: MQComponent
* @desc: 消息组件
**/
public class QueueRabbitMQConfig {
private static final String QUEUE_NAME_1 = "my-queue";
private static final String QUEUE_NAME_2 = "object";
/**
* 声明队列
*
* @return
*/
public Queue strQueue() {
return new Queue(QUEUE_NAME_1);
}
/**
* 声明队列
*
* @return
*/
public Queue objQueue() {
return new Queue(QUEUE_NAME_2);
}
/**
* 声明交换器
*
* @return
*/
public DirectExchange directExchange() {
DirectExchange directExchange = new DirectExchange("direct.exchange", false, true);
return directExchange;
}
/**
* 队列绑定交换器
*
* @param strQueue
* @param directExchange
* @return
*/
public Binding strQueueBind(Queue strQueue, DirectExchange directExchange) {
//使用队列名作为路由
Binding binding = BindingBuilder.bind(strQueue).to(directExchange).withQueueName();
return binding;
}
public Binding objQueueBind(Queue objQueue, DirectExchange directExchange) {
//使用队列名作为路由
Binding binding = BindingBuilder.bind(objQueue).to(directExchange).withQueueName();
return binding;
}
}消息生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30/**
* @name: MQSendServiceImpl
**/
public class QueueProducerServiceImpl implements QueueProducerService {
private RabbitTemplate rabbitTemplate;
/**
* 发送普通消息到队列-queue
*/
public void rabbitMQSendStr(String msg) throws InterruptedException {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("my-queue", System.nanoTime() + "__" + i);
System.out.println(i);
Thread.sleep(1000);
}
}
/**
* 发送对象,对象必须实现序列化接口
*/
public void rabbitMQSendObj(User user) {
this.rabbitTemplate.convertAndSend("object", user);
}
}消息消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39/**
* @name: MQConsumerServiceImpl
* @desc: 消息消费者
**/
public class QueueConsumerServiceImpl {
/**
* 多个消费者订阅 queue 类型同一个渠道的消息,
* 接收消息的顺序与发送消息的顺序并不相同,
* 多个消费者平均消费消息。
*/
/**
* 监听my-queue 文本消息
*/
public void rabbitMQReceive1(String msg) {
System.out.println("client1 receive my-queue msg:" + msg);
}
public void rabbitMQReceive2(String msg) {
System.out.println("client2 receive my-queue msg:" + msg);
}
public void rabbitMQReceive3(String msg) {
System.out.println("client3 receive my-queue msg:" + msg);
}
/**
* 监听user 对象消息消息
*/
public void rabbitMQReceiveUser(User user) {
System.out.println("receive user msg:" + user);
}
}
多个消费者订阅 queue
类型同一个渠道的消息,多个消费者平均消费消息(轮询),接收消息的顺序与发送消息的顺序并不相同。
Spring Boot 集成 RabbitMQ,只要对象实现了序列化接口,可直接发布对象消息。
Topics
Topic主题模式在消息生产者和消息消费者之间增加了Exchange
(交换器),将消息生产者和消息消费者进行了解耦,消息生产者将消息发送给交换器,交换器根据调度策略把消息再给到目标队列。
Exchange
用于转发消息,不做存储。若没有将 Queue bind 到 Exchange,交换器将直接丢弃消息,若开启了 ack 模式,在找不到队列时会返回错误。
将 Queue 绑定到 Exchange ,涉及三个参数的关联,分别是queue、exchange、routing_key,如BindingBuilder.bind(queueNewsNba).to(exchange).with("topic.news.nba")
。
Queue 绑定到 Exchang 状态可在 Web 管理界面 Queues 选项点击队列名称,在队列概述界面有个 Bindings 项,点击展开可查看到当前 queue 绑定到的路由键。
routing_key:路由密钥由点分隔的多个单词组成,最多可达 255 个字节。例:news.nba,sys.log.error等。同样 queue 名也采用相同的形式。此路由键支持两种模式匹配:
*(start)可匹配一个单词。
#(hash)可替换零个或多个单词。
Topic 配置(注册 Queue,TopicExchange,Binding)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59/**
* @name: TopicRabbitMQConfig
* @desc: Topic 配置
**/
public class TopicRabbitMQConfig {
//新闻主题
private static final String TOPIC_NEWS = "topic.news";
//NBA新闻主题
private static final String TOPIC_NEWS_NBA = "topic.news.nba";
public Queue queueNews() {
return new Queue(TopicRabbitMQConfig.TOPIC_NEWS);
}
public Queue queueNewsNba() {
return new Queue(TopicRabbitMQConfig.TOPIC_NEWS_NBA);
}
/**
* 注册交换器
*
* @return
*/
public TopicExchange exchange() {
TopicExchange exchange = new TopicExchange("exchange");
return exchange;
}
/**
* 队列绑定到交换器,设置匹配的routing_key
* /
/**
* 订阅新闻主题可以收到所有新闻包括NBA
* @param queueNews
* @param exchange
* @return
*/
public Binding bindingExchangeNews(Queue queueNews, TopicExchange exchange) {
return BindingBuilder.bind(queueNews).to(exchange).with("topic.news.#");
}
/**
* 订阅新闻下的NBA主题只可以收到NBA新闻
* @param queueNewsNba
* @param exchange
* @return
*/
public Binding bindingExchangeNewsNba(Queue queueNewsNba, TopicExchange exchange) {
return BindingBuilder.bind(queueNewsNba).to(exchange).with("topic.news.nba");
}
}消息生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/**
* @name: TopicProducerServiceImpl
* @desc: Topic 生产者
*/
public class TopicProducerServiceImpl implements TopicProducerService {
private RabbitTemplate rabbitTemplate;
private TopicExchange topicExchange;
public void rabbitMQSendStr(String msg) {
rabbitTemplate.convertAndSend(topicExchange.getName(), "topic.news", "Top Ten News");
}
public void rabbitMQSendObj(User andy) {
rabbitTemplate.convertAndSend(topicExchange.getName(), "topic.news.nba", "2018-2019 NBA First Battle Start");
}
}消息消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/**
* @name: TopicConsumerServiceImpl
* @desc: Topic 消费者
**/
public class TopicConsumerServiceImpl {
private RabbitTemplate rabbitTemplate;
public void rabbitMQReceive1(String msg) {
System.out.println("client1 receive topic.news msg: " + msg);
}
public void rabbitMQReceive2(String msg) {
System.out.println("client2 receive topic.news.nba msg: " + msg);
}
}
Spring Boot 2系列(三十五):集成 RabbitMQ 消息中间件