首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

从ReentrantLock理解AQS的原理及应用总结

  • 25-03-02 12:01
  • 2985
  • 13303
blog.csdn.net

目录

一、AQS基本概述

二、AQS应用介绍

(一)开发中的基本应用指导

(二)框架中的应用展示举例

(三)开发应用举例

三、ReentrantLock基本知识与AQS的联系

(一)ReentrantLock与Synchronized特性比较

(二)与AQS联系---Acquire方法

四、AQS原理分析

(一) 原理概览

架构图分析

AQS中队列:CLH变体的虚拟双向队列(FIFO)

 AQS数据结构

同步状态State

(二)AQS重要方法(以ReentrantLock关联分析)

(三)通过ReentrantLock理解AQS

线程加入等待队列

等待队列中线程出队列时机

CANCELLED状态节点生成

如何解锁

中断恢复后的执行流程

五、AQS原理总结

参考文献、书籍及链接


干货分享,感谢您的阅读!

一、AQS基本概述

AQS全称为AbstractQueuedSynchronizer,是Java中用于构建锁和同步器的框架性组件,它是Java并发包中ReentrantLock、Semaphore、ReentrantReadWriteLock等同步器的基础。AQS的设计思想是,在其内部维护了一个双向队列,用于管理请求锁的线程。当有线程请求锁时,AQS会将其封装成一个Node节点,并加入到等待队列中,线程则会进入阻塞状态。当持有锁的线程释放锁时,AQS会从等待队列中唤醒一个线程来获取锁,从而实现线程的同步和互斥。

AQS的主要特点包括:

  1. 支持独占模式和共享模式。独占模式下只允许一个线程持有锁,共享模式下可以允许多个线程同时持有锁。
  2. 内部维护了一个双向队列,用于管理请求锁的线程,队列中的节点是线程的封装。
  3. 通过CAS(Compare And Swap)操作实现状态的改变,状态可以是任意int类型的变量。
  4. 具有可重入性,即同一个线程可以多次获取同一把锁而不会出现死锁。

AQS的实现被广泛应用于Java并发包中的各种同步器,如ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch等,为Java中的并发编程提供了基础设施。AQS为这些同步器提供了一个统一的基础框架,并且可以让开发人员基于此进行扩展和定制化,避免自己重复实现同步器的底层机制,从而更加专注于业务的实现。

二、AQS应用介绍

(一)开发中的基本应用指导

在开发中,我们可以利用AQS提供的同步机制来实现线程的协作和同步,从而达到线程安全的目的。以下是一些常见的开发应用场景:

  1. 使用ReentrantLock实现同步:ReentrantLock是一个可重入独占锁,可以使用它来实现线程的同步。
  2. 使用ReentrantReadWriteLock实现读写锁:ReentrantReadWriteLock是一个可重入的读写锁,可以使用它来实现对共享资源的读写操作。
  3. 使用Semaphore实现信号量:Semaphore是一种计数信号量,可以用来控制同时访问某个共享资源的线程个数。
  4. 使用CountDownLatch实现线程的等待和唤醒:CountDownLatch可以让某个线程等待其他线程执行完毕后再继续执行。
  5. 使用CyclicBarrier实现线程的协作:CyclicBarrier可以让一组线程相互等待,直到所有线程都到达某个状态后才会继续执行。
  6. 使用Condition实现线程的等待和唤醒:Condition是一种条件变量,可以让线程在某个条件满足时等待,直到另一个线程发出唤醒信号后才继续执行。
  7. ThreadPoolExecutor:Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。

除此之外,AQS还可以用来实现自定义的同步器,比如读写锁、可重入锁等。在开发中,我们可以根据具体的业务需求和线程安全要求,灵活地使用AQS提供的各种同步机制。

(二)框架中的应用展示举例

ReentrantLock的可重入性是AQS很好的应用之一,在ReentrantLock里面,不管是公平锁还是非公平锁,都有下面一段逻辑

  1. // java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
  2. if (c == 0) {
  3. if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
  4. setExclusiveOwnerThread(current);
  5. return true;
  6. }
  7. } else if (current == getExclusiveOwnerThread()) {
  8. int nextc = c + acquires;
  9. if (nextc < 0)
  10. throw new Error("Maximum lock count exceeded");
  11. setState(nextc);
  12. return true;
  13. }
  14. // java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
  15. if (c == 0) {
  16. if (compareAndSetState(0, acquires)){
  17. setExclusiveOwnerThread(current);
  18. return true;
  19. }
  20. } else if (current == getExclusiveOwnerThread()) {
  21. int nextc = c + acquires;
  22. if (nextc < 0) // overflow
  23. throw new Error("Maximum lock count exceeded");
  24. setState(nextc);
  25. return true;
  26. }

可以看到,有一个同步状态State来控制整体可重入的情况。State是Volatile修饰的,用于保证一定的可见性和有序性。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private volatile int state;

State这个字段主要的过程:

  1. State初始化的时候为0,表示没有任何线程持有锁。
  2. 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁时,就会多次+1,这里就是可重入的概念。
  3. 解锁也是对这个字段-1,一直到0,此线程对锁释放。

(三)开发应用举例

各公司封装各有不同,我们这里实现一个同步工具基本代码,极简方式:

  1. package org.zyf.javabasic.thread.toolstest.zyf;
  2. import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  3. /**
  4. * @author yanfengzhang
  5. * @description
  6. * @date 2020/5/2 10:48
  7. */
  8. public class ZYFLock {
  9. private static class Sync extends AbstractQueuedSynchronizer {
  10. @Override
  11. protected boolean tryAcquire(int arg) {
  12. return compareAndSetState(0, 1);
  13. }
  14. @Override
  15. protected boolean tryRelease(int arg) {
  16. setState(0);
  17. return true;
  18. }
  19. @Override
  20. protected boolean isHeldExclusively() {
  21. return getState() == 1;
  22. }
  23. }
  24. private Sync sync = new Sync();
  25. public void lock() {
  26. sync.acquire(1);
  27. }
  28. public void unlock() {
  29. sync.release(1);
  30. }
  31. }

测试如下:

  1. package org.zyf.javabasic.thread.toolstest.zyf;
  2. /**
  3. * @author yanfengzhang
  4. * @description
  5. * @date 2020/5/2 10:50
  6. */
  7. public class ZYFLockTest {
  8. static int count = 0;
  9. static ZYFLock zyfLock = new ZYFLock();
  10. public static void main(String[] args) throws InterruptedException {
  11. Runnable runnable = new Runnable() {
  12. @Override
  13. public void run() {
  14. try {
  15. zyfLock.lock();
  16. for (int i = 0; i < 10000; i++) {
  17. count++;
  18. }
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. } finally {
  22. zyfLock.unlock();
  23. }
  24. }
  25. };
  26. Thread thread1 = new Thread(runnable);
  27. Thread thread2 = new Thread(runnable);
  28. thread1.start();
  29. thread2.start();
  30. thread1.join();
  31. thread2.join();
  32. //输入结果为20000
  33. System.out.println(count);
  34. }
  35. }

三、ReentrantLock基本知识与AQS的联系

(一)ReentrantLock与Synchronized特性比较

  1. // **************************Synchronized的使用方式**************************
  2. // 1.用于代码块
  3. synchronized (this) {}
  4. // 2.用于对象
  5. synchronized (object) {}
  6. // 3.用于方法
  7. public synchronized void test () {}
  8. // 4.可重入
  9. for (int i = 0; i < 100; i++) {
  10. synchronized (this) {}
  11. }
  12. // **************************ReentrantLock的使用方式**************************
  13. public void test () throw Exception {
  14. // 1.初始化选择公平锁、非公平锁
  15. ReentrantLock lock = new ReentrantLock(true);
  16. // 2.可用于代码块
  17. lock.lock();
  18. try {
  19. try {
  20. // 3.支持多种加锁方式,比较灵活; 具有可重入特性
  21. if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
  22. } finally {
  23. // 4.手动释放锁
  24. lock.unlock()
  25. }
  26. } finally {
  27. lock.unlock();
  28. }
  29. }

ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁,其特性如下(与Synchronized进行比较):

对比内容ReentrantLockSynchronized
锁实现机制依赖AQS监视器模式
灵活性支持响应中断、超时、尝试获取锁不灵活
释放形式必须显示的调用unlock()释放锁自动释放监视器
锁类型公平锁&非公平锁非公平锁
条件队列可关联多个条件队列关联一个条件队列
可重入性可重入可重入

(二)与AQS联系---Acquire方法

ReentrantLock支持公平锁和非公平锁,并且ReentrantLock的底层就是由AQS来实现的,着重从这两者的加锁过程来理解与AQS之间的关系。

加锁流程代码如下:

  1. // java.util.concurrent.locks.ReentrantLock#NonfairSync
  2. // 非公平锁
  3. static final class NonfairSync extends Sync {
  4. ...
  5. final void lock() {
  6. if (compareAndSetState(0, 1))
  7. setExclusiveOwnerThread(Thread.currentThread());
  8. else
  9. acquire(1);
  10. }
  11. ...
  12. }
  13. // java.util.concurrent.locks.ReentrantLock#FairSync
  14. static final class FairSync extends Sync {
  15. ...
  16. final void lock() {
  17. acquire(1);
  18. }
  19. ...
  20. }

这块代码的含义的重点在于为获取锁失败,则进入Acquire方法进行后续处理。结合公平锁和非公平锁的加锁流程,虽然流程上有一定的不同,但是都调用了Acquire方法,而Acquire方法是FairSync和UnfairSync的父类AQS中的核心方法。

四、AQS原理分析

(一) 原理概览

架构图分析

通过下面的架构图来整体了解一下AQS框架,有颜色的为Method,无颜色的为Attribution,总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。

当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

AQS中队列:CLH变体的虚拟双向队列(FIFO)

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列(CLH:Craig、Landin and Hagersten队列,是单向链表)的变体实现的,将暂时获取不到锁的线程加入到队列中(AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配)。

主要原理图如下,AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。

 AQS数据结构

 AQS中最基本的数据结构——Node(即为上面CLH变体队列中的节点),其基本源码如下:

  1. static final class Node {
  2. // 标识节点当前在共享模式下
  3. static final Node SHARED = new Node();
  4. // 标识节点当前在独占模式下
  5. static final Node EXCLUSIVE = null;
  6. // ======== 下面的几个int常量是给waitStatus用的 ===========
  7. /** waitStatus value to indicate thread has cancelled */
  8. // 代码此线程取消了争抢这个锁
  9. static final int CANCELLED = 1;
  10. /** waitStatus value to indicate successor's thread needs unparking */
  11. // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
  12. static final int SIGNAL = -1;
  13. /** waitStatus value to indicate thread is waiting on condition */
  14. // 本文不分析condition,所以略过吧,下一篇文章会介绍这个
  15. static final int CONDITION = -2;
  16. /**
  17. * waitStatus value to indicate the next acquireShared should
  18. * unconditionally propagate
  19. */
  20. // 同样的不分析,略过吧
  21. static final int PROPAGATE = -3;
  22. // =====================================================
  23. // 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
  24. // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
  25. // ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
  26. volatile int waitStatus;
  27. // 前驱节点的引用
  28. volatile Node prev;
  29. // 后继节点的引用
  30. volatile Node next;
  31. // 这个就是线程本尊
  32. volatile Thread thread;
  33. //指向下一个处于CONDITION状态的节点
  34. Node nextWaiter;
  35. //返回前驱节点,没有的话抛出npe
  36. final Node predecessor() throws NullPointerException {
  37. Node p = prev;
  38. if (p == null)
  39. throw new NullPointerException();
  40. else
  41. return p;
  42. }
  43. }

根据结构代码,基本含义展示如下:

其中waitStatus有下面几个枚举值如下:

同步状态State

AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private volatile int state;

下面提供了几个访问这个字段的方法:

  1. //获取State的值
  2. protected final int getState()
  3. //设置State的值
  4. protected final void setState(int newState)
  5. //使用CAS方式更新State
  6. protected final boolean compareAndSetState(int expect, int update)

这几个方法都是final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)(对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式)。

(二)AQS重要方法(以ReentrantLock关联分析)

从架构图中可以得知,AQS提供了大量用于自定义同步器实现的Protected方法。自定义同步器实现的相关方法也只是为了通过修改State字段来实现多线程的独占模式或者共享模式。部分展示如下:

一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。

以非公平锁为例,这里主要阐述一下非公平锁与AQS之间方法的关联之处,基本流程图如下:

以非公平锁为例,总结如下:

(三)通过ReentrantLock理解AQS

ReentrantLock公平锁和非公平锁在底层是相同的,以非公平锁为例分析。基本代码罗列如下:

  1. // java.util.concurrent.locks.ReentrantLock
  2. static final class NonfairSync extends Sync {
  3. ...
  4. final void lock() {
  5. if (compareAndSetState(0, 1))
  6. setExclusiveOwnerThread(Thread.currentThread());
  7. else
  8. acquire(1);
  9. }
  10. ...
  11. }
  12. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  13. public final void acquire(int arg) {
  14. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  15. selfInterrupt();
  16. }
  17. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  18. protected boolean tryAcquire(int arg) {
  19. throw new UnsupportedOperationException();
  20. }

可以看出,这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例)。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。

线程加入等待队列

当执行Acquire(1)时,会通过tryAcquire获取锁。在这种情况下,如果获取锁失败,就会调用addWaiter加入到等待队列中去。获取锁失败后,会执行addWaiter(Node.EXCLUSIVE)加入等待队列,具体实现方法如下:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private Node addWaiter(Node mode) {
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // Try the fast path of enq; backup to full enq on failure
  5. Node pred = tail;
  6. if (pred != null) {
  7. node.prev = pred;
  8. if (compareAndSetTail(pred, node)) {
  9. pred.next = node;
  10. return node;
  11. }
  12. }
  13. enq(node);
  14. return node;
  15. }
  16. private final boolean compareAndSetTail(Node expect, Node update) {
  17. return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  18. }
  19. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  20. static {
  21. try {
  22. stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
  23. headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
  24. tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
  25. waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
  26. nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
  27. } catch (Exception ex) {
  28. throw new Error(ex);
  29. }
  30. }
  31. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  32. private Node enq(final Node node) {
  33. for (;;) {
  34. Node t = tail;
  35. if (t == null) { // Must initialize
  36. if (compareAndSetHead(new Node()))
  37. tail = head;
  38. } else {
  39. node.prev = t;
  40. if (compareAndSetTail(t, node)) {
  41. t.next = node;
  42. return t;
  43. }
  44. }
  45. }
  46. }

主要的流程如下:

  • 通过当前的线程和锁模式新建一个节点。
  • Pred指针指向尾节点Tail。
  • 将New中Node的Prev指针指向Pred。
  • 通过compareAndSetTail方法,完成尾节点的设置。这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址相同,那么设置Tail的值为Update的值。
  • 如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要看一下Enq的方法。
  • 如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。

等待队列中线程出队列时机

一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

从“何时出队列”和“如何出队?”两个方向来分析一下acquireQueued源码:

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. final boolean acquireQueued(final Node node, int arg) {
  3. // 标记是否成功拿到资源
  4. boolean failed = true;
  5. try {
  6. // 标记等待过程中是否中断过
  7. boolean interrupted = false;
  8. // 开始自旋,要么获取锁,要么中断
  9. for (;;) {
  10. // 获取当前节点的前驱节点
  11. final Node p = node.predecessor();
  12. // 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
  13. if (p == head && tryAcquire(arg)) {
  14. // 获取锁成功,头指针移动到当前node
  15. setHead(node);
  16. p.next = null; // help GC
  17. failed = false;
  18. return interrupted;
  19. }
  20. // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
  21. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  22. interrupted = true;
  23. }
  24. } finally {
  25. if (failed)
  26. cancelAcquire(node);
  27. }
  28. }
  29. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  30. // setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。
  31. private void setHead(Node node) {
  32. head = node;
  33. node.thread = null;
  34. node.prev = null;
  35. }
  36. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  37. // 靠前驱节点判断当前线程是否应该被阻塞
  38. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  39. // 获取头结点的节点状态
  40. int ws = pred.waitStatus;
  41. // 说明头结点处于唤醒状态
  42. if (ws == Node.SIGNAL)
  43. return true;
  44. // 通过枚举值我们知道waitStatus>0是取消状态
  45. if (ws > 0) {
  46. do {
  47. // 循环向前查找取消节点,把取消节点从队列中剔除
  48. node.prev = pred = pred.prev;
  49. } while (pred.waitStatus > 0);
  50. pred.next = node;
  51. } else {
  52. // 设置前任节点等待状态为SIGNAL
  53. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  54. }
  55. return false;
  56. }
  57. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  58. // parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
  59. private final boolean parkAndCheckInterrupt() {
  60. LockSupport.park(this);
  61. return Thread.interrupted();
  62. }

上述方法的流程图如下:

从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire流程):

CANCELLED状态节点生成

对acquireQueued方法代码展开分析

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. final boolean acquireQueued(final Node node, int arg) {
  3. boolean failed = true;
  4. try {
  5. ...
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head && tryAcquire(arg)) {
  9. ...
  10. failed = false;
  11. ...
  12. }
  13. ...
  14. } finally {
  15. if (failed)
  16. cancelAcquire(node);
  17. }
  18. }
  19. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  20. // 通过cancelAcquire方法,将Node的状态标记为CANCELLED
  21. private void cancelAcquire(Node node) {
  22. // 将无效节点过滤
  23. if (node == null)
  24. return;
  25. // 设置该节点不关联任何线程,也就是虚节点
  26. node.thread = null;
  27. Node pred = node.prev;
  28. // 通过前驱节点,跳过取消状态的node
  29. while (pred.waitStatus > 0)
  30. node.prev = pred = pred.prev;
  31. // 获取过滤后的前驱节点的后继节点
  32. Node predNext = pred.next;
  33. // 把当前node的状态设置为CANCELLED
  34. node.waitStatus = Node.CANCELLED;
  35. // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
  36. // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
  37. if (node == tail && compareAndSetTail(node, pred)) {
  38. compareAndSetNext(pred, predNext, null);
  39. } else {
  40. int ws;
  41. // 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
  42. // 如果1和2中有一个为true,再判断当前节点的线程是否为null
  43. // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
  44. if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
  45. Node next = node.next;
  46. if (next != null && next.waitStatus <= 0)
  47. compareAndSetNext(pred, predNext, next);
  48. } else {
  49. // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
  50. unparkSuccessor(node);
  51. }
  52. node.next = node; // help GC
  53. }
  54. }

当前的流程:

  • 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。
  • 根据当前节点的位置,考虑以下三种情况:当前节点是尾节点;当前节点是Head的后继节点;当前节点不是Head的后继节点,也不是尾节点。

如何解锁

由于ReentrantLock在解锁的时候,并不区分公平锁和非公平锁,所以我们直接看解锁的源码展开分析:

  1. // java.util.concurrent.locks.ReentrantLock
  2. public void unlock() {
  3. sync.release(1);
  4. }
  5. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  6. public final boolean release(int arg) {
  7. if (tryRelease(arg)) {
  8. Node h = head;
  9. if (h != null && h.waitStatus != 0)
  10. unparkSuccessor(h);
  11. return true;
  12. }
  13. return false;
  14. }
  15. // java.util.concurrent.locks.ReentrantLock.Sync
  16. // 方法返回当前锁是不是没有被线程持有
  17. protected final boolean tryRelease(int releases) {
  18. // 减少可重入次数
  19. int c = getState() - releases;
  20. // 当前线程不是持有锁的线程,抛出异常
  21. if (Thread.currentThread() != getExclusiveOwnerThread())
  22. throw new IllegalMonitorStateException();
  23. boolean free = false;
  24. // 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
  25. if (c == 0) {
  26. free = true;
  27. setExclusiveOwnerThread(null);
  28. }
  29. setState(c);
  30. return free;
  31. }
  32. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  33. public final boolean release(int arg) {
  34. // 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
  35. if (tryRelease(arg)) {
  36. // 获取头结点
  37. Node h = head;
  38. // 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
  39. if (h != null && h.waitStatus != 0)
  40. unparkSuccessor(h);
  41. return true;
  42. }
  43. return false;
  44. }
  45. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  46. private void unparkSuccessor(Node node) {
  47. // 获取头结点waitStatus
  48. int ws = node.waitStatus;
  49. if (ws < 0)
  50. compareAndSetWaitStatus(node, ws, 0);
  51. // 获取当前节点的下一个节点
  52. Node s = node.next;
  53. // 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
  54. if (s == null || s.waitStatus > 0) {
  55. s = null;
  56. // 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
  57. for (Node t = tail; t != null && t != node; t = t.prev)
  58. if (t.waitStatus <= 0)
  59. s = t;
  60. }
  61. // 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
  62. if (s != null)
  63. LockSupport.unpark(s.thread);
  64. }

综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和CANCELLED节点产生过程中断开Next指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。

中断恢复后的执行流程

唤醒后,会执行return Thread.interrupted();这个函数返回的是当前执行线程的中断状态,并清除。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. private final boolean parkAndCheckInterrupt() {
  3. LockSupport.park(this);
  4. return Thread.interrupted();
  5. }

回到acquireQueued代码,当parkAndCheckInterrupt返回True或者False的时候,interrupted的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前interrupted返回。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. final boolean acquireQueued(final Node node, int arg) {
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head && tryAcquire(arg)) {
  9. setHead(node);
  10. p.next = null; // help GC
  11. failed = false;
  12. return interrupted;
  13. }
  14. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

如果acquireQueued为True,就会执行selfInterrupt方法。

  1. // java.util.concurrent.locks.AbstractQueuedSynchronizer
  2. static void selfInterrupt() {
  3. Thread.currentThread().interrupt();
  4. }

该方法其实是为了中断线程。

五、AQS原理总结

AQS的原理可以简单概括为以下几点:

  1. AQS维护了一个FIFO队列,用于存储处于等待状态的线程。当一个线程需要获取某个资源时,如果该资源已被其他线程占用,则该线程会被加入到等待队列中,等待其他线程释放该资源。
  2. AQS使用一个state字段来表示资源的状态,state字段可以被多个线程同时访问。当一个线程需要获取某个资源时,它会先尝试修改state字段的值,以此来表示它已经获取了该资源。如果该资源已经被其他线程占用,则该线程会被加入到等待队列中,等待其他线程释放该资源。
  3. AQS中提供了两种模式:独占模式和共享模式。独占模式表示只有一个线程可以同时访问某个资源,比如ReentrantLock就是一个独占模式的同步器。共享模式则表示多个线程可以同时访问某个资源,比如Semaphore就是一个共享模式的同步器。
  4. AQS中还提供了一些钩子方法,这些方法可以被子类重写以实现自定义同步器的行为,比如acquire、release、tryAcquire等方法。

AQS的实现原理其实就是上述几个核心点的实现。对于AQS的使用者来说,只需要了解AQS的使用方法即可。而对于AQS的实现者来说,需要深入理解AQS的实现原理,并且了解AQS在不同场景下的具体应用,以便于在需要的时候能够根据具体需求来实现自定义的同步器。

参考文献、书籍及链接

1.从ReentrantLock的实现看AQS的原理及应用 - 美团技术团队

2.一行一行源码分析清楚AbstractQueuedSynchronizer_Javadoop

3.Java Concurrency: Understanding AQS: 这是一篇比较全面的介绍AQS的文章,包括AQS的设计原理、使用场景以及实现原理等方面的内容。链接:https://dzone.com/articles/java-concurrency-understanding-aqs

4.AbstractQueuedSynchronizer: 这是AQS官方文档,其中包含了AQS的详细说明和使用方法,可以作为AQS的参考手册。链接:AbstractQueuedSynchronizer (Java SE 11 & JDK 11 )

5.AQS学习笔记: 这是一篇比较系统地介绍AQS的博客文章,从AQS的基本概念、内部实现以及使用方法等方面进行了详细的阐述。链接:https://www.cnblogs.com/zhangxiaoliang/p/6208458.html

6.Java并发编程之AQS详解: 这是一篇非常详细的介绍AQS的博客文章,从AQS的设计原理、使用场景、内部实现以及ReentrantLock和ReentrantReadWriteLock的实现等方面进行了深入的讲解。链接:https://www.cnblogs.com/nullzx/p/7691638.html

7.Java 并发编程之 AQS 原理: 这是一篇非常清晰易懂的介绍AQS的博客文章,从AQS的基本概念、内部实现以及使用方法等方面进行了讲解,并通过图文并茂的方式帮助读者更好地理解AQS的实现原理。链接:https://segmentfault.com/a/1190000017199508

8.聊聊 Java 的几把 JVM 级锁_jvm层次的锁都有哪些-CSDN博客

文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树首页概览148946 人正在系统学习中
注:本文转载自blog.csdn.net的张彦峰ZYF的文章"https://zyfcodes.blog.csdn.net/article/details/106081077"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top