RocketMQ


参考文章:

1、【仓储出库中枢】应用重启过程metaq rebalance引发消费停滞问题根治

https://ata.alibaba-inc.com/articles/200262?spm=a1z2e.8101737.webpage.dtitle2.3eb54f9bt5QWqE

2、【RocketMQ实战 丁威、梁勇 著】

分布式下如何提高性能(高并发)

https://zhuanlan.zhihu.com/p/402660843

顺序消息,如何提高并发度

官方的顺序消息的原理

1、发送端:顺序消息写入的Topic,by hash(key) -> 写入到 指定的consumeQueue

2、消费端:定义的线程线程数 = 1,即可实现顺序消费

性能瓶颈:

1、消费重平衡,需要对broker加锁

2、找对应consumeQueue,也需要加锁

3、拉取任务之后,需要对本地的processQueue加上

加锁的地方比较多,性能较差

优化方案(消费端优化)

性能瓶颈分析:多个加锁,可以针对业务的需求,提升性能

如,对不同账号userId,就可以对不同的账号之间进行并发消费

1、重写消费者类

实现DefaultLitePullConsumer,实现自定义的pull模式

重写execute方法

2、加入多线程,生成n个消费线程,对key % n 路由 到对应的消费线程中,实现业务级的多线程

metaQ通过tag进行过滤,可以降本

基本组件

Broker

Producer

Consumer

NameServer

高性能的优化点

异步复制

A:客户端写、先返回CompleteableFuture,

B:然后去落盘、复制副本,

C:然后再future.complete返回给客户端

B期间,还可以处理其他写情况,吞吐量较高

C阶段,符合同步复制的语义

线程模型:netty+reactor模型

快速失败

顺序写

commitlog

定长文件、页缓存、内存映射

在linux中,MappedByteBuffer 约等于 页缓存

内核级读写分离

transientStorePoolEnable = true

写入page Cache 后,批量刷新到磁盘 =》

开启后:堆外内存 + 定时刷新到磁盘

零拷贝:

在netty中,要读取文件,将文件基于内存映射,获取为一个ByteBuf,然后封装到FileRegion中,最后使用transferTo(底层为FileChannel.trasferTo),直接发送到网卡

常用场景

顺序消费(局部顺序)

加了三把锁:

1、队列重平衡之后,添加拉取任务之后,回向broker加锁

2、线程池消费的时候,会对消息队列加锁,确保同一个消息队列不会被并行执行

3、还会对处理队列加锁,保障重新负载的时候,不会被重复消费,只有当处理队列中的消息全部处理后,在会释放锁

延迟消息

内部默认有18个级别的延迟队列

topic修改为"SCHEDULER_TOPIC_XXX"

每个级别都有定时任务捞取,满足条件后,重新放入broker

广播消息原理

1、发送方正常发送

2、消费方订阅的时候,设置为广播订阅

consumer.setMessageModel(MessageModel.BROADCASTING);

官方文档上的说明

消费模式

pull(lite pull)

优点:

消息拉取以消费组为维度,且默认会创建20个拉取任务,消息拉取效率高,适合大数据的批处理任务

对实时性要求不高

缺点:

消费点位自动提交poll之后,就自动提交了,处理不好就容易丢失消息

短轮询:只轮询一次,默认等待时间1秒

长轮询:默认阻塞5秒,等待时间长,支持被唤醒,降低无效的拉取频率

push

默认16次重试

限流机制

消费端限流:

消费队列中积压的消息数:超过1000条

待处理的消息最大偏移量和最小偏移量,相差超过1000

消息队列中消息总大小超过100MB

任意满足,即限流

服务端限流:

若消息存在内存中,则运行拉取的最大消息大小,默认256KB

若消息存在内存中,则运行拉取的最大消息条数,默认32

若消息存在磁盘中,则运行拉取的最大消息大小,默认64KB

若消息存在磁盘中,则运行拉取的最大消息条数,默认8

根据待拉取消息的物理偏移量和当前commitlog文件的最大偏移量之差超过内存的40%,就认为需要从磁盘中拉

部署模式

主从模式

多个master-slave组合

master 写,slave读

多副本架构模式(Dledger)

多个raft复制组,强一致性,每个组内只有leader可读写,会自动选举leader

基于raft协议,一个复制组,至少需要3台broker

Q:主从如何迁移到多副本架构

三步走:

1、多副本的raft节点加入原集群

2、逐步关闭原主从节点的写权限

3、过了日志存储时间后,下线原节点

几种异常情况

1、broker 的PageCache 压力大

若broker 追加page cache 锁超时1秒,就抛这个异常

解决办法:

开启transientStorePoolEnable = true(内核级读写分离)

即把消息先写入堆外内存,由于该内存启用了锁定机制,性能接近直接操作内存

然后后台线程会定时批量写入page Cache,将单条变为批量,提高写入性能

但:有数据丢失的可能

2、发送线程池积压的拒绝策略

线程池中的有界队列长度默认为1000,超过后就会抛出错误

开启1中的配置后,还异常,则需要扩容了

3、Broker 端快速失败

broker 尚未page cache 忙,但是有一些发送请求在队列中等了超过200ms,也认为是繁忙,需要客户端自己捕获并处理

4、订阅关系不一致,导致消息丢失

同一个消费者组,每个机器上订阅不同的tag,则会导致消息丢失(会被过滤而不消费)

若每个机器上订阅的tag相同,则不会丢失消息

5、同一台机器上,clientId不唯一,导致不消费

和负载均衡算法有关

6、NameServer 间路由信息是最终一致的,对发送、消费的影响

发送:有发送方自己保证,可能会造成发送不均衡,不会丢消息

消费:可能造成部分消费、部分不消费,不会丢消息,且易于监控、发现,易恢复

7、NameServer间路由变更及时性问题

发送:若有broker挂掉,但是路由未更新,可能导致消息发送失败

解决办法:失败规避,客户端重试

消费:消费端重试,较好弥补

总结:NameServer 存在的问题,虽然可能致命,但是由生产者、消费者自己去保障、解决(发送重试、消费幂等),成本较低,且能保障可用性、稳定性。因此,职责分离,允许这些情况的发生

文件恢复机制

commitlog 文件异步构建 index 和 consumeQueue的过程中,可能有异常

基于checkpoint、abort文件

若有abort文件,则代表异常退出

1、从commitlog倒数第三个文件,开始恢复

消息轨迹

有默认的主题:RMQ_SYS_TRACE_TOPIC
还可以自定义

企业级场景

消息流量隔离

打标,修改topic

使用封装sdk的方式完成,如dpath的功能

任意时间消息延迟方案

一般的方案:

1、定时轮询存储介质(DB、redis等)

2、时间轮 和 本地存储索引

3、基于开源消息中间件的二次开发

每级别的延迟队列,都有消息的逻辑

搭建定时消息服务,负责消息轮转,传递下一等级的消息

同一延迟级别的消息,进行排序(可基于数据库、时间轮等)

消息资源容灾迁移方案

同城跨可用区部署 、

master、slave,互相同城跨可用区交叉部署

资源迁移

主题迁移

消费组迁移

集群迁移

跨集群复制方案

将一个集群的消息,复制到另一个集群,尝用于异地多活

RocketMQ的编程技巧

1、读写锁

用在存储路由元素的hashMap上,用ReentrantReadWriteLock

Q:为什么不用ConcurrentHashMap

A:ConcurrentHashMap 只针对自身安全,多个map之间,并不能保证并发安全

2、信号量的使用

异步发送的时候,用来控制并发度,避免限流

3、同步转异步

没使用Future

而是使用CountDownlatch

4、CompletableFuture

RocketMQ原理.drawio


文章作者: 王利康
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 王利康 !
  目录