diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index ee2f85249d7..0f9ccbf6284 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -259,10 +259,7 @@ public class ForkJoinPool extends AbstractExecutorService { * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an * analysis of memory ordering requirements in work-stealing * algorithms similar to the one used here. We use per-operation - * ordered writes of various kinds for updates, but usually use - * explicit load fences for reads, to cover access of several - * fields of possibly several objects without further constraining - * read-by-read ordering. + * ordered writes of various kinds for accesses when required. * * We also support a user mode in which local task processing is * in FIFO, not LIFO order, simply by using a local version of @@ -302,8 +299,7 @@ public class ForkJoinPool extends AbstractExecutorService { * * * Slot k must be read with an acquiring read, which it must * anyway to dereference and run the task if the (acquiring) - * CAS succeeds, but uses an explicit acquire fence to support - * the following rechecks even if the CAS is not attempted. + * CAS succeeds. * * * q.base may change between reading and using its value to * index the slot. To avoid trying to use the wrong t, the @@ -410,7 +406,11 @@ public class ForkJoinPool extends AbstractExecutorService { * * Field "runState" and per-WorkQueue field "phase" play similar * roles, as lockable, versioned counters. Field runState also - * includes monotonic event bits (SHUTDOWN, STOP, and TERMINATED). 
+ * includes monotonic event bits: + * * SHUTDOWN: no more external tasks accepted; STOP when quiescent + * * STOP: no more tasks run, and deregister all workers + * * CLEANED: all unexecuted tasks have been cancelled + * * TERMINATED: all workers deregistered and all queues cleaned * The version tags enable detection of state changes (by * comparing two reads) modulo bit wraparound. The bit range in * each case suffices for purposes of determining quiescence, @@ -546,22 +546,27 @@ public class ForkJoinPool extends AbstractExecutorService { * * If computations are purely tree structured, it suffices for * every worker to activate another when it pushes a task into * an empty queue, resulting in O(log(#threads)) steps to full - * activation. Emptiness must be conservatively approximated - * (by checking if there is apparently at most one existing - * task) which may result in unnecessary signals. Also, to - * reduce resource usages in some cases, at the expense of - * slower startup in others, activation of an idle thread is - * preferred over creating a new one, here and elsewhere. + * activation. Emptiness must be conservatively approximated, + * which may result in unnecessary signals. Also, to reduce + * resource usages in some cases, at the expense of slower + * startup in others, activation of an idle thread is preferred + * over creating a new one, here and elsewhere. * * * At the other extreme, if "flat" tasks (those that do not in * turn generate others) come in serially from only a single - * producer, each worker taking its first (since the last - * activation) task from a queue should propagate a signal if - * there are more tasks in that queue. This is equivalent to, - * but generally faster than, arranging the stealer take - * multiple tasks, re-pushing one or more on its own queue, and - * signalling (because its queue is empty), also resulting in - * logarithmic full activation time. 
+ * producer, each worker taking a task from a queue should + * propagate a signal if there are more tasks in that + * queue. This is equivalent to, but generally faster than, + * arranging the stealer take multiple tasks, re-pushing one or + * more on its own queue, and signalling (because its queue is + * empty), also resulting in logarithmic full activation + * time. If tasks do not engage in unbounded loops based on + * the actions of other workers with unknown dependencies, + * this form of propagation can be limited to one signal per + * activation (phase change). We distinguish the cases by + * further signalling only if the task is an InterruptibleTask + * (see below), which are the only supported forms of task that + * may do so. * * * Because we don't know about usage patterns (or most commonly, * mixtures), we use both approaches, which present even more @@ -611,9 +616,12 @@ public class ForkJoinPool extends AbstractExecutorService { * it tries to deactivate()), giving up (and rescanning) on "ctl" * contention. To avoid missed signals during deactivation, the * method rescans and reactivates if there may have been a missed - * signal during deactivation. Because idle workers are often not - * yet blocked (parked), we use a WorkQueue field to advertise - * that a waiter actually needs unparking upon signal. + * (external) signal during deactivation. To reduce false-alarm + * reactivations while doing so, we scan multiple times + * (analogously to method quiescent()) before trying to + * reactivate. Because idle workers are often not yet blocked + * (parked), we use a WorkQueue field to advertise that a waiter + * actually needs unparking upon signal. * * Quiescence. Workers scan looking for work, giving up when they * don't find any, without being sure that none are available.
@@ -644,22 +652,36 @@ public class ForkJoinPool extends AbstractExecutorService { * workers are inactive because the caller and any others * executing helpQuiesce are not included in counts. * - * Termination. A call to shutdownNow invokes tryTerminate to - * atomically set a runState mode bit. However, the process of - * termination is intrinsically non-atomic. The calling thread, as - * well as other workers thereafter terminating help cancel queued - * tasks and interrupt other workers. These actions race with - * unterminated workers. By default, workers check for - * termination only when accessing pool state. This may take a - * while but suffices for structured computational tasks. But not - * necessarily for others. Class InterruptibleTask (see below) - * further arranges runState checks before executing task bodies, - * and ensures interrupts while terminating. Even so, there are no - * guarantees after an abrupt shutdown that remaining tasks - * complete normally or exceptionally or are cancelled. - * Termination may fail to complete if running tasks ignore both - * task status and interrupts and/or produce more tasks after - * others that could cancel them have exited. + * Termination. Termination is initiated by setting STOP in one of + * three ways (via methods tryTerminate and quiescent): + * * A call to shutdownNow, in which case all workers are + * interrupted, first ensuring that the queues array is stable, + * to avoid missing any workers. + * * A call to shutdown when quiescent, in which case method + * releaseWaiters is used to dequeue them, at which point they notice + * STOP state and return from runWorker to deregister(); + * * The pool becomes quiescent() sometime after shutdown has + * been called, in which case releaseWaiters is also used to + * propagate as they deregister. + * Upon STOP, each worker, as well as external callers to + * tryTerminate (via close() etc) race to set CLEANED, indicating + * that all tasks have been cancelled. 
The implementation (method + * cleanQueues) balances cases in which there may be many tasks to + * cancel (benefitting from parallelism) versus contention and + * interference when many threads try to poll remaining queues, + * while also avoiding unnecessary rechecks, by using + * pseudorandom scans and giving up upon interference. This may be + * retried by the same caller only when there are no more + * registered workers, using the same criteria as method + * quiescent. When CLEANED and all workers have deregistered, + * TERMINATED is set, also signalling any caller of + * awaitTermination or close. Because shutdownNow-based + * termination relies on interrupts, there is no guarantee that + * workers will stop if their tasks ignore interrupts. Class + * InterruptibleTask (see below) further arranges runState checks + * before executing task bodies, and ensures interrupts while + * terminating. Even so, there are no guarantees because tasks may + * internally enter unbounded loops. * * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will @@ -819,10 +841,11 @@ public class ForkJoinPool extends AbstractExecutorService { * overridden by system properties, we use workers of subclass * InnocuousForkJoinWorkerThread when there is a SecurityManager * present. These workers have no permissions set, do not belong - * to any user-defined ThreadGroup, and clear all ThreadLocals - * after executing any top-level task. The associated mechanics - * may be JVM-dependent and must access particular Thread class - * fields to achieve this effect. + * to any user-defined ThreadGroup, and clear all ThreadLocals and + * reset the ContextClassLoader before (re)activating to execute + * a top-level task. The associated mechanics may be JVM-dependent + * and must access particular Thread class fields to achieve this + * effect.
* * InterruptibleTasks * ==================== @@ -844,7 +867,9 @@ public class ForkJoinPool extends AbstractExecutorService { * shutdown, runners are interrupted so they can cancel. Since * external joining callers never run these tasks, they must await * cancellation by others, which can occur along several different - * paths. + * paths. The inability to rely on caller-runs may also require + * extra signalling (resulting in scanning and contention) so is + * done only conditionally in methods push and runWorker. * * Across these APIs, rules for reporting exceptions for tasks * with results accessed via join() differ from those via get(), @@ -904,10 +929,8 @@ public class ForkJoinPool extends AbstractExecutorService { * direct false-sharing and indirect cases due to GC bookkeeping * (cardmarks etc), and reduce the number of resizes, which are * not especially fast because they require atomic transfers. - * Currently, arrays for workers are initialized to be just large - * enough to avoid resizing in most tree-structured tasks, but - * larger for external queues where both false-sharing problems - * and the need for resizing are more common. (Maintenance note: + * Currently, arrays are initialized to be just large enough to + * avoid resizing in most tree-structured tasks. (Maintenance note: * any changes in fields, queues, or their uses, or JVM layout * policies, must be accompanied by re-evaluation of these * placement and sizing decisions.) @@ -996,12 +1019,6 @@ public class ForkJoinPool extends AbstractExecutorService { */ static final int INITIAL_QUEUE_CAPACITY = 1 << 6; - /** - * Initial capacity of work-stealing queue array for external queues. - * Must be a power of two, at least 2. See above.
- */ - static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9; - // conversions among short, int, long static final int SMASK = 0xffff; // (unsigned) short bits static final long LMASK = 0xffffffffL; // lower 32 bits of long @@ -1015,8 +1032,9 @@ public class ForkJoinPool extends AbstractExecutorService { // pool.runState bits static final long STOP = 1L << 0; // terminating static final long SHUTDOWN = 1L << 1; // terminate when quiescent - static final long TERMINATED = 1L << 2; // only set if STOP also set - static final long RS_LOCK = 1L << 3; // lowest seqlock bit + static final long CLEANED = 1L << 2; // stopped and queues cleared + static final long TERMINATED = 1L << 3; // only set if STOP also set + static final long RS_LOCK = 1L << 4; // lowest seqlock bit // spin/sleep limits for runState locking and elsewhere static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait @@ -1246,9 +1264,7 @@ public class ForkJoinPool extends AbstractExecutorService { */ WorkQueue(ForkJoinWorkerThread owner, int id, int cfg, boolean clearThreadLocals) { - array = new ForkJoinTask[owner == null ? - INITIAL_EXTERNAL_QUEUE_CAPACITY : - INITIAL_QUEUE_CAPACITY]; + array = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; this.owner = owner; this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg; } @@ -1294,10 +1310,12 @@ public class ForkJoinPool extends AbstractExecutorService { unlockPhase(); if (room < 0) throw new RejectedExecutionException("Queue capacity exceeded"); - else if ((room == 0 || - a[m & (s - 2)] == null) && // at most one existing task - pool != null) - pool.signalWork(); + if ((room == 0 || // pad for InterruptibleTasks + a[m & (s - ((internal || task == null || + task.getClass().getSuperclass() != + interruptibleTaskClass) ? 
1 : 2))] == null) && + pool != null) + pool.signalWork(); // may have appeared empty } } @@ -1351,10 +1369,8 @@ public class ForkJoinPool extends AbstractExecutorService { updateBase(nb); break; } - while (b == (b = base)) { - U.loadFence(); + while (b == (b = U.getIntAcquire(this, BASE))) Thread.onSpinWait(); // spin to reduce memory traffic - } if (p - b <= 0) break; } @@ -1378,12 +1394,12 @@ public class ForkJoinPool extends AbstractExecutorService { final boolean tryUnpush(ForkJoinTask task, boolean internal) { boolean taken = false; ForkJoinTask[] a = array; - int p = top, s = p - 1, cap, k; + int p = top, s = p - 1, cap; long k; if (a != null && (cap = a.length) > 0 && - a[k = (cap - 1) & s] == task && + U.getReference(a, k = slotOffset((cap - 1) & s)) == task && (internal || tryLockPhase())) { if (top == p && - U.compareAndSetReference(a, slotOffset(k), task, null)) { + U.compareAndSetReference(a, k, task, null)) { taken = true; updateTop(s); } @@ -1417,28 +1433,25 @@ public class ForkJoinPool extends AbstractExecutorService { * Polls for a task. Used only by non-owners. 
*/ final ForkJoinTask poll() { - for (int b = base;;) { - int cap, k, nb; ForkJoinTask[] a; + for (int pb = -1, b; ; pb = b) { // track progress + ForkJoinTask t; int cap, nb; long k; ForkJoinTask[] a; if ((a = array) == null || (cap = a.length) <= 0) break; - long kp = slotOffset(k = (cap - 1) & b); - int nk = (nb = b + 1) & (cap - 1); // next slot - int sk = (b + 2) & (cap - 1); // 2nd slot ahead - ForkJoinTask t = a[k]; - U.loadFence(); - if (b == (b = base)) { // else inconsistent - if (t != null) { - if (U.compareAndSetReference(a, kp, t, null)) { - updateBase(nb); - return t; - } - b = base; - } - else if (a[sk] == null && a[nk] == null && a[k] == null) { - if (top - b <= 0) - break; // empty + t = (ForkJoinTask)U.getReferenceAcquire( + a, k = slotOffset((cap - 1) & (b = base))); + Object u = U.getReference( // next slot + a, slotOffset((cap - 1) & (nb = b + 1))); + if (base != b) // inconsistent + ; + else if (t == null) { + if (u == null && top - b <= 0) + break; // empty + if (pb == b) Thread.onSpinWait(); // stalled - } + } + else if (U.compareAndSetReference(a, k, t, null)) { + updateBase(nb); + return t; } } return null; @@ -1449,14 +1462,11 @@ public class ForkJoinPool extends AbstractExecutorService { /** * Runs the given task, as well as remaining local tasks. 
*/ - final void topLevelExec(ForkJoinTask task, int cfg) { - int fifo = cfg & FIFO; + final void topLevelExec(ForkJoinTask task, int fifo) { while (task != null) { task.doExec(); task = nextLocalTask(fifo); } - if ((cfg & CLEAR_TLS) != 0) - ThreadLocalRandom.eraseThreadLocals(Thread.currentThread()); } /** @@ -1468,23 +1478,24 @@ public class ForkJoinPool extends AbstractExecutorService { int b = base, p = top, s = p - 1, d = p - b, cap; if (a != null && (cap = a.length) > 0) { for (int m = cap - 1, i = s; d > 0; --i, --d) { - ForkJoinTask t; int k; boolean taken; - if ((t = a[k = i & m]) == null) + long k; boolean taken; + ForkJoinTask t = (ForkJoinTask)U.getReference( + a, k = slotOffset(i & m)); + if (t == null) break; if (t == task) { - long pos = slotOffset(k); if (!internal && !tryLockPhase()) break; // fail if locked if (taken = (top == p && - U.compareAndSetReference(a, pos, task, null))) { + U.compareAndSetReference(a, k, task, null))) { if (i == s) // act as pop updateTop(s); else if (i == base) // act as poll updateBase(i + 1); else { // swap with top U.putReferenceVolatile( - a, pos, (ForkJoinTask) + a, k, (ForkJoinTask) U.getAndSetReference( a, slotOffset(s & m), null)); updateTop(s); @@ -1512,19 +1523,18 @@ public class ForkJoinPool extends AbstractExecutorService { int status = 0; if (task != null) { outer: for (;;) { - ForkJoinTask[] a; ForkJoinTask t; boolean taken; - int stat, p, s, cap, k; + ForkJoinTask[] a; boolean taken; Object o; + int stat, p, s, cap; if ((stat = task.status) < 0) { status = stat; break; } if ((a = array) == null || (cap = a.length) <= 0) break; - if ((t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null) + long k = slotOffset((cap - 1) & (s = (p = top) - 1)); + if (!((o = U.getReference(a, k)) instanceof CountedCompleter)) break; - if (!(t instanceof CountedCompleter)) - break; - CountedCompleter f = (CountedCompleter)t; + CountedCompleter t = (CountedCompleter)o, f = t; for (int steps = cap;;) { // bound path if (f == task) 
break; @@ -1535,7 +1545,7 @@ public class ForkJoinPool extends AbstractExecutorService { break; if (taken = (top == p && - U.compareAndSetReference(a, slotOffset(k), t, null))) + U.compareAndSetReference(a, k, t, null))) updateTop(s); if (!internal) unlockPhase(); @@ -1557,11 +1567,11 @@ public class ForkJoinPool extends AbstractExecutorService { */ final void helpAsyncBlocker(ManagedBlocker blocker) { for (;;) { - ForkJoinTask[] a; int b, cap, k; + ForkJoinTask t; ForkJoinTask[] a; int b, cap; long k; if ((a = array) == null || (cap = a.length) <= 0) break; - ForkJoinTask t = a[k = (b = base) & (cap - 1)]; - U.loadFence(); + t = (ForkJoinTask)U.getReferenceAcquire( + a, k = slotOffset((cap - 1) & (b = base))); if (t == null) { if (top - b <= 0) break; @@ -1572,7 +1582,7 @@ public class ForkJoinPool extends AbstractExecutorService { if (blocker != null && blocker.isReleasable()) break; if (base == b && t != null && - U.compareAndSetReference(a, slotOffset(k), t, null)) { + U.compareAndSetReference(a, k, t, null)) { updateBase(b + 1); t.doExec(); } @@ -1581,6 +1591,18 @@ public class ForkJoinPool extends AbstractExecutorService { // misc + /** + * Cancels all local tasks. Called only by owner. + */ + final void cancelTasks() { + for (ForkJoinTask t; (t = nextLocalTask(0)) != null; ) { + try { + t.cancel(false); + } catch (Throwable ignore) { + } + } + } + /** * Returns true if internal and not known to be blocked. */ @@ -1630,6 +1652,16 @@ public class ForkJoinPool extends AbstractExecutorService { */ static volatile RuntimePermission modifyThreadPermission; + /** + * Cached for faster type tests. 
+ */ + static final Class interruptibleTaskClass; + + /** + * For VirtualThread intrinsics + */ + private static final JavaLangAccess JLA; + // fields declared in order of their likely layout on most VMs volatile CountDownLatch termination; // lazily constructed final Predicate saturate; @@ -1773,7 +1805,7 @@ public class ForkJoinPool extends AbstractExecutorService { * @param w caller's WorkQueue */ final void registerWorker(WorkQueue w) { - if (w != null) { + if (w != null && (runState & STOP) == 0L) { ThreadLocalRandom.localInit(); int seed = w.stackPred = ThreadLocalRandom.getProbe(); int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag @@ -1824,49 +1856,35 @@ public class ForkJoinPool extends AbstractExecutorService { * @param ex the exception causing failure, or null if none */ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { - if ((runState & STOP) != 0L) // ensure released - releaseAll(); - WorkQueue w = null; - int src = 0, phase = 0; - boolean replaceable = false; - if (wt != null && (w = wt.workQueue) != null) { - phase = w.phase; - if ((src = w.source) != DROPPED) { - w.source = DROPPED; // else already dropped - if (phase != 0) { // else failed to start - replaceable = true; - if (w.top - w.base > 0) { - ForkJoinTask t; // cancel remaining tasks - while ((t = w.nextLocalTask()) != null) { - try { - t.cancel(false); - } catch (Throwable ignore) { - } - } - } - } - } - } - if (src != DROPPED) { // decrement counts - long c = ctl; + WorkQueue w = null; // null if not created + int phase = 0; // 0 if not registered + if (wt != null && (w = wt.workQueue) != null && + (phase = w.phase) != 0 && (phase & IDLE) != 0) + releaseWaiters(); // ensure released + if (w == null || w.source != DROPPED) { + long c = ctl; // decrement counts do {} while (c != (c = compareAndExchangeCtl( c, ((RC_MASK & (c - RC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (LMASK & c))))); } - if ((tryTerminate(false, false) & STOP) == 0L && w != null) { - WorkQueue[] 
qs; int n, i; // remove index unless terminating + if (phase != 0 && w != null) { // remove index unless terminating long ns = w.nsteals & 0xffffffffL; - if ((lockRunState() & STOP) != 0L) - replaceable = false; - else if ((qs = queues) != null && (n = qs.length) > 0 && - qs[i = phase & SMASK & (n - 1)] == w) { - qs[i] = null; - stealCount += ns; // accumulate steals + if ((runState & STOP) == 0L) { + WorkQueue[] qs; int n, i; + if ((lockRunState() & STOP) == 0L && + (qs = queues) != null && (n = qs.length) > 0 && + qs[i = phase & SMASK & (n - 1)] == w) { + qs[i] = null; + stealCount += ns; // accumulate steals + } + unlockRunState(); } - unlockRunState(); - if (replaceable) - signalWork(); + } + if ((tryTerminate(false, false) & STOP) == 0L && + phase != 0 && w != null && w.source != DROPPED) { + signalWork(); // possibly replace + w.cancelTasks(); // clean queue } if (ex != null) ForkJoinTask.rethrow(ex); @@ -1912,12 +1930,9 @@ public class ForkJoinPool extends AbstractExecutorService { /** * Releases all waiting workers. Called only during shutdown. 
- * - * @return current ctl */ - private long releaseAll() { - long c = ctl; - for (;;) { + private void releaseWaiters() { + for (long c = ctl;;) { WorkQueue[] qs; WorkQueue v; int sp, i; if ((sp = (int)c) == 0 || (qs = queues) == null || qs.length <= (i = sp & SMASK) || (v = qs[i]) == null) @@ -1930,7 +1945,6 @@ public class ForkJoinPool extends AbstractExecutorService { U.unpark(v.owner); } } - return c; } /** @@ -1940,7 +1954,7 @@ public class ForkJoinPool extends AbstractExecutorService { * unlocked; if so, setting STOP if shutdown is enabled */ private int quiescent() { - outer: for (;;) { + for (;;) { long phaseSum = 0L; boolean swept = false; for (long e, prevRunState = 0L; ; prevRunState = e) { @@ -1983,64 +1997,61 @@ public class ForkJoinPool extends AbstractExecutorService { final void runWorker(WorkQueue w) { if (w != null) { int phase = w.phase, r = w.stackPred; // seed from registerWorker - int cfg = w.config, src = -1, nsteals = 0; - rescan: for (boolean scanned = false;;) { + int fifo = w.config & FIFO, nsteals = 0, src = -1; + for (;;) { WorkQueue[] qs; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift if ((runState & STOP) != 0L || (qs = queues) == null) - return; + break; int n = qs.length, i = r, step = (r >>> 16) | 1; - for (int l = n; l > 0; --l, i += step) { // scan queues - int j; WorkQueue q; - if ((q = qs[j = i & (n - 1)]) != null) { - boolean taken = false; - for (int pb = -1, b = q.base;;) { - int cap, k, nb; ForkJoinTask[] a; - if ((a = q.array) == null || (cap = a.length) <= 0) - continue rescan; - long kp = slotOffset(k = (cap - 1) & b); - int nk = (nb = b + 1) & (cap - 1); // next slot - int sk = (b + 2) & (cap - 1); // 2nd slot ahead - ForkJoinTask t = a[k]; - U.loadFence(); - if (b != (b = q.base)) - ; // inconsistent - else if (t == null) { // possibly empty - if (a[sk] == null && a[nk] == null && - a[k] == null) { // screen - if (q.top - b > 0) { // stalled - if (!taken) // move unless taking - continue rescan; + boolean 
rescan = false; + scan: for (int l = n; l > 0; --l, i += step) { // scan queues + int j, cap; WorkQueue q; ForkJoinTask[] a; + if ((q = qs[j = i & (n - 1)]) != null && + (a = q.array) != null && (cap = a.length) > 0) { + for (int m = cap - 1, pb = -1, b = q.base;;) { + ForkJoinTask t; long k; + t = (ForkJoinTask)U.getReferenceAcquire( + a, k = slotOffset(m & b)); + if (b != (b = q.base) || t == null || + !U.compareAndSetReference(a, k, t, null)) { + if (a[b & m] == null) { + if (rescan) // end of run + break scan; + if (a[(b + 1) & m] == null && + a[(b + 2) & m] == null) { + break; // probably empty + } + if (pb == (pb = b)) { // track progress + rescan = true; // stalled; reorder scan + break scan; } - else if (taken) - continue rescan; // depleted; restart - else - break; // empty } - if (pb == (pb = b)) // base unchanged - Thread.onSpinWait(); } - else if (!U.compareAndSetReference(a, kp, t, null)) - b = q.base; // contended else { - q.base = nb; + boolean propagate; + int nb = q.base = b + 1; w.nsteals = ++nsteals; - w.source = j; // volatile write - if (taken != (taken = true) && a[nk] != null) - signalWork(); // propagate signal - w.topLevelExec(t, cfg); - if ((b = q.base) != nb && src != (src = j)) - continue rescan; // reduce interference + w.source = j; // volatile + rescan = true; + if (propagate = + ((src != (src = j) || + t.getClass().getSuperclass() == + interruptibleTaskClass) && + a[nb & m] != null)) + signalWork(); + w.topLevelExec(t, fifo); + if ((b = q.base) != nb && !propagate) + break scan; // reduce interference } } } } - if (!scanned) - scanned = true; // rescan before deactivate - else if (((phase = deactivate(w, r, phase)) & IDLE) == 0) - scanned = false; - else - return; + if (!rescan) { + if (((phase = deactivate(w, phase)) & IDLE) != 0) + break; + src = -1; // re-enable propagation + } } } } @@ -2049,56 +2060,38 @@ public class ForkJoinPool extends AbstractExecutorService { * Deactivates and if necessary awaits signal or termination. 
* * @param w the worker - * @param r random seed * @param phase current phase * @return current phase, with IDLE set if worker should exit */ - private int deactivate(WorkQueue w, int r, int phase) { + private int deactivate(WorkQueue w, int phase) { + if (w == null) // currently impossible + return IDLE; int p = phase | IDLE, activePhase = phase + (IDLE << 1); - if (w != null) { // always true - w.phase = p; - long pc = ctl, qc; - for (;;) { // try to enqueue - w.stackPred = (int)pc; // set ctl stack link - qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK); - if (pc == (pc = compareAndExchangeCtl(pc, qc))) // success - break; - if ((pc & RC_MASK) >= (qc & RC_MASK)) { - p = w.phase = phase; // back out on possible signal - break; - } - } - if (p != phase && // check quiescent termination - ((runState & SHUTDOWN) == 0L || quiescent() <= 0)) { - WorkQueue[] qs; - int spins = ((short)(qc >>> TC_SHIFT) << 1) + SPIN_WAITS + 1; - while ((p = w.phase) != activePhase && --spins > 0) - Thread.onSpinWait(); // reduce flailing - if (p != activePhase && (qs = queues) != null) { - int n = qs.length, step = (r >>> 16) | 1; - for (int i = r, l = n; l > 0; --l, i += step) { - WorkQueue q; // check for missed signals - if ((q = qs[i & (n - 1)]) != null && - q.top - q.base > 0) { - if (ctl == qc && compareAndSetCtl(qc, pc)) { - p = w.phase = activePhase; - break; // self-signal - } - if ((p = w.phase) == activePhase) - break; - } - } - if (p != activePhase) { - long delay = (((qc & RC_MASK) > 0L) ? 0L : - (w.source != INVALID_ID) ? 
keepAlive : - TIMEOUT_SLOP); // minimal delay if cascade - if ((p = w.phase) != activePhase) - p = awaitWork(w, p, delay); // block, drop, or exit - } - } - } + long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK); + w.stackPred = (int)pc; // set ctl stack link + w.phase = p; + if (!compareAndSetCtl(pc, qc)) // try to enqueue + return w.phase = phase; // back out on possible signal + int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs; + if (((e = runState) & STOP) != 0L || + ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) || + (qs = queues) == null || (n = qs.length) <= 0) + return IDLE; // terminating + int prechecks = Math.min(ac, 2); // reactivation threshold + for (int k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) { + WorkQueue q; int cap; ForkJoinTask[] a; + if (w.phase == activePhase) + return activePhase; + if (--k < 0) + return awaitWork(w, p); // block, drop, or exit + if ((k & 1) != 0) + Thread.onSpinWait(); // interleave spins and rechecks + else if ((q = qs[k & (n - 1)]) != null && + (a = q.array) != null && (cap = a.length) > 0 && + a[q.base & (cap - 1)] != null && --prechecks < 0 && + ctl == qc && compareAndSetCtl(qc, pc)) + return w.phase = activePhase; // reactivate } - return p; } /** @@ -2106,33 +2099,41 @@ public class ForkJoinPool extends AbstractExecutorService { * * @param w the work queue * @param p current phase (known to be idle) - * @param delay if nonzero keepAlive before trimming if quiescent * @return current phase, with IDLE set if worker should exit */ - private int awaitWork(WorkQueue w, int p, long delay) { + private int awaitWork(WorkQueue w, int p) { if (w != null) { + ForkJoinWorkerThread t; long deadline; + if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null) + t.resetThreadLocals(); // clear before reactivate + if ((ctl & RC_MASK) > 0L) + deadline = 0L; + else if ((deadline = + (((w.source != INVALID_ID) ? 
keepAlive : TIMEOUT_SLOP)) + + System.currentTimeMillis()) == 0L) + deadline = 1L; // avoid zero int activePhase = p + IDLE; - LockSupport.setCurrentBlocker(this); - long deadline = (delay == 0L ? 0L : - delay + System.currentTimeMillis()); - w.parking = 1; // enable unpark - while ((p = w.phase) != activePhase) { - boolean trimmable = false; int trim; - Thread.interrupted(); // clear status - if ((runState & STOP) != 0L) - break; - if (deadline != 0L) { - if ((trim = tryTrim(w, p, deadline)) > 0) + if ((p = w.phase) != activePhase && (runState & STOP) == 0L) { + LockSupport.setCurrentBlocker(this); + w.parking = 1; // enable unpark + while ((p = w.phase) != activePhase) { + boolean trimmable = false; int trim; + Thread.interrupted(); // clear status + if ((runState & STOP) != 0L) break; - else if (trim < 0) - deadline = 0L; - else - trimmable = true; + if (deadline != 0L) { + if ((trim = tryTrim(w, p, deadline)) > 0) + break; + else if (trim < 0) + deadline = 0L; + else + trimmable = true; + } + U.park(trimmable, deadline); } - U.park(trimmable, deadline); + w.parking = 0; + LockSupport.setCurrentBlocker(null); } - w.parking = 0; - LockSupport.setCurrentBlocker(null); } return p; } @@ -2287,12 +2288,13 @@ public class ForkJoinPool extends AbstractExecutorService { int j; WorkQueue q; if ((q = qs[j = r & SMASK & (n - 1)]) != null) { for (;;) { - int sq = q.source, b, cap, k; ForkJoinTask[] a; + ForkJoinTask t; ForkJoinTask[] a; + boolean eligible = false; + int sq = q.source, b, cap; long k; if ((a = q.array) == null || (cap = a.length) <= 0) break; - ForkJoinTask t = a[k = (b = q.base) & (cap - 1)]; - U.loadFence(); - boolean eligible = false; + t = (ForkJoinTask)U.getReferenceAcquire( + a, k = slotOffset((cap - 1) & (b = q.base))); if (t == task) eligible = true; else if (t != null) { // check steal chain @@ -2311,18 +2313,16 @@ public class ForkJoinPool extends AbstractExecutorService { } if ((s = task.status) < 0) break outer; // validate - if (q.source == sq 
&& q.base == b && a[k] == t) { - int nb = b + 1, nk = nb & (cap - 1); + if (q.source == sq && q.base == b && + U.getReference(a, k) == t) { if (!eligible) { // revisit if nonempty - if (!rescan && t == null && - (a[nk] != null || q.top - b > 0)) + if (!rescan && t == null && q.top - b > 0) rescan = true; break; } - if (U.compareAndSetReference( - a, slotOffset(k), t, null)) { - q.updateBase(nb); - w.source = j; + if (U.compareAndSetReference(a, k, t, null)) { + q.base = b + 1; + w.source = j; // volatile write t.doExec(); w.source = wsrc; rescan = true; // restart at index r @@ -2369,12 +2369,13 @@ public class ForkJoinPool extends AbstractExecutorService { int j; WorkQueue q; if ((q = qs[j = r & SMASK & (n - 1)]) != null) { for (;;) { - ForkJoinTask[] a; int b, cap, k; + ForkJoinTask t; ForkJoinTask[] a; + int b, cap, nb; long k; + boolean eligible = false; if ((a = q.array) == null || (cap = a.length) <= 0) break; - ForkJoinTask t = a[k = (b = q.base) & (cap - 1)]; - U.loadFence(); - boolean eligible = false; + t = (ForkJoinTask)U.getReferenceAcquire( + a, k = slotOffset((cap - 1) & (b = q.base))); if (t instanceof CountedCompleter) { CountedCompleter f = (CountedCompleter)t; for (int steps = cap; steps > 0; --steps) { @@ -2389,19 +2390,17 @@ public class ForkJoinPool extends AbstractExecutorService { if ((s = task.status) < 0) // validate break outer; if (q.base == b) { - int nb = b + 1, nk = nb & (cap - 1); if (eligible) { if (U.compareAndSetReference( - a, slotOffset(k), t, null)) { - q.updateBase(nb); + a, k, t, null)) { + q.updateBase(b + 1); t.doExec(); locals = rescan = true; break scan; } } - else if (a[k] == t) { - if (!rescan && t == null && - (a[nk] != null || q.top - b > 0)) + else if (U.getReference(a, k) == t) { + if (!rescan && t == null && q.top - b > 0) rescan = true; // revisit break; } @@ -2454,14 +2453,15 @@ public class ForkJoinPool extends AbstractExecutorService { int j; WorkQueue q; if ((q = qs[j = r & SMASK & (n - 1)]) != null && q != w) 
{ for (;;) { - ForkJoinTask[] a; int b, cap, k; + ForkJoinTask t; ForkJoinTask[] a; + int b, cap; long k; if ((a = q.array) == null || (cap = a.length) <= 0) break; - ForkJoinTask t = a[k = (b = q.base) & (cap - 1)]; + t = (ForkJoinTask)U.getReferenceAcquire( + a, k = slotOffset((cap - 1) & (b = q.base))); if (t != null && phase == inactivePhase) // reactivate w.phase = phase = activePhase; - U.loadFence(); - if (q.base == b && a[k] == t) { + if (q.base == b && U.getReference(a, k) == t) { int nb = b + 1; if (t == null) { if (!rescan) { @@ -2474,10 +2474,9 @@ public class ForkJoinPool extends AbstractExecutorService { } break; } - if (U.compareAndSetReference( - a, slotOffset(k), t, null)) { - q.updateBase(nb); - w.source = j; + if (U.compareAndSetReference(a, k, t, null)) { + q.base = nb; + w.source = j; // volatile write t.doExec(); w.source = wsrc; rescan = locals = true; @@ -2603,15 +2602,15 @@ public class ForkJoinPool extends AbstractExecutorService { if (w == null) w = new WorkQueue(null, id, 0, false); w.phase = id; - long stop = lockRunState() & STOP; - if (stop == 0L && queues == qs && qs[i] == null) { + long isShutdown = lockRunState() & SHUTDOWN; + if (isShutdown == 0L && queues == qs && qs[i] == null) { q = qs[i] = w; // else retry w = null; } unlockRunState(); if (q != null) return q; - if (stop != 0L) + if (isShutdown != 0L) break; } else if (!q.tryLockPhase()) // move index @@ -2623,7 +2622,6 @@ public class ForkJoinPool extends AbstractExecutorService { else return q; } - tryTerminate(false, false); throw new RejectedExecutionException(); } @@ -2640,7 +2638,6 @@ public class ForkJoinPool extends AbstractExecutorService { } q.push(task, signalIfEmpty ? 
this : null, internal); } - private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); /** * Returns queue for an external submission, bypassing call to @@ -2760,51 +2757,96 @@ public class ForkJoinPool extends AbstractExecutorService { * @return runState on exit */ private long tryTerminate(boolean now, boolean enable) { - long e = runState, isShutdown; - if ((e & STOP) == 0L) { - if (now) - runState = e = (lockRunState() + RS_LOCK) | STOP | SHUTDOWN; - else if ((isShutdown = (e & SHUTDOWN)) != 0 || enable) { - if (isShutdown == 0) - getAndBitwiseOrRunState(SHUTDOWN); - if (quiescent() > 0) - e = runState; - } - if ((e & STOP) != 0L && (releaseAll() & RC_MASK) > 0L && now) - interruptAll(); - } - if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks - if ((ctl & RC_MASK) > 0L) { // unless all inactive - int r = (int)Thread.currentThread().threadId(); - WorkQueue[] qs = queues; // stagger traversals - int n = (qs == null) ? 0 : qs.length; - for (int l = n; l > 0; --l, ++r) { - WorkQueue q; ForkJoinTask t; - if ((q = qs[r & (n - 1)]) != null && - q.source != DROPPED) { - while ((t = q.poll()) != null) { - try { - t.cancel(false); - } catch (Throwable ignore) { - } - } - } + long e, isShutdown, ps; + if (((e = runState) & TERMINATED) != 0L) + now = false; + else if ((e & STOP) != 0L) + now = true; + else if (now) { + if (((ps = getAndBitwiseOrRunState(SHUTDOWN|STOP) & STOP)) == 0L) { + if ((ps & RS_LOCK) != 0L) { + spinLockRunState(); // ensure queues array stable after stop + unlockRunState(); } + interruptAll(); } - if (((e = runState) & TERMINATED) == 0L && ctl == 0L) { - e |= TERMINATED; - if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) { - CountDownLatch done; SharedThreadContainer ctr; - if ((done = termination) != null) - done.countDown(); - if ((ctr = container) != null) - ctr.close(); + } + else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) { + if (isShutdown == 0L) + getAndBitwiseOrRunState(SHUTDOWN); + if 
(quiescent() > 0) + now = true; + } + + if (now) { + releaseWaiters(); + for (;;) { + if (((e = runState) & CLEANED) == 0L) { + boolean clean = cleanQueues(); + if (((e = runState) & CLEANED) == 0L && clean) + e = getAndBitwiseOrRunState(CLEANED) | CLEANED; + } + if ((e & TERMINATED) != 0L) + break; + if (ctl != 0L) // else loop if didn't finish cleaning + break; + if ((e & CLEANED) != 0L) { + e |= TERMINATED; + if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) { + CountDownLatch done; SharedThreadContainer ctr; + if ((done = termination) != null) + done.countDown(); + if ((ctr = container) != null) + ctr.close(); + } + break; } } } return e; } + /** + * Scans queues in a pseudorandom order based on thread id, + * cancelling tasks until empty, or returning early upon + * interference or still-active external queues, in which case + * other calls will finish cancellation. + * + * @return true if all queues empty + */ + private boolean cleanQueues() { + int r = (int)Thread.currentThread().threadId(); + r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift + int step = (r >>> 16) | 1; // randomize traversals + WorkQueue[] qs = queues; + int n = (qs == null) ? 0 : qs.length; + for (int l = n; l > 0; --l, r += step) { + WorkQueue q; ForkJoinTask[] a; int cap; + if ((q = qs[r & (n - 1)]) != null && + (a = q.array) != null && (cap = a.length) > 0) { + for (;;) { + ForkJoinTask t; int b; long k; + t = (ForkJoinTask)U.getReferenceAcquire( + a, k = slotOffset((cap - 1) & (b = q.base))); + if (q.base == b && t != null && + U.compareAndSetReference(a, k, t, null)) { + q.updateBase(b + 1); + try { + t.cancel(false); + } catch (Throwable ignore) { + } + } + else if ((q.phase & (IDLE|1)) == 0 || // externally locked + q.top - q.base > 0) + return false; // incomplete + else + break; + } + } + } + return true; + } + /** * Interrupts all workers */ @@ -2814,8 +2856,7 @@ public class ForkJoinPool extends AbstractExecutorService { int n = (qs == null) ? 
0 : qs.length; for (int i = 1; i < n; i += 2) { WorkQueue q; Thread o; - if ((q = qs[i]) != null && (o = q.owner) != null && o != current && - q.source != DROPPED) { + if ((q = qs[i]) != null && (o = q.owner) != null && o != current) { try { o.interrupt(); } catch (Throwable ignore) { @@ -4018,15 +4059,10 @@ public class ForkJoinPool extends AbstractExecutorService { if ((scale & (scale - 1)) != 0) throw new Error("array index scale not a power of two"); - defaultForkJoinWorkerThreadFactory = - new DefaultForkJoinWorkerThreadFactory(); - @SuppressWarnings("removal") - ForkJoinPool p = common = (System.getSecurityManager() == null) ? - new ForkJoinPool((byte)0) : - AccessController.doPrivileged(new PrivilegedAction<>() { - public ForkJoinPool run() { - return new ForkJoinPool((byte)0); }}); + interruptibleTaskClass = ForkJoinTask.InterruptibleTask.class; + Class dep = LockSupport.class; // ensure loaded // allow access to non-public methods + JLA = SharedSecrets.getJavaLangAccess(); SharedSecrets.setJavaUtilConcurrentFJPAccess( new JavaUtilConcurrentFJPAccess() { @Override @@ -4037,6 +4073,13 @@ public class ForkJoinPool extends AbstractExecutorService { pool.endCompensatedBlock(post); } }); - Class dep = LockSupport.class; // ensure loaded + defaultForkJoinWorkerThreadFactory = + new DefaultForkJoinWorkerThreadFactory(); + @SuppressWarnings("removal") + ForkJoinPool p = common = (System.getSecurityManager() == null) ? 
+ new ForkJoinPool((byte)0) : + AccessController.doPrivileged(new PrivilegedAction<>() { + public ForkJoinPool run() { + return new ForkJoinPool((byte)0); }}); } } diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java index 27b9b0812d6..7a468666ad3 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java @@ -41,6 +41,7 @@ import java.security.PrivilegedAction; import java.security.ProtectionDomain; import jdk.internal.access.JavaLangAccess; import jdk.internal.access.SharedSecrets; +import jdk.internal.misc.Unsafe; /** * A thread managed by a {@link ForkJoinPool}, which executes @@ -225,6 +226,25 @@ public class ForkJoinWorkerThread extends Thread { (sq = qs[i]) != null && sq.top - sq.base > 0) || q.top - q.base > 0)); } + + /** + * Clears ThreadLocals, and if necessary resets ContextClassLoader + */ + final void resetThreadLocals() { + if (U.getReference(this, THREADLOCALS) != null) + U.putReference(this, THREADLOCALS, null); + if (U.getReference(this, INHERITABLETHREADLOCALS) != null) + U.putReference(this, INHERITABLETHREADLOCALS, null); + if ((this instanceof InnocuousForkJoinWorkerThread) && + ((InnocuousForkJoinWorkerThread)this).needCCLReset()) + super.setContextClassLoader(ClassLoader.getSystemClassLoader()); + } + + private static final Unsafe U = Unsafe.getUnsafe(); + private static final long THREADLOCALS + = U.objectFieldOffset(Thread.class, "threadLocals"); + private static final long INHERITABLETHREADLOCALS + = U.objectFieldOffset(Thread.class, "inheritableThreadLocals"); private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); /** @@ -236,6 +256,7 @@ public class ForkJoinWorkerThread extends Thread { static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread { /** The ThreadGroup for all 
InnocuousForkJoinWorkerThreads */ private static final ThreadGroup innocuousThreadGroup = createGroup(); + private boolean resetCCL; InnocuousForkJoinWorkerThread(ForkJoinPool pool) { super(innocuousThreadGroup, pool, true, true); } @@ -244,9 +265,20 @@ public class ForkJoinWorkerThread extends Thread { public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { } @Override // paranoically + @SuppressWarnings("removal") public void setContextClassLoader(ClassLoader cl) { - if (cl != null && ClassLoader.getSystemClassLoader() != cl) + if (System.getSecurityManager() != null && + cl != null && ClassLoader.getSystemClassLoader() != cl) throw new SecurityException("setContextClassLoader"); + resetCCL = true; + super.setContextClassLoader(cl); + } + + final boolean needCCLReset() { // get and clear + boolean needReset; + if (needReset = resetCCL) + resetCCL = false; + return needReset; } static ThreadGroup createGroup() { @@ -255,5 +287,6 @@ public class ForkJoinWorkerThread extends Thread { group = p; return new ThreadGroup(group, "InnocuousForkJoinWorkerThreadGroup"); } + } }