0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

一种基于MySQL实现的stream队列

jf_ro2CN3Fa 来源:稀土掘金 2023-11-24 09:53 次阅读

EMS

Extend MySQL Stream;一种基于 MySQL 实现的 stream 队列。

功能

集群消费、广播消费

自动重试、死信队列

快速重置消息位点,快速回放消息,快速查询消息

消息可基于磁盘积压、消息可快速清理

监控 group 积压,topic 消息量排行,消息链路追踪,消息消费超时告警;

读写性能 1200-3000 QPS 左右

写入设计

msg id 就是 topic 维度的自增 id,可对多个 topic 并发写入

针对一个 topic,需要有物理 physics offset, 每次写入,topic 维度的 physics_offset 自增加一

如果使用 redis 自增特性实现, 为每个 topic 配置一个自增 key, 则可以避免加锁.

redis 实现虽然性能好, 如为配置aof,宕机则可能导致丢失数据, 此时,会出现 offset 重复异常, 过一会随着继续自增, 也就恢复了.

写入需要上锁吗? 看怎么写, 如果使用非原子的形式自增 id,比如数据的的方式,先查出最大 id,再加一,那么必须加锁

topic 维度的自增 id 如果使用 mysql 实现, 性能不堪受辱,因此,此处使用 redis 自增实现(可配置为 mysql 实现);

经过测试,笔记本电脑,单 topic 20 并发写入,qps 在 1000-1500 左右(local mysql & local redis),基本满足业务需求。

考虑到高可用性和业务场景,此处无法使用批量插入

所有的 topic 和 msg 都写入的这一张表中,表数据定时清理,消费完的消息,可提前删除。

注意,本方案写入性能瓶颈是 MySQL Server 的性能瓶颈。

读取设计

假设针对一个 topic,只有一个 consumer,只需循环读取,然后更新 offset 即可。

但结合实际业务场景,这种基本不存在,所以,忽略这种场景。

通常,一个 topic 有多个 consumer group(简称 tg), 一个 consumer group 有多个 client(jvm or thread

如果一个 topic + group(简称 tg),有多个 consumer,每个 consumer 有多个线程,读取和更新 offset 则会有并发问题, 如下图。

这个 client id,我们将其设计为,ip + pid + uuid + thread id;

ip 和 pid 可帮助我们追溯问题

uuid 简单防重复

thread id,一种性能优化,下面细说。

结合实际业务场景,且遵循 simple is better 原则,读取时,使用上锁的方式解决并发问题。锁的粒度就是 tg

考虑到要实现基本的顺序读取和防止重复消费,多线程并发时,我们应当实现基于自增的形式读取 msg;每个 clientid 读取消息后,都会记录一个简单的log,并在 tg 维度增加一个 max offset

每次读取消息时,每个 client 都需要去检查当前想要读取的 tg 是否已经有【其他 client】在操作 max offset。即,我们将锁的粒度缩小到了 max offset;

对这个 tg 维度的 max offset + n

批量插入这个 tg + clientid offset log,表明这个消息被这个 clientid 读取了,同时也间接更新了 max offset(order by offset)

释放锁

拉取刚刚读取的 msg id list 里面的消息体

交给业务处理消息

整体原则是,一个 t + g 的 max offset,同时只能有一个 thread 操作(写和更新)

如果有其他人在读取,则阻塞

如果没有其他人在读取,则锁住这个 tg, 并批量拉取一定数量的消息 id,

fe6133ce-8a69-11ee-939d-92fbcf53809c.jpg

ack

对于集群消息,如何保证在断电情况下,消息不丢失,使用数据库存储消息, 写入即不会丢失, 但消费时, 如果刚刚读进内存就立刻宕机,则需要在重启时恢复消息.

每个 client get 到消息后,都需要记录 msg pid,consumer group,state(start、done,retry)为 start 状态

ack success,将 log update 为 done 状态

ack fail 后,将 log update 为 retry 状态,同时将消息存入重试队列

如果 client 还存活,超过 1 分钟(可配),则将其捞出,放进重试队列,并在 10s 进行第一次重试

如果 client 还存活,则立刻将其捞出,放进重试队列,并在 10s 进行第一次重试

这里需要上锁吗?其实是不需要的,因为更新的维度是 client id 的 log,不存在并发更新. 这里更新状态是表示这些消息已经处理结束了,否则无法判定宕机场景。

对于 start 状态的消息,定时任务会去检查

ack 是批量的,ack 失败,仅会导致重复消费。

广播消息

是否为广播消息由 topic 确定

广播消息不需要上锁,每一个订阅该 topic 的 client 都会读取到该消息

广播消息不需要 ack,不需要记录成功或失败或重试,仅需要内存里记录 offset

推荐尽可能使用集群模式,使用集群模式模拟广播模式

client id

只有 consumer 需要 client id

client id 由 ip pid uuid + thread id 组成, 可溯源.

client id 需要续约(5s),如果机器宕机,则会被自动清除,且他的 start 状态的消息会进入重试队列,交给同 group 的其他 client

client id 可以自己主动注销,注销前,自己内存的消息应当被优雅消费结束,一般来讲,kill -15 的 jvm 都会主动注销 client id;

核心表设计

topic 表:记录topic 元信息

group 表:记录 group 订阅元信息

msg 表:msg总表,记录写入的信息,包含 body 和 topic 维度的自增 offset,类似 rocketmq commit log

该表会被多个 consumer 消费的消息

该表会被定制删除过期数据

retry msg 表,消费失败、超时的消息,会进入该表,并按阶梯定时消费

dead msg 表,消费重试 16(any config) 次的消息,会进入该表

topic_group_log 表:记录 consumer group client 的 msg 消费记录,包含 state(start、done,retry) 字段,可 ack

该表的记录行数会非常多,单行数据较少,可自动删除 done 的记录

如上文所说,由于本方案未采用常见的多 queue 和多 partition 的设计,因此瓶颈在于上图提到的分布式锁的设计上,具体链路为 consumer group client 在集群消费时, 为了让并发读取的 thread 拉取到的消息尽可能准确,使用上锁的方式来实现。

总体看下来,可以简单理解为,ems 失去了性能,却拥有了所有。







审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • MySQL
    +关注

    关注

    1

    文章

    801

    浏览量

    26437
  • QPS
    QPS
    +关注

    关注

    0

    文章

    24

    浏览量

    8788

原文标题:如何设计一款基于 MySQL 实现的 Message Queue

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    Stream API原理介绍

    Stream API 是 Java 8 中最重要的新特性之,它是处理集合和数组的一种新方式。它提供了一种简单、灵活和可读的方式来处理集合和数组中的元素,从而使代码更加简洁、高效和易于
    的头像 发表于 09-30 15:31 671次阅读

    聊消息队列技术选型的7消息场景

    我们在做消息队列的技术选型时,往往会结合业务场景进行考虑。今天来聊聊消息队列可能会用到的 7 消息场景。
    的头像 发表于 12-09 17:50 1315次阅读
    聊<b class='flag-5'>一</b>聊消息<b class='flag-5'>队列</b>技术选型的7<b class='flag-5'>种</b>消息场景

    Redis Stream应用案例

    的IoT设备会形成巨大的数据洪流,采集完成后在云端进行分析,产生巨大的用户价值。这些数据虽然内容各个不同,但是都有个共同的特点,都是一种时序数据。看到这里,你可能会突然发现,Redis Stream
    发表于 06-26 17:15

    怎样去设计一种采用覆盖机制的FIFO队列模型呢

    FIFO队列是什么?怎样去设计一种采用覆盖机制的FIFO队列模型呢?
    发表于 12-08 06:07

    如何利用Java swing mysql实现一种电影票订票管理系统呢

    Java swing mysql实现的电影票订票管理系统,主要实现的功能有:用户端:登录注册、查看电影信息、选择影院场次、选座购票、查看自己的影票、评价电影等功能。管理员:登录、电影管理、影院管理
    发表于 01-03 06:23

    实现队列环形缓冲的方法

    串口队列环形缓冲区队列串口环形缓冲的好处代码实现队列  要实现队列环形缓冲,还需要
    发表于 02-21 07:11

    如何去实现一种队列程序的设计呢

    队列的原理是什么?队列有何作用?如何去实现一种队列程序的设计呢?
    发表于 02-25 07:50

    一种改进的主动队列管理算法

    主动队列管理是实现网络拥塞控制的重要技术,但是多数主动队列管理算法如随机早期检(RED)都存在对参数依赖性强的问题。针对RED算法中平均队列长度不能完全反映网络拥塞状况的
    发表于 04-13 09:08 14次下载

    一种高效的磁盘队列I/O机制

    分析了传统磁盘队列的存储管理开销和读写性能,针对磁盘队列I/O已成为影响消息服务器性能的首要瓶颈,提出了一种高效磁盘队列I/O机制—FlashQ。FlashQ采用物理上连续的磁盘块
    发表于 05-14 19:51 32次下载

    一种基于速率的公平队列管理算法

    针对主动队列管理算法普遍存在的公平性问题,提出基于速率的公平队列管理算法RFED。该算法根据分组的到达速率调节丢包率,将队列的到达速率控制在链路的服务速率下,根据
    发表于 10-04 14:11 15次下载

    深度解析数据结构与算法篇之队列及环形队列实现

    的位置。 02 — 环形队列实现 要想将元素放入队列我们必须知道对头和队尾,在队列长度不能无限大的条件下我们还要知道队列的最大容量,我们还
    的头像 发表于 06-18 10:07 1893次阅读

    队列实现栈原理是什么?队列实现栈方案有哪几种?

    栈是一种后进先出的数据结构,而队列一种先进先出的数据结构,两者原理不难理解,使用也简单。
    的头像 发表于 07-04 13:28 2706次阅读
    <b class='flag-5'>队列</b><b class='flag-5'>实现</b>栈原理是什么?<b class='flag-5'>队列</b><b class='flag-5'>实现</b>栈方案有哪几种?

    TencentOS-tiny中环形队列实现

    1. 什么是队列队列(queue)是一种只能在端插入元素、在另端删除元素的数据结构,遵循「先入先出」(FIFO)的规则。 队列中有两个基
    的头像 发表于 10-08 16:30 1359次阅读

    一种基于单片机实现队列功能模块

    基于单片机实现队列功能模块,主要用于8位、16位、32位非运行RTOS的单片机应用,兼容大多数单片机平台。
    的头像 发表于 08-14 11:09 791次阅读
    <b class='flag-5'>一种</b>基于单片机<b class='flag-5'>实现</b>的<b class='flag-5'>队列</b>功能模块

    嵌入式环形队列与消息队列实现原理

    嵌入式环形队列,也称为环形缓冲区或循环队列,是一种先进先出(FIFO)的数据结构,用于在固定大小的存储区域中高效地存储和访问数据。其主要特点包括固定大小的数组和两个指针(头指针和尾指针),分别指向
    的头像 发表于 09-02 15:29 323次阅读