8316580: HttpClient with StructuredTaskScope does not close when a task fails

Reviewed-by: djelinski
This commit is contained in:
Daniel Fuchs 2023-09-28 13:32:38 +00:00
parent 065203d44a
commit fc98998627
4 changed files with 453 additions and 27 deletions

View File

@ -283,23 +283,25 @@ final class HttpClientImpl extends HttpClient implements Trackable {
}
}
static void registerPending(PendingRequest pending) {
static <T> CompletableFuture<T> registerPending(PendingRequest pending, CompletableFuture<T> res) {
// shortcut if cf is already completed: no need to go through the trouble of
// registering it
if (pending.cf.isDone()) return;
if (pending.cf.isDone()) return res;
var client = pending.client;
var cf = pending.cf;
var id = pending.id;
boolean added = client.pendingRequests.add(pending);
// this may immediately remove `pending` from the set is the cf is already completed
pending.ref = cf.whenComplete((r,t) -> client.pendingRequests.remove(pending));
var ref = res.whenComplete((r,t) -> client.pendingRequests.remove(pending));
pending.ref = ref;
assert added : "request %d was already added".formatted(id);
// should not happen, unless the selector manager has already
// exited abnormally
if (client.selmgr.isClosed()) {
pending.abort(client.selmgr.selectorClosedException());
}
return ref;
}
static void abortPendingRequests(HttpClientImpl client, Throwable reason) {
@ -931,8 +933,9 @@ final class HttpClientImpl extends HttpClient implements Trackable {
cf = sendAsync(req, responseHandler, null, null);
return cf.get();
} catch (InterruptedException ie) {
if (cf != null )
if (cf != null) {
cf.cancel(true);
}
throw ie;
} catch (ExecutionException e) {
final Throwable throwable = e.getCause();
@ -1052,19 +1055,23 @@ final class HttpClientImpl extends HttpClient implements Trackable {
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
}
// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
if (exchangeExecutor != null) {
res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
}
// The mexCf is the Cf we need to abort if the SelectorManager thread
// is aborted.
PendingRequest pending = new PendingRequest(id, requestImpl, mexCf, mex, this);
registerPending(pending);
return res;
} catch(Throwable t) {
res = registerPending(pending, res);
if (exchangeExecutor != null) {
// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
return res.isDone() ? res
: res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
} else {
// make a defensive copy that can be safely canceled
// by the caller
return res.isDone() ? res : res.copy();
}
} catch (Throwable t) {
requestUnreference();
debugCompleted("ClientImpl (async)", start, userRequest);
throw t;

View File

@ -91,7 +91,7 @@ class MultiExchange<T> implements Cancelable {
Exchange<T> previous;
volatile Throwable retryCause;
volatile boolean expiredOnce;
volatile HttpResponse<T> response = null;
volatile HttpResponse<T> response;
// Maximum number of times a request will be retried/redirected
// for any reason
@ -278,13 +278,18 @@ class MultiExchange<T> implements Cancelable {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = this.cancelled;
boolean firstCancel = false;
if (!cancelled && mayInterruptIfRunning) {
if (interrupted.get() == null) {
interrupted.compareAndSet(null,
firstCancel = interrupted.compareAndSet(null,
new CancellationException("Request cancelled"));
debug.log("multi exchange recording: " + interrupted.get());
} else {
debug.log("multi exchange recorded: " + interrupted.get());
}
if (debug.on()) {
if (firstCancel) {
debug.log("multi exchange recording: " + interrupted.get());
} else {
debug.log("multi exchange recorded: " + interrupted.get());
}
}
this.cancelled = true;
var exchange = getExchange();
@ -377,17 +382,30 @@ class MultiExchange<T> implements Cancelable {
}).exceptionallyCompose(this::whenCancelled);
}
// returns a CancellationExcpetion that wraps the given cause
// if cancel(boolean) was called, the given cause otherwise
private Throwable wrapIfCancelled(Throwable cause) {
CancellationException interrupt = interrupted.get();
if (interrupt == null) return cause;
var cancel = new CancellationException(interrupt.getMessage());
// preserve the stack trace of the original exception to
// show where the call to cancel(boolean) came from
cancel.setStackTrace(interrupt.getStackTrace());
cancel.initCause(Utils.getCancelCause(cause));
return cancel;
}
// if the request failed because the multi exchange was cancelled,
// make sure the reported exception is wrapped in CancellationException
private CompletableFuture<HttpResponse<T>> whenCancelled(Throwable t) {
CancellationException x = interrupted.get();
if (x != null) {
// make sure to fail with CancellationException if cancel(true)
// was called.
t = x.initCause(Utils.getCancelCause(t));
var x = wrapIfCancelled(t);
if (x instanceof CancellationException) {
if (debug.on()) {
debug.log("MultiExchange interrupted with: " + t.getCause());
debug.log("MultiExchange interrupted with: " + x.getCause());
}
}
return MinimalFuture.failedFuture(t);
return MinimalFuture.failedFuture(x);
}
static class NullSubscription implements Flow.Subscription {

View File

@ -0,0 +1,391 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import jdk.internal.net.http.common.OperationTrackers.Tracker;
import jdk.test.lib.net.SimpleSSLContext;
import jdk.test.lib.net.URIBuilder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/*
* @test
* @bug 8316580
* @library /test/lib
* @run junit/othervm -Djdk.tracePinnedThreads=full
* -DuseReferenceTracker=false
* HttpGetInCancelledFuture
* @run junit/othervm -Djdk.tracePinnedThreads=full
* -DuseReferenceTracker=true
* HttpGetInCancelledFuture
* @summary This test verifies that cancelling a future that
* does an HTTP request using the HttpClient doesn't cause
* HttpClient::close to block forever.
*/
public class HttpGetInCancelledFuture {
static final boolean useTracker = Boolean.getBoolean("useReferenceTracker");
static final class TestException extends RuntimeException {
public TestException(String message, Throwable cause) {
super(message, cause);
}
}
static ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
HttpClient makeClient(URI uri, Version version, Executor executor) {
var builder = HttpClient.newBuilder();
if (uri.getScheme().equalsIgnoreCase("https")) {
try {
builder.sslContext(new SimpleSSLContext().get());
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}
return builder.connectTimeout(Duration.ofSeconds(1))
.executor(executor)
.version(version)
.build();
}
record TestCase(String url, int reqCount, Version version) {}
// A server that doesn't accept
static volatile ServerSocket NOT_ACCEPTING;
static List<TestCase> parameters() {
ServerSocket ss = NOT_ACCEPTING;
if (ss == null) {
synchronized (HttpGetInCancelledFuture.class) {
if ((ss = NOT_ACCEPTING) == null) {
try {
ss = new ServerSocket();
var loopback = InetAddress.getLoopbackAddress();
ss.bind(new InetSocketAddress(loopback, 0), 10);
NOT_ACCEPTING = ss;
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}
}
}
URI http = URIBuilder.newBuilder()
.loopback()
.scheme("http")
.port(ss.getLocalPort())
.path("/not-accepting/")
.buildUnchecked();
URI https = URIBuilder.newBuilder()
.loopback()
.scheme("https")
.port(ss.getLocalPort())
.path("/not-accepting/")
.buildUnchecked();
// use all HTTP versions, without and with TLS
return List.of(
new TestCase(http.toString(), 200, Version.HTTP_2),
new TestCase(http.toString(), 200, Version.HTTP_1_1),
new TestCase(https.toString(), 200, Version.HTTP_2),
new TestCase(https.toString(), 200, Version.HTTP_1_1)
);
}
@ParameterizedTest
@MethodSource("parameters")
void runTest(TestCase test) {
System.out.println("Testing with: " + test);
runTest(test.url, test.reqCount, test.version);
}
static class TestTaskScope implements AutoCloseable {
final ExecutorService pool = new ForkJoinPool();
final Map<Task<?>, Future<?>> tasks = new ConcurrentHashMap<>();
final AtomicReference<ExecutionException> failed = new AtomicReference<>();
class Task<T> implements Callable<T> {
final Callable<T> task;
final CompletableFuture<T> cf = new CompletableFuture<>();
Task(Callable<T> task) {
this.task = task;
}
@Override
public T call() throws Exception {
try {
var res = task.call();
cf.complete(res);
return res;
} catch (Throwable t) {
cf.completeExceptionally(t);
throw t;
}
}
CompletableFuture<T> cf() {
return cf;
}
}
static class ShutdownOnFailure extends TestTaskScope {
public ShutdownOnFailure() {}
@Override
protected <T> void completed(Task<T> task, T result, Throwable throwable) {
super.completed(task, result, throwable);
if (throwable != null) {
if (failed.get() == null) {
ExecutionException ex = throwable instanceof ExecutionException x
? x : new ExecutionException(throwable);
failed.compareAndSet(null, ex);
}
tasks.entrySet().forEach(this::cancel);
}
}
void cancel(Map.Entry<Task<?>, Future<?>> entry) {
entry.getValue().cancel(true);
entry.getKey().cf().cancel(true);
tasks.remove(entry.getKey(), entry.getValue());
}
@Override
public <T> CompletableFuture<T> fork(Callable<T> callable) {
var ex = failed.get();
if (ex == null) {
return super.fork(callable);
} // otherwise do nothing
return CompletableFuture.failedFuture(new RejectedExecutionException());
}
}
public <T> CompletableFuture<T> fork(Callable<T> callable) {
var task = new Task<>(callable);
var res = pool.submit(task);
tasks.put(task, res);
task.cf.whenComplete((r,t) -> completed(task, r, t));
return task.cf;
}
protected <T> void completed(Task<T> task, T result, Throwable throwable) {
tasks.remove(task);
}
public void join() throws InterruptedException {
try {
var cfs = tasks.keySet().stream()
.map(Task::cf).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(cfs).get();
} catch (InterruptedException it) {
throw it;
} catch (ExecutionException ex) {
failed.compareAndSet(null, ex);
}
}
public void throwIfFailed() throws ExecutionException {
ExecutionException x = failed.get();
if (x != null) throw x;
}
public void close() {
pool.close();
}
}
ExecutorService testExecutor() {
return Executors.newCachedThreadPool();
}
void runTest(String url, int reqCount, Version version) {
final var dest = URI.create(url);
try (final var executor = testExecutor()) {
var httpClient = makeClient(dest, version, executor);
TRACKER.track(httpClient);
Tracker tracker = TRACKER.getTracker(httpClient);
Throwable failed = null;
try {
try (final var scope = new TestTaskScope.ShutdownOnFailure()) {
launchAndProcessRequests(scope, httpClient, reqCount, dest);
} finally {
System.out.printf("StructuredTaskScope closed: STARTED=%s, SUCCESS=%s, INTERRUPT=%s, FAILED=%s%n",
STARTED.get(), SUCCESS.get(), INTERRUPT.get(), FAILED.get());
}
System.out.println("ERROR: Expected TestException not thrown");
throw new AssertionError("Expected TestException not thrown");
} catch (TestException x) {
System.out.println("Got expected exception: " + x);
} catch (Throwable t) {
System.out.println("ERROR: Unexpected exception: " + t);
failed = t;
throw t;
} finally {
// we can either use the tracker or call HttpClient::close
if (useTracker) {
// using the tracker depends on GC but will give us some diagnostic
// if some operations are not properly cancelled and prevent the client
// from terminating
httpClient = null;
System.gc();
System.out.println(TRACKER.diagnose(tracker));
var error = TRACKER.check(tracker, 10000);
if (error != null) {
if (failed != null) error.addSuppressed(failed);
EXCEPTIONS.forEach(x -> {
System.out.println("FAILED: " + x);
});
EXCEPTIONS.forEach(x -> {
x.printStackTrace(System.out);
});
throw error;
}
} else {
// if not all operations terminate, close() will block
// forever and the test will fail in jtreg timeout.
// there will be no diagnostic.
httpClient.close();
}
System.out.println("HttpClient closed");
}
} finally {
System.out.println("ThreadExecutor closed");
}
// not all tasks may have been started before the scope was cancelled
// due to the first connect/timeout exception, but all tasks that started
// must have either succeeded, be interrupted, or failed
assertTrue(STARTED.get() > 0);
assertEquals(STARTED.get(), SUCCESS.get() + INTERRUPT.get() + FAILED.get());
if (SUCCESS.get() > 0) {
// we don't expect any server to be listening and responding
System.out.println("WARNING: got some unexpected successful responses from "
+ "\"" + NOT_ACCEPTING.getLocalSocketAddress() + "\": " + SUCCESS.get());
}
}
private void launchAndProcessRequests(
TestTaskScope.ShutdownOnFailure scope,
HttpClient httpClient,
int reqCount,
URI dest) {
for (int counter = 0; counter < reqCount; counter++) {
scope.fork(() ->
getAndCheck(httpClient, dest)
);
}
try {
scope.join();
} catch (InterruptedException e) {
throw new AssertionError("scope.join() was interrupted", e);
}
try {
scope.throwIfFailed();
} catch (ExecutionException e) {
throw new TestException("something threw an exception in StructuredTaskScope", e);
}
}
final static AtomicLong ID = new AtomicLong();
final AtomicLong SUCCESS = new AtomicLong();
final AtomicLong INTERRUPT = new AtomicLong();
final AtomicLong FAILED = new AtomicLong();
final AtomicLong STARTED = new AtomicLong();
final CopyOnWriteArrayList<Exception> EXCEPTIONS = new CopyOnWriteArrayList<>();
private String getAndCheck(HttpClient httpClient, URI url) {
STARTED.incrementAndGet();
final var response = sendRequest(httpClient, url);
String res = response.body();
int statusCode = response.statusCode();
assertEquals(200, statusCode);
return res;
}
private HttpResponse<String> sendRequest(HttpClient httpClient, URI url) {
var id = ID.incrementAndGet();
try {
var request = HttpRequest.newBuilder(url).GET().build();
var response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
// System.out.println("Got response for " + id + ": " + response);
SUCCESS.incrementAndGet();
return response;
} catch (InterruptedException e) {
INTERRUPT.incrementAndGet();
// System.out.println("Got interrupted for " + id + ": " + e);
throw new RuntimeException(e);
} catch (Exception e) {
FAILED.incrementAndGet();
EXCEPTIONS.add(e);
//System.out.println("Got exception for " + id + ": " + e);
throw new RuntimeException(e);
}
}
@AfterAll
static void tearDown() {
try {
System.gc();
var error = TRACKER.check(5000);
if (error != null) throw error;
} finally {
ServerSocket ss;
synchronized (HttpGetInCancelledFuture.class) {
ss = NOT_ACCEPTING;
NOT_ACCEPTING = null;
}
if (ss != null) {
try {
ss.close();
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -246,6 +246,11 @@ public class ReferenceTracker {
}
long duration = Duration.ofNanos(System.nanoTime() - waitStart).toMillis();
if (hasOutstanding.test(tracker)) {
if (i == 0 && waited == 0) {
// we found nothing and didn't wait expecting success, but then found
// something. Respin to make sure we wait.
return check(tracker, graceDelayMs, hasOutstanding, description, printThreads);
}
StringBuilder warnings = diagnose(tracker, new StringBuilder(), hasOutstanding);
if (hasOutstanding.test(tracker)) {
fail = new AssertionError(warnings.toString());
@ -302,6 +307,11 @@ public class ReferenceTracker {
}
long duration = Duration.ofNanos(System.nanoTime() - waitStart).toMillis();
if (TRACKERS.stream().anyMatch(hasOutstanding)) {
if (i == 0 && waited == 0) {
// we found nothing and didn't wait expecting success, but then found
// something. Respin to make sure we wait.
return check(graceDelayMs, hasOutstanding, description, printThreads);
}
StringBuilder warnings = diagnose(new StringBuilder(), hasOutstanding);
addSummary(warnings);
if (TRACKERS.stream().anyMatch(hasOutstanding)) {