参考文章:
1、【仓储出库中枢】应用重启过程metaq rebalance引发消费停滞问题根治
https://ata.alibaba-inc.com/articles/200262?spm=a1z2e.8101737.webpage.dtitle2.3eb54f9bt5QWqE
2、【RocketMQ实战 丁威、梁勇 著】
分布式下如何提高性能(高并发)
https://zhuanlan.zhihu.com/p/402660843
顺序消息,如何提高并发度
官方的顺序消息的原理
1、发送端:顺序消息写入的Topic,by hash(key) -> 写入到 指定的consumeQueue
2、消费端:定义的线程线程数 = 1,即可实现顺序消费
性能瓶颈:
1、消费重平衡,需要对broker加锁
2、找对应consumeQueue,也需要加锁
3、拉取任务之后,需要对本地的processQueue加上
加锁的地方比较多,性能较差
优化方案(消费端优化)
性能瓶颈分析:多个加锁,可以针对业务的需求,提升性能
如,对不同账号userId,就可以对不同的账号之间进行并发消费
1、重写消费者类
实现DefaultLitePullConsumer,实现自定义的pull模式
重写execute方法
2、加入多线程,生成n个消费线程,对key % n 路由 到对应的消费线程中,实现业务级的多线程
metaQ通过tag进行过滤,可以降本
基本组件
Broker
Producer
Consumer
NameServer
高性能的优化点
异步复制
A:客户端写、先返回CompleteableFuture,
B:然后去落盘、复制副本,
C:然后再future.complete返回给客户端
B期间,还可以处理其他写情况,吞吐量较高
C阶段,符合同步复制的语义
线程模型:netty+reactor模型
快速失败
顺序写
commitlog
定长文件、页缓存、内存映射
在linux中,MappedByteBuffer 约等于 页缓存
内核级读写分离
transientStorePoolEnable = true
写入page Cache 后,批量刷新到磁盘 =》
开启后:堆外内存 + 定时刷新到磁盘
零拷贝:
在netty中,要读取文件,将文件基于内存映射,获取为一个ByteBuf,然后封装到FileRegion中,最后使用transferTo(底层为FileChannel.trasferTo),直接发送到网卡
常用场景
顺序消费(局部顺序)
加了三把锁:
1、队列重平衡之后,添加拉取任务之后,回向broker加锁
2、线程池消费的时候,会对消息队列加锁,确保同一个消息队列不会被并行执行
3、还会对处理队列加锁,保障重新负载的时候,不会被重复消费,只有当处理队列中的消息全部处理后,在会释放锁
延迟消息
内部默认有18个级别的延迟队列
topic修改为"SCHEDULER_TOPIC_XXX"
每个级别都有定时任务捞取,满足条件后,重新放入broker
广播消息原理
1、发送方正常发送
2、消费方订阅的时候,设置为广播订阅
consumer.setMessageModel(MessageModel.BROADCASTING);
官方文档上的说明
消费模式
pull(lite pull)
优点:
消息拉取以消费组为维度,且默认会创建20个拉取任务,消息拉取效率高,适合大数据的批处理任务
对实时性要求不高
缺点:
消费点位自动提交poll之后,就自动提交了,处理不好就容易丢失消息
短轮询:只轮询一次,默认等待时间1秒
长轮询:默认阻塞5秒,等待时间长,支持被唤醒,降低无效的拉取频率
push
默认16次重试
限流机制
消费端限流:
消费队列中积压的消息数:超过1000条
待处理的消息最大偏移量和最小偏移量,相差超过1000
消息队列中消息总大小超过100MB
任意满足,即限流
服务端限流:
若消息存在内存中,则运行拉取的最大消息大小,默认256KB
若消息存在内存中,则运行拉取的最大消息条数,默认32
若消息存在磁盘中,则运行拉取的最大消息大小,默认64KB
若消息存在磁盘中,则运行拉取的最大消息条数,默认8
根据待拉取消息的物理偏移量和当前commitlog文件的最大偏移量之差超过内存的40%,就认为需要从磁盘中拉
部署模式
主从模式
多个master-slave组合
master 写,slave读
多副本架构模式(Dledger)
多个raft复制组,强一致性,每个组内只有leader可读写,会自动选举leader
基于raft协议,一个复制组,至少需要3台broker
Q:主从如何迁移到多副本架构
三步走:
1、多副本的raft节点加入原集群
2、逐步关闭原主从节点的写权限
3、过了日志存储时间后,下线原节点
几种异常情况
1、broker 的PageCache 压力大
若broker 追加page cache 锁超时1秒,就抛这个异常
解决办法:
开启transientStorePoolEnable = true(内核级读写分离)
即把消息先写入堆外内存,由于该内存启用了锁定机制,性能接近直接操作内存
然后后台线程会定时批量写入page Cache,将单条变为批量,提高写入性能
但:有数据丢失的可能
2、发送线程池积压的拒绝策略
线程池中的有界队列长度默认为1000,超过后就会抛出错误
开启1中的配置后,还异常,则需要扩容了
3、Broker 端快速失败
broker 尚未page cache 忙,但是有一些发送请求在队列中等了超过200ms,也认为是繁忙,需要客户端自己捕获并处理
4、订阅关系不一致,导致消息丢失
同一个消费者组,每个机器上订阅不同的tag,则会导致消息丢失(会被过滤而不消费)
若每个机器上订阅的tag相同,则不会丢失消息
5、同一台机器上,clientId不唯一,导致不消费
和负载均衡算法有关
6、NameServer 间路由信息是最终一致的,对发送、消费的影响
发送:有发送方自己保证,可能会造成发送不均衡,不会丢消息
消费:可能造成部分消费、部分不消费,不会丢消息,且易于监控、发现,易恢复
7、NameServer间路由变更及时性问题
发送:若有broker挂掉,但是路由未更新,可能导致消息发送失败
解决办法:失败规避,客户端重试
消费:消费端重试,较好弥补
总结:NameServer 存在的问题,虽然可能致命,但是由生产者、消费者自己去保障、解决(发送重试、消费幂等),成本较低,且能保障可用性、稳定性。因此,职责分离,允许这些情况的发生
文件恢复机制
commitlog 文件异步构建 index 和 consumeQueue的过程中,可能有异常
基于checkpoint、abort文件
若有abort文件,则代表异常退出
1、从commitlog倒数第三个文件,开始恢复
消息轨迹
有默认的主题:RMQ_SYS_TRACE_TOPIC
还可以自定义
企业级场景
消息流量隔离
打标,修改topic
使用封装sdk的方式完成,如dpath的功能
任意时间消息延迟方案
一般的方案:
1、定时轮询存储介质(DB、redis等)
2、时间轮 和 本地存储索引
3、基于开源消息中间件的二次开发
每级别的延迟队列,都有消息的逻辑
搭建定时消息服务,负责消息轮转,传递下一等级的消息
同一延迟级别的消息,进行排序(可基于数据库、时间轮等)
消息资源容灾迁移方案
同城跨可用区部署 、
master、slave,互相同城跨可用区交叉部署
资源迁移
主题迁移
消费组迁移
集群迁移
跨集群复制方案
将一个集群的消息,复制到另一个集群,尝用于异地多活
RocketMQ的编程技巧
1、读写锁
用在存储路由元素的hashMap上,用ReentrantReadWriteLock
Q:为什么不用ConcurrentHashMap
A:ConcurrentHashMap 只针对自身安全,多个map之间,并不能保证并发安全
2、信号量的使用
异步发送的时候,用来控制并发度,避免限流
3、同步转异步
没使用Future
而是使用CountDownlatch
4、CompletableFuture