From ab8071d28027ecbf5e8984c30b35fa1c2d934de7 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Wed, 21 Aug 2024 18:22:24 +0000
Subject: [PATCH] 8338146: Improve Exchanger performance with VirtualThreads
Reviewed-by: alanb
---
.../java/util/concurrent/Exchanger.java | 550 +++++++-----------
.../util/concurrent/ForkJoinWorkerThread.java | 26 +
.../util/concurrent/LinkedTransferQueue.java | 4 +-
3 files changed, 251 insertions(+), 329 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/Exchanger.java b/src/java.base/share/classes/java/util/concurrent/Exchanger.java
index 0096bca8c6f..8674ea9af39 100644
--- a/src/java.base/share/classes/java/util/concurrent/Exchanger.java
+++ b/src/java.base/share/classes/java/util/concurrent/Exchanger.java
@@ -139,125 +139,109 @@ public class Exchanger {
* able to exchange items. That is, we cannot completely partition
* across threads, but instead give threads arena indices that
* will on average grow under contention and shrink under lack of
- * contention. We approach this by defining the Nodes that we need
- * anyway as ThreadLocals, and include in them per-thread index
- * and related bookkeeping state. (We can safely reuse per-thread
- * nodes rather than creating them fresh each time because slots
- * alternate between pointing to a node vs null, so cannot
- * encounter ABA problems. However, we do need some care in
- * resetting them between uses.)
+ * contention.
*
- * Implementing an effective arena requires allocating a bunch of
- * space, so we only do so upon detecting contention (except on
- * uniprocessors, where they wouldn't help, so aren't used).
- * Otherwise, exchanges use the single-slot slotExchange method.
- * On contention, not only must the slots be in different
- * locations, but the locations must not encounter memory
- * contention due to being on the same cache line (or more
- * generally, the same coherence unit). Because, as of this
- * writing, there is no way to determine cacheline size, we define
- * a value that is enough for common platforms. Additionally,
- * extra care elsewhere is taken to avoid other false/unintended
- * sharing and to enhance locality, including adding padding (via
- * @Contended) to Nodes, embedding "bound" as an Exchanger field.
+ * We approach this by defining the Nodes holding references to
+ * transferred items as ThreadLocals, and include in them
+ * per-thread index and related bookkeeping state. We can safely
+ * reuse per-thread nodes rather than creating them fresh each
+ * time because slots alternate between pointing to a node vs
+ * null, so cannot encounter ABA problems. However, we must ensure
+ * that object transfer fields are reset between uses. Given this,
+ * Participant nodes can be defined as static ThreadLocals. As
+ * seen for example in class Striped64, using indices established
+ * in one instance across others usually improves overall
+ * performance. Nodes also include a participant-local random
+ * number generator.
+ *
+ * Spreading out contention requires that the memory locations
+ * used by the arena slots don't share a cache line -- otherwise,
+ * the arena would have almost no benefit. We arrange this by
+ * adding another level of indirection: The arena elements point
+ * to "Slots", each of which is padded using @Contended. We only
+ * create a single Slot on initialization, adding more when
+ * needed. The per-thread Participant Nodes may also be subject to
+ * false-sharing contention, but tend to be more scattered in
+ * memory, so are unpadded, with some occasional performance impact.
*
* The arena starts out with only one used slot. We expand the
* effective arena size by tracking collisions; i.e., failed CASes
- * while trying to exchange. By nature of the above algorithm, the
- * only kinds of collision that reliably indicate contention are
- * when two attempted releases collide -- one of two attempted
- * offers can legitimately fail to CAS without indicating
- * contention by more than one other thread. (Note: it is possible
- * but not worthwhile to more precisely detect contention by
- * reading slot values after CAS failures.) When a thread has
- * collided at each slot within the current arena bound, it tries
- * to expand the arena size by one. We track collisions within
- * bounds by using a version (sequence) number on the "bound"
- * field, and conservatively reset collision counts when a
- * participant notices that bound has been updated (in either
- * direction).
+ * while trying to exchange. And shrink it via "spinouts" in which
+ * threads give up waiting at a slot. By nature of the above
+ * algorithm, the only kinds of collision that reliably indicate
+ * contention are when two attempted releases collide -- one of
+ * two attempted offers can legitimately fail to CAS without
+ * indicating contention by more than one other thread.
*
- * The effective arena size is reduced (when there is more than
- * one slot) by giving up on waiting after a while and trying to
- * decrement the arena size on expiration. The value of "a while"
- * is an empirical matter. We implement by piggybacking on the
- * use of spin->yield->block that is essential for reasonable
- * waiting performance anyway -- in a busy exchanger, offers are
- * usually almost immediately released, in which case context
- * switching on multiprocessors is extremely slow/wasteful. Arena
- * waits just omit the blocking part, and instead cancel. The spin
- * count is empirically chosen to be a value that avoids blocking
- * 99% of the time under maximum sustained exchange rates on a
- * range of test machines. Spins and yields entail some limited
- * randomness (using a cheap xorshift) to avoid regular patterns
- * that can induce unproductive grow/shrink cycles. (Using a
- * pseudorandom also helps regularize spin cycle duration by
- * making branches unpredictable.) Also, during an offer, a
- * waiter can "know" that it will be released when its slot has
- * changed, but cannot yet proceed until match is set. In the
- * mean time it cannot cancel the offer, so instead spins/yields.
- * Note: It is possible to avoid this secondary check by changing
- * the linearization point to be a CAS of the match field (as done
- * in one case in the Scott & Scherer DISC paper), which also
- * increases asynchrony a bit, at the expense of poorer collision
- * detection and inability to always reuse per-thread nodes. So
- * the current scheme is typically a better tradeoff.
+ * Arena size (the value of field "bound") is controlled by random
+ * sampling. On each miss (collision or spinout), a thread chooses
+ * a new random index within the arena. Upon the third collision
+ * with the same current bound, it tries to grow the arena. And
+ * upon the second spinout, it tries to shrink. The asymmetry in
+ * part reflects relative costs, and reduces flailing. Because
+ * they cannot be changed without also changing the sampling
+ * strategy, these rules are directly incorporated into uses of
+ * the xchg "misses" variable. The bound field is tagged with
+ * sequence numbers to reduce stale decisions. Uniform random
+ * indices are generated using XorShift with enough bits so that
+ * bias (See Knuth TAoCP vol 2) is negligible for moduli used here
+ * (at most 256) without requiring rejection tests. Using
+ * nonuniform randoms with greater weight to higher indices is
+ * also possible but does not seem worthwhile in practice.
*
- * On collisions, indices traverse the arena cyclically in reverse
- * order, restarting at the maximum index (which will tend to be
- * sparsest) when bounds change. (On expirations, indices instead
- * are halved until reaching 0.) It is possible (and has been
- * tried) to use randomized, prime-value-stepped, or double-hash
- * style traversal instead of simple cyclic traversal to reduce
- * bunching. But empirically, whatever benefits these may have
- * don't overcome their added overhead: We are managing operations
- * that occur very quickly unless there is sustained contention,
- * so simpler/faster control policies work better than more
- * accurate but slower ones.
+ * These mechanics rely on a reasonable choice of constant SPINS.
+ * The time cost of SPINS * Thread.onSpinWait() should be at least
+ * the expected cost of a park/unpark context switch, and larger
+ * than that of two failed CASes, but still small enough to avoid
+ * excessive delays during arena shrinkage. We also deal with the
+ * possibility that when an offering thread waits for a release,
+ * spin-waiting would be useless because the releasing thread is
+ * descheduled. On multiprocessors, we cannot know this in
+ * general. But when Virtual Threads are used, method
+ * ForkJoinWorkerThread.hasKnownQueuedWork serves as a guide to
+ * whether to spin or immediately block, allowing a context switch
+ * that may enable a releaser. Note also that when many threads
+ * are being run on few cores, encountering enough collisions to
+ * trigger arena growth is rare, and soon followed by shrinkage,
+ * so this doesn't require special handling.
*
- * Because we use expiration for arena size control, we cannot
- * throw TimeoutExceptions in the timed version of the public
- * exchange method until the arena size has shrunken to zero (or
- * the arena isn't enabled). This may delay response to timeout
- * but is still within spec.
+ * The basic exchange mechanics rely on checks that Node item
+ * fields are not null, which doesn't work when offered items are
+ * null. We trap this case by translating nulls to the
+ * (un-Exchangeable) value of the static Participant
+ * reference.
*
- * Essentially all of the implementation is in methods
- * slotExchange and arenaExchange. These have similar overall
- * structure, but differ in too many details to combine. The
- * slotExchange method uses the single Exchanger field "slot"
- * rather than arena array elements. However, it still needs
- * minimal collision detection to trigger arena construction.
- * (The messiest part is making sure interrupt status and
- * InterruptedExceptions come out right during transitions when
- * both methods may be called. This is done by using null return
- * as a sentinel to recheck interrupt status.)
+ * Essentially all of the implementation is in method xchg. As is
+ * too common in this sort of code, most of the logic relies on
+ * reads of fields that are maintained as local variables so can't
+ * be nicely factored. It is structured as a main loop with a
+ * leading volatile read (of field bound), that causes others to
+ * be freshly read even though declared in plain mode. We don't
+ * use compareAndExchange that would otherwise save some re-reads
+ * because of the need to recheck indices and bounds on failures.
*
- * As is too common in this sort of code, methods are monolithic
- * because most of the logic relies on reads of fields that are
- * maintained as local variables so can't be nicely factored --
- * mainly, here, bulky spin->yield->block/cancel code. Note that
- * field Node.item is not declared as volatile even though it is
- * read by releasing threads, because they only do so after CAS
- * operations that must precede access, and all uses by the owning
- * thread are otherwise acceptably ordered by other operations.
- * (Because the actual points of atomicity are slot CASes, it
- * would also be legal for the write to Node.match in a release to
- * be weaker than a full volatile write. However, this is not done
- * because it could allow further postponement of the write,
- * delaying progress.)
+ * Support for optional timeouts in a single method adds further
+ * complexity. Note that for the sake of arena bounds control,
+ * time bounds must be ignored during spinouts, which may delay
+ * TimeoutExceptions (but no more so than would excessive context
+ * switching that could occur otherwise). Responses to
+ * interruption are handled similarly, postponing commitment to
+ * throw InterruptedException until successfully cancelled.
+ *
+ * Design differences from previous releases include:
+ * * Accommodation of VirtualThreads.
+ * * Use of Slots vs spaced indices for the arena and static
+ * ThreadLocals, avoiding separate arena vs non-arena modes.
+ * * Use of random sampling for grow/shrink decisions, with typically
+ * faster and more stable adaptation (as was mentioned as a
+ * possible improvement in the previous version).
*/
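As a standalone illustration of the index sampling described above (a sketch only, not code from this patch; Long.remainderUnsigned is used here merely to keep the sketch's result nonnegative, while the patch folds the equivalent step inline into xchg):

    // Sketch: one xorshift step of a nonzero 64-bit seed, reduced to an
    // index in [0, m] for the current arena bound m (at most MMASK), with
    // negligible bias for such small moduli.
    static long xorShiftStep(long r) {
        r ^= r << 13; r ^= r >>> 7; r ^= r << 17;
        return r;
    }
    static int sampleArenaIndex(long seed, int m) {
        return (int) Long.remainderUnsigned(seed, m + 1);
    }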
- /**
- * The index distance (as a shift value) between any two used slots
- * in the arena, spacing them out to avoid false sharing.
- */
- private static final int ASHIFT = 5;
-
/**
* The maximum supported arena index. The maximum allocatable
- * arena size is MMASK + 1. Must be a power of two minus one, less
- * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
- * for the expected scaling limits of the main algorithms.
+ * arena size is MMASK + 1. Must be a power of two minus one. The
+ * cap of 255 (0xff) more than suffices for the expected scaling
+ * limits of the main algorithms.
*/
private static final int MMASK = 0xff;
@@ -267,49 +251,34 @@ public class Exchanger {
*/
private static final int SEQ = MMASK + 1;
- /** The number of CPUs, for sizing and spin control */
- private static final int NCPU = Runtime.getRuntime().availableProcessors();
-
/**
- * The maximum slot index of the arena: The number of slots that
- * can in principle hold all threads without contention, or at
- * most the maximum indexable value.
- */
- static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
-
- /**
- * The bound for spins while waiting for a match. The actual
- * number of iterations will on average be about twice this value
- * due to randomization. Note: Spinning is disabled when NCPU==1.
+ * The bound for spins while waiting for a match before either
+ * blocking or possibly shrinking arena.
*/
private static final int SPINS = 1 << 10;
/**
- * Value representing null arguments/returns from public
- * methods. Needed because the API originally didn't disallow null
- * arguments, which it should have.
+ * Padded arena cells to avoid false-sharing memory contention
*/
- private static final Object NULL_ITEM = new Object();
-
- /**
- * Sentinel value returned by internal exchange methods upon
- * timeout, to avoid need for separate timed versions of these
- * methods.
- */
- private static final Object TIMED_OUT = new Object();
+ @jdk.internal.vm.annotation.Contended
+ static final class Slot {
+ Node entry;
+ }
/**
* Nodes hold partially exchanged data, plus other per-thread
- * bookkeeping. Padded via @Contended to reduce memory contention.
+ * bookkeeping.
*/
- @jdk.internal.vm.annotation.Contended static final class Node {
+ static final class Node {
+ long seed; // Random seed
int index; // Arena index
- int bound; // Last recorded value of Exchanger.bound
- int collides; // Number of CAS failures at current bound
- int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
+ Node() {
+ index = -1; // initialize on first use
+ seed = Thread.currentThread().threadId();
+ }
}
/** The corresponding thread local class */
@@ -318,210 +287,152 @@ public class Exchanger {
}
/**
- * Per-thread state.
+ * The participant thread-locals. Because this reference can never
+ * itself be exchanged, we also use it to represent null user
+ * arguments, translating them to and from this value around each
+ * use.
*/
- private final Participant participant;
+ private static final Participant participant = new Participant();
/**
- * Elimination array; null until enabled (within slotExchange).
- * Element accesses use emulation of volatile gets and CAS.
+ * Elimination array; element accesses use emulation of volatile
+ * gets and CAS.
*/
- private volatile Node[] arena;
+ private final Slot[] arena;
/**
- * Slot used until contention detected.
+ * Number of cores, for sizing and spin control. Computed only
+ * upon construction.
*/
- private volatile Node slot;
+ private final int ncpu;
/**
- * The index of the largest valid arena position, OR'ed with SEQ
- * number in high bits, incremented on each update. The initial
- * update from 0 to SEQ is used to ensure that the arena array is
- * constructed only once.
+ * The index of the largest valid arena position.
*/
private volatile int bound;
/**
- * Exchange function when arenas enabled. See above for explanation.
+ * Exchange function. See above for explanation.
*
- * @param item the (non-null) item to exchange
- * @param timed true if the wait is timed
- * @param ns if timed, the maximum wait time, else 0L
- * @return the other thread's item; or null if interrupted; or
- * TIMED_OUT if timed and timed out
+ * @param x the item to exchange
+ * @param deadline if zero, untimed, else timeout deadline
+ * @return the other thread's item
+ * @throws InterruptedException if interrupted while waiting
+ * @throws TimeoutException if deadline nonzero and timed out
*/
- private final Object arenaExchange(Object item, boolean timed, long ns) {
- Node[] a = arena;
+ private final V xchg(V x, long deadline)
+ throws InterruptedException, TimeoutException {
+ Slot[] a = arena;
int alen = a.length;
- Node p = participant.get();
- for (int i = p.index;;) { // access slot at i
- int b, m, c;
- int j = (i << ASHIFT) + ((1 << ASHIFT) - 1);
- if (j < 0 || j >= alen)
- j = alen - 1;
- Node q = (Node)AA.getAcquire(a, j);
- if (q != null && AA.compareAndSet(a, j, q, null)) {
- Object v = q.item; // release
- q.match = item;
- Thread w = q.parked;
- if (w != null)
- LockSupport.unpark(w);
- return v;
+ Participant ps = participant;
+ Object item = (x == null) ? ps : x; // translate nulls
+ Node p = ps.get();
+ int i = p.index; // if < 0, move
+ int misses = 0; // ++ on collide, -- on spinout
+ Object offered = null; // for cleanup
+ Object v = null;
+ outer: for (;;) {
+ int b, m; Slot s; Node q;
+ if ((m = (b = bound) & MMASK) == 0) // volatile read
+ i = 0;
+ if (i < 0 || i > m || i >= alen || (s = a[i]) == null) {
+ long r = p.seed; // randomly move
+ r ^= r << 13; r ^= r >>> 7; r ^= r << 17; // xorShift
+ i = p.index = (int)((p.seed = r) % (m + 1));
}
- else if (i <= (m = (b = bound) & MMASK) && q == null) {
- p.item = item; // offer
- if (AA.compareAndSet(a, j, null, p)) {
- long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
- Thread t = Thread.currentThread(); // wait
- for (int h = p.hash, spins = SPINS;;) {
- Object v = p.match;
- if (v != null) {
- MATCH.setRelease(p, null);
- p.item = null; // clear for next use
- p.hash = h;
- return v;
+ else if ((q = s.entry) != null) { // try release
+ if (ENTRY.compareAndSet(s, q, null)) {
+ Thread w;
+ v = q.item;
+ q.match = item;
+ if (i == 0 && (w = q.parked) != null)
+ LockSupport.unpark(w);
+ break;
+ }
+ else { // collision
+ int nb;
+ i = -1; // move index
+ if (b != bound) // stale
+ misses = 0;
+ else if (misses <= 2) // continue sampling
+ ++misses;
+ else if ((nb = (b + 1) & MMASK) < alen) {
+ misses = 0; // try to grow
+ if (BOUND.compareAndSet(this, b, b + 1 + SEQ) &&
+ a[i = p.index = nb] == null)
+ AA.compareAndSet(a, nb, null, new Slot());
+ }
+ }
+ }
+ else { // try offer
+ if (offered == null)
+ offered = p.item = item;
+ if (ENTRY.compareAndSet(s, null, p)) {
+ boolean tryCancel; // true if interrupted
+ Thread t = Thread.currentThread();
+ if (!(tryCancel = t.isInterrupted()) && ncpu > 1 &&
+ (i != 0 || // check for busy VTs
+ (!ForkJoinWorkerThread.hasKnownQueuedWork()))) {
+ for (int j = SPINS; j > 0; --j) {
+ if ((v = p.match) != null) {
+ MATCH.set(p, null);
+ break outer; // spin wait
+ }
+ Thread.onSpinWait();
}
- else if (spins > 0) {
- h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
- if (h == 0) // initialize hash
- h = SPINS | (int)t.threadId();
- else if (h < 0 && // approx 50% true
- (--spins & ((SPINS >>> 1) - 1)) == 0)
- Thread.yield(); // two yields per wait
+ }
+ for (long ns = 1L;;) { // block or cancel offer
+ if ((v = p.match) != null) {
+ MATCH.set(p, null);
+ break outer;
}
- else if (AA.getAcquire(a, j) != p)
- spins = SPINS; // releaser hasn't set match yet
- else if (!t.isInterrupted() && m == 0 &&
- (!timed ||
- (ns = end - System.nanoTime()) > 0L)) {
- p.parked = t; // minimize window
- if (AA.getAcquire(a, j) == p) {
- if (ns == 0L)
+ if (i == 0 && !tryCancel &&
+ (deadline == 0L ||
+ ((ns = deadline - System.nanoTime()) > 0L))) {
+ p.parked = t; // enable unpark and recheck
+ if (p.match == null) {
+ if (deadline == 0L)
LockSupport.park(this);
else
LockSupport.parkNanos(this, ns);
+ tryCancel = t.isInterrupted();
}
p.parked = null;
}
- else if (AA.getAcquire(a, j) == p &&
- AA.compareAndSet(a, j, p, null)) {
- if (m != 0) // try to shrink
- BOUND.compareAndSet(this, b, b + SEQ - 1);
- p.item = null;
- p.hash = h;
- i = p.index >>>= 1; // descend
+ else if (ENTRY.compareAndSet(s, p, null)) { // cancel
+ offered = p.item = null;
if (Thread.interrupted())
- return null;
- if (timed && m == 0 && ns <= 0L)
- return TIMED_OUT;
- break; // expired; restart
+ throw new InterruptedException();
+ if (deadline != 0L && ns <= 0L)
+ throw new TimeoutException();
+ i = -1; // move and restart
+ if (bound != b)
+ misses = 0; // stale
+ else if (misses >= 0)
+ --misses; // continue sampling
+ else if ((b & MMASK) != 0) {
+ misses = 0; // try to shrink
+ BOUND.compareAndSet(this, b, b - 1 + SEQ);
+ }
+ continue outer;
}
}
}
- else
- p.item = null; // clear offer
- }
- else {
- if (p.bound != b) { // stale; reset
- p.bound = b;
- p.collides = 0;
- i = (i != m || m == 0) ? m : m - 1;
- }
- else if ((c = p.collides) < m || m == FULL ||
- !BOUND.compareAndSet(this, b, b + SEQ + 1)) {
- p.collides = c + 1;
- i = (i == 0) ? m : i - 1; // cyclically traverse
- }
- else
- i = m + 1; // grow
- p.index = i;
}
}
- }
-
- /**
- * Exchange function used until arenas enabled. See above for explanation.
- *
- * @param item the item to exchange
- * @param timed true if the wait is timed
- * @param ns if timed, the maximum wait time, else 0L
- * @return the other thread's item; or null if either the arena
- * was enabled or the thread was interrupted before completion; or
- * TIMED_OUT if timed and timed out
- */
- private final Object slotExchange(Object item, boolean timed, long ns) {
- Node p = participant.get();
- Thread t = Thread.currentThread();
- if (t.isInterrupted()) // preserve interrupt status so caller can recheck
- return null;
-
- for (Node q;;) {
- if ((q = slot) != null) {
- if (SLOT.compareAndSet(this, q, null)) {
- Object v = q.item;
- q.match = item;
- Thread w = q.parked;
- if (w != null)
- LockSupport.unpark(w);
- return v;
- }
- // create arena on contention, but continue until slot null
- if (NCPU > 1 && bound == 0 &&
- BOUND.compareAndSet(this, 0, SEQ))
- arena = new Node[(FULL + 2) << ASHIFT];
- }
- else if (arena != null)
- return null; // caller must reroute to arenaExchange
- else {
- p.item = item;
- if (SLOT.compareAndSet(this, null, p))
- break;
- p.item = null;
- }
- }
-
- // await release
- int h = p.hash;
- long end = timed ? System.nanoTime() + ns : 0L;
- int spins = (NCPU > 1) ? SPINS : 1;
- Object v;
- while ((v = p.match) == null) {
- if (spins > 0) {
- h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
- if (h == 0)
- h = SPINS | (int)t.threadId();
- else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
- Thread.yield();
- }
- else if (slot != p)
- spins = SPINS;
- else if (!t.isInterrupted() && arena == null &&
- (!timed || (ns = end - System.nanoTime()) > 0L)) {
- p.parked = t;
- if (slot == p) {
- if (ns == 0L)
- LockSupport.park(this);
- else
- LockSupport.parkNanos(this, ns);
- }
- p.parked = null;
- }
- else if (SLOT.compareAndSet(this, p, null)) {
- v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
- break;
- }
- }
- MATCH.setRelease(p, null);
- p.item = null;
- p.hash = h;
- return v;
+ if (offered != null) // cleanup
+ p.item = null;
+ @SuppressWarnings("unchecked") V ret = (v == participant) ? null : (V)v;
+ return ret;
}
/**
* Creates a new Exchanger.
*/
public Exchanger() {
- participant = new Participant();
+ int h = (ncpu = Runtime.getRuntime().availableProcessors()) >>> 1;
+ int size = (h == 0) ? 1 : (h > MMASK) ? MMASK + 1 : h;
+ (arena = new Slot[size])[0] = new Slot();
}
/**
@@ -557,17 +468,12 @@ public class Exchanger {
* @throws InterruptedException if the current thread was
* interrupted while waiting
*/
- @SuppressWarnings("unchecked")
public V exchange(V x) throws InterruptedException {
- Object v;
- Node[] a;
- Object item = (x == null) ? NULL_ITEM : x; // translate null args
- if (((a = arena) != null ||
- (v = slotExchange(item, false, 0L)) == null) &&
- (Thread.interrupted() || // disambiguates null return
- (v = arenaExchange(item, false, 0L)) == null))
- throw new InterruptedException();
- return (v == NULL_ITEM) ? null : (V)v;
+ try {
+ return xchg(x, 0L);
+ } catch (TimeoutException cannotHappen) {
+ return null; // not reached
+ }
}
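As a usage illustration (not part of the patch; class and variable names are invented for the example), two threads hand items to each other through a shared Exchanger, each call blocking until its partner arrives:

    import java.util.concurrent.Exchanger;

    class ExchangeDemo {
        public static void main(String[] args) throws InterruptedException {
            Exchanger<String> exchanger = new Exchanger<>();
            Thread other = new Thread(() -> {
                try {
                    // blocks until main() arrives, then receives "from-main"
                    System.out.println(exchanger.exchange("from-other"));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            other.start();
            // blocks until the other thread arrives, then receives "from-other"
            System.out.println(exchanger.exchange("from-main"));
            other.join();
        }
    }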
/**
@@ -612,34 +518,24 @@ public class Exchanger {
* @throws TimeoutException if the specified waiting time elapses
* before another thread enters the exchange
*/
- @SuppressWarnings("unchecked")
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
- Object v;
- Object item = (x == null) ? NULL_ITEM : x;
- long ns = unit.toNanos(timeout);
- if ((arena != null ||
- (v = slotExchange(item, true, ns)) == null) &&
- (Thread.interrupted() ||
- (v = arenaExchange(item, true, ns)) == null))
- throw new InterruptedException();
- if (v == TIMED_OUT)
- throw new TimeoutException();
- return (v == NULL_ITEM) ? null : (V)v;
+ long d = unit.toNanos(timeout) + System.nanoTime();
+ return xchg(x, (d == 0L) ? 1L : d); // avoid zero deadline
}
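The timed variant can be wrapped similarly (again an invented helper, not part of the patch, assuming the usual java.util.concurrent imports), turning an unmatched timeout into a null return:

    // Sketch: attempt a timed exchange, returning null if no partner
    // arrives within the given number of milliseconds.
    static <T> T tryExchange(Exchanger<T> ex, T item, long millis)
            throws InterruptedException {
        try {
            return ex.exchange(item, millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // timed out without a partner
        }
    }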
// VarHandle mechanics
private static final VarHandle BOUND;
- private static final VarHandle SLOT;
private static final VarHandle MATCH;
+ private static final VarHandle ENTRY;
private static final VarHandle AA;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
BOUND = l.findVarHandle(Exchanger.class, "bound", int.class);
- SLOT = l.findVarHandle(Exchanger.class, "slot", Node.class);
MATCH = l.findVarHandle(Node.class, "match", Object.class);
- AA = MethodHandles.arrayElementVarHandle(Node[].class);
+ ENTRY = l.findVarHandle(Slot.class, "entry", Node.class);
+ AA = MethodHandles.arrayElementVarHandle(Slot[].class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
index 1b2777c6e4a..2995fe3c63d 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
@@ -39,6 +39,8 @@ import java.security.AccessController;
import java.security.AccessControlContext;
import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
+import jdk.internal.access.JavaLangAccess;
+import jdk.internal.access.SharedSecrets;
/**
* A thread managed by a {@link ForkJoinPool}, which executes
@@ -202,6 +204,30 @@ public class ForkJoinWorkerThread extends Thread {
}
}
+ /**
+ * Returns true if the current task is being executed by a
+ * ForkJoinWorkerThread that is momentarily known to have one or
+ * more queued tasks that it could execute immediately. This
+ * method is approximate and useful only as a heuristic indicator
+ * within a running task.
+ *
+ * @return true if the current task is being executed by a worker
+ * that has queued work
+ */
+ static boolean hasKnownQueuedWork() {
+ ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue q, sq;
+ ForkJoinPool p; ForkJoinPool.WorkQueue[] qs; int i;
+ Thread c = JLA.currentCarrierThread();
+ return ((c instanceof ForkJoinWorkerThread) &&
+ (p = (wt = (ForkJoinWorkerThread)c).pool) != null &&
+ (q = wt.workQueue) != null &&
+ (i = q.source) >= 0 && // check local and current source queues
+ (((qs = p.queues) != null && qs.length > i &&
+ (sq = qs[i]) != null && sq.top - sq.base > 0) ||
+ q.top - q.base > 0));
+ }
+ private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
+
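A rough sketch of the spin-or-park decision this heuristic supports (the helper below is hypothetical and not part of the patch; since hasKnownQueuedWork is package-private, such code could only live inside java.util.concurrent, as the Exchanger and LinkedTransferQueue call sites do):

    // Hypothetical helper, for illustration only. Spin briefly only when the
    // releasing thread is plausibly runnable; if this carrier already has
    // queued work, park immediately so that work (possibly the releaser's
    // virtual thread) can run. Registration for unpark is elided.
    static void awaitRelease(java.util.function.BooleanSupplier released, Object blocker) {
        if (!ForkJoinWorkerThread.hasKnownQueuedWork()) {
            for (int j = 1 << 10; j > 0 && !released.getAsBoolean(); --j)
                Thread.onSpinWait();
        }
        while (!released.getAsBoolean())
            java.util.concurrent.locks.LockSupport.park(blocker);
    }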
/**
* A worker thread that has no permissions, is not a member of any
* user-defined ThreadGroup, uses the system class loader as
diff --git a/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java b/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java
index a0d3176c762..118d648c7a2 100644
--- a/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java
+++ b/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java
@@ -426,8 +426,8 @@ public class LinkedTransferQueue extends AbstractQueue
long deadline = (timed) ? System.nanoTime() + ns : 0L;
boolean upc = isUniprocessor; // don't spin but later recheck
Thread w = Thread.currentThread();
- if (w.isVirtual()) // don't spin
- spin = false;
+ if (spin && ForkJoinWorkerThread.hasKnownQueuedWork())
+ spin = false; // don't spin
int spins = (spin & !upc) ? SPINS : 0; // negative when may park
while ((m = item) == e) {
if (spins >= 0) {