diff --git a/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java b/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java index 122ce86737e..263552a93ea 100644 --- a/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java @@ -42,7 +42,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -823,15 +822,13 @@ public class ForkJoinPool extends AbstractExecutorService { (workerCounts & RUNNING_COUNT_MASK) <= 1); long startTime = untimed? 0 : System.nanoTime(); Thread.interrupted(); // clear/ignore interrupt - if (eventCount != ec || w.runState != 0 || - runState >= TERMINATING) // recheck after clear - break; + if (eventCount != ec || w.isTerminating()) + break; // recheck after clear if (untimed) LockSupport.park(w); else { LockSupport.parkNanos(w, SHRINK_RATE_NANOS); - if (eventCount != ec || w.runState != 0 || - runState >= TERMINATING) + if (eventCount != ec || w.isTerminating()) break; if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS) tryShutdownUnusedWorker(ec); @@ -899,16 +896,23 @@ public class ForkJoinPool extends AbstractExecutorService { UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc + (ONE_RUNNING|ONE_TOTAL))) { ForkJoinWorkerThread w = null; + Throwable fail = null; try { w = factory.newThread(this); - } finally { // adjust on null or exceptional factory return - if (w == null) { - decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL); - tryTerminate(false); // handle failure during shutdown - } + } catch (Throwable ex) { + fail = ex; } - if (w == null) + if (w == null) { // null or exceptional factory return + decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL); + tryTerminate(false); // handle failure during shutdown + // If originating from an external caller, + // propagate exception, else ignore + if (fail != null && runState < TERMINATING && + !(Thread.currentThread() instanceof + ForkJoinWorkerThread)) + UNSAFE.throwException(fail); break; + } w.start(recordWorker(w), ueh); if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) { int c; // advance event count @@ -997,8 +1001,12 @@ public class ForkJoinPool extends AbstractExecutorService { boolean active = w.active; boolean inactivate = false; int pc = parallelism; - int rs; - while (w.runState == 0 && (rs = runState) < TERMINATING) { + while (w.runState == 0) { + int rs = runState; + if (rs >= TERMINATING) { // propagate shutdown + w.shutdown(); + break; + } if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1)) inactivate = active = w.active = false; @@ -1126,6 +1134,7 @@ public class ForkJoinPool extends AbstractExecutorService { return true; } + /** * Actions on transition to TERMINATING * @@ -1149,7 +1158,7 @@ public class ForkJoinPool extends AbstractExecutorService { if (passes > 0 && !w.isTerminated()) { w.cancelTasks(); LockSupport.unpark(w); - if (passes > 1) { + if (passes > 1 && !w.isInterrupted()) { try { w.interrupt(); } catch (SecurityException ignore) { @@ -1725,6 +1734,13 @@ public class ForkJoinPool extends AbstractExecutorService { return (runState & (TERMINATING|TERMINATED)) == TERMINATING; } + /** + * Returns true if terminating or terminated. Used by ForkJoinWorkerThread. + */ + final boolean isAtLeastTerminating() { + return runState >= TERMINATING; + } + /** * Returns {@code true} if this pool has been shut down. * diff --git a/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java b/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java index 598578f0cf1..cd18360f83d 100644 --- a/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java +++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java @@ -55,10 +55,10 @@ import java.util.WeakHashMap; * start other subtasks. As indicated by the name of this class, * many programs using {@code ForkJoinTask} employ only methods * {@link #fork} and {@link #join}, or derivatives such as {@link - * #invokeAll}. However, this class also provides a number of other - * methods that can come into play in advanced usages, as well as - * extension mechanics that allow support of new forms of fork/join - * processing. + * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also + * provides a number of other methods that can come into play in + * advanced usages, as well as extension mechanics that allow + * support of new forms of fork/join processing. * *
A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
@@ -250,7 +250,7 @@ public abstract class ForkJoinTask