基于Redis实现特殊的消息队列

发表于 3年以前  | 总阅读数:427 次

说到消息队列,首先映入脑海的就是Kafka等,消息队列在各个领域都发挥了很大的作用。但是,在一些场景下,传统的消息队列Kafka无法满足需求,比如以下场景:

  • 消息重复概率比较高时,需要对重复消息进行合并处理避免浪费有限的资源,减少消费延迟;
  • 需要根据业务自定义优先级进行消息处理,高优先级的消息比低优先级的消息先处理;
  • 消息需要定时消费的场景,消息只有在设定的消费时间到了之后立马被消费。

本文将介绍一种基于Redis实现的消息队列(Redis message queue, RMQ),RMQ可以作为传统消息队列的互补选择,在传统消息队列没有涉及的场景中使用RMQ。

功能介绍

RMQ设计为一个二方库,可以帮助用户基于Redis快速实现消息队列的功能,RMQ消息队列具有消息合并、区分优先级、支持定时消息等特性。RMQ消息队列可以用于异步解耦、削峰填谷,支持亿级数据堆积。RMQ消息队列目前支持三种类型的消息,分别是RangeMergeMessage(区间重复合并消息)、PriorityMessage(优先级消息)、FixedTimeMessage(任意定时消息)。

区间重复合并消息

RangeMergeMessage支持区间重复消息合并,发送消息时需要设置时间区间,消息延迟该时间区间长度后被消费,在该时间区间内如果发送重复的消息,重复消息将会被合并。如果消息在Redis服务端发生堆积,重复到来的消息依然会被合并处理。

该类型消息适用于消息重复率较高且希望重复消息合并处理的场景,对重复消息进行合并可以减少下游消费系统的压力,减少不必要的资源消耗,将有限的资源最大化的利用,提升消费效率。

优先级消息

PriorityMessage支持给消息设置任意等级的优先级,优先级高的消息会被优先消费,相同优先级的消息被随机消费。如果消息在Redis服务端发生堆积,重复的消息将被合并处理,合并后消息的优先级等于最后存储的消息的优先级。

该类型消息适用于希望重复消息合并处理且需要设置优先级的场景,下游消费者资源有限时,合并重复消息且优先处理优先级高的消息将可以合理利用有限的资源。

任意定时消息

FixedTimeMessage支持给消息设置任意消费时间,只有消费时间到了之后消息才被消费,消费时间可精确到秒。消息到期后没有及时被消费时,消费者将按照时间由远及近进行消费。如果消息在Redis服务端发生堆积,重复的消息将被合并处理,合并后消息的消费时间等于最后存储的消息的消费时间。

该类型消息适用于希望重复消息合并处理且需要定时消费的场景,定时消息应用场景非常丰富,比如定时打标去标、活动结束后清理动作、订单超时关闭等。

并发消费控制

使用传统消息中间件进行集群消费的时候,为了避免并发处理同一元数据导致不一致问题,通常需要对元数据加分布式锁,频繁的锁冲突会导致消费效率低下。加分布式锁的最终目的其实就是保障属于同一元数据的消息被串行消费。加分布式锁并不是最好的方案,最好的方案应该是从根上解决并发问题,让属于同一元数据的消息串行消费。

RMQ消息队列具有并发消费控制能力,属于同一元数据的消息只会被分配给全局唯一一个线程进行消费,因此属于同一元数据的消息将被串行消费。使用方如果需要该能力,除了需要提供Redis,还需要提供ZooKeeper。

重试次数控制

RMQ消息队列支持失败重试消费16次,业务返回消费失败后,消息会被回滚并等待重试消费,重试16次后消息进入死信队列,消息不再被消费,除非人工干预。

技术原理

总体框架

RMQ消息队列由三部分组成,分别为ZooKeeper、RMQ二方库、Redis。ZooKeeper负责维护集群worker的信息,将topic的所有slot分配给全局的woker。Redis负责存储消息,采用Sorted Set结构存储,Store Queue是消息存放的队列,Prepare Queue是采用二阶段消费方式正在消费的消息存放队列,Dead Queue是死信队列。RMQ二方库由RmqClient、Consumer、Producer三部分组成。RmqClient负责RMQ的启动工作,包括上报TopicDef、Worker给ZooKeeper,分配Slot给Worker,扫描业务定义的MessageListener Bean。Producer负责根据不用消息类型将消息按照指定的方式存储到Redis。Consumer负责根据不用消息类型按照指定方式从Redis弹出消息并调用业务的MessageListener。

消息存储

  • Topic的设计

Topic的定义有三部分组成,topic表示主题名称,slotAmount表示消息存储划分的槽数量,topicType表示消息的类型。主题名称是一个Topic的唯一标示,相同主题名称Topic的slotAmount和topicType一定是一样的。

消息存储采用Redis的Sorted Set结构,为了支持大量消息的堆积,需要把消息分散存储到很多个槽中,slotAmount表示该Topic消息存储共使用的槽数量,槽数量一定需要是2的n次幂。在消息存储的时候,采用对指定数据或者消息体哈希求余得到槽位置。

  • StoreQueue的设计

上图中topic划分了8个槽位,编号0-7。如果发送方指定了消息的slotBasis,则计算slotBasis的CRC32值,CRC32值对槽数量进行取模得到槽序号,SlotKey设计为#{topic}_#{index}(也即Redis的键),其中#{}表示占位符。

发送方需要保证相同内容的消息的slotBasis相同,如果没有指定slotBasis则采用消息内容计算SlotKey,这样内容相同的消息体就会落在同一个Sorted Set里面,所以内容相同的消息会进行合并。

Redis的Sorted Set中的数据按照分数排序,实现不同类型的消息的关键就在于如何利用分数、如何添加消息到Sorted Set、如何从Sorted Set中弹出消息。优先级消息将优先级作为分数,消费时每次弹出分数最大的消息。任意定时消息将时间戳作为分数,消费时每次弹出分数大于当前时间戳的一个消息。

区间重复合并消息将时间戳作为分数,添加消息时将(当前时间戳+时间区间)作为分数,消费时每次弹出分数大于当前时间戳的一个消息。

  • PrepareQueue的设计

为了保障RMQ消息队列的可用性,做到每条消息至少消费一次,消费者不是直接pop有序集合中的元素,而是将元素从StoreQueue移动到PrepareQueue并返回消息给消费者,等消费成功后再从PrepareQueue从删除,或者消费失败后从PreapreQueue重新移动到StoreQueue,这便是根据二阶段提交的思想实现的二阶段消费。

在后面将会详细介绍二阶段消费的实现思路,这里重点介绍下PrepareQueue的存储设计。StoreQueue中每一个Slot对应PrepareQueue中的Slot,PrepareQueue的SlotKey设计为prepare{#{topic}#{index}}。PrepareQueue采用Sorted Set作为存储,消息移动到PrepareQueue时刻对应的(秒级时间戳*1000+重试次数)作为分数,字符串存储的是消息体内容。这里分数的设计与重试次数的设计密切相关,所以在重试次数设计章节详细介绍。

PrepareQueue的SlotKey设计中需要注意的一点,由于消息从StoreQueue移动到PrepareQueue是通过Lua脚本操作的,因此需要保证Lua脚本操作的Slot在同一个Redis节点上,如何保证PrepareQueue的SlotKey和对应的StoreQueue的SlotKey被hash到同一个Redis槽中呢。Redis的hash tag功能可以指定SlotKey中只有某一部分参与计算hash,这一部分采用{}包括,因此PrepareQueue的SlotKey中采用{}包括了StoreQueue的SlotKey。

  • DeadQueue的设计

消息重试消费16次后,消息将进入DeadQueue。DeadQueue的SlotKey设计为prepare{#{topic}#{index}},这里同样采用hash tag功能保证DeadQueue的SlotKey与对应StoreQueue的SlotKey存储在同一Redis节点。

生产者

生产者的任务就是将消息添加到Redis的Sorted Set中。首先,需要计算出消息添加到Redis的SlotKey,如果发送方指定了消息的slotBasis(否则采用content代替),则计算slotBasis的CRC32值,CRC32值对槽数量进行取模得到槽序号,SlotKey设计为#{topic}_#{index},其中#{}表示占位符。然后,不同类型的消息有不同的添加方式,因此分布讲述三种类型消息的添加过程。

  • 区间重复合并消息

发送该消息时需要设置timeRange,timeRange必须大于0,单位为毫秒,表示消息将延迟timeRange毫秒后被消费,期间到来的重复消息将被合并,合并后的消息依然维持原来的消费时间。 因此在存储该类型消息的时候,采用(当前时间戳+timeRange)作为分数,添加消息采用Lua脚本执行,保证操作的原子性,Lua脚本首先采用zscore命令检查消息是否已经存在,如果已经存在则直接返回,如果不存在则执行zadd命令添加。

  • 优先级消息

发送该消息时需要设置priority,priority必须大于16,表示消息的优先级,数值越大表示优先级越高。因此在存储该类型消息的时候,采用priority作为分数,采用zadd命令直接添加。

  • 任意定时消息

发送该类型消息时需要设置fixedTime,fixedTime必须大于当前时间,表示消费时间戳,当前时间大于该消费时间戳的时候,消息才会被消费。因此在存储该类型消息的时候,采用fixedTime作为分数,采用命令zadd直接添加。

消费者

  • 二阶段消费方式

三种消费模式

一般消息队列存在三种消费模式,分别是:最多消费一次、至少消费一次、只消费一次。最多消费一次模式消息可能丢失,一般不怎么使用。至少消费一次模式消息不会丢失,但是可能存在重复消费,比较常用。只消费一次模式消息被精确只消费一次,实现较困难,一般需要业务记录幂等ID来实现。RMQ实现了至少消费一次的模式,那么如何保证消息至少被消费一次呢?

至少消费一次模式实现的难点

从最简单的消费模式——最多消费一次说起,消费者端只需要从消息队列服务中取出消息就行,即执行Redis的zpopmax命令,不伦消费者是否接收到该消息并成功消费,消息队列服务都认为消息消费成功。最多一次消费模式导致消息丢失的因素可能有:网络丢包导致消费者没有接收到消息,消费者接收到消息但在消费的时候宕机了,消费者接收到消息但消费失败。针对消费失败导致消息丢失的情况比较好解决,只需要把消费失败的消息重新放入消息队列服务就行,但是网络丢包和消费系统异常导致的消息丢失问题不好解决。

可能有人会想到,我们不把元素从有序集合中pop出来,我们先查询优先级最高的元素,然后消费,再删除消费成功的元素,但是这样消息服务队列就变成了同步阻塞队列,性能会很差。

至少消费一次模式的实现

至少消费一次的问题比较类似银行转账问题,A向B账户转账100元,如何保障A账户扣减100同时B账户增加100,因此我们可以想到二阶段提交的思想。第一个准备阶段,A、B分别进行资源冻结并持久化undo和redo日志,A、B分别告诉协调者已经准备好;第二个提交阶段,协调者告诉A、B进行提交,A、B分别提交事务。RMQ基于二阶段提交的思想来实现至少消费一次的模式。

RMQ存储设计中PrepareQueue的作用就是用来冻结资源并记录事务日志,消费者端即是参与者也是协调者。第一个准备阶段,消费者端通过执行Lua脚本从StoreQueue中Pop消息并存储到PrepareQueue,同时消息传输到消费者端,消费者端消费该消息;第二个提交阶段,消费者端根据消费结果是否成功协调消息队列服务是提交还是回滚,如果消费成功则提交事务,该消息从PrepareQueue中删除,如果消费失败则回滚事务,消费者端将该消息从PrepareQueue移动到StoreQueue,如果因为各种异常导致PrepareQueue中消息滞留超时,超时后将自动执行回滚操作。二阶段消费的流程图如下所示。

实现方案的异常情况分析

我们来分析下采用二阶段消费方案可能存在的异常情况,从以下分析来看二阶段消费方案可以保障消息至少被消费一次。

  1. 网络丢包导致消费者没有接收到消息,这时消息已经记录到PrepareQueue,如果到了超时时间,消息被回滚放回StoreQueue,等待下次被消费,消息不丢失。
  2. 消费者接收到了消息,但是消费者还没来得及消费完成系统就宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息不丢失。
  3. 消费者接收到了消息并消费成功,消费者端在协调事务提交的时候宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息被重复消费。
  4. 消费者接收到了消息但消费失败,消费者端在协调事务提交的时候宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息不丢失。
  5. 消费者接收到了消息并消费成功,但是由于fullgc等原因使消费时间太长,PrepareQueue中的消息由于超时已经回滚到StoreQueue,等待下次被消费,消息被重复消费。
  • 重试次数控制的实现

采用二阶段消费方式,需要将消息在StoreQueue和PrepareQueue之间移动,如何实现重试次数控制呢,其关键在StoreQueue和PrepareQueue的分数设计。

PrepareQueue的分数需要与时间相关,正常情况下,消费者不管消费失败还是消费成功,都会从PrepareQueue删除消息,当消费者系统发生异常或者宕机的时候,消息就无法从PrepareQueue中删除,我们也不知道消费者是否消费成功,为保障消息至少被消费一次,我们需要做到超时回滚,因此分数需要与消费时间相关。当PrepareQueue中的消息发生超时的时候,将消息从PrepareQueue移动到StoreQueue。

因此PrepareQueue的分数设计为:秒级时间戳*1000+重试次数。不同类型的消息首次存储到StoreQueue中的分数表示的含义不尽相同,区间重复合并消息和任意定时消息存储时的分数表示消费时间戳,优先级消息存储时的分数表示优先级。如果消息消费失败,消息从PrepareQueue回滚到StoreQueue,所有类型的消息存储时的分数都表示剩余重试次数,剩余重试次数从16次不断降低最后为0,消息进入死信队列。消息在StoreQueue和PrepareQueue之间移动流程如下:

  • Pop消息

不同类型的消息在消费的时候Pop消息的方式不一样,因此接下来分别讲述三种类型消息的Pop方式。

区间重复合并消息

该消息存储的分数设计为消费时间戳,当前时间大于消息的消费时间戳时,该消息应该被消费。因此采用Redis命令ZRANGEBYSCORE弹出分数小于当前时间戳的一条消息。

优先级消息

该消息存储的分数设计为优先级,优先级越高分数越大,因此采用Redis命令ZPOPMAX弹出分数最大的一条消息。

任意定时消息

该消息存储的分数设计为消费时间戳,当前时间大于消息的消费时间戳时,该消息应该被消费。因此采用Redis命令ZRANGEBYSCORE弹出分数小于当前时间戳的一条消息。

相关应用

主图价格表达项目

在主图价格表达中需要实现一个功能,商品价格发生变化时将商品价格打印在商品主图上面,那么需要在价格发生变动的时候触发合成一张带价格的图片,每一次触发合图时计算价格都是获取当前最新的价格。上游价格变化的因素很多,变化很频繁,下游合图消耗GPU资源较大,处理容量较低。因此需要尽可能合并触发合图消息,减轻下游处理压力,于是使用了RMQ作为消息队列来进行削峰填谷、消息合并。不仅如此,还可以根据商家等级划分触发合图消息的等级,使KA商家能够优先得到处理,缩短价格变化的延迟。

在线上实际环境中,集群共130台机器,RMQ消息队列的发送消息能力和消费消息能力均可以达到5w tps,而且这并不是峰值,理论上可以达到10w tps。

在线数据圈选引擎

在线数据圈选引擎需要处理各种来源的大量动态数据,需要将一段时间区间内的消息合并处理,减少处理压力,并且在对同一元数据进行并发处理需要加分布式锁,锁冲突导致消费效率下降。RMQ的区间重复合并消息和并发消费控制能力可以帮助解决这些问题。目前,在线数据圈选引擎已经采用了RMQ消息队列作为核心组件,RMQ消息队列发挥了很大的作用。

总结

本文提出了一种可实现的基于Redis的消息队列,充分利用Sorted Set结构设计了消息合并、优先级、定时等特性,与传统消息队列形成互补,弥补传统消息队列这方面特性的缺失。为了实现高可用,本文在二阶段提交的思想上进行改进设计了二阶段消费方式,保障消息至少被消费一次。

未来将基于Redis的特性打造更多独特的功能,与传统消息中间件形成互补。在消费控制方面会增加流量自动调控能力,根据消息类型调控消费速度,减少因为某种类型消息消费瓶颈导致整体消费性能下降。

本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/QTDrVEhp4XBFEowgaJcOgw

 相关推荐

刘强东夫妇:“移民美国”传言被驳斥

京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。

发布于:1年以前  |  808次阅读  |  详细内容 »

博主曝三大运营商,将集体采购百万台华为Mate60系列

日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。

发布于:1年以前  |  770次阅读  |  详细内容 »

ASML CEO警告:出口管制不是可行做法,不要“逼迫中国大陆创新”

据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。

发布于:1年以前  |  756次阅读  |  详细内容 »

抖音中长视频App青桃更名抖音精选,字节再发力对抗B站

今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。

发布于:1年以前  |  648次阅读  |  详细内容 »

威马CDO:中国每百户家庭仅17户有车

日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。

发布于:1年以前  |  589次阅读  |  详细内容 »

研究发现维生素 C 等抗氧化剂会刺激癌症生长和转移

近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。

发布于:1年以前  |  449次阅读  |  详细内容 »

苹果据称正引入3D打印技术,用以生产智能手表的钢质底盘

据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。

发布于:1年以前  |  446次阅读  |  详细内容 »

千万级抖音网红秀才账号被封禁

9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...

发布于:1年以前  |  445次阅读  |  详细内容 »

亚马逊股东起诉公司和贝索斯,称其在购买卫星发射服务时忽视了 SpaceX

9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。

发布于:1年以前  |  444次阅读  |  详细内容 »

苹果上线AppsbyApple网站,以推广自家应用程序

据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。

发布于:1年以前  |  442次阅读  |  详细内容 »

特斯拉美国降价引发投资者不满:“这是短期麻醉剂”

特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。

发布于:1年以前  |  441次阅读  |  详细内容 »

光刻机巨头阿斯麦:拿到许可,继续对华出口

据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。

发布于:1年以前  |  437次阅读  |  详细内容 »

马斯克与库克首次隔空合作:为苹果提供卫星服务

近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。

发布于:1年以前  |  430次阅读  |  详细内容 »

𝕏(推特)调整隐私政策,可拿用户发布的信息训练 AI 模型

据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。

发布于:1年以前  |  428次阅读  |  详细内容 »

荣耀CEO谈华为手机回归:替老同事们高兴,对行业也是好事

9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI操控无人机能力超越人类冠军

《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI生成的蘑菇科普书存在可致命错误

近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。

发布于:1年以前  |  420次阅读  |  详细内容 »

社交媒体平台𝕏计划收集用户生物识别数据与工作教育经历

社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”

发布于:1年以前  |  411次阅读  |  详细内容 »

国产扫地机器人热销欧洲,国产割草机器人抢占欧洲草坪

2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。

发布于:1年以前  |  406次阅读  |  详细内容 »

罗永浩吐槽iPhone15和14不会有区别,除了序列号变了

罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。

发布于:1年以前  |  398次阅读  |  详细内容 »
 相关文章
Android插件化方案 5年以前  |  237231次阅读
vscode超好用的代码书签插件Bookmarks 2年以前  |  8065次阅读
 目录