MQ系列(四):ActiveMQ发送与消费优化,异步,同步,流控,预取限制,慢消费
ActiveMQ 为生产者发送消息提供了根据业务场景需要的相关优化配置,支持同步,异步发送,支持流量控制,Broker存储空间设置等。
也为消息者提供了优化设置,支预取限制,为慢消费者提供了等待消息策略和消息丢弃策略的配置,支持异步消费者和优先级消费设置。
消息发送优化
异步/同步
AcitveMQ 支持以 同步 或 异步 模式将消息发送到 Broker。使用何种模式对调用发送方法(send)的延迟有很大的影响。
延迟通常是影响生产者的吞吐量的重要因素,因此使用异步发送可以显着提高系统的性能。
在不考虑事务的情况下:
Producer 默认以同步模式发送持久化消息,每次发送都将同步并阻栋,直到 Broker 回传消息已安全持久化到磁盘的确认消息。
这个 ACK 提供了消息不会丢失的保证,由于 Producer 被阻塞,会花费巨大的延迟代价。
Producer 发送非持久化消息默认是异步发送,不需要等待 Broker 的确认,所有需要考虑流量控制。
许多高性能应用允许在故障情况下能够容忍少量的消息丢失,那么即使使用持久消息,也可以启用异步发送来提高吞吐量。
异步发送
配置异步发送有多种方式,详细可参考官网:Features > Message Dispatching Features > Async Sends。
在连接的 URI 上使用异步发送参数
1
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
ActiveMQConnectionFactory 对象设置使用异步发送属性
1
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
ActiveMQConnection 对象设置使用异步发送属性
1
((ActiveMQConnection)connection).setUseAsyncSend(true);
生产者流量控制
官方资料:Features > Message Dispatching Features > Producer Flow Control
流量控制:指如果 Broker 检测到超出了目标的内存限制或超出了 Broker 的临时存储或文件存储限制,则消息流可能会减慢。生产者将被阻塞,直到资源可用或将收到 JMSException 异常。此行为是可配置的。
值得注意的是,当达到 memoryLimit
或<systemUsage>
限制时,默认的<systemUsage>
设置将导致生产者阻塞:这种阻塞行为有时被误解为 挂起生产者,而实际上生产者只是静静等待空间可用。
生产者同步发送的消息将自动使用流控制; 这通常适用于同步发送的持久性消息,除非启用
useAsyncSend
标志。使用异步发送的生产者,通常来说,非持久消息的生产者不需要等待 Broker 的任何确认; 因此,如果超过了内存限制,则不会被通知。 如果生产者确实想知道超出了 Broker 限制,则需要配置
ProducerWindowSize
连接选项,以便即使异步消息也受每个生产者控制。1
ActiveMQConnectionFactory connctionFactory = connctionFactory.setProducerWindowSize(1024000);
ProducerWindowSize
是生产者在等待来自 Broker 已接受先前发送的消息,回传确认消息之前,生产者将向 Broker 发送的最大数据字节数。
另外,如果要发送非持久性消息(默认是异步发送),且想知道是否超出了队列或主题的内存限制,则只需将连接工厂配置为alwaysSyncSend
。 虽然这将变慢,但是可以确保生产者可以立即收到内存问题的通知。
禁用流控制
如果需要,可以通过在 Broker 配置中将适当的目标策略上的 producerFlowControl设置为
false`,来对 Broker 上的特定 JMS 队列和主题禁用流控制。参考 参考 Broker Configuration。
1 | <destinationPolicy> |
根据需要禁用了流控制,以便继续进行消息分派,直到所有可用磁盘空间被挂起的消息用完为止(消息积压耗尽磁盘空间,无论是否配置了持久消息传递)。为防止此情况,增加了消息游标( Message Cursors)功能。
启用流量控制
注意,由于在 ActiveMQ 5.x 中引入了新的文件游标,非持久化消息被分流到临时文件中存储,以减少用于非持久化消息的内存量。因此,可能会发现队列的memoryLimit
永远不会到达,因为游标不占用太多内存。如果确实要将所有非持久性消息保留在内存中,并在达到限制时停止生产者,则应配置<vmQueueCursor>
,如下示例:
1 | <destinationPolicy> |
Broker存储优化
Broker堵塞异常
如果生产者正在发送持久消息(以至于期望收到OpenWire消息的响应),代理将向生产者发送ProducerAck
消息,通知生产者先前的发送窗口已被处理,现在可以发送另一个窗口。 它有点像消费者的习惯,但相反。
因此,一个好的生产者可能会在发送更多数据之前等待生产者确认,以避免堵塞 Broker(如果出现慢的消费者,则可能迫使 Broker 阻塞整个连接)。 可以在 ActiveMQMessageProducer
源码中查看其工作方式。
尽管生产者可以完全忽略 ProducerAck
,如果必须处理缓慢的消费者,则 Broker 应该停止传输, 尽管这确实意味着将使整个连接停止。
另一种方案是,当 Broker 上没有可用空间时,对send()
操作抛出异常给生产者(以替换无限期阻塞)。 通过将 sendFailIfNoSpace
属性配置为true
,Broker 将导致 send()
操作失败,并带有javax.jms.ResourceAllocationException
,该异常将传播到生产者。 以下是此配置的示例:
1 | <systemUsage> |
此属性的优点是客户端可以捕获javax.jms.ResourceAllocationException
,稍等后重试send()
操作,而不是无限期地挂起。
从版本5.3.1开始,新增了sendFailIfNoSpaceAfterTimeout
属性,单位毫秒(ms
),此属性设置无可用空间后的等待时间。
此属性的优点是,它将为阻塞设置了超时时间,而不是立即失败或无限期阻塞,此属性不仅为优化了 Broker ,还优化了生产者端,以便生产者在等待时间后再次重试 send()
操作,失败后可以捕获异常。
如果 Broker 没有容间,且在达到超时的时间后仍未释放可用空间,则生产者调用 send()
方法时失败并抛出异常。 下面是一个示例:
1 | <systemUsage> |
从5.16.0版本开始,可以通过目标策略(Destination Policies)在每个目标基础上配置sendFailIfNoSpace
和 sendFailIfNoSpaceAfterTimeout
。
存储空间设置
还可以通过<systemUsage>
元素上的某些属性来降低生产者的速度。如下示例:
1 | <systemUsage> |
可以设置NON_PERSISTENT
消息的内存限制,PERSITENT
消息的磁盘存储以及临时消息的总使用量,Broker 将在降低生产者速度之前使用它们。
使用上面显示的默认设置,代理将阻止 send()
调用,直到一些消息被消费并且 Broker 上的空间可用为止。 上面显示了默认值,可能需要为环境增加这些值。
存储百分比设置
StoreUsage
和 TempUsage
都支持 percentLimit
属性,其中将限制确定为可用总量的百分比。
从版本 5.15.x 开始,还有一个附加的相关total
属性,可用于显式设置可用总数,这样就不会查询文件系统。
如果 Broker 只能使用磁盘分区的一部分,或者底层文件存储报告 > Long.MAX_VALUE
可用容量(例如:EFS),这将溢出 java.io.file#getTotalSpace
的 Long 返回值,则此选项非常有用。
请注意,当指定了total
时,将不会在文件系统中验证可用的实际数据,而只验证相对于该绝对total
的存储使用情况。
时间戳插件
TimeStampPlugin 是 Broker 拦截器,它使用 Broker时间戳更新消息上JMS客户端(生产者)的时间戳。当已知客户端计算机上的时钟不正确并且只能信任在Broker服务器上的时间时,这就很有用。
启用此插件将破坏JMS遵从性,因为生产者在send()
之后在消息上看到的时间戳将不同于消费者在接收消息时观察到的时间戳。
ActiveMQ 默认没有开启此插件,插件属性如下:
属性 | 默认值 | 描述 |
---|---|---|
futureOnly | false | 为true 时,插件不会将消息的时间戳和过期时间设置为低于原始值的值。为 false 时,插件始终会更新消息的时间戳和过期时间。 |
ttlCeiling | 0 | 不为 0 时,值就是过期时间(单位:毫秒)。 |
zeroExpirationOverride | 0 | 不为 0 时,则值为没有设置过期时间的消息的过期时间。 |
要启用 TimeStampPlugin
,将以下内容添加到 ActiveMQ Broker配置中。如下示例:
1 | <plugins> |
Broker / Consumer 时钟同步:
如果消费者本地时钟大于 Broker 的本地时钟,如果使用此插件的默认配置,消息可能不会被消费,因为消费者可能会认为消息已过期。
如果 Broker 与 消费者之间的时间差大于消息到期时间,并且消费者的时钟提前运行(时间跑的更快),则设置 futureOnly=true
。
消息消费优化
预取限制
预取缓存区
ActiveMQ 的设计目标之一是成为高性能的消息总线。这意味着使用SEDA
架构可以异步执行尽可能多的工作。为了有效利用网络资源,Broker 使用 push
模型将消息发送给消费者,消费者端启用了一个本地消息缓冲区 存储未来得及处理的消息。
这样会存在一个问题,若不限制推送到消费者的消息数量的情况下,消费者端资源可能会耗尽,这是因为消费消息通常比传递消息慢得多(因为消费消息要处理业务)。
为了避免此情况,ActiveMQ采用了预取限制,以限制可以立即分发给单个消费者的最大消息数。消费者进而使用 预取限制 来调整其 预取消息缓冲区 的大小。
一旦 Broker 向消费者发送消息数量达到预取限制数量,则不再向该消费者发送更多的消息,直到消费者已经确认了接收到的至少50%的预取消息,例如 prefetch / 2。
当 Broker 收到上述确认后,将向消费者发送补充的prefetch / 2
条消息,以 填充 其预取缓冲区。注意,可以为每个消费者指定预取限制。
建议使用较大的预取值,以实现高性能和高消息量。但是,对于较低的消息量,其中每条消息都需要花费很长时间进行处理,因此应将预取设置为1。这样可以确保消费者一次只处理一条消息。
消费者消费消息的另一种方式是显式地从 Broker 中提取消息,单独提取消息并不是很有效,并且会显著增加每条消息的延迟。
如果将 预取限制 设置为 0
,将导致消费者每次主动轮询消息,而不是 Broker 将消息推送给消费者。
什么时慢消费者?
慢消费者是指其有待处理的消息的数量是配置预取限制数量的两倍以上。
无消息缓冲区
无法缓存预取消息的消费者必须将其预取设置为1。
此类消费者的示例是使用诸如Ruby之类的脚本语言(通过STOMP连接)实现的。 在这种情况下,没有客户端消息缓冲区的概念。
预取策略设置
可以给 ActiveMQConnectionFactory 或 ActiveMQConnection 指定 ActiveMQPrefetchPolicy 。这样可以为所有消费者配置每个预取值
。
对于不同类型的队列,ActiveMQ 默认的预取值也不同,如下。
转发模式 | 队列类型 | 默认预取值 |
---|---|---|
persistent | queues | 1000 |
non-persistent | queues | 1000 |
persistent | topics | 100 |
non-persistent | topics | Short.MAX_VALUE - 1 = (32766) |
可以在建立连接的 URI 上配置预取限制
为所有消费者类型配置预取限制
1
tcp://localhost:61616?jms.prefetchPolicy.all=50
1
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.all=50");
只为队列消费者配置预取限制
1
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
1
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1")
也可以基于每个消费者的目标选项配置预取限制
1
2queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefe tchSize=10");
consumer = session.createConsumer(queue);基于 ActiveMQConnectionFactory 或 ActiveMQConnection 设置预取策略
1
2
3
4
5
6
7
8ActiveMQPrefetchPolicy prefetchPolicy = ((ActiveMQConnectionFactory) connectionFactory).getPrefetchPolicy();
Connection connection = connectionFactory.createConnection();
ActiveMQPrefetchPolicy prefetchPolicy2 = ((ActiveMQConnection) connection).getPrefetchPolicy();
prefetchPolicy.setAll(1000);
prefetchPolicy.setQueuePrefetch(1000);
prefetchPolicy.setTopicPrefetch(1000);通过 Properties 属性修改
1
2
3
4Properties properties = new Properties();
properties.setProperty("prefetchPolicy.queuePrefetch", "1000");
properties.setProperty("prefetchPolicy.topicPrefetch", "1000");
((ActiveMQConnectionFactory) connectionFactory).setProperties(properties);
消费者池化
由于预取,从消费者池中消费消息是有问题的。未消费的预取消息仅在消费者关闭时释放,但被池化的消费者会被延迟直到消费者池关闭才关闭。基于此原因,org.apache.activemq.pool.PooledConnectionFactory 不将消费者放在池中。
Springs CachingConnectionFactory
支持消费者池化(默认已关闭),如果将CachingConnectionFactory
与 Springs DefaultMessageListenerContainer(DMLC)
中配置的多个消费者线程一起使用,则需要关闭 CachingConnectionFactory
中的消费者池(默认已关闭),或者在使用池化的消费者时设置预取值为 0
(即关闭预取缓存)。这样,消费者将每次调用轮循来获取消息。通常建议关闭 CachingConnectionFactory
和任何其它允许 JSM 消费者池化的框架中的消费者缓存。
注意,Springs DefaultMessageListenerContainer(DMLC)
及其CACHE_CONSUMER
缓存级别不受此问题影响!从某种意义上讲,Springs DMLC 不会对消费者进行池化,不会将内部池与多个消费者实例一起使用。相反,它缓存消费者,即重复使用相同的JMS消费者对象以在 DMLC 实例的生存期内接收所有消息。因此,它的行为非常类似于正确编写的JMS代码,可以在其中创建JMS连接,会话,消费者,然后使用该消费者实例接收所有消息。
因此,除非使用XA事务,否则即使在多个消费者线程中,在 Springs DMLC
中使用CACHE_CONSUMER
也不会有问题。XA事务不适用于CACHE_CONSUMER
。 但是,本地JMS事务
和 非事务消费者可以很好地使用 Springs DMLC 中的 CACHE_CONSUMER
。
内存与性能平衡
设置相对较高的预取值会有更高的性能。 因此,默认值通常大于1000,Topic
的默认值要高得多,非持久消息的默认值还要高。
预取值的大小决定了消费者的RAM
中将保留多少消息,因此如果RAM
有限,则可能需要设置一个较低的值,例如 1
或10
等。
慢消费者处理
慢消费者:慢消费者是指其有待处理的消息的数量是配置预取限制数量的两倍以上。简单理解,即快速生产者生产的消息不能被 Consumer 及时消费,导致消息积压在 Broker 中。
慢消费问题
慢消费者可能会在非持久性主题(Topic)上引发问题,因为可能导致 Broker 上有大量的消息积压在内存中直到内存被填满,Broker 就会降低生产者的速度,从而导致原本快速的消费者变慢。
针对此问题,ActiveMQ 提供了等待消息策略(Pending Message Limit Strategy)来配置 Broker,除了预取缓冲区外(预取限制),Broker 还会为消费者保留最大数量的匹配消息(等待消息的上限)。
一旦达到这个最大值,当新消息进来时,旧消息被丢弃。这样当前最新消息保留在 RAM 中,并继续向速度较慢的用户发送消息,但可以丢弃旧消息(这样,Broker 不会降低生产者速度,原本快消费者不会变慢)。
等待消息策略
可以在目标映射上配置PendingMessageLimitStrategy
实现类,以便主题命名空间的不同区域可以使用不同的策略来处理速度较慢的使用者。
例如,可能希望对高交易量的价格使用此策略,但对于低交易量的订单和交易,可能不希望丢弃旧消息。
该策略为消费者计算要保留在 RAM 中最大的等待消息(大于其预取大小),此限制值分三类:
- 为零表示除预取限制外不保留任何消息,
- 大于零将保持最多的消息量,随着新消息的到来而丢弃较旧的消息,
- 值为
-1
表示禁用丢弃消息。
当前,该策略有两种不同的实现方式:
ConstantPendingMessageLimitStrategy
此策略对所有消费者使用固定限制(超过其预取大小)。如下示例。
1
<constantPendingMessageLimitStrategy limit="50"/>
PrefetchRatePendingMessageLimitStrategy
此策略是使用消费者预取限制的乘数来计算等待消息的最大数量。例如,可以为每个消费者保留约2.5 倍预取限制的等待消费数量。
1
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
JMS 消费者有一个预取限制策略,可以使用此策略为持久化队列和非持久化队列及主题配置各种预取限制。预取限制策略还允许在每个connection/consumer
基础上指定 maximumPendingMessageLimit
。配置此属性的一个小区别是,为了简化与非 JMS 客户端(如 OpenWire)的操作,将忽略零值,因此可以配置最小值是 1
.
消息丢弃策略
当等待消息限制超过上限后,ActiveMQ 还提供了一个MessageEvictionStrategy
用于确定慢消费者丢弃消息的策略。
oldestMessageEvictionStrategy
:丢弃最旧的消息,默认实现。1
<oldestMessageEvictionStrategy/>
oldestMessageWithLowestPriorityEvictionStrategy
:丢弃优先级最低的最旧消息如果有一些高优先级的消息要进来,则要先逐出较低优先级的消息,即使这些消息是较新的。
1
<oldestMessageWithLowestPriorityEvictionStrategy/>
uniquePropertyMessageEvictionStrategy
:根据自定义属性来丢弃消息。1
<uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>
如上,丢弃属性名为
STOCK
的消息。propertyName
是 JMS 消息属性。
以下示例一个ActiveMQ Broker 配置文件。 对于 PRICES.>
通配符范围中的主题,pendingMessageLimitStrategy
属性设置为对于每个使用者,仅在其预取缓冲区大小之上保留大约10条消息,丢弃最旧的消息。
1 | <beans xmlns="http://www.springframework.org/schema/beans" |
使用小提示:
如果知道某个消费者将变慢,则将其预取限制大小设置为比快速消费者更小!
例如,如果知道一台的服务器速度很慢,并且消息速率很高,且有一些快速消费者,则可能需要启用此功能,并将慢速服务器上的预取值设置为略低于快速服务器上的预取值。
监控慢速消费者
可以使用 JMX 控制台查看活动订阅的统计信息,这允许查看 TopicSubscriptionViewMBean
上的统计信息,如下:
统计 | 定义 |
---|---|
discarded(丢失) | 在订阅的生存期内,因订阅是一个慢消费者而丢弃多少消息的统计 |
matched(匹配) | 一旦预取缓冲区中有容量可用,当前已匹配并分发给订阅的消息数。 因此,非零值表示此订阅的预取缓冲区已满。 |
消息者异步分发
从ActiveMQ v4 版本开始,Broker 对消费者选择同步或异步分发更加易于配置。
现在,它已配置为 URI,Connection 和 ConnectionFactory 的默认值,并且可以通过目标选项(Destination Options)在每个消费者的基础上进行自定义,而不是以前只是传输服务器设置。
更有意义在于,若可以对较慢的消费者执行异步消息传递,对较快的消费者执行同步消息传递(以避免添加另一个SEDA
队列的同步和上下文切换成本)。使用同步消息传递的缺点是,如果向一个慢消费者分发消息,则生产者很有可能被阻塞。
该项的属性已是默认设置dispatchAsync=true
,这是高性能的最佳设置。如果想更好地应对慢消费者,则需要启用此设置。
如果希望获得更高的吞吐量,而使慢消费者的机会较低,则可以将其改为dispatchAsync = false
。
ConnectionFactory 级别
1
((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
Connection 级别
在此级别配置
dispatchAsync
将覆盖ConnectionFactory
级别的设置。1
2Connection connection = connectionFactory.createConnection();
((ActiveMQConnection) connection).setDispatchAsync(true)Consumer 级别在目标选项上设置
将覆盖 connection and ConnectionFactory 级别。
1
2queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
consumer = session.createConsumer(queue);Broker 传输连接器关闭 Async Dispatch
可以在指定的传输连接器上通过
disableAsyncDispatch
属性禁用异步分发。 一旦尽用,则消费者端不能开启它。1
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" disableAsyncDispatch="true"/>
消费者优先级
ActiveMQ 除了支持可插拨式的的分发策略(例如循环调度)外,还支持消费者优先级,这样可以给消费者加权重以优化网络跳数。
例如,如果希望代理将消息发送给常规JMS消费者,而不是其它代理(Broker);如果有可用的消费者,则无需使不必要的中间代理(Broker)到目标代理(Broker)跃点。
设置优先级:
使用 目标选项(Destination Options) 来设置消费者优先级:
1 | queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10"); |
priority
属性的取值范围是:0
到 127
,最高优先级是 127,默认值是 0。
Broker 根据队列的消费者的优先级对其进行排序,首先将消息分发给优先级最高的消费者。
一旦被选中的消费者的预取缓冲区已满,Broker 将开始向具有次低优先级且预取缓冲区未满的消费者发送消息。
相关参考
官方文档
MQ系列(四):ActiveMQ发送与消费优化,异步,同步,流控,预取限制,慢消费
http://blog.gxitsky.com/2020/01/27/MQ-04-ActiveMQ-producer-consumer-optimization/