当前仓库属于关闭状态,部分功能使用受限,详情请查阅 仓库状态说明
2 Star 4 Fork 4

路的尽头在哪/JDK1.8.0.25-read
关闭

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
AbstractQueuedSynchronizer.java 79.62 KB
一键复制 编辑 原始数据 按行查看 历史
qianwei4712@163.com 提交于 2021-12-22 22:55 +08:00 . AbstractQueuedSynchronizer.java
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import sun.misc.Unsafe;
/**
* 提供一个框架,用于实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量,事件等)。
* 该类被设计为大多数类型的同步器的有用依据,这些同步器依赖于单个原子int值来表示状态。
* 子类必须定义改变此状态的受保护方法,以及根据该对象被获取或释放来定义该状态的含义。
* 给定这些,这个类中的其他方法执行所有排队和阻塞机制。
* 子类可以保持其他状态字段,但只以原子方式更新int使用方法操纵值getState() , setState(int)和compareAndSetState(int, int)被跟踪相对于同步。
*
* <p>子类应定义为非公共内部助手类,用于实现其封闭类的同步属性。
* AbstractQueuedSynchronizer类不实现任何同步接口。
* 相反,它定义了一些方法,如acquireInterruptibly(int) ,可以通过具体的锁和相关同步器来调用适当履行其公共方法。
*
* <p>此类支持默认独占模式和共享模式。
* 当以独占模式获取时,尝试通过其他线程获取不能成功。
* 多线程获取的共享模式可能(但不需要)成功。
* 除了在机械意义上,这个类不理解这些差异,当共享模式获取成功时,下一个等待线程(如果存在)也必须确定它是否也可以获取。
* 在不同模式下等待的线程共享相同的FIFO队列。 通常,实现子类只支持这些模式之一,但是两者都可以在ReadWriteLock中发挥作用 。
* 仅支持独占或仅共享模式的子类不需要定义支持未使用模式的方法
*
* <p>这个类定义的嵌套AbstractQueuedSynchronizer.ConditionObject可用于作为
* 一类Condition由子类支持独占模式用于该方法的实施isHeldExclusively()份报告是否同步排他相对于保持在当前线程,
* 方法release(int)与当前调用getState()值完全释放此目的,和acquire(int) ,给定此保存的状态值,最终将此对象恢复到其先前获取的状态。
* AbstractQueuedSynchronizer方法将创建此类条件,因此如果不能满足此约束,请勿使用该约束。
* AbstractQueuedSynchronizer.ConditionObject的行为当然取决于其同步器实现的语义。
*
* <p>该类为内部队列提供检查,检测和监控方法,以及条件对象的类似方法。
* 这些可以根据需要导出到类中,使用AbstractQueuedSynchronizer进行同步机制。
*
* <p>此类的序列化仅存储底层原子整数维持状态,因此反序列化对象具有空线程队列。
* 需要可序列化的典型子类将定义一个readObject方法,可以将其恢复为readObject时的已知初始状态。
*
* <h3>用法</h3>
*
* <p>使用这个类用作同步的基础上,重新定义以下方法,如适用,通过检查和/或修改使用所述同步状态getState() , setState(int)和/或compareAndSetState(int, int) :
* <ul>
* <li> {@link #tryAcquire}
* <li> {@link #tryRelease}
* <li> {@link #tryAcquireShared}
* <li> {@link #tryReleaseShared}
* <li> {@link #isHeldExclusively}
* </ul>
* 每个这些方法默认抛出UnsupportedOperationException 。 这些方法的实现必须是线程安全的,通常应该是短的而不是阻止的。
* 定义这些方法是唯一支持使用此类的方法。 所有其他方法都被声明为final ,因为它们不能独立变化。
*
* <p>您还可以找到来自继承的方法AbstractOwnableSynchronizer有用跟踪线程拥有独家同步的。
* 我们鼓励您使用它们 - 这样可以使监控和诊断工具帮助用户确定哪些线程持有锁定。
*
* <p>即使这个类基于内部FIFO队列,它也不会自动执行FIFO采集策略。 排他同步的核心形式如下:
*
* <pre>
* Acquire:
* while (!tryAcquire(arg)) {
* <em>enqueue thread if it is not already queued</em>;
* <em>possibly block current thread</em>;
* }
*
* Release:
* if (tryRelease(arg))
* <em>unblock the first queued thread</em>;
* </pre>
*
* (共享模式类似,但可能包含级联信号。)
*
* <p id="barging">因为在采集检查入队之前调用,所以新获取的线程可能闯入其他被阻塞和排队的。
* 但是,如果需要,您可以通过内部调用一个或多个检查方法来定义tryAcquire和/或tryAcquireShared来禁用驳船,从而提供一个合理的 FIFO采购订单。
* 特别地,最公平同步器可以定义tryAcquire返回false如果hasQueuedPredecessors() (具体地设计成由公平同步器中使用的方法)返回true 。
* 其他变化是可能的。
*
* <p>吞吐量和可扩展性通常对于默认的驳船(也称为贪心 , 放弃和车队避免 )战略来说是最高的。
* 虽然这不能保证是公平的或无饥饿的,但较早排队的线程在稍后排队的线程之前被允许重新侦听,并且每次重新提供对于传入线程成功的机会。
* 此外,虽然获取在通常意义上不“旋转”,但是在阻止之前它们可以执行多个tryAcquire tryAcquire与其他计算的交互。
* 当独占同步只是简单地持有时,这样可以提供旋转的大部分好处,而没有大部分负债。
* 如果需要,您可以通过以前通过“快速路径”检查获取方法的调用进行扩充,可能预先检查hasContended()和/或hasQueuedThreads() ,
* 以便只有在同步器可能不被竞争的情况下才能进行。
*
* <p>该类为同步提供了一个高效和可扩展的基础,部分原因是可以依靠int状态,获取和释放参数以及内部FIFO等待队列的同步器的使用范围。
* 当这不足够时,您可以使用atomic类,您自己的自定义Queue类和LockSupport类阻止支持从较低级别构建同步器。
*
* <h3>用法示例</h3>
*
* <p>这是一个不可重入互斥锁类,它使用零值来表示解锁状态,一个表示锁定状态。
* 虽然不可重入锁不严格要求记录当前的所有者线程,但是这样做无论如何使得使用更容易监视。 它还支持条件并公开其中一种仪器方法:
* <pre> {@code
* class Mutex implements Lock, java.io.Serializable {
*
* // Our internal helper class
* private static class Sync extends AbstractQueuedSynchronizer {
* // Reports whether in locked state
* protected boolean isHeldExclusively() {
* return getState() == 1;
* }
*
* // Acquires the lock if state is zero
* public boolean tryAcquire(int acquires) {
* assert acquires == 1; // Otherwise unused
* if (compareAndSetState(0, 1)) {
* setExclusiveOwnerThread(Thread.currentThread());
* return true;
* }
* return false;
* }
*
* // Releases the lock by setting state to zero
* protected boolean tryRelease(int releases) {
* assert releases == 1; // Otherwise unused
* if (getState() == 0) throw new IllegalMonitorStateException();
* setExclusiveOwnerThread(null);
* setState(0);
* return true;
* }
*
* // Provides a Condition
* Condition newCondition() { return new ConditionObject(); }
*
* // Deserializes properly
* private void readObject(ObjectInputStream s)
* throws IOException, ClassNotFoundException {
* s.defaultReadObject();
* setState(0); // reset to unlocked state
* }
* }
*
* // The sync object does all the hard work. We just forward to it.
* private final Sync sync = new Sync();
*
* public void lock() { sync.acquire(1); }
* public boolean tryLock() { return sync.tryAcquire(1); }
* public void unlock() { sync.release(1); }
* public Condition newCondition() { return sync.newCondition(); }
* public boolean isLocked() { return sync.isHeldExclusively(); }
* public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
* public void lockInterruptibly() throws InterruptedException {
* sync.acquireInterruptibly(1);
* }
* public boolean tryLock(long timeout, TimeUnit unit)
* throws InterruptedException {
* return sync.tryAcquireNanos(1, unit.toNanos(timeout));
* }
* }}</pre>
*
* <p>Here is a latch class that is like a
* {@link java.util.concurrent.CountDownLatch CountDownLatch}
* except that it only requires a single {@code signal} to
* fire. Because a latch is non-exclusive, it uses the {@code shared}
* acquire and release methods.
*
* <pre> {@code
* class BooleanLatch {
*
* private static class Sync extends AbstractQueuedSynchronizer {
* boolean isSignalled() { return getState() != 0; }
*
* protected int tryAcquireShared(int ignore) {
* return isSignalled() ? 1 : -1;
* }
*
* protected boolean tryReleaseShared(int ignore) {
* setState(1);
* return true;
* }
* }
*
* private final Sync sync = new Sync();
* public boolean isSignalled() { return sync.isSignalled(); }
* public void signal() { sync.releaseShared(1); }
* public void await() throws InterruptedException {
* sync.acquireSharedInterruptibly(1);
* }
* }}</pre>
* @since 1.5
*/
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
/* 字段属性 *****************************************************************************************************************/
/**
* 队列头指针。只能通过set方法修改。
* 如果 head 存在,则保证其 waitStatus 不会被 CANCELLED
*/
private transient volatile Node head;
/**
* 队列尾指针。只能通过添加新节点
*/
private transient volatile Node tail;
/**
* 共享变量,使用volatile修饰保证线程可见性
*/
private volatile int state;
/**
* 自旋时间
* The number of nanoseconds for which it is faster to spin rather than to use timed park.
* A rough estimate suffices to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
/**
* Setup to support compareAndSet. We need to natively implement
* this here: For the sake of permitting future enhancements, we
* cannot explicitly subclass AtomicInteger, which would be
* efficient and useful otherwise. So, as the lesser of evils, we
* natively implement using hotspot intrinsics API. And while we
* are at it, we do the same for other CASable fields (which could
* otherwise be done with atomic field updaters).
*/
// Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// Unsafe类实例
private static final long stateOffset;
// head内存偏移地址
private static final long headOffset;
// head内存偏移地址
private static final long tailOffset;
// tail内存偏移地址
private static final long waitStatusOffset;
// tail内存偏移地址
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
/* 构造方法 *****************************************************************************************************************/
/**
* 构造方法
* 创建一个初始同步状态为零的新的 AbstractQueuedSynchronizer实例。
*/
protected AbstractQueuedSynchronizer() { }
/* 主要方法 - 独占模式 *****************************************************************************************************************/
/**
* 以独占模式获取锁,忽略中断。
* 通过至少调用一次 {@link #tryAcquire} 实现,成功返回。
* 否则线程会排队,可能会反复阻塞和解除阻塞,调用 {@link #tryAcquire} 直到成功。
* 该方法可用于实现方法{@link Lock#lock}。
*
* @param arg 这个值被传送到 {@link #tryAcquire},但不会被解释,可以代表任何你喜欢的东西。
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 尝试以独占模式获取。该方法应该查询对象的状态是否允许以独占模式获取它,如果允许则获取它。
* <p>此方法始终由执行获取的线程调用。
* 如果此方法报告失败,acquire 方法可能会将线程排队,如果它尚未排队,直到收到来自某个其他线程的释放信号。
* 这可用于实现方法 {@link Lock#tryLock()}。
*
* @param arg 该值始终是传递给获取方法的值,或者是在进入条件等待时保存的值。
* 该值是未经解释的,可以表示您喜欢的任何内容。
* @return {@code true} 如果成功。成功后,该对象已获得锁。
* @throws IllegalMonitorStateException 如果获取会将这个同步器置于非法状态。必须以一致的方式抛出此异常,同步才能正常工作。
* @throws UnsupportedOperationException 如果不支持独占模式
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 以独占不间断模式获取已在队列中的线程。
* 由条件等待方法以及获取使用。
* @param node 节点
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获得当前节点的前驱节点,前驱结点不为空则返回,否则报异常
// !!!注意,这一步要么异常、要么返回节点
final Node p = node.predecessor();
// !!!上一个节点是头节点(也就是占位空节点,那么表示当前节点应该是实际意义上的第一个等待线程)
// !!!那么,根据公平锁的顺序,第一个等待线程优先尝试获得锁,并且如果获得锁,则进入第一个if
if (p == head && tryAcquire(arg)) {
// 那当前线程设置为头节点
setHead(node);
p.next = null; // help GC
failed = false;
// 返回 false,表示不需要被重点
return interrupted;
}
//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
//最后在返回之前,当前线程取消试图获取锁
if (failed) cancelAcquire(node);
}
}
/**
* 独占模式下释放锁。如果 {@link #tryRelease} 返回 true,通过解除阻塞一个或多个线程来实现。
* 这个方法可以用来实现 {@link Lock#unlock}.
* @param arg 释放锁参数. 这个值被传送到 {@link #tryRelease} 但没有被解释,可以代表任何你喜欢的东西。
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放锁成功则进入
Node h = head;
//如果链表头不为空,并且状态不是0(表示没有等待线程)
if (h != null && h.waitStatus != 0)
//进入方法,唤醒锁
unparkSuccessor(h);
return true;
}
//释放锁不成功,返回 false
return false;
}
/**
* 唤醒节点的后继节点(如果存在)。
* @param node the node
*/
private void unparkSuccessor(Node node) {
// 如果状态是负数(也就是signal=-1),尝试清除预期的信号
// 如果此操作失败或等待线程更改状态,也不影响
int ws = node.waitStatus;
//置零当前线程所在的结点状态,允许失败。
if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
//找到下一个需要唤醒的结点s
Node s = node.next;
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
//啰里啰唆的也不看了,反正是寻找下一个需要唤醒的线程
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) s = t;
}
//找到后唤醒
if (s != null) LockSupport.unpark(s.thread);
}
/**
* 独占模式下,尝试修改 state释放锁。
* <p>此方法始终由执行释放的线程调用。默认实现抛出 {@link UnsupportedOperationException}.
* @param arg 释放参数. 该值始终是传递给释放方法的值,或者是进入条件等待时的当前状态值。该值是未经解释的,可以表示您喜欢的任何内容
* @return {@code true} 如果此对象现在处于完全释放状态,以便任何等待的线程都可以尝试获取;和 {@code false} 否则。
* @throws IllegalMonitorStateException 如果释放会使这个同步器处于非法状态。必须以一致的方式抛出此异常,同步才能正常工作。
* @throws UnsupportedOperationException 如果不支持独占模式
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/* 主要方法 - 共享模式 *****************************************************************************************************************/
/**
* 此方法是共享模式下线程获取共享资源的顶层入口。
* 它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,
* 整个过程忽略中断。
* @param arg the acquire argument.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
}
/**
* 在共享模式下尝试,如果中断则中止。
* 通过首先检查中断状态来实现,然后至少调用一次 {@link #tryAcquireShared},成功返回。
* 否则线程会排队,可能会重复阻塞和解除阻塞,调用 {@link #tryAcquireShared} 直到成功或线程被中断。
* @param arg the acquire argument.
* @throws InterruptedException 如果线程中断,则抛出异常
*/
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//如果线程中断,则抛出异常
if (Thread.interrupted()) throw new InterruptedException();
//尝试共享模式下获得锁
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* 尝试以共享模式获取。
* 该方法应该查询对象的状态是否允许在共享模式下获取该对象,如果是这样,就可以获取它。
* <p>该方法总是由执行获取的线程调用。
* 如果此方法报告失败,则获取方法可能将线程排队(如果尚未排队),直到被其他线程释放为止。
* <p>默认实现抛出 {@link UnsupportedOperationException}.
*
* @param arg 获取的论据。 该值始终是传递给获取方法的值,或者是进入条件等待时保存的值。 该值否则无法解释,可以代表您喜欢的任何内容。
* @return 失败的时候返回负值。如果在共享模式下获取成功但没有后续共享模式获取可以成功,则为零;
* 如果以共享模式获取成功并且随后的共享模式获取可能成功,则为正值,在这种情况下,后续等待线程必须检查可用性。
* (支持三种不同的返回值使得这种方法可以在仅获取有时只能完全执行的上下文中使用。)成功后,该对象已被获取。
* @throws IllegalMonitorStateException 如果获取将该同步器置于非法状态。 必须以一致的方式抛出此异常,以使同步正常工作
* @throws UnsupportedOperationException 如果不支持共享模式
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 在共享可中断模式下获取。
* @param arg 获取参数
*/
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//队尾加入共享节点
boolean failed = true;//是否成功标志
try {
for (;;) {
//拿到共享节点的上一个节点,也就是队尾的上一个节点
final Node p = node.predecessor();
if (p == head) {
//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
int r = tryAcquireShared(arg);//尝试获取资源
if (r >= 0) {//成功
//将head指向自己,还有剩余资源可以再唤醒之后的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录下旧头以供检查
//设置
setHead(node);
//如果还有剩余量,继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* 共享模式的释放操作 -- 信号后继并确保传播. (注意:对于独占模式,如果需要信号,释放就相当于调用头部的 unparkSuccessor。)
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
/**
* 共享模式运行. 如果 {@link #tryReleaseShared} 返回 true,则通过解除阻塞一个或多个线程来实现。
* @param arg 释放参数。arg会被传到{@link #tryReleaseShared},但是这个方法是抽象方法,可能代表任何东西
* @return 从 {@link #tryReleaseShared} 返回的值
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//尝试释放锁
doReleaseShared();//执行释放锁
return true;
}
return false;
}
/* 其他方法 *****************************************************************************************************************/
/**
* 返回同步状态的当前值。
* 此属性 state 被 {@code volatile} 修饰,保证可见性。
* @return 当前state值
*/
protected final int getState() {
return state;
}
/**
* 设置同步状态的值。此操作具有 {@code volatile} 写入的内存语义。
* @param newState 新的状态值
*/
protected final void setState(int newState) {
state = newState;
}
/**
* 如果当前状态值等于预期值,则原子地将同步状态设置为给定的更新值。
* 此操作具有 {@code volatile} 读写的内存语义。
* @param expect 期望值
* @param update 新值
* @return 成功返回 {@code true}. 假返回表示实际值不等于预期值
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
/**
* CAS next field of a node.
*/
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
// 等待队列排队方法
/**
* 为当前线程和给定模式创建和排队节点。
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//为当前调用线程,新建节点
Node node = new Node(Thread.currentThread(), mode);
// 试试enq的快速路径;失败时备份到完整的 enq
Node pred = tail;//拿到队列尾
if (pred != null) {//如果队尾不为空,表示目前队列中有等待线程
//新节点加入,连接
node.prev = pred;
//CAS 替换队尾,成功后再返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//没进入 if,然后说明没有等待线程
enq(node);
return node;
}
/**
* 将节点插入队列,必要时进行初始化。
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;//拿到队尾
if (t == null) { // 没有队尾,需要初始化;队尾和队头相同
if (compareAndSetHead(new Node()))
tail = head;
} else {// 线程加到最后
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 将队列头设置为节点,从而出队。仅由获取方法调用。
* 为了 GC 和抑制不必要的信号和遍历,还清空了未使用的字段。
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
* Convenience method to park and then check if interrupted
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用park()使线程进入waiting状态
return Thread.interrupted();//调用park()使线程进入waiting状态
}
/**
* 检查和更新未能获取的节点的状态。 如果线程应该阻塞,则返回 true。
* 这是所有获取循环中的主要信号控制。要求 pred == node.prev。
* @param pred 前驱节点
* @param node 当前节点
* @return 如果当前节点需要被阻塞,那么返回 true
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获得前驱节点的线程节点状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 这个节点已经设置了状态,要求释放信号,所以它可以安全地停放。
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
return true;
if (ws > 0) {
do {
// CANCELLED,值为1;前驱节点被取消。跳过前驱并重试。
//如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 取消正在进行的获取尝试。
* @param node the node
*/
private void cancelAcquire(Node node) {
// 如果节点不存在则忽略
if (node == null) return;
node.thread = null;
// 跳过取消的前任
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
/**
* @return 标识此同步器及其状态的字符串
*/
public String toString() {
int s = getState();
String q = hasQueuedThreads() ? "non" : "";
return super.toString() + "[State = " + s + ", " + q + "empty queue]";
}
/**
* 中断当前线程的便捷方法。
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
/* 不知道啥用的方法 *****************************************************************************************************************/
// 排队实用程序
// 各种版本的获取实用程序
/*
* Various flavors of acquire, varying in exclusive/shared and
* control modes. Each is mostly the same, but annoyingly
* different. Only a little bit of factoring is possible due to
* interactions of exception mechanics (including ensuring that we
* cancel if tryAcquire throws exception) and other control, at
* least not without hurting performance too much.
*/
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Acquires in shared timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// Main exported methods
/**
* Attempts to set the state to reflect a release in shared mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this release of shared mode may permit a
* waiting acquire (shared or exclusive) to succeed; and
* {@code false} otherwise
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* Returns {@code true} if synchronization is held exclusively with
* respect to the current (calling) thread. This method is invoked
* upon each call to a non-waiting {@link ConditionObject} method.
* (Waiting methods instead invoke {@link #release}.)
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}. This method is invoked
* internally only within {@link ConditionObject} methods, so need
* not be defined if conditions are not used.
*
* @return {@code true} if synchronization is held exclusively;
* {@code false} otherwise
* @throws UnsupportedOperationException if conditions are not supported
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquire}, returning on success. Otherwise, the thread is
* queued, possibly repeatedly blocking and unblocking, invoking
* {@link #tryAcquire} until success or the thread is interrupted
* or the timeout elapses. This method can be used to implement
* method {@link Lock#tryLock(long, TimeUnit)}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* Attempts to acquire in shared mode, aborting if interrupted, and
* failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquireShared}, returning on success. Otherwise, the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted or the timeout elapses.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
// Queue inspection methods
/**
* Queries whether any threads are waiting to acquire. Note that
* because cancellations due to interrupts and timeouts may occur
* at any time, a {@code true} return does not guarantee that any
* other thread will ever acquire.
*
* <p>In this implementation, this operation returns in
* constant time.
*
* @return {@code true} if there may be other threads waiting to acquire
*/
public final boolean hasQueuedThreads() {
return head != tail;
}
/**
* Queries whether any threads have ever contended to acquire this
* synchronizer; that is if an acquire method has ever blocked.
*
* <p>In this implementation, this operation returns in
* constant time.
*
* @return {@code true} if there has ever been contention
*/
public final boolean hasContended() {
return head != null;
}
/**
* Returns the first (longest-waiting) thread in the queue, or
* {@code null} if no threads are currently queued.
*
* <p>In this implementation, this operation normally returns in
* constant time, but may iterate upon contention if other threads are
* concurrently modifying the queue.
*
* @return the first (longest-waiting) thread in the queue, or
* {@code null} if no threads are currently queued
*/
public final Thread getFirstQueuedThread() {
// handle only fast path, else relay
return (head == tail) ? null : fullGetFirstQueuedThread();
}
/**
* Version of getFirstQueuedThread called when fastpath fails
*/
private Thread fullGetFirstQueuedThread() {
/*
* The first node is normally head.next. Try to get its
* thread field, ensuring consistent reads: If thread
* field is nulled out or s.prev is no longer head, then
* some other thread(s) concurrently performed setHead in
* between some of our reads. We try this twice before
* resorting to traversal.
*/
Node h, s;
Thread st;
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
return st;
/*
* Head's next field might not have been set yet, or may have
* been unset after setHead. So we must check to see if tail
* is actually first node. If not, we continue on, safely
* traversing from tail back to head to find first,
* guaranteeing termination.
*/
Node t = tail;
Thread firstThread = null;
while (t != null && t != head) {
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread;
}
/**
* Returns true if the given thread is currently queued.
*
* <p>This implementation traverses the queue to determine
* presence of the given thread.
*
* @param thread the thread
* @return {@code true} if the given thread is on the queue
* @throws NullPointerException if the thread is null
*/
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
/**
* Returns {@code true} if the apparent first queued thread, if one
* exists, is waiting in exclusive mode. If this method returns
* {@code true}, and the current thread is attempting to acquire in
* shared mode (that is, this method is invoked from {@link
* #tryAcquireShared}) then it is guaranteed that the current thread
* is not the first queued thread. Used only as a heuristic in
* ReentrantReadWriteLock.
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
/**
* 查询是否有任何线程等待获取的时间比当前线程长。
* <p>调用此方法等效于(但可能比):
* <pre> {@code
* getFirstQueuedThread() != Thread.currentThread() &&
* hasQueuedThreads()}</pre>
*
* <p>请注意,由于中断和超时导致的取消随时可能发生,因此 {@code true} 返回并不能保证其他线程会在当前线程之前获取。
* 同样,由于队列为空,在此方法返回 {@code false} 后,另一个线程可能会赢得排队竞争。
*
* <p>此方法旨在供公平同步器使用以避免闯入.
* Such a synchronizer's {@link #tryAcquire} method should return
* {@code false}, and its {@link #tryAcquireShared} method should
* return a negative value, if this method returns {@code true}
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
*
* <pre> {@code
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}</pre>
*
* @return 如果在当前线程之前有一个排队线程,返回 true; 如果当前线程在队列的头部或队列为空,返回 false
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized before tail and on head.next being accurate if the current thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//条件判断:
// 1.队列头尾不是同一个线程,表示不是空队列或者不止一个
// 2.队列只有一个线程 或者 第二个线程不是当前线程
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
// Instrumentation and monitoring methods
/**
* Returns an estimate of the number of threads waiting to
* acquire. The value is only an estimate because the number of
* threads may change dynamically while this method traverses
* internal data structures. This method is designed for use in
* monitoring system state, not for synchronization
* control.
*
* @return the estimated number of threads waiting to acquire
*/
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
/**
* Returns a collection containing threads that may be waiting to
* acquire. Because the actual set of threads may change
* dynamically while constructing this result, the returned
* collection is only a best-effort estimate. The elements of the
* returned collection are in no particular order. This method is
* designed to facilitate construction of subclasses that provide
* more extensive monitoring facilities.
*
* @return the collection of threads
*/
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
/**
* Returns a collection containing threads that may be waiting to
* acquire in exclusive mode. This has the same properties
* as {@link #getQueuedThreads} except that it only returns
* those threads waiting due to an exclusive acquire.
*
* @return the collection of threads
*/
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
/**
* Returns a collection containing threads that may be waiting to
* acquire in shared mode. This has the same properties
* as {@link #getQueuedThreads} except that it only returns
* those threads waiting due to a shared acquire.
*
* @return the collection of threads
*/
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
// Internal support methods for Conditions
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
/**
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*
* @param node the node
* @return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// Instrumentation methods for conditions
/**
* Queries whether the given ConditionObject
* uses this synchronizer as its lock.
*
* @param condition the condition
* @return {@code true} if owned
* @throws NullPointerException if the condition is null
*/
public final boolean owns(ConditionObject condition) {
return condition.isOwnedBy(this);
}
/**
* Queries whether any threads are waiting on the given condition
* associated with this synchronizer. Note that because timeouts
* and interrupts may occur at any time, a {@code true} return
* does not guarantee that a future {@code signal} will awaken
* any threads. This method is designed primarily for use in
* monitoring of the system state.
*
* @param condition the condition
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if exclusive synchronization
* is not held
* @throws IllegalArgumentException if the given condition is
* not associated with this synchronizer
* @throws NullPointerException if the condition is null
*/
public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
/**
* Returns an estimate of the number of threads waiting on the
* given condition associated with this synchronizer. Note that
* because timeouts and interrupts may occur at any time, the
* estimate serves only as an upper bound on the actual number of
* waiters. This method is designed for use in monitoring of the
* system state, not for synchronization control.
*
* @param condition the condition
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if exclusive synchronization
* is not held
* @throws IllegalArgumentException if the given condition is
* not associated with this synchronizer
* @throws NullPointerException if the condition is null
*/
public final int getWaitQueueLength(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitQueueLength();
}
/**
* Returns a collection containing those threads that may be
* waiting on the given condition associated with this
* synchronizer. Because the actual set of threads may change
* dynamically while constructing this result, the returned
* collection is only a best-effort estimate. The elements of the
* returned collection are in no particular order.
*
* @param condition the condition
* @return the collection of threads
* @throws IllegalMonitorStateException if exclusive synchronization
* is not held
* @throws IllegalArgumentException if the given condition is
* not associated with this synchronizer
* @throws NullPointerException if the condition is null
*/
public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitingThreads();
}
/**
* Condition implementation for a {@link
* AbstractQueuedSynchronizer} serving as the basis of a {@link
* Lock} implementation.
*
* <p>Method documentation for this class describes mechanics,
* not behavioral specifications from the point of view of Lock
* and Condition users. Exported versions of this class will in
* general need to be accompanied by documentation describing
* condition semantics that rely on those of the associated
* {@code AbstractQueuedSynchronizer}.
*
* <p>This class is Serializable, but all fields are transient,
* so deserialized conditions have no waiters.
*/
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
// Internal methods
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// public methods
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively} returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* Implements uninterruptible condition wait.
* <ol>
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
/*
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire.
*/
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* Implements absolute timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// support for instrumentation
/**
* Returns true if this condition was created by the given
* synchronization object.
*
* @return {@code true} if owned
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
/**
* Queries whether any threads are waiting on this condition.
* Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively} returns {@code false}
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
/**
* Returns an estimate of the number of threads waiting on
* this condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively} returns {@code false}
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
/**
* Returns a collection containing those threads that may be
* waiting on this Condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
*
* @return the collection of threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively} returns {@code false}
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
/**
* 等待队列节点内部类。
* <p> CLH 锁通常用于自旋锁。
* 我们改为将它们用于阻塞同步器,但使用相同的基本策略,即在其节点的前驱中保存有关线程的一些控制信息。
* 每个节点中的“状态”字段跟踪线程是否应该阻塞。节点在其前任发布时收到信号。
* 队列的每个节点都充当一个特定的通知式监视器,持有一个等待线程。
* 尽管状态字段不控制线程是否被授予锁定等。一个线程可能会尝试获取它是否在队列中的第一个。
* 但第一并不能保证成功;它只给予抗争的权利。所以当前发布的竞争者线程可能需要重新等待。
*
* <p>尾部入队,头部出队。
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
* <p>插入 CLH 队列只需要对“tail”进行一次原子操作,因此从未排队到排队有一个简单的原子分界点。
* 类似地,出列只涉及更新“头”。然而,节点需要做更多的工作来确定他们的继任者是谁,部分是为了处理由于超时和中断可能导致的取消。
*
* <p>我们还使用“下一个”链接来实现阻塞机制。
* 每个节点的线程 id 保存在它自己的节点中,因此前驱通过遍历下一个链接来通知下一个节点唤醒以确定它是哪个线程。
* 确定后继节点必须避免与新排队节点竞争以设置其前驱节点的“下一个”字段。
* 当节点的后继节点似乎为空时,通过从原子更新的“尾部”向后检查,在必要时解决此问题。
* (或者,换句话说,下一个链接是一种优化,因此我们通常不需要向后扫描。)
*/
static final class Node {
// 模式,分为共享与独占
// 节点在共享模式下等待的标记
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 当前节点线程.在构造时初始化并在使用后置空.
volatile Thread thread;
// 上一个节点
volatile Node prev;
// 下一个节点
volatile Node next;
// 结点状态
volatile int waitStatus;
// 下一个等待者
Node nextWaiter;
/**
* 如果节点在共享模式下等待,则返回 true。
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
// 前驱结点不为空,返回
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
// 前驱结点为空,抛出异常
if (p == null)
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}
// 用于建立初始头部或共享标记
Node() { }
// 用来添加等待线程节点
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/qianwei4712/JDK1.8.0.25-read.git
git@gitee.com:qianwei4712/JDK1.8.0.25-read.git
qianwei4712
JDK1.8.0.25-read
JDK1.8.0.25-read
master

搜索帮助