首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

AQS源码导读

  • 23-11-17 19:42
  • 4078
  • 8489
blog.csdn.net

文章目录

  • 前言
  • AQS基础架构
  • ReentrantLock.lock()做了什么?
  • ReentrantLock.unlock()做了什么?
  • 问题
      • 1.工作线程什么时候出队?
      • 2.AQS唤醒队列的规则是什么?
      • 3.唤醒节点为什么要从尾巴开始?
      • 4.其他问题
  • 尾巴


前言

AQS全称:AbstractQueuedSynchronizer,抽象的队列同步器,和synchronized不同的是,它是使用Java编写实现的一个同步器,开发者可以基于它进行功能的增强和扩展。

AQS堪称J.U.C包的半壁江山,很多并发工具类都是使用AQS来实现的,例如:ReentrantLock、Semaphore、CountDownLatch等。

使用synchronized实现同步的原理是:每个Java锁对象都有一个对应的Monitor对象(对象监视器),Monitor对象是由C++实现的。对象监视器维护了一个变量Owner,指向的是当前持有锁的线程,还维护了一个Entry List集合,存放的是竞争锁失败的线程。线程在这个集合里会被挂起休眠,直到Owner线程释放锁,JVM才去Entry List集合中唤醒线程来继续竞争锁,循环往复。

AQS的任务就是使用Java代码的方式,去完成synchronized中由C代码实现的功能。Java开发者不一定熟悉C语言,要读懂synchronized的源码实现并非易事。不过好在JDK提供了AQS,通过阅读AQS的源码也能让你对并发有更深的理解。

AQS的核心思想是:通过一个volatile修饰的int属性state代表同步状态,例如0是无锁状态,1是上锁状态。多线程竞争资源时,通过CAS的方式来修改state,例如从0修改为1,修改成功的线程即为资源竞争成功的线程,将其设为exclusiveOwnerThread,也称【工作线程】,资源竞争失败的线程会被放入一个FIFO的队列中并挂起休眠,当exclusiveOwnerThread线程释放资源后,会从队列中唤醒线程继续工作,循环往复。
逻辑是不是和synchronized底层差不多?对吧。

理论说的差不多了,本篇文章就通过ReentrantLock结合AbstractQueuedSynchronizer,通过阅读源码的方式来看一下AQS到底是如何工作的,顺便膜拜一下Doug Lea大佬。


AQS基础架构

阅读源码前,先来简单了解一下AQS的架构:
在这里插入图片描述
架构还是比较简单的,除了实现Serializable接口外,就只继承了AbstractOwnableSynchronizer父类。
AbstractOwnableSynchronizer父类中维护了一个exclusiveOwnerThread属性,是用来记录当前同步器资源的独占线程的,没有其他东西。

AQS有一个内部类Node,AQS会将竞争锁失败的线程封装成一个Node节点,Node类有prev和next属性,分别指向前驱节点和后继节点,形成一个双向链表的结构。除此之外,每个Node节点还有一个被volatile修饰的int变量waitStatus,它代表的是节点的等待状态,有如下几种值:

  1. 0:新建节点的默认值。
  2. SIGNAL(-1):表示后继结点在等待当前结点唤醒。
  3. CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  4. PROPAGATE(-3):共享模式下,前驱节点会唤醒所有的后继节点。
  5. CANCELLED(1):取消竞争资源,锁超时或发生中断才会触发。

可以看到,waitStatus是以0为临界值的,大于0代表节点无效,例如AQS在唤醒队列中的节点时,waitStatus大于0的节点会被跳过。

AQS内部还维护了int类型的state变量,代表同步器的状态。例如,在ReentrantLock中,state就代表锁的重入次数,每lock一次,state就+1,每unlock一次,state就-1,当state等于0时,代表没有上锁。

AQS内部还维护了head和tail属性,用来指向FIFO队列中的头尾节点,被head指向的节点,总是工作线程。线程在获取到锁后,是不会出队的。只有当head释放锁,并将其后继节点唤醒并设为head后,才会出队。


ReentrantLock.lock()做了什么?

示例程序:开启三个线程:A、B、C,按顺序依次调用lock()方法,这期间到底发生了什么???

1、刚开始没有任何线程竞争锁,AQS内部结构是这样的:
在这里插入图片描述
2、线程A调用lock()方法:
在这里插入图片描述
其实是交给sync对象去上锁了,Sync类就是一个继承了AQS的类。

ReentrantLock默认采用的是非公平锁,不管队列中是否有等待线程,上来直接就尝试利用CAS抢锁,如果抢成功了,就将当前线程设为exclusiveOwnerThread并返回。
如果没有成功,则调用acquire(1)去获取锁。

// 非公平锁的lock,上来直接就抢锁,不管队列中有没有线程在等待。
final void lock() {
	// CAS的方式将修改state,如果修改成功,表示没有其他线程持有锁,将当前线程设为独占锁的持有者
	if (compareAndSetState(0, 1))
		setExclusiveOwnerThread(Thread.currentThread());
	else
		// CAS失败,代表其他线程已经持有锁了,此时去竞争锁
		acquire(1);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

公平锁就显得非常有礼貌,上来先询问队列中是否有线程在等待,如果有,则让它们先获取,自己入队等待。

final void lock() {
	acquire(1);
}

// 公平锁-获取锁
protected final boolean tryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {
		/*
		即使当前是无锁状态,也要判断队列中是否有线程已经在等待了。
		如果有其他线程在等待,要让其他线程先获取锁,自己入队挂起。
		如果队列中无线程,则尝试CAS竞争。
		 */
		if (!hasQueuedPredecessors() &&
				compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	// 当前线程就是持有锁的线程,重入即可
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0)// 重入次数过多,溢出了
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

不管是公平锁还是非公平锁,线程A此时就可以获取到锁并返回了,此时AQS的内部结构如下:
在这里插入图片描述

假设线程A还未释放锁,线程B调用lock(),竞争锁失败,则调用acquire(1)去获取锁,这是AQS的模板方法。

/*
竞争锁的流程:
1.tryAcquire():再次尝试去获取锁。
2.addWaiter():如果还获取不到,在队列的尾巴添加一个Node。
3.acquireQueued():去排队。
 */
public final void acquire(int arg) {
	if (!tryAcquire(arg) &&
			acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		// 是否需要进行一次自我中断,来补上线程等待期间发生的中断。
		selfInterrupt();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在acquire()方法中,首先会调用tryAcquire()去尝试获取锁,如果获取不到,则通过addWaiter()将当前线程封装为一个Node节点入队,再调用acquireQueued()去排队。
这里有一点需要注意,AQS在排队的过程中,是不响应中断的,如果排队期间发生了中断,只能等排队结束后,AQS自动补上一个自我中断:selfInterrupt()。

非公平锁尝试获取锁的流程如下:

// 非公平锁尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {
		// state==0,代表其他线程已经释放锁了,再次CAS的方式修改state,成功则代表抢到锁,返回。
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	// 其他线程还没释放锁,判断持有锁的线程是否是当前线程,如果是,则重入,state++。
	// 可重入锁,state就代表锁重入的次数,0说明锁释放了。
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // 锁不可能无限重入,重入的次数超过了int最大值后,就会抛异常。
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	// 其他线程没释放锁,当前线程又不是持有锁的线程,则抢锁失败。
	return false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

由于线程A没有释放锁,且线程B不是锁的持有线程,因此tryAcquire()会返回false。

尝试获取锁失败,则开始创建Node节点,并入队。addWaiter代码如下:

// 如果尝试获取锁失败,则入队。
private Node addWaiter(Node mode) {
	// 创建一个和当前线程绑定的Node节点
	Node node = new Node(Thread.currentThread(), mode);
	Node pred = tail;
	if (pred != null) {
		/*
		如果tail不为null,则通过CAS的方式将tail指向当前Node。如果失败,会调用enq()重试。
		1.当前节点的prev指向前任tail
		2.CAS将tail指向当前节点
		3.前任tail的next指向当前节点
		这三个步骤不是原子的,如果执行到第2步时间片到期,持有锁的线程释放锁唤醒节点时,
		如果从head向tail找,此时前任tail节点的next还是null,会存在漏唤醒问题。
		而prev的赋值先于CAS执行,所以在唤醒队列时,从tail向head找就没问题了。
		 */
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	// 如果CAS入队失败,则自旋重试
	enq(node);
	return node;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

如果tail不为null,则将当前节点的prev指向现任tail,再通过CAS的方式将tail指向当前节点,最后前任tail的next指向当前节点即可。
这里有一点需要注意:

  1. 当前节点的prev指向前任tail
  2. CAS将tail指向当前节点
  3. 前任tail的next指向当前节点

这三个步骤不是原子的,如果执行到第2步时间片到期,持有锁的线程释放锁唤醒节点时,如果从head向tail找,此时前任tail节点的next还是null,会存在漏唤醒问题。而prev的赋值先于CAS执行,所以在唤醒队列时,从tail向head找就没问题了。

如果CAS入队失败也没关系,下面会调用enq()进行自旋重试,直到成功为止:

// CAS入队失败,自旋重试
private Node enq(final Node node) {
	for (;;) { // 这比while(true)好在哪里???
		Node t = tail;
		if (t == null) {
			// tail==null,说明队列是空的,做初始化。
			// 新建一个节点,head和tail都指向它。再进循环时,tail就不为null了。
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			/*
			队列不为空
			1.当前节点的prev指向前任tail
			2.CAS将tail指向当前节点
			3.前任tail的next指向当前节点
			不断重试,直到成功为止
			 */
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

如果tail为null,说明队列还没初始化,这时会创建一个Node节点,head和tail都指向这个空节点。再次循环时,由于已经初始化了,进入else逻辑,还是执行那三个步骤。

线程B入队后,此时AQS的内部结构如下:
在这里插入图片描述

节点B成功入队后,就是排队的操作了。线程B是继续竞争还是Park挂起呢?

// Node入队后,开始排队
final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;// 是否获取锁失败
	try {
		/*
		线程等待的过程中是不响应中断的,如果期间发生中断,
		则必须等到线程抢到锁后进行自我中断:selfInterrupt()。
		 */
		boolean interrupted = false;//获取锁的过程中是否发生中断。
		for (;;) {
			final Node p = node.predecessor();// 获取当前节点的前驱节点
			/*
			如果自己的前驱是head,自己就有资格去抢锁,有两种情况:
			1、作为第一个节点入队。
			2、head释放锁了,唤醒了当前节点。
			 */
			if (p == head && tryAcquire(arg)) {
				/*
				如果抢锁成功,说明是head释放锁并唤醒了当前节点。
				将head指向当前节点,failed = false表示成功获取到锁。
				 */
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			// 竞争锁失败,判断是否需要挂起当前线程
			if (shouldParkAfterFailedAcquire(p, node) &&
					parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		// 获取不到锁就挂起线程,不断死循环,因此只要不出意外,最终一定能获取到锁。
		// 如果没有获取到锁就跳出循环了,说明线程不想竞争了,例如:锁超时。
		// 此时需要修改
		if (failed)
			cancelAcquire(node);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

如果当前节点的前驱节点是head,则代表当前节点拥有竞争锁的资格。分两种情况:

  1. 作为第一个节点入队。
  2. head释放锁了,唤醒了当前节点。

此时线程B并不是被线程A唤醒了,而是第一种情况,线程B会再次尝试获取锁,但是由于线程A还没释放,因此会失败。
线程B获取锁失败后,会执行shouldParkAfterFailedAcquire(),判断是否应该被Park挂起。

// 线程竞争锁失败是否要挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	int ws = pred.waitStatus;
	if (ws == Node.SIGNAL)
		// 如果前驱节点的waitStatus=-1,当前节点就可以安心挂起了。
		return true;
	if (ws > 0) {
		// waitStatus以0为分界点。0是默认值,小于0代表节点为有效状态,大于0代表节点无效,例如:被取消了。
		// 如果前驱节点无效,就继续向前找,直到找到有效节点,并将其next指向自己。中间无效的节点会被GC回收。
		do {
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		// CAS的方式将前驱节点的waitStatus改为-1,代表当前节点在等待被前驱节点唤醒。
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

线程被Park挂起的前提条件是:必须将前驱节点的waitStatus设为SIGNAL(-1),这样当前驱节点释放锁时,才会唤醒后继节点。

由于此时head节点的waitStatus等于0,不满足条件,所以线程B会尝试使用CAS的方式将其改为SIGNAL,且这一次不会线程B不会被Park。
此时,AQS的内部结构是:
在这里插入图片描述

再次循环,由于线程A还没释放锁,线程B在此获取锁失败,再次执行shouldParkAfterFailedAcquire(),此时前驱节点的waitStatus已经是SIGNAL(-1)了,所以线程B可以安心Park了,返回true。

shouldParkAfterFailedAcquire()返回true代表需要将线程B挂起,因此会执行parkAndCheckInterrupt():

// 挂起当前线程,等待被前驱节点唤醒
private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	// 阻塞过程中不响应中断,期间如果发生了中断,则补上自我中断:selfInterrupt()。
	return Thread.interrupted();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

挂起的过程就很简单了,调用了LockSupport.park()方法。前面已经说过,AQS排队的过程是不响应中断的,如果期间发生了中断,只能等待线程被唤醒后,补上自我中断,所以这里会返回线程的一个中断标志。

线程B现在已经被Park挂起了,只能等待线程A的唤醒才能继续运行。

acquireQueued()方法中,有一个finally语句块,它的作用是,如果线程没有获取到锁就退出了循环,说明线程获取锁超时或者发生中断了,那么节点就无效了,需要将它出队,调用cancelAcquire():

// 节点取消竞争锁
private void cancelAcquire(Node node) {
	// 忽略不存在的节点
	if (node == null)
		return;

	node.thread = null;//取消绑定的线程

	// 往前找,跳过CANCELLED的节点
	Node pred = node.prev;
	while (pred.waitStatus > 0)
		node.prev = pred = pred.prev;

	Node predNext = pred.next;

	// 将当前节点设为CANCELLED,这样即使出队失败也没关系,唤醒节点时会跳过CANCELLED的节点
	node.waitStatus = Node.CANCELLED;

	if (node == tail && compareAndSetTail(node, pred)) {
		/*
		如果node是尾巴,就使用CAS将tail指向前驱节点,当前节点直接出队。
		出队成功,将tail的next置空。
		 */
		compareAndSetNext(pred, predNext, null);
	} else {
		/*
		如果前面的操作失败了,有两种情况:
		1.当前node原来是尾巴,取消过程中有新节点插入,现已不是尾巴。
		2.当前node原来就是中间节点。

		当前node是中间节点的话,就需要做两件事:
		1.修改前驱节点的waitStatus为SIGNAL,让其释放锁后记得唤醒后继节点。
		2.将前驱节点的next指向后继节点,当前node出列。

		出列的过程是允许失败的,即使没有出列,只要node的waitStatus设为CANCELLED,
		head在唤醒后继节点时也会跳过CANCELLED的节点。

		修改前驱节点为SIGNAL的过程也是允许失败的,只要失败了就会唤醒node的后继节点,
		让后继节点自己去修改前驱节点为SIGNAL。
		 */
		int ws;
		if (pred != head &&
				((ws = pred.waitStatus) == Node.SIGNAL ||
						(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
				pred.thread != null) {
			/*
			修改前驱节点为SIGNAL成功,将前驱节点的next指向当前节点的后继节点,当前节点出列。
			 */
			Node next = node.next;
			if (next != null && next.waitStatus <= 0)
				compareAndSetNext(pred, predNext, next);
		} else {
			/*
			失败有两种情况:
			1.当前节点的prev为head,那老二就有资格去竞争了,唤醒当前节点的后继节点。
			2.修改前驱节点为SIGNAL失败,唤醒后继节点,让它自己去修改前驱节点的状态。
			 */
			unparkSuccessor(node);
		}

		node.next = node; // help GC
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

节点取消获取锁的情况有两种需要考虑:

  1. 当前节点是tail。
  2. 当前节点是中间节点。

第一种情况处理就简单了,直接将tail指向当前节点的prev,然后prev的next置空,当前节点就出队了。
第二种情况就比较复杂,如果当前节点的前驱节点不是head的话,那么就必须将前驱节点的waitStatus设为SIGNAL(-1),然后将前驱节点的next指向当前节点的next,当前节点的next的prev,指向前驱节点。
在这里插入图片描述
如果说当前节点的前驱节点是head,那么就直接唤醒当前节点的后继节点,因为老三变老二了,它有资格去竞争锁了。

如果CAS修改节点的指向失败了,也没关系,唤醒当前节点的后继节点,让它自己去修改前驱节点的waitStatus,当前节点可以安心出队。

很显然,线程B是不会触发cancelAcquire()方法的。

假设,此时线程A依然没有unlock,此时线程C也要来获取锁。显然线程C会竞争失败,AQS会将其封装为Node节点入队,并将线程B的Node节点的waitStatus改为SIGNAL(-1),然后Park休眠。
此时,AQS的内部结构是:
在这里插入图片描述


ReentrantLock.unlock()做了什么?

线程B、C成功入队并Park后,假设此时线程A执行unlock释放锁:
在这里插入图片描述
释放锁的过程,其实是调用了sync的release(),这也是AQS的模板方法:

// 释放锁
public final boolean release(int arg) {
	/*
	调用子类的tryRelease(),返回true代表成功释放锁。
	对于ReentrantLock来说,state减少至0代表需要释放锁。
	 */
	if (tryRelease(arg)) {
		/*
		head就代表持有锁的节点。
		如果head的waitStatus!=0,说明有后继节点在等待被其唤醒。
		还记得线程入队时,如果要挂起,必须将其前驱节点的waitStatus改为-1吗???
		如果节点入队不改前驱节点的waitStatus,它将无法被唤醒。
		 */
		Node h = head;
		if (h != null && h.waitStatus != 0)
			// 释放锁后要去唤醒后继节点
			unparkSuccessor(h);
		return true;
	}
	return false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

AQS会调用子类实现的tryRelease(),当它返回true就代表成功释放了资源,AQS就会去唤醒队列中的节点。

/*
尝试释放锁,返回true代表锁成功释放。
只有持有锁的线程才能释放锁,因此不存在并发问题。
 */
protected final boolean tryRelease(int releases) {
	int c = getState() - releases;
	// 不是持有锁的线程执行释放锁,抛异常。
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	if (c == 0) {
		// 当state==0才需要真正的释放锁
		free = true;
		setExclusiveOwnerThread(null);
	}
	setState(c);
	return free;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

ReentrantLock是可重入锁,state代表锁的重入次数,tryRelease()就是state自减的过程。当state减至0就代表锁成功释放了,同时会将exclusiveOwnerThread置空。

必须是持有锁的线程才能调用tryRelease(),否则会抛异常。

线程A执行完tryRelease()后,此时AQS的内部结构是:
在这里插入图片描述

tryRelease()返回true,AQS就要去唤醒队列中的节点了,执行unparkSuccessor():

// 唤醒后继节点
private void unparkSuccessor(Node node) {
	// 将waitStatus置为0
	int ws = node.waitStatus;
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);

	/*
	如果后继节点的waitStatus>0,代表节点是CANCELLED的无效节点,会跳过。
	然后从尾巴开始向头找,直到找到waitStatus <= 0的有效节点,并将其唤醒。

	为什么要从尾巴找?
	是因为节点入队时,需要执行三个操作:
	1.当前节点的prev指向前任tail
	2.CAS将tail指向当前节点
	3.前任tail的next指向当前节点
	如果执行到步骤2时,时间片到期,此时前驱节点的next还是null,会存在漏唤醒的问题。
	而prev的赋值操作先于CAS执行,因此通过prev向前找总能找到。
	 */
	Node s = node.next;
	if (s == null || s.waitStatus > 0) {
		s = null;
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	if (s != null)
		LockSupport.unpark(s.thread);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

这里有一个很有意思的操作,当next节点无效时,AQS会跳过它,重新寻找有效的节点。AQS会从tail开始向head找,而不是从head向tail找,这是为什么呢???

从尾部向头部找,是因为节点入队时,需要执行三个操作:1.当前节点的prev指向前任tail、2.CAS将tail指向当前节点、3.前任tail的next指向当前节点。如果执行到步骤2时,时间片到期,此时前驱节点的next还是null,会存在漏唤醒的问题。而prev的赋值操作先于CAS执行,因此通过prev向前找总能找到。

当节点被唤醒时,会从AQS的parkAndCheckInterrupt()方法里继续执行,重新获取锁。

线程A释放锁,并将线程B唤醒,线程B会继续去竞争。
在这里插入图片描述
此时线程B会竞争成功,同时会将head指向当前节点。
此时AQS内部结构是:
在这里插入图片描述
线程B运行一段时间后,也释放锁了,接着会唤醒线程C。
在这里插入图片描述
线程C会成功获取到锁:
在这里插入图片描述
线程C释放锁后,最后一个Node节点并不会出列,而是会保留。当下一次有线程来竞争锁时,成功后会自动将前任head覆盖。


问题

最后再总结一下关于AQS几个比较重要的问题。

1.工作线程什么时候出队?

当FIFO队列中的一个节点竞争到资源时,它并不是就马上出队了,而是将head指向自己。节点释放锁后依然不会主动出队,而是等待下一个节点竞争锁成功后修改head的指向,将前任head踢出去。

2.AQS唤醒队列的规则是什么?

当head指向的节点成功释放资源后,首先会判断当前节点的waitStatus是否等于0,如果等于0就不会去唤醒后继节点了,这也就是为什么新的节点入队休眠的前提是必须将前驱节点的waitStatus改为SIGNAL(-1)的原因,如果不改,后继节点将不会被唤醒,就会导致死锁。

AQS首先会唤醒当前节点的直接后继节点next,如果next为null,有两种情况:

  1. 确实没有后继节点了,之前next指向的节点可能由于超时等原因退出竞争了。
  2. 存在后继节点,只是由于多线程的原因,后继节点还没来得及将当前节点的next指向它。

第一种情况好办,后继节点为null,不唤醒就是了。
第二种情况就需要从tail向head寻找了,找到了有效节点再唤醒。

如果存在直接后继节点,但是节点的waitStatus大于0,AQS也是会选择跳过它的。前面已经说过,waitStatus大于0的节点代表无效节点,如CANCELLED(1)是已经取消竞争的节点。如果直接后继节点是无效节点的话,AQS会从tail开始向head遍历,直到找到有效节点,再将其唤醒。

总结:存在直接后继节点且节点有效,则优先唤醒后继节点。否则,从tail向head遍历,直到找到有效节点再唤醒。

3.唤醒节点为什么要从尾巴开始?

这是因为,新节点入队时,需要执行三个步骤:

  1. 当前节点的prev指向前任tail
  2. CAS将tail指向当前节点
  3. 前任tail的next指向当前节点

这三个操作AQS并没有做同步处理,如果在执行步骤2后CPU时间片到期了,此时的节点指向是这样的:
在这里插入图片描述
前驱节点的next还没有赋值,如果从头向尾找,就可能会存在漏唤醒的问题。
而prev的赋值先于tail的CAS操作之前执行,因此从尾向头找,就可以避免这个问题。

4.其他问题

占个坑,问题可以持续更新,想到了再更。有疑惑的同学可以评论告诉我,我会把我知道的记录下来,大家一起再探讨一下。


尾巴

到现在还很印象深刻,年初的时候有一次面试,就被问到AQS,当时没有静下心来研究,有些概念还是很模糊,答得不是很好,面试官也不太满意。

恰好上周加班,今天调休,可以写篇文章放松一下。于是我决定挑战一下我的软肋AQS,静下心来阅读源码才发现,过去看网上的博客,一直比较模糊的概念,其实代码里已经写的非常清楚了。

过去读不懂的源码,往后慢慢都会读懂!!!


你可能感兴趣的文章:

  • 摊牌了,我要手写一个RPC
  • Java锁的膨胀过程以及一致性哈希对锁膨胀的影响
  • ThreadLocal源码解析
  • CMS与三色标记算法
  • 大白话理解可达性分析算法
程序员小潘
微信公众号
专注于Java后端技术分享~
注:本文转载自blog.csdn.net的程序员小潘的文章"https://javap.blog.csdn.net/article/details/109786284"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top