世界微动态丨真香,聊聊 RocketMQ 5.0 的 POP 消费模式!
可能还存在限制 Reef 实现更高性能的因素,我们后续将研究 Reef 冻结期间的潜在回归,并继续努力使 Reef 成为迄今为止最好的 Ceph 版本!
(资料图片仅供参考)
大家好,我是君哥。
大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式。
不过,RocketMQ 的 PUSH 模式有明显的不足,主要体现在以下几个方面:
消息积压了,增加消费者不一定能解决。PUSH 模式如下图:上面的图中,消费组中的消费者每个消费者消费两个 MessageQueue,这种情况下,增加消费者是可以提高消费能力的。
但是下面这张图,每个消费者消费一个 MessageQueue,因为同一个 MessageQueue 只能被同一个消费组中的一个消费者消费,所以增加消费者并不能提高消费能力。
客户端的处理逻辑比较多,比如负载均衡、offset 管理、消费失败后的处理(比如失败消息发送回 Broker),这些逻辑都在客户端。如果再支持其他语言,客户端会变得越来越重。消费者机器 hang 住,可能会导致消息积压,如下图:通过客户端负责均衡,MessageQueue0 这个队列分配给了 Consumer0 进行独占消费,如果 Consumer0 这个消费者 hang 住了,但是服务没有挂,不能从 Name Server 中下线,因为 Consumer0 拉取到的消息不能消费,也就不能给 Broker 发送更新 Offset 的请求,最终导致消息积压。这种情况只能手动让 Consumer0 下线或者让 Consumer0 重启。
RocketMQ 5.0 为了解决 PUSH Consumer 上面的问题,引入了 POP Consumer。
1 POP 客户端
POP 模式的客户端引入的背景是 RocketMQ 5.0为了更好地拥抱云原生,客户端要改造成无状态的轻量级客户端,RocketMQ 4.x 中客户端具有的负载均衡、权限管理、消费管理等功能都从客户端移动到了 Proxy。
POP 消费模式如下图:
四个消费者都可以消费 Broker1 和 Broker2 上面的所有队列,这样即使某一个消费者 hang 住了,其他消费者也可以消费,并不会造成消息积压。
同时,从上图中可以看到,POP 客户端还有一个优势,增加消费者数量是可以提高消费能力的,不受 MessageQueue 数量和消费者数量的限制。
跟 PUSH 模式相比,POP 模式拉取到消息后,会设置一个 POP_CK 属性,代码如下:
//MQClientAPIImpl.javaif (requestHeader instanceof PopMessageRequestHeader) { if (startOffsetInfo == null) { // we should set the check point info to extraInfo field , if the command is popMsg // find pop ck offset String key = messageExt.getTopic() + messageExt.getQueueId(); if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) { map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId())); } messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset()); } else { String queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId()); String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), messageExt.getQueueId(), messageExt.getQueueOffset()); int index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset()); Long msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index); messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset) ); //... }}
可以看到,POP_CK 属性包含了 brokerName、Topic、QueueId、offset 等参数,通过这个属性可以唯一标识一条消息了。
从上面的代码还可以看到,responseHeader 中有一个 invisibleTime 属性,这个属性的作用是消费者通过 POP 模式拉取到一条消息后,这段时间(invisibleTime)内这条消息在 Broker 端是不可见的,消费者再次拉取就不会重复拉取到。但是如果过了这段时间,消费者还没有给 Broker 返回 ACK,这条消息会变为可见,再次被消费者拉取到。
消费完成后,向 Broker 发送 ACK 消息,见下面代码:
public void ackMessageAsync( final String addr, final long timeOut, final AckCallback ackCallback, final AckMessageRequestHeader requestHeader //) throws RemotingException, MQBrokerException, InterruptedException { final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) { @Override public void onComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { try { AckResult ackResult = new AckResult(); if (ResponseCode.SUCCESS == response.getCode()) { ackResult.setStatus(AckStatus.OK); } //... assert ackResult != null; ackCallback.onSuccess(ackResult); } //... } else { //... } } });}
2. Broker
从上面的介绍可以看到,每个消费者都可以从 Broker 的所有 MessageQueue 上拉取消息,那如果多个消费者都从一个 MessageQueue 上面拉取,有没有可能会重复消费呢?
Broker 收到消息拉取请求,从 MessageStore 拉取消息时,首先会给 MessageQueue 进行加锁,加锁成功后,才会拉取消息,这是其他客户端来拉取时就会加锁失败。
//PopMessageProcessor.javaString lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;long offset = getPopOffset(topic, requestHeader, queueId, false, lockKey);if (!queueLockManager.tryLock(lockKey)) { restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; return restNum;}
Broker 从 MessageStore 拉取到消息后,会定义一个 CheckPoint 放入缓存,代码如下:
//PopMessageProcessor.javaprivate long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult, PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid, Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) { String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic(); String lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId; //... offset = getPopOffset(topic, requestHeader, queueId, true, lockKey); GetMessageResult getMessageTmpResult = null; try { //... restNum = getMessageTmpResult.getMaxOffset() - getMessageTmpResult.getNextBeginOffset() + restNum; if (!getMessageTmpResult.getMessageMapedList().isEmpty()) { if (isOrder) { //... } else { appendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName()); } } //... } //... return restNum;}
Broker 收到消费者发来的 ACK 后,会把 CheckPoint 从缓存中移除。
如果 Broker 一直没有收到 ACK,则会把 CheckPoint 从缓存中移除,同时把 CheckPoint 发送给 MessageStore,由 MessageStore 发送到重试队列。代码如下:
boolean removeCk = !this.serving; // ck will be timeout if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) { removeCk = true; } // the time stayed is too long if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) { removeCk = true; } // double check if (removeCk) { // put buffer ak to store if (pointWrapper.getReviveQueueOffset() < 0) { putCkToStore(pointWrapper, false); } }}
3 总结
POP 客户端有很多的优势,总结如下:
无状态,更好地拥抱云原生;计算相关的功能下移到 Proxy,更加轻量级;消费能力扩展不受 MessageQueue 数量的限制;消费者 hang 住,并不会导致消息积压。标签:
推荐文章
- 世界微动态丨真香,聊聊 RocketMQ 5.0 的 POP 消费模式!
- 被视为格力“第二个董明珠”的孟羽童被开除:人设崩了? 全球头条
- 贵州持续深耕茶品牌体系建设:省级区域公用品牌+区域主要公用品牌+企业品牌联动发展
- 每日看点!光明日报丨为祁连山生态环境贡献巾帼力量
- 鹤壁相关部门回应“10万元买8套房”:未查到购房者信息 热议
- 杨玉环泡温泉的诗句(泡温泉的诗句)-全球今日报
- 全球时讯:墨西哥新莱昂州州长:起亚将在该州投资扩建工厂,生产两款电动汽车
- 机构:今晚成品油零售限价或遇“两连跌” 部分地区柴油有望步入“6元时代”
- 【当前独家】尿道结石怎么排出来的(尿道结石怎么排出来)
- 【聚看点】网易同城登录_网易同城登陆
- 世界热门:上海:制止餐饮浪费 倡导文明消费
- 苏州工资扣税最新标准_工资扣税标准2020标准
- 厦门忠仑公园人行天桥“大变身” 这些方面做了升级改造
- 国家统计局:4月份规模以上工业增加值同比增长5.6%|全球热头条
- 王强调研水利工程防汛备汛工作
- 哪种蛋最营养?
- 环球观天下!北京9区初三二模时间已出!5月17日开考!(含私立校)
- 年内第5次!成品油调价时间定了
- 环球速看:施工组织设计包括哪些内容 实施性施工组织设计包括哪些内容
- 外媒:ROG掌机轻松模拟PS3、XBOX360及Switch
- 奔跑吧第三季2019-奔跑吧第三季嘉宾
- 当前短讯!回馈社会是企业的责任_回馈社会是什么意思
- 魔兽世界黑翼血环单刷_魔兽世界黑翼血环副本入口在哪里|世界快看
- 从一个路由器接到另一个路由器怎么接_从一个路由器接到另一个路由器怎么设置
- 江苏阳光: 江苏阳光股份有限公司关于控股股东上层股权结构变更的公告 天天最新
- 今日精选:ps如何快速去白边_ps如何去白边
- 全国特色化示范性软件学院工业软件创新发展论坛举行 聚焦
- 天天实时:在工作群内发不雅信息,官方通报:涉事双方均被双开
- 跟生活发生化学反应,“偶遇·石磊”深圳开展
- 全球热点评!30+“美人鱼”决战南京路!还有一条男“美人鱼”哟
- 全球百事通!华兴源创(688001):华兴源创:关于实施超额业绩奖励暨关联交易
- 各地区2023年高级经济师报名费用及缴费时间汇总 环球快报
- 每日精选:i5和i7的区别是什么_i5和i7的区别
- 翻译:罗兰加洛斯冠军之争:奥斯塔彭科送给克雷娃1枚咸鸭蛋
- 2023年4月中国汽车保值率报告:吉利缤瑞同级最高,GL8再登MPV榜首 视讯
- 专家建议五年内禁售燃油车
- 环球热议:整个的酱油鸡怎么做好吃方便
- 天天速读:5月15日 13:01分 保龄宝(002286)股价快速拉升
- 南通沿海开发集团将兑付2亿超短期融资券 利率为2.55%-全球新动态
- 白茫茫盐碱地变成米粮川!我国盐碱地综合利用从“治理”转向“适应”-天天要闻
- 当前关注:华为入股智能物联网操作系统研发商鸿湖万联
- 广告无底线,罚你没商量
- 当前通讯!民调:五成一不支持民进党2024继续执政
- 诺康达药学研究服务毛利率大跌11个点,曾IPO失利触发补偿协议|观焦点
- 东南亚两轮车:供给与需求的共振,企业加速出口步伐 当前关注
- 动态:广西初步建成山洪灾害防御体系
- 中食民安发布一季度业绩 股东应占溢利43.5万新加坡元同比增加47.46%
- 精彩看点:合肥融入长三角共绘“幸福圈”
- (原神同人文/空X刻晴)刻晴:我玉衡星不能有点特权嘛(中)占有欲
- 中超最新积分榜:武磊双响上海海港登顶,山东泰山遭遇4轮不胜 每日速看
- 士之耽兮犹可说也耽是什么意思_士之耽兮
- 分度头的使用方法讲解视频_分度头的使用方法_天天即时
- 曝张雨绮前夫袁巴元丑闻!私下睡女网友致其怀孕,生子后不管母子
- 南粤古驿道定向赛首次跑进广东清远佛冈
最新资讯
- 环球报道:国产鱼竿性价比高的有哪几种_国产最好的鱼竿品牌
- 日本本州东南海域发生5.3级地震,震源深度10公里|每日热讯
- 虽然召回百万辆,但特斯拉依然赢了
- 炒股能有稳定盈利吗?
- 环球简讯:皮肤有小红点像血点是怎么回事_皮肤上有小红血点是什么原因
- 天天热点!试车日志|北京现代MUFASA ix35继任车型 新青年们会买单吗?
- 天目山上新物种:一棵独特的“葱”|当前独家
- 【全球热闻】苹果为保密iPhone 15不惜钓鱼执法
- 印度去年从俄罗斯进口原油量增数倍_天天时快讯
- 当前快看:瑞典皇家工程科学院_关于瑞典皇家工程科学院介绍
- 悼念科比图片_nba悼念科比
- 500 余家苹果授权专营店入驻饿了么,购iPhone14系列最高减1500元
- 如果一个男人放不下你,他会这样做 世界球精选
- 全球快资讯:今天夏天又四十度了的意思(今年夏天又四十度是什么意思)
- 中国流动科技馆巡展活动洪江区站举行启动仪式 焦点精选
- 全球看点:减持到5%以下未及时披露,ST金鸿原大股东益豪企业收监管函
- 2023征信太乱怎么借到钱 这几个平台助你解决资金难题
- 天天看热讯:重庆地铁一号线磁器口_重庆地铁一号线票价
- 焦点快报!“山体滑坡,河道翻船”!搜救演练模拟“水中自救”
- 快看点丨猪肝粥,宝宝辅食怎么做?
- 港元对美金汇率查询(2023年5月13日)
- 太平洋西部银行股价暴跌 专家:美国银行业危机仍在蔓延
- “微”中见“大”的光影力量-每日速看
- 全球快消息!科普帖!警惕毕业季8大求职陷阱
- 世界头条:一波还未平息,一波又来侵袭!16日广西再迎强降雨
- 【全球时快讯】长公主小说墨书白_长公主popo
- 贵州集中表彰一批技能突出护理工作者 天天热门
- 中国Z世代拥抱“国潮”化妆品,国货品牌正在“狂飙猛进” 当前消息
- 耶伦再警告!
- 专论 || 从零部件进出口看中国汽车产业链
- 全球热讯:零封,又见零封,JDG零封GG!网友:MSI真正的正赛,即将开赛!
- 巴萨美国行赛程:10天连战尤文、枪手、皇马、米兰
- 环球短讯!什么是水貂毛睫毛 什么是水貂毛
- 丰田汽车:人为错误致日本215万用户的车辆数据已被公开十年,面临数据泄露风险
- 聚焦:最迟明年,峨眉山景区不再“一条独路上山”!
- 焦点热门:百乐juice果汁笔_百乐笔为什么那么贵
- 环球资讯:会畅通讯:公司云视频硬件业务覆盖国内和国外客户
- 环球即时看!“铁路+旅游”激活流动中国 助力旅游经济持续升温
- 官埠桥镇:群众参与村湾整治 共同缔造美好环境_热文
- 意甲球队本赛季欧战成绩非常不错
- 吴以岭院士:中医理论创新与现代科技相结合抗衰老研究取得突破 即时看
- 【全球新要闻】克洛普:无缘欧冠会让引援变难,但我们也曾见证过利物浦的奇迹
- 辽宁4-0?吴前训练意外遭遇肋骨受伤,送往沈阳医院捂肋部一脸痛苦
- 每日观察!炒菜必备哪些调料_炒菜必备调料
- 高盛:维持华虹半导体(01347)“买入”评级 目标价下调至42港元 今亮点
- 2023年美国经济“软着陆”困难重重 专家:一季度或现较快下行-天天报资讯
- 潮流是个轮回!戴安娜王妃的无袖穿搭再度流行
- 全球即时:18个“一件事套餐” “金青新广”四地可跨域一次办
- 焦点关注:evis是什么意思_evus是什么意思
- 马龙谈世乒赛氛围:谁都会紧张,那种感觉一辈子都不会忘!_世界新动态