上回说到[使用 Redis 的 List 实现消息队列] 有很多局限性,比如:
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。
它实现了大部分消息队列的功能:
提供了很多消息队列操作命令,并且借鉴 Kafka 的 Consumer Groups 的概念,提供了消费组功能。
同时提供了消息的持久化和主从复制机制,客户端可以访问任何时刻的数据,并且能记住每一个客户端的访问位置,从而保证消息不丢失。
废话少说,先来看下如何使用,官网文档详见:https://redis.io/topics/streams-intro
「云岚宗众弟子听命,击杀萧炎!」
当云山最后一字落下,那弥漫的紧绷气氛,顿时宣告破碎,悬浮半空的众多云岚宗长老背后双翼一振,便是咻咻的划过天际,追杀萧炎。
云山使用以下指令向队列中插入「追杀萧炎」命令,让长老带领子弟去执行。
XADD 云岚宗 * task kill name 萧炎
"1645936602161-0"
Stream 中的每个元素由键值对的形式组成,不同元素可以包含不同数量的键值对。
该命令的语法如下:
XADD streamName id field value [field value ...]
消息队列名称后面的 「*」 ,表示让 Redis 为插入的消息自动生成唯一 ID,当然也可以自己定义。
消息 ID 由两部分组成:
❝通过将元素 ID 与时间进行关联,并强制要求新元素的 ID 必须大于旧元素的 ID, Redis 从逻辑上将流变成了一种只执行追加操作(append only)的数据结构。
这种特性对于使用流实现消息队列和事件系统的用户来说是非常重要的:
用户可以确信,新的消息和事件只会出现在已有消息和事件之后,就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的。
云凌老狗使用如下指令接收云山的命令:
XREAD COUNT 1 BLOCK 0 STREAMS 云岚宗 0-0
1) 1) "\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"
2) 1) 1) "1645936602161-0"
2) 1) "task"
2) "kill"
3) "name"
4) "萧炎" # 萧炎
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
该指令可以同时对多个流进行读取,每个心法对应含义如下:
如果想使用 XREAD 进行顺序消费,每次读取后要记住返回的消息 ID,下次调用 XREAD 就将上一次返回的消息 ID 作为参数传递到下一次调用就可以继续消费后续的消息了。
❝云韵宗主,我今天刚到云岚宗,历史的消息就不接了,只想接收我使用 XREAD 阻塞等待的那一刻开始通过 XADD 发布的消息要咋整?
运行「」心法即可,心法的最后「」符号表示读取最新的阻塞消息,读取不到则一直死等。
等待过程中,其他长老向队列追加消息,则会立即读取到。
XREAD COUNT 1 BLOCK 0 STREAMS 云岚宗 $
❝这么容易就实现消息队列了么?说好的 ACK 机制呢?
这里只是开胃菜,通过 XREAD 读取的数据其实并没有被删除,当重新执行 XREAD COUNT 2 BLOCK 0 STREAMS 云岚宗 0-0
指令的时候又会重新读取到。
所以我们还需要 ACK 机制,
接下来,我们来一个真正的消息队列。
Redis Stream 的 ConsumerGroup(消费者组)允许用户将一个流从逻辑上划分为多个不同的流,并让 ConsumerGroup 的消费者去处理。
它是一个强大的支持多播的可持久化的消息队列。Redis Stream 借鉴了 Kafka 的设计。
Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。
Redis-Stream
消费组实现的消息队列主要涉及以下三个指令:
Stream 通过 XGROUP CREATE
指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id
变量。
我们使用 XADD 往 bossStream 队列插入一些消息:
XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40
如下指令,为消息队列名为 bossStream 创建「青龙门」和「六扇门」两个消费组。
# 语法如下
# XGROUP CREATE stream group start_id
XGROUP CREATE bossStream 青龙门 0-0 MKSTREAM
XGROUP CREATE bossStream 六扇门 0-0 MKSTREAM
0-0
从第一条开始读取, $
表示从最后一条向后开始读取,只接收新消息。MKSTREAM
子命令作为 之后的最后一个参数来自动创建流。让「青龙门」消费组的 consumer1
从bossStream
阻塞读取一条消息:
XREADGROUP GROUP 青龙门 consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
2) 1) 1) "1645957821396-0"
2) 1) "name"
2) "zhangsan"
3) "age"
4) "26"
语法如下:
XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]
[] 内的表示可选参数,该命令与 XREAD
大同小异,区别在于新增 GROUP groupName consumerName
选项。
该选项的两个参数分别用于指定被读取的消费者组以及负责处理消息的消费者。
其中:
>
:命令的最后参数 >
,表示从尚未被消费的消息开始读取;敲黑板了
如果消息队列中的消息被消费组的一个消费者消费了,这条消息就不会再被这个消费组的其他消费者读取到。
比如 consumer2
执行读取操作:
XREADGROUP GROUP 青龙门 consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
2) 1) 1) "1645957838700-0"
2) 1) "name"
2) "lisi"
3) "age"
4) "2"
consumer2
不能再读取到 zhangsan
了,而是读取下一条 lisi
因为这条消息已经被 consumer1
读取了。
使用消费者的另一个目的可以让组内的多个消费者分担读取消息,也就是每个消费者读取部分消息,从而实现均衡负载。
比如一个消费组有三个消费者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的流:
为了保证消费者在消费的时候发生故障或者宕机重启后依然可以读取消息,Stream 内部有一个队列(pending List)保存每个消费者读取但是还没有执行 ACK 的消息。
如果消费者使用了 XREADGROUP GROUP groupName consumerName
读取消息,但是没有给 Stream 发送 XACK
命令,消息依然保留。
比如查看 bossStream
中的 消费组「青龙门」中各个消费者已读取未确认的消息信息:
XPENDING bossStream 青龙门
1) (integer) 2
2) "1645957821396-0"
3) "1645957838700-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
1)
未确认消息条数;2) ~ 3)
青龙门中所有消费者读取的消息最小和最大 ID;查看 consumer1
读取了哪些数据,使用以下命令:
XPENDING bossStream 青龙门 - + 10 consumer1
1) 1) "1645957821396-0"
2) "consumer1"
3) (integer) 3758384
4) (integer) 1
所以当接收到消息并且消费成功以后,我们需要手动 ACK 通知 Streams,这条消息就会被删除了。命令如下:
XACK bossStream 青龙门 1645957821396-0 1645957838700-0
(integer) 2
语法如下:
XACK key group-key ID [ID ...]
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:
Stream 整体流程
使用 maven 添加依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.7</version>
</dependency>
添加 Redis 配置,码哥的 Redis 没有配置密码,大家根据实际情况配置即可。
spring:
application:
name: redission
redis:
host: 127.0.0.1
port: 6379
ssl: false
@Slf4j
@Service
public class QueueService {
@Autowired
private RedissonClient redissonClient;
/**
* 发送消息到队列
*
* @param message
*/
public void sendMessage(String message) {
RStream<String, String> stream = redissonClient.getStream("sensor#4921");
stream.add("speed", "19");
stream.add("velocity", "39%");
stream.add("temperature", "10C");
}
/**
* 消费者消费消息
*
* @param message
*/
public void consumerMessage(String message) {
RStream<String, String> stream = redissonClient.getStream("sensor#4921");
stream.createGroup("sensors_data", StreamMessageId.ALL);
Map<StreamMessageId, Map<String, String>> messages = stream.readGroup("sensors_data", "consumer_1");
for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {
Map<String, String> msg = entry.getValue();
System.out.println(msg);
stream.ack("sensors_data", entry.getKey());
}
}
}
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/3gnUdBS7-JJzV1ZDuAd--w
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。