8305847: Improve diagnosability and resilience of HttpClient::close tests
Reviewed-by: jpai, djelinski
This commit is contained in:
parent
d7dc474a5a
commit
90b4006bce
test/jdk/java/net/httpclient
@ -42,8 +42,6 @@ import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpClient.Redirect;
|
||||
@ -62,18 +60,11 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import jdk.httpclient.test.lib.common.HttpServerAdapters;
|
||||
import jdk.httpclient.test.lib.http2.Http2TestServer;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import com.sun.net.httpserver.HttpsConfigurator;
|
||||
import com.sun.net.httpserver.HttpsServer;
|
||||
import jdk.test.lib.RandomFactory;
|
||||
import jdk.test.lib.net.SimpleSSLContext;
|
||||
import org.testng.annotations.AfterTest;
|
||||
@ -99,6 +90,7 @@ public class AsyncShutdownNow implements HttpServerAdapters {
|
||||
}
|
||||
static final Random RANDOM = RandomFactory.getRandom();
|
||||
|
||||
ExecutorService readerService;
|
||||
SSLContext sslContext;
|
||||
HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ]
|
||||
HttpTestServer httpsTestServer; // HTTPS/1.1
|
||||
@ -123,7 +115,7 @@ public class AsyncShutdownNow implements HttpServerAdapters {
|
||||
}
|
||||
|
||||
static final AtomicLong requestCounter = new AtomicLong();
|
||||
final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
|
||||
static final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
|
||||
|
||||
static Throwable getCause(Throwable t) {
|
||||
while (t instanceof CompletionException || t instanceof ExecutionException) {
|
||||
@ -175,8 +167,7 @@ public class AsyncShutdownNow implements HttpServerAdapters {
|
||||
|
||||
@Test(dataProvider = "positive")
|
||||
void testConcurrent(String uriString) throws Exception {
|
||||
out.printf("%n---- starting (%s) ----%n", uriString);
|
||||
ExecutorService readerService = Executors.newCachedThreadPool();
|
||||
out.printf("%n---- starting concurrent (%s) ----%n%n", uriString);
|
||||
HttpClient client = HttpClient.newBuilder()
|
||||
.proxy(NO_PROXY)
|
||||
.followRedirects(Redirect.ALWAYS)
|
||||
@ -186,8 +177,8 @@ public class AsyncShutdownNow implements HttpServerAdapters {
|
||||
|
||||
int step = RANDOM.nextInt(ITERATIONS);
|
||||
Throwable failed = null;
|
||||
List<CompletableFuture<String>> bodies = new ArrayList<>();
|
||||
try {
|
||||
List<CompletableFuture<String>> bodies = new ArrayList<>();
|
||||
for (int i = 0; i < ITERATIONS; i++) {
|
||||
URI uri = URI.create(uriString + "/concurrent/iteration-" + i);
|
||||
HttpRequest request = HttpRequest.newBuilder(uri)
|
||||
@ -232,31 +223,31 @@ public class AsyncShutdownNow implements HttpServerAdapters {
|
||||
});
|
||||
bodies.add(cf);
|
||||
}
|
||||
CompletableFuture.allOf(bodies.toArray(new CompletableFuture<?>[0])).get();
|
||||
} catch (Throwable throwable) {
|
||||
failed = throwable;
|
||||
} finally {
|
||||
failed = cleanup(client, readerService, failed);
|
||||
failed = cleanup(client, failed);
|
||||
}
|
||||
if (failed instanceof Exception ex) throw ex;
|
||||
if (failed instanceof Error e) throw e;
|
||||
assertTrue(client.isTerminated());
|
||||
// ensure that all operations are eventually terminated
|
||||
CompletableFuture.allOf(bodies.toArray(new CompletableFuture<?>[0])).get();
|
||||
}
|
||||
|
||||
static Throwable cleanup(HttpClient client, ExecutorService readerService, Throwable failed) {
|
||||
static Throwable cleanup(HttpClient client, Throwable failed) {
|
||||
try {
|
||||
try {
|
||||
if (client.awaitTermination(Duration.ofMillis(2000))) {
|
||||
out.println("Client terminated within expected delay");
|
||||
} else {
|
||||
AssertionError error = new AssertionError("client still running");
|
||||
if (failed != null) {
|
||||
failed.addSuppressed(error);
|
||||
} else failed = error;
|
||||
}
|
||||
} finally {
|
||||
readerService.shutdown();
|
||||
readerService.awaitTermination(2000, TimeUnit.MILLISECONDS);
|
||||
if (client.awaitTermination(Duration.ofMillis(2000))) {
|
||||
out.println("Client terminated within expected delay");
|
||||
} else {
|
||||
String msg = "Client %s still running: %s".formatted(
|
||||
client,
|
||||
TRACKER.diagnose(client));
|
||||
out.println(msg);
|
||||
AssertionError error = new AssertionError(msg);
|
||||
if (failed != null) {
|
||||
failed.addSuppressed(error);
|
||||
} else failed = error;
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
if (failed != null) {
|
||||
@ -268,8 +259,7 @@ public class AsyncShutdownNow implements HttpServerAdapters {
|
||||
|
||||
@Test(dataProvider = "positive")
|
||||
void testSequential(String uriString) throws Exception {
|
||||
out.printf("%n---- starting (%s) ----%n", uriString);
|
||||
ExecutorService readerService = Executors.newCachedThreadPool();
|
||||
out.printf("%n---- starting sequential (%s) ----%n%n", uriString);
|
||||
HttpClient client = HttpClient.newBuilder()
|
||||
.proxy(NO_PROXY)
|
||||
.followRedirects(Redirect.ALWAYS)
|
||||
@ -339,7 +329,7 @@ public class AsyncShutdownNow implements HttpServerAdapters {
|
||||
} catch (Throwable throwable) {
|
||||
failed = throwable;
|
||||
} finally {
|
||||
failed = cleanup(client, readerService, failed);
|
||||
failed = cleanup(client, failed);
|
||||
}
|
||||
if (failed instanceof Exception ex) throw ex;
|
||||
if (failed instanceof Error e) throw e;
|
||||
@ -354,6 +344,7 @@ public class AsyncShutdownNow implements HttpServerAdapters {
|
||||
sslContext = new SimpleSSLContext().get();
|
||||
if (sslContext == null)
|
||||
throw new AssertionError("Unexpected null sslContext");
|
||||
readerService = Executors.newCachedThreadPool();
|
||||
|
||||
httpTestServer = HttpTestServer.create(HTTP_1_1);
|
||||
httpTestServer.addHandler(new ServerRequestHandler(), "/http1/exec/");
|
||||
@ -380,6 +371,7 @@ public class AsyncShutdownNow implements HttpServerAdapters {
|
||||
Thread.sleep(100);
|
||||
AssertionError fail = TRACKER.checkShutdown(5000);
|
||||
try {
|
||||
shutdown(readerService);
|
||||
httpTestServer.stop();
|
||||
httpsTestServer.stop();
|
||||
http2TestServer.stop();
|
||||
@ -389,6 +381,15 @@ public class AsyncShutdownNow implements HttpServerAdapters {
|
||||
}
|
||||
}
|
||||
|
||||
static void shutdown(ExecutorService executorService) {
|
||||
try {
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(2000, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException ie) {
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
static class ServerRequestHandler implements HttpTestHandler {
|
||||
ConcurrentHashMap<String,String> closedRequests = new ConcurrentHashMap<>();
|
||||
|
||||
|
@ -38,6 +38,7 @@
|
||||
*/
|
||||
// -Djdk.internal.httpclient.debug=true
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
@ -56,8 +57,14 @@ import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Flow;
|
||||
import java.util.concurrent.Flow.Publisher;
|
||||
import java.util.concurrent.Flow.Subscriber;
|
||||
import java.util.concurrent.Flow.Subscription;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import jdk.httpclient.test.lib.common.HttpServerAdapters;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
@ -69,6 +76,7 @@ import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static java.lang.System.err;
|
||||
import static java.lang.System.in;
|
||||
import static java.lang.System.out;
|
||||
import static java.net.http.HttpClient.Builder.NO_PROXY;
|
||||
import static java.net.http.HttpClient.Version.HTTP_1_1;
|
||||
@ -86,6 +94,7 @@ public class HttpClientClose implements HttpServerAdapters {
|
||||
}
|
||||
static final Random RANDOM = RandomFactory.getRandom();
|
||||
|
||||
ExecutorService readerService;
|
||||
SSLContext sslContext;
|
||||
HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ]
|
||||
HttpTestServer httpsTestServer; // HTTPS/1.1
|
||||
@ -112,20 +121,71 @@ public class HttpClientClose implements HttpServerAdapters {
|
||||
static final AtomicLong requestCounter = new AtomicLong();
|
||||
final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
|
||||
|
||||
static String readBody(InputStream in) {
|
||||
try {
|
||||
static String readBody(InputStream body) {
|
||||
try (InputStream in = body) {
|
||||
return new String(in.readAllBytes(), StandardCharsets.UTF_8);
|
||||
} catch (IOException io) {
|
||||
throw new UncheckedIOException(io);
|
||||
}
|
||||
}
|
||||
|
||||
private static record CancellingSubscriber<U>(ExchangeResult<?> result)
|
||||
implements Subscriber<U> {
|
||||
@Override
|
||||
public void onSubscribe(Subscription subscription) {
|
||||
out.printf("%s: cancelling subscription", result.step());
|
||||
subscription.cancel();
|
||||
}
|
||||
@Override
|
||||
public void onNext(U item) {}
|
||||
@Override
|
||||
public void onError(Throwable throwable) {}
|
||||
@Override
|
||||
public void onComplete() {}
|
||||
}
|
||||
|
||||
private static <U> void ensureClosed(ExchangeResult<U> result) {
|
||||
var response = result.response;
|
||||
if (response == null) return;
|
||||
var body = response.body();
|
||||
try {
|
||||
if (body instanceof Closeable cl) {
|
||||
cl.close();
|
||||
} else if (body instanceof Publisher<?> pub) {
|
||||
pub.subscribe(new CancellingSubscriber<Object>(result));
|
||||
}
|
||||
} catch (IOException io) {
|
||||
out.printf("%s: Failed to close body: %s", result.step(), io);
|
||||
io.printStackTrace(out);
|
||||
}
|
||||
}
|
||||
|
||||
private record ExchangeResult<T>(int step, HttpResponse<T> response) {
|
||||
public static <U> ExchangeResult<U> ofStep(int step) {
|
||||
return new ExchangeResult<U>(step, null);
|
||||
}
|
||||
ExchangeResult<T> withResponse(HttpResponse<T> response) {
|
||||
return new ExchangeResult(step, response);
|
||||
}
|
||||
ExchangeResult<T> assertResponseState() {
|
||||
try {
|
||||
out.println(step + ": Got response: " + response);
|
||||
assertEquals(response.statusCode(), 200);
|
||||
} catch (AssertionError error) {
|
||||
out.printf("%s: Closing body due to assertion - %s", error);
|
||||
ensureClosed(this);
|
||||
throw error;
|
||||
}
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
@Test(dataProvider = "positive")
|
||||
void testConcurrent(String uriString) throws Exception {
|
||||
out.printf("%n---- starting (%s) ----%n", uriString);
|
||||
ExecutorService readerService = Executors.newCachedThreadPool();
|
||||
out.printf("%n---- starting concurrent (%s) ----%n%n", uriString);
|
||||
Throwable failed = null;
|
||||
HttpClient toCheck = null;
|
||||
List<CompletableFuture<String>> bodies = new ArrayList<>();
|
||||
try (HttpClient client = toCheck = HttpClient.newBuilder()
|
||||
.proxy(NO_PROXY)
|
||||
.followRedirects(Redirect.ALWAYS)
|
||||
@ -133,7 +193,6 @@ public class HttpClientClose implements HttpServerAdapters {
|
||||
.build()) {
|
||||
TRACKER.track(client);
|
||||
|
||||
List<CompletableFuture<String>> bodies = new ArrayList<>();
|
||||
for (int i = 0; i < ITERATIONS; i++) {
|
||||
URI uri = URI.create(uriString + "/concurrent/iteration-" + i);
|
||||
HttpRequest request = HttpRequest.newBuilder(uri)
|
||||
@ -143,12 +202,11 @@ public class HttpClientClose implements HttpServerAdapters {
|
||||
CompletableFuture<HttpResponse<InputStream>> responseCF;
|
||||
CompletableFuture<String> bodyCF;
|
||||
final int si = i;
|
||||
ExchangeResult<InputStream> result = ExchangeResult.ofStep(si);
|
||||
responseCF = client.sendAsync(request, BodyHandlers.ofInputStream())
|
||||
.thenApply((response) -> {
|
||||
out.println(si + ": Got response: " + response);
|
||||
assertEquals(response.statusCode(), 200);
|
||||
return response;
|
||||
});
|
||||
.thenApply(result::withResponse)
|
||||
.thenApplyAsync(ExchangeResult::assertResponseState, readerService)
|
||||
.thenApply(ExchangeResult::response);
|
||||
bodyCF = responseCF.thenApplyAsync(HttpResponse::body, readerService)
|
||||
.thenApply(HttpClientClose::readBody)
|
||||
.thenApply((s) -> {
|
||||
@ -163,33 +221,15 @@ public class HttpClientClose implements HttpServerAdapters {
|
||||
var cf = bodyCF;
|
||||
bodies.add(cf);
|
||||
}
|
||||
CompletableFuture.allOf(bodies.toArray(new CompletableFuture<?>[0])).get();
|
||||
} catch (Throwable throwable) {
|
||||
failed = throwable;
|
||||
} finally {
|
||||
failed = cleanup(readerService, failed);
|
||||
}
|
||||
if (failed instanceof Exception ex) throw ex;
|
||||
if (failed instanceof Error e) throw e;
|
||||
assertTrue(toCheck.isTerminated());
|
||||
}
|
||||
|
||||
static Throwable cleanup(ExecutorService readerService, Throwable failed) {
|
||||
try {
|
||||
readerService.shutdown();
|
||||
readerService.awaitTermination(2000, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException ie) {
|
||||
if (failed != null) {
|
||||
failed.addSuppressed(ie);
|
||||
} else failed = ie;
|
||||
}
|
||||
return failed;
|
||||
// assert all operations eventually terminate
|
||||
CompletableFuture.allOf(bodies.toArray(new CompletableFuture<?>[0])).get();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "positive")
|
||||
void testSequential(String uriString) throws Exception {
|
||||
out.printf("%n---- starting (%s) ----%n", uriString);
|
||||
ExecutorService readerService = Executors.newCachedThreadPool();
|
||||
out.printf("%n---- starting sequential (%s) ----%n%n", uriString);
|
||||
Throwable failed = null;
|
||||
HttpClient toCheck = null;
|
||||
try (HttpClient client = toCheck = HttpClient.newBuilder()
|
||||
@ -206,14 +246,13 @@ public class HttpClientClose implements HttpServerAdapters {
|
||||
.build();
|
||||
out.printf("Iteration %d request: %s%n", i, request.uri());
|
||||
final int si = i;
|
||||
ExchangeResult<InputStream> result = ExchangeResult.ofStep(si);
|
||||
CompletableFuture<HttpResponse<InputStream>> responseCF;
|
||||
CompletableFuture<String> bodyCF;
|
||||
responseCF = client.sendAsync(request, BodyHandlers.ofInputStream())
|
||||
.thenApply((response) -> {
|
||||
out.println(si + ": Got response: " + response);
|
||||
assertEquals(response.statusCode(), 200);
|
||||
return response;
|
||||
});
|
||||
.thenApply(result::withResponse)
|
||||
.thenApplyAsync(ExchangeResult::assertResponseState, readerService)
|
||||
.thenApply(ExchangeResult::response);
|
||||
bodyCF = responseCF.thenApplyAsync(HttpResponse::body, readerService)
|
||||
.thenApply(HttpClientClose::readBody)
|
||||
.thenApply((s) -> {
|
||||
@ -231,13 +270,7 @@ public class HttpClientClose implements HttpServerAdapters {
|
||||
}
|
||||
bodyCF.get();
|
||||
}
|
||||
} catch (Throwable throwable) {
|
||||
failed = throwable;
|
||||
} finally {
|
||||
failed = cleanup(readerService, failed);
|
||||
}
|
||||
if (failed instanceof Exception ex) throw ex;
|
||||
if (failed instanceof Error e) throw e;
|
||||
assertTrue(toCheck.isTerminated());
|
||||
}
|
||||
|
||||
@ -249,7 +282,7 @@ public class HttpClientClose implements HttpServerAdapters {
|
||||
sslContext = new SimpleSSLContext().get();
|
||||
if (sslContext == null)
|
||||
throw new AssertionError("Unexpected null sslContext");
|
||||
|
||||
readerService = Executors.newCachedThreadPool();
|
||||
httpTestServer = HttpTestServer.create(HTTP_1_1);
|
||||
httpTestServer.addHandler(new ServerRequestHandler(), "/http1/exec/");
|
||||
httpURI = "http://" + httpTestServer.serverAuthority() + "/http1/exec/retry";
|
||||
@ -275,6 +308,7 @@ public class HttpClientClose implements HttpServerAdapters {
|
||||
Thread.sleep(100);
|
||||
AssertionError fail = TRACKER.checkShutdown(5000);
|
||||
try {
|
||||
shutdown(readerService);
|
||||
httpTestServer.stop();
|
||||
httpsTestServer.stop();
|
||||
http2TestServer.stop();
|
||||
@ -284,6 +318,15 @@ public class HttpClientClose implements HttpServerAdapters {
|
||||
}
|
||||
}
|
||||
|
||||
static void shutdown(ExecutorService executorService) {
|
||||
try {
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(2000, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException ie) {
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
static class ServerRequestHandler implements HttpTestHandler {
|
||||
ConcurrentHashMap<String,String> closedRequests = new ConcurrentHashMap<>();
|
||||
|
||||
|
@ -39,6 +39,7 @@
|
||||
*/
|
||||
// -Djdk.internal.httpclient.debug=true
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
@ -61,6 +62,9 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Flow.Publisher;
|
||||
import java.util.concurrent.Flow.Subscriber;
|
||||
import java.util.concurrent.Flow.Subscription;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import jdk.httpclient.test.lib.common.HttpServerAdapters;
|
||||
@ -91,6 +95,7 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
}
|
||||
static final Random RANDOM = RandomFactory.getRandom();
|
||||
|
||||
ExecutorService readerService;
|
||||
SSLContext sslContext;
|
||||
HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ]
|
||||
HttpTestServer httpsTestServer; // HTTPS/1.1
|
||||
@ -115,7 +120,7 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
}
|
||||
|
||||
static final AtomicLong requestCounter = new AtomicLong();
|
||||
final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
|
||||
static final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
|
||||
static volatile long start = System.nanoTime();
|
||||
|
||||
static final String now() {
|
||||
@ -136,14 +141,65 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
return t;
|
||||
}
|
||||
|
||||
static String readBody(InputStream in) {
|
||||
try {
|
||||
static String readBody(InputStream body) {
|
||||
try (InputStream in = body) {
|
||||
return new String(in.readAllBytes(), StandardCharsets.UTF_8);
|
||||
} catch (IOException io) {
|
||||
throw new UncheckedIOException(io);
|
||||
}
|
||||
}
|
||||
|
||||
private static record CancellingSubscriber<U>(ExchangeResult<?> result)
|
||||
implements Subscriber<U> {
|
||||
@Override
|
||||
public void onSubscribe(Subscription subscription) {
|
||||
out.printf(now() + "%s: cancelling subscription", result.step());
|
||||
subscription.cancel();
|
||||
}
|
||||
@Override
|
||||
public void onNext(U item) {}
|
||||
@Override
|
||||
public void onError(Throwable throwable) {}
|
||||
@Override
|
||||
public void onComplete() {}
|
||||
}
|
||||
|
||||
private static <U> void ensureClosed(ExchangeResult<U> result) {
|
||||
var response = result.response;
|
||||
if (response == null) return;
|
||||
var body = response.body();
|
||||
try {
|
||||
if (body instanceof Closeable cl) {
|
||||
cl.close();
|
||||
} else if (body instanceof Publisher<?> pub) {
|
||||
pub.subscribe(new CancellingSubscriber<Object>(result));
|
||||
}
|
||||
} catch (IOException io) {
|
||||
out.printf(now() + "%s: Failed to close body: %s", result.step(), io);
|
||||
io.printStackTrace(out);
|
||||
}
|
||||
}
|
||||
|
||||
private record ExchangeResult<T>(int step, HttpResponse<T> response) {
|
||||
public static <U> ExchangeResult<U> ofStep(int step) {
|
||||
return new ExchangeResult<U>(step, null);
|
||||
}
|
||||
ExchangeResult<T> withResponse(HttpResponse<T> response) {
|
||||
return new ExchangeResult(step, response);
|
||||
}
|
||||
ExchangeResult<T> assertResponseState() {
|
||||
try {
|
||||
out.println(now() + step + ": Got response: " + response);
|
||||
assertEquals(response.statusCode(), 200);
|
||||
} catch (AssertionError error) {
|
||||
out.printf(now() + "%s: Closing body due to assertion - %s", error);
|
||||
ensureClosed(this);
|
||||
throw error;
|
||||
}
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
static boolean hasExpectedMessage(IOException io) {
|
||||
String message = io.getMessage();
|
||||
if (message == null) return false;
|
||||
@ -177,8 +233,7 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
|
||||
@Test(dataProvider = "positive")
|
||||
void testConcurrent(String uriString) throws Exception {
|
||||
out.printf("%n---- %sstarting (%s) ----%n", now(), uriString);
|
||||
ExecutorService readerService = Executors.newCachedThreadPool();
|
||||
out.printf("%n---- %sstarting concurrent (%s) ----%n%n", now(), uriString);
|
||||
HttpClient client = HttpClient.newBuilder()
|
||||
.proxy(NO_PROXY)
|
||||
.followRedirects(Redirect.ALWAYS)
|
||||
@ -188,8 +243,8 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
|
||||
int step = RANDOM.nextInt(ITERATIONS);
|
||||
Throwable failed = null;
|
||||
List<CompletableFuture<String>> bodies = new ArrayList<>();
|
||||
try {
|
||||
List<CompletableFuture<String>> bodies = new ArrayList<>();
|
||||
for (int i = 0; i < ITERATIONS; i++) {
|
||||
URI uri = URI.create(uriString + "/concurrent/iteration-" + i);
|
||||
HttpRequest request = HttpRequest.newBuilder(uri)
|
||||
@ -199,12 +254,11 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
CompletableFuture<HttpResponse<InputStream>> responseCF;
|
||||
CompletableFuture<String> bodyCF;
|
||||
final int si = i;
|
||||
ExchangeResult<InputStream> result = ExchangeResult.ofStep(si);
|
||||
responseCF = client.sendAsync(request, BodyHandlers.ofInputStream())
|
||||
.thenApply((response) -> {
|
||||
out.println(now() + si + ": Got response: " + response);
|
||||
assertEquals(response.statusCode(), 200);
|
||||
return response;
|
||||
});
|
||||
.thenApply(result::withResponse)
|
||||
.thenApplyAsync(ExchangeResult::assertResponseState, readerService)
|
||||
.thenApply(ExchangeResult::response);
|
||||
bodyCF = responseCF.thenApplyAsync(HttpResponse::body, readerService)
|
||||
.thenApply(HttpClientShutdown::readBody)
|
||||
.thenApply((s) -> {
|
||||
@ -238,33 +292,32 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
});
|
||||
bodies.add(cf);
|
||||
}
|
||||
CompletableFuture.allOf(bodies.toArray(new CompletableFuture<?>[0])).get();
|
||||
} catch (Throwable throwable) {
|
||||
failed = throwable;
|
||||
} finally {
|
||||
failed = cleanup(client, readerService, failed);
|
||||
failed = cleanup(client, failed);
|
||||
}
|
||||
if (failed instanceof Exception ex) throw ex;
|
||||
if (failed instanceof Error e) throw e;
|
||||
assertTrue(client.isTerminated());
|
||||
// ensure all tasks have been successfully completed
|
||||
CompletableFuture.allOf(bodies.toArray(new CompletableFuture<?>[0])).get();
|
||||
}
|
||||
|
||||
static Throwable cleanup(HttpClient client, ExecutorService readerService, Throwable failed) {
|
||||
static Throwable cleanup(HttpClient client, Throwable failed) {
|
||||
try {
|
||||
try {
|
||||
out.println(now() + "awaiting termination...");
|
||||
if (client.awaitTermination(Duration.ofMillis(2000))) {
|
||||
out.println(now() + "Client terminated within expected delay");
|
||||
} else {
|
||||
out.println(now() + "Client still running!");
|
||||
AssertionError error = new AssertionError("client still running");
|
||||
if (failed != null) {
|
||||
failed.addSuppressed(error);
|
||||
} else failed = error;
|
||||
}
|
||||
} finally {
|
||||
readerService.shutdown();
|
||||
readerService.awaitTermination(2000, TimeUnit.MILLISECONDS);
|
||||
out.println(now() + "awaiting termination...");
|
||||
if (client.awaitTermination(Duration.ofMinutes(3))) {
|
||||
out.println(now() + "Client terminated within expected delay");
|
||||
} else {
|
||||
String msg = "Client %s still running: %s".formatted(
|
||||
client,
|
||||
TRACKER.diagnose(client));
|
||||
out.println(now() + msg);
|
||||
AssertionError error = new AssertionError(msg);
|
||||
if (failed != null) {
|
||||
failed.addSuppressed(error);
|
||||
} else failed = error;
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
if (failed != null) {
|
||||
@ -276,8 +329,7 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
|
||||
@Test(dataProvider = "positive")
|
||||
void testSequential(String uriString) throws Exception {
|
||||
out.printf("%n---- %sstarting (%s) ----%n", now(), uriString);
|
||||
ExecutorService readerService = Executors.newCachedThreadPool();
|
||||
out.printf("%n---- %sstarting sequential (%s) ----%n%n", now(), uriString);
|
||||
HttpClient client = HttpClient.newBuilder()
|
||||
.proxy(NO_PROXY)
|
||||
.followRedirects(Redirect.ALWAYS)
|
||||
@ -298,12 +350,11 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
final int si = i;
|
||||
CompletableFuture<HttpResponse<InputStream>> responseCF;
|
||||
CompletableFuture<String> bodyCF;
|
||||
ExchangeResult<InputStream> result = ExchangeResult.ofStep(si);
|
||||
responseCF = client.sendAsync(request, BodyHandlers.ofInputStream())
|
||||
.thenApply((response) -> {
|
||||
out.println(now() + si + ": Got response: " + response);
|
||||
assertEquals(response.statusCode(), 200);
|
||||
return response;
|
||||
});
|
||||
.thenApply(result::withResponse)
|
||||
.thenApplyAsync(ExchangeResult::assertResponseState, readerService)
|
||||
.thenApply(ExchangeResult::response);
|
||||
bodyCF = responseCF.thenApplyAsync(HttpResponse::body, readerService)
|
||||
.thenApply(HttpClientShutdown::readBody)
|
||||
.thenApply((s) -> {
|
||||
@ -350,7 +401,7 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
} catch (Throwable throwable) {
|
||||
failed = throwable;
|
||||
} finally {
|
||||
failed = cleanup(client, readerService, failed);
|
||||
failed = cleanup(client, failed);
|
||||
}
|
||||
if (failed instanceof Exception ex) throw ex;
|
||||
if (failed instanceof Error e) throw e;
|
||||
@ -365,6 +416,7 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
sslContext = new SimpleSSLContext().get();
|
||||
if (sslContext == null)
|
||||
throw new AssertionError("Unexpected null sslContext");
|
||||
readerService = Executors.newCachedThreadPool();
|
||||
|
||||
httpTestServer = HttpTestServer.create(HTTP_1_1);
|
||||
httpTestServer.addHandler(new ServerRequestHandler(), "/http1/exec/");
|
||||
@ -392,6 +444,7 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
Thread.sleep(100);
|
||||
AssertionError fail = TRACKER.checkShutdown(5000);
|
||||
try {
|
||||
shutdown(readerService);
|
||||
httpTestServer.stop();
|
||||
httpsTestServer.stop();
|
||||
http2TestServer.stop();
|
||||
@ -401,6 +454,15 @@ public class HttpClientShutdown implements HttpServerAdapters {
|
||||
}
|
||||
}
|
||||
|
||||
static void shutdown(ExecutorService executorService) {
|
||||
try {
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(2000, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException ie) {
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
static class ServerRequestHandler implements HttpTestHandler {
|
||||
ConcurrentHashMap<String,String> closedRequests = new ConcurrentHashMap<>();
|
||||
|
||||
|
@ -63,6 +63,14 @@ public class ReferenceTracker {
|
||||
return diagnose(warnings, (t) -> t.getOutstandingHttpOperations() > 0);
|
||||
}
|
||||
|
||||
public StringBuilder diagnose(Tracker tracker) {
|
||||
return diagnose(tracker, new StringBuilder(), (t) -> t.getOutstandingHttpOperations() > 0);
|
||||
}
|
||||
|
||||
public StringBuilder diagnose(HttpClient client) {
|
||||
return diagnose(getTracker(client));
|
||||
}
|
||||
|
||||
public StringBuilder diagnose(Tracker tracker, StringBuilder warnings, Predicate<Tracker> hasOutstanding) {
|
||||
checkOutstandingOperations(warnings, tracker, hasOutstanding);
|
||||
return warnings;
|
||||
|
Loading…
x
Reference in New Issue
Block a user