MQ系列(一):ActiveMQ特性,概念,持久化,安装,应用集成
ActiveMQ 是一款老牌的,开源的,多协议的,非常流行的,基于 JAVA 的消息中间件,由 Apache 出品。ActiveMQ 官网,ActiveMQ Documentation。
ActiveMQ 实现了 JSM 1.1 标准,并提供了很多附加特性,如 JMS 管理,主从管理,消息组通信,消息优先级,延迟接收消息,虚拟接收者,消息持久化,消息队列监控等特性。
目前有两种类型的 ActiveMQ 可用,分别是经典的 5.x
版本 和 下一代 Artemis
版本,后续 5.x 与 Artemis 版本兼容合并将成为 ActiveMQ 6。
ActiveMQ特性
支持多种跨语言客户端和协议,如 OpenWire,STOMP,AMQP,MQTT;支持 Java,C ,C++,.NET 多种语言的客户端。
可以使用无处不在的 AMQP 协议集成多平台应用。使用 STOMP 通过 Websockets 在 web 应用程序之间交换消息。使用MQTT管理物联网(
IoT
)设备。完全支持 JMS 1.1 协议 和 J2ee1.4 规范(包括瞬态、持久化、事务,分布式事务消息)。
支持许多高级特性,如消息组,虚拟目的地,通配符和复合目的地等高级特性。
支持 Spring,使的 ActiveMQ 可以很容易嵌入到 Spring 应用中,并使用 Spring 的 XML 配置机制进行配置。
支持多种连接方式,如 in- VM、TCP、SSL、NIO、UDP、多播、JGroups 和 JXTA。
可以使用JDBC和高性能日志支持非常快速的持久化。
ActiveMQ基本概念
消息模型
ActiveMQ 完整支持 JMS 1.1, 从 Java 使用者的角度来看,其基本概念与 JMS 1.1规范是一致的。
点对点
点对点(Point to Point)模型:使用队列(Queue)作为消息载体,一条消息只能被一个消费者使用,未被消费的消息在存放在队列中直到被消费或超时。
发布/订阅
发布/订阅(Pub/Sub)模型:使用主题(Topic)作为消息载体,类似于广播模式,发布到主题的消息会传递给所有订阅该主题的订阅者,在一条消息被广播后才订阅的用户是收不到该消息的。
基本组件
Broker(消息代理)
消息队列服务器实体,接收客户端连接,提供消息通信的核心服务。
Producer(消息生产者)
生产消息并发给 Broker。
Consumer(消息消费者)
从 Broker 获取消息进行业务逻辑处理。
Queue(队列)
点对点模式下存放消息的地方,生产者发送消息到队列,消费者接收消息并进行业务处理。
Topic(主题)
发布/订阅 模式下消息汇聚地,生产者向 Topic 发送消息,由 Broker 广播给所有订阅者。
Message(消息)
按相应通信协议编码的数据包,封装业务数据。
传输连接器
为了交换消息,消息生产者和消息消费者(客户端)都需要连接到消息代理服务器。客户端和消息代理服务器之间的通信是通过传输连接器完成的。传输连接器就是用来处理和监听客户端连接的。
基本协议
ActiveMQ 常用的传输连接器协议可在其配置文件(/activemq-5.15.9/conf/activemq.xml
)中查看:
1 | <!-- 传输连接器通过给定协议把 ActiveMQ 暴露给客户端和其它代理服务器 --> |
ActiveMQ 支持的传输连接器协议有:OpenWire,VM,TCP,UDP,Multicast,NIO,SSL,HTTP,HTTPS,WebSocket,AMQP,MQTT,Stomp,XMAPP。每种协义配置可参见官网:Connectivity > Protocols 和 Features > URI Protocols。
- Openwire:监听 61616 端口。
- AMQP:监听 5672 端口。
- STOMP:监听 61613 端口。
- WebSocket:监听 61614 端口。
- MQTT:监听 1883 端口。
传输连接器的配置可参考官方文档:Transport configuration options。
协议合并
从5.13.0版开始,ActiveMQ 支持自动协议检测。可以检测OpenWire、STOMP、AMQP 和 MQTT,这样可以为所有4种类型协议的客户端共享一个传输(监听端口进行了合并)。
auto 自动协议检测
要 开启自动协议检测,URI 需要使用 auto:
前缀,具体如下。auto
配置只是让 ActiveMQ 的连接配置简捷一点。
TCP 连接(默认方式)
1
<transportConnector name="auto" uri="auto://localhost:5671"/>
SSL 连接
1
<transportConnector name="auto+ssl" uri="auto+ssl://localhost:5671"/>
ActiveMQ 使用 SSL 的详细配置参考 如何使用 SSL 文章。
NIO TCP 连接
1
<transportConnector name="auto+nio+ssl" uri="auto+nio+ssl://localhost:5671"/>
配置 auto 连接属性
属性名 | 默认值 | 描述 |
---|---|---|
protocolDetectionTimeOut | 30000 | 协议检测连接超时(ms),默认 30 秒,值设置为 <= 0 则禁用此项。 如果客户端连接但没有发送数据或足够的数据以检测协议,则线程将等待更多的数据以完成协议初始化,如果等待超时,则 Broker 会终止客户商连接。 |
maxConnectionThreadPoolSize | MAX_INT | 配置连接线程池允许的最大线程数。 若有很多连接,则降低此值可帮助防止代理耗尽线程。 默认值为 MAX_INT,表示关闭。 |
下面是配置最长协议检测为 5 秒的示例:
1 | <transportConnector name="auto" uri="auto://localhost:5671?protocolDetectionTimeOut=5000"/> |
配置 OpenWire 协议格式
OpenWire 是ActiveMQ 使用的默认 Wire Format。 为高速消息传递提供了一种高效的二进制格式。 可以在JMS客户端的连接 URI 中或 Brokers 传输绑定 URI 上配置 OpenWire 选项。
属性前缀 | 描述 |
---|---|
wireFormat. | 应用于所有连接格式。 |
wireFormat.default. | 应用于 OpenWire 默认的格式。 |
wireFormat.stomp. | 应用于 STOMP 连接格式。 |
wireFormat.amqp. | 应用于 AMQP 连接格式。 |
wireFormat.mqtt. | 应用于 MQTT 连接格式。 |
wireFormat.
示例1
<transportConnector name="auto" uri="auto://localhost:5671?wireFormat.maxFrameSize=1000"/>
wireFormat.default.
示例1
<transportConnector name="auto" uri="auto://localhost:5671?wireFormat.default.maxFrameSize=1000"/>
配置 auto 连接协议
默认情况下,auto 支持的所有连接协议均可用。也可以通过设置属性(auto.protocols
)来指定某些协议格式。
值 | 描述 |
---|---|
default | 开启 OpenWire |
amqp | 开启 AMQP 格式 |
stomp | 开启 STOMP 格式 |
mqtt | 开启 MQTT 格式 |
示例:只开启 OpenWire 和 STOMP
1 | <transportConnector name="auto" uri="auto://localhost:5671?auto.protocols=default,stomp"/> |
高级协议
ActiveMQ 还支持一些高级协议,也可以通过 URI 的方式配置,如 Failover 和 Fanout。
Failover(故障转移)
故障转移:一种重新连接的机制,工作于基础协议传输之上。用于建立可靠的连接。允许指定任意数量的复合 URI,它会随机选择其中的一个 URI 尝试建立连接,如果失败,则继续选择其它 URI 建立连接。具体配置可参考官网:The Failover Transport。
默认的 TCP 连接地址是 ActiveMQConnection.DEFAULT_BROKER_URL,实际它的值是 failover://tcp://localhost:61616,所以客户端默认使用的是基于 TCP 的 Failover 连接的协议。
语法:
1 | failover:(Uri1,Uri2,Uri3,…,UriN) |
示例:
1 | failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100 |
Fanout(Fanout传输)
Fanout:一种重新建立连接和复制的机制,工作于基础协议传输之上。它利用发现传输(Discovery Transport)来发现 Brokers,并将消息复制到这些 Borkers。具体配置可参考官网:The Fanout Transport。
语法:
1 | fanout:(discoveryURI)?transportOptions |
示例:
1 | fanout:(static:(tcp://localhost:61616,tcp://remotehost:61616))?initialReconnectDelay=100 |
消息持久化
消息转发模式
为防止因系统崩溃而导致消息丢失,消息中间件一般会支持消息持久化便于服务重启后恢复原来的消息数据。
通常消息持久化的逻辑是:当生产者发送消息后,Broker 首先将消息存储到文件,内存数据库或完程数据库等地方,然后再将消息发送给消费者,发送成功后将消息从存储中删除,若失败则继续尝试发送。Broker 在启动时会先检查指定位置的存储,若有发送失败的消息,则继续发送。
在 JMS 规范中对消息转发模式有两种:持久化(Persistent) 和 非持久化(Non_Persistent)。ActiveMQ 实现了 JSM 规范,即也支持这两种消息转发模式。
持久化(PERSISTENT)
对于持久化消息,被发送到消息服务器后,会被持久化存储,如果当前没有消费者,则会继续存在,只有等到消息被处理并被消费确认后,才会被消息服务器删除。
该模式保证消息在被成功消费之前不会丢失。
非持久化(NON_PERSISTENT)
JMS 实现者必须保证尽最大努力分发消息,但消息不会持久化存储。非持久化消息通常用于发送通知或不重要的实时数据。
该模式消息保证最多只传递一次,不保证消息不丢失。
若看重系统性能并且即使丢失一些消息也不会影响业务正常运行,可选择非持久化消息。
ActiveMQ 也支持上面两种方式,还支持通过缓存在内存中的中间状态消息的方式来恢复消息。这样,ActiveMQ 的消息持久化存储就有三种:存储到内存,存储到文件,存储到数据库。
持久化方式
在具体使用上,ActiveMQ 提从了一种插件式的消息存储机制,主要实现有以下几种方式。
-
是一种基于日志文件并支持消息事务的存储解决方案,具有极高的速度和可靠性。ActiveMQ 5.0及更早版本默认的存储。
消息本身以日志的形式写入到 Data Log 文件中,采用的是顺序追加的方式,性能很好。并对日志建立了引用索引,以便 快速取回消息。
-
支持 SQL 数据库存储,包括 MySQL,Oracle等。存储性能最差。
-
是一个基于文件的持久性数据库,是使用消息服务器的本地数据库。这是自ActiveMQ 5.4以来的默认存储机制。
其存储机制与 AMQ 很像,也是基于日志文件、索引和缓存,但经过优化可以实现更快的持久性。与 AMQ 不同的是,KahaDB 所有的 Destination 都使用一个索引文件,所有恢复时间远小于 AMQ。ActiveMQ In Action 中说它可以支持 1 万个连接,这可以满足大部分应用场景的需求。
-
在 ActiveMQ 5.8 中引入。 LevelDB存储是基于文件的持久性数据库。 它已经过优化,以提供比 KahaDB 更快的持久性。
尽管还不是默认的消息存储,但希望此存储实现在将来的版本中成为默认的。
注意:官方说明此存储方式已被放弃且不再支持,不建议使用。推荐的存储是 KahaDB。
-
在 ActiveMQ 5.9 中引入。 因 LevelDB 已被放弃,此存储方式不在做过多介绍。详细可参考官方文档:Replicated LevelDB Store。
ActiveMQ服务安装
ActiveMQ 支持在 Windows 和 Unix 环境安装服务。
服务安装
具本安装可参考:
- 官方:Using ActiveMQ > Getting Started
- 官方:ActiveMQ Version 5 Getting Started
- Spring Boot 2实践系列(三十四):集成 AcitveMQ 消息中间件的ActiveMQ 安装章节。
ActiveMQ 日志文件:[activemq_install_dir]/data/activemq.log
Web 控制台
ActiveMQ 集成了 Web 控制台 管理工具,非常方便用于监控和管理。
访问地址:
1 | http://localhost:8161/admin |
默认的账号密码:admin / admin,可在 conf/jetty-real.properties
文件中配置。
更多消息可查看 docs/WebConsole-README.txt
文件。
应用集成ActiveMQ
Java原生集成
此方式需要手动创建连接,是比较低效的,通常使用连接池来管理连接。
消息生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public class MQProducer {
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建全话,不启用事务,使用自动确认应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息
TextMessage message = session.createTextMessage("Hello Word!");
//创建主题
Topic topic = session.createTopic("activemq-topic-test1");
//创建生产者
MessageProducer topicProducer = session.createProducer(topic);
//发送消息
topicProducer.send(message);
Queue queue = session.createQueue("activemq-queue-test1");
MessageProducer queueProducer = session.createProducer(queue);
queueProducer.send(message);
//关闭资源
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}消息消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50public class MQComsumer {
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建全话,不启用事务,使用自动确认应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建主题
Topic topic = session.createTopic("activemq-topic-test1");
//创建生产者
MessageConsumer topicConsumer = session.createConsumer(topic);
topicConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
System.out.println("接收到 Topic 的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Queue queue = session.createQueue("activemq-queue-test1");
MessageConsumer queueConsumer = session.createConsumer(queue);
queueConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
System.out.println("接收到 Queue 的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//关闭资源
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Spring 集成
ActiveMQ 完全支持 Spring 方式配置 JMS 客户端和服务器。具体配置可参考:ActiveMQ > Spring Support。
Spring 集成 AciveMQ ,需要在 Spring XML 配置文件配置 ActiveMQConnectionFactory 实例 Bean。
1 | <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> |
使用 Zeroconf 来发现可用的 Brokers 进行连接
1 | <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> |
使用 Spring 提供的 JmsTemplate:
1 | <!-- a pooling based JMS provider --> |
更多信息:
- Synchronous Request Response with ActiveMQ and Spring
- Using Spring to Send JMS Messages
- Using Spring to Receive JMS Messages
- Tuning JMS Message Consumption In Spring
- Efficient Lightweight JMS with Spring and ActiveMQ
- JmsTemplate Gotchas(坑)
- Spring ActiveMQ spring.xml
- Spring 整合示例和测试用例
Spring Boot 集成
MQ系列(一):ActiveMQ特性,概念,持久化,安装,应用集成