7023006: Reduce unnecessary thread activity in ForkJoinPool

Reviewed-by: chegar, dholmes
This commit is contained in:
Doug Lea 2011-03-08 18:16:14 +00:00
parent f048eee643
commit 6a14cb99e0
3 changed files with 2015 additions and 1890 deletions

File diff suppressed because it is too large Load Diff

View File

@ -41,7 +41,8 @@ import java.util.Collections;
import java.util.List;
import java.util.RandomAccess;
import java.util.Map;
import java.util.WeakHashMap;
import java.lang.ref.WeakReference;
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@ -52,6 +53,8 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.reflect.Constructor;
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
@ -95,7 +98,11 @@ import java.util.concurrent.TimeoutException;
* rethrown to callers attempting to join them. These exceptions may
* additionally include {@link RejectedExecutionException} stemming
* from internal resource exhaustion, such as failure to allocate
* internal task queues.
* internal task queues. Rethrown exceptions behave in the same way as
* regular exceptions, but, when possible, contain stack traces (as
* displayed for example using {@code ex.printStackTrace()}) of both
* the thread that initiated the computation as well as the thread
* actually encountering the exception; minimally only the latter.
*
* <p>The primary method for awaiting completion and extracting
* results of a task is {@link #join}, but there are several variants:
@ -192,8 +199,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* status maintenance (2) execution and awaiting completion (3)
* user-level methods that additionally report results. This is
* sometimes hard to see because this file orders exported methods
* in a way that flows well in javadocs. In particular, most
* join mechanics are in method quietlyJoin, below.
* in a way that flows well in javadocs.
*/
/*
@ -215,91 +221,67 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/** The run status of this task */
volatile int status; // accessed directly by pool and workers
private static final int NORMAL = -1;
private static final int CANCELLED = -2;
private static final int EXCEPTIONAL = -3;
private static final int SIGNAL = 1;
/**
* Table of exceptions thrown by tasks, to enable reporting by
* callers. Because exceptions are rare, we don't directly keep
* them with task objects, but instead use a weak ref table. Note
* that cancellation exceptions don't appear in the table, but are
* instead recorded as status values.
* TODO: Use ConcurrentReferenceHashMap
*/
static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
Collections.synchronizedMap
(new WeakHashMap<ForkJoinTask<?>, Throwable>());
// Maintaining completion status
/**
* Marks completion and wakes up threads waiting to join this task,
* also clearing signal request bits.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
*/
private void setCompletion(int completion) {
int s;
while ((s = status) >= 0) {
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (s != 0)
synchronized (this) { notifyAll(); }
break;
return completion;
}
}
}
/**
* Records exception and sets exceptional completion.
* Tries to block a worker thread until completed or timed out.
* Uses Object.wait time argument conventions.
* May fail on contention or interrupt.
*
* @return status on exit
* @param millis if > 0, wait time.
*/
private void setExceptionalCompletion(Throwable rex) {
exceptionMap.put(this, rex);
setCompletion(EXCEPTIONAL);
}
/**
* Blocks a worker thread until completed or timed out. Called
* only by pool.
*/
final void internalAwaitDone(long millis, int nanos) {
int s = status;
if ((s == 0 &&
UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) ||
s > 0) {
try { // the odd construction reduces lock bias effects
final void tryAwaitDone(long millis) {
int s;
try {
if (((s = status) > 0 ||
(s == 0 &&
UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
status > 0) {
synchronized (this) {
if (status > 0)
wait(millis, nanos);
else
notifyAll();
wait(millis);
}
} catch (InterruptedException ie) {
cancelIfTerminating();
}
} catch (InterruptedException ie) {
// caller must check termination
}
}
/**
* Blocks a non-worker-thread until completion.
* @return status upon completion
*/
private void externalAwaitDone() {
if (status >= 0) {
private int externalAwaitDone() {
int s;
if ((s = status) >= 0) {
boolean interrupted = false;
synchronized (this) {
for (;;) {
int s = status;
while ((s = status) >= 0) {
if (s == 0)
UNSAFE.compareAndSwapInt(this, statusOffset,
0, SIGNAL);
else if (s < 0) {
notifyAll();
break;
}
else {
try {
wait();
@ -312,53 +294,308 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
/**
* Blocks a non-worker-thread until completion or interruption or timeout.
*/
private void externalInterruptibleAwaitDone(boolean timed, long nanos)
private int externalInterruptibleAwaitDone(long millis)
throws InterruptedException {
int s;
if (Thread.interrupted())
throw new InterruptedException();
if (status >= 0) {
long startTime = timed ? System.nanoTime() : 0L;
if ((s = status) >= 0) {
synchronized (this) {
for (;;) {
long nt;
int s = status;
while ((s = status) >= 0) {
if (s == 0)
UNSAFE.compareAndSwapInt(this, statusOffset,
0, SIGNAL);
else if (s < 0) {
notifyAll();
else {
wait(millis);
if (millis > 0L)
break;
}
}
}
}
return s;
}
/**
* Primary execution method for stolen tasks. Unless done, calls
* exec and records status if completed, but doesn't wait for
* completion otherwise.
*/
final void doExec() {
if (status >= 0) {
boolean completed;
try {
completed = exec();
} catch (Throwable rex) {
setExceptionalCompletion(rex);
return;
}
if (completed)
setCompletion(NORMAL); // must be outside try block
}
}
/**
* Primary mechanics for join, get, quietlyJoin.
* @return status upon completion
*/
private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
if ((s = status) < 0)
return s;
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
return setCompletion(NORMAL);
}
return w.joinTask(this);
}
else
return externalAwaitDone();
}
/**
* Primary mechanics for invoke, quietlyInvoke.
* @return status upon completion
*/
private int doInvoke() {
int s; boolean completed;
if ((s = status) < 0)
return s;
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
return setCompletion(NORMAL);
else
return doJoin();
}
// Exception table support
/**
* Table of exceptions thrown by tasks, to enable reporting by
* callers. Because exceptions are rare, we don't directly keep
* them with task objects, but instead use a weak ref table. Note
* that cancellation exceptions don't appear in the table, but are
* instead recorded as status values.
*
* Note: These statics are initialized below in static block.
*/
private static final ExceptionNode[] exceptionTable;
private static final ReentrantLock exceptionTableLock;
private static final ReferenceQueue<Object> exceptionTableRefQueue;
/**
* Fixed capacity for exceptionTable.
*/
private static final int EXCEPTION_MAP_CAPACITY = 32;
/**
* Key-value nodes for exception table. The chained hash table
* uses identity comparisons, full locking, and weak references
* for keys. The table has a fixed capacity because it only
* maintains task exceptions long enough for joiners to access
* them, so should never become very large for sustained
* periods. However, since we do not know when the last joiner
* completes, we must use weak references and expunge them. We do
* so on each operation (hence full locking). Also, some thread in
* any ForkJoinPool will call helpExpungeStaleExceptions when its
* pool becomes isQuiescent.
*/
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>>{
final Throwable ex;
ExceptionNode next;
final long thrower; // use id not ref to avoid weak cycles
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
super(task, exceptionTableRefQueue);
this.ex = ex;
this.next = next;
this.thrower = Thread.currentThread().getId();
}
}
/**
* Records exception and sets exceptional completion.
*
* @return status on exit
*/
private int setExceptionalCompletion(Throwable ex) {
int h = System.identityHashCode(this);
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
int i = h & (t.length - 1);
for (ExceptionNode e = t[i]; ; e = e.next) {
if (e == null) {
t[i] = new ExceptionNode(this, ex, t[i]);
break;
}
if (e.get() == this) // already present
break;
}
} finally {
lock.unlock();
}
return setCompletion(EXCEPTIONAL);
}
/**
* Removes exception node and clears status
*/
private void clearExceptionalCompletion() {
int h = System.identityHashCode(this);
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
ExceptionNode[] t = exceptionTable;
int i = h & (t.length - 1);
ExceptionNode e = t[i];
ExceptionNode pred = null;
while (e != null) {
ExceptionNode next = e.next;
if (e.get() == this) {
if (pred == null)
t[i] = next;
else
pred.next = next;
break;
}
pred = e;
e = next;
}
expungeStaleExceptions();
status = 0;
} finally {
lock.unlock();
}
}
/**
* Returns a rethrowable exception for the given task, if
* available. To provide accurate stack traces, if the exception
* was not thrown by the current thread, we try to create a new
* exception of the same type as the one thrown, but with the
* recorded exception as its cause. If there is no such
* constructor, we instead try to use a no-arg constructor,
* followed by initCause, to the same effect. If none of these
* apply, or any fail due to other exceptions, we return the
* recorded exception, which is still correct, although it may
* contain a misleading stack trace.
*
* @return the exception, or null if none
*/
private Throwable getThrowableException() {
if (status != EXCEPTIONAL)
return null;
int h = System.identityHashCode(this);
ExceptionNode e;
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
e = t[h & (t.length - 1)];
while (e != null && e.get() != this)
e = e.next;
} finally {
lock.unlock();
}
Throwable ex;
if (e == null || (ex = e.ex) == null)
return null;
if (e.thrower != Thread.currentThread().getId()) {
Class ec = ex.getClass();
try {
Constructor<?> noArgCtor = null;
Constructor<?>[] cs = ec.getConstructors();// public ctors only
for (int i = 0; i < cs.length; ++i) {
Constructor<?> c = cs[i];
Class<?>[] ps = c.getParameterTypes();
if (ps.length == 0)
noArgCtor = c;
else if (ps.length == 1 && ps[0] == Throwable.class)
return (Throwable)(c.newInstance(ex));
}
if (noArgCtor != null) {
Throwable wx = (Throwable)(noArgCtor.newInstance());
wx.initCause(ex);
return wx;
}
} catch (Exception ignore) {
}
}
return ex;
}
/**
* Poll stale refs and remove them. Call only while holding lock.
*/
private static void expungeStaleExceptions() {
for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
if (x instanceof ExceptionNode) {
ForkJoinTask<?> key = ((ExceptionNode)x).get();
ExceptionNode[] t = exceptionTable;
int i = System.identityHashCode(key) & (t.length - 1);
ExceptionNode e = t[i];
ExceptionNode pred = null;
while (e != null) {
ExceptionNode next = e.next;
if (e == x) {
if (pred == null)
t[i] = next;
else
pred.next = next;
break;
}
else if (!timed)
wait();
else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
wait(nt / 1000000, (int)(nt % 1000000));
else
break;
pred = e;
e = next;
}
}
}
}
/**
* Unless done, calls exec and records status if completed, but
* doesn't wait for completion otherwise. Primary execution method
* for ForkJoinWorkerThread.
* If lock is available, poll stale refs and remove them.
* Called from ForkJoinPool when pools become quiescent.
*/
final void quietlyExec() {
try {
if (status < 0 || !exec())
return;
} catch (Throwable rex) {
setExceptionalCompletion(rex);
return;
static final void helpExpungeStaleExceptions() {
final ReentrantLock lock = exceptionTableLock;
if (lock.tryLock()) {
try {
expungeStaleExceptions();
} finally {
lock.unlock();
}
}
setCompletion(NORMAL); // must be outside try block
}
/**
* Report the result of invoke or join; called only upon
* non-normal return of internal versions.
*/
private V reportResult() {
int s; Throwable ex;
if ((s = status) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
UNSAFE.throwException(ex);
return getRawResult();
}
// public methods
@ -399,11 +636,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the computed result
*/
public final V join() {
quietlyJoin();
Throwable ex;
if (status < NORMAL && (ex = getException()) != null)
UNSAFE.throwException(ex);
return getRawResult();
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
}
/**
@ -415,11 +651,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the computed result
*/
public final V invoke() {
quietlyInvoke();
Throwable ex;
if (status < NORMAL && (ex = getException()) != null)
UNSAFE.throwException(ex);
return getRawResult();
if (doInvoke() != NORMAL)
return reportResult();
else
return getRawResult();
}
/**
@ -483,22 +718,16 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
else if (i != 0)
t.fork();
else {
t.quietlyInvoke();
if (ex == null && t.status < NORMAL)
ex = t.getException();
}
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = tasks[i];
if (t != null) {
if (ex != null)
t.cancel(false);
else {
t.quietlyJoin();
if (ex == null && t.status < NORMAL)
ex = t.getException();
}
else if (t.doJoin() < NORMAL && ex == null)
ex = t.getException();
}
}
if (ex != null)
@ -546,22 +775,16 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
else if (i != 0)
t.fork();
else {
t.quietlyInvoke();
if (ex == null && t.status < NORMAL)
ex = t.getException();
}
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = ts.get(i);
if (t != null) {
if (ex != null)
t.cancel(false);
else {
t.quietlyJoin();
if (ex == null && t.status < NORMAL)
ex = t.getException();
}
else if (t.doJoin() < NORMAL && ex == null)
ex = t.getException();
}
}
if (ex != null)
@ -597,8 +820,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
setCompletion(CANCELLED);
return status == CANCELLED;
return setCompletion(CANCELLED) == CANCELLED;
}
/**
@ -614,21 +836,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
}
/**
* Cancels if current thread is a terminating worker thread,
* ignoring any exceptions thrown by cancel.
*/
final void cancelIfTerminating() {
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread) t).isTerminating()) {
try {
cancel(false);
} catch (Throwable ignore) {
}
}
}
public final boolean isDone() {
return status < 0;
}
@ -668,7 +875,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
int s = status;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
exceptionMap.get(this));
getThrowableException());
}
/**
@ -726,19 +933,13 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread)
quietlyJoin();
else
externalInterruptibleAwaitDone(false, 0L);
int s = status;
if (s != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
throw new ExecutionException(ex);
}
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone(0L);
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
return getRawResult();
}
@ -758,20 +959,39 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
long nanos = unit.toNanos(timeout);
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
else
externalInterruptibleAwaitDone(true, nanos);
if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
long nanos = unit.toNanos(timeout);
if (status >= 0) {
boolean completed = false;
if (w.unpushTask(this)) {
try {
completed = exec();
} catch (Throwable rex) {
setExceptionalCompletion(rex);
}
}
if (completed)
setCompletion(NORMAL);
else if (status >= 0 && nanos > 0)
w.pool.timedAwaitJoin(this, nanos);
}
}
else {
long millis = unit.toMillis(timeout);
if (millis > 0)
externalInterruptibleAwaitDone(millis);
}
int s = status;
if (s != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
if (s != EXCEPTIONAL)
throw new TimeoutException();
if ((ex = getThrowableException()) != null)
throw new ExecutionException(ex);
throw new TimeoutException();
}
return getRawResult();
}
@ -783,28 +1003,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* known to have aborted.
*/
public final void quietlyJoin() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
if (status >= 0) {
if (w.unpushTask(this)) {
boolean completed;
try {
completed = exec();
} catch (Throwable rex) {
setExceptionalCompletion(rex);
return;
}
if (completed) {
setCompletion(NORMAL);
return;
}
}
w.joinTask(this, false, 0L);
}
}
else
externalAwaitDone();
doJoin();
}
/**
@ -813,19 +1012,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* exception.
*/
public final void quietlyInvoke() {
if (status >= 0) {
boolean completed;
try {
completed = exec();
} catch (Throwable rex) {
setExceptionalCompletion(rex);
return;
}
if (completed)
setCompletion(NORMAL);
else
quietlyJoin();
}
doInvoke();
}
/**
@ -864,8 +1051,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public void reinitialize() {
if (status == EXCEPTIONAL)
exceptionMap.remove(this);
status = 0;
clearExceptionalCompletion();
else
status = 0;
}
/**
@ -1176,23 +1364,23 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
s.defaultReadObject();
Object ex = s.readObject();
if (ex != null)
setExceptionalCompletion((Throwable) ex);
setExceptionalCompletion((Throwable)ex);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long statusOffset =
objectFieldOffset("status", ForkJoinTask.class);
private static long objectFieldOffset(String field, Class<?> klazz) {
private static final sun.misc.Unsafe UNSAFE;
private static final long statusOffset;
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue<Object>();
exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// Convert Exception to corresponding Error
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
UNSAFE = sun.misc.Unsafe.getUnsafe();
statusOffset = UNSAFE.objectFieldOffset
(ForkJoinTask.class.getDeclaredField("status"));
} catch (Exception e) {
throw new Error(e);
}
}
}