MQ系列(六):RabbitMQ特点,概念,服务安装,应用集成

RabbitMQ 是部署最广泛的开源消息代理,有成千上万的用户,是最受欢迎的开源消息中间件之一。

RabbitMQ(兔子) 轻巧,易于在内部和云上部署。可以分布式集群部署,以满足大规模,高可用性的要求。

RabbitMQ 是由 Erlang 语言开发的基于 AMQP (Advanced Message Queuing Protocol:高级消息队列协议)标准的开源实现。目前最新版本是 RabbitMQ 3.8.14(2021-03-02) 。 版本实现的 AMQP 0-9-1 版本协议。还可以通过插件的方式支持 STOMP 和 MQTT 等协议。

特点

RabbitMQ 具有以下 特点:

  • 可靠性:RabbitMQ 使用一些机制来保证可靠性,如持久化,传输确认,发布确认等。
  • 灵活的路由:在消息进入队列之前,通过交换器来路由消息。RabbitMQ 提供了一些内置的交换器来实现典型的路由功能。可以将多个交换器绑定在一起来实现复杂的路由功能。也可以能插件件制来实现自定义的交换器。
  • 集群扩展:多个结点可以组成一个集群,可以根据实际业务需要动态地扩展集群中的节点。
  • 高可用性:队列可以在集群中的机器上设置镜像,使得部分节点出现问题的情况下队列仍然可用。
  • 多种协议:支持原生的 AMQP 协议,还可通过插件支持 STOMP,MQTT等多种消息中间件协议。
  • 多语言客户端:几乎支持所有主流开发语言,如 C,C++,Java,Python,PHP,Ruby, Go 等。
  • 管理界面:RabbitMQ 提供了一个易用的管理界面,用户可以监控和管理消息,集群中的节点等。
  • 插件机制:RabbitMQ 支持插件机制,提供了很多插件,使得可以从多方面进行扩展,还可以自定义插件。

概念

RabbitMQ 整全上是一个 生产者消费者 模型,主要负责接收,存储和转发消息。

RabbitMQ 中相关概念来自于 AMQP 协议的基本概念,AMQP 协议是 RabbitMQ 原生就支持的协议。

基本概念

  • Message(消息):由消息头和消息体组成。消息头由一系列可选属性(元消息)组成,属性包括 routing-key(路由键),priority(优先级),delivery-mode(传输类型:是否持久化)等。消息是不透明的,一些属性可能由 Broker 使用,一些可能由消费消息的客户端使用。

  • Publisher(发布者/(Producer/生产者)):向交换器发送消息的应用。

  • Consumer(消费者):从消息队列获取消息的应用。

  • Exchange(交换器):用于接收生产者发送的消息,并将这些消息路由给服务器中对应的队列。

  • Binding(绑定):用于消息队列与交称器之间的关联。一个绑定就是基于路由键将交称器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构造的路由表。

  • Queue(队列):消息容器,保存消息直到发送给消费者。

  • Connection(网络连接):比如一个 TCP 连接,客户端连接。

  • Channel(信道):多路复用连接中的一条独立的双向数据通道。信道是建立在真实的 TCP 连接内的虚拟连接,AMQP 命令都是通过信息发送出去的,不管是发送消息,还是据接收消息。TCP 连接的创建和销毁是非常昂贵的,引入了信道的概念,可以复用一个 TCP 连接。

  • Virtual Host(虚拟机):表示一批交换器、消息队列和相关对象。虚拟机是共享相同身份认证和加密环境的独立服务器域。本质上每个 vhost 相当于一台缩小版的 RabbitMQ 服务器,它拥有自己的队列,交换器,绑定和权限机制。

    vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /

  • Broker(代理):消息队列服务器实体。

模型结构

RabbitMQ 模型结构图:

RabbitMQ 模型结构图

交换器类型

RabbitMQ 常用的交换器类型有 4 种:Direct Exchange,Fanout Exchange,Topic Exchange,Headers Exchange

Direct 交换器

Direct 交换器是把消息发送到消息中的路由键(routeing key)与 绑定键(binding key)完全匹配的队列中。

该类型交换器路由必须完全匹配,属于单播的模式。

Fanout 交换器

Fanout 交换器只是简单地将队列绑定到交换器,发送到该交换器的所有信息都会被转发到绑定该交换器的所有队列中,会忽略路由键的处理。转发消息是最快的。

该类型交换器类似于广播,所有绑定的队列都将收到消息的副本。

Topic 交换器

将消息路中的路由键(routeing key)与 绑定键(binding key)进行模式匹配,消息将被路由到匹配的队列中。

模式匹配

  • 路由键 和 绑定键都为一个 . 分隔的字符串(被 . 分隔的每一个字符串被称为一个单词),如 NEWS.SPORT.TOPNEWS.SPORT.NBA
  • 绑定键可以存在两种特殊字符串 *# 用于做模糊匹配。该类型交换器将识别这两个通配符,其中 * 用于匹配一个单词,# 用于匹配多个单词(可以是零个)。

Headers 交换器

Headers 交换器 与 Direct 交换器类似,但不依赖于路由键的匹配规则来路由信息,是根据消息头(headers)属性进行匹配,消息将被路由到匹配的队列。

该类型的交换器性能较差,也不实用,几乎不用。

服务安装

RabbitMQ 可以运行在多种操作环境中,可以运行在云环境中。目前最新版本是 RabbitMQ 3.8.14(2021-03-02) 。

服务端操作系统更多是基于 CentOS 或 Ubuntu,下载对应的安装包安装。不建议使用解压缩包的安装方式,需要手动安装 Erlange 运行环境 和 配置 RabbitMQ 环境变量,但安装 Erlange 极其麻烦,各种依赖和版本冲突,也不好定位。

CentOS 系统需要先安装 Erlang 运行环境;RabbitMQ 为 Ubuntu 系统提供了全量的安装包(集成了 Erlang 运行环境)。

安装参考官网:Doc > Installation and Provisioning > Downloading and Installing RabbitMQ

Operating Systems and PlatformsDebian and UbuntuRed Hat Enterprise Linux, CentOS, Fedora 等。

RabbitMQ 的 Github 地址:rabbitmq/**rabbitmq-server**,Releases 包

管理插件:https://www.rabbitmq.com/management.html

安装附加插件:https://www.rabbitmq.com/installing-plugins.html

Rabbitmq Server Release:https://github.com/rabbitmq/rabbitmq-server/releases

Rabbitmq Management:https://github.com/rabbitmq/rabbitmq-management

CentOS RabbitMQ Erlang RPM:https://github.com/rabbitmq/erlang-rpm

CentOS安装

下载

下载 RabbitMQ Server 安装包:https://www.rabbitmq.com/install-rpm.html#downloads

注意区分 CentOS 6.x,7.x,8.x 系统版本。

1
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.14/rabbitmq-server-3.8.14-1.el7.noarch.rpm

下载 RabbitMQ 运行环境依赖 Erlang:https://www.rabbitmq.com/install-rpm.html#install-zero-dependency-rpm

1
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.2.7/erlang-23.2.7-1.el7.x86_64.rpm

注意:RabbitMQ 版本 与 Erlang 版本的匹配,要下载对应的版本

安装

先安装 Erlang:

1
yum install erlang-23.2.7-1.el7.x86_64.rpm

再安装 RabbitMQ Server:

1
yum install rabbitmq-server-3.8.14-1.el7.noarch.rpm

开机启动

RabbitMQ Server 默认不会作为守护进程启动。设置为在系统启动时默认启动守护程序,以管理员身份运行。

1
chkconfig rabbitmq-server on

运行服务

1
2
3
4
5
6
7
8
9
# CentOS 6 使用 service 命令
service rabbitmq-server start
service rabbitmq-server status
service rabbitmq-server stop

# CentOS 7 使用 systemctl 命令
systemctl start rabbitmq-server
systemctl status rabbitmq-server
systemctl stop rabbitmq-server

如果运行服务报如下错:

1
ERROR: epmd error for host 192: badarg (unknown POSIX error)

检查服务名,不要包含数字和特殊字符,并且与 /etc/hosts 文件对应。

查看主机名:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@gxvmcentos7 rabbitmq]# hostname
gxvmcentos7
[root@gxvmcentos7 rabbitmq]# hostnamectl status
Static hostname: computer-vm
Icon name: computer-vm
Chassis: vm
Machine ID: 21e270d06bd84097bcbb56d5118fa61f
Boot ID: ae975f6a1794402098010777d8e2697b
Virtualization: vmware
Operating System: CentOS Linux 7 (Core)
CPE OS Name: cpe:/o:centos:centos:7
Kernel: Linux 3.10.0-1160.el7.x86_64
Architecture: x86-64

设置主机名:

1
hostnamectl set-hostname **

或编辑 /etc/hostname 文件修改主机名。

通过命令 hostnamectl 修改主机名,会自动更新到 /etc/hostname,但保存到 /etc/hosts的映射不会被修改,需要手动编辑保存。然后重新 RabbitMQ 服务。

Ubuntu安装

Github > rabbitmq / rabbitmq-server > Releases 中找到要下载的版本,在 Assets 下拉项中找到以 all 结尾的安装包(如 rabbitmq-server_3.8.3.beta.2-1_all.deb)。

可以下载到本地再上传到服务器,或直接在 Linux 服务器端使用 wget 命令工具下载,然后安装。

参考 Spring Boot 2实践系列(三十五):集成 RabbitMQ 消息中间件 > RabbitMQ安装

Docker安装

Docker 安装简单省事,推荐使用此方式,前提是服务器要先安装好 Docker 容器。

有两种镜像,一种是只包含 RabbitMq 的镜像;还有一种是在 RabbitMq 镜像的基础上集成了管理插件的镜像,管理插件提供了 Web 控制台。

Docker 可以先使用 docker pull 下载镜像,再执行 docker run 来运行;或直接执行 docker run 来创建容器,如果镜像不存在,则会下载象像。如下:

1
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

RabbitMQ 服务默认端口是 5672,管理插件 Web 控制台的访问端口是在服务端口上加 10000,所以是 15672,默认账号密码:guest / guest。

参考:Spring Cloud系列(十三):分布式服务链路跟踪 Sleuth > RabbitMq 方式发送

解压安装

从 Github 上下载 RabbitMQ 纯应用包来安装,Linux 环境通常下载以 tar.gz结尾的安装包。

注意:需要先安装 Erlange 运行环境,不建议使用此方式安装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 解压
tar zvxf rabbitmq-server-xxxxxxxxxx.tar.gz -C /usr/local

# 修改 /etc/profile 文件, 添加环境变量
export PATH=%PATH:/usr/local/rabbitmq/sbin
export RABBITMQ_HOME=/usr/local/rabbitmq

# 刷新配置文件,使其生效
source /etc/profile

# 启动 rabbitmq server
./rabbitmq-server
# 以守护进程的方式在后台运行
./rabbitmq-server -detached

# 查看 rabbitmq server 状态
rabbitmqctl status

# 查看集群信息
rabbitmqctl cluster_status

具体参考官方文档:RabbitMQ Server > Installation Guides

插件安装

RabbitMQ 插件目录在:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins

官网 > RabbitMQ PluginRabbitMQ 附带的一些核心插件列表

插件列表

RabbitMQ 默认附带了一些核心插件,可通过命令查看:

1
rabbitmq-plugins list

管理 WebUI

安装 RabbitMQ 默认带了 Management UI 插件但没有启用,只需启用即可。

查看 RabbitMQ Management Plugin:

1
2
3
4
5
rabbitmq-plugins list

# 输出
[] rabbitmq_management 3.8.14
[] rabbitmq_management_agent 3.8.14

启用 Management UI 插件:

1
rabbitmq-plugins enable rabbitmq_management

浏览器打开 RabbitMQ Web UI:http://localhost:15672

15672:HTTP API clients, management UI and rabbitmqadmin (only if the management plugin is enabled)

默认用户是:guest/guest,只能本地登录,如果需要远程登录,需添加用户添加权限。

添加用户

查看用户:

1
2
3
4
5
rabbitmqctl list_users
# 输出
Listing users ...
user tags
guest [administrator]

添加用户:

1
2
# rabbitmqctl add_user username password
rabbitmqctl add_user admin 123456

用户授权

设置用户组,设置为管理员组:

1
rabbitmqctl set_user_tags admin administrator

给用户设置权限:

1
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'

这样就可以使用新创建的用户远程登录 RabbitMQ 的 management UI 了。

注意: Linux 系统如果开启了防火墙,默认是没有放行 156725672 端口的,需要开启。

安装插件

下载插件

进入到 RabbitMQ 插件目录下发 :

1
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez

启用插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 输出
[root@gxvmcentos7 plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@gxvmcentos7:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@gxvmcentos7...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange

started 1 plugins.

如果插件不存在,会报错,如下示例:

1
2
3
4
5
6
7
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 输出
Enabling plugins on node rabbit@gxcentos:
rabbitmq_delayed_message_exchange
Error:
{:plugins_not_found, [:rabbitmq_delayed_message_exchange]}

应用集成

RabbitMQ 运转流程大致入下:

  1. ConnectionFactory:创建连接工厂,设置账号,密码,服务器地址,端口,虚拟机

  2. Connection:使用工厂创建连接,连接代表 RabbitMQ 的 Socket,封装了 Socket 操作相关的逻辑

  3. Channel:使用连接创建信道,信道是与 RabbitMQ 打交道的最重要的接口,大部分业务操作都是在信道中完成,比如定义队列,定义交换器,队列与交换器绑定,发布消息等。

  4. Exchange:使用信息声明交换器,指定交换器名,类型,是否持久化等

  5. Queue:使用通道声明队列,指定队列名,是否持久化,是否排它,是否自动删除等

  6. Binding:使用通道将队列与交换机绑定,指定队列名,交换器名,路由名

  7. 发布消息:使用信息发布消息,需要指定发送到的交换器名,路由键,消息体等

  8. 交换器收到消息,根据路由键找到匹配的队列,若找到则将消息存入队列中;若没找到,则根据生产者配置属性选择将消息丢弃或退回给生产者。

  9. 消费消息:消费都使用信道获取消息,对收到的信息进行确认

  10. RabbitMQ 从队列中删除已确认的消息

  11. 关闭信道,关闭连接

Java 集成 RabbitMQ

  1. pom.xml 添加依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
    </dependency>
  2. 生产者发布消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    public class MsgProducer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("admin");
    factory.setPassword("123456");
    factory.setHost("192.168.220.129");
    factory.setPort(5672);
    factory.setVirtualHost("/");

    // 创建连接
    Connection connection = factory.newConnection();
    // 创建通道
    Channel channel = connection.createChannel();
    // 声明交换器
    String exchangeName = "news-exchange";
    channel.exchangeDeclare(exchangeName, "direct", true, false, null);

    // 声明一个持久化,非排他,非自动删除的队列
    String queueName = "news-queue";
    channel.queueDeclare(queueName, true, false, false, null);

    // 路由键
    String routingKey = "news-routingKey";
    // 队列与交换机绑定
    channel.queueBind(queueName, exchangeName, routingKey);

    for (int i = 0; i < 50; i++) {
    //发送消息
    byte[] msgBytes = ("News Msg ......" + i).getBytes();
    channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msgBytes);
    Thread.sleep(2000);
    }
    //关闭信道和连接
    channel.close();
    connection.close();
    }
    }
  3. 消费者获取消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    public class MsgConsumer2 {

    public static void main(String[] args) throws IOException, TimeoutException {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("admin");
    factory.setPassword("123456");
    factory.setHost("192.168.220.129");
    factory.setPort(5672);
    factory.setVirtualHost("/");

    // 创建连接
    Connection connection = factory.newConnection();
    // 创建通道
    Channel channel = connection.createChannel();
    channel.basicQos(64);
    // 声明交换器
    String exchangeName = "news-exchange";
    channel.exchangeDeclare(exchangeName, "direct", true, false, null);

    // 声明一个持久化,非排他,非自动删除的队列
    String queueName = "news-queue";
    channel.queueDeclare(queueName, true, false, false, null);

    String routingKey = "news-routingKey";
    channel.queueBind(queueName, exchangeName, routingKey);

    while (true) {
    boolean autoAck = false;
    channel.basicQos(64);
    channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String routingKey1 = envelope.getRoutingKey();
    String contentType = properties.getContentType();
    System.out.println("routing key:" + routingKey1);
    System.out.println("content type:" + contentType);
    long deliveryTag = envelope.getDeliveryTag();
    String msg = new String(body, "UTF-8");
    System.out.println("消息内容:" + msg);
    channel.basicAck(deliveryTag, false);
    }
    });
    }
    }
    }
  4. 启动 RabbitMQ 服务

    1
    ./sbin/rabbitmq-server
  5. 先启动消费者服务,再启动生产者服务。

参考 RabbitMQ Java Client LibraryJava Client API Guide

Spring 集成 RabbitMQ

Spring 系列框架和 RabbitMQ 源自同一家伟大的公司 Pivotal,Spring 提供了 RabbitMQ 集成库 spring-rabbit,其中包含了很多注解,非常方便开发人员的使用。

Spring 集成 RabbitMQ 主要是将代码中的配置交由 Spring 的 XML 配置文件中配置。

参考 官方 Spring AMQP官方 Spring Integration > AMQP Support

Spring Boot 集成 RabbitMQ

参考 Spring Boot 2实践系列(三十五):集成 RabbitMQ 消息中间件

参考 Spring Boot 2实践系列(三十三):JMS 和 AMQP 消息服务及支持的消息组件

参考 官方 Spring Boot Autoconfiguration for Spring AMQP (RabbitMQ)

相关参考

  1. 官网 RabbitMQ
  2. Github RabbitMQ
  3. 高可用RabbitMQ安装及使用

MQ系列(六):RabbitMQ特点,概念,服务安装,应用集成

http://blog.gxitsky.com/2020/02/18/MQ-06-RabbitMQ-feature/

作者

光星

发布于

2020-02-18

更新于

2022-06-07

许可协议

评论