From 18df6fd5ba93c3c2d5965c072d482dcf7cbfc675 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Thu, 21 Nov 2024 11:50:46 +0000
Subject: [PATCH] 8336707: Contention of ForkJoinPool grows when stealing works
Reviewed-by: vklang
---
.../java/util/concurrent/ForkJoinPool.java | 733 +++++++++---------
.../util/concurrent/ForkJoinWorkerThread.java | 35 +-
2 files changed, 422 insertions(+), 346 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index ee2f85249d7..0f9ccbf6284 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -259,10 +259,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
* analysis of memory ordering requirements in work-stealing
* algorithms similar to the one used here. We use per-operation
- * ordered writes of various kinds for updates, but usually use
- * explicit load fences for reads, to cover access of several
- * fields of possibly several objects without further constraining
- * read-by-read ordering.
+ * ordered writes of various kinds for accesses when required.
*
* We also support a user mode in which local task processing is
* in FIFO, not LIFO order, simply by using a local version of
@@ -302,8 +299,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* * Slot k must be read with an acquiring read, which it must
* anyway to dereference and run the task if the (acquiring)
- * CAS succeeds, but uses an explicit acquire fence to support
- * the following rechecks even if the CAS is not attempted.
+ * CAS succeeds.
*
* * q.base may change between reading and using its value to
* index the slot. To avoid trying to use the wrong t, the
@@ -410,7 +406,11 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* Field "runState" and per-WorkQueue field "phase" play similar
* roles, as lockable, versioned counters. Field runState also
- * includes monotonic event bits (SHUTDOWN, STOP, and TERMINATED).
+ * includes monotonic event bits:
+ * * SHUTDOWN: no more external tasks accepted; STOP when quiescent
+ * * STOP: no more tasks run, and deregister all workers
+ * * CLEANED: all unexecuted tasks have been cancelled
+ * * TERMINATED: all workers deregistered and all queues cleaned
* The version tags enable detection of state changes (by
* comparing two reads) modulo bit wraparound. The bit range in
* each case suffices for purposes of determining quiescence,
@@ -546,22 +546,27 @@ public class ForkJoinPool extends AbstractExecutorService {
* * If computations are purely tree structured, it suffices for
* every worker to activate another when it pushes a task into
* an empty queue, resulting in O(log(#threads)) steps to full
- * activation. Emptiness must be conservatively approximated
- * (by checking if there is apparently at most one existing
- * task) which may result in unnecessary signals. Also, to
- * reduce resource usages in some cases, at the expense of
- * slower startup in others, activation of an idle thread is
- * preferred over creating a new one, here and elsewhere.
+ * activation. Emptiness must be conservatively approximated,
+ * which may result in unnecessary signals. Also, to reduce
+ * resource usages in some cases, at the expense of slower
+ * startup in others, activation of an idle thread is preferred
+ * over creating a new one, here and elsewhere.
*
* * At the other extreme, if "flat" tasks (those that do not in
* turn generate others) come in serially from only a single
- * producer, each worker taking its first (since the last
- * activation) task from a queue should propagate a signal if
- * there are more tasks in that queue. This is equivalent to,
- * but generally faster than, arranging the stealer take
- * multiple tasks, re-pushing one or more on its own queue, and
- * signalling (because its queue is empty), also resulting in
- * logarithmic full activation time.
+ * producer, each worker taking a task from a queue should
+ * propagate a signal if there are more tasks in that
+ * queue. This is equivalent to, but generally faster than,
+ * arranging the stealer take multiple tasks, re-pushing one or
+ * more on its own queue, and signalling (because its queue is
+ * empty), also resulting in logarithmic full activation
+ * time. If tasks do not not engage in unbounded loops based on
+ * the actions of other workers with unknown dependencies loop,
+ * this form of proagation can be limited to one signal per
+ * activation (phase change). We distinguish the cases by
+ * further signalling only if the task is an InterruptibleTask
+ * (see below), which are the only supported forms of task that
+ * may do so.
*
* * Because we don't know about usage patterns (or most commonly,
* mixtures), we use both approaches, which present even more
@@ -611,9 +616,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* it tries to deactivate()), giving up (and rescanning) on "ctl"
* contention. To avoid missed signals during deactivation, the
* method rescans and reactivates if there may have been a missed
- * signal during deactivation. Because idle workers are often not
- * yet blocked (parked), we use a WorkQueue field to advertise
- * that a waiter actually needs unparking upon signal.
+ * (external) signal during deactivation. To reduce false-alarm
+ * reactivations while doing so, we scan multiple times
+ * (analogously to method quiescent()) before trying to
+ * reactivate. Because idle workers are often not yet blocked
+ * (parked), we use a WorkQueue field to advertise that a waiter
+ * actually needs unparking upon signal.
*
* Quiescence. Workers scan looking for work, giving up when they
* don't find any, without being sure that none are available.
@@ -644,22 +652,36 @@ public class ForkJoinPool extends AbstractExecutorService {
* workers are inactive because the caller and any others
* executing helpQuiesce are not included in counts.
*
- * Termination. A call to shutdownNow invokes tryTerminate to
- * atomically set a runState mode bit. However, the process of
- * termination is intrinsically non-atomic. The calling thread, as
- * well as other workers thereafter terminating help cancel queued
- * tasks and interrupt other workers. These actions race with
- * unterminated workers. By default, workers check for
- * termination only when accessing pool state. This may take a
- * while but suffices for structured computational tasks. But not
- * necessarily for others. Class InterruptibleTask (see below)
- * further arranges runState checks before executing task bodies,
- * and ensures interrupts while terminating. Even so, there are no
- * guarantees after an abrupt shutdown that remaining tasks
- * complete normally or exceptionally or are cancelled.
- * Termination may fail to complete if running tasks ignore both
- * task status and interrupts and/or produce more tasks after
- * others that could cancel them have exited.
+ * Termination. Termination is initiated by setting STOP in one of
+ * three ways (via methods tryTerminate and quiescent):
+ * * A call to shutdownNow, in which case all workers are
+ * interrupted, first ensuring that the queues array is stable,
+ * to avoid missing any workers.
+ * * A call to shutdown when quiescent, in which case method
+ * releaseWaiters is used to dequeue them, at which point they notice
+ * STOP state and return from runWorker to deregister();
+ * * The pool becomes quiescent() sometime after shutdown has
+ * been called, in which case releaseWaiters is also used to
+ * propagate as they deregister.
+ * Upon STOP, each worker, as well as external callers to
+ * tryTerminate (via close() etc) race to set CLEANED, indicating
+ * that all tasks have been cancelled. The implementation (method
+ * cleanQueues) balances cases in which there may be many tasks to
+ * cancel (benefitting from parallelism) versus contention and
+ * interference when many threads try to poll remaining queues,
+ * while also avoiding unnecessary rechecks, by using
+ * pseudorandom scans and giving up upon interference. This may be
+ * retried by the same caller only when there are no more
+ * registered workers, using the same criteria as method
+ * quiescent. When CLEANED and all workers have deregistered,
+ * TERMINATED is set, also signalling any caller of
+ * awaitTermination or close. Because shutdownNow-based
+ * termination relies on interrupts, there is no guarantee that
+ * workers will stop if their tasks ignore interrupts. Class
+ * InterruptibleTask (see below) further arranges runState checks
+ * before executing task bodies, and ensures interrupts while
+ * terminating. Even so, there are no guarantees because tasks may
+ * internally enter unbounded loops.
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
@@ -819,10 +841,11 @@ public class ForkJoinPool extends AbstractExecutorService {
* overridden by system properties, we use workers of subclass
* InnocuousForkJoinWorkerThread when there is a SecurityManager
* present. These workers have no permissions set, do not belong
- * to any user-defined ThreadGroup, and clear all ThreadLocals
- * after executing any top-level task. The associated mechanics
- * may be JVM-dependent and must access particular Thread class
- * fields to achieve this effect.
+ * to any user-defined ThreadGroup, and clear all ThreadLocals and
+ * reset the ContextClassLoader before (re)activating to execute
+ * top-level task. The associated mechanics may be JVM-dependent
+ * and must access particular Thread class fields to achieve this
+ * effect.
*
* InterruptibleTasks
* ====================
@@ -844,7 +867,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* shutdown, runners are interrupted so they can cancel. Since
* external joining callers never run these tasks, they must await
* cancellation by others, which can occur along several different
- * paths.
+ * paths. The inability to rely on caller-runs may also require
+ * extra signalling (resulting in scanning and contention) so is
+ * done only conditionally in methods push and runworker.
*
* Across these APIs, rules for reporting exceptions for tasks
* with results accessed via join() differ from those via get(),
@@ -904,10 +929,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* direct false-sharing and indirect cases due to GC bookkeeping
* (cardmarks etc), and reduce the number of resizes, which are
* not especially fast because they require atomic transfers.
- * Currently, arrays for workers are initialized to be just large
- * enough to avoid resizing in most tree-structured tasks, but
- * larger for external queues where both false-sharing problems
- * and the need for resizing are more common. (Maintenance note:
+ * Currently, arrays are initialized to be just large enough to
+ * avoid resizing in most tree-structured tasks. (Maintenance note:
* any changes in fields, queues, or their uses, or JVM layout
* policies, must be accompanied by re-evaluation of these
* placement and sizing decisions.)
@@ -996,12 +1019,6 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
- /**
- * Initial capacity of work-stealing queue array for external queues.
- * Must be a power of two, at least 2. See above.
- */
- static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
-
// conversions among short, int, long
static final int SMASK = 0xffff; // (unsigned) short bits
static final long LMASK = 0xffffffffL; // lower 32 bits of long
@@ -1015,8 +1032,9 @@ public class ForkJoinPool extends AbstractExecutorService {
// pool.runState bits
static final long STOP = 1L << 0; // terminating
static final long SHUTDOWN = 1L << 1; // terminate when quiescent
- static final long TERMINATED = 1L << 2; // only set if STOP also set
- static final long RS_LOCK = 1L << 3; // lowest seqlock bit
+ static final long CLEANED = 1L << 2; // stopped and queues cleared
+ static final long TERMINATED = 1L << 3; // only set if STOP also set
+ static final long RS_LOCK = 1L << 4; // lowest seqlock bit
// spin/sleep limits for runState locking and elsewhere
static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
@@ -1246,9 +1264,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
boolean clearThreadLocals) {
- array = new ForkJoinTask>[owner == null ?
- INITIAL_EXTERNAL_QUEUE_CAPACITY :
- INITIAL_QUEUE_CAPACITY];
+ array = new ForkJoinTask>[INITIAL_QUEUE_CAPACITY];
this.owner = owner;
this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
}
@@ -1294,10 +1310,12 @@ public class ForkJoinPool extends AbstractExecutorService {
unlockPhase();
if (room < 0)
throw new RejectedExecutionException("Queue capacity exceeded");
- else if ((room == 0 ||
- a[m & (s - 2)] == null) && // at most one existing task
- pool != null)
- pool.signalWork();
+ if ((room == 0 || // pad for InterruptibleTasks
+ a[m & (s - ((internal || task == null ||
+ task.getClass().getSuperclass() !=
+ interruptibleTaskClass) ? 1 : 2))] == null) &&
+ pool != null)
+ pool.signalWork(); // may have appeared empty
}
}
@@ -1351,10 +1369,8 @@ public class ForkJoinPool extends AbstractExecutorService {
updateBase(nb);
break;
}
- while (b == (b = base)) {
- U.loadFence();
+ while (b == (b = U.getIntAcquire(this, BASE)))
Thread.onSpinWait(); // spin to reduce memory traffic
- }
if (p - b <= 0)
break;
}
@@ -1378,12 +1394,12 @@ public class ForkJoinPool extends AbstractExecutorService {
final boolean tryUnpush(ForkJoinTask> task, boolean internal) {
boolean taken = false;
ForkJoinTask>[] a = array;
- int p = top, s = p - 1, cap, k;
+ int p = top, s = p - 1, cap; long k;
if (a != null && (cap = a.length) > 0 &&
- a[k = (cap - 1) & s] == task &&
+ U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
(internal || tryLockPhase())) {
if (top == p &&
- U.compareAndSetReference(a, slotOffset(k), task, null)) {
+ U.compareAndSetReference(a, k, task, null)) {
taken = true;
updateTop(s);
}
@@ -1417,28 +1433,25 @@ public class ForkJoinPool extends AbstractExecutorService {
* Polls for a task. Used only by non-owners.
*/
final ForkJoinTask> poll() {
- for (int b = base;;) {
- int cap, k, nb; ForkJoinTask>[] a;
+ for (int pb = -1, b; ; pb = b) { // track progress
+ ForkJoinTask> t; int cap, nb; long k; ForkJoinTask>[] a;
if ((a = array) == null || (cap = a.length) <= 0)
break;
- long kp = slotOffset(k = (cap - 1) & b);
- int nk = (nb = b + 1) & (cap - 1); // next slot
- int sk = (b + 2) & (cap - 1); // 2nd slot ahead
- ForkJoinTask> t = a[k];
- U.loadFence();
- if (b == (b = base)) { // else inconsistent
- if (t != null) {
- if (U.compareAndSetReference(a, kp, t, null)) {
- updateBase(nb);
- return t;
- }
- b = base;
- }
- else if (a[sk] == null && a[nk] == null && a[k] == null) {
- if (top - b <= 0)
- break; // empty
+ t = (ForkJoinTask>)U.getReferenceAcquire(
+ a, k = slotOffset((cap - 1) & (b = base)));
+ Object u = U.getReference( // next slot
+ a, slotOffset((cap - 1) & (nb = b + 1)));
+ if (base != b) // inconsistent
+ ;
+ else if (t == null) {
+ if (u == null && top - b <= 0)
+ break; // empty
+ if (pb == b)
Thread.onSpinWait(); // stalled
- }
+ }
+ else if (U.compareAndSetReference(a, k, t, null)) {
+ updateBase(nb);
+ return t;
}
}
return null;
@@ -1449,14 +1462,11 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Runs the given task, as well as remaining local tasks.
*/
- final void topLevelExec(ForkJoinTask> task, int cfg) {
- int fifo = cfg & FIFO;
+ final void topLevelExec(ForkJoinTask> task, int fifo) {
while (task != null) {
task.doExec();
task = nextLocalTask(fifo);
}
- if ((cfg & CLEAR_TLS) != 0)
- ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
}
/**
@@ -1468,23 +1478,24 @@ public class ForkJoinPool extends AbstractExecutorService {
int b = base, p = top, s = p - 1, d = p - b, cap;
if (a != null && (cap = a.length) > 0) {
for (int m = cap - 1, i = s; d > 0; --i, --d) {
- ForkJoinTask> t; int k; boolean taken;
- if ((t = a[k = i & m]) == null)
+ long k; boolean taken;
+ ForkJoinTask> t = (ForkJoinTask>)U.getReference(
+ a, k = slotOffset(i & m));
+ if (t == null)
break;
if (t == task) {
- long pos = slotOffset(k);
if (!internal && !tryLockPhase())
break; // fail if locked
if (taken =
(top == p &&
- U.compareAndSetReference(a, pos, task, null))) {
+ U.compareAndSetReference(a, k, task, null))) {
if (i == s) // act as pop
updateTop(s);
else if (i == base) // act as poll
updateBase(i + 1);
else { // swap with top
U.putReferenceVolatile(
- a, pos, (ForkJoinTask>)
+ a, k, (ForkJoinTask>)
U.getAndSetReference(
a, slotOffset(s & m), null));
updateTop(s);
@@ -1512,19 +1523,18 @@ public class ForkJoinPool extends AbstractExecutorService {
int status = 0;
if (task != null) {
outer: for (;;) {
- ForkJoinTask>[] a; ForkJoinTask> t; boolean taken;
- int stat, p, s, cap, k;
+ ForkJoinTask>[] a; boolean taken; Object o;
+ int stat, p, s, cap;
if ((stat = task.status) < 0) {
status = stat;
break;
}
if ((a = array) == null || (cap = a.length) <= 0)
break;
- if ((t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null)
+ long k = slotOffset((cap - 1) & (s = (p = top) - 1));
+ if (!((o = U.getReference(a, k)) instanceof CountedCompleter))
break;
- if (!(t instanceof CountedCompleter))
- break;
- CountedCompleter> f = (CountedCompleter>)t;
+ CountedCompleter> t = (CountedCompleter>)o, f = t;
for (int steps = cap;;) { // bound path
if (f == task)
break;
@@ -1535,7 +1545,7 @@ public class ForkJoinPool extends AbstractExecutorService {
break;
if (taken =
(top == p &&
- U.compareAndSetReference(a, slotOffset(k), t, null)))
+ U.compareAndSetReference(a, k, t, null)))
updateTop(s);
if (!internal)
unlockPhase();
@@ -1557,11 +1567,11 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void helpAsyncBlocker(ManagedBlocker blocker) {
for (;;) {
- ForkJoinTask>[] a; int b, cap, k;
+ ForkJoinTask> t; ForkJoinTask>[] a; int b, cap; long k;
if ((a = array) == null || (cap = a.length) <= 0)
break;
- ForkJoinTask> t = a[k = (b = base) & (cap - 1)];
- U.loadFence();
+ t = (ForkJoinTask>)U.getReferenceAcquire(
+ a, k = slotOffset((cap - 1) & (b = base)));
if (t == null) {
if (top - b <= 0)
break;
@@ -1572,7 +1582,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (blocker != null && blocker.isReleasable())
break;
if (base == b && t != null &&
- U.compareAndSetReference(a, slotOffset(k), t, null)) {
+ U.compareAndSetReference(a, k, t, null)) {
updateBase(b + 1);
t.doExec();
}
@@ -1581,6 +1591,18 @@ public class ForkJoinPool extends AbstractExecutorService {
// misc
+ /**
+ * Cancels all local tasks. Called only by owner.
+ */
+ final void cancelTasks() {
+ for (ForkJoinTask> t; (t = nextLocalTask(0)) != null; ) {
+ try {
+ t.cancel(false);
+ } catch (Throwable ignore) {
+ }
+ }
+ }
+
/**
* Returns true if internal and not known to be blocked.
*/
@@ -1630,6 +1652,16 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
static volatile RuntimePermission modifyThreadPermission;
+ /**
+ * Cached for faster type tests.
+ */
+ static final Class> interruptibleTaskClass;
+
+ /**
+ * For VirtualThread intrinsics
+ */
+ private static final JavaLangAccess JLA;
+
// fields declared in order of their likely layout on most VMs
volatile CountDownLatch termination; // lazily constructed
final Predicate super ForkJoinPool> saturate;
@@ -1773,7 +1805,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param w caller's WorkQueue
*/
final void registerWorker(WorkQueue w) {
- if (w != null) {
+ if (w != null && (runState & STOP) == 0L) {
ThreadLocalRandom.localInit();
int seed = w.stackPred = ThreadLocalRandom.getProbe();
int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
@@ -1824,49 +1856,35 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
- if ((runState & STOP) != 0L) // ensure released
- releaseAll();
- WorkQueue w = null;
- int src = 0, phase = 0;
- boolean replaceable = false;
- if (wt != null && (w = wt.workQueue) != null) {
- phase = w.phase;
- if ((src = w.source) != DROPPED) {
- w.source = DROPPED; // else already dropped
- if (phase != 0) { // else failed to start
- replaceable = true;
- if (w.top - w.base > 0) {
- ForkJoinTask> t; // cancel remaining tasks
- while ((t = w.nextLocalTask()) != null) {
- try {
- t.cancel(false);
- } catch (Throwable ignore) {
- }
- }
- }
- }
- }
- }
- if (src != DROPPED) { // decrement counts
- long c = ctl;
+ WorkQueue w = null; // null if not created
+ int phase = 0; // 0 if not registered
+ if (wt != null && (w = wt.workQueue) != null &&
+ (phase = w.phase) != 0 && (phase & IDLE) != 0)
+ releaseWaiters(); // ensure released
+ if (w == null || w.source != DROPPED) {
+ long c = ctl; // decrement counts
do {} while (c != (c = compareAndExchangeCtl(
c, ((RC_MASK & (c - RC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(LMASK & c)))));
}
- if ((tryTerminate(false, false) & STOP) == 0L && w != null) {
- WorkQueue[] qs; int n, i; // remove index unless terminating
+ if (phase != 0 && w != null) { // remove index unless terminating
long ns = w.nsteals & 0xffffffffL;
- if ((lockRunState() & STOP) != 0L)
- replaceable = false;
- else if ((qs = queues) != null && (n = qs.length) > 0 &&
- qs[i = phase & SMASK & (n - 1)] == w) {
- qs[i] = null;
- stealCount += ns; // accumulate steals
+ if ((runState & STOP) == 0L) {
+ WorkQueue[] qs; int n, i;
+ if ((lockRunState() & STOP) == 0L &&
+ (qs = queues) != null && (n = qs.length) > 0 &&
+ qs[i = phase & SMASK & (n - 1)] == w) {
+ qs[i] = null;
+ stealCount += ns; // accumulate steals
+ }
+ unlockRunState();
}
- unlockRunState();
- if (replaceable)
- signalWork();
+ }
+ if ((tryTerminate(false, false) & STOP) == 0L &&
+ phase != 0 && w != null && w.source != DROPPED) {
+ signalWork(); // possibly replace
+ w.cancelTasks(); // clean queue
}
if (ex != null)
ForkJoinTask.rethrow(ex);
@@ -1912,12 +1930,9 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Releases all waiting workers. Called only during shutdown.
- *
- * @return current ctl
*/
- private long releaseAll() {
- long c = ctl;
- for (;;) {
+ private void releaseWaiters() {
+ for (long c = ctl;;) {
WorkQueue[] qs; WorkQueue v; int sp, i;
if ((sp = (int)c) == 0 || (qs = queues) == null ||
qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
@@ -1930,7 +1945,6 @@ public class ForkJoinPool extends AbstractExecutorService {
U.unpark(v.owner);
}
}
- return c;
}
/**
@@ -1940,7 +1954,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* unlocked; if so, setting STOP if shutdown is enabled
*/
private int quiescent() {
- outer: for (;;) {
+ for (;;) {
long phaseSum = 0L;
boolean swept = false;
for (long e, prevRunState = 0L; ; prevRunState = e) {
@@ -1983,64 +1997,61 @@ public class ForkJoinPool extends AbstractExecutorService {
final void runWorker(WorkQueue w) {
if (w != null) {
int phase = w.phase, r = w.stackPred; // seed from registerWorker
- int cfg = w.config, src = -1, nsteals = 0;
- rescan: for (boolean scanned = false;;) {
+ int fifo = w.config & FIFO, nsteals = 0, src = -1;
+ for (;;) {
WorkQueue[] qs;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
if ((runState & STOP) != 0L || (qs = queues) == null)
- return;
+ break;
int n = qs.length, i = r, step = (r >>> 16) | 1;
- for (int l = n; l > 0; --l, i += step) { // scan queues
- int j; WorkQueue q;
- if ((q = qs[j = i & (n - 1)]) != null) {
- boolean taken = false;
- for (int pb = -1, b = q.base;;) {
- int cap, k, nb; ForkJoinTask>[] a;
- if ((a = q.array) == null || (cap = a.length) <= 0)
- continue rescan;
- long kp = slotOffset(k = (cap - 1) & b);
- int nk = (nb = b + 1) & (cap - 1); // next slot
- int sk = (b + 2) & (cap - 1); // 2nd slot ahead
- ForkJoinTask> t = a[k];
- U.loadFence();
- if (b != (b = q.base))
- ; // inconsistent
- else if (t == null) { // possibly empty
- if (a[sk] == null && a[nk] == null &&
- a[k] == null) { // screen
- if (q.top - b > 0) { // stalled
- if (!taken) // move unless taking
- continue rescan;
+ boolean rescan = false;
+ scan: for (int l = n; l > 0; --l, i += step) { // scan queues
+ int j, cap; WorkQueue q; ForkJoinTask>[] a;
+ if ((q = qs[j = i & (n - 1)]) != null &&
+ (a = q.array) != null && (cap = a.length) > 0) {
+ for (int m = cap - 1, pb = -1, b = q.base;;) {
+ ForkJoinTask> t; long k;
+ t = (ForkJoinTask>)U.getReferenceAcquire(
+ a, k = slotOffset(m & b));
+ if (b != (b = q.base) || t == null ||
+ !U.compareAndSetReference(a, k, t, null)) {
+ if (a[b & m] == null) {
+ if (rescan) // end of run
+ break scan;
+ if (a[(b + 1) & m] == null &&
+ a[(b + 2) & m] == null) {
+ break; // probably empty
+ }
+ if (pb == (pb = b)) { // track progress
+ rescan = true; // stalled; reorder scan
+ break scan;
}
- else if (taken)
- continue rescan; // depleted; restart
- else
- break; // empty
}
- if (pb == (pb = b)) // base unchanged
- Thread.onSpinWait();
}
- else if (!U.compareAndSetReference(a, kp, t, null))
- b = q.base; // contended
else {
- q.base = nb;
+ boolean propagate;
+ int nb = q.base = b + 1;
w.nsteals = ++nsteals;
- w.source = j; // volatile write
- if (taken != (taken = true) && a[nk] != null)
- signalWork(); // propagate signal
- w.topLevelExec(t, cfg);
- if ((b = q.base) != nb && src != (src = j))
- continue rescan; // reduce interference
+ w.source = j; // volatile
+ rescan = true;
+ if (propagate =
+ ((src != (src = j) ||
+ t.getClass().getSuperclass() ==
+ interruptibleTaskClass) &&
+ a[nb & m] != null))
+ signalWork();
+ w.topLevelExec(t, fifo);
+ if ((b = q.base) != nb && !propagate)
+ break scan; // reduce interference
}
}
}
}
- if (!scanned)
- scanned = true; // rescan before deactivate
- else if (((phase = deactivate(w, r, phase)) & IDLE) == 0)
- scanned = false;
- else
- return;
+ if (!rescan) {
+ if (((phase = deactivate(w, phase)) & IDLE) != 0)
+ break;
+ src = -1; // re-enable propagation
+ }
}
}
}
@@ -2049,56 +2060,38 @@ public class ForkJoinPool extends AbstractExecutorService {
* Deactivates and if necessary awaits signal or termination.
*
* @param w the worker
- * @param r random seed
* @param phase current phase
* @return current phase, with IDLE set if worker should exit
*/
- private int deactivate(WorkQueue w, int r, int phase) {
+ private int deactivate(WorkQueue w, int phase) {
+ if (w == null) // currently impossible
+ return IDLE;
int p = phase | IDLE, activePhase = phase + (IDLE << 1);
- if (w != null) { // always true
- w.phase = p;
- long pc = ctl, qc;
- for (;;) { // try to enqueue
- w.stackPred = (int)pc; // set ctl stack link
- qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
- if (pc == (pc = compareAndExchangeCtl(pc, qc))) // success
- break;
- if ((pc & RC_MASK) >= (qc & RC_MASK)) {
- p = w.phase = phase; // back out on possible signal
- break;
- }
- }
- if (p != phase && // check quiescent termination
- ((runState & SHUTDOWN) == 0L || quiescent() <= 0)) {
- WorkQueue[] qs;
- int spins = ((short)(qc >>> TC_SHIFT) << 1) + SPIN_WAITS + 1;
- while ((p = w.phase) != activePhase && --spins > 0)
- Thread.onSpinWait(); // reduce flailing
- if (p != activePhase && (qs = queues) != null) {
- int n = qs.length, step = (r >>> 16) | 1;
- for (int i = r, l = n; l > 0; --l, i += step) {
- WorkQueue q; // check for missed signals
- if ((q = qs[i & (n - 1)]) != null &&
- q.top - q.base > 0) {
- if (ctl == qc && compareAndSetCtl(qc, pc)) {
- p = w.phase = activePhase;
- break; // self-signal
- }
- if ((p = w.phase) == activePhase)
- break;
- }
- }
- if (p != activePhase) {
- long delay = (((qc & RC_MASK) > 0L) ? 0L :
- (w.source != INVALID_ID) ? keepAlive :
- TIMEOUT_SLOP); // minimal delay if cascade
- if ((p = w.phase) != activePhase)
- p = awaitWork(w, p, delay); // block, drop, or exit
- }
- }
- }
+ long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
+ w.stackPred = (int)pc; // set ctl stack link
+ w.phase = p;
+ if (!compareAndSetCtl(pc, qc)) // try to enqueue
+ return w.phase = phase; // back out on possible signal
+ int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
+ if (((e = runState) & STOP) != 0L ||
+ ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
+ (qs = queues) == null || (n = qs.length) <= 0)
+ return IDLE; // terminating
+ int prechecks = Math.min(ac, 2); // reactivation threshold
+ for (int k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
+ WorkQueue q; int cap; ForkJoinTask>[] a;
+ if (w.phase == activePhase)
+ return activePhase;
+ if (--k < 0)
+ return awaitWork(w, p); // block, drop, or exit
+ if ((k & 1) != 0)
+ Thread.onSpinWait(); // interleave spins and rechecks
+ else if ((q = qs[k & (n - 1)]) != null &&
+ (a = q.array) != null && (cap = a.length) > 0 &&
+ a[q.base & (cap - 1)] != null && --prechecks < 0 &&
+ ctl == qc && compareAndSetCtl(qc, pc))
+ return w.phase = activePhase; // reactivate
}
- return p;
}
/**
@@ -2106,33 +2099,41 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* @param w the work queue
* @param p current phase (known to be idle)
- * @param delay if nonzero keepAlive before trimming if quiescent
* @return current phase, with IDLE set if worker should exit
*/
- private int awaitWork(WorkQueue w, int p, long delay) {
+ private int awaitWork(WorkQueue w, int p) {
if (w != null) {
+ ForkJoinWorkerThread t; long deadline;
+ if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
+ t.resetThreadLocals(); // clear before reactivate
+ if ((ctl & RC_MASK) > 0L)
+ deadline = 0L;
+ else if ((deadline =
+ (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
+ System.currentTimeMillis()) == 0L)
+ deadline = 1L; // avoid zero
int activePhase = p + IDLE;
- LockSupport.setCurrentBlocker(this);
- long deadline = (delay == 0L ? 0L :
- delay + System.currentTimeMillis());
- w.parking = 1; // enable unpark
- while ((p = w.phase) != activePhase) {
- boolean trimmable = false; int trim;
- Thread.interrupted(); // clear status
- if ((runState & STOP) != 0L)
- break;
- if (deadline != 0L) {
- if ((trim = tryTrim(w, p, deadline)) > 0)
+ if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
+ LockSupport.setCurrentBlocker(this);
+ w.parking = 1; // enable unpark
+ while ((p = w.phase) != activePhase) {
+ boolean trimmable = false; int trim;
+ Thread.interrupted(); // clear status
+ if ((runState & STOP) != 0L)
break;
- else if (trim < 0)
- deadline = 0L;
- else
- trimmable = true;
+ if (deadline != 0L) {
+ if ((trim = tryTrim(w, p, deadline)) > 0)
+ break;
+ else if (trim < 0)
+ deadline = 0L;
+ else
+ trimmable = true;
+ }
+ U.park(trimmable, deadline);
}
- U.park(trimmable, deadline);
+ w.parking = 0;
+ LockSupport.setCurrentBlocker(null);
}
- w.parking = 0;
- LockSupport.setCurrentBlocker(null);
}
return p;
}
@@ -2287,12 +2288,13 @@ public class ForkJoinPool extends AbstractExecutorService {
int j; WorkQueue q;
if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
for (;;) {
- int sq = q.source, b, cap, k; ForkJoinTask>[] a;
+ ForkJoinTask> t; ForkJoinTask>[] a;
+ boolean eligible = false;
+ int sq = q.source, b, cap; long k;
if ((a = q.array) == null || (cap = a.length) <= 0)
break;
- ForkJoinTask> t = a[k = (b = q.base) & (cap - 1)];
- U.loadFence();
- boolean eligible = false;
+ t = (ForkJoinTask>)U.getReferenceAcquire(
+ a, k = slotOffset((cap - 1) & (b = q.base)));
if (t == task)
eligible = true;
else if (t != null) { // check steal chain
@@ -2311,18 +2313,16 @@ public class ForkJoinPool extends AbstractExecutorService {
}
if ((s = task.status) < 0)
break outer; // validate
- if (q.source == sq && q.base == b && a[k] == t) {
- int nb = b + 1, nk = nb & (cap - 1);
+ if (q.source == sq && q.base == b &&
+ U.getReference(a, k) == t) {
if (!eligible) { // revisit if nonempty
- if (!rescan && t == null &&
- (a[nk] != null || q.top - b > 0))
+ if (!rescan && t == null && q.top - b > 0)
rescan = true;
break;
}
- if (U.compareAndSetReference(
- a, slotOffset(k), t, null)) {
- q.updateBase(nb);
- w.source = j;
+ if (U.compareAndSetReference(a, k, t, null)) {
+ q.base = b + 1;
+ w.source = j; // volatile write
t.doExec();
w.source = wsrc;
rescan = true; // restart at index r
@@ -2369,12 +2369,13 @@ public class ForkJoinPool extends AbstractExecutorService {
int j; WorkQueue q;
if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
for (;;) {
- ForkJoinTask>[] a; int b, cap, k;
+ ForkJoinTask> t; ForkJoinTask>[] a;
+ int b, cap, nb; long k;
+ boolean eligible = false;
if ((a = q.array) == null || (cap = a.length) <= 0)
break;
- ForkJoinTask> t = a[k = (b = q.base) & (cap - 1)];
- U.loadFence();
- boolean eligible = false;
+ t = (ForkJoinTask>)U.getReferenceAcquire(
+ a, k = slotOffset((cap - 1) & (b = q.base)));
if (t instanceof CountedCompleter) {
CountedCompleter> f = (CountedCompleter>)t;
for (int steps = cap; steps > 0; --steps) {
@@ -2389,19 +2390,17 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((s = task.status) < 0) // validate
break outer;
if (q.base == b) {
- int nb = b + 1, nk = nb & (cap - 1);
if (eligible) {
if (U.compareAndSetReference(
- a, slotOffset(k), t, null)) {
- q.updateBase(nb);
+ a, k, t, null)) {
+ q.updateBase(b + 1);
t.doExec();
locals = rescan = true;
break scan;
}
}
- else if (a[k] == t) {
- if (!rescan && t == null &&
- (a[nk] != null || q.top - b > 0))
+ else if (U.getReference(a, k) == t) {
+ if (!rescan && t == null && q.top - b > 0)
rescan = true; // revisit
break;
}
@@ -2454,14 +2453,15 @@ public class ForkJoinPool extends AbstractExecutorService {
int j; WorkQueue q;
if ((q = qs[j = r & SMASK & (n - 1)]) != null && q != w) {
for (;;) {
- ForkJoinTask>[] a; int b, cap, k;
+ ForkJoinTask> t; ForkJoinTask>[] a;
+ int b, cap; long k;
if ((a = q.array) == null || (cap = a.length) <= 0)
break;
- ForkJoinTask> t = a[k = (b = q.base) & (cap - 1)];
+ t = (ForkJoinTask>)U.getReferenceAcquire(
+ a, k = slotOffset((cap - 1) & (b = q.base)));
if (t != null && phase == inactivePhase) // reactivate
w.phase = phase = activePhase;
- U.loadFence();
- if (q.base == b && a[k] == t) {
+ if (q.base == b && U.getReference(a, k) == t) {
int nb = b + 1;
if (t == null) {
if (!rescan) {
@@ -2474,10 +2474,9 @@ public class ForkJoinPool extends AbstractExecutorService {
}
break;
}
- if (U.compareAndSetReference(
- a, slotOffset(k), t, null)) {
- q.updateBase(nb);
- w.source = j;
+ if (U.compareAndSetReference(a, k, t, null)) {
+ q.base = nb;
+ w.source = j; // volatile write
t.doExec();
w.source = wsrc;
rescan = locals = true;
@@ -2603,15 +2602,15 @@ public class ForkJoinPool extends AbstractExecutorService {
if (w == null)
w = new WorkQueue(null, id, 0, false);
w.phase = id;
- long stop = lockRunState() & STOP;
- if (stop == 0L && queues == qs && qs[i] == null) {
+ long isShutdown = lockRunState() & SHUTDOWN;
+ if (isShutdown == 0L && queues == qs && qs[i] == null) {
q = qs[i] = w; // else retry
w = null;
}
unlockRunState();
if (q != null)
return q;
- if (stop != 0L)
+ if (isShutdown != 0L)
break;
}
else if (!q.tryLockPhase()) // move index
@@ -2623,7 +2622,6 @@ public class ForkJoinPool extends AbstractExecutorService {
else
return q;
}
- tryTerminate(false, false);
throw new RejectedExecutionException();
}
@@ -2640,7 +2638,6 @@ public class ForkJoinPool extends AbstractExecutorService {
}
q.push(task, signalIfEmpty ? this : null, internal);
}
- private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
/**
* Returns queue for an external submission, bypassing call to
@@ -2760,51 +2757,96 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return runState on exit
*/
private long tryTerminate(boolean now, boolean enable) {
- long e = runState, isShutdown;
- if ((e & STOP) == 0L) {
- if (now)
- runState = e = (lockRunState() + RS_LOCK) | STOP | SHUTDOWN;
- else if ((isShutdown = (e & SHUTDOWN)) != 0 || enable) {
- if (isShutdown == 0)
- getAndBitwiseOrRunState(SHUTDOWN);
- if (quiescent() > 0)
- e = runState;
- }
- if ((e & STOP) != 0L && (releaseAll() & RC_MASK) > 0L && now)
- interruptAll();
- }
- if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks
- if ((ctl & RC_MASK) > 0L) { // unless all inactive
- int r = (int)Thread.currentThread().threadId();
- WorkQueue[] qs = queues; // stagger traversals
- int n = (qs == null) ? 0 : qs.length;
- for (int l = n; l > 0; --l, ++r) {
- WorkQueue q; ForkJoinTask> t;
- if ((q = qs[r & (n - 1)]) != null &&
- q.source != DROPPED) {
- while ((t = q.poll()) != null) {
- try {
- t.cancel(false);
- } catch (Throwable ignore) {
- }
- }
- }
+ long e, isShutdown, ps;
+ if (((e = runState) & TERMINATED) != 0L)
+ now = false;
+ else if ((e & STOP) != 0L)
+ now = true;
+ else if (now) {
+ if (((ps = getAndBitwiseOrRunState(SHUTDOWN|STOP) & STOP)) == 0L) {
+ if ((ps & RS_LOCK) != 0L) {
+ spinLockRunState(); // ensure queues array stable after stop
+ unlockRunState();
}
+ interruptAll();
}
- if (((e = runState) & TERMINATED) == 0L && ctl == 0L) {
- e |= TERMINATED;
- if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) {
- CountDownLatch done; SharedThreadContainer ctr;
- if ((done = termination) != null)
- done.countDown();
- if ((ctr = container) != null)
- ctr.close();
+ }
+ else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) {
+ if (isShutdown == 0L)
+ getAndBitwiseOrRunState(SHUTDOWN);
+ if (quiescent() > 0)
+ now = true;
+ }
+
+ if (now) {
+ releaseWaiters();
+ for (;;) {
+ if (((e = runState) & CLEANED) == 0L) {
+ boolean clean = cleanQueues();
+ if (((e = runState) & CLEANED) == 0L && clean)
+ e = getAndBitwiseOrRunState(CLEANED) | CLEANED;
+ }
+ if ((e & TERMINATED) != 0L)
+ break;
+ if (ctl != 0L) // else loop if didn't finish cleaning
+ break;
+ if ((e & CLEANED) != 0L) {
+ e |= TERMINATED;
+ if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) {
+ CountDownLatch done; SharedThreadContainer ctr;
+ if ((done = termination) != null)
+ done.countDown();
+ if ((ctr = container) != null)
+ ctr.close();
+ }
+ break;
}
}
}
return e;
}
+ /**
+ * Scans queues in a psuedorandom order based on thread id,
+ * cancelling tasks until empty, or returning early upon
+ * interference or still-active external queues, in which case
+ * other calls will finish cancellation.
+ *
+ * @return true if all queues empty
+ */
+ private boolean cleanQueues() {
+ int r = (int)Thread.currentThread().threadId();
+ r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
+ int step = (r >>> 16) | 1; // randomize traversals
+ WorkQueue[] qs = queues;
+ int n = (qs == null) ? 0 : qs.length;
+ for (int l = n; l > 0; --l, r += step) {
+ WorkQueue q; ForkJoinTask>[] a; int cap;
+ if ((q = qs[r & (n - 1)]) != null &&
+ (a = q.array) != null && (cap = a.length) > 0) {
+ for (;;) {
+ ForkJoinTask> t; int b; long k;
+ t = (ForkJoinTask>)U.getReferenceAcquire(
+ a, k = slotOffset((cap - 1) & (b = q.base)));
+ if (q.base == b && t != null &&
+ U.compareAndSetReference(a, k, t, null)) {
+ q.updateBase(b + 1);
+ try {
+ t.cancel(false);
+ } catch (Throwable ignore) {
+ }
+ }
+ else if ((q.phase & (IDLE|1)) == 0 || // externally locked
+ q.top - q.base > 0)
+ return false; // incomplete
+ else
+ break;
+ }
+ }
+ }
+ return true;
+ }
+
/**
* Interrupts all workers
*/
@@ -2814,8 +2856,7 @@ public class ForkJoinPool extends AbstractExecutorService {
int n = (qs == null) ? 0 : qs.length;
for (int i = 1; i < n; i += 2) {
WorkQueue q; Thread o;
- if ((q = qs[i]) != null && (o = q.owner) != null && o != current &&
- q.source != DROPPED) {
+ if ((q = qs[i]) != null && (o = q.owner) != null && o != current) {
try {
o.interrupt();
} catch (Throwable ignore) {
@@ -4018,15 +4059,10 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((scale & (scale - 1)) != 0)
throw new Error("array index scale not a power of two");
- defaultForkJoinWorkerThreadFactory =
- new DefaultForkJoinWorkerThreadFactory();
- @SuppressWarnings("removal")
- ForkJoinPool p = common = (System.getSecurityManager() == null) ?
- new ForkJoinPool((byte)0) :
- AccessController.doPrivileged(new PrivilegedAction<>() {
- public ForkJoinPool run() {
- return new ForkJoinPool((byte)0); }});
+ interruptibleTaskClass = ForkJoinTask.InterruptibleTask.class;
+ Class> dep = LockSupport.class; // ensure loaded
// allow access to non-public methods
+ JLA = SharedSecrets.getJavaLangAccess();
SharedSecrets.setJavaUtilConcurrentFJPAccess(
new JavaUtilConcurrentFJPAccess() {
@Override
@@ -4037,6 +4073,13 @@ public class ForkJoinPool extends AbstractExecutorService {
pool.endCompensatedBlock(post);
}
});
- Class> dep = LockSupport.class; // ensure loaded
+ defaultForkJoinWorkerThreadFactory =
+ new DefaultForkJoinWorkerThreadFactory();
+ @SuppressWarnings("removal")
+ ForkJoinPool p = common = (System.getSecurityManager() == null) ?
+ new ForkJoinPool((byte)0) :
+ AccessController.doPrivileged(new PrivilegedAction<>() {
+ public ForkJoinPool run() {
+ return new ForkJoinPool((byte)0); }});
}
}
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
index 27b9b0812d6..7a468666ad3 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
@@ -41,6 +41,7 @@ import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
+import jdk.internal.misc.Unsafe;
/**
* A thread managed by a {@link ForkJoinPool}, which executes
@@ -225,6 +226,25 @@ public class ForkJoinWorkerThread extends Thread {
(sq = qs[i]) != null && sq.top - sq.base > 0) ||
q.top - q.base > 0));
}
+
+ /**
+ * Clears ThreadLocals, and if necessary resets ContextClassLoader
+ */
+ final void resetThreadLocals() {
+ if (U.getReference(this, THREADLOCALS) != null)
+ U.putReference(this, THREADLOCALS, null);
+ if (U.getReference(this, INHERITABLETHREADLOCALS) != null)
+ U.putReference(this, INHERITABLETHREADLOCALS, null);
+ if ((this instanceof InnocuousForkJoinWorkerThread) &&
+ ((InnocuousForkJoinWorkerThread)this).needCCLReset())
+ super.setContextClassLoader(ClassLoader.getSystemClassLoader());
+ }
+
+ private static final Unsafe U = Unsafe.getUnsafe();
+ private static final long THREADLOCALS
+ = U.objectFieldOffset(Thread.class, "threadLocals");
+ private static final long INHERITABLETHREADLOCALS
+ = U.objectFieldOffset(Thread.class, "inheritableThreadLocals");
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
/**
@@ -236,6 +256,7 @@ public class ForkJoinWorkerThread extends Thread {
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
/** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
private static final ThreadGroup innocuousThreadGroup = createGroup();
+ private boolean resetCCL;
InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
super(innocuousThreadGroup, pool, true, true);
}
@@ -244,9 +265,20 @@ public class ForkJoinWorkerThread extends Thread {
public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
@Override // paranoically
+ @SuppressWarnings("removal")
public void setContextClassLoader(ClassLoader cl) {
- if (cl != null && ClassLoader.getSystemClassLoader() != cl)
+ if (System.getSecurityManager() != null &&
+ cl != null && ClassLoader.getSystemClassLoader() != cl)
throw new SecurityException("setContextClassLoader");
+ resetCCL = true;
+ super.setContextClassLoader(cl);
+ }
+
+ final boolean needCCLReset() { // get and clear
+ boolean needReset;
+ if (needReset = resetCCL)
+ resetCCL = false;
+ return needReset;
}
static ThreadGroup createGroup() {
@@ -255,5 +287,6 @@ public class ForkJoinWorkerThread extends Thread {
group = p;
return new ThreadGroup(group, "InnocuousForkJoinWorkerThreadGroup");
}
+
}
}