AbstractQueuedSynchronizer


AQS是什么(AbstractQueuedSynchronizer)

  • 用于构建锁或其他同步器组建的重量级基础框架及整个JUC体系的基石,通过内置的CLH(FIFO)队列的变种来完成资源获取线程的排队工作,将每条要抢占资源的线程封装成一个Node节点来实现锁的分配,一个int类变量表示持有锁的状态,通过CAS完成对state值的修改

FIFO是一CLH单向链表变体的虚拟双向队列

是可以给我们实现锁的一个框架,内部实现的关键就是维护了一个先进先出的队列以及state状态变量,先进先出队列存储的载体叫做Node节点,该节点标识着当前的状态值,是独占还是共享模式以及它的前驱和后继节点等信息。

总体流程:会把需要等待的线程以Node的形式放到这个先进先出的队列上,state变量则表示为当前锁的状态

  • AQS框架下的类 (ReentrantLock | CountDownLatch | ReentrantReadWriteLock | Semaphore)

以上的类都是通过内部类Sync继承AbstractQueuedSynchronizer 实现的

  • 加锁会导致阻塞,有阻塞就需要排队,排队必然需要队列

  • 如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的FIFO,将暂时获取不到的线程加入到队列中,这个队列就是AQS的抽象表现,共享资源封装为队列的结点,通过CAS、自旋以及LockSuport.park()方式,维护state变量的状态,使并发达到同步的效果。

加锁与解锁的流程

AQS内部体系架构

内部架构图

详解AQS内部代码

CLH队列,称为一个双向队列

内部结构(Node类详解)

AQS同步队列的基本结构

ReentrantLock解读AQS

业务图:

业务代码:


public class AQSDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        //带入一个银行办理业务的案例来模拟我们的AQS如何进行线程的管理和通知唤醒机制
        //3个线程模拟3个来银行网点,受理窗口办理业务的顾客
        //A顾客就是第一个顾客,此时受理窗口没有任何人,A可以直接去办理
        new Thread(() -> {
                lock.lock();
                try{
                    System.out.println("-----A thread come in");

                    try { TimeUnit.MINUTES.sleep(20); }catch (Exception e) {e.printStackTrace();}
                }finally {
                    lock.unlock();
                }
        },"A").start();

        //第二个顾客,第二个线程---》由于受理业务的窗口只有一个(只能一个线程持有锁),此时B只能等待,
        //进入候客区
        new Thread(() -> {
            lock.lock();
            try{
                System.out.println("-----B thread come in");
            }finally {
                lock.unlock();
            }
        },"B").start();

        //第三个顾客,第三个线程---》由于受理业务的窗口只有一个(只能一个线程持有锁),此时C只能等待,
        //进入候客区
        new Thread(() -> {
            lock.lock();
            try{
                System.out.println("-----C thread come in");
            }finally {
                lock.unlock();
            }
        },"C").start();
    }
}


lock方法看公平锁与非公平锁

默认创建的是非公平锁

公平锁与非公平锁的差异:

可以看出公平锁与非公平锁lock()方法唯一区别就是公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessoros()
hasQueuedPredecessoros是公平锁加锁时判断等待队列是否存在有效节点的方法

lock()

1. lock.lock()源码

2. acquire()源码和3大流程走向

1. tryAcquire(arg)

1. 本次走非公平锁方向
2. nonfairTryAcquire(acquires)

return false(继续推进条件,走下一步方法addWaiter)
return true(结束)

addWaiter(Node.EXCLUSIVE)

1. addWaiter

图解:

2. enq(node)
3. B、C线程排好队效果如下图

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

1. acquireQueued会调用如下方法

(会调用如下方法:shouldParkAterFailedAcquire和parkAndCheckInterrupt | setHead(node) )

2. shouldParkAterFailedAcquire
3. parkAndCheckInterrupt

LockSupport.park() 挂起线程

4. 当我们执行下图中的③表示线程B或者C已经获取了permit了
5.setHead()

代码执行完成之后会出现下图所示

相当于队列中原本傀儡节点由于指向为空,就被GC,B线程替代变为傀儡节点

unlock()获取permit

1. release | tryRelease | unparkSuccessor(h);

2. tryRelease()

3. unparkSuccessor()

AQS源码总结

  1. 业务场景,比如说我们有三个线程A、B、C去银行办理业务了,A线程最先抢到执行权开始办理业务,那么B、C两个线程就在CLH队列里面排队如图所示,注意傀儡结点和B结点的状态都会改为-1
  1. 当A线程办理好业务,离开的时候,会把傀儡结点的waitStatus从-1改为0 将status从1改为0,将当前线程置为null
  2. 这个时候如果B上位,首先将status从0改为1(表示占用),把thread置为线程B 会执行如下图的①②③④,会触发GC,然后就把第一个灰色的傀儡结点给清除掉了,这个时候原来的B结点重新成为傀儡结点。

实现AQS同步类

  1. Sync类继承AbstractQueuedSynchronizer,并重写方法:tryAcquiretryReleaseisHeldEXclusively
  2. 这3个方法必须重写,如果不重写在使用的时候会抛异常UnsupportedOperationException
  3. 重写过程也比较简单,主要是使用AQS的CAS方法。以预期值为0,更新值为1,写入成功则获取锁成功。其实这个过程就是对state使用unsafe本地方法,传递偏移量stateOffset等参数,进行值交换操作。
  4. 最后提供lock与unlock两个方法,实际的类中会实现Lock接口中的相应方法,这里为了简化直接自定义这样两个方法

AQS核心源码分析

公平锁的实现原理

AQS有个队列,队首是Runnable状态的线程,后面都是Waiting状态的线程,公平锁是在cas修改state之前先判断队列里是否有Waiting状态的线程,如果有就将当前线程加到队尾

1. 获取锁流程图

2. lock

ReentrantLock实现了非公平锁,在调用lock.lock()时候,会有不同实现类

  • 非公平锁:会直接使用CAS抢占,修改变量state的值,如果成功则直接把自己的线程设置到exclusieOwnerThread(排他线程),也就是获得锁成功
  • 公平锁:不会进行抢占,规矩排队

3. acquire

这个代码包含四个方法的调用:

  1. tryAcquire:分别由继承AQS的公平锁和非公平锁实现;
  2. addWaiter:AQS的私有方法,主要用途是tryAcquire返回false后,也就是获取锁失败后,把当前请求锁的线程添加到队列中,并返回Node节点;
  3. acquireQueued:负责把addWaiter返回的Node节点添加到队列的后面,并会执行获取锁操作以及判断是否把当前线程挂起
  4. selfInterrupt:主要是执行完acquire之前自己执行中断操作

4. tryAcquire

这块代码包含两部分:

  1. 如果c==0,锁没有被占用,尝试用CAS方式获取锁,返回true
  2. 如果current == getExclusiveOwnerThread(),也就是当前线程持有锁,则需要调用setState进行锁重入操作。setState不需要枷锁,因为是在自己的当前线程下(可重入锁)
  3. 如果两种都不满足返回false

5. addWaiter

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode); 
	Node pred = tail;
	// 如果队列不为空, 使用 CAS 方式将当前节点设为尾节点
	if (pred != null) 
	{
		node.prev = pred;
		if (compareAndSetTail(pred, node))
 		{
			pred.next = node;
			return node; 
		}
	}
	// 队列为空、CAS失败,将节点插入队列 
	enq(node);
	return node;
}
  • 当前执行方法addWaiter,那么就是!tryAcquire =true,也就是tryAcquire获取锁失败
  • 把当前节点封装到Node节点中,加入到FIFO队列中。因为先进先出,所以后来的队列加入到队尾
  • compareAndSetTail不一定成功,并发场景下,可能会出现操作失败,如果操作失败需要调用enq方法,该方法会自旋操作,把节点入队列

enq

  • 自旋for循环+CAS入队列
  • 队列为空会常见一个节点,把尾节点志向头节点,然后继续循环
  • 第二次循环,会把当前线程的节点添加到队尾。head节点是一个无用节点

注意,从尾节点逆向遍历

6. acquireQueued

当获取锁流程到这一步,说明节点已经加入队列完成。接下来就是让方法再次尝试获取锁,如果获取锁失败会判断是否把线程挂起。

setHead

private void setHead(Node node) 
{ 
	head = node;
	node.thread = null;
	node.prev = null; 
}

Head是一个虚节点,如果当前节点的前驱节点是Head节点,说明此时Node节点排在队列最前面,可以尝试获取锁。获取到锁之后设置Head节点,就是一个出队的过程,原来设置的节点设置Null方便GC。

shouldParkAfterFailedAcquire

  1. CANCELLED,取消排队,放弃获取锁。
  2. SIGNAL,标识当前节点的下一个节点状态已经被挂起,其实就是当前线程执行完毕后,需要额外执行唤醒后继节点操作。

    那么,以上这段代码主要的执行内容包括:

  3. 如果前一个节点状态是 SIGNAL,则返回 true。安心睡觉等着被叫醒
  4. 如果前一个节点状态是 CANCELLED,就是它放弃了,则继续向前寻找其他节 点。
  5. 最后如果什么都没找到,就给前一个节点设置个闹钟 SIGNAL,等着被通知。

parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() 
{
	//park方法真正加锁
	LockSupport.park(this);
	return Thread.interrupted(); 
}
  • 方法shouldParkAfterFailedAcquire返回false,执行parkAndCheckInterrupt方法
  • LockSupport.park(this);挂起线程操作
  • Thread.interrupted();检查线程中断标识



Enjoy Reading This Article?

Here are some more articles you might like to read next:

  • 2379. Minimum Recolors to Get K Consecutive Black Blocks
  • 2471. Minimum Number of Operations to Sort a Binary Tree by Level
  • 1387. Sort Integers by The Power Value
  • 2090. K Radius Subarray Averages
  • 2545. Sort the Students by Their Kth Score