abstractstaticclassNode{ volatile Node prev; // initially attached via casTail volatile Node next; // visibly nonnull when signallable Thread waiter; // visibly nonnull when enqueued volatileint status; // written by owner, atomic bit ops by others
staticfinalclassConditionNodeextendsNode implementsForkJoinPool.ManagedBlocker{ ConditionNode nextWaiter; // link to next waiting node
/** * Allows Conditions to be used in ForkJoinPools without * risking fixed pool exhaustion. This is usable only for * untimed Condition waits, not timed versions. */ publicfinalbooleanisReleasable(){ return status <= 1 || Thread.currentThread().isInterrupted(); }
publicfinalbooleanblock(){ while (!isReleasable()) LockSupport.park(); //如果当前节点不能被释放,调用中断函数 returntrue; } }
finalintacquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time){ Thread current = Thread.currentThread(); byte spins = 0, postSpins = 0; // retries upon unpark of first thread boolean interrupted = false, first = false;//是否中断,传入的node是否是第一个节点 Node pred = null; // predecessor of node when enqueued
/* * Repeatedly: * Check if node now first * if so, ensure head stable, else ensure valid predecessor * if node is first or not yet enqueued, try acquiring * else if node not yet created, create it * else if not yet enqueued, try once to enqueue * else if woken from park, retry (up to postSpins times) * else if WAITING status not set, set and retry * else park and clear WAITING status, and check cancellation */
for (;;) { //如果传入的节点为null,则它的上一个节点也是null,否则取到它的上一个节点,然后判断是否为空 // if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { //执行到这里的条件是,一开始first是false(默认是false),传入节点不是空值,并且上一个节点也存在,并且上一个节点不是 //头结点,也就是说前面有节点在等待 if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } elseif (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { //如果first为ture,说明head == pred,也就是它的上一个节点是head节点,或者是没有上一个节点 boolean acquired; try { if (shared) //如果设置了共享模式,则调用共享模式的获取锁,否则以独占方式进行锁的获取 acquired = (tryAcquireShared(arg) >= 0); //共享模式只有CountdownLatch和ReentrantReadWriteLock else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); //获取锁的时候如果出现异常,取消锁的获取 throw ex; } if (acquired) { if (first) { //如果获取到了锁,并且当前节点的前一个节点是头结点,则将当前节点设置为head node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); //如果是共享锁,则通知后面的一个节点,将状态改为wait,并且唤醒线程 if (interrupted) current.interrupt(); //如果出现了中断,则中断当前的线程 } return1; //在拿到了锁之后,退出循环 } } if (node == null) { // 如果传入的节点为null,生成一个新的Node if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } elseif (pred == null) { // 如果pred为null,则说明节点是新生成的,尝试将新节点插入到等待队列中去 node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); elseif (!casTail(t, node)) //如果插入尾结点失败,将新节点的prev还设置为null node.setPrevRelaxed(null); // back out else t.next = node; //将新节点插入到最后面 } elseif (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } elseif (node.status == 0) { //默认情况下status是0 node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) LockSupport.park(this); elseif ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); //在挂起指定时间后,将节点的状态置位0,开始下一次循环 if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible); }
finalintgetAndUnsetStatus(int v){ // for signalling return U.getAndBitwiseAndInt(this, STATUS, ~v); }
publicfinalintgetAndBitwiseAndInt(Object o, long offset, int mask){ int current; do { current = getIntVolatile(o, offset); } while (!weakCompareAndSetInt(o, offset, current, current & mask)); return current; }