From 71a866fe0cd6a708eae6a3f21bf06fb36c85e652 Mon Sep 17 00:00:00 2001 From: Doug Lea Date: Fri, 8 Dec 2017 15:30:53 -0800 Subject: [PATCH] 8192944: Miscellaneous changes imported from jsr166 CVS 2017-12-08 Reviewed-by: martin, psandoz, chegar --- .../util/concurrent/CountedCompleter.java | 2 +- .../java/util/concurrent/ForkJoinTask.java | 233 +++++++++--------- .../tck/ExecutorCompletionServiceTest.java | 36 ++- 3 files changed, 139 insertions(+), 132 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java b/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java index 9d273a56fde..9321ccfbfb0 100644 --- a/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java +++ b/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java @@ -735,7 +735,7 @@ public abstract class CountedCompleter extends ForkJoinTask { CountedCompleter a = this, s = a; while (a.onExceptionalCompletion(ex, s) && (a = (s = a).completer) != null && a.status >= 0 && - a.recordExceptionalCompletion(ex) == EXCEPTIONAL) + isExceptionalStatus(a.recordExceptionalCompletion(ex))) ; } diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java index 193cb9e81cb..e4e3aa8f934 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java @@ -219,52 +219,59 @@ public abstract class ForkJoinTask implements Future, Serializable { * methods in a way that flows well in javadocs. */ - /* + /** * The status field holds run control status bits packed into a - * single int to minimize footprint and to ensure atomicity (via - * CAS). Status is initially zero, and takes on nonnegative - * values until completed, upon which status (anded with - * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks - * undergoing blocking waits by other threads have the SIGNAL bit - * set. Completion of a stolen task with SIGNAL set awakens any - * waiters via notifyAll. Even though suboptimal for some - * purposes, we use basic builtin wait/notify to take advantage of - * "monitor inflation" in JVMs that we would otherwise need to - * emulate to avoid adding further per-task bookkeeping overhead. - * We want these monitors to be "fat", i.e., not use biasing or - * thin-lock techniques, so use some odd coding idioms that tend - * to avoid them, mainly by arranging that every synchronized - * block performs a wait, notifyAll or both. + * single int to ensure atomicity. Status is initially zero, and + * takes on nonnegative values until completed, upon which it + * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or + * exceptional) and THROWN (in which case an exception has been + * stored). Tasks with dependent blocked waiting joiners have the + * SIGNAL bit set. Completion of a task with SIGNAL set awakens + * any waiters via notifyAll. (Waiters also help signal others + * upon completion.) * * These control bits occupy only (some of) the upper half (16 * bits) of status field. The lower bits are used for user-defined * tags. */ - - /** The run status of this task */ volatile int status; // accessed directly by pool and workers - static final int DONE_MASK = 0xf0000000; // mask out non-completion bits - static final int NORMAL = 0xf0000000; // must be negative - static final int CANCELLED = 0xc0000000; // must be < NORMAL - static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED - static final int SIGNAL = 0x00010000; // must be >= 1 << 16 - static final int SMASK = 0x0000ffff; // short bits for tags + + private static final int DONE = 1 << 31; // must be negative + private static final int ABNORMAL = 1 << 18; // set atomically with DONE + private static final int THROWN = 1 << 17; // set atomically with ABNORMAL + private static final int SIGNAL = 1 << 16; // true if joiner waiting + private static final int SMASK = 0xffff; // short bits for tags + + static boolean isExceptionalStatus(int s) { // needed by subclasses + return (s & THROWN) != 0; + } /** - * Marks completion and wakes up threads waiting to join this - * task. + * Sets DONE status and wakes up threads waiting to join this task. * - * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL - * @return completion status on exit + * @return status on exit */ - private int setCompletion(int completion) { - for (int s;;) { + private int setDone() { + int s; + if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0) + synchronized (this) { notifyAll(); } + return s | DONE; + } + + /** + * Marks cancelled or exceptional completion unless already done. + * + * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional + * @return status on exit + */ + private int abnormalCompletion(int completion) { + for (int s, ns;;) { if ((s = status) < 0) return s; - if (STATUS.compareAndSet(this, s, s | completion)) { - if ((s >>> 16) != 0) + else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) { + if ((s & SIGNAL) != 0) synchronized (this) { notifyAll(); } - return completion; + return ns; } } } @@ -282,10 +289,11 @@ public abstract class ForkJoinTask implements Future, Serializable { try { completed = exec(); } catch (Throwable rex) { - return setExceptionalCompletion(rex); + completed = false; + s = setExceptionalCompletion(rex); } if (completed) - s = setCompletion(NORMAL); + s = setDone(); } return s; } @@ -297,9 +305,7 @@ public abstract class ForkJoinTask implements Future, Serializable { * @param timeout using Object.wait conventions. */ final void internalWait(long timeout) { - int s; - if ((s = status) >= 0 && // force completer to issue notify - STATUS.compareAndSet(this, s, s | SIGNAL)) { + if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) { synchronized (this) { if (status >= 0) try { wait(timeout); } catch (InterruptedException ie) { } @@ -314,27 +320,24 @@ public abstract class ForkJoinTask implements Future, Serializable { * @return status upon completion */ private int externalAwaitDone() { - int s = ((this instanceof CountedCompleter) ? // try helping - ForkJoinPool.common.externalHelpComplete( - (CountedCompleter)this, 0) : - ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); - if (s >= 0 && (s = status) >= 0) { + int s = tryExternalHelp(); + if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { boolean interrupted = false; - do { - if (STATUS.compareAndSet(this, s, s | SIGNAL)) { - synchronized (this) { - if (status >= 0) { - try { - wait(0L); - } catch (InterruptedException ie) { - interrupted = true; - } + synchronized (this) { + for (;;) { + if ((s = status) >= 0) { + try { + wait(0L); + } catch (InterruptedException ie) { + interrupted = true; } - else - notifyAll(); + } + else { + notifyAll(); + break; } } - } while ((s = status) >= 0); + } if (interrupted) Thread.currentThread().interrupt(); } @@ -345,29 +348,39 @@ public abstract class ForkJoinTask implements Future, Serializable { * Blocks a non-worker-thread until completion or interruption. */ private int externalInterruptibleAwaitDone() throws InterruptedException { - int s; - if (Thread.interrupted()) - throw new InterruptedException(); - if ((s = status) >= 0 && - (s = ((this instanceof CountedCompleter) ? - ForkJoinPool.common.externalHelpComplete( - (CountedCompleter)this, 0) : - ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : - 0)) >= 0) { - while ((s = status) >= 0) { - if (STATUS.compareAndSet(this, s, s | SIGNAL)) { - synchronized (this) { - if (status >= 0) - wait(0L); - else - notifyAll(); + int s = tryExternalHelp(); + if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { + synchronized (this) { + for (;;) { + if ((s = status) >= 0) + wait(0L); + else { + notifyAll(); + break; } } } } + else if (Thread.interrupted()) + throw new InterruptedException(); return s; } + /** + * Tries to help with tasks allowed for external callers. + * + * @return current status + */ + private int tryExternalHelp() { + int s; + return ((s = status) < 0 ? s: + (this instanceof CountedCompleter) ? + ForkJoinPool.common.externalHelpComplete( + (CountedCompleter)this, 0) : + ForkJoinPool.common.tryExternalUnpush(this) ? + doExec() : 0); + } + /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and @@ -475,7 +488,7 @@ public abstract class ForkJoinTask implements Future, Serializable { } finally { lock.unlock(); } - s = setCompletion(EXCEPTIONAL); + s = abnormalCompletion(DONE | ABNORMAL | THROWN); } return s; } @@ -487,7 +500,7 @@ public abstract class ForkJoinTask implements Future, Serializable { */ private int setExceptionalCompletion(Throwable ex) { int s = recordExceptionalCompletion(ex); - if ((s & DONE_MASK) == EXCEPTIONAL) + if ((s & THROWN) != 0) internalPropagateException(ex); return s; } @@ -662,10 +675,8 @@ public abstract class ForkJoinTask implements Future, Serializable { * Throws exception, if any, associated with the given status. */ private void reportException(int s) { - if (s == CANCELLED) - throw new CancellationException(); - if (s == EXCEPTIONAL) - rethrow(getThrowableException()); + rethrow((s & THROWN) != 0 ? getThrowableException() : + new CancellationException()); } // public methods @@ -707,7 +718,7 @@ public abstract class ForkJoinTask implements Future, Serializable { */ public final V join() { int s; - if ((s = doJoin() & DONE_MASK) != NORMAL) + if (((s = doJoin()) & ABNORMAL) != 0) reportException(s); return getRawResult(); } @@ -722,7 +733,7 @@ public abstract class ForkJoinTask implements Future, Serializable { */ public final V invoke() { int s; - if ((s = doInvoke() & DONE_MASK) != NORMAL) + if (((s = doInvoke()) & ABNORMAL) != 0) reportException(s); return getRawResult(); } @@ -747,9 +758,9 @@ public abstract class ForkJoinTask implements Future, Serializable { public static void invokeAll(ForkJoinTask t1, ForkJoinTask t2) { int s1, s2; t2.fork(); - if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) + if (((s1 = t1.doInvoke()) & ABNORMAL) != 0) t1.reportException(s1); - if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) + if (((s2 = t2.doJoin()) & ABNORMAL) != 0) t2.reportException(s2); } @@ -779,7 +790,7 @@ public abstract class ForkJoinTask implements Future, Serializable { } else if (i != 0) t.fork(); - else if (t.doInvoke() < NORMAL && ex == null) + else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) ex = t.getException(); } for (int i = 1; i <= last; ++i) { @@ -787,7 +798,7 @@ public abstract class ForkJoinTask implements Future, Serializable { if (t != null) { if (ex != null) t.cancel(false); - else if (t.doJoin() < NORMAL) + else if ((t.doJoin() & ABNORMAL) != 0) ex = t.getException(); } } @@ -831,7 +842,7 @@ public abstract class ForkJoinTask implements Future, Serializable { } else if (i != 0) t.fork(); - else if (t.doInvoke() < NORMAL && ex == null) + else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) ex = t.getException(); } for (int i = 1; i <= last; ++i) { @@ -839,7 +850,7 @@ public abstract class ForkJoinTask implements Future, Serializable { if (t != null) { if (ex != null) t.cancel(false); - else if (t.doJoin() < NORMAL) + else if ((t.doJoin() & ABNORMAL) != 0) ex = t.getException(); } } @@ -876,7 +887,8 @@ public abstract class ForkJoinTask implements Future, Serializable { * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { - return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; + int s = abnormalCompletion(DONE | ABNORMAL); + return (s & (ABNORMAL | THROWN)) == ABNORMAL; } public final boolean isDone() { @@ -884,7 +896,7 @@ public abstract class ForkJoinTask implements Future, Serializable { } public final boolean isCancelled() { - return (status & DONE_MASK) == CANCELLED; + return (status & (ABNORMAL | THROWN)) == ABNORMAL; } /** @@ -893,7 +905,7 @@ public abstract class ForkJoinTask implements Future, Serializable { * @return {@code true} if this task threw an exception or was cancelled */ public final boolean isCompletedAbnormally() { - return status < NORMAL; + return (status & ABNORMAL) != 0; } /** @@ -904,7 +916,7 @@ public abstract class ForkJoinTask implements Future, Serializable { * exception and was not cancelled */ public final boolean isCompletedNormally() { - return (status & DONE_MASK) == NORMAL; + return (status & (DONE | ABNORMAL)) == DONE; } /** @@ -915,9 +927,9 @@ public abstract class ForkJoinTask implements Future, Serializable { * @return the exception, or {@code null} if none */ public final Throwable getException() { - int s = status & DONE_MASK; - return ((s >= NORMAL) ? null : - (s == CANCELLED) ? new CancellationException() : + int s = status; + return ((s & ABNORMAL) == 0 ? null : + (s & THROWN) == 0 ? new CancellationException() : getThrowableException()); } @@ -961,7 +973,7 @@ public abstract class ForkJoinTask implements Future, Serializable { setExceptionalCompletion(rex); return; } - setCompletion(NORMAL); + setDone(); } /** @@ -973,7 +985,7 @@ public abstract class ForkJoinTask implements Future, Serializable { * @since 1.8 */ public final void quietlyComplete() { - setCompletion(NORMAL); + setDone(); } /** @@ -990,11 +1002,12 @@ public abstract class ForkJoinTask implements Future, Serializable { public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : externalInterruptibleAwaitDone(); - if ((s &= DONE_MASK) == CANCELLED) - throw new CancellationException(); - if (s == EXCEPTIONAL) + if ((s & THROWN) != 0) throw new ExecutionException(getThrowableException()); - return getRawResult(); + else if ((s & ABNORMAL) != 0) + throw new CancellationException(); + else + return getRawResult(); } /** @@ -1034,7 +1047,7 @@ public abstract class ForkJoinTask implements Future, Serializable { while ((s = status) >= 0 && (ns = deadline - System.nanoTime()) > 0L) { if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && - STATUS.compareAndSet(this, s, s | SIGNAL)) { + (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { synchronized (this) { if (status >= 0) wait(ms); // OK to throw InterruptedException @@ -1046,15 +1059,13 @@ public abstract class ForkJoinTask implements Future, Serializable { } } if (s >= 0) - s = status; - if ((s &= DONE_MASK) != NORMAL) { - if (s == CANCELLED) - throw new CancellationException(); - if (s != EXCEPTIONAL) - throw new TimeoutException(); + throw new TimeoutException(); + else if ((s & THROWN) != 0) throw new ExecutionException(getThrowableException()); - } - return getRawResult(); + else if ((s & ABNORMAL) != 0) + throw new CancellationException(); + else + return getRawResult(); } /** @@ -1110,7 +1121,7 @@ public abstract class ForkJoinTask implements Future, Serializable { * setRawResult(null)}. */ public void reinitialize() { - if ((status & DONE_MASK) == EXCEPTIONAL) + if ((status & THROWN) != 0) clearExceptionalCompletion(); else status = 0; @@ -1327,8 +1338,8 @@ public abstract class ForkJoinTask implements Future, Serializable { */ public final short setForkJoinTaskTag(short newValue) { for (int s;;) { - if (STATUS.compareAndSet(this, s = status, - (s & ~SMASK) | (newValue & SMASK))) + if (STATUS.weakCompareAndSet(this, s = status, + (s & ~SMASK) | (newValue & SMASK))) return (short)s; } } @@ -1351,8 +1362,8 @@ public abstract class ForkJoinTask implements Future, Serializable { for (int s;;) { if ((short)(s = status) != expect) return false; - if (STATUS.compareAndSet(this, s, - (s & ~SMASK) | (update & SMASK))) + if (STATUS.weakCompareAndSet(this, s, + (s & ~SMASK) | (update & SMASK))) return true; } } diff --git a/test/jdk/java/util/concurrent/tck/ExecutorCompletionServiceTest.java b/test/jdk/java/util/concurrent/tck/ExecutorCompletionServiceTest.java index 9b7251e6865..2a936940c96 100644 --- a/test/jdk/java/util/concurrent/tck/ExecutorCompletionServiceTest.java +++ b/test/jdk/java/util/concurrent/tck/ExecutorCompletionServiceTest.java @@ -105,8 +105,7 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase { /** * A taken submitted task is completed */ - public void testTake() - throws InterruptedException, ExecutionException { + public void testTake() throws Exception { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); cs.submit(new StringTask()); Future f = cs.take(); @@ -127,8 +126,7 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase { /** * poll returns non-null when the returned task is completed */ - public void testPoll1() - throws InterruptedException, ExecutionException { + public void testPoll1() throws Exception { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); assertNull(cs.poll()); cs.submit(new StringTask()); @@ -147,15 +145,15 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase { /** * timed poll returns non-null when the returned task is completed */ - public void testPoll2() - throws InterruptedException, ExecutionException { + public void testPoll2() throws Exception { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); assertNull(cs.poll()); cs.submit(new StringTask()); long startTime = System.nanoTime(); Future f; - while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) { + while ((f = cs.poll(timeoutMillis(), MILLISECONDS)) == null) { + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); if (millisElapsedSince(startTime) > LONG_DELAY_MS) fail("timed out"); Thread.yield(); @@ -167,8 +165,7 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase { /** * poll returns null before the returned task is completed */ - public void testPollReturnsNull() - throws InterruptedException, ExecutionException { + public void testPollReturnsNullBeforeCompletion() throws Exception { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); final CountDownLatch proceed = new CountDownLatch(1); cs.submit(new Callable() { public String call() throws Exception { @@ -188,29 +185,28 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase { /** * successful and failed tasks are both returned */ - public void testTaskAssortment() - throws InterruptedException, ExecutionException { + public void testTaskAssortment() throws Exception { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); ArithmeticException ex = new ArithmeticException(); - for (int i = 0; i < 2; i++) { + final int rounds = 2; + for (int i = rounds; i--> 0; ) { cs.submit(new StringTask()); cs.submit(callableThrowing(ex)); cs.submit(runnableThrowing(ex), null); } int normalCompletions = 0; int exceptionalCompletions = 0; - for (int i = 0; i < 3 * 2; i++) { + for (int i = 3 * rounds; i--> 0; ) { try { - if (cs.take().get() == TEST_STRING) - normalCompletions++; - } - catch (ExecutionException expected) { - assertTrue(expected.getCause() instanceof ArithmeticException); + assertSame(TEST_STRING, cs.take().get()); + normalCompletions++; + } catch (ExecutionException expected) { + assertSame(ex, expected.getCause()); exceptionalCompletions++; } } - assertEquals(2 * 1, normalCompletions); - assertEquals(2 * 2, exceptionalCompletions); + assertEquals(1 * rounds, normalCompletions); + assertEquals(2 * rounds, exceptionalCompletions); assertNull(cs.poll()); }