本文最后更新于:2024年3月18日 凌晨
Java ReentrantLock与AQS
Java中的大部分同步类(Lock,Semaphore,ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的,AQS是一种提供了原子式管理同步状态,阻塞和唤醒线程功能以及队列模型的简单框架。
ReentrantLock
ReentrantLock特性概览
ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁,为了帮助大家更好地理解ReentrantLock的特性,我们先将ReentrantLock跟常用的Synchronized进行比较,其特性如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 synchronized (this ) {}synchronized (object) {}public synchronized void test () {}for (int i = 0 ; i < 100 ; i++) { synchronized (this ) {} }public void test () throw Exception { ReentrantLock lock = new ReentrantLock(true ); lock.lock(); try { try { if (lock.tryLock(100 , TimeUnit.MILLISECONDS)){ } } finally { lock.unlock() } } finally { lock.unlock(); } }
ReentrantLock与AQS的关联
1 2 3 4 5 6 7 8 9 10 11 static final class NonfairSync extends Sync { final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } }
这块代码的含义为:
若通过CAS设置变量State(同步状态)成功,也就是获取锁成功,则将当前线程设置为独占线程。
若通过CAS设置变量State(同步状态)失败,也就是获取锁失败,则进入Acquire方法进行后续处理。
某个线程获取锁失败的后续流程有以下两种可能:
当前线程获锁结果设置为失败,获取锁流程结束,这种设计会极大降低系统的并发度,并不满足我们实际的需求。
AQS框架的处理流程:存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。
带着非公平锁的这些问题,再看下公平锁源码中获锁的方式:
1 2 3 4 5 6 7 static final class FairSync extends Sync { final void lock () { acquire(1 ); } }
结合公平锁和非公平锁的加锁流程,虽然流程上有一定的不同,但是都调用了Acquire方法,而Acquire方法是FairSync和UnfairSync的父类AQS中的核心方法。
AQS
上图中有颜色的为Method,无颜色的为Attribution
AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。
当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程,当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。
原理概览
AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态,如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配,这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
CLH:Craig,Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
主要原理图如下:
AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。
AQS数据结构
先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。
方法和属性值
含义
waitStatus
当前节点在队列中的状态
thread
表示处于该节点的线程
prev
前驱指针
predecessor()
返回前驱节点,没有的话抛出npe
nextWaiter
指向下一个处于CONDITION状态的节点
next
后继指针
模式
含义
SHARED
表示线程以共享的模式等待锁
EXCLUSIVE
表示线程正在以独占的方式等待锁
枚举
含义
0
当一个Node被初始化的时候的默认值
CANCELLED
为1,表示线程获取锁的请求已经取消了
CONDITION
为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE
为-3,当前线程处在SHARED情ryryt况下,该字段才会使用
SIGNAL
为-1,表示线程已经准备好了,就等资源释放了
同步状态State
AQS的同步状态State:AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。
1 2 3 private volatile int state;
方法名
描述
protected final int getState()
获取State的值
protected final void setState(int newState)
设置State的值
protected final boolean compareAndSetState(int expect, int update)
使用CAS方式更新State
这几个方法都是Final修饰的,说明子类中无法重写它们,我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)
对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是AQS架构图中的第一层:API层。
AQS重要方法与ReentrantLock的关联
从架构图中可以得知,AQS提供了大量用于自定义同步器实现的Protected方法,自定义同步器实现的相关方法也只是为了通过修改State字段来实现多线程的独占模式或者共享模式,自定义同步器需要实现以下方法(ReentrantLock需要实现的方法如下,并不是全部):
方法名
描述
protected boolean isHeldExclusively()
该线程是否正在独占资源,只有用到Condition才需要去实现它,
protected boolean tryAcquire(int arg)
独占方式,arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False,
protected boolean tryRelease(int arg)
独占方式,arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False,
protected int tryAcquireShared(int arg)
共享方式,arg为获取锁的次数,尝试获取资源,负数表示失败,0表示成功,但没有剩余可用资源,正数表示成功,且有剩余资源,
protected boolean tryReleaseShared(int arg)
共享方式,arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False,
一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease
,tryAcquireShared-tryReleaseShared
中的一种即可,AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock,ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease
以非公平锁为例,这里主要阐述一下非公平锁与AQS之间方法的关联之处,具体每一处核心方法的作用会在文章后面详细进行阐述。
为了帮助大家理解ReentrantLock和AQS之间方法的交互过程,以非公平锁为例,我们将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。
加锁
通过ReentrantLock的加锁方法Lock进行加锁操作。
会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire
tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。
解锁
通过ReentrantLock的解锁方法Unlock进行解锁。
Unlock会调用内部类Sync的Release方法,该方法继承于AQS
Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
释放成功后,所有处理由AQS框架完成,与自定义同步器无关。
通过上面的描述,大概可以总结出ReentrantLock加锁解锁时API层核心方法的映射关系。
通过ReentrantLock理解AQS
ReentrantLock中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。
在非公平锁中,有一段这样的代码:
1 2 3 4 5 6 7 8 9 10 11 12 static final class NonfairSync extends Sync { ... final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } ... }
1 2 3 4 5 6 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1 2 3 4 5 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); }
可以看出,这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例),如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了,如果获取失败,就需要加入到等待队列中,下面会详细解释线程是何时以及怎样被加入进等待队列中的。
线程加入等待队列
加入队列的时机
当执行Acquire(1)
时,会通过tryAcquire获取锁,在这种情况下,如果获取锁失败,就会调用addWaiter加入到等待队列中去。
如何加入队列
获取锁失败后,会执行addWaiter(Node.EXCLUSIVE)
加入等待队列,具体实现方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }private final boolean compareAndSetTail (Node expect, Node update) { return unsafe.compareAndSwapObject(this , tailOffset, expect, update); }
主要的流程如下:
通过当前的线程和锁模式新建一个节点。
Pred指针指向尾节点Tail
将New中Node的Prev指针指向Pred
通过compareAndSetTail方法,完成尾节点的设置,这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 static { try { stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state" )); headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head" )); tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail" )); waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus" )); nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next" )); } catch (Exception ex) { throw new Error(ex); } }
从AQS的静态代码块可以看出,都是获取一个对象的属性相对于该对象在内存当中的偏移量,这样我们就可以根据这个偏移量在对象内存当中找到这个属性,tailOffset指的是tail对应的偏移量,所以这个时候会将new出来的Node置为当前队列的尾节点,同时,由于是双向链表,也需要将前一个节点指向尾节点。
如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要看一下Enq的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
如果没有被初始化,需要进行初始化一个头结点出来,但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点,如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同,其实,addWaiter就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。
总结一下,线程获取锁的时候,过程大体如下:
当没有线程获取到锁时,线程1获取锁成功。
线程2申请锁,但是锁被线程1占有。
如果再有线程要获取锁,依次在队列中往后排队即可。
回到上边的代码,hasQueuedPredecessors
是公平锁加锁时判断等待队列中是否存在有效节点的方法,如果返回False,说明当前线程可以争取共享资源,如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。
1 2 3 4 5 6 7 8 9 10 11 public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
看到这里,我们理解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
为什么要判断的头结点的下一个节点?第一个节点储存的数据是什么?
双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位,真正的第一个有数据的节点,是在第二个节点开始的,当h != t时:如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,需要返回True(这块具体见下边代码分析),如果(s = h.next) != null,说明此时队列中至少有一个有效节点,如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的,如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。
1 2 3 4 5 6 7 8 9 10 11 12 if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } }
节点入队不是原子操作,所以会出现短暂的head != tail,此时Tail指向最后一个节点,而且Tail指向Head,如果Head没有指向Tail(可见5,6,7行),这种情况下也需要将相关线程加入队列中,所以这块代码是为了解决极端情况下的并发问题。
等待队列中线程出队列时机
1 2 3 4 5 6 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上文解释了addWaiter方法,这个方法其实就是把对应的线程以Node的数据结构形式加入到双端队列里,返回的是一个包含该线程的Node,而这个Node会作为参数,进入到acquireQueued方法中,acquireQueued方法可以对排队中的线程进行"获锁”操作。
总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)
下面我们从"何时出队列?”和"如何出队列?”两个方向来分析一下acquireQueued源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
注意 :setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private void setHead (Node node) { head = node; node.thread = null ; node.prev = null ; }private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
1 2 3 4 5 6 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
从上图可以看出,跳出当前循环的条件是当"前置节点是头结点,且当前线程获取锁成功”,为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire流程):
从队列中释放节点的疑虑打消了,那么又有新问题了:
shouldParkAfterFailedAcquire中取消节点是怎么生成的呢?什么时候会把一个节点的waitStatus设置为-1?
是在什么时间释放节点通知到被挂起的线程呢?
CANCELLED状态节点生成
acquireQueued方法中的Finally代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { ... for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { ... failed = false ; ... } ... } finally { if (failed) cancelAcquire(node); } }
通过cancelAcquire
方法,将Node的状态标记为CANCELLED,接下来,我们逐行来分析这个方法的原理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
当前的流程:获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED
根据当前节点的位置,考虑以下三种情况:
当前节点是尾节点。
当前节点是Head的后继节点。
当前节点不是Head的后继节点,也不是尾节点。
根据上述第二条,我们来分析每一种情况的流程。
当前节点是尾节点。
当前节点是Head的后继节点。
当前节点不是Head的后继节点,也不是尾节点。
通过上面的流程,我们对于CANCELLED节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作呢?什么情况下会对Prev指针进行操作?
执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全,shouldParkAfterFailedAcquire方法中,会执行下面的代码,其实就是在处理Prev指针,shouldParkAfterFailedAcquire是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。
1 2 3 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 );
如何解锁
我们已经剖析了加锁过程中的基本流程,接下来再对解锁的基本流程进行分析,由于ReentrantLock在解锁的时候,并不区分公平锁和非公平锁,所以我们直接看解锁的源码:
1 2 3 4 5 public void unlock () { sync.release(1 ); }
1 2 3 4 5 6 7 8 9 10 11 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
在ReentrantLock里面的公平锁和非公平锁的父类Sync定义了可重入锁的释放锁机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
这里的判断条件为什么是h != null && h.waitStatus != 0
h == null Head还没初始化,初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点,所以说,这里如果还没来得及入队,就会出现head == null 的情况。
h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
为什么要从后往前找第一个非Cancelled的节点呢?原因如下。
之前的addWaiter方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred; compareAndSetTail(pred, node)
这两个地方可以看作Tail入队的原子操作,但是此时pred.next = node;还没执行,如果这个时候执行了unparkSuccessor方法,就没办法从前往后找了,所以需要从后往前找,还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node
综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和CANCELLED节点产生过程中断开Next指针的操作,可能会导致无法遍历所有的节点,所以,唤醒对应的线程后,对应的线程就会继续往下执行,继续执行acquireQueued方法以后,中断如何处理?
中断恢复后的执行流程
唤醒后,会执行return Thread.interrupted();
,这个函数返回的是当前执行线程的中断状态,并清除。
1 2 3 4 5 6 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
再回到acquireQueued代码,当parkAndCheckInterrupt返回True或者False的时候,interrupted的值不同,但都会执行下次循环,如果这个时候获取锁成功,就会把当前interrupted返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
如果acquireQueued为True,就会执行selfInterrupt方法。
1 2 3 4 5 static void selfInterrupt () { Thread.currentThread().interrupt(); }
该方法其实是为了中断线程,但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下,这里简单介绍一下:
当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒,因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次。
线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止,也就是说,在整个流程中,并不响应中断,只是记录中断记录,最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理,感兴趣的同学可以看下ThreadPoolExecutor源码。
AQS应用
ReentrantLock的可重入应用
ReentrantLock的可重入性是AQS很好的应用之一,在了解完上述知识点以后,我们很容易得知ReentrantLock实现可重入的方法,在ReentrantLock里面,不管是公平锁还是非公平锁,都有一段逻辑。
公平锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } }else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if (c == 0 ) { if (compareAndSetState(0 , acquires)){ setExclusiveOwnerThread(current); return true ; } }else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; }
从上面这两段都可以看到,有一个同步状态State来控制整体可重入的情况,State是Volatile修饰的,用于保证一定的可见性和有序性。
1 2 3 private volatile int state;
接下来看State这个字段主要的过程:
State初始化的时候为0,表示没有任何线程持有锁。
当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁时,就会多次+1,这里就是可重入的概念。
解锁也是对这个字段-1,一直到0,此线程对锁释放。
JUC中的应用场景
除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案,下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:
同步工具
同步工具与AQS的关联
ReentrantLock
使用AQS保存锁重复持有的次数,当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理,
Semaphore
使用AQS同步状态来保存信号量的当前计数,tryRelease会增加计数,acquireShared会减少计数,
CountDownLatch
使用AQS同步状态来表示计数,计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过,
ReentrantReadWriteLock
使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数,
ThreadPoolExecutor
Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease),
自定义同步工具
了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class LeeLock { private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { return compareAndSetState(0 , 1 ); } @Override protected boolean tryRelease (int arg) { setState(0 ); return true ; } @Override protected boolean isHeldExclusively () { return getState() == 1 ; } } private Sync sync = new Sync(); public void lock () { sync.acquire(1 ); } public void unlock () { sync.release(1 ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class LeeMain { static int count = 0 ; static LeeLock leeLock = new LeeLock(); public static void main (String[] args) throws InterruptedException { Runnable runnable = new Runnable() { @Override public void run () { try { leeLock.lock(); for (int i = 0 ; i < 10000 ; i++) { count++; } } catch (Exception e) { e.printStackTrace(); } finally { leeLock.unlock(); } } }; Thread thread1 = new Thread(runnable); Thread thread2 = new Thread(runnable); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(count); } }
上述代码每次运行结果都会是20000,通过简单的几行代码就能实现同步功能,这就是AQS的强大之处。