Redis内部提供了一个非常实用的数据结构,这款数据结构就是List。
List是一款简单的字符串列表,这类字符串列表所能存储的元素个数上限为(2^32 -1 )个Redis在底层对它提供了非常丰富的API接口供外界调用,从而能实现不同的效果。
环境准备
首先我们需要拉取docker的相关镜像文件到本地机器:
docker pull redis:4.0
然后便是启动这个容器:(启动的时候需要指定相关的镜像文件和对应镜像版本号)
docker run -p 6379:6379 -d --name myredis redis:4.0
于是容器启动后便ok了:
好了,现在我们有了一个完整的redis实操环境 下边我们先从应用方面开始着手List这款数据结构。
队列结构
如果使用了list结构的时候,我们是从一端插入,从另外一端弹出,那么此时所处理的一个数据结构就会是一个队列的模式,例如说:
lpush list a
lpush list b
lpush list c
那么这个时候,数据结构list当中的元素从左到右依次排序的信息就会是:
c,b,a
这个时候如果我们从数据结构list队列中将元素依次取出
rpop list
rpop list
rpop list
那么在每次执行redis指令之后,它所弹出的元素就会是:
a,b,c
栈结构
如果我们使用了list结构,并且每次操作都是在同一端去做处理,例如说:
lpush list a
lpush list b
lpush list c
那么这个时候,数据结构list的元素从左到右依旧会是:
c,b,a
如果这个时候我们从list结构的同一端中依次将元素弹出:
lpop list
lpop list
lpop list
执行之后的元素信息则会是:
c,b,a
所以可以看出,redis的一种list数据结构类型,在采取不同的指令进行操作之后可以模拟出不同数据结构的效果,整体使用上还是比较灵活的。
List结构的实现方式
在版本3.2之前,Redis 列表list使用两种数据结构作为底层实现:
压缩链表
压缩列表(ziplist)是Redis为了节省内存而开发的,是由一系列特殊编码的连续内存块组成的顺序型数据结构,一个压缩列表可以包含任意多个节点(entry),每个节点可以保存一个字节数组或者一个整数值。重点是内存连续。
ziplist数据结构内部其实有很多个元素部分:
zlbytes 记录了整个ziplist的内存大小是多少字节
zltail 记录了整个内存块中的尾部内存地址
zlen 记录了整个内存块的长度
ziplist中的每个元素都是由一个entry组成的
entry的内部会包含有每个元素的一些基础信息,entry内部包含有:
prevlengh 记录上一个节点的长度,为了方便反向遍历ziplist
encoding 编码方式
data 实际节点的值
双端链表
prev和next两个指针 , 重点是可以从前往后也可以从后往前 , 这就可以实现lpush rpush这些指令了
因为用的链表 , 所以这也就导致了lindex指令 , 获取某个索引值的元素 , 需要遍历链表才可以获取到 , 时间复杂度是 O(n)
当列表对象可以同时满足下列两个条件时,列表对象采用压缩链表编码:
(1)列表对象保存的所有字符串元素的长度都小于64字节;
(2)列表元素保存的元素数量小于512个;
以上两个条件的上限值可以在配置文件中修改 list-max-ziplist-value选项和 list-max-ziplist-entries选项,否则采用双端链表编码
redis3.2版本以后采用的快速列表
quicklist 是一个双向链表,并且是一个ziplist的双向链表,也就是说quicklist的每个节点都是一个ziplist的entries。结合了两者的优点。
因为在ziplist中,每个zlentry都存储着前一个节点所占的字节数,而这个数值又是变长编码的。假设存在一个压缩列表,其包含e1、e2、e3、e4…..,e1节点的大小为253字节,那么e2.prevrawlen的大小为1字节,如果此时在e2与e1之间插入了一个新节点e_new,e_new编码后的整体长度(包含e1的长度)为254字节,此时e2.prevrawlen就需要扩充为5字节;如果e2的整体长度变化又引起了e3.prevrawlen的存储长度变化,那么e3也需要扩…….如此递归直到表尾节点或者某一个节点的prevrawlen本身长度可以容纳前一个节点的变化。其中每一次扩充都需要进行空间再分配操作。删除节点亦是如此,只要引起了操作节点之后的节点的prevrawlen的变化,都可能引起连锁更新。
连锁更新在最坏情况下需要进行N次空间再分配,而每次空间再分配的最坏时间复杂度为O(N),因此连锁更新的总体时间复杂度是O(N^2)。 即使涉及连锁更新的时间复杂度这么高,但它能引起的性能问题的概率是极低的:需要列表中存在大量的节点长度接近254的zlentry。
由于ziplist连锁更新的问题,也使得ziplist的优缺点极其明显;也使得后续Redis采取折中,替换了ziplist。
ziplist的主要优点是节省内存,但它上面的查找操作只能按顺序查找(可以是从前往后、也可以从后往前)。 ziplist将数据按照一定规则编码在一块连续的内存区域,目的是节省内存,这种结构并不擅长做修改操作。一旦数据发生改动,就会引发内存realloc,可能导致内存拷贝。
底层原理小结
Redis中的列表list,在版本3.2之前,列表底层的编码是ziplist和linkedlist实现的,但是在版本3.2之后,重新引入 quicklist,列表的底层都由quicklist实现。
在版本3.2之前,当列表对象中元素的长度比较小或者数量比较少的时候,采用ziplist来存储,当列表对象中元素的长度比较大或者数量比较多的时候,则会转而使用双向列表linkedlist来存储。
这两种存储方式的优缺点
可以认为quickList,是ziplist和linkedlist二者的结合;quickList将二者的优点结合起来。 官方给出的定义
A generic doubly linked quicklist implementation
A doubly linked list of ziplists
quickList是一个ziplist组成的双向链表。每个节点使用ziplist来保存数据。 本质上来说,quicklist里面保存着一个一个小的ziplist。
实战:如何根据List结构来实现一款阻塞队列
为啥要用Redis作为一款阻塞队列呢?
其实这一个技术设计主要取决于当前技术团队的一个发展阶段。业界内其实已经有了很多优秀的开源的消息队列中间件,但是如果我们换一个角度来思考问题,搭建一套成熟稳定的消息中间件所需要花费的时间成本,人力成本其实还是蛮高的。尤其是在创业小公司阶段,面对需要快速上线的业务需求,你作为一名leader还会选择使用这种如此之重的技术方案嘛?(我大概率不会)
打个比方:
我只想给家里小孩买个能打电话的手机带去学校,主要目的是为了让孩子在有需要沟通的时候能够联系到父母,那么此时在你面前有两类选择:老人机(可以打电话),新出的iphone 13(功能特别丰富)。此时你会选择哪个?
brpoplpush 指令
假设说当我们需要将某个元素弹出一个list之后去做一些额外的业务场景处理,此时就会需要考虑到如果业务场景处理过程中出现了异常,那么弹出的元素就会出现丢失情况。
此时比较好的处理方式便是将这个元素在弹出之后插入到一个备份的队列当中,等到真正将元素处理完毕后,再从备份队列中移除。
好了,接下来我们来试试如何通过brpopLpush设计一款适合小型公司使用的消息队列组件。
这里我只给出一些简单设计代码思路, 如果想要把这个组件进行更完善的处理还需要针对内部的具体策略做一些完善和调整。
首先我们来看具体的实现结果:
往redis中发送消息:
然后是消费这条消息:
整个代码的使用方式看起来比较简单易懂。那么该如何去做具体的实现呢?整体的代码模块如下:
由于时间原因,这个组件还有关于重试策略,接入spi模块没有开发完成,但是整体的核心逻辑已经成型。
首先需要定义一个消息队列服务:
public interface IMessageQueueService {
SendResult send(MessageDTO messageDTO);
SendResult sendAsync(MessageDTO messageDTO);
}
此外还需要定义消息发送的DTO
public class MessageDTO implements Serializable {
private static final long serialVersionUID = 2933961555708653068L;
private String jsonParam;
public String getJsonParam() {
return jsonParam;
}
public void setJsonParam(String jsonParam) {
this.jsonParam = jsonParam;
}
@Override
public String toString() {
return "MessageDTO{" +
"jsonParam='" + jsonParam + '\'' +
'}';
}
}
除了这些之外,还有消息发送的结果响应对象:
public class SendResult {
private int code;
private String desc;
public static SendResult sendSuccess(){
return new SendResult(SEND_SUCCESS,"发送成功");
}
public static SendResult sendAsyncSuccess(){
return new SendResult(SEND_ASYNC_SUCCESS,"异步发送成功");
}
public static SendResult sendFail(){
return new SendResult(SEND_FAIL,"发送失败");
}
public SendResult(int code, String desc) {
this.code = code;
this.desc = desc;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
发送数据之后的响应状态值:
public class SendResultStatus {
public static int SEND_ASYNC_SUCCESS =1;
public static int SEND_SUCCESS = 0;
public static int SEND_FAIL = -1;
}
具体的消息发送,我这里是通过引入了redis来实现消息的存储:
public class RedisMessageQueueService implements IMessageQueueService {
private IRedisService redisService;
private String queueName;
private String backQueueName;
public RedisMessageQueueService(String queueName, String backQueueName,IRedisService redisService) {
this.queueName = queueName;
this.backQueueName = backQueueName;
this.redisService = redisService;
}
@Override
public SendResult send(MessageDTO messageDTO) {
if (messageDTO == null) {
return SendResult.sendFail();
}
String jsonParam = JSON.toJSONString(messageDTO);
Long result = redisService.lpush(queueName, jsonParam);
if (result != null) {
return SendResult.sendSuccess();
}
return SendResult.sendFail();
}
@Override
public SendResult sendAsync(MessageDTO messageDTO) {
if (messageDTO == null) {
return SendResult.sendFail();
}
String jsonParam = JSON.toJSONString(messageDTO);
Long result = redisService.lpush(queueName, jsonParam);
if (result != null) {
return SendResult.sendAsyncSuccess();
}
return SendResult.sendFail();
}
}
这里你可能会疑惑,为什么这个对象没有加入Spring容器进行管理(其实是因为没有完善好,所以内部的很多属性都是在Spring容器初始化阶段通过反射来进行注入的...)
相关的属性注入配置类如下:
@Configuration
public class RedisQueueConfiguration implements BeanPostProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisQueueConfiguration.class);
@Resource
private ApplicationContext applicationContext;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class clazz = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
RedisQueueSubscription redisQueueSubscription = (RedisQueueSubscription) clazz.getAnnotation(RedisQueueSubscription.class);
if(redisQueueSubscription!=null && RedisMessageSubscription.class.equals(clazz.getSuperclass())) {
String queueName = redisQueueSubscription.queueName();
String backUpQueueName = redisQueueSubscription.backUpQueueName();
int timeOut = redisQueueSubscription.timeOut();
try {
Field queueNameField = clazz.getSuperclass().getDeclaredField("queueName");
queueNameField.setAccessible(true);
queueNameField.set(bean,queueName);
Field backUpQueueNameField = clazz.getSuperclass().getDeclaredField("backUpQueueName");
backUpQueueNameField.setAccessible(true);
backUpQueueNameField.set(bean,backUpQueueName);
Field timeOutField = clazz.getSuperclass().getDeclaredField("timeOut");
timeOutField.setAccessible(true);
timeOutField.set(bean,timeOut);
Field redisServiceField = clazz.getSuperclass().getDeclaredField("redisService");
redisServiceField.setAccessible(true);
redisServiceField.set(bean, applicationContext.getBean(IRedisService.class));
Method methods[] = clazz.getSuperclass().getMethods();
for (Method method : methods) {
if("onReceive".equals(method.getName())){
method.invoke(bean);
}
}
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
if (field.isAnnotationPresent(RedisQueue.class)) {
RedisQueue redisQueue = field.getAnnotation(RedisQueue.class);
String queueName = redisQueue.queueName();
String backUpQueueName = redisQueue.backQueueName();
RedisMessageQueueService redisMessageQueueService = new RedisMessageQueueService(queueName,backUpQueueName, applicationContext.getBean(IRedisService.class));
try {
LOGGER.info("init redis queue ====== ");
field.set(bean,redisMessageQueueService);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
}
还有一个自定义的注解,用来制定消息需要存储的队列名称以及备份队列的名称(主要是考虑到消费失败做重复发送操作)
@Target(value = ElementType.FIELD)
@Documented
@Retention(value = RetentionPolicy.RUNTIME)
public @interface RedisQueue {
String queueName() default "";
String backQueueName() default "";
}
发送部分大概就这样了。
那么消费端则是采用了一次抽象,将核心的处理逻辑抽象到了父类对象中,具体的实现留给了子类去做拓展。
public abstract class RedisMessageSubscription implements IMessageSubscription {
private IRedisService redisService;
private String queueName;
private String backUpQueueName;
private int timeOut;
public RedisMessageSubscription(){
}
/**
* 留给子类执行核心程序
*
* @param result
* @return
*/
public abstract MessageConsumeResult dataHandle(String result);
@Override
public void onReceive() {
Thread thread = new Thread(new ReceiveDataThread());
thread.start();
}
class ReceiveDataThread implements Runnable{
@Override
public void run() {
while (true) {
try {
String result = redisService.brPopLpush(queueName,backUpQueueName,timeOut);
MessageConsumeResult messageConsumeResult = dataHandle(result);
if(messageConsumeResult.isSuccess()){
continue;
} else {
//进入一个重试阶段
}
}catch (Exception e){
//有时候会出现链接超时,所以如果一旦订阅了某个队列之后出现异常,那么就需要考虑到这种中断的情况
e.printStackTrace();
}
}
}
}
public IRedisService getRedisService() {
return redisService;
}
public void setRedisService(IRedisService redisService) {
this.redisService = redisService;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getBackUpQueueName() {
return backUpQueueName;
}
public void setBackUpQueueName(String backUpQueueName) {
this.backUpQueueName = backUpQueueName;
}
public int getTimeOut() {
return timeOut;
}
public void setTimeOut(int timeOut) {
this.timeOut = timeOut;
}
}
消费响应类如下:
public class MessageConsumeResult {
private int code;
private String desc;
public MessageConsumeResult(int code, String desc) {
this.code = code;
this.desc = desc;
}
public static MessageConsumeResult consumeSuccess() {
return new MessageConsumeResult(CONSUME_SUCCESS, "消费成功");
}
public boolean isSuccess() {
return CONSUME_SUCCESS == this.getCode();
}
public static MessageConsumeResult consumeFail() {
return new MessageConsumeResult(CONSUME_SUCCESS, "消费成功");
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
响应的一些常量配置类为:
public class MessageConsumeResultStatus {
public static int CONSUME_SUCCESS =1;
public static int CONSUME_FAIL = -1;
}
因此,当需要程序需要使用的时候,只需要自定义一个consumer,然后继承这个抽象的父亲类对象即可。如下:
@RedisQueueSubscription(queueName = "message-list",backUpQueueName = "backup-message-list")
@Component
public class RedisMessageConsumer extends RedisMessageSubscription{
@Override
public MessageConsumeResult dataHandle(String result) {
System.out.println("result is "+ result);
return MessageConsumeResult.consumeSuccess();
}
}
这款组件还有很多不完善的部分,最近也在尝试继续完善它,例如可以考虑加入一些重试机制,关于重试部分打算参考这几篇文章中给出的思路。
https://cloud.tencent.com/developer/article/1038304
http://bittechblog.com/blog/article/8
里面的干货讲解都很赞~~
这是国庆节前的最后一篇文章,Idea在这里先预祝各位读者们国庆节快乐~~
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/E0phFmTtEJxffQdvjAXL9g
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。