如何维护网站的运营网络推广公司介绍
关于 AQS,网上已经有无数的文章阐述 AQS 的使用及其源码,所以多这么一篇文章也没啥所谓,还能总结一下研究过的源码。源码解析和某某的使用,大概是互联网上 Java 文章中写得最多的主题了。
AQS
AQS 是 AbstractQueuedSynchronizer 的缩写,中文翻译过来就是抽象队列同步器。ReentrantLock
、ReentrantReadWriteLock
、Semaphore
、CountDownLatch
都是基于 AQS。AQS 的核心思想是,当线程请求获取资源时,如果资源空闲,则会将当前线程设置为资源的独占线程,成功获得锁;否则将获取锁失败的线程加入到排队队列中(CLH),并提供线程阻塞和线程唤醒机制。CLH 是一个虚拟的双向队列。
首先看一下 AQS 的关键属性。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private transient volatile Node head; //队列的头节点
private transient volatile Node tail; //队列的尾节点
private volatile int state; //同步状态
state
用于实现锁的可重入性。
- 0 为表示没有线程持有该锁
- 当存在线程获取锁成功,则 state 变为 1。如果是同一线程重复获得,则 state++
- 如果存在线程释放锁,则 state–
上面的三个属性都存在对应的以 CAS 方式进行修改的方法,state
对应的是 compareAndSetState()
方法, head
对应的是 compareAndSetHead()
方法,tail
对应的是 compareAndSetTail()
。以 CAS 的方式修改值能避免锁的竞争。
因为请求获取锁的线程会以 Node 节点的方式在 CLH 队列中排队,在分析 AQS 机制时也会大量涉及到 Node 节点,所以很有必要对 Node 节点进行分析。
CLH 队列中 Node 节点。
// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
volatile int waitStatus; //当前节点在队列中的状态
volatile Node prev; //前驱节点
volatile Node next; //后继节点
volatile Thread thread; //在该节点中排队的线程
Node nextWaiter; //下一个处于condition或共享状态的节点//等待锁的两种状态
static final Node SHARED = new Node(); //共享
static final Node EXCLUSIVE = null; //独占//waitStatus的几个值
static final int CANCELLED = 1; //已取消
static final int SIGNAL = -1; //后继节点的线程需要唤醒
static final int CONDITION = -2; //节点处于等待队列中
static final int PROPAGATE = -3; //线程处在SHARED情况下使用该字段
ReentrantLock
说是研究 AQS 源码,但 AQS 毕竟是一个抽象类,只实现了部分方法,另外一些方法会在子类中实现。所以我们也同样会涉及到 ReentrantLock 源码的研究。
ReentrantLock 的通常使用方式是:
class X { private final ReentrantLock lock = new ReentrantLock(); public void m() { lock.lock(); try { // ... method body } finally { lock.unlock() } }
}
ReentrantLock 存在两种锁:公平锁(FairSync)和非公平锁(NonfairSync),默认为非公平锁。使用公平锁时,线程会直接进入队列中排队,只有队列中第一个线程才能获取锁;使用非公平锁时,线程会先尝试获取锁,成功则占用锁,失败则在队列排队。对于 AQS 来说,公平锁和非公平锁的绝大部分方法都是共用的。
ReentrantLock 的主要方法有两个,一是使用 lock()
方法加锁,二是使用 unlock()
方法解锁。
加锁
非公平锁
我们首先来分析非公平锁的加锁过程。
// java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() { if (compareAndSetState(0, 1)) //通过cas方式设置同步状态setExclusiveOwnerThread(Thread.currentThread()); //成功,设置为独占线程else acquire(1); //失败,获取锁
}
在 lock()
方法中,会先尝试通过 CAS 的方式去获取锁,成功则设置为独占线程,否则执行 acquire()
方法。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) { if (!tryAcquire(arg) && //尝试获取锁acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 失败,则添加到等待队列selfInterrupt();
}
在 acquire()
方法中,会再次尝试去获取锁,如果失败则加入到排队队列。
// java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);
}// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //如果当前没有线程持有锁if (compareAndSetState(0, acquires)) { //state++setExclusiveOwnerThread(current); //将当前线程设置为独占线程return true; } } else if (current == getExclusiveOwnerThread()) { //如果已有线程持有锁,且当前线程为独占线程int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); //state++,实现锁的可重入性return true; //获得锁} return false; //如果已有线程持有锁,且当前线程不为独占线程,则获取锁失败
}
在 nonfairTryAcquire()
方法中,如果资源空闲,则再次尝试通过 CAS 的方式去获取锁。如果当前资源已被当前线程占用,则将 state++
,以实现锁的可重入性。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
// 设置队列尾节点
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //新建排队节点 Node pred = tail; //pred指向尾节点if (pred != null) { //如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改)node.prev = pred; //将新建节点的prev指向predif (compareAndSetTail(pred, node)) { // 设置新建节点为尾节点pred.next = node; return node; } } enq(node); return node;
}//java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { //如果尾节点为空,则说明队列还未初始化,if (compareAndSetHead(new Node())) //初始化一个头节点tail = head; } else { //如果已经初始化,则设置为尾节点node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }
}
如果尝试多次获取锁都失败,则在 addWaiter()
方法中会将线程放到节点中,并设置为排队队列的尾节点。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) { //是否成功获取到资源boolean failed = true; try { //循环等待过程中是否中断过boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //获取当前节点的前驱节点if (p == head && tryAcquire(arg)) { //如果前驱节点为头节点,则尝试获取锁setHead(node); //获取锁成功,设置当前节点为头节点p.next = null; // help GC failed = false; return interrupted; } //来到这里,说明前驱节点不是头节点或者获取锁失败。判断当前节点是否要被阻塞if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; //线程中断成功} } finally { if (failed) cancelAcquire(node); }
}
加入到排队队列中后,会在 acquireQueued()
方法中循环等待资源的获取,并判断线程是否需要被阻塞,直到线程获取成功或者抛出异常。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
//靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前驱节点的状态int ws = pred.waitStatus; if (ws == Node.SIGNAL) //如果前驱节点处于唤醒状态 return true; if (ws > 0) { //前驱节点处于取消状态 do { //向前查找取消的节点,并将其从队列中删除node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //设置前驱节点为唤醒状态} return false;
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
//挂起当前线程,阻塞调用栈,返回当前线程的中断状态
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted();
}
整一个非公平锁的加锁流程可以用如下流程图表示:
公平锁
公平锁和非公平锁的流程中只有 lock()
方法和 tryAcquire()
方法存在差别。
//java.util.concurrent.locks.ReentrantLock.FairSync#lock
final void lock() { acquire(1); //直接获取锁
}
公平锁中会直接调用 acquire()
方法。
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;
}
相对于 nonfair
的 nonfairTryAcquire
方法,在没有线程持有锁时,增加了 hasQueuedPredecessors()
方法的判断,该方法用于判断队列中是否存在线程比当前线程等待时间更长。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
分析一下这段代码:
- 如果
h != t
为 true,说明队列正在初始化,或者已经初始化完成。- 在
h != t
前提下,如果(s = h.next) == null
为 true,说明队列正在初始化,因为在队列初始化过程中,是有可能存在 head 已经被初始化(不再等于 tail 了),但 head.next 还没有被设值为 node,这种情况下,因为队列中已经存在 Node,当前线程需要加到等待队列中,故返回 true。初始化过程在AbstractQueuedSynchronizer#enq()
。 - 但如果
(s = h.next) == null
为 false,说明队列中已经存在 Node,则判断该 Node 的线程是否与当前线程相同。如果s.thread != Thread.currentThread()
为 true,说明不相同,需要进入等待队列。如果相同,说明当前线程可以获取锁。
- 在
- 如果
h != t
为 false,说明队列为空,返回 false,说明可以去获得锁。
另外,s = h.next
这段代码获取的是 head 的下一个节点,因为 head 是虚节点,不存储数据,真正的数据存储在 head.next
。
解锁
相对于加锁过程,解锁过程比较简单,且公平锁和非公平锁共用同一个 lock()
方法。
// java.util.concurrent.locks.ReentrantLock#unlock
public void unlock() { sync.release(1);
}//java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) { if (tryRelease(arg)) { //如果锁没有被任何线程持有Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); //解除后继节点的线程挂起return true; } return false;
}
关于代码 h != null && h.waitStatus != 0
,分为以下几种情况:
h==null
,说明头节点还未初始化。h!=null && h.waitStatus==0
,说明后继节点还在运行中。因为如果节点已被取消,waitStatus 会被设置为 1;如果后继节点需要唤醒,waitStatus 会被设置为 -1;如果节点正在排队,waitStatus 则会设置为 -2。如果 waitStatus 为 0,说明已经处于运行中。h != null && h.waitStatus != 0
则会去唤醒后继节点。
在 release()
方法中,会先尝试去解锁,如果解锁成功且后继节点需要唤醒,则将后继节点取消挂起。
//java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
//更新state状态,如果重入次数为0,则将锁的独占线程设置为null
protected final boolean tryRelease(int releases) { //减少可重入次数int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) //如果当前线程不是该锁的独占线程,则抛出异常throw new IllegalMonitorStateException(); boolean free = false; //该锁是否已被释放if (c == 0) { //如果可重入次数已为0free = true; setExclusiveOwnerThread(null); //将锁的独占线程设置为null,表明不再有线程持有该锁} setState(c); //修改锁的状态return free;
}
在 tryRelease()
方法中,会去减少锁的可重入次数,当可重入次数为 0 时,清空锁的独占线程。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) { int ws = node.waitStatus; //获取头节点的waitStatusif (ws < 0) //如果节点的waitStatus为负数,说明后继节点需要被唤醒或者正在排队compareAndSetWaitStatus(node, ws, 0); //清除节点当前的waitStatus,设置为0 Node s = node.next; //获取头节点的后继节点if (s == null || s.waitStatus > 0) { //如果节点为null,或者已被取消s = null; for (Node t = tail; t != null && t != node; t = t.prev) //从尾节点开始向前遍历if (t.waitStatus <= 0) //找到队列中第一个不被取消的节点s = t; } if (s != null) //如果找到了,则将该节点取消挂起LockSupport.unpark(s.thread);
}
在 unparkSuccessor()
方法中,会从尾节点开始从后往前遍历,找到队列中第一个没有被取消的节点,将该节点取消挂起。
整个解锁流程可以用如下流程图表示。