8259796: timed CompletableFuture.get may swallow InterruptedException

Reviewed-by: dl, alanb
This commit is contained in:
Martin Buchholz 2021-01-19 18:41:08 +00:00
parent bb0821eb6a
commit f7b96d347a
2 changed files with 62 additions and 59 deletions

View File

@ -1911,45 +1911,49 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* throws TimeoutException on timeout.
*/
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
if (nanos > 0L) {
long d = System.nanoTime() + nanos;
long deadline = (d == 0L) ? 1L : d; // avoid 0
Signaller q = null;
boolean queued = false;
Object r;
while ((r = result) == null) { // similar to untimed
if (q == null) {
q = new Signaller(true, nanos, deadline);
if (Thread.currentThread() instanceof ForkJoinWorkerThread)
ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
}
else if (!queued)
queued = tryPushStack(q);
else if (q.nanos <= 0L)
break;
else {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interrupted = true;
}
if (q.interrupted)
break;
long d = System.nanoTime() + nanos;
long deadline = (d == 0L) ? 1L : d; // avoid 0
boolean interrupted = false, queued = false;
Signaller q = null;
Object r = null;
for (;;) { // order of checking interrupt, result, timeout matters
if (interrupted || (interrupted = Thread.interrupted()))
break;
else if ((r = result) != null)
break;
else if (nanos <= 0L)
break;
else if (q == null) {
q = new Signaller(true, nanos, deadline);
if (Thread.currentThread() instanceof ForkJoinWorkerThread)
ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
}
else if (!queued)
queued = tryPushStack(q);
else {
try {
ForkJoinPool.managedBlock(q);
interrupted = q.interrupted;
nanos = q.nanos;
} catch (InterruptedException ie) {
interrupted = true;
}
}
if (q != null && queued) {
q.thread = null;
if (r == null)
cleanStack();
}
if (r != null || (r = result) != null)
postComplete();
if (r != null || (q != null && q.interrupted))
return r;
}
throw new TimeoutException();
if (q != null) {
q.thread = null;
if (r == null)
cleanStack();
}
if (r != null) {
if (interrupted)
Thread.currentThread().interrupt();
postComplete();
return r;
} else if (interrupted)
return null;
else
throw new TimeoutException();
}
/* ------------- public methods -------------- */

View File

@ -24,6 +24,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import static java.util.concurrent.TimeUnit.DAYS;
import java.util.concurrent.atomic.AtomicReference;
/*
@ -33,57 +34,55 @@ import java.util.concurrent.atomic.AtomicReference;
* @key randomness
*/
// TODO: incorporate into CompletableFuture tck tests
public class SwallowedInterruptedException {
static final int ITERATIONS = 100;
public static void main(String[] args) throws Throwable {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 1; i <= ITERATIONS; i++) {
System.out.format("Iteration %d%n", i);
boolean timed = rnd.nextBoolean();
long sleepMillis = rnd.nextLong(10);
CompletableFuture<Void> future = new CompletableFuture<>();
CountDownLatch running = new CountDownLatch(1);
AtomicReference<String> failed = new AtomicReference<>();
CountDownLatch threadRunning = new CountDownLatch(1);
AtomicReference<Throwable> fail = new AtomicReference<>();
Thread thread = new Thread(() -> {
// signal main thread that child is running
running.countDown();
threadRunning.countDown();
// invoke Future.get, it complete with the interrupt status set or
// else throw InterruptedException with the interrupt status not set.
try {
future.get();
Void result = (timed) ? future.get(1, DAYS) : future.get();
// interrupt status should be set
if (!Thread.currentThread().isInterrupted()) {
failed.set("Future.get completed with interrupt status not set");
fail.set(new AssertionError(
"Future.get completed with interrupt status not set"));
}
} catch (InterruptedException ex) {
// interrupt status should be cleared
if (Thread.currentThread().isInterrupted()) {
failed.set("InterruptedException with interrupt status set");
fail.set(new AssertionError(
"InterruptedException with interrupt status set"));
}
} catch (Throwable ex) {
failed.set("Unexpected exception " + ex);
fail.set(ex);
}
});
thread.setDaemon(true);
thread.start();
threadRunning.await();
// wait for thread to run
running.await();
// interrupt thread and set result after an optional (random) delay
// interrupt thread, then set result after an optional (random) delay
thread.interrupt();
long sleepMillis = ThreadLocalRandom.current().nextLong(10);
if (sleepMillis > 0)
Thread.sleep(sleepMillis);
future.complete(null);
// wait for thread to terminate and check for failure
thread.join();
String failedReason = failed.get();
if (failedReason != null) {
throw new RuntimeException("Test failed: " + failedReason);
if (fail.get() != null) {
throw new AssertionError(
String.format("Test failed at iteration %d with [timed=%s sleepMillis=%d]",
i, timed, sleepMillis),
fail.get());
}
}
}