AQS 即 AbstractQueuedSynchronizer,是用来实现锁和线程同步的一个工具类。大部分操作基于 CAS 和 FIFO 队列来实现。
如果让我们自己 ,实现可以分为几个大部分:
我们来想一下这几个部分的实现。
首先,用一个变量 state 作为锁的标志位。默认值 0,表示此时所有线程都可以加锁。加锁的时候通过 CAS 将 state 从 0 变为 1,CAS 执行成功表示加锁成功。
当有线程占有了锁,这时候有其他线程来加锁,按照下面判断当前来抢锁的线程是不是占用锁的线程:
有没有什么其他可以优化的地方?比如当放入等待队列的时候,看看有没有其他线程?
有。锁被占用了,并且轮不到当前线程来抢,直接阻塞就行了。在放入队列时候,通过CAS 再尝试获取一波锁。如果获取成功,就不用阻塞了,提高了效率。
通过 CAS 对 state 减 1。如果是重入锁,释放一次减一次。当 state 等于 0 时表示锁被释放,唤醒等待队列中的线程。
入队这个过程和我们平常使用的队列不同。平常使用的队列每次生成一个节点放入即可。而 AQS 队列为空时,第一次生成两个节点。第一个节点代表当前占有锁的线程,第二个节点为抢锁失败的节点。不为空的时候,每次生成一个节点放入队尾。
当把线程放入队列中时,后续应该做哪些操作呢?
如果让你写是不是直接放入队列中就完事了?但 Doug Lea 是这样做的:
当 A 线程释放锁,唤醒队列中的 B线程,A 线程会从队列中删除。那出队这个事情由谁来做?是由被唤醒的线程来做,即B 线程。
阻塞和唤醒线程调用 API 即可:
// 阻塞线程
LockSupport.park(this)
// 唤醒线程
LockSupport.unpark(this)
JUC 中的许多并发工具类的实现都依赖 AbstractQueuedSynchronizer,例如 ReentrantLock、CountDownLatch 等。
AbstractQueuedSynchronizer 定义了一个锁实现的内部流程,而如何加锁和解锁则在各个子类中实现。这是典型的模板方法模式。
AQS 内部维护了一个 FIFO 的队列(底层实现就是双向链表),通过该队列来实现线程的并发访问控制。
队列中的元素是一个 Node 节点:
static final class Node {
//表示当前线程以共享模式持有锁
static final Node SHARED = new Node();
//表示当前线程以独占模式持有锁
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//当前节点的状态
volatile int waitStatus;
//前继节点
volatile Node prev;
//后继节点
volatile Node next;
//当前线程
volatile Thread thread;
//存储在condition队列中的后继节点
Node nextWaiter;
}
waitStatus 表示节点的状态,默认为 0。包含的状态有:
再来看 AbstractQueuedSynchronizer 这个类的属性。
//等待队列的头节点
private transient volatile Node head;
//等待队列的尾节点
private transient volatile Node tail;
//加锁的状态,在不同子类中有不同的意义
private volatile int state;
这个 state 在不同的子类中有不同的含义:
AbstractQueuedSynchronizer 中的 FIFO 队列用双向链表来实现。
AQS 提供了独占锁和共享锁两种加锁方式,每种方式都有响应中断和不响应中断的区别。
因此 AQS 的锁可以分为如下四类:
而释放锁的方式只有两种:
以 ReentrantLock 为例,从加锁这一部分开始分析:
// 调用ReentrantLock.FairSync#lock方法其实就是调用acquire(1);
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
//获取到锁返回false,否则返回true
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//当前线程将自己中断
selfInterrupt();
}
从代码层面详细分析一波。
tryAcquire 是留给子类实现的:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这里通过抛出异常来告诉子类要重写这个方法。
为什么不将这个方法定义为 abstract 方法呢?
因为 AQS 有两种功能,独占和共享。如果用 abstract 修饰,则子类需要同时实现两种功能的方法,对子类不友好。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
前面说过,AQS 队列为空时,第一次会放入两个节点。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 队列为空,进行初始化,
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
放入队列后还要干什么?
// 自旋获取锁,直到获取锁成功,或者异常退出
// 但是并不是busy acquire,因为当获取失败后会被挂起,由前驱节点释放锁时将其唤醒
// 同时由于唤醒的时候可能有其他线程竞争,所以还需要进行尝试获取锁,体现的非公平锁的精髓。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前继节点
final Node p = node.predecessor();
// node节点的前继节点是head节点,尝试获取锁,如果成功说明head节点已经释放锁了
// 将node设为head开始运行(head中不包含thread)
if (p == head && tryAcquire(arg)) {
setHead(node);
// 将第一个节点出队
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取锁失败后是否可以挂起
// 如果可以挂起,则阻塞当前线程(获取锁失败的节点)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
根据前继节点的状态,判断是否可以阻塞当前获取锁失败的节点。
一般情况会经历如下两个过程:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前继节点释放时会unpark后继节点,可以挂起
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//将CANCELLED状态的线程清理出队列
// 后面会提到为什么会有CANCELLED的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前继节点的状态设置为SIGNAL,代表释放锁时需要唤醒后面的线程
// cas更新可能失败,所以不能直接返回true
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire 表示上好闹钟了,可以阻塞线程了。后续当线程被唤醒的时候会从 return 语句出继续执行,然后进入 acquireQueued 方法的死循环,重新抢锁。至此,加锁结束。
// 挂起线程,返回是否被中断过
private final boolean parkAndCheckInterrupt() {
// 阻塞线程
LockSupport.park(this);
// 返回当前线程是否被调用过Thread#interrupt方法
return Thread.interrupted();
}
最后用一个流程图来解释不响应中断的独占锁:
可以看到上面调用 acquireQueued 方法发生异常的时候,会调用 cancelAcquire 方法。我们就详细分析一下这个 cancelAcquire 方法有哪些作用。
哪些地方执行发生异常会执行 cancelAcquire?
可以看到调用 cancelAcquire 方法的有如下几个部分:
分析这些方法的调用,发现基本就是如下两个地方会发生异常:
//处理异常退出的node
private void cancelAcquire(Node node) {
if (node == null)
return;
// 设置该节点不再关联任何线程
node.thread = null;
// 跳过CANCELLED节点,找到一个有效的前继节点
Node pred = node.prev;
while (pred.waitStatus > 0) {
node.prev = pred = pred.prev;
}
// 获取过滤后的有效节点的后继节点
Node predNext = pred.next;
// 设置状态为取消
node.waitStatus = Node.CANCELLED;
// case 1
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// case 2
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0) {
compareAndSetNext(pred, predNext, next);
}
} else {
// case3
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
将 node 出队有如下三种情况:
当前节点是 tail
compareAndSetTail 将 tail 指向 pred compareAndSetNext,将 pred 的 next 指向 null,也就是把当前节点移出队列。
当前节点不是 head 的后继节点,也不是 tail
这里将 node 的前继节点的 next 指向了 node 的后继节点,即 compareAndSetNext(pred, predNext, next)。
注意:pred 和n ode 节点中间有可能有 CANCELLED 的节点,怕乱就没画出来。
当前节点是 head 的后继节点
没有对队列进行操作,只是进行 head 后继节点的唤醒操作(unparkSuccessor 方法,后面会分析这个方法)。因为此时它是 head 的后继节点,还是有可能获取到锁的,所以唤醒它尝试获取一波锁。
当再次调用到 shouldParkAfterFailedAcquire(判断是否应该阻塞的方法时)会把 CANCELLED 状态的节点从队列中删除。
独占锁是释放其实就是利用 CAS 将 state-1。当 state=0 表示锁被释放,需要将阻塞队列中的线程唤醒。
// 调用ReentrantLock#unlock方法其实就是调用release(1)
public final boolean release(int arg) {
// 尝试释放锁
// 当state=0,表示锁被释放,tryRelease返回true,此时需要唤醒阻塞队列中的线程
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) {
unparkSuccessor(h);
}
return true;
}
return false;
}
tryRelease 即具体的解锁逻辑,需要子类自己去实现
唤醒同步队列中的线程,可以看到前面加了判断 h != null && h.waitStatus != 0。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 头结点的下一个节点
Node s = node.next;
// 为空或者被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 从队列尾部向前遍历找到最前面的一个waitStatus<=0的节点
for (Node t = tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) {
s = t;
}
}
}
if (s != null) {
// 唤醒节点,但并不表示它持有锁,要从阻塞的地方开始运行
LockSupport.unpark(s.thread);
}
}
为什么要从后向前找第一个非 CANCELLED 的节点呢?
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 线程在这里挂起了
pred.next = node;
return node;
}
}
enq(node);
return node;
}
这其实和入队的逻辑有关系。
假如 Node1 在图示位置挂起了,Node1 后面又陆续增加了 Node2 和 Node3。此时从前向后遍历会导致元素丢失,不能正确唤醒线程。
之前说过独占锁可以响应中断,也可以不响应中断。调用的方法如下:
所以只需要看这两个个方法的区别在哪里,下面只列出有区别的部分:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 判断线程是否被中断
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
acquire 在尝试获取锁的时候完全不管线程有没有被中断,而 acquireInterruptibly 在尝试获取锁之前会判断线程是否被中断。如果被中断,则直接抛出异常。
tryAcquire 方法一样,只需要对比 acquireQueued 方法和 doAcquireInterruptibly 方法的区别即可。
执行 acquireQueued 方法当线程发生中断时,只是将 interrupted 设置为 true,并且调用 selfInterrupt 方法将中断标志位设置为 true。
而执行 doAcquireInterruptibly 方法,当线程发生中断时,直接抛出异常。
最后看一下 parkAndCheckInterrupt 方法,这个方法中判断线程是否中断的逻辑特别巧妙!
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
Thread 类提供了如下两个个方法来判断线程是否是中断状态:
这里为什么用 interrupted 而不是 isInterrupted 呢?
演示一下这两个方法的区别:
@Test
public void testInterrupt() throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {}
});
thread.start();
TimeUnit.MICROSECONDS.sleep(100);
thread.interrupt();
// true
System.out.println(thread.isInterrupted());
// true
System.out.println(thread.isInterrupted());
// true
System.out.println(thread.isInterrupted());
}
@Test
public void testInterrupt2() {
Thread.currentThread().interrupt();
// true
System.out.println(Thread.interrupted());
// false
System.out.println(Thread.interrupted());
// false
System.out.println(Thread.interrupted());
}
isInterrupted 和 interrupted 的方法区别如下:
接着再写两个例子:
public static void main(String[] args) {
LockSupport.park();
// end被一直阻塞没有输出
System.out.println("end");
}
public static void main(String[] args) {
Thread.currentThread().interrupt();
LockSupport.park();
// 输出end
System.out.println("end");
}
可以看到当线程被中断时,调用 park() 方法并不会被阻塞:
public static void main(String[] args) {
Thread.currentThread().interrupt();
LockSupport.park();
// 返回中断状态,并且清除中断状态
Thread.interrupted();
// 输出start
System.out.println("start");
LockSupport.park();
// end被阻塞,没有输出
System.out.println("end");
}
到这我们就能理解为什么要进行中断的复位了:
所以这里要对中断进行复位,是为了不让循环一直执行,让当前线程进入阻塞状态,如果不进行复位,前一个线程在获取锁之后执行了很耗时的操作,那当前线程岂不是要一直执行死循环,造成CPU使用率飙升?
独占锁的获取和释放我们已经搞清楚了,接下来基于 AQS 自己写一个锁。
AQS 已经把入队、出队、阻塞、唤醒的操作都封装好了。当我们用 AQS 来实现自己的锁时,就非常的方便了,只需要重写加锁和解锁的逻辑即可。这里演示一个基于 AQS 实现的非重入的互斥锁。
public class MyLock {
private final Sync sync;
public MyLock() {
sync = new Sync();
}
public class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, arg);
}
@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
}
public void lock() {
sync.acquire(1);
}
public void unLock() {
sync.release(1);
}
}
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/oHx2n0_EWUPt8m-7weRX0Q
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。