0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

RocketMQ中各类重复消费的原理浅析

jf_uPRfTJDa 来源: 移动Labs 2024-01-08 09:29 次阅读

Labs 导读

随着大数据和云计算时代的到来,我国的各个产业每天都在产生不可估计的数据,以及对数据的各式各样的需求,消息中间件在处理数据、消费数据的过程中越来越受到重视。在高并发、微服务、分布式的场景下,如何合理地利用消息中间件,如何保证MQ消费消息的幂等性?所谓知其然,才能知其所以然,本文将通过RocketMQ作为例子,来扒一扒什么情况下会导致重复消费。

作者:李佳斌

单位:中国移动智慧家庭运营中心

Part 01RocketMQ如何生产和消费消息

先简单介绍下RocketMQ正常生产消息和消费消息的流程,如下图。

1.生产者在发送消息之前根据负载均衡策略(默认是轮询)选择一个Queue,然后跟这个Queue所在的机器建立连接,把消息发送到这个Queue上。

2.消费者消费这个Queue,就能获取到对应的消息。

8861f07e-ad40-11ee-8b88-92fbcf53809c.png

- 问题出现

当异常情况出现时,如消息发送超时或者消息消费超时,RocketMQ为保证消息发送成功,会启动重试机制,选择另一台机器的Queue重发。现在假设有这样一种情况,消费者实际正确接收到了消息,只是由于网络波动导致响应超时了,那就会出现消息重复发送,导致消费者重复消费的情况出现。

那除此之外,还有没有其他情况会导致消息重复消费的情况呢?总结起来一共有如下几种情况。

1)消息发送异常时的重复消费

2)消费消息时抛出了异常

3)消费者提交offset失败

4)Broker持久化offset失败

5)主从同步失败

6)重平衡

7)清理长时间消费的消息

Part 02浅析各类情况

- 消费消息时抛出异常

问题分析一

RocketMQ在并发消费的模式下会调用MessageListenerConcurrently的consumeMessage方法,入参是msgs集合。当调用该方法消费消息出现异常时,返回的结果status就会是null。这种情况下会导致status被设置为RECONSUME_LATER,也就是说消息之后会被重复消费。

问题分析二

传入的是msgs集合。上述原因一中消息处理之后,不管成功失败,都会对结果进行处理。而集合中的任意一个失败,都会导致status被设置为RECONSUME_LATER。在对结果处理是,判断到RECONSUME_LATER时,就会对msgs重新遍历并发送消息,重新消费,从而导致之前成功处理的消息都会被重复消费。不过好在msgs消息的数量默认情况下是1。

88792f32-ad40-11ee-8b88-92fbcf53809c.png

- 消费者提交offset失败

何为offset

producer发送消息到broker,Rocketmq会将消息的内容持久化到commitLog文件中,再分发到topic下的消费队列consume Queue,消费者提交消费请求时,broker从该consumer负责的消费队列中根据请求参数传入的起始offset来获取需要消费的消息索引信息,再从commitLog中获取具体的消息内容返回给consumer。消费成功之后,消费者提交offset,来记录这个queue消费到哪个位置了。

问题分析

RocketMq设计的时候,消费完消息,并不是同步提交offset,而是将offset保存到内存中,通过一个定时任务(默认是5S一次),以网络请求的方式将offset提交给broker。如果最新的offset还没提交,此时服务器宕机了,那么重启之后,就会从broker中读取到之前的提交的offset,并从此处开始消费,此时就会出现重复消费的情况了。

888b513a-ad40-11ee-8b88-92fbcf53809c.png

- broker持久化offset失败

问题分析

与消费者提交offset同理,Broker为了防止数据丢失,会将offset持久化到磁盘中。同样的也是通过一个默认5S的定时任务来处理持久化操作。所以offset的完整过程就如下图。当broker宕机时,就会导致offset丢失,此时如果消费者重新拉取消费进度,就会比实际消费的进度要低,导致重复消费。

88a54d42-ad40-11ee-8b88-92fbcf53809c.png

- 主从同步失败

问题分析

为保证RocketMQ服务的高可用,一般项目中都会启用主从备份的模式,当主节点挂掉之后,从节点就会升级为主节点对外提供服务。因此就需要进行主从同步,保证数据的一致性。默认情况下每隔10S,从节点会向主节点请求,同步元数据,包括消费进度。此时如果主节点宕机了,从节点就无法获取到10S之内的消费进度,自然也就会导致重复消费。

88bf60a6-ad40-11ee-8b88-92fbcf53809c.png

- 重平衡

何为重平衡

RocketMQ的消费者有两种模式,集群消费模式和广播消费模式,绝大多数场景采用的都是集群消费模式。前面提到的消费进度就是在集群消费模式下才会存在。集群消费模式中有一个消费组的概念。一个消费组可以有多个消费者,不同消费组之间消费消息互不干扰,而同一消费组的消费者按照一定的算法分配消息队列进行消息消费,保证一个消息只能被一个消费组消费一次。当消费组中的消费组增加或者减少时就会触发重平衡。如图,原先消费组中有两个消费者,平均消费4个队列,每个消费组2个队列;当加入了一个新的消费者时,为了保证新的消费者能够消费消息,就会进行重平衡,重新分配消息队列。

88d971d0-ad40-11ee-8b88-92fbcf53809c.png

问题分析

假设在重平衡发生时,此时消费者2还在正常消费Queue4,当消费者3加入,重平衡完成时,此时消费者2判断到Queue4已经不属于自己消费了,就会将Queue4设置为dropped,消费完成时,发现队列是dropped状态,那么消费者2的消费进度offset就不会被提交。成功消费了消息,但是消费进度却没有被提交,于是当消费者3开始消费消息时,就会从服务端拉取到之前的消费进度,造成队列4的消息被重复消费。

- 清理长时间消费的消息

清理机制讲解

RocketMQ中有一个机制会定时清理长时间正在消费的消息,默认是15分钟执行一次清理任务。之所以这么做,是有原因的。我们说过,消息被消费之后,就会提交offset。当一个线程消费了所有消息时,就会把消息从集合中移除,提交的消息进度offset就是msg5的offset+1。

假设,现在是两个线程消费,线程2消费完成,之后提交offset,但是此时线程1还在处理前两条消息,因此为了保证消费消息的不丢失,移除之后发现集合中还有剩余消息,就会把msg1的offset返回提交上去。而一旦集合最前面的消息长时间处理,就会导致这个消费进度一直在最前面。此时如果服务器重启,就会导致很多消费过的消息都会被重复消费。因此引入了清理长时间消费的机制。

88f595a4-ad40-11ee-8b88-92fbcf53809c.png

问题分析

引入清理长时间消费的消息机制后,一旦发现某个消息已经处理超过15分钟了,就会将消息移除,保障后续消息消费进度的正常提交,之后会隔一定的时间再次消费这个被移除的消息。但是,这个消息虽然被移除了,却并不是没有消费过,因此再次消费就会导致重复消费的问题出现。

Part 03总结

RocketMq的官方文档中对消息传递有这样的解释:RocketMq确保所有消息至少被传递一次,在大多数情况下,消息不会重复。可见RocketMq为了保证消息的不丢失,牺牲了消息投递的重复率。因此我们在使用RokcetMq时需要合理使用它的特点,设计合理的幂等技术方案来解决重复消费的问题。

审核编辑:汤梓红

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 云计算
    +关注

    关注

    39

    文章

    7726

    浏览量

    137181
  • Queue
    +关注

    关注

    0

    文章

    16

    浏览量

    7254
  • 负载均衡
    +关注

    关注

    0

    文章

    105

    浏览量

    12354
  • 大数据
    +关注

    关注

    64

    文章

    8862

    浏览量

    137278

原文标题:技术 | RocketMQ中各类重复消费的原理浅析

文章出处:【微信号:5G通信,微信公众号:5G通信】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    RocketMQ入门手册

    RocketMQ入门篇
    发表于 10-09 14:13

    Rocketmq怎么安装

    Rocketmq 安装步骤
    发表于 10-24 07:47

    浅析伺服系统应用的惯量匹配问题

    刚性、惯量、响应时间及伺服增益调整之间的关系 浅析伺服系统应用的惯量匹配问题-惯量匹配
    发表于 09-07 07:01

    在Linux系统下部署RocketMQ单机实例

    前言这篇文章以4.3.0版本为标准进行讲述在linux下部署RocketMQ单机实例,在此之前需要已配置JAVA环境。下载程序包直接使用一般就下载已经编译好的二进制文件就好了,下载好以后&
    发表于 11-11 16:29

    展望Apache RocketMQ5.0 | 谈RocketMQ的过去、现在和未来

    RocketMQ 创始人,阿里巴巴中间件高级技术专家 冯嘉 向开发者们分享了Apache RocketMQ 的过去、现在和未来,以及对RocketMQ5.0的展望。本文是根据冯嘉的现场分享所整理,为大家回顾分享
    发表于 08-14 16:37 190次阅读

    全面简析RocketMQ 架构

    Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于集群的不同的Broker
    的头像 发表于 06-12 17:07 2019次阅读

    Apache RocketMQ MQTT协议架构模型

    rocketmq-mqtt.zip
    发表于 04-20 10:45 0次下载
    Apache <b class='flag-5'>RocketMQ</b> MQTT协议架构模型

    开源软件-RocketMQ Externals Apache RocketMQ的扩展项目

    ./oschina_soft/rocketmq-externals.zip
    发表于 06-23 15:03 0次下载
    开源软件-<b class='flag-5'>RocketMQ</b> Externals Apache <b class='flag-5'>RocketMQ</b>的扩展项目

    如何在RocketMQ合理使用重试机制

    RocketMQ 的重试机制包括三部分,分别是生产者重试,服务端内部数据复制遇到非预期问题时重试,消费消费重试。
    的头像 发表于 11-23 10:15 1093次阅读

    浅析LeetCode 83删除排序链表重复元素

    给定一个已排序的链表的头 head , 删除所有重复的元素,使每个元素只出现一次 。返回 已排序的链表 。
    的头像 发表于 02-06 10:25 705次阅读

    聊聊RocketMQ的主从复制

    RocketMQ 主从复制是 RocketMQ 高可用机制之一,数据可以从主节点复制到一个或多个从节点。
    的头像 发表于 07-04 09:42 581次阅读
    聊聊<b class='flag-5'>RocketMQ</b>的主从复制

    RocketMQ和RabbitMQ的区别

    化:RocketMQ将消息存储在磁盘上,保证消息的可靠性;RabbitMQ默认将消息保存在内存,可以通过插件进行持久化。 可用性:RocketMQ具备分布
    的头像 发表于 07-24 13:39 1.4w次阅读

    RocketMQ在业务消息场景的优势有哪些呢?

    RocketMQ 5.0 是消息事件流一体的实时数据处理平台,是业务消息领域的事实标准,很多互联网公司在业务消息场景会使用 RocketMQ
    的头像 发表于 08-07 11:36 740次阅读
    <b class='flag-5'>RocketMQ</b>在业务消息场景的优势有哪些呢?

    磁盘RocketMQ构建的索引结构

    RocketMQ 广泛使用于各类业务场景,在实际生产场景,用户通常会选择消息 ID 或者特定的业务 Key(例如学号,订单号)来查询和定位特定的一批消息,进而定位分布式系统
    的头像 发表于 12-22 10:43 376次阅读
    磁盘<b class='flag-5'>中</b><b class='flag-5'>RocketMQ</b>构建的索引结构

    RocketMQ协议是什么?RocketMQ协议特点

    分布式消息系统中生产者和消费者之间的高效可靠通信。它支持同步和异步消息传递模式,可以实现灵活和响应迅速的通信方式。 RocketMQ协议基于发布-订阅消息模式,生产者将消息发布到特定的主题,消费者订阅这些主题以接收消息。该协议通
    的头像 发表于 01-03 16:11 778次阅读