6986050: Small clarifications and fixes for ForkJoin

Clarify FJ.get on throw InterruptedException, propagate ThreadFactory, shutdown transition

Reviewed-by: chegar
This commit is contained in:
Doug Lea 2010-09-21 16:06:59 +01:00
parent 0d636dcf1c
commit 9d5707e87d
4 changed files with 91 additions and 29 deletions

View File

@ -42,7 +42,6 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@ -823,15 +822,13 @@ public class ForkJoinPool extends AbstractExecutorService {
(workerCounts & RUNNING_COUNT_MASK) <= 1);
long startTime = untimed? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
if (eventCount != ec || w.runState != 0 ||
runState >= TERMINATING) // recheck after clear
break;
if (eventCount != ec || w.isTerminating())
break; // recheck after clear
if (untimed)
LockSupport.park(w);
else {
LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
if (eventCount != ec || w.runState != 0 ||
runState >= TERMINATING)
if (eventCount != ec || w.isTerminating())
break;
if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
tryShutdownUnusedWorker(ec);
@ -899,16 +896,23 @@ public class ForkJoinPool extends AbstractExecutorService {
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
wc + (ONE_RUNNING|ONE_TOTAL))) {
ForkJoinWorkerThread w = null;
Throwable fail = null;
try {
w = factory.newThread(this);
} finally { // adjust on null or exceptional factory return
if (w == null) {
decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
tryTerminate(false); // handle failure during shutdown
}
} catch (Throwable ex) {
fail = ex;
}
if (w == null)
if (w == null) { // null or exceptional factory return
decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
tryTerminate(false); // handle failure during shutdown
// If originating from an external caller,
// propagate exception, else ignore
if (fail != null && runState < TERMINATING &&
!(Thread.currentThread() instanceof
ForkJoinWorkerThread))
UNSAFE.throwException(fail);
break;
}
w.start(recordWorker(w), ueh);
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
int c; // advance event count
@ -997,8 +1001,12 @@ public class ForkJoinPool extends AbstractExecutorService {
boolean active = w.active;
boolean inactivate = false;
int pc = parallelism;
int rs;
while (w.runState == 0 && (rs = runState) < TERMINATING) {
while (w.runState == 0) {
int rs = runState;
if (rs >= TERMINATING) { // propagate shutdown
w.shutdown();
break;
}
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
inactivate = active = w.active = false;
@ -1126,6 +1134,7 @@ public class ForkJoinPool extends AbstractExecutorService {
return true;
}
/**
* Actions on transition to TERMINATING
*
@ -1149,7 +1158,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (passes > 0 && !w.isTerminated()) {
w.cancelTasks();
LockSupport.unpark(w);
if (passes > 1) {
if (passes > 1 && !w.isInterrupted()) {
try {
w.interrupt();
} catch (SecurityException ignore) {
@ -1725,6 +1734,13 @@ public class ForkJoinPool extends AbstractExecutorService {
return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
}
/**
* Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
*/
final boolean isAtLeastTerminating() {
return runState >= TERMINATING;
}
/**
* Returns {@code true} if this pool has been shut down.
*

View File

@ -55,10 +55,10 @@ import java.util.WeakHashMap;
* start other subtasks. As indicated by the name of this class,
* many programs using {@code ForkJoinTask} employ only methods
* {@link #fork} and {@link #join}, or derivatives such as {@link
* #invokeAll}. However, this class also provides a number of other
* methods that can come into play in advanced usages, as well as
* extension mechanics that allow support of new forms of fork/join
* processing.
* #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
* provides a number of other methods that can come into play in
* advanced usages, as well as extension mechanics that allow
* support of new forms of fork/join processing.
*
* <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
@ -250,7 +250,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
int s; // the odd construction reduces lock bias effects
while ((s = status) >= 0) {
try {
synchronized(this) {
synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
wait();
}
@ -270,7 +270,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
int s;
if ((s = status) >= 0) {
try {
synchronized(this) {
synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
wait(millis, 0);
}
@ -288,7 +288,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
private void externalAwaitDone() {
int s;
while ((s = status) >= 0) {
synchronized(this) {
synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
boolean interrupted = false;
while (status >= 0) {
@ -669,11 +669,34 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
setCompletion(NORMAL);
}
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread is not a
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
quietlyJoin();
if (Thread.interrupted())
throw new InterruptedException();
int s = status;
int s;
if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
quietlyJoin();
s = status;
}
else {
while ((s = status) >= 0) {
synchronized (this) { // interruptible form of awaitDone
if (UNSAFE.compareAndSwapInt(this, statusOffset,
s, SIGNAL)) {
while (status >= 0)
wait();
}
}
}
}
if (s < NORMAL) {
Throwable ex;
if (s == CANCELLED)
@ -684,6 +707,20 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
return getRawResult();
}
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread is not a
* member of a ForkJoinPool and was interrupted while waiting
* @throws TimeoutException if the wait timed out
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Thread t = Thread.currentThread();
@ -725,7 +762,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
long ms = nt / 1000000;
int ns = (int) (nt % 1000000);
try {
synchronized(this) {
synchronized (this) {
if (status >= 0)
wait(ms, ns);
}

View File

@ -778,11 +778,20 @@ public class ForkJoinWorkerThread extends Thread {
// status check methods used mainly by ForkJoinPool
final boolean isRunning() { return runState == 0; }
final boolean isTerminating() { return (runState & TERMINATING) != 0; }
final boolean isTerminated() { return (runState & TERMINATED) != 0; }
final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
final boolean isTerminating() {
if ((runState & TERMINATING) != 0)
return true;
if (pool.isAtLeastTerminating()) { // propagate pool state
shutdown();
return true;
}
return false;
}
/**
* Sets state to TERMINATING. Does NOT unpark or interrupt
* to wake up if currently blocked. Callers must do so if desired.

View File

@ -138,7 +138,7 @@ package java.util.concurrent;
* if (right.tryUnfork()) // directly calculate if not stolen
* sum += right.atLeaf(right.lo, right.hi);
* else {
* right.helpJoin();
* right.join();
* sum += right.result;
* }
* right = right.next;