AbstractQueuedSynchronizer
AQS是什么(AbstractQueuedSynchronizer)
- 用于构建锁或其他同步器组建的重量级基础框架及整个JUC体系的基石,通过内置的CLH(FIFO)队列的变种来完成资源获取线程的排队工作,将每条要抢占资源的线程封装成一个Node节点来实现锁的分配,一个int类变量表示持有锁的状态,通过CAS完成对state值的修改。
FIFO是一CLH单向链表变体的虚拟双向队列
是可以给我们实现锁的一个框架,内部实现的关键就是维护了一个先进先出的队列以及state状态变量,先进先出队列存储的载体叫做Node节点,该节点标识着当前的状态值,是独占还是共享模式以及它的前驱和后继节点等信息。
总体流程:会把需要等待的线程以Node的形式放到这个先进先出的队列上,state变量则表示为当前锁的状态
- AQS框架下的类 (ReentrantLock | CountDownLatch | ReentrantReadWriteLock | Semaphore)
以上的类都是通过内部类Sync继承AbstractQueuedSynchronizer 实现的
-
加锁会导致阻塞,有阻塞就需要排队,排队必然需要队列
-
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的FIFO,将暂时获取不到的线程加入到队列中,这个队列就是AQS的抽象表现,共享资源封装为队列的结点,通过CAS、自旋以及LockSuport.park()方式,维护state变量的状态,使并发达到同步的效果。
加锁与解锁的流程
AQS内部体系架构
内部架构图
详解AQS内部代码
CLH队列,称为一个双向队列
内部结构(Node类详解)
AQS同步队列的基本结构
ReentrantLock解读AQS
业务图:
业务代码:
public class AQSDemo {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
//带入一个银行办理业务的案例来模拟我们的AQS如何进行线程的管理和通知唤醒机制
//3个线程模拟3个来银行网点,受理窗口办理业务的顾客
//A顾客就是第一个顾客,此时受理窗口没有任何人,A可以直接去办理
new Thread(() -> {
lock.lock();
try{
System.out.println("-----A thread come in");
try { TimeUnit.MINUTES.sleep(20); }catch (Exception e) {e.printStackTrace();}
}finally {
lock.unlock();
}
},"A").start();
//第二个顾客,第二个线程---》由于受理业务的窗口只有一个(只能一个线程持有锁),此时B只能等待,
//进入候客区
new Thread(() -> {
lock.lock();
try{
System.out.println("-----B thread come in");
}finally {
lock.unlock();
}
},"B").start();
//第三个顾客,第三个线程---》由于受理业务的窗口只有一个(只能一个线程持有锁),此时C只能等待,
//进入候客区
new Thread(() -> {
lock.lock();
try{
System.out.println("-----C thread come in");
}finally {
lock.unlock();
}
},"C").start();
}
}
lock方法看公平锁与非公平锁
默认创建的是非公平锁
公平锁与非公平锁的差异:
可以看出公平锁与非公平锁lock()方法唯一区别就是公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessoros()
hasQueuedPredecessoros是公平锁加锁时判断等待队列是否存在有效节点的方法
lock()
1. lock.lock()源码
2. acquire()源码和3大流程走向
1. tryAcquire(arg)
1. 本次走非公平锁方向
2. nonfairTryAcquire(acquires)
return false(继续推进条件,走下一步方法addWaiter)
return true(结束)
addWaiter(Node.EXCLUSIVE)
1. addWaiter
图解:
2. enq(node)
3. B、C线程排好队效果如下图
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
1. acquireQueued会调用如下方法
(会调用如下方法:shouldParkAterFailedAcquire和parkAndCheckInterrupt | setHead(node) )
2. shouldParkAterFailedAcquire
3. parkAndCheckInterrupt
LockSupport.park() 挂起线程
4. 当我们执行下图中的③表示线程B或者C已经获取了permit了
5.setHead()
代码执行完成之后会出现下图所示
相当于队列中原本傀儡节点由于指向为空,就被GC,B线程替代变为傀儡节点
unlock()获取permit
1. release | tryRelease | unparkSuccessor(h);
2. tryRelease()
3. unparkSuccessor()
AQS源码总结
- 业务场景,比如说我们有三个线程A、B、C去银行办理业务了,A线程最先抢到执行权开始办理业务,那么B、C两个线程就在CLH队列里面排队如图所示,注意傀儡结点和B结点的状态都会改为-1
-
当A线程办理好业务,离开的时候,会把傀儡结点的waitStatus从-1改为0 将status从1改为0,将当前线程置为null -
这个时候如果B上位,首先将status从0改为1(表示占用),把thread置为线程B 会执行如下图的①②③④,会触发GC,然后就把第一个灰色的傀儡结点给清除掉了,这个时候原来的B结点重新成为傀儡结点。
实现AQS同步类
- Sync类继承
AbstractQueuedSynchronizer
,并重写方法:tryAcquire
、tryRelease
、isHeldEXclusively
; - 这3个方法必须重写,如果不重写在使用的时候会抛异常
UnsupportedOperationException
- 重写过程也比较简单,主要是使用AQS的CAS方法。以预期值为0,更新值为1,写入成功则获取锁成功。其实这个过程就是对state使用unsafe本地方法,传递偏移量stateOffset等参数,进行值交换操作。
- 最后提供lock与unlock两个方法,实际的类中会实现Lock接口中的相应方法,这里为了简化直接自定义这样两个方法
AQS核心源码分析
公平锁的实现原理
AQS有个队列,队首是Runnable状态的线程,后面都是Waiting状态的线程,公平锁是在cas修改state之前先判断队列里是否有Waiting状态的线程,如果有就将当前线程加到队尾
1. 获取锁流程图
2. lock
ReentrantLock实现了非公平锁,在调用lock.lock()
时候,会有不同实现类
- 非公平锁:会直接使用CAS抢占,修改变量state的值,如果成功则直接把自己的线程设置到exclusieOwnerThread(排他线程),也就是获得锁成功
- 公平锁:不会进行抢占,规矩排队
3. acquire
这个代码包含四个方法的调用:
- tryAcquire:分别由继承AQS的公平锁和非公平锁实现;
- addWaiter:AQS的私有方法,主要用途是tryAcquire返回false后,也就是获取锁失败后,把当前请求锁的线程添加到队列中,并返回Node节点;
- acquireQueued:负责把addWaiter返回的Node节点添加到队列的后面,并会执行获取锁操作以及判断是否把当前线程挂起
- selfInterrupt:主要是执行完acquire之前自己执行中断操作
4. tryAcquire
这块代码包含两部分:
- 如果
c==0
,锁没有被占用,尝试用CAS方式获取锁,返回true - 如果
current == getExclusiveOwnerThread()
,也就是当前线程持有锁,则需要调用setState
进行锁重入操作。setState不需要枷锁,因为是在自己的当前线程下(可重入锁) - 如果两种都不满足返回false
5. addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果队列不为空, 使用 CAS 方式将当前节点设为尾节点
if (pred != null)
{
node.prev = pred;
if (compareAndSetTail(pred, node))
{
pred.next = node;
return node;
}
}
// 队列为空、CAS失败,将节点插入队列
enq(node);
return node;
}
- 当前执行方法
addWaiter
,那么就是!tryAcquire =true
,也就是tryAcquire获取锁失败 - 把当前节点封装到Node节点中,加入到FIFO队列中。因为先进先出,所以后来的队列加入到队尾
-
compareAndSetTail
不一定成功,并发场景下,可能会出现操作失败,如果操作失败需要调用enq
方法,该方法会自旋操作,把节点入队列
enq
- 自旋for循环+CAS入队列
- 队列为空会常见一个节点,把尾节点志向头节点,然后继续循环
- 第二次循环,会把当前线程的节点添加到队尾。head节点是一个无用节点
注意,从尾节点逆向遍历
6. acquireQueued
当获取锁流程到这一步,说明节点已经加入队列完成。接下来就是让方法再次尝试获取锁,如果获取锁失败会判断是否把线程挂起。
setHead
private void setHead(Node node)
{
head = node;
node.thread = null;
node.prev = null;
}
Head是一个虚节点,如果当前节点的前驱节点是Head节点,说明此时Node节点排在队列最前面,可以尝试获取锁。获取到锁之后设置Head节点,就是一个出队的过程,原来设置的节点设置Null方便GC。
shouldParkAfterFailedAcquire
- CANCELLED,取消排队,放弃获取锁。
-
SIGNAL,标识当前节点的下一个节点状态已经被挂起,其实就是当前线程执行完毕后,需要额外执行唤醒后继节点操作。
那么,以上这段代码主要的执行内容包括:
- 如果前一个节点状态是 SIGNAL,则返回 true。安心睡觉等着被叫醒
- 如果前一个节点状态是 CANCELLED,就是它放弃了,则继续向前寻找其他节 点。
- 最后如果什么都没找到,就给前一个节点设置个闹钟 SIGNAL,等着被通知。
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt()
{
//park方法真正加锁
LockSupport.park(this);
return Thread.interrupted();
}
- 方法
shouldParkAfterFailedAcquire
返回false,执行parkAndCheckInterrupt方法 -
LockSupport.park(this);
挂起线程操作 -
Thread.interrupted();
检查线程中断标识
Enjoy Reading This Article?
Here are some more articles you might like to read next: