第一个竞争出现时
public final void acquire ( int arg) {
if ( ! tryAcquire ( arg) &&
acquireQueued ( addWaiter ( Node. EXCLUSIVE) , arg) )
selfInterrupt ( ) ;
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
Thread-1 执行了
ock方法中CAS 尝试将 state 由 0 改为 1,结果失败 lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败 接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列
图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态 Node 的创建是懒惰的 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquire方法的 acquireQueued 逻辑
acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,我们这里设置这时 state 仍为 1,失败 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示已经阻塞)
再从次有多个线程经历上述过程竞争失败,变成这个样子 Thread-0 释放锁,进入 tryRelease 流程,如果成功
设置 exclusiveOwnerThread 为 null state = 0
如果当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程:
unparkSuccessor 中会找到队列中离 head 最近的一个 Node (没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程 如果加锁成功(没有竞争),会设置 (acquireQueued 方法中)
exclusiveOwnerThread 为 Thread-1,state = 1 head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread 原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了 如果不巧又被 Thread-4 占了先
Thread-4 被设置为 exclusiveOwnerThread,state = 1 Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691 L;
final void lock ( ) {
if ( compareAndSetState ( 0 , 1 ) )
setExclusiveOwnerThread ( Thread. currentThread ( ) ) ;
else
acquire ( 1 ) ;
}
public final void acquire ( int arg) {
if (
! tryAcquire ( arg) &&
acquireQueued ( addWaiter ( Node. EXCLUSIVE) , arg)
) {
selfInterrupt ( ) ;
}
}
protected final boolean tryAcquire ( int acquires) {
return nonfairTryAcquire ( acquires) ;
}
final boolean nonfairTryAcquire ( int acquires) {
final Thread current = Thread. currentThread ( ) ;
int c = getState ( ) ;
if ( c == 0 ) {
if ( compareAndSetState ( 0 , acquires) ) {
setExclusiveOwnerThread ( current) ;
return true ;
}
}
else if ( current == getExclusiveOwnerThread ( ) ) {
int nextc = c + acquires;
if ( nextc < 0 )
throw new Error ( "Maximum lock count exceeded" ) ;
setState ( nextc) ;
return true ;
}
return false ;
}
private Node addWaiter ( Node mode) {
Node node = new Node ( Thread. currentThread ( ) , mode) ;
Node pred = tail;
if ( pred != null) {
node. prev = pred;
if ( compareAndSetTail ( pred, node) ) {
pred. next = node;
return node;
}
}
enq ( node) ;
return node;
}
private Node enq ( final Node node) {
for ( ; ; ) {
Node t = tail;
if ( t == null) {
if ( compareAndSetHead ( new Node ( ) ) ) {
tail = head;
}
} else {
node. prev = t;
if ( compareAndSetTail ( t, node) ) {
t. next = node;
return t;
}
}
}
}
final boolean acquireQueued ( final Node node, int arg) {
boolean failed = true ;
try {
boolean interrupted = false ;
for ( ; ; ) {
final Node p = node. predecessor ( ) ;
if ( p == head && tryAcquire ( arg) ) {
setHead ( node) ;
p. next = null;
failed = false ;
return interrupted;
}
if (
shouldParkAfterFailedAcquire ( p, node) &&
parkAndCheckInterrupt ( )
) {
interrupted = true ;
}
}
} finally {
if ( failed)
cancelAcquire ( node) ;
}
}
private static boolean shouldParkAfterFailedAcquire ( Node pred, Node node) {
int ws = pred. waitStatus;
if ( ws == Node. SIGNAL) {
return true ;
}
if ( ws > 0 ) {
do {
node. prev = pred = pred. prev;
} while ( pred. waitStatus > 0 ) ;
pred. next = node;
} else {
compareAndSetWaitStatus ( pred, ws, Node. SIGNAL) ;
}
return false ;
}
private final boolean parkAndCheckInterrupt ( ) {
LockSupport. park ( this ) ;
return Thread. interrupted ( ) ;
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
解锁源码:
static final class NonfairSync extends Sync {
public void unlock ( ) {
sync. release ( 1 ) ;
}
public final boolean release ( int arg) {
if ( tryRelease ( arg) ) {
Node h = head;
if (
h != null &&
h. waitStatus != 0
) {
unparkSuccessor ( h) ;
}
return true ;
}
return false ;
}
protected final boolean tryRelease ( int releases) {
int c = getState ( ) - releases;
if ( Thread. currentThread ( ) != getExclusiveOwnerThread ( ) )
throw new IllegalMonitorStateException ( ) ;
boolean free = false ;
if ( c == 0 ) {
free = true ;
setExclusiveOwnerThread ( null) ;
}
setState ( c) ;
return free;
}
private void unparkSuccessor ( Node node) {
int ws = node. waitStatus;
if ( ws < 0 ) {
compareAndSetWaitStatus ( node, ws, 0 ) ;
}
Node s = node. next;
if ( s == null || s. waitStatus > 0 ) {
s = null;
for ( Node t = tail; t != null && t != node; t = t. prev)
if ( t. waitStatus <= 0 )
s = t;
}
if ( s != null)
LockSupport. unpark ( s. thread) ;
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
2、锁重入原理
static final class NonfairSync extends Sync {
final boolean nonfairTryAcquire ( int acquires) {
final Thread current = Thread. currentThread ( ) ;
int c = getState ( ) ;
if ( c == 0 ) {
if ( compareAndSetState ( 0 , acquires) ) {
setExclusiveOwnerThread ( current) ;
return true ;
}
}
else if ( current == getExclusiveOwnerThread ( ) ) {
int nextc = c + acquires;
if ( nextc < 0 )
throw new Error ( "Maximum lock count exceeded" ) ;
setState ( nextc) ;
return true ;
}
return false ;
}
protected final boolean tryRelease ( int releases) {
int c = getState ( ) - releases;
if ( Thread. currentThread ( ) != getExclusiveOwnerThread ( ) )
throw new IllegalMonitorStateException ( ) ;
boolean free = false ;
if ( c == 0 ) {
free = true ;
setExclusiveOwnerThread ( null) ;
}
setState ( c) ;
return free;
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
3、可打断原理
不可打断模式: 在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了。
static final class NonfairSync extends Sync {
private final boolean parkAndCheckInterrupt ( ) {
LockSupport. park ( this ) ;
return Thread. interrupted ( ) ;
}
final boolean acquireQueued ( final Node node, int arg) {
boolean failed = true ;
try {
boolean interrupted = false ;
for ( ; ; ) {
final Node p = node. predecessor ( ) ;
if ( p == head && tryAcquire ( arg) ) {
setHead ( node) ;
p. next = null;
failed = false ;
return interrupted;
}
if (
shouldParkAfterFailedAcquire ( p, node) &&
parkAndCheckInterrupt ( )
) {
interrupted = true ;
}
}
} finally {
if ( failed)
cancelAcquire ( node) ;
}
}
public final void acquire ( int arg) {
if (
! tryAcquire ( arg) &&
acquireQueued ( addWaiter ( Node. EXCLUSIVE) , arg)
) {
selfInterrupt ( ) ;
}
}
static void selfInterrupt ( ) {
Thread. currentThread ( ) . interrupt ( ) ;
}
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
可打断模式:
static final class NonfairSync extends Sync {
public final void acquireInterruptibly ( int arg) throws InterruptedException {
if ( Thread. interrupted ( ) )
throw new InterruptedException ( ) ;
if ( ! tryAcquire ( arg) )
doAcquireInterruptibly ( arg) ;
}
private void doAcquireInterruptibly ( int arg) throws InterruptedException {
final Node node = addWaiter ( Node. EXCLUSIVE) ;
boolean failed = true ;
try {
for ( ; ; ) {
final Node p = node. predecessor ( ) ;
if ( p == head && tryAcquire ( arg) ) {
setHead ( node) ;
p. next = null;
failed = false ;
return ;
}
if ( shouldParkAfterFailedAcquire ( p, node) &&
parkAndCheckInterrupt ( ) ) {
throw new InterruptedException ( ) ;
}
}
} finally {
if ( failed)
cancelAcquire ( node) ;
}
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
4、公平锁原理
static final class FairSync extends Sync {
private static final long serialVersionUID = - 3000897897090466540 L;
final void lock ( ) {
acquire ( 1 ) ;
}
public final void acquire ( int arg) {
if (
! tryAcquire ( arg) &&
acquireQueued ( addWaiter ( Node. EXCLUSIVE) , arg)
) {
selfInterrupt ( ) ;
}
}
protected final boolean tryAcquire ( int acquires) {
final Thread current = Thread. currentThread ( ) ;
int c = getState ( ) ;
if ( c == 0 ) {
if ( ! hasQueuedPredecessors ( ) &&
compareAndSetState ( 0 , acquires) ) {
setExclusiveOwnerThread ( current) ;
return true ;
}
}
else if ( current == getExclusiveOwnerThread ( ) ) {
int nextc = c + acquires;
if ( nextc < 0 )
throw new Error ( "Maximum lock count exceeded" ) ;
setState ( nextc) ;
return true ;
}
return false ;
}
public final boolean hasQueuedPredecessors ( ) {
Node t = tail;
Node h = head;
Node s;
return h != t &&
(
( s = h. next) == null ||
s. thread != Thread. currentThread ( )
) ;
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
5、条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject await 流程 开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部 接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁 unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功 park 阻塞 Thread-0 signal 流程 假设 Thread-1 要来唤醒 Thread-0 进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node 执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1 Thread-1 释放锁,进入 unlock 流程。 源码分析:
public class ConditionObject implements Condition , java. io. Serializable {
private static final long serialVersionUID = 1173984872572414699 L;
private transient Node firstWaiter;
private transient Node lastWaiter;
public ConditionObject ( ) { }
private Node addConditionWaiter ( ) {
Node t = lastWaiter;
if ( t != null && t. waitStatus != Node. CONDITION) {
unlinkCancelledWaiters ( ) ;
t = lastWaiter;
}
Node node = new Node ( Thread. currentThread ( ) , Node. CONDITION) ;
if ( t == null)
firstWaiter = node;
else
t. nextWaiter = node;
lastWaiter = node;
return node;
}
private void doSignal ( Node first) {
do {
if ( ( firstWaiter = first. nextWaiter) == null) {
lastWaiter = null;
}
first. nextWaiter = null;
} while (
! transferForSignal ( first) &&
( first = firstWaiter) != null
) ;
}
final boolean transferForSignal ( Node node) {
if ( ! compareAndSetWaitStatus ( node, Node. CONDITION, 0 ) )
return false ;
Node p = enq ( node) ;
int ws = p. waitStatus;
if (
ws > 0 ||
! compareAndSetWaitStatus ( p, ws, Node. SIGNAL)
) {
LockSupport. unpark ( node. thread) ;
}
return true ;
}
private void doSignalAll ( Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first. nextWaiter;
first. nextWaiter = null;
transferForSignal ( first) ;
first = next;
} while ( first != null) ;
}
private void unlinkCancelledWaiters ( ) {
}
public final void signal ( ) {
if ( ! isHeldExclusively ( ) )
throw new IllegalMonitorStateException ( ) ;
Node first = firstWaiter;
if ( first != null)
doSignal ( first) ;
}
public final void signalAll ( ) {
if ( ! isHeldExclusively ( ) )
throw new IllegalMonitorStateException ( ) ;
Node first = firstWaiter;
if ( first != null)
doSignalAll ( first) ;
}
public final void awaitUninterruptibly ( ) {
Node node = addConditionWaiter ( ) ;
int savedState = fullyRelease ( node) ;
boolean interrupted = false ;
while ( ! isOnSyncQueue ( node) ) {
LockSupport. park ( this ) ;
if ( Thread. interrupted ( ) )
interrupted = true ;
}
if ( acquireQueued ( node, savedState) || interrupted)
selfInterrupt ( ) ;
}
final int fullyRelease ( Node node) {
boolean failed = true ;
try {
int savedState = getState ( ) ;
if ( release ( savedState) ) {
failed = false ;
return savedState;
} else {
throw new IllegalMonitorStateException ( ) ;
}
} finally {
if ( failed)
node. waitStatus = Node. CANCELLED;
}
}
private static final int REINTERRUPT = 1 ;
private static final int THROW_IE = - 1 ;
private int checkInterruptWhileWaiting ( Node node) {
return Thread. interrupted ( ) ?
( transferAfterCancelledWait ( node) ? THROW_IE : REINTERRUPT) :
0 ;
}
private void reportInterruptAfterWait ( int interruptMode)
throws InterruptedException {
if ( interruptMode == THROW_IE)
throw new InterruptedException ( ) ;
else if ( interruptMode == REINTERRUPT)
selfInterrupt ( ) ;
}
public final void await ( ) throws InterruptedException {
if ( Thread. interrupted ( ) ) {
throw new InterruptedException ( ) ;
}
Node node = addConditionWaiter ( ) ;
int savedState = fullyRelease ( node) ;
int interruptMode = 0 ;
while ( ! isOnSyncQueue ( node) ) {
LockSupport. park ( this ) ;
if ( ( interruptMode = checkInterruptWhileWaiting ( node) ) != 0 )
break ;
}
if ( acquireQueued ( node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if ( node. nextWaiter != null)
unlinkCancelledWaiters ( ) ;
if ( interruptMode != 0 )
reportInterruptAfterWait ( interruptMode) ;
}
public final long awaitNanos ( long nanosTimeout) throws InterruptedException {
if ( Thread. interrupted ( ) ) {
throw new InterruptedException ( ) ;
}
Node node = addConditionWaiter ( ) ;
int savedState = fullyRelease ( node) ;
final long deadline = System. nanoTime ( ) + nanosTimeout;
int interruptMode = 0 ;
while ( ! isOnSyncQueue ( node) ) {
if ( nanosTimeout <= 0 L) {
transferAfterCancelledWait ( node) ;
break ;
}
if ( nanosTimeout >= spinForTimeoutThreshold)
LockSupport. parkNanos ( this , nanosTimeout) ;
if ( ( interruptMode = checkInterruptWhileWaiting ( node) ) != 0 )
break ;
nanosTimeout = deadline - System. nanoTime ( ) ;
}
if ( acquireQueued ( node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if ( node. nextWaiter != null)
unlinkCancelledWaiters ( ) ;
if ( interruptMode != 0 )
reportInterruptAfterWait ( interruptMode) ;
return deadline - System. nanoTime ( ) ;
}
public final boolean awaitUntil ( Date deadline) throws InterruptedException {
}
public final boolean await ( long time, TimeUnit unit) throws InterruptedException {
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
1、ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。读-写,写-写都是相互互斥的!
提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法 。
实现代码如下:
public class Code_12_ReadWriteLockTest {
public static void main ( String[ ] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer ( ) ;
Thread t1 = new Thread ( ( ) - > {
dataContainer. read ( ) ;
} , "t1" ) ;
Thread t2 = new Thread ( ( ) - > {
dataContainer. write ( ) ;
} , "t2" ) ;
t1. start ( ) ;
t2. start ( ) ;
}
}
@Slf4j ( topic = "c.DataContainer" )
class DataContainer {
private Object object = new Object ( ) ;
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock ( ) ;
private ReentrantReadWriteLock. ReadLock readLock = readWriteLock. readLock ( ) ;
private ReentrantReadWriteLock. WriteLock writeLock = readWriteLock. writeLock ( ) ;
public Object read ( ) {
readLock. lock ( ) ;
log. info ( "拿到读锁!" ) ;
try {
log. info ( "读取操作 ..." ) ;
try {
TimeUnit. SECONDS. sleep ( 1 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
} finally {
readLock. unlock ( ) ;
log. info ( "释放读锁!" ) ;
}
return object;
}
public void write ( ) {
writeLock. lock ( ) ;
log. info ( "拿到写锁!" ) ;
try {
log. info ( "写操作 ... " ) ;
} finally {
writeLock. unlock ( ) ;
log. info ( "释放写锁!" ) ;
}
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
注意事项
读锁不支持条件变量 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
r. lock ( ) ;
try {
w. lock ( ) ;
try {
} finally {
w. unlock ( ) ;
}
} finally {
r. unlock ( ) ;
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
重入时降级支持:即持有写锁的情况下去获取读锁
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock ( ) ;
void processCachedData ( ) {
rwl. readLock ( ) . lock ( ) ;
if ( ! cacheValid) {
rwl. readLock ( ) . unlock ( ) ;
rwl. writeLock ( ) . lock ( ) ;
try {
if ( ! cacheValid) {
data = . . .
cacheValid = true ;
}
rwl. readLock ( ) . lock ( ) ;
} finally {
rwl. writeLock ( ) . unlock ( ) ;
}
}
try {
use ( data) ;
} finally {
rwl. readLock ( ) . unlock ( ) ;
}
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
2、应用之缓存
缓存更新策略: 更新时,是先清缓存还是先更新数据库? 先清除缓存操作如下:
先更新数据库操作如下: 补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询:这种情况的出现几率非常小: 实现代码如下:
public class Code_13_ReadWriteCacheTest {
public static void main ( String[ ] args) {
GeneriCacheDao< Object> generiCacheDao = new GeneriCacheDao < > ( ) ;
Object[ ] objects = new Object [ 2 ] ;
generiCacheDao. queryOne ( Object. class , "Test" , objects) ;
generiCacheDao. queryOne ( Object. class , "Test" , objects) ;
generiCacheDao. queryOne ( Object. class , "Test" , objects) ;
generiCacheDao. queryOne ( Object. class , "Test" , objects) ;
System. out. println ( generiCacheDao. map) ;
generiCacheDao. update ( "Test" , objects) ;
System. out. println ( generiCacheDao. map) ;
}
}
class GeneriCacheDao < T> extends GenericDao {
HashMap< SqlPair, T> map = new HashMap < > ( ) ;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock ( ) ;
GenericDao genericDao = new GenericDao ( ) ;
@Override
public int update ( String sql, Object. . . params) {
lock. writeLock ( ) . lock ( ) ;
SqlPair sqlPair = new SqlPair ( sql, params) ;
try {
int update = genericDao. update ( sql, params) ;
map. clear ( ) ;
return update;
} finally {
lock. writeLock ( ) . unlock ( ) ;
}
}
@Override
public T queryOne ( Class beanClass , String sql, Object. . . params) {
SqlPair key = new SqlPair ( sql, params) ;
lock. readLock ( ) . lock ( ) ;
try {
T t = map. get ( key) ;
if ( t != null) {
return t;
}
} finally {
lock. readLock ( ) . unlock ( ) ;
}
lock. writeLock ( ) . lock ( ) ;
try {
T value = map. get ( key) ;
if ( value == null) {
value = ( T) genericDao. queryOne ( beanClass, sql, params) ;
map. put ( key, value) ;
}
return value;
} finally {
lock. writeLock ( ) . unlock ( ) ;
}
}
class SqlPair {
private String sql;
private Object[ ] params;
public SqlPair ( String sql, Object[ ] params) {
this . sql = sql;
this . params = params;
}
@Override
public boolean equals ( Object o) {
if ( this == o) return true ;
if ( o == null || getClass ( ) != o. getClass ( ) ) return false ;
SqlPair sqlMap = ( SqlPair) o;
return Objects. equals ( sql, sqlMap. sql) &&
Arrays. equals ( params, sqlMap. params) ;
}
@Override
public int hashCode ( ) {
int result = Objects. hash ( sql) ;
result = 31 * result + Arrays. hashCode ( params) ;
return result;
}
}
}
class GenericDao < T> {
public int update ( String sql, Object. . . params) {
return 1 ;
}
public T queryOne ( Class< T> beanClass, String sql, Object. . . params) {
System. out. println ( "查询数据库中" ) ;
return ( T) new Object ( ) ;
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
3、读写锁原理
图解流程
读写锁用的是同一个 Sync 同步器,因此等待队列、state 等也是同一个 下面执行:t1 w.lock,t2 r.lock 情况
1)t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位 2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败。 tryAcquireShared 返回值表示
-1 表示失败 0 表示成功,但后继节点不会继续唤醒 正数表示成功,而且数值是还有几个后继节点需要唤醒,我们这里的读写锁返回 1
3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态 4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁 5)如果没有成功,在 doAcquireShared 内 for (;😉 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;😉 循环一 次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park。
又继续执行 :t3 r.lock,t4 w.lock 这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子 继续执行 t1 w.unlock 这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子 接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行,图中的t2从黑色变成了蓝色(注意这里只是恢复运行而已,并没有获取到锁!) 这回再来一次 for (;; ) 执行 tryAcquireShared 成功则让读锁计数加一 这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点 事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行. 这回再来一次 for (;; ) 执行 tryAcquireShared 成功则让读锁计数加一 这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点 再继续执行t2 r.unlock,t3 r.unlock t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零 t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即 之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;; ) 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束 源码分析: 写锁上锁流程
static final class NonfairSync extends Sync {
public void lock ( ) {
sync. acquire ( 1 ) ;
}
public final void acquire ( int arg) {
if (
! tryAcquire ( arg) &&
acquireQueued ( addWaiter ( Node. EXCLUSIVE) , arg)
) {
selfInterrupt ( ) ;
}
}
protected final boolean tryAcquire ( int acquires) {
Thread current = Thread. currentThread ( ) ;
int c = getState ( ) ;
int w = exclusiveCount ( c) ;
if ( c != 0 ) {
if (
w == 0 ||
current != getExclusiveOwnerThread ( )
) {
return false ;
}
if ( w + exclusiveCount ( acquires) > MAX_COUNT)
throw new Error ( "Maximum lock count exceeded" ) ;
setState ( c + acquires) ;
return true ;
}
if (
writerShouldBlock ( ) ||
! compareAndSetState ( c, c + acquires)
) {
return false ;
}
setExclusiveOwnerThread ( current) ;
return true ;
}
final boolean writerShouldBlock ( ) {
return false ;
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
写锁释放流程:
static final class NonfairSync extends Sync {
public void unlock ( ) {
sync. release ( 1 ) ;
}
public final boolean release ( int arg) {
if ( tryRelease ( arg) ) {
Node h = head;
if ( h != null && h. waitStatus != 0 )
unparkSuccessor ( h) ;
return true ;
}
return false ;
}
protected final boolean tryRelease ( int releases) {
if ( ! isHeldExclusively ( ) )
throw new IllegalMonitorStateException ( ) ;
int nextc = getState ( ) - releases;
boolean free = exclusiveCount ( nextc) == 0 ;
if ( free) {
setExclusiveOwnerThread ( null) ;
}
setState ( nextc) ;
return free;
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
读锁上锁流程
static final class NonfairSync extends Sync {
public void lock ( ) {
sync. acquireShared ( 1 ) ;
}
public final void acquireShared ( int arg) {
if ( tryAcquireShared ( arg) < 0 ) {
doAcquireShared ( arg) ;
}
}
protected final int tryAcquireShared ( int unused) {
Thread current = Thread. currentThread ( ) ;
int c = getState ( ) ;
if (
exclusiveCount ( c) != 0 &&
getExclusiveOwnerThread ( ) != current
) {
return - 1 ;
}
int r = sharedCount ( c) ;
if (
! readerShouldBlock ( ) &&
r < MAX_COUNT &&
compareAndSetState ( c, c + SHARED_UNIT)
) {
return 1 ;
}
return fullTryAcquireShared ( current) ;
}
final boolean readerShouldBlock ( ) {
return apparentlyFirstQueuedIsExclusive ( ) ;
}
final int fullTryAcquireShared ( Thread current) {
HoldCounter rh = null;
for ( ; ; ) {
int c = getState ( ) ;
if ( exclusiveCount ( c) != 0 ) {
if ( getExclusiveOwnerThread ( ) != current)
return - 1 ;
} else if ( readerShouldBlock ( ) ) {
}
if ( sharedCount ( c) == MAX_COUNT)
throw new Error ( "Maximum lock count exceeded" ) ;
if ( compareAndSetState ( c, c + SHARED_UNIT) ) {
return 1 ;
}
}
}
private void doAcquireShared ( int arg) {
final Node node = addWaiter ( Node. SHARED) ;
boolean failed = true ;
try {
boolean interrupted = false ;
for ( ; ; ) {
final Node p = node. predecessor ( ) ;
if ( p == head) {
int r = tryAcquireShared ( arg) ;
if ( r >= 0 ) {
setHeadAndPropagate ( node, r) ;
p. next = null;
if ( interrupted)
selfInterrupt ( ) ;
failed = false ;
return ;
}
}
if (
shouldParkAfterFailedAcquire ( p, node) &&
parkAndCheckInterrupt ( )
) {
interrupted = true ;
}
}
} finally {
if ( failed)
cancelAcquire ( node) ;
}
}
private void setHeadAndPropagate ( Node node, int propagate) {
Node h = head;
setHead ( node) ;
if ( propagate > 0 || h == null || h. waitStatus < 0 ||
( h = head) == null || h. waitStatus < 0 ) {
Node s = node. next;
if ( s == null || s. isShared ( ) ) {
doReleaseShared ( ) ;
}
}
}
private void doReleaseShared ( ) {
for ( ; ; ) {
Node h = head;
if ( h != null && h != tail) {
int ws = h. waitStatus;
if ( ws == Node. SIGNAL) {
if ( ! compareAndSetWaitStatus ( h, Node. SIGNAL, 0 ) )
continue ;
unparkSuccessor ( h) ;
}
else if ( ws == 0 &&
! compareAndSetWaitStatus ( h, 0 , Node. PROPAGATE) )
continue ;
}
if ( h == head)
break ;
}
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
读锁释放流程
static final class NonfairSync extends Sync {
public void unlock ( ) {
sync. releaseShared ( 1 ) ;
}
public final boolean releaseShared ( int arg) {
if ( tryReleaseShared ( arg) ) {
doReleaseShared ( ) ;
return true ;
}
return false ;
}
protected final boolean tryReleaseShared ( int unused) {
for ( ; ; ) {
int c = getState ( ) ;
int nextc = c - SHARED_UNIT;
if ( compareAndSetState ( c, nextc) ) {
return nextc == 0 ;
}
}
}
private void doReleaseShared ( ) {
for ( ; ; ) {
Node h = head;
if ( h != null && h != tail) {
int ws = h. waitStatus;
if ( ws == Node. SIGNAL) {
if ( ! compareAndSetWaitStatus ( h, Node. SIGNAL, 0 ) )
continue ;
unparkSuccessor ( h) ;
}
else if ( ws == 0 &&
! compareAndSetWaitStatus ( h, 0 , Node. PROPAGATE) )
continue ;
}
if ( h == head)
break ;
}
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
4、StampedLock
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加解读锁
long stamp = lock. readLock ( ) ;
lock. unlockRead ( stamp) ;
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
加解写锁
long stamp = lock. writeLock ( ) ;
lock. unlockWrite ( stamp) ;
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全 。
long stamp = lock. tryOptimisticRead ( ) ;
if ( ! lock. validate ( stamp) ) {
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法。 代码实现:
public class Code_14_StampedLockTest {
public static void main ( String[ ] args) throws InterruptedException {
StampedLockDataContainer dataContainer = new StampedLockDataContainer ( 1 ) ;
Thread t1 = new Thread ( ( ) - > {
try {
System. out. println ( dataContainer. read ( 1 ) ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
} , "t1" ) ;
t1. start ( ) ;
TimeUnit. MILLISECONDS. sleep ( 500 ) ;
Thread t2 = new Thread ( ( ) - > {
dataContainer. write ( 10 ) ;
} , "t2" ) ;
t2. start ( ) ;
}
}
@Slf4j ( topic = "c.StampedLockDataContainer" )
class StampedLockDataContainer {
private int data;
private StampedLock stampedLock = new StampedLock ( ) ;
public StampedLockDataContainer ( int data) {
this . data = data;
}
public int read ( int readTime) throws InterruptedException {
long stamp = stampedLock. tryOptimisticRead ( ) ;
log. info ( "optimistic read locking ...{}" , stamp) ;
Thread. sleep ( readTime * 1000 ) ;
if ( stampedLock. validate ( stamp) ) {
log. info ( "read finish... {}" , stamp) ;
return data;
}
log. info ( "update to read lock ..." ) ;
try {
stamp = stampedLock. readLock ( ) ;
log. info ( "read lock {}" , stamp) ;
Thread. sleep ( readTime * 1000 ) ;
log. info ( "read finish ... {}" , stamp) ;
return data;
} finally {
stampedLock. unlockRead ( stamp) ;
}
}
public void write ( int newData) {
long stamp = stampedLock. writeLock ( ) ;
try {
log. info ( "write lock {}" , stamp) ;
this . data = newData;
try {
TimeUnit. SECONDS. sleep ( 1 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
log. info ( "write finish ... {}" , stamp) ;
log. info ( "write newData ... {}" , this . data) ;
} finally {
stampedLock. unlockWrite ( stamp) ;
}
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
注意: StampedLock 不支持条件变量 StampedLock 不支持可重入
Semaphore
1、基本使用
信号量,用来限制能同时访问共享资源的线程上限。
public static void main ( String[ ] args) {
Semaphore semaphore = new Semaphore ( 3 ) ;
for ( int i = 0 ; i < 10 ; i++ ) {
new Thread ( ( ) - > {
try {
semaphore. acquire ( ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
try {
log. info ( "start ..." ) ;
Thread. sleep ( 1000 ) ;
log. info ( "end ...." ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
} finally {
semaphore. release ( ) ;
}
} , "t" + ( i + 1 ) ) . start ( ) ; ;
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
2、图解流程
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源。 假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞 这时 Thread-4 释放了 permits,状态如下 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
3、源码分析
static final class NonfairSync extends Sync {
private static final long serialVersionUID = - 2694183684443567898 L;
NonfairSync ( int permits) {
super ( permits) ;
}
public void acquire ( ) throws InterruptedException {
sync. acquireSharedInterruptibly ( 1 ) ;
}
public final void acquireSharedInterruptibly ( int arg)
throws InterruptedException {
if ( Thread. interrupted ( ) )
throw new InterruptedException ( ) ;
if ( tryAcquireShared ( arg) < 0 )
doAcquireSharedInterruptibly ( arg) ;
}
protected int tryAcquireShared ( int acquires) {
return nonfairTryAcquireShared ( acquires) ;
}
final int nonfairTryAcquireShared ( int acquires) {
for ( ; ; ) {
int available = getState ( ) ;
int remaining = available - acquires;
if (
remaining < 0 ||
compareAndSetState ( available, remaining)
) {
return remaining;
}
}
}
private void doAcquireSharedInterruptibly ( int arg) throws InterruptedException {
final Node node = addWaiter ( Node. SHARED) ;
boolean failed = true ;
try {
for ( ; ; ) {
final Node p = node. predecessor ( ) ;
if ( p == head) {
int r = tryAcquireShared ( arg) ;
if ( r >= 0 ) {
setHeadAndPropagate ( node, r) ;
p. next = null;
failed = false ;
return ;
}
}
if ( shouldParkAfterFailedAcquire ( p, node) &&
parkAndCheckInterrupt ( ) )
throw new InterruptedException ( ) ;
}
} finally {
if ( failed)
cancelAcquire ( node) ;
}
}
public void release ( ) {
sync. releaseShared ( 1 ) ;
}
public final boolean releaseShared ( int arg) {
if ( tryReleaseShared ( arg) ) {
doReleaseShared ( ) ;
return true ;
}
return false ;
}
protected final boolean tryReleaseShared ( int releases) {
for ( ; ; ) {
int current = getState ( ) ;
int next = current + releases;
if ( next < current)
throw new Error ( "Maximum permit count exceeded" ) ;
if ( compareAndSetState ( current, next) )
return true ;
}
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
CountdownLatch
CountDownLatch 允许多线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,countdownlatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。
CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown方法时,其实使用了 tryReleaseShared 方法以CAS 的操作来减少 state ,直至 state 为 0 就代表所有的线程都调用了countDown方法。当调用 await 方法的时候,如果 state 不为0,就代表仍然有线程没有调用 countDown 方法,那么就把已经调用过 countDown 的线程都放入阻塞队列 Park ,并自旋 CAS 判断 state == 0,直至最后一个线程调用了 countDown ,使得 state == 0,于是阻塞的线程便判断成功,全部往下执行。
用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一。
@Slf4j ( topic = "c.CountDownLatch" )
public class Code_16_CountDownLatchTest {
public static void main ( String[ ] args) throws InterruptedException {
method3 ( ) ;
}
public static void method1 ( ) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch ( 3 ) ;
new Thread ( ( ) - > {
log. info ( "t1 start ..." ) ;
try {
Thread. sleep ( 1000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
log. info ( "t1 end ..." ) ;
countDownLatch. countDown ( ) ;
} , "t1" ) . start ( ) ;
new Thread ( ( ) - > {
log. info ( "t2 start ..." ) ;
try {
Thread. sleep ( 2000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
log. info ( "t2 end ..." ) ;
countDownLatch. countDown ( ) ;
} , "t2" ) . start ( ) ;
new Thread ( ( ) - > {
log. info ( "t3 start ..." ) ;
try {
Thread. sleep ( 1500 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
log. info ( "t3 end ..." ) ;
countDownLatch. countDown ( ) ;
} , "t3" ) . start ( ) ;
log. info ( "main wait ..." ) ;
countDownLatch. await ( ) ;
log. info ( "main wait end ..." ) ;
}
public static void method2 ( ) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch ( 3 ) ;
ExecutorService executorService = Executors. newFixedThreadPool ( 4 ) ;
executorService. submit ( ( ) - > {
log. info ( "t1 start ..." ) ;
try {
Thread. sleep ( 1000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
countDownLatch. countDown ( ) ;
log. info ( "t1 end ...{}" , countDownLatch. getCount ( ) ) ;
} ) ;
executorService. submit ( ( ) - > {
log. info ( "t2 start ..." ) ;
try {
Thread. sleep ( 2000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
log. info ( "t2 end ...{}" , countDownLatch. getCount ( ) ) ;
countDownLatch. countDown ( ) ;
} ) ;
executorService. submit ( ( ) - > {
log. info ( "t3 start ..." ) ;
try {
Thread. sleep ( 1500 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
log. info ( "t3 end ...{}" , countDownLatch. getCount ( ) ) ;
countDownLatch. countDown ( ) ;
} ) ;
executorService. submit ( ( ) - > {
log. info ( "main wait ..." ) ;
try {
countDownLatch. await ( ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
log. info ( "main wait end ..." ) ;
executorService. shutdown ( ) ;
} ) ;
}
public static void method3 ( ) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch ( 10 ) ;
ExecutorService executorService = Executors. newFixedThreadPool ( 10 ) ;
String[ ] all = new String [ 10 ] ;
Random random = new Random ( ) ;
for ( int i = 0 ; i < 10 ; i++ ) {
int id = i;
executorService. submit ( ( ) - > {
for ( int j = 0 ; j <= 100 ; j++ ) {
try {
Thread. sleep ( random. nextInt ( 100 ) ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
all[ id] = j + "%" ;
System. out. print ( "\r" + Arrays. toString ( all) ) ;
}
countDownLatch. countDown ( ) ;
} ) ;
}
countDownLatch. await ( ) ;
System. out. println ( ) ;
System. out. println ( "游戏开始" ) ;
executorService. shutdown ( ) ;
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
CyclicBarrier
CyclicBarri[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。跟 CountdownLatch 一样,但这个可以重用。
public static void main ( String[ ] args) {
ExecutorService executorService = Executors. newFixedThreadPool ( 2 ) ;
CyclicBarrier cyclicBarrier = new CyclicBarrier ( 2 , ( ) - > {
log. info ( "task2 finish ..." ) ;
} ) ;
for ( int i = 0 ; i < 3 ; i++ ) {
executorService. submit ( ( ) - > {
log. info ( "task1 begin ..." ) ;
try {
Thread. sleep ( 1000 ) ;
cyclicBarrier. await ( ) ;
} catch ( InterruptedException | BrokenBarrierException e) {
e. printStackTrace ( ) ;
}
} ) ;
executorService. submit ( ( ) - > {
log. info ( "task2 begin ..." ) ;
try {
Thread. sleep ( 2000 ) ;
cyclicBarrier. await ( ) ;
} catch ( InterruptedException | BrokenBarrierException e) {
e. printStackTrace ( ) ;
}
} ) ;
}
executorService. shutdown ( ) ;
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
LinkedBlockingQueue
1)入队操作
public class LinkedBlockingQueue < E> extends AbstractQueue < E>
implements BlockingQueue < E> , java. io. Serializable {
static class Node < E> {
E item;
Node< E> next;
Node ( E x) { item = x; }
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
private void enqueue ( Node< E> node) {
last = last. next = node;
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
初始化链表 last = head = new Node(null); Dummy 节点用来占位,item 为 null。 当一个节点入队 last = last.next = node; 再来一个节点入队 last = last.next = node;
2)出队操作
private E dequeue ( ) {
Node< E> h = head;
Node< E> first = h. next;
h. next = h;
head = first;
E x = first. item;
first. item = null;
return x;
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
h = head; first = h.next; h.next = h; head = first;
3)加锁分析
高明之处 在于用了两把锁和 dummy 节点
用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
消费者与消费者线程仍然串行 生产者与生产者线程仍然串行
线程安全分析
当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全。两把锁保证了入队和出队没有竞争 当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争 当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
private final ReentrantLock putLock = new ReentrantLock ( ) ;
private final ReentrantLock takeLock = new ReentrantLock ( ) ;
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
4)put 操作
public void put ( E e) throws InterruptedException {
if ( e == null) throw new NullPointerException ( ) ;
int c = - 1 ;
Node< E> node = new Node < E> ( e) ;
final ReentrantLock putLock = this . putLock;
final AtomicInteger count = this . count;
putLock. lockInterruptibly ( ) ;
try {
while ( count. get ( ) == capacity) {
notFull. await ( ) ;
}
enqueue ( node) ;
c = count. getAndIncrement ( ) ;
if ( c + 1 < capacity)
notFull. signal ( ) ;
} finally {
putLock. unlock ( ) ;
}
if ( c == 0 )
signalNotEmpty ( ) ;
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
5)take 操作
public E take ( ) throws InterruptedException {
E x;
int c = - 1 ;
final AtomicInteger count = this . count;
final ReentrantLock takeLock = this . takeLock;
takeLock. lockInterruptibly ( ) ;
try {
while ( count. get ( ) == 0 ) {
notEmpty. await ( ) ;
}
x = dequeue ( ) ;
c = count. getAndDecrement ( ) ;
if ( c > 1 )
notEmpty. signal ( ) ;
} finally {
takeLock. unlock ( ) ;
}
if ( c == capacity)
signalNotFull ( )
return x;
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
注意:由 put 唤醒 put 是为了避免信号不足
6)性能比较
主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较
Linked 支持有界,Array 强制有界 Linked 实现是链表,Array 实现是数组 Linked 是懒惰的,而 Array 需要提前初始化 Node 数组 Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的 Linked 两把锁,Array 一把锁
data-report-view="{"mod":"1585297308_001","spm":"1001.2101.3001.6548","dest":"http://iyenn.com/rec/1724131.html","extend1":"pc","ab":"new"}">>
评论记录:
回复评论: