在得物技术体系中,大量使用Redis作为缓存中间件,以应对高并发下的大流量场景。在使用缓存时,不得不考虑数据一致性问题,即保证缓存中的数据和DB始终可以保持一致。常规的解决缓存一致性的方案一般为先修改DB并提交事物,再操作缓存更新或者失效,为了应对极端场景往往会再采用延迟操作的方式进行缓存的二次处理。
但实际开发中,遇到很多代码不规范的场景,在JDBC事务中进行缓存删除或者更新等操作,带来的问题是当JDBC事务未提交就完成了Redis的操作,容易造成二者数据不一致。所以我们思考:既然Redis本身也提供了事务的解决方案,那能不能将Redis事务和DB的事务进行结合,来保证数据一致性的问题呢?接下来我们就带着这个问题看看看Redis事务的实现、使用,最终探索一下将Redis操作结合DB事务使用的可能性。
Redis事务涉及命令MULTI、EXEC、WATCH、DISCARD、UNWATCH,命令含义如下:
命令简单,不展开讲,主要看整个事务实现过程,从RedisServer源码入手,看以下几个重点的结构定义:
/* Global server state structure */
struct redisServer {
... //省略所有不关心的属性
list *clients; //客户端链表
};
RedisServer的全局结构,省略所有不关心的属性,重点看clients属性(双向列表),实际存储RedisClient结构,当客户端到服务端连接建立时则会创建RedisClient结构(networking.c),继续看RedisClient结构定义。
// 每个Connection对应一个RedisClient结构
typedef struct redisClient {
//事务状态
multiState mstate;
} redisClient;
从RedisClient的定义可以看出,主要存储事务状态信息(意味着事务状态主要记录在Server端),至于事务状态信息具体什么结构,往下看。
// Redis命令Queue
typedef struct multiState {
//事务队列,FIFO
multiCmd *commands;
//已入队命令计数器
int count;
} multiState;
MultiState实际是一个FIFO的队列,继续看下MultiCmd定义。
// 命令节点:记录加入队列命令信息
typedef struct multiCmd {
//参数
robj **argv;
//参数数量
int argc;
//命令指针
struct redisCommand *cmd;
} multiCmd;
MultiCmd就是记录每一个命令的节点。
从上面一连串数据结构定义基本可以看出RedisServer对Multi事务的实现本质是一个FIFO的命令收集队列,等待执行——事务提交(当然也可以选择放弃)。
由于Redis事务采用的是命令收集方式,这种方式并不是友好的事务实现,或者说不是真实事务场景(类比JDBC事务,立即发生,采用redo和undo日志保证一致性),是一种滞后的行为。
命令收集意味着命令并不是立即执行,而是加入队列,你拿不到命令执行的结果,当你需要根据某个已有值(读取)进行逻辑区分时,此场景不可实现。实例代码如下:
// 读写混合逻辑
String evaluate_value = redisTemplate.opsForValue().get("key"); // read
if ("condition_value".equals(evaluate_value)) { // 根据已有值进行逻辑区分
redisTemplate.opsForValue().set("key1", "value1"); // write
} else {
redisTemplate.opsForValue().set("key1", "value2"); // write
}
个人认为更佳致命的设计在于既然不支持事务内读操作,那就明确报错(也不是很友好)禁用API,或者给其他的命令来解决,否则同一段代码其语意逻辑在不同场景表现不同,很容易引起Bug(甚至是事故)。
此处所说的回滚并不是指DISCARD命令,DISCARD命令只是丢弃整个队列,这里讲的是一旦提交EXEC命令,所有命令将FIFO的原则逐个执行,一但某个命令执行失败,并不会终止,而是继续往下执行,直到所有命令被执行完成。
当然,EXEC命令并非每次都是成功的,比如监测到WATCH的Keys有发生改变,则Exec执行失败。
Redis官方说明集群模式不支持事务,甚至是不支持multi的所有操作,如MSET,MGET等,主要原因是key的分散(命令需要MOVE到其他节点)。但实际并不是绝对,只要Redis客户端允许(未禁用)命令(JedisCluster报错),执行MULTI和EXEC也是可以的,并不会每次都返回错误,当事务提交所有命令涉及的key都能在对应的服务节点上完成时,不会报错。
另外,Redis Cluster如果存在反向代理服务来暴露服务,同样是可以通过代理来完成事务,比如阿里云的Redis Cluster,兼容所有集群命令提交。
前面了解了Redis事务实现,接下来了解下RedisTemplate。
RedisTemplate是Spring Data Redis提供给用户的最高级的抽象Redis Client,抽象封装主要是三个方面:
RedisTemplate也考虑到Redis的一些高级用法(事务Session、Pipeline、暴露Connection),故还是保留一些可重载的方法(所有execute开通的方法),方便使用者使用。
RedisTemplate作为Client抽象层,默认支持Jedis和Lettuce Client,当然也可以自行实现支持Redisson(需实现RedisClientProvider等相关工作)。下面具体看下各个客户端的特点,有助于侧面理解RedisTemplate封装连接管理的逻辑。
客户端 | 特点 |
---|---|
Jedis | 官方版本,Redis的操作特性支持全面 |
使用阻塞的I/O,且其方法调用都是同步的,程序流需要等到sockets处理完I/O才能执行,不支持异步 | |
Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis | |
Lettuce | Redis高级客户端,较好支持分布应用使用场景 |
Lettuce的API是线程安全的,所以可以操作单个Lettuce连接来完成各种操作 | |
Redisson | 重点关注Redis分布应用使用场景:分布式锁,分布式集合,可通过Redis支持延迟队列 |
基于Netty框架的事件驱动的通信层,其方法调用是异步的 | |
Redisson的API是线程安全的,所以可以操作单个Redisson连接来完成各种操作 |
可以看出,客户端分两类,主要在于连接实现是否采用Non-blocking I/O,基于NIO的实现方式,连接本身可以并发复用(Jedis基于Blocking I/O,一个连接同一时间仅给一个线程使用,归还连接池后才可再分配线程使用,而Lettuce和Redisson基于Non-blocking I/O,多个线程可以往同一个fd_socket上写),也就是说线程安全的,但是Jedis却正好相反,可以推断,RedisTemplate需要兼容这些特性。
连接管理封装方向:
PS:文章写到这里,敏感的同学应该可以想到,Redis事务状态存储在服务端,而客户端连接又是可以并发复用的,这自然会遇到一个棘手问题,如何保证事务状态不会串,一但事务状态错乱,将出现以下问题:
重复Multi:(error) ERR MULTI calls can not be nested)
执行空Exec:(error) ERR EXEC without MULTI
前面简单了解了Redis事务实现以及RedisTemplate封装,接下来看看RedisTemplate是如何正确使用事务。
使用SessionCallback来包装所有事务命令,如下代码
public <T> T execute(SessionCallback<T> session) {
Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(session, "Callback object must not be null");
RedisConnectionFactory factory = this.getRequiredConnectionFactory();
RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);
Object var3;
try {
var3 = session.execute(this);
} finally {
RedisConnectionUtils.unbindConnection(factory);
}
return var3;
}
通过前面一些预备知识,可以理解到,RedisTemplate execute SessionCallback实际就是先做了连接绑定,我们可以去看看源码:
public <T> T execute(SessionCallback<T> session) {
Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(session, "Callback object must not be null");
RedisConnectionFactory factory = this.getRequiredConnectionFactory();
RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);
Object var3;
try {
var3 = session.execute(this);
} finally {
RedisConnectionUtils.unbindConnection(factory);
}
return var3;
}
反之,如果不使用SessionCallback来包装所有命令,会遇到什么后果(可以根据前面学习知识去思考,事务状态会出现错乱),如下代码:
redisTemplate.multi();
redisTemplate.opsForValue().set("key1", "value1"); // cmd1
redisTemplate.opsForValue().set("key2", "value2"); // cmd2
Object result = redisTemplate.exec();
实际上面这段代码运行可能不会遇到你想的结果,比如编写Demo的测试并不会遇到复杂的多线程并发场景,另外也不见得你特意使用了Lettuce并配置了连接池(Lettuce可以单连接模式,早期版本仅支持单连接,默认是单连接),在使用Jedis下客户端情况下,也不会遇到此问题,上下文的连接实际是同一个,绑不绑定都不重要。
特别注意:虽然使用 SessionCallback 来包装执行事务命令,但是请确保事务能结束掉,连接池不保证每次用完连接一定会 colse。
// 事务执行;正常使用
Object result = redisTemplate.execute(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations redisOperations) throws DataAccessException {
redisOperations.multi();
try {
redisOperations.opsForValue().set("key1", "value1"); // cmd1
redisOperations.opsForValue().set("key2", "value2"); // cmd2
return redisOperations.exec();
} catch (Exception e) {
redisOperations.discard(); // 异常丢弃命令
return null;
}
}
});
从RedisTemplate源码看出,有一个enableTransactionSupport特别显眼,从字面意思看,是支持事务的意思,另外看下命令执行入口(execute RedisCallback方法)代码可以看出:
@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(action, "Callback object must not be null");
RedisConnectionFactory factory = this.getRequiredConnectionFactory();
RedisConnection conn = null;
Object var11;
try {
if (this.enableTransactionSupport) {
conn = RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);
} else {
conn = RedisConnectionUtils.getConnection(factory);
}
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
RedisConnection connToUse = this.preProcessConnection(conn, existingConnection);
boolean pipelineStatus = connToUse.isPipelined();
if (pipeline && !pipelineStatus) {
connToUse.openPipeline();
}
RedisConnection connToExpose = exposeConnection ? connToUse : this.createRedisConnectionProxy(connToUse);
T result = action.doInRedis(connToExpose);
if (pipeline && !pipelineStatus) {
connToUse.closePipeline();
}
var11 = this.postProcessResult(result, connToUse, existingConnection);
} finally {
RedisConnectionUtils.releaseConnection(conn, factory, this.enableTransactionSupport);
}
return var11;
}
如果enableTransactionSupport开启(代码第10行),在上下文中存在事务(特指spring-tx封装的一套上下文事务,常见注解事务:@Transactional)同样会绑定连接,所以如果整个RedisTemplate开启了enableTransactionSupport状态,以下代码则是正确写法,无论是否使用Lettuce连接池模式:
// 函数已注解申明事务 @Transactional
redisTemplate.opsForValue().set("key1", "value1"); // cmd1
redisTemplate.opsForValue().set("key2", "value2"); // cmd2
分析以上代码仍然从连接绑定(RedisConnectionUtils.bindConnection)入手,具体细节不再贴出来,找到RedisConnectionUtils.doGetConnection方法,因为开启enableTransactionSupport,找到RedisConnectionUtils.potentiallyRegisterTransactionSynchronisation即可看出,在上下文存在写事务时,自动开启multi。当时上下文事务提交时,再执行exec或者discard,具体实现代码RedisConnectionUtils.RedisTransactionSynchronizer内部类中。
RedisTemplate开启enableTransactionSupport后,看似事务使用变得轻巧,实际则不然,有以下诟病:
前面两点应该好理解,我就着重解释下第三点什么意思,读写混合,如下代码:
public void mark(String referValue) {
String value = redisTemplate.opsForValue().get("key"); // cmd1
if (referValue.equals(value)) {
redisTemplate.opsForValue().set("key", "value2"); // cmd2
} else {
redisTemplate.opsForValue().set("key", "value3"); // cmd3
}
}
函数在无事务情况下,调用一直正常(逻辑正确),但某一天被其他同事在上下文事务中调用,结果就出问题了。遇到问题很难排除,一是老代码,老逻辑,甚至是二方三方Jar代码,毋庸置疑;二是事务并不是专门为了Redis事务而开启,往往是因为JDBC事务需要(写完DB要操作Redis,这是一个常用场景)。
甚至还有一种场景,因为你配置redisTemplate默认的Bean为enableTransactionSupport后,发现很多二方三方Jar代码都出现逻辑运行错误。这些例子都可以说明,开启enableTransactionSupport的维护成本远远大于他的使用价值。
本文虽讲解了RedisTemplate事务使用,但是并不推荐大家使用Redis事务,Redis事务实现本身比较简单,甚至说非真正意义上的事务;另外RedisTemplate虽然对redis命令进行封装,但并没有严格约束命令的使用创景,如get命令在multi(pipeline一样)开启时无返回值也不禁用,很容易引发深层次问题,很难排查(读写混合逻辑)。
单纯的Redis事务一般确实没有太多使用场景,如果只考虑原子性完全可以使用lua脚本(实际命令在服务端执行是单线程的),我了解到逼迫我们考虑Redis事务或者说关注enableTransactionSupport,是因JDBC事务中Redis命令提前提交问题,尽管我们总是在DB执行后再执行Redis命令,如下代码
@Transactional
public void update(String name, Intger age) {
Person person = reposity.getByName(name);
if (person != null) {
person.setAge(age);
// save to db
reposity.update(person);
// update cache
redisTemplate.opsForHash().put(name, "age", age);
// send notice
producer.send(new ProducerRecord<>("PersonChanged", null, name));
}
}
当消息发送失败,导致事务回滚,实际redis命令早已执行,无法回滚;就算消息发送并没有失败,redis put命令也早于db的update执行,其中导致的后果则是redis与db不一致。
如果要解决以上场景问题,开始 enableTransactionSupport 自然是能解决问题,但是并非最佳手段,如 redis 的缓存异步提交,甚至监听 binlog 再提交,都是不错的选择。如果觉得这样做成本略大,当然也可以自行封装 RedisTemplate,将所有 JDBC 事务中的 Redis 命令收集到 Queue 中,等到事务提交后再执行。
新增CustomRedisTemplate类,继承spring-data中的RedisTemplate类,重写execute方法。
// 省略部分逻辑
public class CustomRedisTemplate<K, V> extends RedisTemplate<K, V> {
private <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline, RedisCallContext context, boolean immediately) {
// 开启原生事务支持(spring-data)直接执行
if (enableOriginTransactionSupport) {
return superExecute(action, exposeConnection, pipeline, cmd, key);
}
// 事务完的操作,不用加入到队列 直接执行
if (isTransactionCompletion()) {
return superExecute(action, exposeConnection, pipeline, cmd, key);
}
// 非即时性的,且写事务中 pending 不立即执行,写入队列中
if (!immediately && isActualNonReadonlyTransactionActive()) {
if (context != null) {
log.debug("Pending execute redis {} {} in a transaction", context.cmd, context.key);
}
RedisCallbackItem item = new RedisCallbackItem(action, exposeConnection, pipeline);
if (context != null) {
item.description = "" + context.cmd + " " + context.key;
}
RedisTransactionSupportSynchronization synchronization = getTransactionSupportSynchronization();
// 写入队列中
synchronization.items.add(item);
return null;
}
return superExecute(action, exposeConnection, pipeline, cmd, key);
}
}
execute的核心方法为判断当前命令是否需要立即执行,并且是否在事务当中,如果不满足,则调用父类RedisTemplate.execute方法执行;如果满足条件,则写入items队列中,等待执行。
新增RedisTransactionSupportSynchronization类,继承TransactionSynchronizationAdapter类。TransactionSynchronizationAdapter是spring-tx中提供的接口适配器,预留给大家实现事务的扩展使用。
private static class RedisTransactionSupportSynchronization extends TransactionSynchronizationAdapter {
public RedisTransactionSupportSynchronization(CustomRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
private final CustomRedisTemplate redisTemplate;
private final List<RedisCallbackItem> items = new ArrayList<>();
// 实现afterCompletion方法,用于JDBC事务完成后执行Redis命令
@Override
public void afterCompletion(int status) {
process(status);
}
private void process(int status) {
// 0:提交 ; 1,2:回滚和终端
if (status == 0) {
items.forEach((item) -> {
// 实际实行Redis命令
Object result = redisTemplate.execute(item.action,item.exposeConnection,item.pipeline, null, true);
log.debug("Execute redis {} result:{} after transaction commit.", item.description, result);
});
} else {
items.forEach((item) -> {
log.warn("Discard redis {} after transaction rollback.", item.description);
});
}
}
}
实现afterCompletion方法,在JDBC事务完成后拿到上面放置items队列中的Redis执行命令,开始执行Redis操作,保证Redis的执行,在JDBC事务之后。
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/a10GLPU-wPlnBEf5qfBDrA
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。