Spring Boot 2系列(三十五):集成 RabbitMQ 消息中间件

  Spring AMQP 默认支持 RabbitMQ 作为 AMQP 协议的实现,因为…RabbitMQ 和 Spring 是同一家软件公司开发的。

  Spring Boot 对 RabbitMQ 的支持也是基于 Spring AMQP。为 RabbitMQ 提供了自动配置,可以直接使用rabbitTemplate,自动开启了消息监听注解@EnableRabbit

  更多关于消息服务概念和支持的组件可阅读Spring Boot 2实践系列(三十三):JMS 和 AMQP 消息服务及支持的消息组件。   

RabbitMQ 简介

  RabbitMQ 是一个基于 AMQP 协议的轻量级,可靠,可扩展且可移植的消息代理; 支持多种消息传输协议、消息队列、传输确认、到队列的灵活路由、多种交换类型等。

  支持多种语言开发的客户端,可集群部署以实现高可用和大吞吐量。是部署最广泛、最受欢迎的开源消息代理【- -译自官方描述】。

  Spring AMQP 参考RabbitMQ 官网RabbitMQ DownloadRabbitMQ DocRabbitMQ Tutorials, Management Plugin

RabbitMQ 安装

以 Ubuntu 18.x 安装 RabbitMQ 为例,根据官方安装说明来执行安装操作。

下载

  1. 下载软件包前先在服务器添加用于签署 RabbitMQ 版本的密钥,否则会报警告

    1
    2
    apt-key adv --keyserver "hkps.pool.sks-keyservers.net" --recv-keys "0x6B73A36E6026DFCA"
    wget -O - "https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc" | sudo apt-key add -
  2. RabbitMQ 依赖面向高并发的语言 Erlang,所以需要安装 Erlang 运行环境。
    官方有提供安装 Erlang 运行环境的安装步骤。但个人不建议手动安装 Erlang 运行环境,因为后续安装完 RabbitMQ 后,各种依赖和版本冲突问题,也不好定位,rabbitmq-server 无法启动,所以建议下载 RabbitMQ 全量包(rabbitmq-server_3.7.8-1_all.deb),会自动下载 RabbitMQ 和所有依赖。

  3. 下载 RabbitMQ 全量包,以rabbitmq-server_3.7.8-1_all.deb为例。

    执行安装:
    dpkg -i rabbitmq-server_3.7.8-1_all.deb

    会报缺少依赖,执行全量安装,会自动下载所有依赖并安装
    apt-get install -f

安装

  1. 安装完后,rabbitmq-server 服务会自动运行, 查看 rabbitmq-server 进程和状态

ps -ef|grep rabbitmq
service rabbitmq-server status //查看状态,显示 Active 表示活动运行中。

  1. 服务启动、停止、重启、查看状态操作

service rabbitmq-server start
service rabbitmq-server status
service rabbitmq-server restart
service rabbitmq-server stop

查看 rabbitmq 状态,包括当前的配置信息
rabbitmqctl status

  1. 安装 Rabbitmq 管理插件,提供基于 HTTP API 对 Rabbitmq 节点和集群进行管理和监控。管理插件包括基于浏览器的 UI 和命令行工具 rabbitmqadmin。

启用管理插件
rabbitmq-plugins enable rabbitmq_management

Web 管理

  1. 登录 Rabbitmq Web 管理
    Web 管理的端口是 15672,为本地登录提供了guest/guest账号密码,若要远程登录管理,需要添加账号。

http://rabbitmq_server_ip:15672

  1. 添加个 admin 账号用于远程登录

添加账号密码
rabbitmqctl add_user admin 123456

查看添加的账号
rabbitmqctl list_users

添加权限为管理员,管理员账号可用于 Spring Boot 的连接
rabbitmqctl set_user_tags admin administrator

分配权限(赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源

1
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
  1. 关于 rabbitmqctl 命令的使用,可使用 man 命令来查看

man rabbitmqctl

  1. 也可以直接使用 Rabbitmq 的 Docker 镜像,做好端口映射;省略手动安装 Rabbitmq 服务。

RabbitMQ 集成

RabbitMQ 的具体使用可参考官方的RabbitMQ Tutorials。RabbitMQ 通信有六种模式: Simplest(一对一的简单队列模式)、Work queues(一对多的队列模式)、Publish/Subscribe(发布-订阅模式)、Routing(路由模式)、Topics(主题模式)、RPC(RPC调用模式)。

此集成演示最常见的Queue队列模式和Topic主题模式。RabbitMQ的功能非常强大,可以单独开个系列文章来分析和实践。

基本配置

前提是 RabbitMQ 服务器正常运行中。

  1. pom.xml 添加依赖

    1
    2
    3
    4
    5
    <!--AMQP-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. application.properties 配置

    1
    2
    3
    4
    5
    6
    #---------RabbitMQ-------------------------
    spring.rabbitmq.host=192.168.220.128
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/

    Queues

  3. 添加 Queue Bean 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    /**
    * @name: MQComponent
    * @desc: 消息组件
    **/
    @Configuration
    public class QueueRabbitMQConfig {

    private static final String QUEUE_NAME_1 = "my-queue";
    private static final String QUEUE_NAME_2 = "object";

    /**
    * 声明队列
    *
    * @return
    */
    @Bean
    public Queue strQueue() {
    return new Queue(QUEUE_NAME_1);
    }

    /**
    * 声明队列
    *
    * @return
    */
    @Bean
    public Queue objQueue() {
    return new Queue(QUEUE_NAME_2);
    }

    /**
    * 声明交换器
    *
    * @return
    */
    @Bean
    public DirectExchange directExchange() {
    DirectExchange directExchange = new DirectExchange("direct.exchange", false, true);
    return directExchange;
    }

    /**
    * 队列绑定交换器
    *
    * @param strQueue
    * @param directExchange
    * @return
    */
    @Bean
    public Binding strQueueBind(Queue strQueue, DirectExchange directExchange) {
    //使用队列名作为路由
    Binding binding = BindingBuilder.bind(strQueue).to(directExchange).withQueueName();
    return binding;
    }

    @Bean
    public Binding objQueueBind(Queue objQueue, DirectExchange directExchange) {
    //使用队列名作为路由
    Binding binding = BindingBuilder.bind(objQueue).to(directExchange).withQueueName();
    return binding;
    }
    }
  4. 消息生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    /**
    * @name: MQSendServiceImpl
    **/
    @Service
    public class QueueProducerServiceImpl implements QueueProducerService {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
    * 发送普通消息到队列-queue
    */
    @Override
    public void rabbitMQSendStr(String msg) throws InterruptedException {
    for (int i = 0; i < 100; i++) {
    rabbitTemplate.convertAndSend("my-queue", System.nanoTime() + "__" + i);
    System.out.println(i);
    Thread.sleep(1000);
    }
    }

    /**
    * 发送对象,对象必须实现序列化接口
    */
    @Override
    public void rabbitMQSendObj(User user) {
    this.rabbitTemplate.convertAndSend("object", user);
    }
    }
  5. 消息消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    /**
    * @name: MQConsumerServiceImpl
    * @desc: 消息消费者
    **/
    @Service
    public class QueueConsumerServiceImpl {

    /**
    * 多个消费者订阅 queue 类型同一个渠道的消息,
    * 接收消息的顺序与发送消息的顺序并不相同,
    * 多个消费者平均消费消息。
    */

    /**
    * 监听my-queue 文本消息
    */
    @RabbitListener(queues = "my-queue")
    public void rabbitMQReceive1(String msg) {
    System.out.println("client1 receive my-queue msg:" + msg);
    }

    @RabbitListener(queues = "my-queue")
    public void rabbitMQReceive2(String msg) {
    System.out.println("client2 receive my-queue msg:" + msg);
    }

    @RabbitListener(queues = "my-queue")
    public void rabbitMQReceive3(String msg) {
    System.out.println("client3 receive my-queue msg:" + msg);
    }

    /**
    * 监听user 对象消息消息
    */
    @RabbitListener(queues = "object")
    public void rabbitMQReceiveUser(User user) {
    System.out.println("receive user msg:" + user);
    }
    }

多个消费者订阅 queue 类型同一个渠道的消息,多个消费者平均消费消息(轮询),接收消息的顺序与发送消息的顺序并不相同。
Spring Boot 集成 RabbitMQ,只要对象实现了序列化接口,可直接发布对象消息。

Topics

Topic主题模式在消息生产者和消息消费者之间增加了Exchange(交换器),将消息生产者和消息消费者进行了解耦,消息生产者将消息发送给交换器,交换器根据调度策略把消息再给到目标队列。

Exchange 用于转发消息,不做存储。若没有将 Queue bind 到 Exchange,交换器将直接丢弃消息,若开启了 ack 模式,在找不到队列时会返回错误。

将 Queue 绑定到 Exchange ,涉及三个参数的关联,分别是queue、exchange、routing_key,如BindingBuilder.bind(queueNewsNba).to(exchange).with("topic.news.nba")

Queue 绑定到 Exchang 状态可在 Web 管理界面 Queues 选项点击队列名称,在队列概述界面有个 Bindings 项,点击展开可查看到当前 queue 绑定到的路由键。

routing_key:路由密钥由分隔的多个单词组成,最多可达 255 个字节。例:news.nba,sys.log.error等。同样 queue 名也采用相同的形式。此路由键支持两种模式匹配:

*(start)可匹配一个单词。
#(hash)可替换零个或多个单词。

  1. Topic 配置(注册 Queue,TopicExchange,Binding)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    /**
    * @name: TopicRabbitMQConfig
    * @desc: Topic 配置
    **/
    @Configuration
    public class TopicRabbitMQConfig {

    //新闻主题
    private static final String TOPIC_NEWS = "topic.news";
    //NBA新闻主题
    private static final String TOPIC_NEWS_NBA = "topic.news.nba";

    @Bean
    public Queue queueNews() {
    return new Queue(TopicRabbitMQConfig.TOPIC_NEWS);
    }

    @Bean
    public Queue queueNewsNba() {
    return new Queue(TopicRabbitMQConfig.TOPIC_NEWS_NBA);
    }

    /**
    * 注册交换器
    *
    * @return
    */
    @Bean
    public TopicExchange exchange() {
    TopicExchange exchange = new TopicExchange("exchange");
    return exchange;
    }

    /**
    * 队列绑定到交换器,设置匹配的routing_key
    * /

    /**
    * 订阅新闻主题可以收到所有新闻包括NBA
    * @param queueNews
    * @param exchange
    * @return
    */
    @Bean
    public Binding bindingExchangeNews(Queue queueNews, TopicExchange exchange) {
    return BindingBuilder.bind(queueNews).to(exchange).with("topic.news.#");
    }

    /**
    * 订阅新闻下的NBA主题只可以收到NBA新闻
    * @param queueNewsNba
    * @param exchange
    * @return
    */
    @Bean
    public Binding bindingExchangeNewsNba(Queue queueNewsNba, TopicExchange exchange) {
    return BindingBuilder.bind(queueNewsNba).to(exchange).with("topic.news.nba");
    }
    }
  2. 消息生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * @name: TopicProducerServiceImpl
    * @desc: Topic 生产者
    */
    @Service
    public class TopicProducerServiceImpl implements TopicProducerService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange topicExchange;

    @Override
    public void rabbitMQSendStr(String msg) {
    rabbitTemplate.convertAndSend(topicExchange.getName(), "topic.news", "Top Ten News");
    }

    @Override
    public void rabbitMQSendObj(User andy) {
    rabbitTemplate.convertAndSend(topicExchange.getName(), "topic.news.nba", "2018-2019 NBA First Battle Start");
    }
    }
  3. 消息消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
    * @name: TopicConsumerServiceImpl
    * @desc: Topic 消费者
    **/
    @Service
    public class TopicConsumerServiceImpl {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "topic.news")
    public void rabbitMQReceive1(String msg) {
    System.out.println("client1 receive topic.news msg: " + msg);
    }

    @RabbitListener(queues = "topic.news.nba")
    public void rabbitMQReceive2(String msg) {
    System.out.println("client2 receive topic.news.nba msg: " + msg);
    }
    }

springboot-rabbitmq -> 源码Github

Spring Boot 2系列(三十五):集成 RabbitMQ 消息中间件

http://blog.gxitsky.com/2018/10/27/SpringBoot-35-rabbitmq/

作者

光星

发布于

2018-10-27

更新于

2022-06-17

许可协议

评论