8019481: Sync misc j.u.c classes from 166 to tl
Reviewed-by: martin
This commit is contained in:
parent
efb561f632
commit
1001452ba9
@ -49,13 +49,13 @@ public class BrokenBarrierException extends Exception {
|
|||||||
private static final long serialVersionUID = 7117394618823254244L;
|
private static final long serialVersionUID = 7117394618823254244L;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a <tt>BrokenBarrierException</tt> with no specified detail
|
* Constructs a {@code BrokenBarrierException} with no specified detail
|
||||||
* message.
|
* message.
|
||||||
*/
|
*/
|
||||||
public BrokenBarrierException() {}
|
public BrokenBarrierException() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a <tt>BrokenBarrierException</tt> with the specified
|
* Constructs a {@code BrokenBarrierException} with the specified
|
||||||
* detail message.
|
* detail message.
|
||||||
*
|
*
|
||||||
* @param message the detail message
|
* @param message the detail message
|
||||||
|
@ -45,14 +45,14 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||||||
* <em>cyclic</em> because it can be re-used after the waiting threads
|
* <em>cyclic</em> because it can be re-used after the waiting threads
|
||||||
* are released.
|
* are released.
|
||||||
*
|
*
|
||||||
* <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
|
* <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
|
||||||
* that is run once per barrier point, after the last thread in the party
|
* that is run once per barrier point, after the last thread in the party
|
||||||
* arrives, but before any threads are released.
|
* arrives, but before any threads are released.
|
||||||
* This <em>barrier action</em> is useful
|
* This <em>barrier action</em> is useful
|
||||||
* for updating shared-state before any of the parties continue.
|
* for updating shared-state before any of the parties continue.
|
||||||
*
|
*
|
||||||
* <p><b>Sample usage:</b> Here is an example of
|
* <p><b>Sample usage:</b> Here is an example of using a barrier in a
|
||||||
* using a barrier in a parallel decomposition design:
|
* parallel decomposition design:
|
||||||
*
|
*
|
||||||
* <pre> {@code
|
* <pre> {@code
|
||||||
* class Solver {
|
* class Solver {
|
||||||
@ -81,16 +81,20 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||||||
* public Solver(float[][] matrix) {
|
* public Solver(float[][] matrix) {
|
||||||
* data = matrix;
|
* data = matrix;
|
||||||
* N = matrix.length;
|
* N = matrix.length;
|
||||||
* barrier = new CyclicBarrier(N,
|
* Runnable barrierAction =
|
||||||
* new Runnable() {
|
* new Runnable() { public void run() { mergeRows(...); }};
|
||||||
* public void run() {
|
* barrier = new CyclicBarrier(N, barrierAction);
|
||||||
* mergeRows(...);
|
|
||||||
* }
|
|
||||||
* });
|
|
||||||
* for (int i = 0; i < N; ++i)
|
|
||||||
* new Thread(new Worker(i)).start();
|
|
||||||
*
|
*
|
||||||
* waitUntilDone();
|
* List<Thread> threads = new ArrayList<Thread>(N);
|
||||||
|
* for (int i = 0; i < N; i++) {
|
||||||
|
* Thread thread = new Thread(new Worker(i));
|
||||||
|
* threads.add(thread);
|
||||||
|
* thread.start();
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* // wait until done
|
||||||
|
* for (Thread thread : threads)
|
||||||
|
* thread.join();
|
||||||
* }
|
* }
|
||||||
* }}</pre>
|
* }}</pre>
|
||||||
*
|
*
|
||||||
@ -98,8 +102,8 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||||||
* barrier until all rows have been processed. When all rows are processed
|
* barrier until all rows have been processed. When all rows are processed
|
||||||
* the supplied {@link Runnable} barrier action is executed and merges the
|
* the supplied {@link Runnable} barrier action is executed and merges the
|
||||||
* rows. If the merger
|
* rows. If the merger
|
||||||
* determines that a solution has been found then <tt>done()</tt> will return
|
* determines that a solution has been found then {@code done()} will return
|
||||||
* <tt>true</tt> and each worker will terminate.
|
* {@code true} and each worker will terminate.
|
||||||
*
|
*
|
||||||
* <p>If the barrier action does not rely on the parties being suspended when
|
* <p>If the barrier action does not rely on the parties being suspended when
|
||||||
* it is executed, then any of the threads in the party could execute that
|
* it is executed, then any of the threads in the party could execute that
|
||||||
@ -112,7 +116,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||||||
* // log the completion of this iteration
|
* // log the completion of this iteration
|
||||||
* }}</pre>
|
* }}</pre>
|
||||||
*
|
*
|
||||||
* <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
|
* <p>The {@code CyclicBarrier} uses an all-or-none breakage model
|
||||||
* for failed synchronization attempts: If a thread leaves a barrier
|
* for failed synchronization attempts: If a thread leaves a barrier
|
||||||
* point prematurely because of interruption, failure, or timeout, all
|
* point prematurely because of interruption, failure, or timeout, all
|
||||||
* other threads waiting at that barrier point will also leave
|
* other threads waiting at that barrier point will also leave
|
||||||
@ -139,7 +143,7 @@ public class CyclicBarrier {
|
|||||||
* is reset. There can be many generations associated with threads
|
* is reset. There can be many generations associated with threads
|
||||||
* using the barrier - due to the non-deterministic way the lock
|
* using the barrier - due to the non-deterministic way the lock
|
||||||
* may be allocated to waiting threads - but only one of these
|
* may be allocated to waiting threads - but only one of these
|
||||||
* can be active at a time (the one to which <tt>count</tt> applies)
|
* can be active at a time (the one to which {@code count} applies)
|
||||||
* and all the rest are either broken or tripped.
|
* and all the rest are either broken or tripped.
|
||||||
* There need not be an active generation if there has been a break
|
* There need not be an active generation if there has been a break
|
||||||
* but no subsequent reset.
|
* but no subsequent reset.
|
||||||
@ -259,7 +263,7 @@ public class CyclicBarrier {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new <tt>CyclicBarrier</tt> that will trip when the
|
* Creates a new {@code CyclicBarrier} that will trip when the
|
||||||
* given number of parties (threads) are waiting upon it, and which
|
* given number of parties (threads) are waiting upon it, and which
|
||||||
* will execute the given barrier action when the barrier is tripped,
|
* will execute the given barrier action when the barrier is tripped,
|
||||||
* performed by the last thread entering the barrier.
|
* performed by the last thread entering the barrier.
|
||||||
@ -278,7 +282,7 @@ public class CyclicBarrier {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new <tt>CyclicBarrier</tt> that will trip when the
|
* Creates a new {@code CyclicBarrier} that will trip when the
|
||||||
* given number of parties (threads) are waiting upon it, and
|
* given number of parties (threads) are waiting upon it, and
|
||||||
* does not perform a predefined action when the barrier is tripped.
|
* does not perform a predefined action when the barrier is tripped.
|
||||||
*
|
*
|
||||||
@ -301,7 +305,7 @@ public class CyclicBarrier {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits until all {@linkplain #getParties parties} have invoked
|
* Waits until all {@linkplain #getParties parties} have invoked
|
||||||
* <tt>await</tt> on this barrier.
|
* {@code await} on this barrier.
|
||||||
*
|
*
|
||||||
* <p>If the current thread is not the last to arrive then it is
|
* <p>If the current thread is not the last to arrive then it is
|
||||||
* disabled for thread scheduling purposes and lies dormant until
|
* disabled for thread scheduling purposes and lies dormant until
|
||||||
@ -326,7 +330,7 @@ public class CyclicBarrier {
|
|||||||
*
|
*
|
||||||
* <p>If the barrier is {@link #reset} while any thread is waiting,
|
* <p>If the barrier is {@link #reset} while any thread is waiting,
|
||||||
* or if the barrier {@linkplain #isBroken is broken} when
|
* or if the barrier {@linkplain #isBroken is broken} when
|
||||||
* <tt>await</tt> is invoked, or while any thread is waiting, then
|
* {@code await} is invoked, or while any thread is waiting, then
|
||||||
* {@link BrokenBarrierException} is thrown.
|
* {@link BrokenBarrierException} is thrown.
|
||||||
*
|
*
|
||||||
* <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
|
* <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
|
||||||
@ -343,7 +347,7 @@ public class CyclicBarrier {
|
|||||||
* the broken state.
|
* the broken state.
|
||||||
*
|
*
|
||||||
* @return the arrival index of the current thread, where index
|
* @return the arrival index of the current thread, where index
|
||||||
* <tt>{@link #getParties()} - 1</tt> indicates the first
|
* {@code getParties() - 1} indicates the first
|
||||||
* to arrive and zero indicates the last to arrive
|
* to arrive and zero indicates the last to arrive
|
||||||
* @throws InterruptedException if the current thread was interrupted
|
* @throws InterruptedException if the current thread was interrupted
|
||||||
* while waiting
|
* while waiting
|
||||||
@ -351,7 +355,7 @@ public class CyclicBarrier {
|
|||||||
* interrupted or timed out while the current thread was
|
* interrupted or timed out while the current thread was
|
||||||
* waiting, or the barrier was reset, or the barrier was
|
* waiting, or the barrier was reset, or the barrier was
|
||||||
* broken when {@code await} was called, or the barrier
|
* broken when {@code await} was called, or the barrier
|
||||||
* action (if present) failed due an exception.
|
* action (if present) failed due to an exception
|
||||||
*/
|
*/
|
||||||
public int await() throws InterruptedException, BrokenBarrierException {
|
public int await() throws InterruptedException, BrokenBarrierException {
|
||||||
try {
|
try {
|
||||||
@ -363,7 +367,7 @@ public class CyclicBarrier {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits until all {@linkplain #getParties parties} have invoked
|
* Waits until all {@linkplain #getParties parties} have invoked
|
||||||
* <tt>await</tt> on this barrier, or the specified waiting time elapses.
|
* {@code await} on this barrier, or the specified waiting time elapses.
|
||||||
*
|
*
|
||||||
* <p>If the current thread is not the last to arrive then it is
|
* <p>If the current thread is not the last to arrive then it is
|
||||||
* disabled for thread scheduling purposes and lies dormant until
|
* disabled for thread scheduling purposes and lies dormant until
|
||||||
@ -393,7 +397,7 @@ public class CyclicBarrier {
|
|||||||
*
|
*
|
||||||
* <p>If the barrier is {@link #reset} while any thread is waiting,
|
* <p>If the barrier is {@link #reset} while any thread is waiting,
|
||||||
* or if the barrier {@linkplain #isBroken is broken} when
|
* or if the barrier {@linkplain #isBroken is broken} when
|
||||||
* <tt>await</tt> is invoked, or while any thread is waiting, then
|
* {@code await} is invoked, or while any thread is waiting, then
|
||||||
* {@link BrokenBarrierException} is thrown.
|
* {@link BrokenBarrierException} is thrown.
|
||||||
*
|
*
|
||||||
* <p>If any thread is {@linkplain Thread#interrupt interrupted} while
|
* <p>If any thread is {@linkplain Thread#interrupt interrupted} while
|
||||||
@ -412,16 +416,17 @@ public class CyclicBarrier {
|
|||||||
* @param timeout the time to wait for the barrier
|
* @param timeout the time to wait for the barrier
|
||||||
* @param unit the time unit of the timeout parameter
|
* @param unit the time unit of the timeout parameter
|
||||||
* @return the arrival index of the current thread, where index
|
* @return the arrival index of the current thread, where index
|
||||||
* <tt>{@link #getParties()} - 1</tt> indicates the first
|
* {@code getParties() - 1} indicates the first
|
||||||
* to arrive and zero indicates the last to arrive
|
* to arrive and zero indicates the last to arrive
|
||||||
* @throws InterruptedException if the current thread was interrupted
|
* @throws InterruptedException if the current thread was interrupted
|
||||||
* while waiting
|
* while waiting
|
||||||
* @throws TimeoutException if the specified timeout elapses
|
* @throws TimeoutException if the specified timeout elapses.
|
||||||
|
* In this case the barrier will be broken.
|
||||||
* @throws BrokenBarrierException if <em>another</em> thread was
|
* @throws BrokenBarrierException if <em>another</em> thread was
|
||||||
* interrupted or timed out while the current thread was
|
* interrupted or timed out while the current thread was
|
||||||
* waiting, or the barrier was reset, or the barrier was broken
|
* waiting, or the barrier was reset, or the barrier was broken
|
||||||
* when {@code await} was called, or the barrier action (if
|
* when {@code await} was called, or the barrier action (if
|
||||||
* present) failed due an exception
|
* present) failed due to an exception
|
||||||
*/
|
*/
|
||||||
public int await(long timeout, TimeUnit unit)
|
public int await(long timeout, TimeUnit unit)
|
||||||
throws InterruptedException,
|
throws InterruptedException,
|
||||||
|
@ -35,7 +35,8 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
package java.util.concurrent;
|
package java.util.concurrent;
|
||||||
import java.util.concurrent.atomic.*;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.locks.LockSupport;
|
import java.util.concurrent.locks.LockSupport;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -52,7 +53,7 @@ import java.util.concurrent.locks.LockSupport;
|
|||||||
* to swap buffers between threads so that the thread filling the
|
* to swap buffers between threads so that the thread filling the
|
||||||
* buffer gets a freshly emptied one when it needs it, handing off the
|
* buffer gets a freshly emptied one when it needs it, handing off the
|
||||||
* filled one to the thread emptying the buffer.
|
* filled one to the thread emptying the buffer.
|
||||||
* <pre>{@code
|
* <pre> {@code
|
||||||
* class FillAndEmpty {
|
* class FillAndEmpty {
|
||||||
* Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
|
* Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
|
||||||
* DataBuffer initialEmptyBuffer = ... a made-up type
|
* DataBuffer initialEmptyBuffer = ... a made-up type
|
||||||
@ -88,8 +89,7 @@ import java.util.concurrent.locks.LockSupport;
|
|||||||
* new Thread(new FillingLoop()).start();
|
* new Thread(new FillingLoop()).start();
|
||||||
* new Thread(new EmptyingLoop()).start();
|
* new Thread(new EmptyingLoop()).start();
|
||||||
* }
|
* }
|
||||||
* }
|
* }}</pre>
|
||||||
* }</pre>
|
|
||||||
*
|
*
|
||||||
* <p>Memory consistency effects: For each pair of threads that
|
* <p>Memory consistency effects: For each pair of threads that
|
||||||
* successfully exchange objects via an {@code Exchanger}, actions
|
* successfully exchange objects via an {@code Exchanger}, actions
|
||||||
@ -103,486 +103,425 @@ import java.util.concurrent.locks.LockSupport;
|
|||||||
* @param <V> The type of objects that may be exchanged
|
* @param <V> The type of objects that may be exchanged
|
||||||
*/
|
*/
|
||||||
public class Exchanger<V> {
|
public class Exchanger<V> {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Algorithm Description:
|
* Overview: The core algorithm is, for an exchange "slot",
|
||||||
|
* and a participant (caller) with an item:
|
||||||
*
|
*
|
||||||
* The basic idea is to maintain a "slot", which is a reference to
|
* for (;;) {
|
||||||
* a Node containing both an Item to offer and a "hole" waiting to
|
* if (slot is empty) { // offer
|
||||||
* get filled in. If an incoming "occupying" thread sees that the
|
* place item in a Node;
|
||||||
* slot is null, it CAS'es (compareAndSets) a Node there and waits
|
* if (can CAS slot from empty to node) {
|
||||||
* for another to invoke exchange. That second "fulfilling" thread
|
* wait for release;
|
||||||
* sees that the slot is non-null, and so CASes it back to null,
|
* return matching item in node;
|
||||||
* also exchanging items by CASing the hole, plus waking up the
|
* }
|
||||||
* occupying thread if it is blocked. In each case CAS'es may
|
* }
|
||||||
* fail because a slot at first appears non-null but is null upon
|
* else if (can CAS slot from node to empty) { // release
|
||||||
* CAS, or vice-versa. So threads may need to retry these
|
* get the item in node;
|
||||||
* actions.
|
* set matching item in node;
|
||||||
|
* release waiting thread;
|
||||||
|
* }
|
||||||
|
* // else retry on CAS failure
|
||||||
|
* }
|
||||||
*
|
*
|
||||||
* This simple approach works great when there are only a few
|
* This is among the simplest forms of a "dual data structure" --
|
||||||
* threads using an Exchanger, but performance rapidly
|
* see Scott and Scherer's DISC 04 paper and
|
||||||
* deteriorates due to CAS contention on the single slot when
|
* http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
|
||||||
* there are lots of threads using an exchanger. So instead we use
|
|
||||||
* an "arena"; basically a kind of hash table with a dynamically
|
|
||||||
* varying number of slots, any one of which can be used by
|
|
||||||
* threads performing an exchange. Incoming threads pick slots
|
|
||||||
* based on a hash of their Thread ids. If an incoming thread
|
|
||||||
* fails to CAS in its chosen slot, it picks an alternative slot
|
|
||||||
* instead. And similarly from there. If a thread successfully
|
|
||||||
* CASes into a slot but no other thread arrives, it tries
|
|
||||||
* another, heading toward the zero slot, which always exists even
|
|
||||||
* if the table shrinks. The particular mechanics controlling this
|
|
||||||
* are as follows:
|
|
||||||
*
|
*
|
||||||
* Waiting: Slot zero is special in that it is the only slot that
|
* This works great in principle. But in practice, like many
|
||||||
* exists when there is no contention. A thread occupying slot
|
* algorithms centered on atomic updates to a single location, it
|
||||||
* zero will block if no thread fulfills it after a short spin.
|
* scales horribly when there are more than a few participants
|
||||||
* In other cases, occupying threads eventually give up and try
|
* using the same Exchanger. So the implementation instead uses a
|
||||||
* another slot. Waiting threads spin for a while (a period that
|
* form of elimination arena, that spreads out this contention by
|
||||||
* should be a little less than a typical context-switch time)
|
* arranging that some threads typically use different slots,
|
||||||
* before either blocking (if slot zero) or giving up (if other
|
* while still ensuring that eventually, any two parties will be
|
||||||
* slots) and restarting. There is no reason for threads to block
|
* able to exchange items. That is, we cannot completely partition
|
||||||
* unless there are unlikely to be any other threads present.
|
* across threads, but instead give threads arena indices that
|
||||||
* Occupants are mainly avoiding memory contention so sit there
|
* will on average grow under contention and shrink under lack of
|
||||||
* quietly polling for a shorter period than it would take to
|
* contention. We approach this by defining the Nodes that we need
|
||||||
* block and then unblock them. Non-slot-zero waits that elapse
|
* anyway as ThreadLocals, and include in them per-thread index
|
||||||
* because of lack of other threads waste around one extra
|
* and related bookkeeping state. (We can safely reuse per-thread
|
||||||
* context-switch time per try, which is still on average much
|
* nodes rather than creating them fresh each time because slots
|
||||||
* faster than alternative approaches.
|
* alternate between pointing to a node vs null, so cannot
|
||||||
|
* encounter ABA problems. However, we do need some care in
|
||||||
|
* resetting them between uses.)
|
||||||
*
|
*
|
||||||
* Sizing: Usually, using only a few slots suffices to reduce
|
* Implementing an effective arena requires allocating a bunch of
|
||||||
* contention. Especially with small numbers of threads, using
|
* space, so we only do so upon detecting contention (except on
|
||||||
* too many slots can lead to just as poor performance as using
|
* uniprocessors, where they wouldn't help, so aren't used).
|
||||||
* too few of them, and there's not much room for error. The
|
* Otherwise, exchanges use the single-slot slotExchange method.
|
||||||
* variable "max" maintains the number of slots actually in
|
* On contention, not only must the slots be in different
|
||||||
* use. It is increased when a thread sees too many CAS
|
* locations, but the locations must not encounter memory
|
||||||
* failures. (This is analogous to resizing a regular hash table
|
* contention due to being on the same cache line (or more
|
||||||
* based on a target load factor, except here, growth steps are
|
* generally, the same coherence unit). Because, as of this
|
||||||
* just one-by-one rather than proportional.) Growth requires
|
* writing, there is no way to determine cacheline size, we define
|
||||||
* contention failures in each of three tried slots. Requiring
|
* a value that is enough for common platforms. Additionally,
|
||||||
* multiple failures for expansion copes with the fact that some
|
* extra care elsewhere is taken to avoid other false/unintended
|
||||||
* failed CASes are not due to contention but instead to simple
|
* sharing and to enhance locality, including adding padding (via
|
||||||
* races between two threads or thread pre-emptions occurring
|
* sun.misc.Contended) to Nodes, embedding "bound" as an Exchanger
|
||||||
* between reading and CASing. Also, very transient peak
|
* field, and reworking some park/unpark mechanics compared to
|
||||||
* contention can be much higher than the average sustainable
|
* LockSupport versions.
|
||||||
* levels. An attempt to decrease the max limit is usually made
|
|
||||||
* when a non-slot-zero wait elapses without being fulfilled.
|
|
||||||
* Threads experiencing elapsed waits move closer to zero, so
|
|
||||||
* eventually find existing (or future) threads even if the table
|
|
||||||
* has been shrunk due to inactivity. The chosen mechanics and
|
|
||||||
* thresholds for growing and shrinking are intrinsically
|
|
||||||
* entangled with indexing and hashing inside the exchange code,
|
|
||||||
* and can't be nicely abstracted out.
|
|
||||||
*
|
*
|
||||||
* Hashing: Each thread picks its initial slot to use in accord
|
* The arena starts out with only one used slot. We expand the
|
||||||
* with a simple hashcode. The sequence is the same on each
|
* effective arena size by tracking collisions; i.e., failed CASes
|
||||||
* encounter by any given thread, but effectively random across
|
* while trying to exchange. By nature of the above algorithm, the
|
||||||
* threads. Using arenas encounters the classic cost vs quality
|
* only kinds of collision that reliably indicate contention are
|
||||||
* tradeoffs of all hash tables. Here, we use a one-step FNV-1a
|
* when two attempted releases collide -- one of two attempted
|
||||||
* hash code based on the current thread's Thread.getId(), along
|
* offers can legitimately fail to CAS without indicating
|
||||||
* with a cheap approximation to a mod operation to select an
|
* contention by more than one other thread. (Note: it is possible
|
||||||
* index. The downside of optimizing index selection in this way
|
* but not worthwhile to more precisely detect contention by
|
||||||
* is that the code is hardwired to use a maximum table size of
|
* reading slot values after CAS failures.) When a thread has
|
||||||
* 32. But this value more than suffices for known platforms and
|
* collided at each slot within the current arena bound, it tries
|
||||||
* applications.
|
* to expand the arena size by one. We track collisions within
|
||||||
|
* bounds by using a version (sequence) number on the "bound"
|
||||||
|
* field, and conservatively reset collision counts when a
|
||||||
|
* participant notices that bound has been updated (in either
|
||||||
|
* direction).
|
||||||
*
|
*
|
||||||
* Probing: On sensed contention of a selected slot, we probe
|
* The effective arena size is reduced (when there is more than
|
||||||
* sequentially through the table, analogously to linear probing
|
* one slot) by giving up on waiting after a while and trying to
|
||||||
* after collision in a hash table. (We move circularly, in
|
* decrement the arena size on expiration. The value of "a while"
|
||||||
* reverse order, to mesh best with table growth and shrinkage
|
* is an empirical matter. We implement by piggybacking on the
|
||||||
* rules.) Except that to minimize the effects of false-alarms
|
* use of spin->yield->block that is essential for reasonable
|
||||||
* and cache thrashing, we try the first selected slot twice
|
* waiting performance anyway -- in a busy exchanger, offers are
|
||||||
* before moving.
|
* usually almost immediately released, in which case context
|
||||||
|
* switching on multiprocessors is extremely slow/wasteful. Arena
|
||||||
|
* waits just omit the blocking part, and instead cancel. The spin
|
||||||
|
* count is empirically chosen to be a value that avoids blocking
|
||||||
|
* 99% of the time under maximum sustained exchange rates on a
|
||||||
|
* range of test machines. Spins and yields entail some limited
|
||||||
|
* randomness (using a cheap xorshift) to avoid regular patterns
|
||||||
|
* that can induce unproductive grow/shrink cycles. (Using a
|
||||||
|
* pseudorandom also helps regularize spin cycle duration by
|
||||||
|
* making branches unpredictable.) Also, during an offer, a
|
||||||
|
* waiter can "know" that it will be released when its slot has
|
||||||
|
* changed, but cannot yet proceed until match is set. In the
|
||||||
|
* mean time it cannot cancel the offer, so instead spins/yields.
|
||||||
|
* Note: It is possible to avoid this secondary check by changing
|
||||||
|
* the linearization point to be a CAS of the match field (as done
|
||||||
|
* in one case in the Scott & Scherer DISC paper), which also
|
||||||
|
* increases asynchrony a bit, at the expense of poorer collision
|
||||||
|
* detection and inability to always reuse per-thread nodes. So
|
||||||
|
* the current scheme is typically a better tradeoff.
|
||||||
*
|
*
|
||||||
* Padding: Even with contention management, slots are heavily
|
* On collisions, indices traverse the arena cyclically in reverse
|
||||||
* contended, so use cache-padding to avoid poor memory
|
* order, restarting at the maximum index (which will tend to be
|
||||||
* performance. Because of this, slots are lazily constructed
|
* sparsest) when bounds change. (On expirations, indices instead
|
||||||
* only when used, to avoid wasting this space unnecessarily.
|
* are halved until reaching 0.) It is possible (and has been
|
||||||
* While isolation of locations is not much of an issue at first
|
* tried) to use randomized, prime-value-stepped, or double-hash
|
||||||
* in an application, as time goes on and garbage-collectors
|
* style traversal instead of simple cyclic traversal to reduce
|
||||||
* perform compaction, slots are very likely to be moved adjacent
|
* bunching. But empirically, whatever benefits these may have
|
||||||
* to each other, which can cause much thrashing of cache lines on
|
* don't overcome their added overhead: We are managing operations
|
||||||
* MPs unless padding is employed.
|
* that occur very quickly unless there is sustained contention,
|
||||||
|
* so simpler/faster control policies work better than more
|
||||||
|
* accurate but slower ones.
|
||||||
*
|
*
|
||||||
* This is an improvement of the algorithm described in the paper
|
* Because we use expiration for arena size control, we cannot
|
||||||
* "A Scalable Elimination-based Exchange Channel" by William
|
* throw TimeoutExceptions in the timed version of the public
|
||||||
* Scherer, Doug Lea, and Michael Scott in Proceedings of SCOOL05
|
* exchange method until the arena size has shrunken to zero (or
|
||||||
* workshop. Available at: http://hdl.handle.net/1802/2104
|
* the arena isn't enabled). This may delay response to timeout
|
||||||
|
* but is still within spec.
|
||||||
|
*
|
||||||
|
* Essentially all of the implementation is in methods
|
||||||
|
* slotExchange and arenaExchange. These have similar overall
|
||||||
|
* structure, but differ in too many details to combine. The
|
||||||
|
* slotExchange method uses the single Exchanger field "slot"
|
||||||
|
* rather than arena array elements. However, it still needs
|
||||||
|
* minimal collision detection to trigger arena construction.
|
||||||
|
* (The messiest part is making sure interrupt status and
|
||||||
|
* InterruptedExceptions come out right during transitions when
|
||||||
|
* both methods may be called. This is done by using null return
|
||||||
|
* as a sentinel to recheck interrupt status.)
|
||||||
|
*
|
||||||
|
* As is too common in this sort of code, methods are monolithic
|
||||||
|
* because most of the logic relies on reads of fields that are
|
||||||
|
* maintained as local variables so can't be nicely factored --
|
||||||
|
* mainly, here, bulky spin->yield->block/cancel code), and
|
||||||
|
* heavily dependent on intrinsics (Unsafe) to use inlined
|
||||||
|
* embedded CAS and related memory access operations (that tend
|
||||||
|
* not to be as readily inlined by dynamic compilers when they are
|
||||||
|
* hidden behind other methods that would more nicely name and
|
||||||
|
* encapsulate the intended effects). This includes the use of
|
||||||
|
* putOrderedX to clear fields of the per-thread Nodes between
|
||||||
|
* uses. Note that field Node.item is not declared as volatile
|
||||||
|
* even though it is read by releasing threads, because they only
|
||||||
|
* do so after CAS operations that must precede access, and all
|
||||||
|
* uses by the owning thread are otherwise acceptably ordered by
|
||||||
|
* other operations. (Because the actual points of atomicity are
|
||||||
|
* slot CASes, it would also be legal for the write to Node.match
|
||||||
|
* in a release to be weaker than a full volatile write. However,
|
||||||
|
* this is not done because it could allow further postponement of
|
||||||
|
* the write, delaying progress.)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The byte distance (as a shift value) between any two used slots
|
||||||
|
* in the arena. 1 << ASHIFT should be at least cacheline size.
|
||||||
|
*/
|
||||||
|
private static final int ASHIFT = 7;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum supported arena index. The maximum allocatable
|
||||||
|
* arena size is MMASK + 1. Must be a power of two minus one, less
|
||||||
|
* than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
|
||||||
|
* for the expected scaling limits of the main algorithms.
|
||||||
|
*/
|
||||||
|
private static final int MMASK = 0xff;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit for sequence/version bits of bound field. Each successful
|
||||||
|
* change to the bound also adds SEQ.
|
||||||
|
*/
|
||||||
|
private static final int SEQ = MMASK + 1;
|
||||||
|
|
||||||
/** The number of CPUs, for sizing and spin control */
|
/** The number of CPUs, for sizing and spin control */
|
||||||
private static final int NCPU = Runtime.getRuntime().availableProcessors();
|
private static final int NCPU = Runtime.getRuntime().availableProcessors();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The capacity of the arena. Set to a value that provides more
|
* The maximum slot index of the arena: The number of slots that
|
||||||
* than enough space to handle contention. On small machines
|
* can in principle hold all threads without contention, or at
|
||||||
* most slots won't be used, but it is still not wasted because
|
* most the maximum indexable value.
|
||||||
* the extra space provides some machine-level address padding
|
|
||||||
* to minimize interference with heavily CAS'ed Slot locations.
|
|
||||||
* And on very large machines, performance eventually becomes
|
|
||||||
* bounded by memory bandwidth, not numbers of threads/CPUs.
|
|
||||||
* This constant cannot be changed without also modifying
|
|
||||||
* indexing and hashing algorithms.
|
|
||||||
*/
|
*/
|
||||||
private static final int CAPACITY = 32;
|
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The value of "max" that will hold all threads without
|
* The bound for spins while waiting for a match. The actual
|
||||||
* contention. When this value is less than CAPACITY, some
|
* number of iterations will on average be about twice this value
|
||||||
* otherwise wasted expansion can be avoided.
|
* due to randomization. Note: Spinning is disabled when NCPU==1.
|
||||||
*/
|
*/
|
||||||
private static final int FULL =
|
private static final int SPINS = 1 << 10;
|
||||||
Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The number of times to spin (doing nothing except polling a
|
|
||||||
* memory location) before blocking or giving up while waiting to
|
|
||||||
* be fulfilled. Should be zero on uniprocessors. On
|
|
||||||
* multiprocessors, this value should be large enough so that two
|
|
||||||
* threads exchanging items as fast as possible block only when
|
|
||||||
* one of them is stalled (due to GC or preemption), but not much
|
|
||||||
* longer, to avoid wasting CPU resources. Seen differently, this
|
|
||||||
* value is a little over half the number of cycles of an average
|
|
||||||
* context switch time on most systems. The value here is
|
|
||||||
* approximately the average of those across a range of tested
|
|
||||||
* systems.
|
|
||||||
*/
|
|
||||||
private static final int SPINS = (NCPU == 1) ? 0 : 2000;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The number of times to spin before blocking in timed waits.
|
|
||||||
* Timed waits spin more slowly because checking the time takes
|
|
||||||
* time. The best value relies mainly on the relative rate of
|
|
||||||
* System.nanoTime vs memory accesses. The value is empirically
|
|
||||||
* derived to work well across a variety of systems.
|
|
||||||
*/
|
|
||||||
private static final int TIMED_SPINS = SPINS / 20;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sentinel item representing cancellation of a wait due to
|
|
||||||
* interruption, timeout, or elapsed spin-waits. This value is
|
|
||||||
* placed in holes on cancellation, and used as a return value
|
|
||||||
* from waiting methods to indicate failure to set or get hole.
|
|
||||||
*/
|
|
||||||
private static final Object CANCEL = new Object();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Value representing null arguments/returns from public
|
* Value representing null arguments/returns from public
|
||||||
* methods. This disambiguates from internal requirement that
|
* methods. Needed because the API originally didn't disallow null
|
||||||
* holes start out as null to mean they are not yet set.
|
* arguments, which it should have.
|
||||||
*/
|
*/
|
||||||
private static final Object NULL_ITEM = new Object();
|
private static final Object NULL_ITEM = new Object();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Nodes hold partially exchanged data. This class
|
* Sentinel value returned by internal exchange methods upon
|
||||||
* opportunistically subclasses AtomicReference to represent the
|
* timeout, to avoid need for separate timed versions of these
|
||||||
* hole. So get() returns hole, and compareAndSet CAS'es value
|
* methods.
|
||||||
* into hole. This class cannot be parameterized as "V" because
|
|
||||||
* of the use of non-V CANCEL sentinels.
|
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("serial")
|
private static final Object TIMED_OUT = new Object();
|
||||||
private static final class Node extends AtomicReference<Object> {
|
|
||||||
/** The element offered by the Thread creating this node. */
|
|
||||||
public final Object item;
|
|
||||||
|
|
||||||
/** The Thread waiting to be signalled; null until waiting. */
|
|
||||||
public volatile Thread waiter;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates node with given item and empty hole.
|
* Nodes hold partially exchanged data, plus other per-thread
|
||||||
* @param item the item
|
* bookkeeping. Padded via @sun.misc.Contended to reduce memory
|
||||||
|
* contention.
|
||||||
*/
|
*/
|
||||||
public Node(Object item) {
|
@sun.misc.Contended static final class Node {
|
||||||
this.item = item;
|
int index; // Arena index
|
||||||
|
int bound; // Last recorded value of Exchanger.bound
|
||||||
|
int collides; // Number of CAS failures at current bound
|
||||||
|
int hash; // Pseudo-random for spins
|
||||||
|
Object item; // This thread's current item
|
||||||
|
volatile Object match; // Item provided by releasing thread
|
||||||
|
volatile Thread parked; // Set to this thread when parked, else null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** The corresponding thread local class */
|
||||||
|
static final class Participant extends ThreadLocal<Node> {
|
||||||
|
public Node initialValue() { return new Node(); }
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Slot is an AtomicReference with heuristic padding to lessen
|
* Per-thread state
|
||||||
* cache effects of this heavily CAS'ed location. While the
|
|
||||||
* padding adds noticeable space, all slots are created only on
|
|
||||||
* demand, and there will be more than one of them only when it
|
|
||||||
* would improve throughput more than enough to outweigh using
|
|
||||||
* extra space.
|
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("serial")
|
private final Participant participant;
|
||||||
private static final class Slot extends AtomicReference<Object> {
|
|
||||||
// Improve likelihood of isolation on <= 64 byte cache lines
|
|
||||||
long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Slot array. Elements are lazily initialized when needed.
|
* Elimination array; null until enabled (within slotExchange).
|
||||||
* Declared volatile to enable double-checked lazy construction.
|
* Element accesses use emulation of volatile gets and CAS.
|
||||||
*/
|
*/
|
||||||
private volatile Slot[] arena = new Slot[CAPACITY];
|
private volatile Node[] arena;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The maximum slot index being used. The value sometimes
|
* Slot used until contention detected.
|
||||||
* increases when a thread experiences too many CAS contentions,
|
|
||||||
* and sometimes decreases when a spin-wait elapses. Changes
|
|
||||||
* are performed only via compareAndSet, to avoid stale values
|
|
||||||
* when a thread happens to stall right before setting.
|
|
||||||
*/
|
*/
|
||||||
private final AtomicInteger max = new AtomicInteger();
|
private volatile Node slot;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main exchange function, handling the different policy variants.
|
* The index of the largest valid arena position, OR'ed with SEQ
|
||||||
* Uses Object, not "V" as argument and return value to simplify
|
* number in high bits, incremented on each update. The initial
|
||||||
* handling of sentinel values. Callers from public methods decode
|
* update from 0 to SEQ is used to ensure that the arena array is
|
||||||
* and cast accordingly.
|
* constructed only once.
|
||||||
|
*/
|
||||||
|
private volatile int bound;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exchange function when arenas enabled. See above for explanation.
|
||||||
*
|
*
|
||||||
* @param item the (non-null) item to exchange
|
* @param item the (non-null) item to exchange
|
||||||
* @param timed true if the wait is timed
|
* @param timed true if the wait is timed
|
||||||
* @param nanos if timed, the maximum wait time
|
* @param ns if timed, the maximum wait time, else 0L
|
||||||
* @return the other thread's item, or CANCEL if interrupted or timed out
|
* @return the other thread's item; or null if interrupted; or
|
||||||
|
* TIMED_OUT if timed and timed out
|
||||||
*/
|
*/
|
||||||
private Object doExchange(Object item, boolean timed, long nanos) {
|
private final Object arenaExchange(Object item, boolean timed, long ns) {
|
||||||
Node me = new Node(item); // Create in case occupying
|
Node[] a = arena;
|
||||||
int index = hashIndex(); // Index of current slot
|
Node p = participant.get();
|
||||||
int fails = 0; // Number of CAS failures
|
for (int i = p.index;;) { // access slot at i
|
||||||
|
int b, m, c; long j; // j is raw array offset
|
||||||
for (;;) {
|
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
|
||||||
Object y; // Contents of current slot
|
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
|
||||||
Slot slot = arena[index];
|
Object v = q.item; // release
|
||||||
if (slot == null) // Lazily initialize slots
|
q.match = item;
|
||||||
createSlot(index); // Continue loop to reread
|
Thread w = q.parked;
|
||||||
else if ((y = slot.get()) != null && // Try to fulfill
|
if (w != null)
|
||||||
slot.compareAndSet(y, null)) {
|
U.unpark(w);
|
||||||
Node you = (Node)y; // Transfer item
|
|
||||||
if (you.compareAndSet(null, item)) {
|
|
||||||
LockSupport.unpark(you.waiter);
|
|
||||||
return you.item;
|
|
||||||
} // Else cancelled; continue
|
|
||||||
}
|
|
||||||
else if (y == null && // Try to occupy
|
|
||||||
slot.compareAndSet(null, me)) {
|
|
||||||
if (index == 0) // Blocking wait for slot 0
|
|
||||||
return timed ?
|
|
||||||
awaitNanos(me, slot, nanos) :
|
|
||||||
await(me, slot);
|
|
||||||
Object v = spinWait(me, slot); // Spin wait for non-0
|
|
||||||
if (v != CANCEL)
|
|
||||||
return v;
|
return v;
|
||||||
me = new Node(item); // Throw away cancelled node
|
|
||||||
int m = max.get();
|
|
||||||
if (m > (index >>>= 1)) // Decrease index
|
|
||||||
max.compareAndSet(m, m - 1); // Maybe shrink table
|
|
||||||
}
|
}
|
||||||
else if (++fails > 1) { // Allow 2 fails on 1st slot
|
else if (i <= (m = (b = bound) & MMASK) && q == null) {
|
||||||
int m = max.get();
|
p.item = item; // offer
|
||||||
if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
|
if (U.compareAndSwapObject(a, j, null, p)) {
|
||||||
index = m + 1; // Grow on 3rd failed slot
|
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
|
||||||
else if (--index < 0)
|
Thread t = Thread.currentThread(); // wait
|
||||||
index = m; // Circularly traverse
|
for (int h = p.hash, spins = SPINS;;) {
|
||||||
}
|
Object v = p.match;
|
||||||
}
|
if (v != null) {
|
||||||
}
|
U.putOrderedObject(p, MATCH, null);
|
||||||
|
p.item = null; // clear for next use
|
||||||
/**
|
p.hash = h;
|
||||||
* Returns a hash index for the current thread. Uses a one-step
|
|
||||||
* FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
|
|
||||||
* based on the current thread's Thread.getId(). These hash codes
|
|
||||||
* have more uniform distribution properties with respect to small
|
|
||||||
* moduli (here 1-31) than do other simple hashing functions.
|
|
||||||
*
|
|
||||||
* <p>To return an index between 0 and max, we use a cheap
|
|
||||||
* approximation to a mod operation, that also corrects for bias
|
|
||||||
* due to non-power-of-2 remaindering (see {@link
|
|
||||||
* java.util.Random#nextInt}). Bits of the hashcode are masked
|
|
||||||
* with "nbits", the ceiling power of two of table size (looked up
|
|
||||||
* in a table packed into three ints). If too large, this is
|
|
||||||
* retried after rotating the hash by nbits bits, while forcing new
|
|
||||||
* top bit to 0, which guarantees eventual termination (although
|
|
||||||
* with a non-random-bias). This requires an average of less than
|
|
||||||
* 2 tries for all table sizes, and has a maximum 2% difference
|
|
||||||
* from perfectly uniform slot probabilities when applied to all
|
|
||||||
* possible hash codes for sizes less than 32.
|
|
||||||
*
|
|
||||||
* @return a per-thread-random index, 0 <= index < max
|
|
||||||
*/
|
|
||||||
private final int hashIndex() {
|
|
||||||
long id = Thread.currentThread().getId();
|
|
||||||
int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
|
|
||||||
|
|
||||||
int m = max.get();
|
|
||||||
int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1))
|
|
||||||
((0x000001f8 >>> m) & 2) | // The constants hold
|
|
||||||
((0xffff00f2 >>> m) & 1)); // a lookup table
|
|
||||||
int index;
|
|
||||||
while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on
|
|
||||||
hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
|
|
||||||
return index;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new slot at given index. Called only when the slot
|
|
||||||
* appears to be null. Relies on double-check using builtin
|
|
||||||
* locks, since they rarely contend. This in turn relies on the
|
|
||||||
* arena array being declared volatile.
|
|
||||||
*
|
|
||||||
* @param index the index to add slot at
|
|
||||||
*/
|
|
||||||
private void createSlot(int index) {
|
|
||||||
// Create slot outside of lock to narrow sync region
|
|
||||||
Slot newSlot = new Slot();
|
|
||||||
Slot[] a = arena;
|
|
||||||
synchronized (a) {
|
|
||||||
if (a[index] == null)
|
|
||||||
a[index] = newSlot;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tries to cancel a wait for the given node waiting in the given
|
|
||||||
* slot, if so, helping clear the node from its slot to avoid
|
|
||||||
* garbage retention.
|
|
||||||
*
|
|
||||||
* @param node the waiting node
|
|
||||||
* @param the slot it is waiting in
|
|
||||||
* @return true if successfully cancelled
|
|
||||||
*/
|
|
||||||
private static boolean tryCancel(Node node, Slot slot) {
|
|
||||||
if (!node.compareAndSet(null, CANCEL))
|
|
||||||
return false;
|
|
||||||
if (slot.get() == node) // pre-check to minimize contention
|
|
||||||
slot.compareAndSet(node, null);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Three forms of waiting. Each just different enough not to merge
|
|
||||||
// code with others.
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Spin-waits for hole for a non-0 slot. Fails if spin elapses
|
|
||||||
* before hole filled. Does not check interrupt, relying on check
|
|
||||||
* in public exchange method to abort if interrupted on entry.
|
|
||||||
*
|
|
||||||
* @param node the waiting node
|
|
||||||
* @return on success, the hole; on failure, CANCEL
|
|
||||||
*/
|
|
||||||
private static Object spinWait(Node node, Slot slot) {
|
|
||||||
int spins = SPINS;
|
|
||||||
for (;;) {
|
|
||||||
Object v = node.get();
|
|
||||||
if (v != null)
|
|
||||||
return v;
|
return v;
|
||||||
else if (spins > 0)
|
}
|
||||||
--spins;
|
else if (spins > 0) {
|
||||||
|
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
|
||||||
|
if (h == 0) // initialize hash
|
||||||
|
h = SPINS | (int)t.getId();
|
||||||
|
else if (h < 0 && // approx 50% true
|
||||||
|
(--spins & ((SPINS >>> 1) - 1)) == 0)
|
||||||
|
Thread.yield(); // two yields per wait
|
||||||
|
}
|
||||||
|
else if (U.getObjectVolatile(a, j) != p)
|
||||||
|
spins = SPINS; // releaser hasn't set match yet
|
||||||
|
else if (!t.isInterrupted() && m == 0 &&
|
||||||
|
(!timed ||
|
||||||
|
(ns = end - System.nanoTime()) > 0L)) {
|
||||||
|
U.putObject(t, BLOCKER, this); // emulate LockSupport
|
||||||
|
p.parked = t; // minimize window
|
||||||
|
if (U.getObjectVolatile(a, j) == p)
|
||||||
|
U.park(false, ns);
|
||||||
|
p.parked = null;
|
||||||
|
U.putObject(t, BLOCKER, null);
|
||||||
|
}
|
||||||
|
else if (U.getObjectVolatile(a, j) == p &&
|
||||||
|
U.compareAndSwapObject(a, j, p, null)) {
|
||||||
|
if (m != 0) // try to shrink
|
||||||
|
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
|
||||||
|
p.item = null;
|
||||||
|
p.hash = h;
|
||||||
|
i = p.index >>>= 1; // descend
|
||||||
|
if (Thread.interrupted())
|
||||||
|
return null;
|
||||||
|
if (timed && m == 0 && ns <= 0L)
|
||||||
|
return TIMED_OUT;
|
||||||
|
break; // expired; restart
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
tryCancel(node, slot);
|
p.item = null; // clear offer
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
if (p.bound != b) { // stale; reset
|
||||||
|
p.bound = b;
|
||||||
|
p.collides = 0;
|
||||||
|
i = (i != m || m == 0) ? m : m - 1;
|
||||||
}
|
}
|
||||||
|
else if ((c = p.collides) < m || m == FULL ||
|
||||||
/**
|
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
|
||||||
* Waits for (by spinning and/or blocking) and gets the hole
|
p.collides = c + 1;
|
||||||
* filled in by another thread. Fails if interrupted before
|
i = (i == 0) ? m : i - 1; // cyclically traverse
|
||||||
* hole filled.
|
|
||||||
*
|
|
||||||
* When a node/thread is about to block, it sets its waiter field
|
|
||||||
* and then rechecks state at least one more time before actually
|
|
||||||
* parking, thus covering race vs fulfiller noticing that waiter
|
|
||||||
* is non-null so should be woken.
|
|
||||||
*
|
|
||||||
* Thread interruption status is checked only surrounding calls to
|
|
||||||
* park. The caller is assumed to have checked interrupt status
|
|
||||||
* on entry.
|
|
||||||
*
|
|
||||||
* @param node the waiting node
|
|
||||||
* @return on success, the hole; on failure, CANCEL
|
|
||||||
*/
|
|
||||||
private static Object await(Node node, Slot slot) {
|
|
||||||
Thread w = Thread.currentThread();
|
|
||||||
int spins = SPINS;
|
|
||||||
for (;;) {
|
|
||||||
Object v = node.get();
|
|
||||||
if (v != null)
|
|
||||||
return v;
|
|
||||||
else if (spins > 0) // Spin-wait phase
|
|
||||||
--spins;
|
|
||||||
else if (node.waiter == null) // Set up to block next
|
|
||||||
node.waiter = w;
|
|
||||||
else if (w.isInterrupted()) // Abort on interrupt
|
|
||||||
tryCancel(node, slot);
|
|
||||||
else // Block
|
|
||||||
LockSupport.park(node);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Waits for (at index 0) and gets the hole filled in by another
|
|
||||||
* thread. Fails if timed out or interrupted before hole filled.
|
|
||||||
* Same basic logic as untimed version, but a bit messier.
|
|
||||||
*
|
|
||||||
* @param node the waiting node
|
|
||||||
* @param nanos the wait time
|
|
||||||
* @return on success, the hole; on failure, CANCEL
|
|
||||||
*/
|
|
||||||
private Object awaitNanos(Node node, Slot slot, long nanos) {
|
|
||||||
int spins = TIMED_SPINS;
|
|
||||||
long lastTime = 0;
|
|
||||||
Thread w = null;
|
|
||||||
for (;;) {
|
|
||||||
Object v = node.get();
|
|
||||||
if (v != null)
|
|
||||||
return v;
|
|
||||||
long now = System.nanoTime();
|
|
||||||
if (w == null)
|
|
||||||
w = Thread.currentThread();
|
|
||||||
else
|
else
|
||||||
nanos -= now - lastTime;
|
i = m + 1; // grow
|
||||||
lastTime = now;
|
p.index = i;
|
||||||
if (nanos > 0) {
|
|
||||||
if (spins > 0)
|
|
||||||
--spins;
|
|
||||||
else if (node.waiter == null)
|
|
||||||
node.waiter = w;
|
|
||||||
else if (w.isInterrupted())
|
|
||||||
tryCancel(node, slot);
|
|
||||||
else
|
|
||||||
LockSupport.parkNanos(node, nanos);
|
|
||||||
}
|
}
|
||||||
else if (tryCancel(node, slot) && !w.isInterrupted())
|
|
||||||
return scanOnTimeout(node);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sweeps through arena checking for any waiting threads. Called
|
* Exchange function used until arenas enabled. See above for explanation.
|
||||||
* only upon return from timeout while waiting in slot 0. When a
|
|
||||||
* thread gives up on a timed wait, it is possible that a
|
|
||||||
* previously-entered thread is still waiting in some other
|
|
||||||
* slot. So we scan to check for any. This is almost always
|
|
||||||
* overkill, but decreases the likelihood of timeouts when there
|
|
||||||
* are other threads present to far less than that in lock-based
|
|
||||||
* exchangers in which earlier-arriving threads may still be
|
|
||||||
* waiting on entry locks.
|
|
||||||
*
|
*
|
||||||
* @param node the waiting node
|
* @param item the item to exchange
|
||||||
* @return another thread's item, or CANCEL
|
* @param timed true if the wait is timed
|
||||||
|
* @param ns if timed, the maximum wait time, else 0L
|
||||||
|
* @return the other thread's item; or null if either the arena
|
||||||
|
* was enabled or the thread was interrupted before completion; or
|
||||||
|
* TIMED_OUT if timed and timed out
|
||||||
*/
|
*/
|
||||||
private Object scanOnTimeout(Node node) {
|
private final Object slotExchange(Object item, boolean timed, long ns) {
|
||||||
Object y;
|
Node p = participant.get();
|
||||||
for (int j = arena.length - 1; j >= 0; --j) {
|
Thread t = Thread.currentThread();
|
||||||
Slot slot = arena[j];
|
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
|
||||||
if (slot != null) {
|
return null;
|
||||||
while ((y = slot.get()) != null) {
|
|
||||||
if (slot.compareAndSet(y, null)) {
|
for (Node q;;) {
|
||||||
Node you = (Node)y;
|
if ((q = slot) != null) {
|
||||||
if (you.compareAndSet(null, node.item)) {
|
if (U.compareAndSwapObject(this, SLOT, q, null)) {
|
||||||
LockSupport.unpark(you.waiter);
|
Object v = q.item;
|
||||||
return you.item;
|
q.match = item;
|
||||||
|
Thread w = q.parked;
|
||||||
|
if (w != null)
|
||||||
|
U.unpark(w);
|
||||||
|
return v;
|
||||||
|
}
|
||||||
|
// create arena on contention, but continue until slot null
|
||||||
|
if (NCPU > 1 && bound == 0 &&
|
||||||
|
U.compareAndSwapInt(this, BOUND, 0, SEQ))
|
||||||
|
arena = new Node[(FULL + 2) << ASHIFT];
|
||||||
|
}
|
||||||
|
else if (arena != null)
|
||||||
|
return null; // caller must reroute to arenaExchange
|
||||||
|
else {
|
||||||
|
p.item = item;
|
||||||
|
if (U.compareAndSwapObject(this, SLOT, null, p))
|
||||||
|
break;
|
||||||
|
p.item = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// await release
|
||||||
|
int h = p.hash;
|
||||||
|
long end = timed ? System.nanoTime() + ns : 0L;
|
||||||
|
int spins = (NCPU > 1) ? SPINS : 1;
|
||||||
|
Object v;
|
||||||
|
while ((v = p.match) == null) {
|
||||||
|
if (spins > 0) {
|
||||||
|
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
|
||||||
|
if (h == 0)
|
||||||
|
h = SPINS | (int)t.getId();
|
||||||
|
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
|
||||||
|
Thread.yield();
|
||||||
|
}
|
||||||
|
else if (slot != p)
|
||||||
|
spins = SPINS;
|
||||||
|
else if (!t.isInterrupted() && arena == null &&
|
||||||
|
(!timed || (ns = end - System.nanoTime()) > 0L)) {
|
||||||
|
U.putObject(t, BLOCKER, this);
|
||||||
|
p.parked = t;
|
||||||
|
if (slot == p)
|
||||||
|
U.park(false, ns);
|
||||||
|
p.parked = null;
|
||||||
|
U.putObject(t, BLOCKER, null);
|
||||||
|
}
|
||||||
|
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
|
||||||
|
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
U.putOrderedObject(p, MATCH, null);
|
||||||
return CANCEL;
|
p.item = null;
|
||||||
|
p.hash = h;
|
||||||
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new Exchanger.
|
* Creates a new Exchanger.
|
||||||
*/
|
*/
|
||||||
public Exchanger() {
|
public Exchanger() {
|
||||||
|
participant = new Participant();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -620,15 +559,14 @@ public class Exchanger<V> {
|
|||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public V exchange(V x) throws InterruptedException {
|
public V exchange(V x) throws InterruptedException {
|
||||||
if (!Thread.interrupted()) {
|
Object v;
|
||||||
Object o = doExchange((x == null) ? NULL_ITEM : x, false, 0);
|
Object item = (x == null) ? NULL_ITEM : x; // translate null args
|
||||||
if (o == NULL_ITEM)
|
if ((arena != null ||
|
||||||
return null;
|
(v = slotExchange(item, false, 0L)) == null) &&
|
||||||
if (o != CANCEL)
|
((Thread.interrupted() || // disambiguates null return
|
||||||
return (V)o;
|
(v = arenaExchange(item, false, 0L)) == null)))
|
||||||
Thread.interrupted(); // Clear interrupt status on IE throw
|
|
||||||
}
|
|
||||||
throw new InterruptedException();
|
throw new InterruptedException();
|
||||||
|
return (v == NULL_ITEM) ? null : (V)v;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -666,7 +604,7 @@ public class Exchanger<V> {
|
|||||||
*
|
*
|
||||||
* @param x the object to exchange
|
* @param x the object to exchange
|
||||||
* @param timeout the maximum time to wait
|
* @param timeout the maximum time to wait
|
||||||
* @param unit the time unit of the <tt>timeout</tt> argument
|
* @param unit the time unit of the {@code timeout} argument
|
||||||
* @return the object provided by the other thread
|
* @return the object provided by the other thread
|
||||||
* @throws InterruptedException if the current thread was
|
* @throws InterruptedException if the current thread was
|
||||||
* interrupted while waiting
|
* interrupted while waiting
|
||||||
@ -676,16 +614,51 @@ public class Exchanger<V> {
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public V exchange(V x, long timeout, TimeUnit unit)
|
public V exchange(V x, long timeout, TimeUnit unit)
|
||||||
throws InterruptedException, TimeoutException {
|
throws InterruptedException, TimeoutException {
|
||||||
if (!Thread.interrupted()) {
|
Object v;
|
||||||
Object o = doExchange((x == null) ? NULL_ITEM : x,
|
Object item = (x == null) ? NULL_ITEM : x;
|
||||||
true, unit.toNanos(timeout));
|
long ns = unit.toNanos(timeout);
|
||||||
if (o == NULL_ITEM)
|
if ((arena != null ||
|
||||||
return null;
|
(v = slotExchange(item, true, ns)) == null) &&
|
||||||
if (o != CANCEL)
|
((Thread.interrupted() ||
|
||||||
return (V)o;
|
(v = arenaExchange(item, true, ns)) == null)))
|
||||||
if (!Thread.interrupted())
|
|
||||||
throw new TimeoutException();
|
|
||||||
}
|
|
||||||
throw new InterruptedException();
|
throw new InterruptedException();
|
||||||
|
if (v == TIMED_OUT)
|
||||||
|
throw new TimeoutException();
|
||||||
|
return (v == NULL_ITEM) ? null : (V)v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unsafe mechanics
|
||||||
|
private static final sun.misc.Unsafe U;
|
||||||
|
private static final long BOUND;
|
||||||
|
private static final long SLOT;
|
||||||
|
private static final long MATCH;
|
||||||
|
private static final long BLOCKER;
|
||||||
|
private static final int ABASE;
|
||||||
|
static {
|
||||||
|
int s;
|
||||||
|
try {
|
||||||
|
U = sun.misc.Unsafe.getUnsafe();
|
||||||
|
Class<?> ek = Exchanger.class;
|
||||||
|
Class<?> nk = Node.class;
|
||||||
|
Class<?> ak = Node[].class;
|
||||||
|
Class<?> tk = Thread.class;
|
||||||
|
BOUND = U.objectFieldOffset
|
||||||
|
(ek.getDeclaredField("bound"));
|
||||||
|
SLOT = U.objectFieldOffset
|
||||||
|
(ek.getDeclaredField("slot"));
|
||||||
|
MATCH = U.objectFieldOffset
|
||||||
|
(nk.getDeclaredField("match"));
|
||||||
|
BLOCKER = U.objectFieldOffset
|
||||||
|
(tk.getDeclaredField("parkBlocker"));
|
||||||
|
s = U.arrayIndexScale(ak);
|
||||||
|
// ABASE absorbs padding in front of element 0
|
||||||
|
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new Error(e);
|
||||||
|
}
|
||||||
|
if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
|
||||||
|
throw new Error("Unsupported array scale");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -46,7 +46,7 @@ import java.util.concurrent.locks.LockSupport;
|
|||||||
* {@link java.util.concurrent.CountDownLatch CountDownLatch}
|
* {@link java.util.concurrent.CountDownLatch CountDownLatch}
|
||||||
* but supporting more flexible usage.
|
* but supporting more flexible usage.
|
||||||
*
|
*
|
||||||
* <p> <b>Registration.</b> Unlike the case for other barriers, the
|
* <p><b>Registration.</b> Unlike the case for other barriers, the
|
||||||
* number of parties <em>registered</em> to synchronize on a phaser
|
* number of parties <em>registered</em> to synchronize on a phaser
|
||||||
* may vary over time. Tasks may be registered at any time (using
|
* may vary over time. Tasks may be registered at any time (using
|
||||||
* methods {@link #register}, {@link #bulkRegister}, or forms of
|
* methods {@link #register}, {@link #bulkRegister}, or forms of
|
||||||
@ -59,7 +59,7 @@ import java.util.concurrent.locks.LockSupport;
|
|||||||
* (However, you can introduce such bookkeeping by subclassing this
|
* (However, you can introduce such bookkeeping by subclassing this
|
||||||
* class.)
|
* class.)
|
||||||
*
|
*
|
||||||
* <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
|
* <p><b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
|
||||||
* Phaser} may be repeatedly awaited. Method {@link
|
* Phaser} may be repeatedly awaited. Method {@link
|
||||||
* #arriveAndAwaitAdvance} has effect analogous to {@link
|
* #arriveAndAwaitAdvance} has effect analogous to {@link
|
||||||
* java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
|
* java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
|
||||||
@ -103,7 +103,7 @@ import java.util.concurrent.locks.LockSupport;
|
|||||||
*
|
*
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* <p> <b>Termination.</b> A phaser may enter a <em>termination</em>
|
* <p><b>Termination.</b> A phaser may enter a <em>termination</em>
|
||||||
* state, that may be checked using method {@link #isTerminated}. Upon
|
* state, that may be checked using method {@link #isTerminated}. Upon
|
||||||
* termination, all synchronization methods immediately return without
|
* termination, all synchronization methods immediately return without
|
||||||
* waiting for advance, as indicated by a negative return value.
|
* waiting for advance, as indicated by a negative return value.
|
||||||
@ -118,7 +118,7 @@ import java.util.concurrent.locks.LockSupport;
|
|||||||
* also available to abruptly release waiting threads and allow them
|
* also available to abruptly release waiting threads and allow them
|
||||||
* to terminate.
|
* to terminate.
|
||||||
*
|
*
|
||||||
* <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
|
* <p><b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
|
||||||
* constructed in tree structures) to reduce contention. Phasers with
|
* constructed in tree structures) to reduce contention. Phasers with
|
||||||
* large numbers of parties that would otherwise experience heavy
|
* large numbers of parties that would otherwise experience heavy
|
||||||
* synchronization contention costs may instead be set up so that
|
* synchronization contention costs may instead be set up so that
|
||||||
@ -300,18 +300,20 @@ public class Phaser {
|
|||||||
private static final int PHASE_SHIFT = 32;
|
private static final int PHASE_SHIFT = 32;
|
||||||
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
|
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
|
||||||
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
|
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
|
||||||
|
private static final long COUNTS_MASK = 0xffffffffL;
|
||||||
private static final long TERMINATION_BIT = 1L << 63;
|
private static final long TERMINATION_BIT = 1L << 63;
|
||||||
|
|
||||||
// some special values
|
// some special values
|
||||||
private static final int ONE_ARRIVAL = 1;
|
private static final int ONE_ARRIVAL = 1;
|
||||||
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
|
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
|
||||||
|
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
|
||||||
private static final int EMPTY = 1;
|
private static final int EMPTY = 1;
|
||||||
|
|
||||||
// The following unpacking methods are usually manually inlined
|
// The following unpacking methods are usually manually inlined
|
||||||
|
|
||||||
private static int unarrivedOf(long s) {
|
private static int unarrivedOf(long s) {
|
||||||
int counts = (int)s;
|
int counts = (int)s;
|
||||||
return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK;
|
return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int partiesOf(long s) {
|
private static int partiesOf(long s) {
|
||||||
@ -372,38 +374,45 @@ public class Phaser {
|
|||||||
* Manually tuned to speed up and minimize race windows for the
|
* Manually tuned to speed up and minimize race windows for the
|
||||||
* common case of just decrementing unarrived field.
|
* common case of just decrementing unarrived field.
|
||||||
*
|
*
|
||||||
* @param deregister false for arrive, true for arriveAndDeregister
|
* @param adjust value to subtract from state;
|
||||||
|
* ONE_ARRIVAL for arrive,
|
||||||
|
* ONE_DEREGISTER for arriveAndDeregister
|
||||||
*/
|
*/
|
||||||
private int doArrive(boolean deregister) {
|
private int doArrive(int adjust) {
|
||||||
int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
|
|
||||||
final Phaser root = this.root;
|
final Phaser root = this.root;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
long s = (root == this) ? state : reconcileState();
|
long s = (root == this) ? state : reconcileState();
|
||||||
int phase = (int)(s >>> PHASE_SHIFT);
|
int phase = (int)(s >>> PHASE_SHIFT);
|
||||||
int counts = (int)s;
|
|
||||||
int unarrived = (counts & UNARRIVED_MASK) - 1;
|
|
||||||
if (phase < 0)
|
if (phase < 0)
|
||||||
return phase;
|
return phase;
|
||||||
else if (counts == EMPTY || unarrived < 0) {
|
int counts = (int)s;
|
||||||
if (root == this || reconcileState() == s)
|
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
|
||||||
|
if (unarrived <= 0)
|
||||||
throw new IllegalStateException(badArrive(s));
|
throw new IllegalStateException(badArrive(s));
|
||||||
}
|
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
|
||||||
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
|
if (unarrived == 1) {
|
||||||
if (unarrived == 0) {
|
|
||||||
long n = s & PARTIES_MASK; // base of next state
|
long n = s & PARTIES_MASK; // base of next state
|
||||||
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
|
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
|
||||||
if (root != this)
|
if (root == this) {
|
||||||
return parent.doArrive(nextUnarrived == 0);
|
|
||||||
if (onAdvance(phase, nextUnarrived))
|
if (onAdvance(phase, nextUnarrived))
|
||||||
n |= TERMINATION_BIT;
|
n |= TERMINATION_BIT;
|
||||||
else if (nextUnarrived == 0)
|
else if (nextUnarrived == 0)
|
||||||
n |= EMPTY;
|
n |= EMPTY;
|
||||||
else
|
else
|
||||||
n |= nextUnarrived;
|
n |= nextUnarrived;
|
||||||
n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
|
int nextPhase = (phase + 1) & MAX_PHASE;
|
||||||
|
n |= (long)nextPhase << PHASE_SHIFT;
|
||||||
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
|
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
|
||||||
releaseWaiters(phase);
|
releaseWaiters(phase);
|
||||||
}
|
}
|
||||||
|
else if (nextUnarrived == 0) { // propagate deregistration
|
||||||
|
phase = parent.doArrive(ONE_DEREGISTER);
|
||||||
|
UNSAFE.compareAndSwapLong(this, stateOffset,
|
||||||
|
s, s | EMPTY);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
phase = parent.doArrive(ONE_ARRIVAL);
|
||||||
|
}
|
||||||
return phase;
|
return phase;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -417,42 +426,49 @@ public class Phaser {
|
|||||||
*/
|
*/
|
||||||
private int doRegister(int registrations) {
|
private int doRegister(int registrations) {
|
||||||
// adjustment to state
|
// adjustment to state
|
||||||
long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
|
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
|
||||||
final Phaser parent = this.parent;
|
final Phaser parent = this.parent;
|
||||||
int phase;
|
int phase;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
long s = state;
|
long s = (parent == null) ? state : reconcileState();
|
||||||
int counts = (int)s;
|
int counts = (int)s;
|
||||||
int parties = counts >>> PARTIES_SHIFT;
|
int parties = counts >>> PARTIES_SHIFT;
|
||||||
int unarrived = counts & UNARRIVED_MASK;
|
int unarrived = counts & UNARRIVED_MASK;
|
||||||
if (registrations > MAX_PARTIES - parties)
|
if (registrations > MAX_PARTIES - parties)
|
||||||
throw new IllegalStateException(badRegister(s));
|
throw new IllegalStateException(badRegister(s));
|
||||||
else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
|
phase = (int)(s >>> PHASE_SHIFT);
|
||||||
|
if (phase < 0)
|
||||||
break;
|
break;
|
||||||
else if (counts != EMPTY) { // not 1st registration
|
if (counts != EMPTY) { // not 1st registration
|
||||||
if (parent == null || reconcileState() == s) {
|
if (parent == null || reconcileState() == s) {
|
||||||
if (unarrived == 0) // wait out advance
|
if (unarrived == 0) // wait out advance
|
||||||
root.internalAwaitAdvance(phase, null);
|
root.internalAwaitAdvance(phase, null);
|
||||||
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
|
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
|
||||||
s, s + adj))
|
s, s + adjust))
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (parent == null) { // 1st root registration
|
else if (parent == null) { // 1st root registration
|
||||||
long next = ((long)phase << PHASE_SHIFT) | adj;
|
long next = ((long)phase << PHASE_SHIFT) | adjust;
|
||||||
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
|
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
synchronized (this) { // 1st sub registration
|
synchronized (this) { // 1st sub registration
|
||||||
if (state == s) { // recheck under lock
|
if (state == s) { // recheck under lock
|
||||||
parent.doRegister(1);
|
phase = parent.doRegister(1);
|
||||||
do { // force current phase
|
if (phase < 0)
|
||||||
|
break;
|
||||||
|
// finish registration whenever parent registration
|
||||||
|
// succeeded, even when racing with termination,
|
||||||
|
// since these are part of the same "transaction".
|
||||||
|
while (!UNSAFE.compareAndSwapLong
|
||||||
|
(this, stateOffset, s,
|
||||||
|
((long)phase << PHASE_SHIFT) | adjust)) {
|
||||||
|
s = state;
|
||||||
phase = (int)(root.state >>> PHASE_SHIFT);
|
phase = (int)(root.state >>> PHASE_SHIFT);
|
||||||
// assert phase < 0 || (int)state == EMPTY;
|
// assert (int)s == EMPTY;
|
||||||
} while (!UNSAFE.compareAndSwapLong
|
}
|
||||||
(this, stateOffset, state,
|
|
||||||
((long)phase << PHASE_SHIFT) | adj));
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -467,10 +483,6 @@ public class Phaser {
|
|||||||
* subphasers have not yet done so, in which case they must finish
|
* subphasers have not yet done so, in which case they must finish
|
||||||
* their own advance by setting unarrived to parties (or if
|
* their own advance by setting unarrived to parties (or if
|
||||||
* parties is zero, resetting to unregistered EMPTY state).
|
* parties is zero, resetting to unregistered EMPTY state).
|
||||||
* However, this method may also be called when "floating"
|
|
||||||
* subphasers with possibly some unarrived parties are merely
|
|
||||||
* catching up to current phase, in which case counts are
|
|
||||||
* unaffected.
|
|
||||||
*
|
*
|
||||||
* @return reconciled state
|
* @return reconciled state
|
||||||
*/
|
*/
|
||||||
@ -478,16 +490,16 @@ public class Phaser {
|
|||||||
final Phaser root = this.root;
|
final Phaser root = this.root;
|
||||||
long s = state;
|
long s = state;
|
||||||
if (root != this) {
|
if (root != this) {
|
||||||
int phase, u, p;
|
int phase, p;
|
||||||
// CAS root phase with current parties; possibly trip unarrived
|
// CAS to root phase with current parties, tripping unarrived
|
||||||
while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
|
while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
|
||||||
(int)(s >>> PHASE_SHIFT) &&
|
(int)(s >>> PHASE_SHIFT) &&
|
||||||
!UNSAFE.compareAndSwapLong
|
!UNSAFE.compareAndSwapLong
|
||||||
(this, stateOffset, s,
|
(this, stateOffset, s,
|
||||||
s = (((long)phase << PHASE_SHIFT) |
|
s = (((long)phase << PHASE_SHIFT) |
|
||||||
(s & PARTIES_MASK) |
|
((phase < 0) ? (s & COUNTS_MASK) :
|
||||||
((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
|
(((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
|
||||||
(u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
|
((s & PARTIES_MASK) | p))))))
|
||||||
s = state;
|
s = state;
|
||||||
}
|
}
|
||||||
return s;
|
return s;
|
||||||
@ -619,7 +631,7 @@ public class Phaser {
|
|||||||
* of unarrived parties would become negative
|
* of unarrived parties would become negative
|
||||||
*/
|
*/
|
||||||
public int arrive() {
|
public int arrive() {
|
||||||
return doArrive(false);
|
return doArrive(ONE_ARRIVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -639,7 +651,7 @@ public class Phaser {
|
|||||||
* of registered or unarrived parties would become negative
|
* of registered or unarrived parties would become negative
|
||||||
*/
|
*/
|
||||||
public int arriveAndDeregister() {
|
public int arriveAndDeregister() {
|
||||||
return doArrive(true);
|
return doArrive(ONE_DEREGISTER);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -666,17 +678,15 @@ public class Phaser {
|
|||||||
for (;;) {
|
for (;;) {
|
||||||
long s = (root == this) ? state : reconcileState();
|
long s = (root == this) ? state : reconcileState();
|
||||||
int phase = (int)(s >>> PHASE_SHIFT);
|
int phase = (int)(s >>> PHASE_SHIFT);
|
||||||
int counts = (int)s;
|
|
||||||
int unarrived = (counts & UNARRIVED_MASK) - 1;
|
|
||||||
if (phase < 0)
|
if (phase < 0)
|
||||||
return phase;
|
return phase;
|
||||||
else if (counts == EMPTY || unarrived < 0) {
|
int counts = (int)s;
|
||||||
if (reconcileState() == s)
|
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
|
||||||
|
if (unarrived <= 0)
|
||||||
throw new IllegalStateException(badArrive(s));
|
throw new IllegalStateException(badArrive(s));
|
||||||
}
|
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
|
||||||
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
|
|
||||||
s -= ONE_ARRIVAL)) {
|
s -= ONE_ARRIVAL)) {
|
||||||
if (unarrived != 0)
|
if (unarrived > 1)
|
||||||
return root.internalAwaitAdvance(phase, null);
|
return root.internalAwaitAdvance(phase, null);
|
||||||
if (root != this)
|
if (root != this)
|
||||||
return parent.arriveAndAwaitAdvance();
|
return parent.arriveAndAwaitAdvance();
|
||||||
@ -809,8 +819,8 @@ public class Phaser {
|
|||||||
if (UNSAFE.compareAndSwapLong(root, stateOffset,
|
if (UNSAFE.compareAndSwapLong(root, stateOffset,
|
||||||
s, s | TERMINATION_BIT)) {
|
s, s | TERMINATION_BIT)) {
|
||||||
// signal all threads
|
// signal all threads
|
||||||
releaseWaiters(0);
|
releaseWaiters(0); // Waiters on evenQ
|
||||||
releaseWaiters(1);
|
releaseWaiters(1); // Waiters on oddQ
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1016,7 +1026,7 @@ public class Phaser {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Possibly blocks and waits for phase to advance unless aborted.
|
* Possibly blocks and waits for phase to advance unless aborted.
|
||||||
* Call only from root node.
|
* Call only on root phaser.
|
||||||
*
|
*
|
||||||
* @param phase current phase
|
* @param phase current phase
|
||||||
* @param node if non-null, the wait node to track interrupt and timeout;
|
* @param node if non-null, the wait node to track interrupt and timeout;
|
||||||
@ -1024,6 +1034,7 @@ public class Phaser {
|
|||||||
* @return current phase
|
* @return current phase
|
||||||
*/
|
*/
|
||||||
private int internalAwaitAdvance(int phase, QNode node) {
|
private int internalAwaitAdvance(int phase, QNode node) {
|
||||||
|
// assert root == this;
|
||||||
releaseWaiters(phase-1); // ensure old queue clean
|
releaseWaiters(phase-1); // ensure old queue clean
|
||||||
boolean queued = false; // true when node is enqueued
|
boolean queued = false; // true when node is enqueued
|
||||||
int lastUnarrived = 0; // to increase spins upon change
|
int lastUnarrived = 0; // to increase spins upon change
|
||||||
@ -1082,7 +1093,7 @@ public class Phaser {
|
|||||||
final boolean timed;
|
final boolean timed;
|
||||||
boolean wasInterrupted;
|
boolean wasInterrupted;
|
||||||
long nanos;
|
long nanos;
|
||||||
long lastTime;
|
final long deadline;
|
||||||
volatile Thread thread; // nulled to cancel wait
|
volatile Thread thread; // nulled to cancel wait
|
||||||
QNode next;
|
QNode next;
|
||||||
|
|
||||||
@ -1093,7 +1104,7 @@ public class Phaser {
|
|||||||
this.interruptible = interruptible;
|
this.interruptible = interruptible;
|
||||||
this.nanos = nanos;
|
this.nanos = nanos;
|
||||||
this.timed = timed;
|
this.timed = timed;
|
||||||
this.lastTime = timed ? System.nanoTime() : 0L;
|
this.deadline = timed ? System.nanoTime() + nanos : 0L;
|
||||||
thread = Thread.currentThread();
|
thread = Thread.currentThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1112,9 +1123,7 @@ public class Phaser {
|
|||||||
}
|
}
|
||||||
if (timed) {
|
if (timed) {
|
||||||
if (nanos > 0L) {
|
if (nanos > 0L) {
|
||||||
long now = System.nanoTime();
|
nanos = deadline - System.nanoTime();
|
||||||
nanos -= now - lastTime;
|
|
||||||
lastTime = now;
|
|
||||||
}
|
}
|
||||||
if (nanos <= 0L) {
|
if (nanos <= 0L) {
|
||||||
thread = null;
|
thread = null;
|
||||||
@ -1129,7 +1138,7 @@ public class Phaser {
|
|||||||
return true;
|
return true;
|
||||||
else if (!timed)
|
else if (!timed)
|
||||||
LockSupport.park(this);
|
LockSupport.park(this);
|
||||||
else if (nanos > 0)
|
else if (nanos > 0L)
|
||||||
LockSupport.parkNanos(this, nanos);
|
LockSupport.parkNanos(this, nanos);
|
||||||
return isReleasable();
|
return isReleasable();
|
||||||
}
|
}
|
||||||
|
@ -36,10 +36,10 @@
|
|||||||
package java.util.concurrent;
|
package java.util.concurrent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A <tt>TimeUnit</tt> represents time durations at a given unit of
|
* A {@code TimeUnit} represents time durations at a given unit of
|
||||||
* granularity and provides utility methods to convert across units,
|
* granularity and provides utility methods to convert across units,
|
||||||
* and to perform timing and delay operations in these units. A
|
* and to perform timing and delay operations in these units. A
|
||||||
* <tt>TimeUnit</tt> does not maintain time information, but only
|
* {@code TimeUnit} does not maintain time information, but only
|
||||||
* helps organize and use time representations that may be maintained
|
* helps organize and use time representations that may be maintained
|
||||||
* separately across various contexts. A nanosecond is defined as one
|
* separately across various contexts. A nanosecond is defined as one
|
||||||
* thousandth of a microsecond, a microsecond as one thousandth of a
|
* thousandth of a microsecond, a microsecond as one thousandth of a
|
||||||
@ -47,7 +47,7 @@ package java.util.concurrent;
|
|||||||
* as sixty seconds, an hour as sixty minutes, and a day as twenty four
|
* as sixty seconds, an hour as sixty minutes, and a day as twenty four
|
||||||
* hours.
|
* hours.
|
||||||
*
|
*
|
||||||
* <p>A <tt>TimeUnit</tt> is mainly used to inform time-based methods
|
* <p>A {@code TimeUnit} is mainly used to inform time-based methods
|
||||||
* how a given timing parameter should be interpreted. For example,
|
* how a given timing parameter should be interpreted. For example,
|
||||||
* the following code will timeout in 50 milliseconds if the {@link
|
* the following code will timeout in 50 milliseconds if the {@link
|
||||||
* java.util.concurrent.locks.Lock lock} is not available:
|
* java.util.concurrent.locks.Lock lock} is not available:
|
||||||
@ -63,7 +63,7 @@ package java.util.concurrent;
|
|||||||
*
|
*
|
||||||
* Note however, that there is no guarantee that a particular timeout
|
* Note however, that there is no guarantee that a particular timeout
|
||||||
* implementation will be able to notice the passage of time at the
|
* implementation will be able to notice the passage of time at the
|
||||||
* same granularity as the given <tt>TimeUnit</tt>.
|
* same granularity as the given {@code TimeUnit}.
|
||||||
*
|
*
|
||||||
* @since 1.5
|
* @since 1.5
|
||||||
* @author Doug Lea
|
* @author Doug Lea
|
||||||
@ -174,83 +174,82 @@ public enum TimeUnit {
|
|||||||
// etc. are not declared abstract but otherwise act as abstract methods.
|
// etc. are not declared abstract but otherwise act as abstract methods.
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert the given time duration in the given unit to this
|
* Converts the given time duration in the given unit to this unit.
|
||||||
* unit. Conversions from finer to coarser granularities
|
* Conversions from finer to coarser granularities truncate, so
|
||||||
* truncate, so lose precision. For example converting
|
* lose precision. For example, converting {@code 999} milliseconds
|
||||||
* <tt>999</tt> milliseconds to seconds results in
|
* to seconds results in {@code 0}. Conversions from coarser to
|
||||||
* <tt>0</tt>. Conversions from coarser to finer granularities
|
* finer granularities with arguments that would numerically
|
||||||
* with arguments that would numerically overflow saturate to
|
* overflow saturate to {@code Long.MIN_VALUE} if negative or
|
||||||
* <tt>Long.MIN_VALUE</tt> if negative or <tt>Long.MAX_VALUE</tt>
|
* {@code Long.MAX_VALUE} if positive.
|
||||||
* if positive.
|
|
||||||
*
|
*
|
||||||
* <p>For example, to convert 10 minutes to milliseconds, use:
|
* <p>For example, to convert 10 minutes to milliseconds, use:
|
||||||
* <tt>TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES)</tt>
|
* {@code TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES)}
|
||||||
*
|
*
|
||||||
* @param sourceDuration the time duration in the given <tt>sourceUnit</tt>
|
* @param sourceDuration the time duration in the given {@code sourceUnit}
|
||||||
* @param sourceUnit the unit of the <tt>sourceDuration</tt> argument
|
* @param sourceUnit the unit of the {@code sourceDuration} argument
|
||||||
* @return the converted duration in this unit,
|
* @return the converted duration in this unit,
|
||||||
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively
|
* or {@code Long.MIN_VALUE} if conversion would negatively
|
||||||
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
|
* overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
|
||||||
*/
|
*/
|
||||||
public long convert(long sourceDuration, TimeUnit sourceUnit) {
|
public long convert(long sourceDuration, TimeUnit sourceUnit) {
|
||||||
throw new AbstractMethodError();
|
throw new AbstractMethodError();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Equivalent to <tt>NANOSECONDS.convert(duration, this)</tt>.
|
* Equivalent to
|
||||||
|
* {@link #convert(long, TimeUnit) NANOSECONDS.convert(duration, this)}.
|
||||||
* @param duration the duration
|
* @param duration the duration
|
||||||
* @return the converted duration,
|
* @return the converted duration,
|
||||||
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively
|
* or {@code Long.MIN_VALUE} if conversion would negatively
|
||||||
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
|
* overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
|
||||||
* @see #convert
|
|
||||||
*/
|
*/
|
||||||
public long toNanos(long duration) {
|
public long toNanos(long duration) {
|
||||||
throw new AbstractMethodError();
|
throw new AbstractMethodError();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Equivalent to <tt>MICROSECONDS.convert(duration, this)</tt>.
|
* Equivalent to
|
||||||
|
* {@link #convert(long, TimeUnit) MICROSECONDS.convert(duration, this)}.
|
||||||
* @param duration the duration
|
* @param duration the duration
|
||||||
* @return the converted duration,
|
* @return the converted duration,
|
||||||
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively
|
* or {@code Long.MIN_VALUE} if conversion would negatively
|
||||||
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
|
* overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
|
||||||
* @see #convert
|
|
||||||
*/
|
*/
|
||||||
public long toMicros(long duration) {
|
public long toMicros(long duration) {
|
||||||
throw new AbstractMethodError();
|
throw new AbstractMethodError();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Equivalent to <tt>MILLISECONDS.convert(duration, this)</tt>.
|
* Equivalent to
|
||||||
|
* {@link #convert(long, TimeUnit) MILLISECONDS.convert(duration, this)}.
|
||||||
* @param duration the duration
|
* @param duration the duration
|
||||||
* @return the converted duration,
|
* @return the converted duration,
|
||||||
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively
|
* or {@code Long.MIN_VALUE} if conversion would negatively
|
||||||
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
|
* overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
|
||||||
* @see #convert
|
|
||||||
*/
|
*/
|
||||||
public long toMillis(long duration) {
|
public long toMillis(long duration) {
|
||||||
throw new AbstractMethodError();
|
throw new AbstractMethodError();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Equivalent to <tt>SECONDS.convert(duration, this)</tt>.
|
* Equivalent to
|
||||||
|
* {@link #convert(long, TimeUnit) SECONDS.convert(duration, this)}.
|
||||||
* @param duration the duration
|
* @param duration the duration
|
||||||
* @return the converted duration,
|
* @return the converted duration,
|
||||||
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively
|
* or {@code Long.MIN_VALUE} if conversion would negatively
|
||||||
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
|
* overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
|
||||||
* @see #convert
|
|
||||||
*/
|
*/
|
||||||
public long toSeconds(long duration) {
|
public long toSeconds(long duration) {
|
||||||
throw new AbstractMethodError();
|
throw new AbstractMethodError();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Equivalent to <tt>MINUTES.convert(duration, this)</tt>.
|
* Equivalent to
|
||||||
|
* {@link #convert(long, TimeUnit) MINUTES.convert(duration, this)}.
|
||||||
* @param duration the duration
|
* @param duration the duration
|
||||||
* @return the converted duration,
|
* @return the converted duration,
|
||||||
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively
|
* or {@code Long.MIN_VALUE} if conversion would negatively
|
||||||
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
|
* overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
|
||||||
* @see #convert
|
|
||||||
* @since 1.6
|
* @since 1.6
|
||||||
*/
|
*/
|
||||||
public long toMinutes(long duration) {
|
public long toMinutes(long duration) {
|
||||||
@ -258,12 +257,12 @@ public enum TimeUnit {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Equivalent to <tt>HOURS.convert(duration, this)</tt>.
|
* Equivalent to
|
||||||
|
* {@link #convert(long, TimeUnit) HOURS.convert(duration, this)}.
|
||||||
* @param duration the duration
|
* @param duration the duration
|
||||||
* @return the converted duration,
|
* @return the converted duration,
|
||||||
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively
|
* or {@code Long.MIN_VALUE} if conversion would negatively
|
||||||
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow.
|
* overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
|
||||||
* @see #convert
|
|
||||||
* @since 1.6
|
* @since 1.6
|
||||||
*/
|
*/
|
||||||
public long toHours(long duration) {
|
public long toHours(long duration) {
|
||||||
@ -271,10 +270,10 @@ public enum TimeUnit {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Equivalent to <tt>DAYS.convert(duration, this)</tt>.
|
* Equivalent to
|
||||||
|
* {@link #convert(long, TimeUnit) DAYS.convert(duration, this)}.
|
||||||
* @param duration the duration
|
* @param duration the duration
|
||||||
* @return the converted duration
|
* @return the converted duration
|
||||||
* @see #convert
|
|
||||||
* @since 1.6
|
* @since 1.6
|
||||||
*/
|
*/
|
||||||
public long toDays(long duration) {
|
public long toDays(long duration) {
|
||||||
@ -294,9 +293,9 @@ public enum TimeUnit {
|
|||||||
* Performs a timed {@link Object#wait(long, int) Object.wait}
|
* Performs a timed {@link Object#wait(long, int) Object.wait}
|
||||||
* using this time unit.
|
* using this time unit.
|
||||||
* This is a convenience method that converts timeout arguments
|
* This is a convenience method that converts timeout arguments
|
||||||
* into the form required by the <tt>Object.wait</tt> method.
|
* into the form required by the {@code Object.wait} method.
|
||||||
*
|
*
|
||||||
* <p>For example, you could implement a blocking <tt>poll</tt>
|
* <p>For example, you could implement a blocking {@code poll}
|
||||||
* method (see {@link BlockingQueue#poll BlockingQueue.poll})
|
* method (see {@link BlockingQueue#poll BlockingQueue.poll})
|
||||||
* using:
|
* using:
|
||||||
*
|
*
|
||||||
@ -327,7 +326,7 @@ public enum TimeUnit {
|
|||||||
* Performs a timed {@link Thread#join(long, int) Thread.join}
|
* Performs a timed {@link Thread#join(long, int) Thread.join}
|
||||||
* using this time unit.
|
* using this time unit.
|
||||||
* This is a convenience method that converts time arguments into the
|
* This is a convenience method that converts time arguments into the
|
||||||
* form required by the <tt>Thread.join</tt> method.
|
* form required by the {@code Thread.join} method.
|
||||||
*
|
*
|
||||||
* @param thread the thread to wait for
|
* @param thread the thread to wait for
|
||||||
* @param timeout the maximum time to wait. If less than
|
* @param timeout the maximum time to wait. If less than
|
||||||
@ -347,7 +346,7 @@ public enum TimeUnit {
|
|||||||
* Performs a {@link Thread#sleep(long, int) Thread.sleep} using
|
* Performs a {@link Thread#sleep(long, int) Thread.sleep} using
|
||||||
* this time unit.
|
* this time unit.
|
||||||
* This is a convenience method that converts time arguments into the
|
* This is a convenience method that converts time arguments into the
|
||||||
* form required by the <tt>Thread.sleep</tt> method.
|
* form required by the {@code Thread.sleep} method.
|
||||||
*
|
*
|
||||||
* @param timeout the minimum time to sleep. If less than
|
* @param timeout the minimum time to sleep. If less than
|
||||||
* or equal to zero, do not sleep at all.
|
* or equal to zero, do not sleep at all.
|
||||||
|
@ -40,7 +40,7 @@ package java.util.concurrent;
|
|||||||
* operations for which a timeout is specified need a means to
|
* operations for which a timeout is specified need a means to
|
||||||
* indicate that the timeout has occurred. For many such operations it
|
* indicate that the timeout has occurred. For many such operations it
|
||||||
* is possible to return a value that indicates timeout; when that is
|
* is possible to return a value that indicates timeout; when that is
|
||||||
* not possible or desirable then <tt>TimeoutException</tt> should be
|
* not possible or desirable then {@code TimeoutException} should be
|
||||||
* declared and thrown.
|
* declared and thrown.
|
||||||
*
|
*
|
||||||
* @since 1.5
|
* @since 1.5
|
||||||
@ -50,13 +50,13 @@ public class TimeoutException extends Exception {
|
|||||||
private static final long serialVersionUID = 1900926677490660714L;
|
private static final long serialVersionUID = 1900926677490660714L;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a <tt>TimeoutException</tt> with no specified detail
|
* Constructs a {@code TimeoutException} with no specified detail
|
||||||
* message.
|
* message.
|
||||||
*/
|
*/
|
||||||
public TimeoutException() {}
|
public TimeoutException() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a <tt>TimeoutException</tt> with the specified detail
|
* Constructs a {@code TimeoutException} with the specified detail
|
||||||
* message.
|
* message.
|
||||||
*
|
*
|
||||||
* @param message the detail message
|
* @param message the detail message
|
||||||
|
@ -48,7 +48,7 @@
|
|||||||
*
|
*
|
||||||
* {@link java.util.concurrent.Executor} is a simple standardized
|
* {@link java.util.concurrent.Executor} is a simple standardized
|
||||||
* interface for defining custom thread-like subsystems, including
|
* interface for defining custom thread-like subsystems, including
|
||||||
* thread pools, asynchronous IO, and lightweight task frameworks.
|
* thread pools, asynchronous I/O, and lightweight task frameworks.
|
||||||
* Depending on which concrete Executor class is being used, tasks may
|
* Depending on which concrete Executor class is being used, tasks may
|
||||||
* execute in a newly created thread, an existing task-execution thread,
|
* execute in a newly created thread, an existing task-execution thread,
|
||||||
* or the thread calling {@link java.util.concurrent.Executor#execute
|
* or the thread calling {@link java.util.concurrent.Executor#execute
|
||||||
@ -102,8 +102,10 @@
|
|||||||
* <h2>Queues</h2>
|
* <h2>Queues</h2>
|
||||||
*
|
*
|
||||||
* The {@link java.util.concurrent.ConcurrentLinkedQueue} class
|
* The {@link java.util.concurrent.ConcurrentLinkedQueue} class
|
||||||
* supplies an efficient scalable thread-safe non-blocking FIFO
|
* supplies an efficient scalable thread-safe non-blocking FIFO queue.
|
||||||
* queue.
|
* The {@link java.util.concurrent.ConcurrentLinkedDeque} class is
|
||||||
|
* similar, but additionally supports the {@link java.util.Deque}
|
||||||
|
* interface.
|
||||||
*
|
*
|
||||||
* <p>Five implementations in {@code java.util.concurrent} support
|
* <p>Five implementations in {@code java.util.concurrent} support
|
||||||
* the extended {@link java.util.concurrent.BlockingQueue}
|
* the extended {@link java.util.concurrent.BlockingQueue}
|
||||||
@ -117,7 +119,7 @@
|
|||||||
* for producer-consumer, messaging, parallel tasking, and
|
* for producer-consumer, messaging, parallel tasking, and
|
||||||
* related concurrent designs.
|
* related concurrent designs.
|
||||||
*
|
*
|
||||||
* <p> Extended interface {@link java.util.concurrent.TransferQueue},
|
* <p>Extended interface {@link java.util.concurrent.TransferQueue},
|
||||||
* and implementation {@link java.util.concurrent.LinkedTransferQueue}
|
* and implementation {@link java.util.concurrent.LinkedTransferQueue}
|
||||||
* introduce a synchronous {@code transfer} method (along with related
|
* introduce a synchronous {@code transfer} method (along with related
|
||||||
* features) in which a producer may optionally block awaiting its
|
* features) in which a producer may optionally block awaiting its
|
||||||
@ -216,9 +218,9 @@
|
|||||||
* it may (or may not) reflect any updates since the iterator was
|
* it may (or may not) reflect any updates since the iterator was
|
||||||
* created.
|
* created.
|
||||||
*
|
*
|
||||||
* <h2><a name="MemoryVisibility">Memory Consistency Properties</a></h2>
|
* <h2 id="MemoryVisibility">Memory Consistency Properties</h2>
|
||||||
*
|
*
|
||||||
* <a href="http://docs.oracle.com/javase/specs/jls/se7/html/index.html">
|
* <a href="http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.4.5">
|
||||||
* Chapter 17 of the Java Language Specification</a> defines the
|
* Chapter 17 of the Java Language Specification</a> defines the
|
||||||
* <i>happens-before</i> relation on memory operations such as reads and
|
* <i>happens-before</i> relation on memory operations such as reads and
|
||||||
* writes of shared variables. The results of a write by one thread are
|
* writes of shared variables. The results of a write by one thread are
|
||||||
|
Loading…
Reference in New Issue
Block a user