AQS介绍
AbstractQueuedSynchronizer(AQS)提供了一套可用于实现锁同步机制的框架,不夸张地说,AQS是JUC同步框架的基石。AQS通过一个FIFO队列维护线程同步状态
,实现类只需要继承该类,并重写指定方法即可实现一套线程同步机制。
AQS根据资源互斥级别提供了独占和共享两种资源访问模式;同时其定义Condition
结构提供了wait/signal等待唤醒机制
。在JUC中,诸如ReentrantLock
、CountDownLatch
等都基于AQS实现。
AQS框架
AQS原理
AQS的原理并不复杂,AQS维护了一个volatile int state
变量和一个CLH(三个人名缩写)双向队列
,队列中的节点持有线程引用,每个节点均可通过getState()、setState()和compareAndSetState()
对state进行修改和访问。
当线程获取锁时,即试图对state变量做修改,如修改成功则获取锁;如修改失败则包装为节点挂载到队列中,等待持有锁的线程释放锁并唤醒队列中的节点。
AQS模版方法
AQS内部封装了队列维护逻辑,采用模版方法的模式提供实现类以下方法:
tryAcquire(int); // 尝试获取独占锁,可获取返回true,否则false
tryRelease(int); // 尝试释放独占锁,可释放返回true,否则false
tryAcquireShared(int); // 尝试以共享方式获取锁,失败返回负数,只能获取一次返回0,否则返回个数
tryReleaseShared(int); // 尝试释放共享锁,可获取返回true,否则false
isHeldExclusively(); // 判断线程是否独占资源
如实现类只需实现独占锁/共享锁功能,可只实现tryAcquire/tryRelease
或tryAcquireShared/tryReleaseShared
。虽然实现tryAcquire/tryRelease可自行设定逻辑,但建议使用state方法对state变量进行操作以实现同步类。
如下是一个简单的同步锁实现示例:
public class Mutex extends AbstractQueuedSynchronizer {
@Override
public boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
@Override
public boolean tryRelease(int arg) {
return compareAndSetState(1, 0);
}
public static void main(String[] args) {
final Mutex mutex = new Mutex();
new Thread(() -> {
System.out.println("thread1 acquire mutex");
mutex.acquire(1);
// 获取资源后sleep保持
try {
TimeUnit.SECONDS.sleep(5);
} catch(InterruptedException ignore) {
}
mutex.release(1);
System.out.println("thread1 release mutex");
}).start();
new Thread(() -> {
// 保证线程2在线程1启动后执行
try {
TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException ignore) {
}
// 等待线程1 sleep结束释放资源
mutex.acquire(1);
System.out.println("thread2 acquire mutex");
mutex.release(1);
}).start()
}
}
示例代码简单通过AQS实现一个互斥操作,线程1获取mutex后,线程2的acquire陷入阻塞,直到线程1释放。其中tryAcquire/acquire/tryRelease/release的arg参数可按实现逻辑自定义传入值,无具体要求。
@param arg the acquire argument. This value is conveyed to {@link #tryAcquire} but is otherwise uninterpreted and can represent anyting you like.
AQS核心结构
Node
前文提到,在AQS中如果线程获取资源失败,会包装成一个节点挂载到CLH队列上,AQS中定义了Node类用于包装线程。
Node主要包含5个核心字段:
-
waitStatus:当前节点状态,该字段共有5种取值:
-
CANCELLED = 1。节点引用线程由于等待超时或被打断时的状态。
-
SIGNAL = -1。后继节点线程需要被唤醒时的当前节点状态。当队列中加入后继节点被挂起(block)时,其前驱节点会被设置为SIGNAL状态,表示该节点需要被唤醒。
-
CONDITION = -2。当节点线程进入condition队列时的状态。(见ConditionObject)
-
PROPAGATE = -3。仅在释放共享锁releaseShared时对头节点使用。(见共享锁分析)
-
0。节点初始化时的状态。
-
prev:前驱节点。
-
next:后继节点。
-
thread:引用线程,头节点不包含线程。
-
nextWaiter:condition条件队列。(见ConditionObject)
独占锁分析
acquire
public final void acquire(int arg) {
// tryAcquire需实现类处理
// 如获取资源成功,直接返回
if (!tryAcquire(arg) &&
// 如获取资源失败,将线程包装为Node添加到队列中阻塞等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如阻塞线程被打断
selfInterrupt();
}
acquire核心为tryAcquire、addWaiter和acquireQueued三个函数,其中tryAcquire需具体类实现。 每当线程调用acquire时都首先会调用tryAcquire,失败后才会挂载到队列,因此acquire实现默认为非公平锁。
addWaiter将线程包装为独占节点,尾插式加入到队列中,如队列为空,则会添加一个空的头节点。值得注意的是addWaiter中的enq方法,通过CAS+自旋的方式处理尾节点添加冲突。
acquireQueue在线程节点加入队列后判断是否可再次尝试获取资源,如不能获取则将其前驱节点标志为SIGNAL状态(表示其需要被unpark唤醒)后,则通过park进入阻塞状态。
参照流程图,acquireQueued方法核心逻辑为for(;;)和shouldParkAfterFailedAcquire。tail节点默认初始状态为0,当新节点被挂载到队列后,将其前驱即原tail节点状态设为SIGNAL,表示该节点需要被唤醒,返回true后即被park陷入阻塞。for循环直到节点前驱为head后才尝试进行资源获取。
release
release流程较为简单,尝试释放成功后,即从头结点开始唤醒其后继节点,如后继节点被取消,则转为从尾部开始找阻塞的节点将其唤醒。阻塞节点被唤醒后,即进入acquireQueued中的for(;;)循环开始新一轮的资源竞争。
共享锁分析
acquireShared & releaseShared
public final void acquireShared(int arg) {
// 负数表示获取共享锁失败,不同于tryAcquire的bool返回
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
acquireShared和releaseShared整体流程与独占锁类似,tryAcquireShared获取失败后以Node.SHARED挂载到队尾阻塞,直到队头节点将其唤醒。在doAcquireShared与独占锁不同的是,由于共享锁是可以被多个线程获取的,因此在首个阻塞节点被唤醒后,会通过setHeadAndPropagate传递唤醒后续的阻塞节点。
// doAcquireShared核心代码
final Node node = addWaiter(Node.SHARED);
...
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// r>=0 表示获取锁成功,调整头结点并传递唤醒
setHeadAndPropagate(node, r);
}
}
...
}
setHeadAndPropagate和doReleaseShared构成共享锁唤醒的核心逻辑。
这两方法的逻辑较为简单,不再进行展开,主要对setheadAndPropagate的多节点唤醒判断逻辑做出分析。 进入setHeadAndPropagate,首先需要明确的是,该函数的传入参数propagate一定是非负数,接下来其唤醒主要为两个判断逻辑:
-
如果propagate > 0,表示存在多个共享锁可以获取,可直接进行doReleaseShared唤醒阻塞节点。
-
如果propagate = 0,表示仅当前节点可被唤醒,则有两种情况:
-
h == null || h.waitStatus < 0,通常情况下h != null,现给出h.waitStatus < 0的场景。
- (h = head) == null || h.waitStatus < 0的场景执行序列如下:
独占锁共享锁小结
- 1、独占锁共享锁默认都是非公平获取策略,可能被插队。
- 2、独占锁只有一个线程可获取,其他线程均被阻塞在队列中;共享锁可以有多个线程获取。
- 3、独占锁释放仅唤醒一个阻塞节点,共享锁可以根据可用数量,一次唤醒多个阻塞节点
ConditionObject
AQS中Node除了组成阻塞队列外,还在ConditionObject中得到应用,ConditionObject的核心定义为:
public class ConditionObject implements Condition, java.io.Serializable {
...
private transient Node firstWaiter;
private transient Node lastWaiter;
...
}
ConditionObject通过Node也构成了一个FIFO的队列,那么ConditionObject为AQS提供了怎样的功能呢?
public interface Condition {
...
void await() throws InterruptedException;
void signal();
void signalAll();
...
}
查看Condition接口的定义,可以看到其定义的方法与Object类的wait/notify/notifyAll功能是一致的。
在Synchronized详解中笔者曾对ObjectMonitor做过简单介绍,其中ObjectMonitor包含_WaitSet和_EntryList两个队列,分别用于存储wait调用和sychronized锁竞争时挂起的线程,而AQS通过ConditionObject同样也提供了wait/notify机制的阻塞队列。
ConditionObject机制如上图,在条件队列中,Node采用nextWaiter组成单向链表,当持有锁的线程发起condition.await调用后,会包装为Node挂载到Condition条件阻塞队列中;当对应condition.signal被触发后,条件阻塞队列中的节点将被唤醒并挂载到锁阻塞队列中。ConditionObject的队列逻辑与前述的acquire/release大同小异,不再赘述。
评论( 0 )