8192944: Miscellaneous changes imported from jsr166 CVS 2017-12-08

Reviewed-by: martin, psandoz, chegar
This commit is contained in:
Doug Lea 2017-12-08 15:30:53 -08:00
parent 00d1900dc9
commit 71a866fe0c
3 changed files with 139 additions and 132 deletions

View File

@ -735,7 +735,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
CountedCompleter<?> a = this, s = a;
while (a.onExceptionalCompletion(ex, s) &&
(a = (s = a).completer) != null && a.status >= 0 &&
a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
isExceptionalStatus(a.recordExceptionalCompletion(ex)))
;
}

View File

@ -219,52 +219,59 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* methods in a way that flows well in javadocs.
*/
/*
/**
* The status field holds run control status bits packed into a
* single int to minimize footprint and to ensure atomicity (via
* CAS). Status is initially zero, and takes on nonnegative
* values until completed, upon which status (anded with
* DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
* undergoing blocking waits by other threads have the SIGNAL bit
* set. Completion of a stolen task with SIGNAL set awakens any
* waiters via notifyAll. Even though suboptimal for some
* purposes, we use basic builtin wait/notify to take advantage of
* "monitor inflation" in JVMs that we would otherwise need to
* emulate to avoid adding further per-task bookkeeping overhead.
* We want these monitors to be "fat", i.e., not use biasing or
* thin-lock techniques, so use some odd coding idioms that tend
* to avoid them, mainly by arranging that every synchronized
* block performs a wait, notifyAll or both.
* single int to ensure atomicity. Status is initially zero, and
* takes on nonnegative values until completed, upon which it
* holds (sign bit) DONE, possibly with ABNORMAL (cancelled or
* exceptional) and THROWN (in which case an exception has been
* stored). Tasks with dependent blocked waiting joiners have the
* SIGNAL bit set. Completion of a task with SIGNAL set awakens
* any waiters via notifyAll. (Waiters also help signal others
* upon completion.)
*
* These control bits occupy only (some of) the upper half (16
* bits) of status field. The lower bits are used for user-defined
* tags.
*/
/** The run status of this task */
volatile int status; // accessed directly by pool and workers
static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
static final int SIGNAL = 0x00010000; // must be >= 1 << 16
static final int SMASK = 0x0000ffff; // short bits for tags
private static final int DONE = 1 << 31; // must be negative
private static final int ABNORMAL = 1 << 18; // set atomically with DONE
private static final int THROWN = 1 << 17; // set atomically with ABNORMAL
private static final int SIGNAL = 1 << 16; // true if joiner waiting
private static final int SMASK = 0xffff; // short bits for tags
static boolean isExceptionalStatus(int s) { // needed by subclasses
return (s & THROWN) != 0;
}
/**
* Marks completion and wakes up threads waiting to join this
* task.
* Sets DONE status and wakes up threads waiting to join this task.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
* @return status on exit
*/
private int setCompletion(int completion) {
for (int s;;) {
private int setDone() {
int s;
if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0)
synchronized (this) { notifyAll(); }
return s | DONE;
}
/**
* Marks cancelled or exceptional completion unless already done.
*
* @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional
* @return status on exit
*/
private int abnormalCompletion(int completion) {
for (int s, ns;;) {
if ((s = status) < 0)
return s;
if (STATUS.compareAndSet(this, s, s | completion)) {
if ((s >>> 16) != 0)
else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) {
if ((s & SIGNAL) != 0)
synchronized (this) { notifyAll(); }
return completion;
return ns;
}
}
}
@ -282,10 +289,11 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
completed = false;
s = setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
s = setDone();
}
return s;
}
@ -297,9 +305,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @param timeout using Object.wait conventions.
*/
final void internalWait(long timeout) {
int s;
if ((s = status) >= 0 && // force completer to issue notify
STATUS.compareAndSet(this, s, s | SIGNAL)) {
if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) {
synchronized (this) {
if (status >= 0)
try { wait(timeout); } catch (InterruptedException ie) { }
@ -314,27 +320,24 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return status upon completion
*/
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
int s = tryExternalHelp();
if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
boolean interrupted = false;
do {
if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
for (;;) {
if ((s = status) >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
else {
notifyAll();
break;
}
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
@ -345,27 +348,37 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* Blocks a non-worker-thread until completion or interruption.
*/
private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
if (Thread.interrupted())
int s = tryExternalHelp();
if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
synchronized (this) {
for (;;) {
if ((s = status) >= 0)
wait(0L);
else {
notifyAll();
break;
}
}
}
}
else if (Thread.interrupted())
throw new InterruptedException();
if ((s = status) >= 0 &&
(s = ((this instanceof CountedCompleter) ?
return s;
}
/**
* Tries to help with tasks allowed for external callers.
*
* @return current status
*/
private int tryExternalHelp() {
int s;
return ((s = status) < 0 ? s:
(this instanceof CountedCompleter) ?
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
0)) >= 0) {
while ((s = status) >= 0) {
if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
wait(0L);
else
notifyAll();
}
}
}
}
return s;
ForkJoinPool.common.tryExternalUnpush(this) ?
doExec() : 0);
}
/**
@ -475,7 +488,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
} finally {
lock.unlock();
}
s = setCompletion(EXCEPTIONAL);
s = abnormalCompletion(DONE | ABNORMAL | THROWN);
}
return s;
}
@ -487,7 +500,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
private int setExceptionalCompletion(Throwable ex) {
int s = recordExceptionalCompletion(ex);
if ((s & DONE_MASK) == EXCEPTIONAL)
if ((s & THROWN) != 0)
internalPropagateException(ex);
return s;
}
@ -662,10 +675,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* Throws exception, if any, associated with the given status.
*/
private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
rethrow((s & THROWN) != 0 ? getThrowableException() :
new CancellationException());
}
// public methods
@ -707,7 +718,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
if (((s = doJoin()) & ABNORMAL) != 0)
reportException(s);
return getRawResult();
}
@ -722,7 +733,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
if (((s = doInvoke()) & ABNORMAL) != 0)
reportException(s);
return getRawResult();
}
@ -747,9 +758,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork();
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
t1.reportException(s1);
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
t2.reportException(s2);
}
@ -779,7 +790,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
else if (i != 0)
t.fork();
else if (t.doInvoke() < NORMAL && ex == null)
else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
@ -787,7 +798,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (t != null) {
if (ex != null)
t.cancel(false);
else if (t.doJoin() < NORMAL)
else if ((t.doJoin() & ABNORMAL) != 0)
ex = t.getException();
}
}
@ -831,7 +842,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
else if (i != 0)
t.fork();
else if (t.doInvoke() < NORMAL && ex == null)
else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
@ -839,7 +850,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (t != null) {
if (ex != null)
t.cancel(false);
else if (t.doJoin() < NORMAL)
else if ((t.doJoin() & ABNORMAL) != 0)
ex = t.getException();
}
}
@ -876,7 +887,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
int s = abnormalCompletion(DONE | ABNORMAL);
return (s & (ABNORMAL | THROWN)) == ABNORMAL;
}
public final boolean isDone() {
@ -884,7 +896,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
public final boolean isCancelled() {
return (status & DONE_MASK) == CANCELLED;
return (status & (ABNORMAL | THROWN)) == ABNORMAL;
}
/**
@ -893,7 +905,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return {@code true} if this task threw an exception or was cancelled
*/
public final boolean isCompletedAbnormally() {
return status < NORMAL;
return (status & ABNORMAL) != 0;
}
/**
@ -904,7 +916,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* exception and was not cancelled
*/
public final boolean isCompletedNormally() {
return (status & DONE_MASK) == NORMAL;
return (status & (DONE | ABNORMAL)) == DONE;
}
/**
@ -915,9 +927,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the exception, or {@code null} if none
*/
public final Throwable getException() {
int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
int s = status;
return ((s & ABNORMAL) == 0 ? null :
(s & THROWN) == 0 ? new CancellationException() :
getThrowableException());
}
@ -961,7 +973,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
setExceptionalCompletion(rex);
return;
}
setCompletion(NORMAL);
setDone();
}
/**
@ -973,7 +985,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @since 1.8
*/
public final void quietlyComplete() {
setCompletion(NORMAL);
setDone();
}
/**
@ -990,10 +1002,11 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone();
if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
if ((s & THROWN) != 0)
throw new ExecutionException(getThrowableException());
else if ((s & ABNORMAL) != 0)
throw new CancellationException();
else
return getRawResult();
}
@ -1034,7 +1047,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
while ((s = status) >= 0 &&
(ns = deadline - System.nanoTime()) > 0L) {
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
STATUS.compareAndSet(this, s, s | SIGNAL)) {
(s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
synchronized (this) {
if (status >= 0)
wait(ms); // OK to throw InterruptedException
@ -1046,14 +1059,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
}
if (s >= 0)
s = status;
if ((s &= DONE_MASK) != NORMAL) {
if (s == CANCELLED)
throw new CancellationException();
if (s != EXCEPTIONAL)
throw new TimeoutException();
else if ((s & THROWN) != 0)
throw new ExecutionException(getThrowableException());
}
else if ((s & ABNORMAL) != 0)
throw new CancellationException();
else
return getRawResult();
}
@ -1110,7 +1121,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* setRawResult(null)}.
*/
public void reinitialize() {
if ((status & DONE_MASK) == EXCEPTIONAL)
if ((status & THROWN) != 0)
clearExceptionalCompletion();
else
status = 0;
@ -1327,7 +1338,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final short setForkJoinTaskTag(short newValue) {
for (int s;;) {
if (STATUS.compareAndSet(this, s = status,
if (STATUS.weakCompareAndSet(this, s = status,
(s & ~SMASK) | (newValue & SMASK)))
return (short)s;
}
@ -1351,7 +1362,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
for (int s;;) {
if ((short)(s = status) != expect)
return false;
if (STATUS.compareAndSet(this, s,
if (STATUS.weakCompareAndSet(this, s,
(s & ~SMASK) | (update & SMASK)))
return true;
}

View File

@ -105,8 +105,7 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
/**
* A taken submitted task is completed
*/
public void testTake()
throws InterruptedException, ExecutionException {
public void testTake() throws Exception {
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
cs.submit(new StringTask());
Future f = cs.take();
@ -127,8 +126,7 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
/**
* poll returns non-null when the returned task is completed
*/
public void testPoll1()
throws InterruptedException, ExecutionException {
public void testPoll1() throws Exception {
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
assertNull(cs.poll());
cs.submit(new StringTask());
@ -147,15 +145,15 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
/**
* timed poll returns non-null when the returned task is completed
*/
public void testPoll2()
throws InterruptedException, ExecutionException {
public void testPoll2() throws Exception {
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
assertNull(cs.poll());
cs.submit(new StringTask());
long startTime = System.nanoTime();
Future f;
while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) {
while ((f = cs.poll(timeoutMillis(), MILLISECONDS)) == null) {
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
if (millisElapsedSince(startTime) > LONG_DELAY_MS)
fail("timed out");
Thread.yield();
@ -167,8 +165,7 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
/**
* poll returns null before the returned task is completed
*/
public void testPollReturnsNull()
throws InterruptedException, ExecutionException {
public void testPollReturnsNullBeforeCompletion() throws Exception {
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
final CountDownLatch proceed = new CountDownLatch(1);
cs.submit(new Callable() { public String call() throws Exception {
@ -188,29 +185,28 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
/**
* successful and failed tasks are both returned
*/
public void testTaskAssortment()
throws InterruptedException, ExecutionException {
public void testTaskAssortment() throws Exception {
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
ArithmeticException ex = new ArithmeticException();
for (int i = 0; i < 2; i++) {
final int rounds = 2;
for (int i = rounds; i--> 0; ) {
cs.submit(new StringTask());
cs.submit(callableThrowing(ex));
cs.submit(runnableThrowing(ex), null);
}
int normalCompletions = 0;
int exceptionalCompletions = 0;
for (int i = 0; i < 3 * 2; i++) {
for (int i = 3 * rounds; i--> 0; ) {
try {
if (cs.take().get() == TEST_STRING)
assertSame(TEST_STRING, cs.take().get());
normalCompletions++;
}
catch (ExecutionException expected) {
assertTrue(expected.getCause() instanceof ArithmeticException);
} catch (ExecutionException expected) {
assertSame(ex, expected.getCause());
exceptionalCompletions++;
}
}
assertEquals(2 * 1, normalCompletions);
assertEquals(2 * 2, exceptionalCompletions);
assertEquals(1 * rounds, normalCompletions);
assertEquals(2 * rounds, exceptionalCompletions);
assertNull(cs.poll());
}