Merge
This commit is contained in:
commit
b20e230f81
@ -286,11 +286,18 @@ JAVA_JAVA_java = \
|
||||
java/util/concurrent/ExecutorService.java \
|
||||
java/util/concurrent/ExecutorCompletionService.java \
|
||||
java/util/concurrent/Executors.java \
|
||||
java/util/concurrent/ForkJoinPool.java \
|
||||
java/util/concurrent/ForkJoinTask.java \
|
||||
java/util/concurrent/ForkJoinWorkerThread.java \
|
||||
java/util/concurrent/Future.java \
|
||||
java/util/concurrent/FutureTask.java \
|
||||
java/util/concurrent/LinkedBlockingDeque.java \
|
||||
java/util/concurrent/LinkedBlockingQueue.java \
|
||||
java/util/concurrent/LinkedTransferQueue.java \
|
||||
java/util/concurrent/Phaser.java \
|
||||
java/util/concurrent/PriorityBlockingQueue.java \
|
||||
java/util/concurrent/RecursiveAction.java \
|
||||
java/util/concurrent/RecursiveTask.java \
|
||||
java/util/concurrent/RejectedExecutionException.java \
|
||||
java/util/concurrent/RejectedExecutionHandler.java \
|
||||
java/util/concurrent/RunnableFuture.java \
|
||||
@ -301,9 +308,11 @@ JAVA_JAVA_java = \
|
||||
java/util/concurrent/Semaphore.java \
|
||||
java/util/concurrent/SynchronousQueue.java \
|
||||
java/util/concurrent/ThreadFactory.java \
|
||||
java/util/concurrent/ThreadLocalRandom.java \
|
||||
java/util/concurrent/ThreadPoolExecutor.java \
|
||||
java/util/concurrent/TimeUnit.java \
|
||||
java/util/concurrent/TimeoutException.java \
|
||||
java/util/concurrent/TransferQueue.java \
|
||||
java/util/concurrent/atomic/AtomicBoolean.java \
|
||||
java/util/concurrent/atomic/AtomicInteger.java \
|
||||
java/util/concurrent/atomic/AtomicIntegerArray.java \
|
||||
|
@ -256,9 +256,8 @@ public abstract class AbstractList<E> extends AbstractCollection<E> implements L
|
||||
public boolean addAll(int index, Collection<? extends E> c) {
|
||||
rangeCheckForAdd(index);
|
||||
boolean modified = false;
|
||||
Iterator<? extends E> e = c.iterator();
|
||||
while (e.hasNext()) {
|
||||
add(index++, e.next());
|
||||
for (E e : c) {
|
||||
add(index++, e);
|
||||
modified = true;
|
||||
}
|
||||
return modified;
|
||||
|
@ -183,11 +183,9 @@ public abstract class AbstractQueue<E>
|
||||
if (c == this)
|
||||
throw new IllegalArgumentException();
|
||||
boolean modified = false;
|
||||
Iterator<? extends E> e = c.iterator();
|
||||
while (e.hasNext()) {
|
||||
if (add(e.next()))
|
||||
for (E e : c)
|
||||
if (add(e))
|
||||
modified = true;
|
||||
}
|
||||
return modified;
|
||||
}
|
||||
|
||||
|
@ -448,11 +448,9 @@ public class HashMap<K,V>
|
||||
}
|
||||
|
||||
private void putAllForCreate(Map<? extends K, ? extends V> m) {
|
||||
for (Iterator<? extends Map.Entry<? extends K, ? extends V>> i = m.entrySet().iterator(); i.hasNext(); ) {
|
||||
Map.Entry<? extends K, ? extends V> e = i.next();
|
||||
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
|
||||
putForCreate(e.getKey(), e.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Rehashes the contents of this map into a new array with a
|
||||
@ -536,11 +534,9 @@ public class HashMap<K,V>
|
||||
resize(newCapacity);
|
||||
}
|
||||
|
||||
for (Iterator<? extends Map.Entry<? extends K, ? extends V>> i = m.entrySet().iterator(); i.hasNext(); ) {
|
||||
Map.Entry<? extends K, ? extends V> e = i.next();
|
||||
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
|
||||
put(e.getKey(), e.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the mapping for the specified key from this map if present.
|
||||
|
@ -280,8 +280,8 @@ public class HashSet<E>
|
||||
s.writeInt(map.size());
|
||||
|
||||
// Write out all elements in the proper order.
|
||||
for (Iterator i=map.keySet().iterator(); i.hasNext(); )
|
||||
s.writeObject(i.next());
|
||||
for (E e : map.keySet())
|
||||
s.writeObject(e);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -50,6 +50,18 @@ import sun.misc.Unsafe;
|
||||
* <p>
|
||||
* Many applications will find the method {@link Math#random} simpler to use.
|
||||
*
|
||||
* <p>Instances of {@code java.util.Random} are threadsafe.
|
||||
* However, the concurrent use of the same {@code java.util.Random}
|
||||
* instance across threads may encounter contention and consequent
|
||||
* poor performance. Consider instead using
|
||||
* {@link java.util.concurrent.ThreadLocalRandom} in multithreaded
|
||||
* designs.
|
||||
*
|
||||
* <p>Instances of {@code java.util.Random} are not cryptographically
|
||||
* secure. Consider instead using {@link java.security.SecureRandom} to
|
||||
* get a cryptographically secure pseudo-random number generator for use
|
||||
* by security-sensitive applications.
|
||||
*
|
||||
* @author Frank Yellin
|
||||
* @since 1.0
|
||||
*/
|
||||
|
@ -218,8 +218,8 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E>
|
||||
if (capacity < c.size())
|
||||
throw new IllegalArgumentException();
|
||||
|
||||
for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
|
||||
add(it.next());
|
||||
for (E e : c)
|
||||
add(e);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -250,8 +250,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
|
||||
* of its elements are null
|
||||
*/
|
||||
public ConcurrentLinkedQueue(Collection<? extends E> c) {
|
||||
for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
|
||||
add(it.next());
|
||||
for (E e : c)
|
||||
add(e);
|
||||
}
|
||||
|
||||
// Have to override just to update the javadoc
|
||||
|
@ -895,7 +895,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
|
||||
if (n != null) {
|
||||
Node<K,V> f = n.next;
|
||||
if (n != b.next) // inconsistent read
|
||||
break;;
|
||||
break;
|
||||
Object v = n.value;
|
||||
if (v == null) { // n is deleted
|
||||
n.helpDelete(b, f);
|
||||
|
@ -148,7 +148,8 @@ import java.util.concurrent.atomic.*;
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* <p>Memory consistency effects: Actions in a thread prior to calling
|
||||
* <p>Memory consistency effects: Until the count reaches
|
||||
* zero, actions in a thread prior to calling
|
||||
* {@code countDown()}
|
||||
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
|
||||
* actions following a successful return from a corresponding
|
||||
|
@ -332,8 +332,8 @@ public interface ExecutorService extends Executor {
|
||||
* @param tasks the collection of tasks
|
||||
* @return the result returned by one of the tasks
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
* @throws NullPointerException if tasks or any of its elements
|
||||
* are <tt>null</tt>
|
||||
* @throws NullPointerException if tasks or any element task
|
||||
* subject to execution is <tt>null</tt>
|
||||
* @throws IllegalArgumentException if tasks is empty
|
||||
* @throws ExecutionException if no task successfully completes
|
||||
* @throws RejectedExecutionException if tasks cannot be scheduled
|
||||
@ -356,8 +356,8 @@ public interface ExecutorService extends Executor {
|
||||
* @param unit the time unit of the timeout argument
|
||||
* @return the result returned by one of the tasks.
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
* @throws NullPointerException if tasks, any of its elements, or
|
||||
* unit are <tt>null</tt>
|
||||
* @throws NullPointerException if tasks, or unit, or any element
|
||||
* task subject to execution is <tt>null</tt>
|
||||
* @throws TimeoutException if the given timeout elapses before
|
||||
* any task successfully completes
|
||||
* @throws ExecutionException if no task successfully completes
|
||||
|
1988
jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java
Normal file
1988
jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java
Normal file
File diff suppressed because it is too large
Load Diff
1292
jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java
Normal file
1292
jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java
Normal file
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,827 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Sun designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Sun in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
package java.util.concurrent;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* A thread managed by a {@link ForkJoinPool}. This class is
|
||||
* subclassable solely for the sake of adding functionality -- there
|
||||
* are no overridable methods dealing with scheduling or execution.
|
||||
* However, you can override initialization and termination methods
|
||||
* surrounding the main task processing loop. If you do create such a
|
||||
* subclass, you will also need to supply a custom {@link
|
||||
* ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
|
||||
* ForkJoinPool}.
|
||||
*
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public class ForkJoinWorkerThread extends Thread {
|
||||
/*
|
||||
* Algorithm overview:
|
||||
*
|
||||
* 1. Work-Stealing: Work-stealing queues are special forms of
|
||||
* Deques that support only three of the four possible
|
||||
* end-operations -- push, pop, and deq (aka steal), and only do
|
||||
* so under the constraints that push and pop are called only from
|
||||
* the owning thread, while deq may be called from other threads.
|
||||
* (If you are unfamiliar with them, you probably want to read
|
||||
* Herlihy and Shavit's book "The Art of Multiprocessor
|
||||
* programming", chapter 16 describing these in more detail before
|
||||
* proceeding.) The main work-stealing queue design is roughly
|
||||
* similar to "Dynamic Circular Work-Stealing Deque" by David
|
||||
* Chase and Yossi Lev, SPAA 2005
|
||||
* (http://research.sun.com/scalable/pubs/index.html). The main
|
||||
* difference ultimately stems from gc requirements that we null
|
||||
* out taken slots as soon as we can, to maintain as small a
|
||||
* footprint as possible even in programs generating huge numbers
|
||||
* of tasks. To accomplish this, we shift the CAS arbitrating pop
|
||||
* vs deq (steal) from being on the indices ("base" and "sp") to
|
||||
* the slots themselves (mainly via method "casSlotNull()"). So,
|
||||
* both a successful pop and deq mainly entail CAS'ing a non-null
|
||||
* slot to null. Because we rely on CASes of references, we do
|
||||
* not need tag bits on base or sp. They are simple ints as used
|
||||
* in any circular array-based queue (see for example ArrayDeque).
|
||||
* Updates to the indices must still be ordered in a way that
|
||||
* guarantees that (sp - base) > 0 means the queue is empty, but
|
||||
* otherwise may err on the side of possibly making the queue
|
||||
* appear nonempty when a push, pop, or deq have not fully
|
||||
* committed. Note that this means that the deq operation,
|
||||
* considered individually, is not wait-free. One thief cannot
|
||||
* successfully continue until another in-progress one (or, if
|
||||
* previously empty, a push) completes. However, in the
|
||||
* aggregate, we ensure at least probabilistic
|
||||
* non-blockingness. If an attempted steal fails, a thief always
|
||||
* chooses a different random victim target to try next. So, in
|
||||
* order for one thief to progress, it suffices for any
|
||||
* in-progress deq or new push on any empty queue to complete. One
|
||||
* reason this works well here is that apparently-nonempty often
|
||||
* means soon-to-be-stealable, which gives threads a chance to
|
||||
* activate if necessary before stealing (see below).
|
||||
*
|
||||
* This approach also enables support for "async mode" where local
|
||||
* task processing is in FIFO, not LIFO order; simply by using a
|
||||
* version of deq rather than pop when locallyFifo is true (as set
|
||||
* by the ForkJoinPool). This allows use in message-passing
|
||||
* frameworks in which tasks are never joined.
|
||||
*
|
||||
* Efficient implementation of this approach currently relies on
|
||||
* an uncomfortable amount of "Unsafe" mechanics. To maintain
|
||||
* correct orderings, reads and writes of variable base require
|
||||
* volatile ordering. Variable sp does not require volatile write
|
||||
* but needs cheaper store-ordering on writes. Because they are
|
||||
* protected by volatile base reads, reads of the queue array and
|
||||
* its slots do not need volatile load semantics, but writes (in
|
||||
* push) require store order and CASes (in pop and deq) require
|
||||
* (volatile) CAS semantics. (See "Idempotent work stealing" by
|
||||
* Michael, Saraswat, and Vechev, PPoPP 2009
|
||||
* http://portal.acm.org/citation.cfm?id=1504186 for an algorithm
|
||||
* with similar properties, but without support for nulling
|
||||
* slots.) Since these combinations aren't supported using
|
||||
* ordinary volatiles, the only way to accomplish these
|
||||
* efficiently is to use direct Unsafe calls. (Using external
|
||||
* AtomicIntegers and AtomicReferenceArrays for the indices and
|
||||
* array is significantly slower because of memory locality and
|
||||
* indirection effects.)
|
||||
*
|
||||
* Further, performance on most platforms is very sensitive to
|
||||
* placement and sizing of the (resizable) queue array. Even
|
||||
* though these queues don't usually become all that big, the
|
||||
* initial size must be large enough to counteract cache
|
||||
* contention effects across multiple queues (especially in the
|
||||
* presence of GC cardmarking). Also, to improve thread-locality,
|
||||
* queues are currently initialized immediately after the thread
|
||||
* gets the initial signal to start processing tasks. However,
|
||||
* all queue-related methods except pushTask are written in a way
|
||||
* that allows them to instead be lazily allocated and/or disposed
|
||||
* of when empty. All together, these low-level implementation
|
||||
* choices produce as much as a factor of 4 performance
|
||||
* improvement compared to naive implementations, and enable the
|
||||
* processing of billions of tasks per second, sometimes at the
|
||||
* expense of ugliness.
|
||||
*
|
||||
* 2. Run control: The primary run control is based on a global
|
||||
* counter (activeCount) held by the pool. It uses an algorithm
|
||||
* similar to that in Herlihy and Shavit section 17.6 to cause
|
||||
* threads to eventually block when all threads declare they are
|
||||
* inactive. For this to work, threads must be declared active
|
||||
* when executing tasks, and before stealing a task. They must be
|
||||
* inactive before blocking on the Pool Barrier (awaiting a new
|
||||
* submission or other Pool event). In between, there is some free
|
||||
* play which we take advantage of to avoid contention and rapid
|
||||
* flickering of the global activeCount: If inactive, we activate
|
||||
* only if a victim queue appears to be nonempty (see above).
|
||||
* Similarly, a thread tries to inactivate only after a full scan
|
||||
* of other threads. The net effect is that contention on
|
||||
* activeCount is rarely a measurable performance issue. (There
|
||||
* are also a few other cases where we scan for work rather than
|
||||
* retry/block upon contention.)
|
||||
*
|
||||
* 3. Selection control. We maintain policy of always choosing to
|
||||
* run local tasks rather than stealing, and always trying to
|
||||
* steal tasks before trying to run a new submission. All steals
|
||||
* are currently performed in randomly-chosen deq-order. It may be
|
||||
* worthwhile to bias these with locality / anti-locality
|
||||
* information, but doing this well probably requires more
|
||||
* lower-level information from JVMs than currently provided.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Capacity of work-stealing queue array upon initialization.
|
||||
* Must be a power of two. Initial size must be at least 2, but is
|
||||
* padded to minimize cache effects.
|
||||
*/
|
||||
private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
|
||||
|
||||
/**
|
||||
* Maximum work-stealing queue array size. Must be less than or
|
||||
* equal to 1 << 28 to ensure lack of index wraparound. (This
|
||||
* is less than usual bounds, because we need leftshift by 3
|
||||
* to be in int range).
|
||||
*/
|
||||
private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
|
||||
|
||||
/**
|
||||
* The pool this thread works in. Accessed directly by ForkJoinTask.
|
||||
*/
|
||||
final ForkJoinPool pool;
|
||||
|
||||
/**
|
||||
* The work-stealing queue array. Size must be a power of two.
|
||||
* Initialized when thread starts, to improve memory locality.
|
||||
*/
|
||||
private ForkJoinTask<?>[] queue;
|
||||
|
||||
/**
|
||||
* Index (mod queue.length) of next queue slot to push to or pop
|
||||
* from. It is written only by owner thread, via ordered store.
|
||||
* Both sp and base are allowed to wrap around on overflow, but
|
||||
* (sp - base) still estimates size.
|
||||
*/
|
||||
private volatile int sp;
|
||||
|
||||
/**
|
||||
* Index (mod queue.length) of least valid queue slot, which is
|
||||
* always the next position to steal from if nonempty.
|
||||
*/
|
||||
private volatile int base;
|
||||
|
||||
/**
|
||||
* Activity status. When true, this worker is considered active.
|
||||
* Must be false upon construction. It must be true when executing
|
||||
* tasks, and BEFORE stealing a task. It must be false before
|
||||
* calling pool.sync.
|
||||
*/
|
||||
private boolean active;
|
||||
|
||||
/**
|
||||
* Run state of this worker. Supports simple versions of the usual
|
||||
* shutdown/shutdownNow control.
|
||||
*/
|
||||
private volatile int runState;
|
||||
|
||||
/**
|
||||
* Seed for random number generator for choosing steal victims.
|
||||
* Uses Marsaglia xorshift. Must be nonzero upon initialization.
|
||||
*/
|
||||
private int seed;
|
||||
|
||||
/**
|
||||
* Number of steals, transferred to pool when idle
|
||||
*/
|
||||
private int stealCount;
|
||||
|
||||
/**
|
||||
* Index of this worker in pool array. Set once by pool before
|
||||
* running, and accessed directly by pool during cleanup etc.
|
||||
*/
|
||||
int poolIndex;
|
||||
|
||||
/**
|
||||
* The last barrier event waited for. Accessed in pool callback
|
||||
* methods, but only by current thread.
|
||||
*/
|
||||
long lastEventCount;
|
||||
|
||||
/**
|
||||
* True if use local fifo, not default lifo, for local polling
|
||||
*/
|
||||
private boolean locallyFifo;
|
||||
|
||||
/**
|
||||
* Creates a ForkJoinWorkerThread operating in the given pool.
|
||||
*
|
||||
* @param pool the pool this thread works in
|
||||
* @throws NullPointerException if pool is null
|
||||
*/
|
||||
protected ForkJoinWorkerThread(ForkJoinPool pool) {
|
||||
if (pool == null) throw new NullPointerException();
|
||||
this.pool = pool;
|
||||
// Note: poolIndex is set by pool during construction
|
||||
// Remaining initialization is deferred to onStart
|
||||
}
|
||||
|
||||
// Public access methods
|
||||
|
||||
/**
|
||||
* Returns the pool hosting this thread.
|
||||
*
|
||||
* @return the pool
|
||||
*/
|
||||
public ForkJoinPool getPool() {
|
||||
return pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the index number of this thread in its pool. The
|
||||
* returned value ranges from zero to the maximum number of
|
||||
* threads (minus one) that have ever been created in the pool.
|
||||
* This method may be useful for applications that track status or
|
||||
* collect results per-worker rather than per-task.
|
||||
*
|
||||
* @return the index number
|
||||
*/
|
||||
public int getPoolIndex() {
|
||||
return poolIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Establishes local first-in-first-out scheduling mode for forked
|
||||
* tasks that are never joined.
|
||||
*
|
||||
* @param async if true, use locally FIFO scheduling
|
||||
*/
|
||||
void setAsyncMode(boolean async) {
|
||||
locallyFifo = async;
|
||||
}
|
||||
|
||||
// Runstate management
|
||||
|
||||
// Runstate values. Order matters
|
||||
private static final int RUNNING = 0;
|
||||
private static final int SHUTDOWN = 1;
|
||||
private static final int TERMINATING = 2;
|
||||
private static final int TERMINATED = 3;
|
||||
|
||||
final boolean isShutdown() { return runState >= SHUTDOWN; }
|
||||
final boolean isTerminating() { return runState >= TERMINATING; }
|
||||
final boolean isTerminated() { return runState == TERMINATED; }
|
||||
final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
|
||||
final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
|
||||
|
||||
/**
|
||||
* Transitions to at least the given state.
|
||||
*
|
||||
* @return {@code true} if not already at least at given state
|
||||
*/
|
||||
private boolean transitionRunStateTo(int state) {
|
||||
for (;;) {
|
||||
int s = runState;
|
||||
if (s >= state)
|
||||
return false;
|
||||
if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to set status to active; fails on contention.
|
||||
*/
|
||||
private boolean tryActivate() {
|
||||
if (!active) {
|
||||
if (!pool.tryIncrementActiveCount())
|
||||
return false;
|
||||
active = true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to set status to inactive; fails on contention.
|
||||
*/
|
||||
private boolean tryInactivate() {
|
||||
if (active) {
|
||||
if (!pool.tryDecrementActiveCount())
|
||||
return false;
|
||||
active = false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes next value for random victim probe. Scans don't
|
||||
* require a very high quality generator, but also not a crummy
|
||||
* one. Marsaglia xor-shift is cheap and works well.
|
||||
*/
|
||||
private static int xorShift(int r) {
|
||||
r ^= (r << 13);
|
||||
r ^= (r >>> 17);
|
||||
return r ^ (r << 5);
|
||||
}
|
||||
|
||||
// Lifecycle methods
|
||||
|
||||
/**
|
||||
* This method is required to be public, but should never be
|
||||
* called explicitly. It performs the main run loop to execute
|
||||
* ForkJoinTasks.
|
||||
*/
|
||||
public void run() {
|
||||
Throwable exception = null;
|
||||
try {
|
||||
onStart();
|
||||
pool.sync(this); // await first pool event
|
||||
mainLoop();
|
||||
} catch (Throwable ex) {
|
||||
exception = ex;
|
||||
} finally {
|
||||
onTermination(exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes tasks until shut down.
|
||||
*/
|
||||
private void mainLoop() {
|
||||
while (!isShutdown()) {
|
||||
ForkJoinTask<?> t = pollTask();
|
||||
if (t != null || (t = pollSubmission()) != null)
|
||||
t.quietlyExec();
|
||||
else if (tryInactivate())
|
||||
pool.sync(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes internal state after construction but before
|
||||
* processing any tasks. If you override this method, you must
|
||||
* invoke super.onStart() at the beginning of the method.
|
||||
* Initialization requires care: Most fields must have legal
|
||||
* default values, to ensure that attempted accesses from other
|
||||
* threads work correctly even before this thread starts
|
||||
* processing tasks.
|
||||
*/
|
||||
protected void onStart() {
|
||||
// Allocate while starting to improve chances of thread-local
|
||||
// isolation
|
||||
queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
|
||||
// Initial value of seed need not be especially random but
|
||||
// should differ across workers and must be nonzero
|
||||
int p = poolIndex + 1;
|
||||
seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs cleanup associated with termination of this worker
|
||||
* thread. If you override this method, you must invoke
|
||||
* {@code super.onTermination} at the end of the overridden method.
|
||||
*
|
||||
* @param exception the exception causing this thread to abort due
|
||||
* to an unrecoverable error, or {@code null} if completed normally
|
||||
*/
|
||||
protected void onTermination(Throwable exception) {
|
||||
// Execute remaining local tasks unless aborting or terminating
|
||||
while (exception == null && pool.isProcessingTasks() && base != sp) {
|
||||
try {
|
||||
ForkJoinTask<?> t = popTask();
|
||||
if (t != null)
|
||||
t.quietlyExec();
|
||||
} catch (Throwable ex) {
|
||||
exception = ex;
|
||||
}
|
||||
}
|
||||
// Cancel other tasks, transition status, notify pool, and
|
||||
// propagate exception to uncaught exception handler
|
||||
try {
|
||||
do {} while (!tryInactivate()); // ensure inactive
|
||||
cancelTasks();
|
||||
runState = TERMINATED;
|
||||
pool.workerTerminated(this);
|
||||
} catch (Throwable ex) { // Shouldn't ever happen
|
||||
if (exception == null) // but if so, at least rethrown
|
||||
exception = ex;
|
||||
} finally {
|
||||
if (exception != null)
|
||||
ForkJoinTask.rethrowException(exception);
|
||||
}
|
||||
}
|
||||
|
||||
// Intrinsics-based support for queue operations.
|
||||
|
||||
private static long slotOffset(int i) {
|
||||
return ((long) i << qShift) + qBase;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds in store-order the given task at given slot of q to null.
|
||||
* Caller must ensure q is non-null and index is in range.
|
||||
*/
|
||||
private static void setSlot(ForkJoinTask<?>[] q, int i,
|
||||
ForkJoinTask<?> t) {
|
||||
UNSAFE.putOrderedObject(q, slotOffset(i), t);
|
||||
}
|
||||
|
||||
/**
|
||||
* CAS given slot of q to null. Caller must ensure q is non-null
|
||||
* and index is in range.
|
||||
*/
|
||||
private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
|
||||
ForkJoinTask<?> t) {
|
||||
return UNSAFE.compareAndSwapObject(q, slotOffset(i), t, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets sp in store-order.
|
||||
*/
|
||||
private void storeSp(int s) {
|
||||
UNSAFE.putOrderedInt(this, spOffset, s);
|
||||
}
|
||||
|
||||
// Main queue methods
|
||||
|
||||
/**
|
||||
* Pushes a task. Called only by current thread.
|
||||
*
|
||||
* @param t the task. Caller must ensure non-null.
|
||||
*/
|
||||
final void pushTask(ForkJoinTask<?> t) {
|
||||
ForkJoinTask<?>[] q = queue;
|
||||
int mask = q.length - 1;
|
||||
int s = sp;
|
||||
setSlot(q, s & mask, t);
|
||||
storeSp(++s);
|
||||
if ((s -= base) == 1)
|
||||
pool.signalWork();
|
||||
else if (s >= mask)
|
||||
growQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to take a task from the base of the queue, failing if
|
||||
* either empty or contended.
|
||||
*
|
||||
* @return a task, or null if none or contended
|
||||
*/
|
||||
final ForkJoinTask<?> deqTask() {
|
||||
ForkJoinTask<?> t;
|
||||
ForkJoinTask<?>[] q;
|
||||
int i;
|
||||
int b;
|
||||
if (sp != (b = base) &&
|
||||
(q = queue) != null && // must read q after b
|
||||
(t = q[i = (q.length - 1) & b]) != null &&
|
||||
casSlotNull(q, i, t)) {
|
||||
base = b + 1;
|
||||
return t;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to take a task from the base of own queue, activating if
|
||||
* necessary, failing only if empty. Called only by current thread.
|
||||
*
|
||||
* @return a task, or null if none
|
||||
*/
|
||||
final ForkJoinTask<?> locallyDeqTask() {
|
||||
int b;
|
||||
while (sp != (b = base)) {
|
||||
if (tryActivate()) {
|
||||
ForkJoinTask<?>[] q = queue;
|
||||
int i = (q.length - 1) & b;
|
||||
ForkJoinTask<?> t = q[i];
|
||||
if (t != null && casSlotNull(q, i, t)) {
|
||||
base = b + 1;
|
||||
return t;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a popped task, or null if empty. Ensures active status
|
||||
* if non-null. Called only by current thread.
|
||||
*/
|
||||
final ForkJoinTask<?> popTask() {
|
||||
int s = sp;
|
||||
while (s != base) {
|
||||
if (tryActivate()) {
|
||||
ForkJoinTask<?>[] q = queue;
|
||||
int mask = q.length - 1;
|
||||
int i = (s - 1) & mask;
|
||||
ForkJoinTask<?> t = q[i];
|
||||
if (t == null || !casSlotNull(q, i, t))
|
||||
break;
|
||||
storeSp(s - 1);
|
||||
return t;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specialized version of popTask to pop only if
|
||||
* topmost element is the given task. Called only
|
||||
* by current thread while active.
|
||||
*
|
||||
* @param t the task. Caller must ensure non-null.
|
||||
*/
|
||||
final boolean unpushTask(ForkJoinTask<?> t) {
|
||||
ForkJoinTask<?>[] q = queue;
|
||||
int mask = q.length - 1;
|
||||
int s = sp - 1;
|
||||
if (casSlotNull(q, s & mask, t)) {
|
||||
storeSp(s);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns next task or null if empty or contended
|
||||
*/
|
||||
final ForkJoinTask<?> peekTask() {
|
||||
ForkJoinTask<?>[] q = queue;
|
||||
if (q == null)
|
||||
return null;
|
||||
int mask = q.length - 1;
|
||||
int i = locallyFifo ? base : (sp - 1);
|
||||
return q[i & mask];
|
||||
}
|
||||
|
||||
/**
|
||||
* Doubles queue array size. Transfers elements by emulating
|
||||
* steals (deqs) from old array and placing, oldest first, into
|
||||
* new array.
|
||||
*/
|
||||
private void growQueue() {
|
||||
ForkJoinTask<?>[] oldQ = queue;
|
||||
int oldSize = oldQ.length;
|
||||
int newSize = oldSize << 1;
|
||||
if (newSize > MAXIMUM_QUEUE_CAPACITY)
|
||||
throw new RejectedExecutionException("Queue capacity exceeded");
|
||||
ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
|
||||
|
||||
int b = base;
|
||||
int bf = b + oldSize;
|
||||
int oldMask = oldSize - 1;
|
||||
int newMask = newSize - 1;
|
||||
do {
|
||||
int oldIndex = b & oldMask;
|
||||
ForkJoinTask<?> t = oldQ[oldIndex];
|
||||
if (t != null && !casSlotNull(oldQ, oldIndex, t))
|
||||
t = null;
|
||||
setSlot(newQ, b & newMask, t);
|
||||
} while (++b != bf);
|
||||
pool.signalWork();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to steal a task from another worker. Starts at a random
|
||||
* index of workers array, and probes workers until finding one
|
||||
* with non-empty queue or finding that all are empty. It
|
||||
* randomly selects the first n probes. If these are empty, it
|
||||
* resorts to a full circular traversal, which is necessary to
|
||||
* accurately set active status by caller. Also restarts if pool
|
||||
* events occurred since last scan, which forces refresh of
|
||||
* workers array, in case barrier was associated with resize.
|
||||
*
|
||||
* This method must be both fast and quiet -- usually avoiding
|
||||
* memory accesses that could disrupt cache sharing etc other than
|
||||
* those needed to check for and take tasks. This accounts for,
|
||||
* among other things, updating random seed in place without
|
||||
* storing it until exit.
|
||||
*
|
||||
* @return a task, or null if none found
|
||||
*/
|
||||
private ForkJoinTask<?> scan() {
|
||||
ForkJoinTask<?> t = null;
|
||||
int r = seed; // extract once to keep scan quiet
|
||||
ForkJoinWorkerThread[] ws; // refreshed on outer loop
|
||||
int mask; // must be power 2 minus 1 and > 0
|
||||
outer:do {
|
||||
if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
|
||||
int idx = r;
|
||||
int probes = ~mask; // use random index while negative
|
||||
for (;;) {
|
||||
r = xorShift(r); // update random seed
|
||||
ForkJoinWorkerThread v = ws[mask & idx];
|
||||
if (v == null || v.sp == v.base) {
|
||||
if (probes <= mask)
|
||||
idx = (probes++ < 0) ? r : (idx + 1);
|
||||
else
|
||||
break;
|
||||
}
|
||||
else if (!tryActivate() || (t = v.deqTask()) == null)
|
||||
continue outer; // restart on contention
|
||||
else
|
||||
break outer;
|
||||
}
|
||||
}
|
||||
} while (pool.hasNewSyncEvent(this)); // retry on pool events
|
||||
seed = r;
|
||||
return t;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets and removes a local or stolen task.
|
||||
*
|
||||
* @return a task, if available
|
||||
*/
|
||||
final ForkJoinTask<?> pollTask() {
|
||||
ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
|
||||
if (t == null && (t = scan()) != null)
|
||||
++stealCount;
|
||||
return t;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a local task.
|
||||
*
|
||||
* @return a task, if available
|
||||
*/
|
||||
final ForkJoinTask<?> pollLocalTask() {
|
||||
return locallyFifo ? locallyDeqTask() : popTask();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pool submission, if one exists, activating first.
|
||||
*
|
||||
* @return a submission, if available
|
||||
*/
|
||||
private ForkJoinTask<?> pollSubmission() {
|
||||
ForkJoinPool p = pool;
|
||||
while (p.hasQueuedSubmissions()) {
|
||||
ForkJoinTask<?> t;
|
||||
if (tryActivate() && (t = p.pollSubmission()) != null)
|
||||
return t;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// Methods accessed only by Pool
|
||||
|
||||
/**
|
||||
* Removes and cancels all tasks in queue. Can be called from any
|
||||
* thread.
|
||||
*/
|
||||
final void cancelTasks() {
|
||||
ForkJoinTask<?> t;
|
||||
while (base != sp && (t = deqTask()) != null)
|
||||
t.cancelIgnoringExceptions();
|
||||
}
|
||||
|
||||
/**
|
||||
* Drains tasks to given collection c.
|
||||
*
|
||||
* @return the number of tasks drained
|
||||
*/
|
||||
final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
|
||||
int n = 0;
|
||||
ForkJoinTask<?> t;
|
||||
while (base != sp && (t = deqTask()) != null) {
|
||||
c.add(t);
|
||||
++n;
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets and clears steal count for accumulation by pool. Called
|
||||
* only when known to be idle (in pool.sync and termination).
|
||||
*/
|
||||
final int getAndClearStealCount() {
|
||||
int sc = stealCount;
|
||||
stealCount = 0;
|
||||
return sc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if at least one worker in the given array
|
||||
* appears to have at least one queued task.
|
||||
*
|
||||
* @param ws array of workers
|
||||
*/
|
||||
static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
|
||||
if (ws != null) {
|
||||
int len = ws.length;
|
||||
for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
|
||||
for (int i = 0; i < len; ++i) {
|
||||
ForkJoinWorkerThread w = ws[i];
|
||||
if (w != null && w.sp != w.base)
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Support methods for ForkJoinTask
|
||||
|
||||
/**
|
||||
* Returns an estimate of the number of tasks in the queue.
|
||||
*/
|
||||
final int getQueueSize() {
|
||||
// suppress momentarily negative values
|
||||
return Math.max(0, sp - base);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an estimate of the number of tasks, offset by a
|
||||
* function of number of idle workers.
|
||||
*/
|
||||
final int getEstimatedSurplusTaskCount() {
|
||||
// The halving approximates weighting idle vs non-idle workers
|
||||
return (sp - base) - (pool.getIdleThreadCount() >>> 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Scans, returning early if joinMe done.
|
||||
*/
|
||||
final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
|
||||
ForkJoinTask<?> t = pollTask();
|
||||
if (t != null && joinMe.status < 0 && sp == base) {
|
||||
pushTask(t); // unsteal if done and this task would be stealable
|
||||
t = null;
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs tasks until {@code pool.isQuiescent()}.
|
||||
*/
|
||||
final void helpQuiescePool() {
|
||||
for (;;) {
|
||||
ForkJoinTask<?> t = pollTask();
|
||||
if (t != null)
|
||||
t.quietlyExec();
|
||||
else if (tryInactivate() && pool.isQuiescent())
|
||||
break;
|
||||
}
|
||||
do {} while (!tryActivate()); // re-activate on exit
|
||||
}
|
||||
|
||||
// Unsafe mechanics
|
||||
|
||||
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
|
||||
private static final long spOffset =
|
||||
objectFieldOffset("sp", ForkJoinWorkerThread.class);
|
||||
private static final long runStateOffset =
|
||||
objectFieldOffset("runState", ForkJoinWorkerThread.class);
|
||||
private static final long qBase;
|
||||
private static final int qShift;
|
||||
|
||||
static {
|
||||
qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
|
||||
int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
|
||||
if ((s & (s-1)) != 0)
|
||||
throw new Error("data type scale not a power of two");
|
||||
qShift = 31 - Integer.numberOfLeadingZeros(s);
|
||||
}
|
||||
|
||||
private static long objectFieldOffset(String field, Class<?> klazz) {
|
||||
try {
|
||||
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
|
||||
} catch (NoSuchFieldException e) {
|
||||
// Convert Exception to corresponding Error
|
||||
NoSuchFieldError error = new NoSuchFieldError(field);
|
||||
error.initCause(e);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
1270
jdk/src/share/classes/java/util/concurrent/LinkedTransferQueue.java
Normal file
1270
jdk/src/share/classes/java/util/concurrent/LinkedTransferQueue.java
Normal file
File diff suppressed because it is too large
Load Diff
1042
jdk/src/share/classes/java/util/concurrent/Phaser.java
Normal file
1042
jdk/src/share/classes/java/util/concurrent/Phaser.java
Normal file
File diff suppressed because it is too large
Load Diff
179
jdk/src/share/classes/java/util/concurrent/RecursiveAction.java
Normal file
179
jdk/src/share/classes/java/util/concurrent/RecursiveAction.java
Normal file
@ -0,0 +1,179 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Sun designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Sun in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
package java.util.concurrent;
|
||||
|
||||
/**
|
||||
* A recursive resultless {@link ForkJoinTask}. This class
|
||||
* establishes conventions to parameterize resultless actions as
|
||||
* {@code Void} {@code ForkJoinTask}s. Because {@code null} is the
|
||||
* only valid value of type {@code Void}, methods such as join always
|
||||
* return {@code null} upon completion.
|
||||
*
|
||||
* <p><b>Sample Usages.</b> Here is a sketch of a ForkJoin sort that
|
||||
* sorts a given {@code long[]} array:
|
||||
*
|
||||
* <pre> {@code
|
||||
* class SortTask extends RecursiveAction {
|
||||
* final long[] array; final int lo; final int hi;
|
||||
* SortTask(long[] array, int lo, int hi) {
|
||||
* this.array = array; this.lo = lo; this.hi = hi;
|
||||
* }
|
||||
* protected void compute() {
|
||||
* if (hi - lo < THRESHOLD)
|
||||
* sequentiallySort(array, lo, hi);
|
||||
* else {
|
||||
* int mid = (lo + hi) >>> 1;
|
||||
* invokeAll(new SortTask(array, lo, mid),
|
||||
* new SortTask(array, mid, hi));
|
||||
* merge(array, lo, hi);
|
||||
* }
|
||||
* }
|
||||
* }}</pre>
|
||||
*
|
||||
* You could then sort {@code anArray} by creating {@code new
|
||||
* SortTask(anArray, 0, anArray.length-1) } and invoking it in a
|
||||
* ForkJoinPool. As a more concrete simple example, the following
|
||||
* task increments each element of an array:
|
||||
* <pre> {@code
|
||||
* class IncrementTask extends RecursiveAction {
|
||||
* final long[] array; final int lo; final int hi;
|
||||
* IncrementTask(long[] array, int lo, int hi) {
|
||||
* this.array = array; this.lo = lo; this.hi = hi;
|
||||
* }
|
||||
* protected void compute() {
|
||||
* if (hi - lo < THRESHOLD) {
|
||||
* for (int i = lo; i < hi; ++i)
|
||||
* array[i]++;
|
||||
* }
|
||||
* else {
|
||||
* int mid = (lo + hi) >>> 1;
|
||||
* invokeAll(new IncrementTask(array, lo, mid),
|
||||
* new IncrementTask(array, mid, hi));
|
||||
* }
|
||||
* }
|
||||
* }}</pre>
|
||||
*
|
||||
* <p>The following example illustrates some refinements and idioms
|
||||
* that may lead to better performance: RecursiveActions need not be
|
||||
* fully recursive, so long as they maintain the basic
|
||||
* divide-and-conquer approach. Here is a class that sums the squares
|
||||
* of each element of a double array, by subdividing out only the
|
||||
* right-hand-sides of repeated divisions by two, and keeping track of
|
||||
* them with a chain of {@code next} references. It uses a dynamic
|
||||
* threshold based on method {@code getSurplusQueuedTaskCount}, but
|
||||
* counterbalances potential excess partitioning by directly
|
||||
* performing leaf actions on unstolen tasks rather than further
|
||||
* subdividing.
|
||||
*
|
||||
* <pre> {@code
|
||||
* double sumOfSquares(ForkJoinPool pool, double[] array) {
|
||||
* int n = array.length;
|
||||
* Applyer a = new Applyer(array, 0, n, null);
|
||||
* pool.invoke(a);
|
||||
* return a.result;
|
||||
* }
|
||||
*
|
||||
* class Applyer extends RecursiveAction {
|
||||
* final double[] array;
|
||||
* final int lo, hi;
|
||||
* double result;
|
||||
* Applyer next; // keeps track of right-hand-side tasks
|
||||
* Applyer(double[] array, int lo, int hi, Applyer next) {
|
||||
* this.array = array; this.lo = lo; this.hi = hi;
|
||||
* this.next = next;
|
||||
* }
|
||||
*
|
||||
* double atLeaf(int l, int h) {
|
||||
* double sum = 0;
|
||||
* for (int i = l; i < h; ++i) // perform leftmost base step
|
||||
* sum += array[i] * array[i];
|
||||
* return sum;
|
||||
* }
|
||||
*
|
||||
* protected void compute() {
|
||||
* int l = lo;
|
||||
* int h = hi;
|
||||
* Applyer right = null;
|
||||
* while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
|
||||
* int mid = (l + h) >>> 1;
|
||||
* right = new Applyer(array, mid, h, right);
|
||||
* right.fork();
|
||||
* h = mid;
|
||||
* }
|
||||
* double sum = atLeaf(l, h);
|
||||
* while (right != null) {
|
||||
* if (right.tryUnfork()) // directly calculate if not stolen
|
||||
* sum += right.atLeaf(right.lo, right.hi);
|
||||
* else {
|
||||
* right.helpJoin();
|
||||
* sum += right.result;
|
||||
* }
|
||||
* right = right.next;
|
||||
* }
|
||||
* result = sum;
|
||||
* }
|
||||
* }}</pre>
|
||||
*
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public abstract class RecursiveAction extends ForkJoinTask<Void> {
|
||||
private static final long serialVersionUID = 5232453952276485070L;
|
||||
|
||||
/**
|
||||
* The main computation performed by this task.
|
||||
*/
|
||||
protected abstract void compute();
|
||||
|
||||
/**
|
||||
* Always returns null.
|
||||
*/
|
||||
public final Void getRawResult() { return null; }
|
||||
|
||||
/**
|
||||
* Requires null completion value.
|
||||
*/
|
||||
protected final void setRawResult(Void mustBeNull) { }
|
||||
|
||||
/**
|
||||
* Implements execution conventions for RecursiveActions.
|
||||
*/
|
||||
protected final boolean exec() {
|
||||
compute();
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,97 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Sun designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Sun in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
package java.util.concurrent;
|
||||
|
||||
/**
|
||||
* A recursive result-bearing {@link ForkJoinTask}.
|
||||
*
|
||||
* <p>For a classic example, here is a task computing Fibonacci numbers:
|
||||
*
|
||||
* <pre> {@code
|
||||
* class Fibonacci extends RecursiveTask<Integer> {
|
||||
* final int n;
|
||||
* Fibonacci(int n) { this.n = n; }
|
||||
* Integer compute() {
|
||||
* if (n <= 1)
|
||||
* return n;
|
||||
* Fibonacci f1 = new Fibonacci(n - 1);
|
||||
* f1.fork();
|
||||
* Fibonacci f2 = new Fibonacci(n - 2);
|
||||
* return f2.compute() + f1.join();
|
||||
* }
|
||||
* }}</pre>
|
||||
*
|
||||
* However, besides being a dumb way to compute Fibonacci functions
|
||||
* (there is a simple fast linear algorithm that you'd use in
|
||||
* practice), this is likely to perform poorly because the smallest
|
||||
* subtasks are too small to be worthwhile splitting up. Instead, as
|
||||
* is the case for nearly all fork/join applications, you'd pick some
|
||||
* minimum granularity size (for example 10 here) for which you always
|
||||
* sequentially solve rather than subdividing.
|
||||
*
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
|
||||
private static final long serialVersionUID = 5232453952276485270L;
|
||||
|
||||
/**
|
||||
* The result of the computation.
|
||||
*/
|
||||
V result;
|
||||
|
||||
/**
|
||||
* The main computation performed by this task.
|
||||
*/
|
||||
protected abstract V compute();
|
||||
|
||||
public final V getRawResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
protected final void setRawResult(V value) {
|
||||
result = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements execution conventions for RecursiveTask.
|
||||
*/
|
||||
protected final boolean exec() {
|
||||
result = compute();
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
@ -61,6 +61,14 @@ import java.util.*;
|
||||
* causes tasks to be immediately removed from the work queue at
|
||||
* time of cancellation.
|
||||
*
|
||||
* <p>Successive executions of a task scheduled via
|
||||
* <code>scheduleAtFixedRate</code> or
|
||||
* <code>scheduleWithFixedDelay</code> do not overlap. While different
|
||||
* executions may be performed by different threads, the effects of
|
||||
* prior executions <a
|
||||
* href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
|
||||
* those of subsequent ones.
|
||||
*
|
||||
* <p>While this class inherits from {@link ThreadPoolExecutor}, a few
|
||||
* of the inherited tuning methods are not useful for it. In
|
||||
* particular, because it acts as a fixed-sized pool using
|
||||
|
@ -0,0 +1,228 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Sun designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Sun in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
package java.util.concurrent;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* A random number generator isolated to the current thread. Like the
|
||||
* global {@link java.util.Random} generator used by the {@link
|
||||
* java.lang.Math} class, a {@code ThreadLocalRandom} is initialized
|
||||
* with an internally generated seed that may not otherwise be
|
||||
* modified. When applicable, use of {@code ThreadLocalRandom} rather
|
||||
* than shared {@code Random} objects in concurrent programs will
|
||||
* typically encounter much less overhead and contention. Use of
|
||||
* {@code ThreadLocalRandom} is particularly appropriate when multiple
|
||||
* tasks (for example, each a {@link ForkJoinTask}) use random numbers
|
||||
* in parallel in thread pools.
|
||||
*
|
||||
* <p>Usages of this class should typically be of the form:
|
||||
* {@code ThreadLocalRandom.current().nextX(...)} (where
|
||||
* {@code X} is {@code Int}, {@code Long}, etc).
|
||||
* When all usages are of this form, it is never possible to
|
||||
* accidently share a {@code ThreadLocalRandom} across multiple threads.
|
||||
*
|
||||
* <p>This class also provides additional commonly used bounded random
|
||||
* generation methods.
|
||||
*
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public class ThreadLocalRandom extends Random {
|
||||
// same constants as Random, but must be redeclared because private
|
||||
private final static long multiplier = 0x5DEECE66DL;
|
||||
private final static long addend = 0xBL;
|
||||
private final static long mask = (1L << 48) - 1;
|
||||
|
||||
/**
|
||||
* The random seed. We can't use super.seed.
|
||||
*/
|
||||
private long rnd;
|
||||
|
||||
/**
|
||||
* Initialization flag to permit the first and only allowed call
|
||||
* to setSeed (inside Random constructor) to succeed. We can't
|
||||
* allow others since it would cause setting seed in one part of a
|
||||
* program to unintentionally impact other usages by the thread.
|
||||
*/
|
||||
boolean initialized;
|
||||
|
||||
// Padding to help avoid memory contention among seed updates in
|
||||
// different TLRs in the common case that they are located near
|
||||
// each other.
|
||||
private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
|
||||
|
||||
/**
|
||||
* The actual ThreadLocal
|
||||
*/
|
||||
private static final ThreadLocal<ThreadLocalRandom> localRandom =
|
||||
new ThreadLocal<ThreadLocalRandom>() {
|
||||
protected ThreadLocalRandom initialValue() {
|
||||
return new ThreadLocalRandom();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Constructor called only by localRandom.initialValue.
|
||||
* We rely on the fact that the superclass no-arg constructor
|
||||
* invokes setSeed exactly once to initialize.
|
||||
*/
|
||||
ThreadLocalRandom() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current thread's {@code ThreadLocalRandom}.
|
||||
*
|
||||
* @return the current thread's {@code ThreadLocalRandom}
|
||||
*/
|
||||
public static ThreadLocalRandom current() {
|
||||
return localRandom.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws {@code UnsupportedOperationException}. Setting seeds in
|
||||
* this generator is not supported.
|
||||
*
|
||||
* @throws UnsupportedOperationException always
|
||||
*/
|
||||
public void setSeed(long seed) {
|
||||
if (initialized)
|
||||
throw new UnsupportedOperationException();
|
||||
initialized = true;
|
||||
rnd = (seed ^ multiplier) & mask;
|
||||
}
|
||||
|
||||
protected int next(int bits) {
|
||||
rnd = (rnd * multiplier + addend) & mask;
|
||||
return (int) (rnd >>> (48-bits));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pseudorandom, uniformly distributed value between the
|
||||
* given least value (inclusive) and bound (exclusive).
|
||||
*
|
||||
* @param least the least value returned
|
||||
* @param bound the upper bound (exclusive)
|
||||
* @throws IllegalArgumentException if least greater than or equal
|
||||
* to bound
|
||||
* @return the next value
|
||||
*/
|
||||
public int nextInt(int least, int bound) {
|
||||
if (least >= bound)
|
||||
throw new IllegalArgumentException();
|
||||
return nextInt(bound - least) + least;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pseudorandom, uniformly distributed value
|
||||
* between 0 (inclusive) and the specified value (exclusive).
|
||||
*
|
||||
* @param n the bound on the random number to be returned. Must be
|
||||
* positive.
|
||||
* @return the next value
|
||||
* @throws IllegalArgumentException if n is not positive
|
||||
*/
|
||||
public long nextLong(long n) {
|
||||
if (n <= 0)
|
||||
throw new IllegalArgumentException("n must be positive");
|
||||
// Divide n by two until small enough for nextInt. On each
|
||||
// iteration (at most 31 of them but usually much less),
|
||||
// randomly choose both whether to include high bit in result
|
||||
// (offset) and whether to continue with the lower vs upper
|
||||
// half (which makes a difference only if odd).
|
||||
long offset = 0;
|
||||
while (n >= Integer.MAX_VALUE) {
|
||||
int bits = next(2);
|
||||
long half = n >>> 1;
|
||||
long nextn = ((bits & 2) == 0) ? half : n - half;
|
||||
if ((bits & 1) == 0)
|
||||
offset += n - nextn;
|
||||
n = nextn;
|
||||
}
|
||||
return offset + nextInt((int) n);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pseudorandom, uniformly distributed value between the
|
||||
* given least value (inclusive) and bound (exclusive).
|
||||
*
|
||||
* @param least the least value returned
|
||||
* @param bound the upper bound (exclusive)
|
||||
* @return the next value
|
||||
* @throws IllegalArgumentException if least greater than or equal
|
||||
* to bound
|
||||
*/
|
||||
public long nextLong(long least, long bound) {
|
||||
if (least >= bound)
|
||||
throw new IllegalArgumentException();
|
||||
return nextLong(bound - least) + least;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pseudorandom, uniformly distributed {@code double} value
|
||||
* between 0 (inclusive) and the specified value (exclusive).
|
||||
*
|
||||
* @param n the bound on the random number to be returned. Must be
|
||||
* positive.
|
||||
* @return the next value
|
||||
* @throws IllegalArgumentException if n is not positive
|
||||
*/
|
||||
public double nextDouble(double n) {
|
||||
if (n <= 0)
|
||||
throw new IllegalArgumentException("n must be positive");
|
||||
return nextDouble() * n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a pseudorandom, uniformly distributed value between the
|
||||
* given least value (inclusive) and bound (exclusive).
|
||||
*
|
||||
* @param least the least value returned
|
||||
* @param bound the upper bound (exclusive)
|
||||
* @return the next value
|
||||
* @throws IllegalArgumentException if least greater than or equal
|
||||
* to bound
|
||||
*/
|
||||
public double nextDouble(double least, double bound) {
|
||||
if (least >= bound)
|
||||
throw new IllegalArgumentException();
|
||||
return nextDouble() * (bound - least) + least;
|
||||
}
|
||||
|
||||
private static final long serialVersionUID = -5851777807851030925L;
|
||||
}
|
161
jdk/src/share/classes/java/util/concurrent/TransferQueue.java
Normal file
161
jdk/src/share/classes/java/util/concurrent/TransferQueue.java
Normal file
@ -0,0 +1,161 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Sun designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Sun in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
package java.util.concurrent;
|
||||
|
||||
/**
|
||||
* A {@link BlockingQueue} in which producers may wait for consumers
|
||||
* to receive elements. A {@code TransferQueue} may be useful for
|
||||
* example in message passing applications in which producers
|
||||
* sometimes (using method {@link #transfer}) await receipt of
|
||||
* elements by consumers invoking {@code take} or {@code poll}, while
|
||||
* at other times enqueue elements (via method {@code put}) without
|
||||
* waiting for receipt.
|
||||
* {@linkplain #tryTransfer(Object) Non-blocking} and
|
||||
* {@linkplain #tryTransfer(Object,long,TimeUnit) time-out} versions of
|
||||
* {@code tryTransfer} are also available.
|
||||
* A {@code TransferQueue} may also be queried, via {@link
|
||||
* #hasWaitingConsumer}, whether there are any threads waiting for
|
||||
* items, which is a converse analogy to a {@code peek} operation.
|
||||
*
|
||||
* <p>Like other blocking queues, a {@code TransferQueue} may be
|
||||
* capacity bounded. If so, an attempted transfer operation may
|
||||
* initially block waiting for available space, and/or subsequently
|
||||
* block waiting for reception by a consumer. Note that in a queue
|
||||
* with zero capacity, such as {@link SynchronousQueue}, {@code put}
|
||||
* and {@code transfer} are effectively synonymous.
|
||||
*
|
||||
* <p>This interface is a member of the
|
||||
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
|
||||
* Java Collections Framework</a>.
|
||||
*
|
||||
* @since 1.7
|
||||
* @author Doug Lea
|
||||
* @param <E> the type of elements held in this collection
|
||||
*/
|
||||
public interface TransferQueue<E> extends BlockingQueue<E> {
|
||||
/**
|
||||
* Transfers the element to a waiting consumer immediately, if possible.
|
||||
*
|
||||
* <p>More precisely, transfers the specified element immediately
|
||||
* if there exists a consumer already waiting to receive it (in
|
||||
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
|
||||
* otherwise returning {@code false} without enqueuing the element.
|
||||
*
|
||||
* @param e the element to transfer
|
||||
* @return {@code true} if the element was transferred, else
|
||||
* {@code false}
|
||||
* @throws ClassCastException if the class of the specified element
|
||||
* prevents it from being added to this queue
|
||||
* @throws NullPointerException if the specified element is null
|
||||
* @throws IllegalArgumentException if some property of the specified
|
||||
* element prevents it from being added to this queue
|
||||
*/
|
||||
boolean tryTransfer(E e);
|
||||
|
||||
/**
|
||||
* Transfers the element to a consumer, waiting if necessary to do so.
|
||||
*
|
||||
* <p>More precisely, transfers the specified element immediately
|
||||
* if there exists a consumer already waiting to receive it (in
|
||||
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
|
||||
* else waits until the element is received by a consumer.
|
||||
*
|
||||
* @param e the element to transfer
|
||||
* @throws InterruptedException if interrupted while waiting,
|
||||
* in which case the element is not left enqueued
|
||||
* @throws ClassCastException if the class of the specified element
|
||||
* prevents it from being added to this queue
|
||||
* @throws NullPointerException if the specified element is null
|
||||
* @throws IllegalArgumentException if some property of the specified
|
||||
* element prevents it from being added to this queue
|
||||
*/
|
||||
void transfer(E e) throws InterruptedException;
|
||||
|
||||
/**
|
||||
* Transfers the element to a consumer if it is possible to do so
|
||||
* before the timeout elapses.
|
||||
*
|
||||
* <p>More precisely, transfers the specified element immediately
|
||||
* if there exists a consumer already waiting to receive it (in
|
||||
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
|
||||
* else waits until the element is received by a consumer,
|
||||
* returning {@code false} if the specified wait time elapses
|
||||
* before the element can be transferred.
|
||||
*
|
||||
* @param e the element to transfer
|
||||
* @param timeout how long to wait before giving up, in units of
|
||||
* {@code unit}
|
||||
* @param unit a {@code TimeUnit} determining how to interpret the
|
||||
* {@code timeout} parameter
|
||||
* @return {@code true} if successful, or {@code false} if
|
||||
* the specified waiting time elapses before completion,
|
||||
* in which case the element is not left enqueued
|
||||
* @throws InterruptedException if interrupted while waiting,
|
||||
* in which case the element is not left enqueued
|
||||
* @throws ClassCastException if the class of the specified element
|
||||
* prevents it from being added to this queue
|
||||
* @throws NullPointerException if the specified element is null
|
||||
* @throws IllegalArgumentException if some property of the specified
|
||||
* element prevents it from being added to this queue
|
||||
*/
|
||||
boolean tryTransfer(E e, long timeout, TimeUnit unit)
|
||||
throws InterruptedException;
|
||||
|
||||
/**
|
||||
* Returns {@code true} if there is at least one consumer waiting
|
||||
* to receive an element via {@link #take} or
|
||||
* timed {@link #poll(long,TimeUnit) poll}.
|
||||
* The return value represents a momentary state of affairs.
|
||||
*
|
||||
* @return {@code true} if there is at least one waiting consumer
|
||||
*/
|
||||
boolean hasWaitingConsumer();
|
||||
|
||||
/**
|
||||
* Returns an estimate of the number of consumers waiting to
|
||||
* receive elements via {@link #take} or timed
|
||||
* {@link #poll(long,TimeUnit) poll}. The return value is an
|
||||
* approximation of a momentary state of affairs, that may be
|
||||
* inaccurate if consumers have completed or given up waiting.
|
||||
* The value may be useful for monitoring and heuristics, but
|
||||
* not for synchronization control. Implementations of this
|
||||
* method are likely to be noticeably slower than those for
|
||||
* {@link #hasWaitingConsumer}.
|
||||
*
|
||||
* @return the number of consumers waiting to receive elements
|
||||
*/
|
||||
int getWaitingConsumerCount();
|
||||
}
|
@ -170,8 +170,8 @@ import java.util.Date;
|
||||
* <p>As interruption generally implies cancellation, and checks for
|
||||
* interruption are often infrequent, an implementation can favor responding
|
||||
* to an interrupt over normal method return. This is true even if it can be
|
||||
* shown that the interrupt occurred after another action may have unblocked
|
||||
* the thread. An implementation should document this behavior.
|
||||
* shown that the interrupt occurred after another action that may have
|
||||
* unblocked the thread. An implementation should document this behavior.
|
||||
*
|
||||
* @since 1.5
|
||||
* @author Doug Lea
|
||||
|
@ -92,6 +92,13 @@
|
||||
* assists in coordinating the processing of groups of
|
||||
* asynchronous tasks.
|
||||
*
|
||||
* <p>Class {@link java.util.concurrent.ForkJoinPool} provides an
|
||||
* Executor primarily designed for processing instances of {@link
|
||||
* java.util.concurrent.ForkJoinTask} and its subclasses. These
|
||||
* classes employ a work-stealing scheduler that attains high
|
||||
* throughput for tasks conforming to restrictions that often hold in
|
||||
* computation-intensive parallel processing.
|
||||
*
|
||||
* <h2>Queues</h2>
|
||||
*
|
||||
* The {@link java.util.concurrent.ConcurrentLinkedQueue} class
|
||||
@ -110,6 +117,12 @@
|
||||
* for producer-consumer, messaging, parallel tasking, and
|
||||
* related concurrent designs.
|
||||
*
|
||||
* <p> Extended interface {@link java.util.concurrent.TransferQueue},
|
||||
* and implementation {@link java.util.concurrent.LinkedTransferQueue}
|
||||
* introduce a synchronous {@code transfer} method (along with related
|
||||
* features) in which a producer may optionally block awaiting its
|
||||
* consumer.
|
||||
*
|
||||
* <p>The {@link java.util.concurrent.BlockingDeque} interface
|
||||
* extends {@code BlockingQueue} to support both FIFO and LIFO
|
||||
* (stack-based) operations.
|
||||
@ -136,15 +149,28 @@
|
||||
*
|
||||
* <h2>Synchronizers</h2>
|
||||
*
|
||||
* Four classes aid common special-purpose synchronization idioms.
|
||||
* {@link java.util.concurrent.Semaphore} is a classic concurrency tool.
|
||||
* {@link java.util.concurrent.CountDownLatch} is a very simple yet very
|
||||
* common utility for blocking until a given number of signals, events,
|
||||
* or conditions hold. A {@link java.util.concurrent.CyclicBarrier} is a
|
||||
* resettable multiway synchronization point useful in some styles of
|
||||
* parallel programming. An {@link java.util.concurrent.Exchanger} allows
|
||||
* two threads to exchange objects at a rendezvous point, and is useful
|
||||
* in several pipeline designs.
|
||||
* Five classes aid common special-purpose synchronization idioms.
|
||||
* <ul>
|
||||
*
|
||||
* <li>{@link java.util.concurrent.Semaphore} is a classic concurrency tool.
|
||||
*
|
||||
* <li>{@link java.util.concurrent.CountDownLatch} is a very simple yet
|
||||
* very common utility for blocking until a given number of signals,
|
||||
* events, or conditions hold.
|
||||
*
|
||||
* <li>A {@link java.util.concurrent.CyclicBarrier} is a resettable
|
||||
* multiway synchronization point useful in some styles of parallel
|
||||
* programming.
|
||||
*
|
||||
* <li>A {@link java.util.concurrent.Phaser} provides
|
||||
* a more flexible form of barrier that may be used to control phased
|
||||
* computation among multiple threads.
|
||||
*
|
||||
* <li>An {@link java.util.concurrent.Exchanger} allows two threads to
|
||||
* exchange objects at a rendezvous point, and is useful in several
|
||||
* pipeline designs.
|
||||
*
|
||||
* </ul>
|
||||
*
|
||||
* <h2>Concurrent Collections</h2>
|
||||
*
|
||||
@ -259,7 +285,8 @@
|
||||
* in each thread <i>happen-before</i> those subsequent to the
|
||||
* corresponding {@code exchange()} in another thread.
|
||||
*
|
||||
* <li>Actions prior to calling {@code CyclicBarrier.await}
|
||||
* <li>Actions prior to calling {@code CyclicBarrier.await} and
|
||||
* {@code Phaser.awaitAdvance} (as well as its variants)
|
||||
* <i>happen-before</i> actions performed by the barrier action, and
|
||||
* actions performed by the barrier action <i>happen-before</i> actions
|
||||
* subsequent to a successful return from the corresponding {@code await}
|
||||
|
@ -178,10 +178,10 @@ public class BiggernYours {
|
||||
new ConcurrentLinkedQueue() {
|
||||
public int size() {return randomize(super.size());}});
|
||||
|
||||
// testCollections(
|
||||
// new LinkedTransferQueue(),
|
||||
// new LinkedTransferQueue() {
|
||||
// public int size() {return randomize(super.size());}});
|
||||
testCollections(
|
||||
new LinkedTransferQueue(),
|
||||
new LinkedTransferQueue() {
|
||||
public int size() {return randomize(super.size());}});
|
||||
|
||||
testCollections(
|
||||
new LinkedBlockingQueue(),
|
||||
|
@ -49,7 +49,7 @@ public class IteratorAtEnd {
|
||||
testCollection(new LinkedBlockingQueue());
|
||||
testCollection(new ArrayBlockingQueue(100));
|
||||
testCollection(new ConcurrentLinkedQueue());
|
||||
// testCollection(new LinkedTransferQueue());
|
||||
testCollection(new LinkedTransferQueue());
|
||||
|
||||
testMap(new HashMap());
|
||||
testMap(new Hashtable());
|
||||
|
@ -76,7 +76,7 @@ public class MOAT {
|
||||
testCollection(new LinkedBlockingQueue<Integer>(20));
|
||||
testCollection(new LinkedBlockingDeque<Integer>(20));
|
||||
testCollection(new ConcurrentLinkedQueue<Integer>());
|
||||
// testCollection(new LinkedTransferQueue<Integer>());
|
||||
testCollection(new LinkedTransferQueue<Integer>());
|
||||
testCollection(new ConcurrentSkipListSet<Integer>());
|
||||
testCollection(Arrays.asList(new Integer(42)));
|
||||
testCollection(Arrays.asList(1,2,3));
|
||||
|
@ -52,7 +52,7 @@ public class CheckedNull {
|
||||
|
||||
testMap(Collections.checkedMap(
|
||||
new HashMap<String, String>(),
|
||||
String.class, String.class));;
|
||||
String.class, String.class));
|
||||
}
|
||||
|
||||
ClassCastException cce(F f) {
|
||||
|
@ -234,7 +234,7 @@ public class RacingCollections {
|
||||
List<Queue<Integer>> list =
|
||||
new ArrayList<Queue<Integer>>(newConcurrentDeques());
|
||||
list.add(new LinkedBlockingQueue<Integer>(10));
|
||||
// list.add(new LinkedTransferQueue<Integer>());
|
||||
list.add(new LinkedTransferQueue<Integer>());
|
||||
return list;
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ public class RemoveContains {
|
||||
test(new ArrayBlockingQueue<String>(10));
|
||||
test(new LinkedBlockingQueue<String>(10));
|
||||
test(new LinkedBlockingDeque<String>(10));
|
||||
// test(new LinkedTransferQueue<String>());
|
||||
test(new LinkedTransferQueue<String>());
|
||||
test(new ArrayDeque<String>(10));
|
||||
|
||||
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
|
||||
|
@ -119,12 +119,36 @@ public class CancelledProducerConsumerLoops {
|
||||
}
|
||||
}
|
||||
|
||||
static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
|
||||
LTQasSQ() { super(); }
|
||||
public void put(T x) {
|
||||
try { super.transfer(x); }
|
||||
catch (InterruptedException ex) { throw new Error(); }
|
||||
}
|
||||
private final static long serialVersionUID = 42;
|
||||
}
|
||||
|
||||
static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
|
||||
HalfSyncLTQ() { super(); }
|
||||
public void put(T x) {
|
||||
if (ThreadLocalRandom.current().nextBoolean())
|
||||
super.put(x);
|
||||
else {
|
||||
try { super.transfer(x); }
|
||||
catch (InterruptedException ex) { throw new Error(); }
|
||||
}
|
||||
}
|
||||
private final static long serialVersionUID = 42;
|
||||
}
|
||||
|
||||
static void oneTest(int pairs, int iters) throws Exception {
|
||||
|
||||
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
|
||||
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
|
||||
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
|
||||
// oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
|
||||
oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
|
||||
oneRun(new LTQasSQ<Integer>(), pairs, iters);
|
||||
oneRun(new HalfSyncLTQ<Integer>(), pairs, iters);
|
||||
oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
|
||||
|
||||
/* PriorityBlockingQueue is unbounded
|
||||
|
@ -37,7 +37,7 @@ public class LastElement {
|
||||
testQueue(new LinkedBlockingDeque<Integer>());
|
||||
testQueue(new ArrayBlockingQueue<Integer>(10, true));
|
||||
testQueue(new ArrayBlockingQueue<Integer>(10, false));
|
||||
// testQueue(new LinkedTransferQueue<Integer>());
|
||||
testQueue(new LinkedTransferQueue<Integer>());
|
||||
|
||||
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
|
||||
if (failed > 0) throw new Exception("Some tests failed");
|
||||
|
@ -87,11 +87,35 @@ public class MultipleProducersSingleConsumerLoops {
|
||||
throw new Error();
|
||||
}
|
||||
|
||||
static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
|
||||
LTQasSQ() { super(); }
|
||||
public void put(T x) {
|
||||
try { super.transfer(x); }
|
||||
catch (InterruptedException ex) { throw new Error(); }
|
||||
}
|
||||
private final static long serialVersionUID = 42;
|
||||
}
|
||||
|
||||
static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
|
||||
HalfSyncLTQ() { super(); }
|
||||
public void put(T x) {
|
||||
if (ThreadLocalRandom.current().nextBoolean())
|
||||
super.put(x);
|
||||
else {
|
||||
try { super.transfer(x); }
|
||||
catch (InterruptedException ex) { throw new Error(); }
|
||||
}
|
||||
}
|
||||
private final static long serialVersionUID = 42;
|
||||
}
|
||||
|
||||
static void oneTest(int producers, int iters) throws Exception {
|
||||
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
|
||||
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
|
||||
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters);
|
||||
// oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
|
||||
oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
|
||||
oneRun(new LTQasSQ<Integer>(), producers, iters);
|
||||
oneRun(new HalfSyncLTQ<Integer>(), producers, iters);
|
||||
|
||||
// Don't run PBQ since can legitimately run out of memory
|
||||
// if (print)
|
||||
|
@ -63,12 +63,11 @@ public class OfferDrainToLoops {
|
||||
test(new LinkedBlockingDeque());
|
||||
test(new LinkedBlockingDeque(2000));
|
||||
test(new ArrayBlockingQueue(2000));
|
||||
// test(new LinkedTransferQueue());
|
||||
test(new LinkedTransferQueue());
|
||||
}
|
||||
|
||||
Random getRandom() {
|
||||
return new Random();
|
||||
// return ThreadLocalRandom.current();
|
||||
return ThreadLocalRandom.current();
|
||||
}
|
||||
|
||||
void test(final BlockingQueue q) throws Throwable {
|
||||
|
@ -46,7 +46,7 @@ public class PollMemoryLeak {
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
final BlockingQueue[] qs = {
|
||||
new LinkedBlockingQueue(10),
|
||||
// new LinkedTransferQueue(),
|
||||
new LinkedTransferQueue(),
|
||||
new ArrayBlockingQueue(10),
|
||||
new SynchronousQueue(),
|
||||
new SynchronousQueue(true),
|
||||
|
@ -87,11 +87,35 @@ public class ProducerConsumerLoops {
|
||||
throw new Error();
|
||||
}
|
||||
|
||||
static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
|
||||
LTQasSQ() { super(); }
|
||||
public void put(T x) {
|
||||
try { super.transfer(x); }
|
||||
catch (InterruptedException ex) { throw new Error(); }
|
||||
}
|
||||
private final static long serialVersionUID = 42;
|
||||
}
|
||||
|
||||
static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
|
||||
HalfSyncLTQ() { super(); }
|
||||
public void put(T x) {
|
||||
if (ThreadLocalRandom.current().nextBoolean())
|
||||
super.put(x);
|
||||
else {
|
||||
try { super.transfer(x); }
|
||||
catch (InterruptedException ex) { throw new Error(); }
|
||||
}
|
||||
}
|
||||
private final static long serialVersionUID = 42;
|
||||
}
|
||||
|
||||
static void oneTest(int pairs, int iters) throws Exception {
|
||||
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
|
||||
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
|
||||
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
|
||||
// oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
|
||||
oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
|
||||
oneRun(new LTQasSQ<Integer>(), pairs, iters);
|
||||
oneRun(new HalfSyncLTQ<Integer>(), pairs, iters);
|
||||
oneRun(new PriorityBlockingQueue<Integer>(), pairs, iters);
|
||||
oneRun(new SynchronousQueue<Integer>(), pairs, iters);
|
||||
|
||||
|
@ -73,11 +73,35 @@ public class SingleProducerMultipleConsumerLoops {
|
||||
throw new Error();
|
||||
}
|
||||
|
||||
static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
|
||||
LTQasSQ() { super(); }
|
||||
public void put(T x) {
|
||||
try { super.transfer(x); }
|
||||
catch (InterruptedException ex) { throw new Error(); }
|
||||
}
|
||||
private final static long serialVersionUID = 42;
|
||||
}
|
||||
|
||||
static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
|
||||
HalfSyncLTQ() { super(); }
|
||||
public void put(T x) {
|
||||
if (ThreadLocalRandom.current().nextBoolean())
|
||||
super.put(x);
|
||||
else {
|
||||
try { super.transfer(x); }
|
||||
catch (InterruptedException ex) { throw new Error(); }
|
||||
}
|
||||
}
|
||||
private final static long serialVersionUID = 42;
|
||||
}
|
||||
|
||||
static void oneTest(int consumers, int iters) throws Exception {
|
||||
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
|
||||
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
|
||||
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), consumers, iters);
|
||||
// oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
|
||||
oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
|
||||
oneRun(new LTQasSQ<Integer>(), consumers, iters);
|
||||
oneRun(new HalfSyncLTQ<Integer>(), consumers, iters);
|
||||
oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters);
|
||||
oneRun(new SynchronousQueue<Integer>(), consumers, iters);
|
||||
if (print)
|
||||
|
@ -60,7 +60,7 @@ public class ConcurrentQueueLoops {
|
||||
//queues.add(new ArrayBlockingQueue<Integer>(count, true));
|
||||
queues.add(new LinkedBlockingQueue<Integer>());
|
||||
queues.add(new LinkedBlockingDeque<Integer>());
|
||||
// queues.add(new LinkedTransferQueue<Integer>());
|
||||
queues.add(new LinkedTransferQueue<Integer>());
|
||||
|
||||
// Following additional implementations are available from:
|
||||
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
|
||||
|
@ -43,7 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
// import java.util.concurrent.LinkedTransferQueue;
|
||||
import java.util.concurrent.LinkedTransferQueue;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.LinkedList;
|
||||
import java.util.PriorityQueue;
|
||||
@ -70,7 +70,7 @@ public class GCRetention {
|
||||
queues.add(new PriorityBlockingQueue<Boolean>());
|
||||
queues.add(new PriorityQueue<Boolean>());
|
||||
queues.add(new LinkedList<Boolean>());
|
||||
// queues.add(new LinkedTransferQueue<Boolean>());
|
||||
queues.add(new LinkedTransferQueue<Boolean>());
|
||||
|
||||
// Following additional implementations are available from:
|
||||
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
|
||||
|
@ -49,7 +49,7 @@ public class IteratorWeakConsistency {
|
||||
test(new LinkedBlockingDeque());
|
||||
test(new LinkedBlockingDeque(20));
|
||||
test(new ConcurrentLinkedQueue());
|
||||
// test(new LinkedTransferQueue());
|
||||
test(new LinkedTransferQueue());
|
||||
// Other concurrent queues (e.g. ArrayBlockingQueue) do not
|
||||
// currently have weakly consistent iterators.
|
||||
// test(new ArrayBlockingQueue(20));
|
||||
|
@ -56,12 +56,11 @@ public class OfferRemoveLoops {
|
||||
testQueue(new ArrayBlockingQueue(10));
|
||||
testQueue(new PriorityBlockingQueue(10));
|
||||
testQueue(new ConcurrentLinkedQueue());
|
||||
// testQueue(new LinkedTransferQueue());
|
||||
testQueue(new LinkedTransferQueue());
|
||||
}
|
||||
|
||||
Random getRandom() {
|
||||
return new Random();
|
||||
// return ThreadLocalRandom.current();
|
||||
return ThreadLocalRandom.current();
|
||||
}
|
||||
|
||||
void testQueue(final Queue q) throws Throwable {
|
||||
|
@ -45,7 +45,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
// import java.util.concurrent.LinkedTransferQueue;
|
||||
import java.util.concurrent.LinkedTransferQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
@ -67,7 +67,7 @@ public class RemovePollRace {
|
||||
queues.add(new ArrayBlockingQueue<Boolean>(count, true));
|
||||
queues.add(new LinkedBlockingQueue<Boolean>());
|
||||
queues.add(new LinkedBlockingDeque<Boolean>());
|
||||
// queues.add(new LinkedTransferQueue<Boolean>());
|
||||
queues.add(new LinkedTransferQueue<Boolean>());
|
||||
|
||||
// Following additional implementations are available from:
|
||||
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
|
||||
|
94
jdk/test/java/util/concurrent/Phaser/Arrive.java
Normal file
94
jdk/test/java/util/concurrent/Phaser/Arrive.java
Normal file
@ -0,0 +1,94 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 6445158
|
||||
* @summary tests for Phaser.arrive()
|
||||
*/
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Phaser;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class Arrive {
|
||||
void test(String[] args) throws Throwable {
|
||||
final int n = ThreadLocalRandom.current().nextInt(1, 10);
|
||||
final int nthreads = n*3/2;
|
||||
final Phaser startingGate = new Phaser(nthreads);
|
||||
final Phaser phaser = new Phaser(n);
|
||||
final List<Thread> threads = new ArrayList<Thread>();
|
||||
final AtomicInteger count0 = new AtomicInteger(0);
|
||||
final AtomicInteger count1 = new AtomicInteger(0);
|
||||
final Runnable task = new Runnable() { public void run() {
|
||||
equal(startingGate.getPhase(), 0);
|
||||
startingGate.arriveAndAwaitAdvance();
|
||||
equal(startingGate.getPhase(), 1);
|
||||
int phase = phaser.arrive();
|
||||
if (phase == 0)
|
||||
count0.getAndIncrement();
|
||||
else if (phase == 1)
|
||||
count1.getAndIncrement();
|
||||
else
|
||||
fail();
|
||||
}};
|
||||
for (int i = 0; i < nthreads; i++)
|
||||
threads.add(new Thread(task));
|
||||
for (Thread thread : threads)
|
||||
thread.start();
|
||||
for (Thread thread : threads)
|
||||
thread.join();
|
||||
equal(count0.get(), n);
|
||||
equal(count1.get(), nthreads-n);
|
||||
equal(phaser.getPhase(), 1);
|
||||
}
|
||||
|
||||
//--------------------- Infrastructure ---------------------------
|
||||
volatile int passed = 0, failed = 0;
|
||||
void pass() {passed++;}
|
||||
void fail() {failed++; Thread.dumpStack();}
|
||||
void fail(String msg) {System.err.println(msg); fail();}
|
||||
void unexpected(Throwable t) {failed++; t.printStackTrace();}
|
||||
void check(boolean cond) {if (cond) pass(); else fail();}
|
||||
void equal(Object x, Object y) {
|
||||
if (x == null ? y == null : x.equals(y)) pass();
|
||||
else fail(x + " not equal to " + y);}
|
||||
public static void main(String[] args) throws Throwable {
|
||||
new Arrive().instanceMain(args);}
|
||||
public void instanceMain(String[] args) throws Throwable {
|
||||
try {test(args);} catch (Throwable t) {unexpected(t);}
|
||||
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
|
||||
if (failed > 0) throw new AssertionError("Some tests failed");}
|
||||
}
|
407
jdk/test/java/util/concurrent/Phaser/Basic.java
Normal file
407
jdk/test/java/util/concurrent/Phaser/Basic.java
Normal file
@ -0,0 +1,407 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 6445158
|
||||
* @summary Basic tests for Phaser
|
||||
* @author Chris Hegarty
|
||||
*/
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.Phaser;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import static java.util.concurrent.TimeUnit.*;
|
||||
|
||||
public class Basic {
|
||||
|
||||
private static void checkTerminated(final Phaser phaser) {
|
||||
check(phaser.isTerminated());
|
||||
int unarriverParties = phaser.getUnarrivedParties();
|
||||
int registeredParties = phaser.getRegisteredParties();
|
||||
equal(phaser.arrive(), -1);
|
||||
equal(phaser.arriveAndDeregister(), -1);
|
||||
equal(phaser.arriveAndAwaitAdvance(), -1);
|
||||
equal(phaser.bulkRegister(10), -1);
|
||||
equal(phaser.getPhase(), -1);
|
||||
equal(phaser.register(), -1);
|
||||
try {
|
||||
equal(phaser.awaitAdvanceInterruptibly(0), -1);
|
||||
equal(phaser.awaitAdvanceInterruptibly(0, 10, SECONDS), -1);
|
||||
} catch (Exception ie) {
|
||||
unexpected(ie);
|
||||
}
|
||||
equal(phaser.getUnarrivedParties(), unarriverParties);
|
||||
equal(phaser.getRegisteredParties(), registeredParties);
|
||||
}
|
||||
|
||||
private static void checkResult(Arriver a, Class<? extends Throwable> c) {
|
||||
Throwable t = a.result();
|
||||
if (! ((t == null && c == null) || (c != null && c.isInstance(t)))) {
|
||||
// t.printStackTrace();
|
||||
fail("Mismatch in thread " +
|
||||
a.getName() + ": " +
|
||||
t + ", " +
|
||||
(c == null ? "<null>" : c.getName()));
|
||||
} else {
|
||||
pass();
|
||||
}
|
||||
}
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// Mechanism to get all test threads into "running" mode.
|
||||
//----------------------------------------------------------------
|
||||
private static Phaser atTheStartingGate = new Phaser(3);
|
||||
|
||||
private static void toTheStartingGate() {
|
||||
try {
|
||||
boolean expectNextPhase = false;
|
||||
if (atTheStartingGate.getUnarrivedParties() == 1) {
|
||||
expectNextPhase = true;
|
||||
}
|
||||
int phase = atTheStartingGate.getPhase();
|
||||
equal(phase, atTheStartingGate.arrive());
|
||||
int AwaitPhase = atTheStartingGate.awaitAdvanceInterruptibly(phase,
|
||||
10,
|
||||
SECONDS);
|
||||
if (expectNextPhase) check(AwaitPhase == (phase + 1));
|
||||
|
||||
pass();
|
||||
} catch (Throwable t) {
|
||||
unexpected(t);
|
||||
// reset(atTheStartingGate);
|
||||
throw new Error(t);
|
||||
}
|
||||
}
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// Convenience methods for creating threads that call arrive,
|
||||
// awaitAdvance, arriveAndAwaitAdvance, awaitAdvanceInterruptibly
|
||||
//----------------------------------------------------------------
|
||||
private static abstract class Arriver extends Thread {
|
||||
static AtomicInteger count = new AtomicInteger(1);
|
||||
|
||||
Arriver() {
|
||||
this("Arriver");
|
||||
}
|
||||
|
||||
Arriver(String name) {
|
||||
this.setName(name + ":" + count.getAndIncrement());
|
||||
this.setDaemon(true);
|
||||
}
|
||||
|
||||
private volatile Throwable result;
|
||||
private volatile int phase;
|
||||
protected void result(Throwable result) { this.result = result; }
|
||||
public Throwable result() { return this.result; }
|
||||
protected void phase(int phase) { this.phase = phase; }
|
||||
public int phase() { return this.phase; }
|
||||
}
|
||||
|
||||
private static abstract class Awaiter extends Arriver {
|
||||
Awaiter() { super("Awaiter"); }
|
||||
Awaiter(String name) { super(name); }
|
||||
}
|
||||
|
||||
private static Arriver arriver(final Phaser phaser) {
|
||||
return new Arriver() { public void run() {
|
||||
toTheStartingGate();
|
||||
|
||||
try { phase(phaser.arrive()); }
|
||||
catch (Throwable result) { result(result); }}};
|
||||
}
|
||||
|
||||
private static AtomicInteger cycleArriveAwaitAdvance = new AtomicInteger(1);
|
||||
|
||||
private static Awaiter awaiter(final Phaser phaser) {
|
||||
return new Awaiter() { public void run() {
|
||||
toTheStartingGate();
|
||||
|
||||
try {
|
||||
if (cycleArriveAwaitAdvance.getAndIncrement() % 2 == 0)
|
||||
phase(phaser.awaitAdvance(phaser.arrive()));
|
||||
else
|
||||
phase(phaser.arriveAndAwaitAdvance());
|
||||
} catch (Throwable result) { result(result); }}};
|
||||
}
|
||||
|
||||
private static Awaiter awaiter(final Phaser phaser,
|
||||
final long timeout,
|
||||
final TimeUnit unit) {
|
||||
return new Awaiter("InterruptibleWaiter") { public void run() {
|
||||
toTheStartingGate();
|
||||
|
||||
try {
|
||||
if (timeout < 0)
|
||||
phase(phaser.awaitAdvanceInterruptibly(phaser.arrive()));
|
||||
else
|
||||
phase(phaser.awaitAdvanceInterruptibly(phaser.arrive(),
|
||||
timeout,
|
||||
unit));
|
||||
} catch (Throwable result) { result(result); }}};
|
||||
}
|
||||
|
||||
// Returns an infinite lazy list of all possible arriver/awaiter combinations.
|
||||
private static Iterator<Arriver> arriverIterator(final Phaser phaser) {
|
||||
return new Iterator<Arriver>() {
|
||||
int i = 0;
|
||||
public boolean hasNext() { return true; }
|
||||
public Arriver next() {
|
||||
switch ((i++)&7) {
|
||||
case 0: case 4:
|
||||
return arriver(phaser);
|
||||
case 1: case 5:
|
||||
return awaiter(phaser);
|
||||
case 2: case 6: case 7:
|
||||
return awaiter(phaser, -1, SECONDS);
|
||||
default:
|
||||
return awaiter(phaser, 10, SECONDS); }}
|
||||
public void remove() {throw new UnsupportedOperationException();}};
|
||||
}
|
||||
|
||||
// Returns an infinite lazy list of all possible awaiter only combinations.
|
||||
private static Iterator<Awaiter> awaiterIterator(final Phaser phaser) {
|
||||
return new Iterator<Awaiter>() {
|
||||
int i = 0;
|
||||
public boolean hasNext() { return true; }
|
||||
public Awaiter next() {
|
||||
switch ((i++)&7) {
|
||||
case 1: case 4: case 7:
|
||||
return awaiter(phaser);
|
||||
case 2: case 5:
|
||||
return awaiter(phaser, -1, SECONDS);
|
||||
default:
|
||||
return awaiter(phaser, 10, SECONDS); }}
|
||||
public void remove() {throw new UnsupportedOperationException();}};
|
||||
}
|
||||
|
||||
private static void realMain(String[] args) throws Throwable {
|
||||
|
||||
Thread.currentThread().setName("mainThread");
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// Normal use
|
||||
//----------------------------------------------------------------
|
||||
try {
|
||||
Phaser phaser = new Phaser(3);
|
||||
equal(phaser.getRegisteredParties(), 3);
|
||||
equal(phaser.getArrivedParties(), 0);
|
||||
equal(phaser.getPhase(), 0);
|
||||
check(phaser.getRoot().equals(phaser));
|
||||
equal(phaser.getParent(), null);
|
||||
check(!phaser.isTerminated());
|
||||
|
||||
Iterator<Arriver> arrivers = arriverIterator(phaser);
|
||||
int phase = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
equal(phaser.getPhase(), phase++);
|
||||
Arriver a1 = arrivers.next(); a1.start();
|
||||
Arriver a2 = arrivers.next(); a2.start();
|
||||
toTheStartingGate();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
a1.join();
|
||||
a2.join();
|
||||
checkResult(a1, null);
|
||||
checkResult(a2, null);
|
||||
check(!phaser.isTerminated());
|
||||
equal(phaser.getRegisteredParties(), 3);
|
||||
equal(phaser.getArrivedParties(), 0);
|
||||
}
|
||||
} catch (Throwable t) { unexpected(t); }
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// One thread interrupted
|
||||
//----------------------------------------------------------------
|
||||
try {
|
||||
Phaser phaser = new Phaser(3);
|
||||
Iterator<Arriver> arrivers = arriverIterator(phaser);
|
||||
int phase = phaser.getPhase();
|
||||
for (int i = 0; i < 4; i++) {
|
||||
check(phaser.getPhase() == phase);
|
||||
Awaiter a1 = awaiter(phaser, 10, SECONDS); a1.start();
|
||||
Arriver a2 = arrivers.next(); a2.start();
|
||||
toTheStartingGate();
|
||||
a1.interrupt();
|
||||
a1.join();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
a2.join();
|
||||
checkResult(a1, InterruptedException.class);
|
||||
checkResult(a2, null);
|
||||
check(!phaser.isTerminated());
|
||||
equal(phaser.getRegisteredParties(), 3);
|
||||
equal(phaser.getArrivedParties(), 0);
|
||||
phase++;
|
||||
}
|
||||
} catch (Throwable t) { unexpected(t); }
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// Phaser is terminated while threads are waiting
|
||||
//----------------------------------------------------------------
|
||||
try {
|
||||
Phaser phaser = new Phaser(3);
|
||||
Iterator<Awaiter> awaiters = awaiterIterator(phaser);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Arriver a1 = awaiters.next(); a1.start();
|
||||
Arriver a2 = awaiters.next(); a2.start();
|
||||
toTheStartingGate();
|
||||
while (phaser.getArrivedParties() < 2) Thread.yield();
|
||||
phaser.forceTermination();
|
||||
a1.join();
|
||||
a2.join();
|
||||
check(a1.phase == -1);
|
||||
check(a2.phase == -1);
|
||||
int arrivedParties = phaser.getArrivedParties();
|
||||
checkTerminated(phaser);
|
||||
equal(phaser.getArrivedParties(), arrivedParties);
|
||||
}
|
||||
} catch (Throwable t) { unexpected(t); }
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// Adds new unarrived parties to this phaser
|
||||
//----------------------------------------------------------------
|
||||
try {
|
||||
Phaser phaser = new Phaser(1);
|
||||
Iterator<Arriver> arrivers = arriverIterator(phaser);
|
||||
LinkedList<Arriver> arriverList = new LinkedList<Arriver>();
|
||||
int phase = phaser.getPhase();
|
||||
for (int i = 1; i < 5; i++) {
|
||||
atTheStartingGate = new Phaser(1+(3*i));
|
||||
check(phaser.getPhase() == phase);
|
||||
// register 3 more
|
||||
phaser.register(); phaser.register(); phaser.register();
|
||||
for (int z=0; z<(3*i); z++) {
|
||||
arriverList.add(arrivers.next());
|
||||
}
|
||||
for (Arriver arriver : arriverList)
|
||||
arriver.start();
|
||||
|
||||
toTheStartingGate();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
|
||||
for (Arriver arriver : arriverList) {
|
||||
arriver.join();
|
||||
checkResult(arriver, null);
|
||||
}
|
||||
equal(phaser.getRegisteredParties(), 1 + (3*i));
|
||||
equal(phaser.getArrivedParties(), 0);
|
||||
arriverList.clear();
|
||||
phase++;
|
||||
}
|
||||
atTheStartingGate = new Phaser(3);
|
||||
} catch (Throwable t) { unexpected(t); }
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// One thread timed out
|
||||
//----------------------------------------------------------------
|
||||
try {
|
||||
Phaser phaser = new Phaser(3);
|
||||
Iterator<Arriver> arrivers = arriverIterator(phaser);
|
||||
for (long timeout : new long[] { 0L, 5L }) {
|
||||
for (int i = 0; i < 2; i++) {
|
||||
Awaiter a1 = awaiter(phaser, timeout, SECONDS); a1.start();
|
||||
Arriver a2 = arrivers.next(); a2.start();
|
||||
toTheStartingGate();
|
||||
a1.join();
|
||||
checkResult(a1, TimeoutException.class);
|
||||
phaser.arrive();
|
||||
a2.join();
|
||||
checkResult(a2, null);
|
||||
check(!phaser.isTerminated());
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) { unexpected(t); }
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// Barrier action completed normally
|
||||
//----------------------------------------------------------------
|
||||
try {
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
final Phaser[] kludge = new Phaser[1];
|
||||
Phaser phaser = new Phaser(3) {
|
||||
@Override
|
||||
protected boolean onAdvance(int phase, int registeredParties) {
|
||||
int countPhase = count.getAndIncrement();
|
||||
equal(countPhase, phase);
|
||||
equal(kludge[0].getPhase(), phase);
|
||||
equal(kludge[0].getRegisteredParties(), registeredParties);
|
||||
if (phase >= 3)
|
||||
return true; // terminate
|
||||
|
||||
return false;
|
||||
}
|
||||
};
|
||||
kludge[0] = phaser;
|
||||
equal(phaser.getRegisteredParties(), 3);
|
||||
Iterator<Awaiter> awaiters = awaiterIterator(phaser);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Awaiter a1 = awaiters.next(); a1.start();
|
||||
Awaiter a2 = awaiters.next(); a2.start();
|
||||
toTheStartingGate();
|
||||
while (phaser.getArrivedParties() < 2) Thread.yield();
|
||||
phaser.arrive();
|
||||
a1.join();
|
||||
a2.join();
|
||||
checkResult(a1, null);
|
||||
checkResult(a2, null);
|
||||
equal(count.get(), i+1);
|
||||
if (i < 3) {
|
||||
check(!phaser.isTerminated());
|
||||
equal(phaser.getRegisteredParties(), 3);
|
||||
equal(phaser.getArrivedParties(), 0);
|
||||
equal(phaser.getUnarrivedParties(), 3);
|
||||
equal(phaser.getPhase(), count.get());
|
||||
} else
|
||||
checkTerminated(phaser);
|
||||
}
|
||||
} catch (Throwable t) { unexpected(t); }
|
||||
|
||||
}
|
||||
|
||||
//--------------------- Infrastructure ---------------------------
|
||||
static volatile int passed = 0, failed = 0;
|
||||
static void pass() {passed++;}
|
||||
static void fail() {failed++; Thread.dumpStack();}
|
||||
static void fail(String msg) {System.out.println(msg); fail();}
|
||||
static void unexpected(Throwable t) {failed++; t.printStackTrace();}
|
||||
static void check(boolean cond) {if (cond) pass(); else fail();}
|
||||
static void equal(Object x, Object y) {
|
||||
if (x == null ? y == null : x.equals(y)) pass();
|
||||
else fail(x + " not equal to " + y);}
|
||||
public static void main(String[] args) throws Throwable {
|
||||
try {realMain(args);} catch (Throwable t) {unexpected(t);}
|
||||
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
|
||||
if (failed > 0) throw new AssertionError("Some tests failed");}
|
||||
}
|
@ -20,6 +20,17 @@
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 6725789
|
||||
|
265
jdk/test/java/util/concurrent/forkjoin/Integrate.java
Normal file
265
jdk/test/java/util/concurrent/forkjoin/Integrate.java
Normal file
@ -0,0 +1,265 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 6865571
|
||||
* @summary Numerical Integration using fork/join
|
||||
* @run main Integrate reps=1 forkPolicy=dynamic
|
||||
* @run main Integrate reps=1 forkPolicy=serial
|
||||
* @run main Integrate reps=1 forkPolicy=fork
|
||||
*/
|
||||
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.RecursiveAction;
|
||||
|
||||
/**
|
||||
* Sample program using Gaussian Quadrature for numerical integration.
|
||||
* This version uses a simplified hardwired function. Inspired by a
|
||||
* <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html">
|
||||
* Filaments</A> demo program.
|
||||
*/
|
||||
public final class Integrate {
|
||||
|
||||
static final double errorTolerance = 1.0e-11;
|
||||
/** for time conversion */
|
||||
static final long NPS = (1000L * 1000 * 1000);
|
||||
|
||||
static final int SERIAL = -1;
|
||||
static final int DYNAMIC = 0;
|
||||
static final int FORK = 1;
|
||||
|
||||
// the function to integrate
|
||||
static double computeFunction(double x) {
|
||||
return (x * x + 1.0) * x;
|
||||
}
|
||||
|
||||
static final double start = 0.0;
|
||||
static final double end = 1536.0;
|
||||
/*
|
||||
* The number of recursive calls for
|
||||
* integrate from start to end.
|
||||
* (Empirically determined)
|
||||
*/
|
||||
static final int calls = 263479047;
|
||||
|
||||
static String keywordValue(String[] args, String keyword) {
|
||||
for (String arg : args)
|
||||
if (arg.startsWith(keyword))
|
||||
return arg.substring(keyword.length() + 1);
|
||||
return null;
|
||||
}
|
||||
|
||||
static int intArg(String[] args, String keyword, int defaultValue) {
|
||||
String val = keywordValue(args, keyword);
|
||||
return (val == null) ? defaultValue : Integer.parseInt(val);
|
||||
}
|
||||
|
||||
static int policyArg(String[] args, String keyword, int defaultPolicy) {
|
||||
String val = keywordValue(args, keyword);
|
||||
if (val == null) return defaultPolicy;
|
||||
if (val.equals("dynamic")) return DYNAMIC;
|
||||
if (val.equals("serial")) return SERIAL;
|
||||
if (val.equals("fork")) return FORK;
|
||||
throw new Error();
|
||||
}
|
||||
|
||||
/**
|
||||
* Usage: Integrate [procs=N] [reps=N] forkPolicy=serial|dynamic|fork
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
final int procs = intArg(args, "procs",
|
||||
Runtime.getRuntime().availableProcessors());
|
||||
final int forkPolicy = policyArg(args, "forkPolicy", DYNAMIC);
|
||||
|
||||
ForkJoinPool g = new ForkJoinPool(procs);
|
||||
System.out.println("Integrating from " + start + " to " + end +
|
||||
" forkPolicy = " + forkPolicy);
|
||||
long lastTime = System.nanoTime();
|
||||
|
||||
for (int reps = intArg(args, "reps", 10); reps > 0; reps--) {
|
||||
double a;
|
||||
if (forkPolicy == SERIAL)
|
||||
a = SQuad.computeArea(g, start, end);
|
||||
else if (forkPolicy == FORK)
|
||||
a = FQuad.computeArea(g, start, end);
|
||||
else
|
||||
a = DQuad.computeArea(g, start, end);
|
||||
long now = System.nanoTime();
|
||||
double s = (double) (now - lastTime) / NPS;
|
||||
lastTime = now;
|
||||
System.out.printf("Calls/sec: %12d", (long) (calls / s));
|
||||
System.out.printf(" Time: %7.3f", s);
|
||||
System.out.printf(" Area: %12.1f", a);
|
||||
System.out.println();
|
||||
}
|
||||
System.out.println(g);
|
||||
g.shutdown();
|
||||
}
|
||||
|
||||
|
||||
// Sequential version
|
||||
static final class SQuad extends RecursiveAction {
|
||||
static double computeArea(ForkJoinPool pool, double l, double r) {
|
||||
SQuad q = new SQuad(l, r, 0);
|
||||
pool.invoke(q);
|
||||
return q.area;
|
||||
}
|
||||
|
||||
final double left; // lower bound
|
||||
final double right; // upper bound
|
||||
double area;
|
||||
|
||||
SQuad(double l, double r, double a) {
|
||||
this.left = l; this.right = r; this.area = a;
|
||||
}
|
||||
|
||||
public final void compute() {
|
||||
double l = left;
|
||||
double r = right;
|
||||
area = recEval(l, r, (l * l + 1.0) * l, (r * r + 1.0) * r, area);
|
||||
}
|
||||
|
||||
static final double recEval(double l, double r, double fl,
|
||||
double fr, double a) {
|
||||
double h = (r - l) * 0.5;
|
||||
double c = l + h;
|
||||
double fc = (c * c + 1.0) * c;
|
||||
double hh = h * 0.5;
|
||||
double al = (fl + fc) * hh;
|
||||
double ar = (fr + fc) * hh;
|
||||
double alr = al + ar;
|
||||
if (Math.abs(alr - a) <= errorTolerance)
|
||||
return alr;
|
||||
else
|
||||
return recEval(c, r, fc, fr, ar) + recEval(l, c, fl, fc, al);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//....................................
|
||||
|
||||
// ForkJoin version
|
||||
static final class FQuad extends RecursiveAction {
|
||||
static double computeArea(ForkJoinPool pool, double l, double r) {
|
||||
FQuad q = new FQuad(l, r, 0);
|
||||
pool.invoke(q);
|
||||
return q.area;
|
||||
}
|
||||
|
||||
final double left; // lower bound
|
||||
final double right; // upper bound
|
||||
double area;
|
||||
|
||||
FQuad(double l, double r, double a) {
|
||||
this.left = l; this.right = r; this.area = a;
|
||||
}
|
||||
|
||||
public final void compute() {
|
||||
double l = left;
|
||||
double r = right;
|
||||
area = recEval(l, r, (l * l + 1.0) * l, (r * r + 1.0) * r, area);
|
||||
}
|
||||
|
||||
static final double recEval(double l, double r, double fl,
|
||||
double fr, double a) {
|
||||
double h = (r - l) * 0.5;
|
||||
double c = l + h;
|
||||
double fc = (c * c + 1.0) * c;
|
||||
double hh = h * 0.5;
|
||||
double al = (fl + fc) * hh;
|
||||
double ar = (fr + fc) * hh;
|
||||
double alr = al + ar;
|
||||
if (Math.abs(alr - a) <= errorTolerance)
|
||||
return alr;
|
||||
FQuad q = new FQuad(l, c, al);
|
||||
q.fork();
|
||||
ar = recEval(c, r, fc, fr, ar);
|
||||
if (!q.tryUnfork()) {
|
||||
q.quietlyHelpJoin();
|
||||
return ar + q.area;
|
||||
}
|
||||
return ar + recEval(l, c, fl, fc, al);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ...........................
|
||||
|
||||
// Version using on-demand Fork
|
||||
static final class DQuad extends RecursiveAction {
|
||||
static double computeArea(ForkJoinPool pool, double l, double r) {
|
||||
DQuad q = new DQuad(l, r, 0);
|
||||
pool.invoke(q);
|
||||
return q.area;
|
||||
}
|
||||
|
||||
final double left; // lower bound
|
||||
final double right; // upper bound
|
||||
double area;
|
||||
|
||||
DQuad(double l, double r, double a) {
|
||||
this.left = l; this.right = r; this.area = a;
|
||||
}
|
||||
|
||||
public final void compute() {
|
||||
double l = left;
|
||||
double r = right;
|
||||
area = recEval(l, r, (l * l + 1.0) * l, (r * r + 1.0) * r, area);
|
||||
}
|
||||
|
||||
static final double recEval(double l, double r, double fl,
|
||||
double fr, double a) {
|
||||
double h = (r - l) * 0.5;
|
||||
double c = l + h;
|
||||
double fc = (c * c + 1.0) * c;
|
||||
double hh = h * 0.5;
|
||||
double al = (fl + fc) * hh;
|
||||
double ar = (fr + fc) * hh;
|
||||
double alr = al + ar;
|
||||
if (Math.abs(alr - a) <= errorTolerance)
|
||||
return alr;
|
||||
DQuad q = null;
|
||||
if (getSurplusQueuedTaskCount() <= 3)
|
||||
(q = new DQuad(l, c, al)).fork();
|
||||
ar = recEval(c, r, fc, fr, ar);
|
||||
if (q != null && !q.tryUnfork()) {
|
||||
q.quietlyHelpJoin();
|
||||
return ar + q.area;
|
||||
}
|
||||
return ar + recEval(l, c, fl, fc, al);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
174
jdk/test/java/util/concurrent/forkjoin/NQueensCS.java
Normal file
174
jdk/test/java/util/concurrent/forkjoin/NQueensCS.java
Normal file
@ -0,0 +1,174 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 6865571
|
||||
* @summary Solve NQueens using fork/join
|
||||
* @run main NQueensCS maxBoardSize=11 reps=1
|
||||
* @run main NQueensCS maxBoardSize=11 reps=1 procs=8
|
||||
*/
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.RecursiveAction;
|
||||
|
||||
public class NQueensCS extends RecursiveAction {
|
||||
|
||||
static long lastStealCount;
|
||||
static int boardSize;
|
||||
|
||||
static final int[] expectedSolutions = new int[] {
|
||||
0, 1, 0, 0, 2, 10, 4, 40, 92, 352, 724, 2680, 14200,
|
||||
73712, 365596, 2279184, 14772512, 95815104, 666090624
|
||||
}; // see http://www.durangobill.com/N_Queens.html
|
||||
|
||||
static String keywordValue(String[] args, String keyword) {
|
||||
for (String arg : args)
|
||||
if (arg.startsWith(keyword))
|
||||
return arg.substring(keyword.length() + 1);
|
||||
return null;
|
||||
}
|
||||
|
||||
static int intArg(String[] args, String keyword, int defaultValue) {
|
||||
String val = keywordValue(args, keyword);
|
||||
return (val == null) ? defaultValue : Integer.parseInt(val);
|
||||
}
|
||||
|
||||
/** for time conversion */
|
||||
static final long NPS = (1000L * 1000L * 1000L);
|
||||
|
||||
/**
|
||||
* Usage: NQueensCS [minBoardSize=N] [maxBoardSize=N] [procs=N] [reps=N]
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Board sizes too small: hard to measure well.
|
||||
// Board sizes too large: take too long to run.
|
||||
final int minBoardSize = intArg(args, "minBoardSize", 8);
|
||||
final int maxBoardSize = intArg(args, "maxBoardSize", 15);
|
||||
|
||||
final int procs = intArg(args, "procs", 0);
|
||||
|
||||
for (int reps = intArg(args, "reps", 10); reps > 0; reps--) {
|
||||
ForkJoinPool g = (procs == 0) ?
|
||||
new ForkJoinPool() :
|
||||
new ForkJoinPool(procs);
|
||||
lastStealCount = g.getStealCount();
|
||||
for (int i = minBoardSize; i <= maxBoardSize; i++)
|
||||
test(g, i);
|
||||
System.out.println(g);
|
||||
g.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
static void test(ForkJoinPool g, int i) throws Exception {
|
||||
boardSize = i;
|
||||
int ps = g.getParallelism();
|
||||
long start = System.nanoTime();
|
||||
NQueensCS task = new NQueensCS(new int[0]);
|
||||
g.invoke(task);
|
||||
int solutions = task.solutions;
|
||||
long time = System.nanoTime() - start;
|
||||
double secs = (double) time / NPS;
|
||||
if (solutions != expectedSolutions[i])
|
||||
throw new Error();
|
||||
System.out.printf("NQueensCS %3d", i);
|
||||
System.out.printf(" Time: %7.3f", secs);
|
||||
long sc = g.getStealCount();
|
||||
long ns = sc - lastStealCount;
|
||||
lastStealCount = sc;
|
||||
System.out.printf(" Steals/t: %5d", ns/ps);
|
||||
System.out.println();
|
||||
}
|
||||
|
||||
// Boards are represented as arrays where each cell
|
||||
// holds the column number of the queen in that row
|
||||
|
||||
final int[] sofar;
|
||||
NQueensCS nextSubtask; // to link subtasks
|
||||
int solutions;
|
||||
NQueensCS(int[] a) {
|
||||
this.sofar = a;
|
||||
}
|
||||
|
||||
public final void compute() {
|
||||
NQueensCS subtasks;
|
||||
int bs = boardSize;
|
||||
if (sofar.length >= bs)
|
||||
solutions = 1;
|
||||
else if ((subtasks = explore(sofar, bs)) != null)
|
||||
solutions = processSubtasks(subtasks);
|
||||
}
|
||||
|
||||
private static NQueensCS explore(int[] array, int bs) {
|
||||
int row = array.length;
|
||||
NQueensCS s = null; // subtask list
|
||||
outer:
|
||||
for (int q = 0; q < bs; ++q) {
|
||||
for (int i = 0; i < row; i++) {
|
||||
int p = array[i];
|
||||
if (q == p || q == p - (row - i) || q == p + (row - i))
|
||||
continue outer; // attacked
|
||||
}
|
||||
NQueensCS first = s; // lag forks to ensure 1 kept
|
||||
if (first != null)
|
||||
first.fork();
|
||||
int[] next = Arrays.copyOf(array, row+1);
|
||||
next[row] = q;
|
||||
NQueensCS subtask = new NQueensCS(next);
|
||||
subtask.nextSubtask = first;
|
||||
s = subtask;
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
private static int processSubtasks(NQueensCS s) {
|
||||
// Always run first the task held instead of forked
|
||||
s.compute();
|
||||
int ns = s.solutions;
|
||||
s = s.nextSubtask;
|
||||
// Then the unstolen ones
|
||||
while (s != null && s.tryUnfork()) {
|
||||
s.compute();
|
||||
ns += s.solutions;
|
||||
s = s.nextSubtask;
|
||||
}
|
||||
// Then wait for the stolen ones
|
||||
while (s != null) {
|
||||
s.join();
|
||||
ns += s.solutions;
|
||||
s = s.nextSubtask;
|
||||
}
|
||||
return ns;
|
||||
}
|
||||
}
|
@ -115,7 +115,7 @@ public final class CancelledLockLoops {
|
||||
finally {
|
||||
lock.unlock();
|
||||
}
|
||||
if (completed != 2)
|
||||
if (c != 2)
|
||||
throw new Error("Completed != 2");
|
||||
int r = result;
|
||||
if (r == 0) // avoid overoptimization
|
||||
|
@ -30,6 +30,7 @@
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/licenses/publicdomain
|
||||
*/
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.locks.*;
|
||||
|
Loading…
x
Reference in New Issue
Block a user