8277969: HttpClient SelectorManager shuts down when custom Executor rejects a task

Reviewed-by: jpai, michaelm
This commit is contained in:
Daniel Fuchs 2022-04-20 13:09:45 +00:00
parent 6c6d5223df
commit 5291ec8d56
24 changed files with 1862 additions and 268 deletions

@ -26,7 +26,6 @@
package jdk.internal.net.http;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.URI;

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -26,11 +26,12 @@
package jdk.internal.net.http;
import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.ResponseInfo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import jdk.internal.net.http.common.HttpBodySubscriberWrapper;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
@ -66,7 +67,7 @@ abstract class ExchangeImpl<T> {
return exchange;
}
HttpClient client() {
HttpClientImpl client() {
return exchange.client();
}
@ -181,6 +182,22 @@ abstract class ExchangeImpl<T> {
boolean returnConnectionToPool,
Executor executor);
/**
* Creates and wraps an {@link HttpResponse.BodySubscriber} from a {@link
* HttpResponse.BodyHandler} for the given {@link ResponseInfo}.
* An {@code HttpBodySubscriberWrapper} wraps a response body subscriber and makes
* sure its completed/onError methods are called only once, and that its onSusbscribe
* is called before onError. This is useful when errors occur asynchronously, and
* most typically when the error occurs before the {@code BodySubscriber} has
* subscribed.
* @param handler a body handler
* @param response a response info
* @return a new {@code HttpBodySubscriberWrapper} to handle the response
*/
HttpBodySubscriberWrapper<T> createResponseSubscriber(HttpResponse.BodyHandler<T> handler, ResponseInfo response) {
return new HttpBodySubscriberWrapper<>(handler.apply(response));
}
/**
* Ignore/consume the body.
*/

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -497,7 +497,8 @@ class Http1AsyncReceiver {
final Throwable t = (recorded == null ? ex : recorded);
if (debug.on())
debug.log("recorded " + t + "\n\t delegate: " + delegate
+ "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
+ "\n\t queue.isEmpty: " + queue.isEmpty()
+ "\n\tstopRequested: " + stopRequested, ex);
if (Log.errors()) {
Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t);
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -27,10 +27,10 @@ package jdk.internal.net.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.ResponseInfo;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@ -39,7 +39,9 @@ import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.HttpBodySubscriberWrapper;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
@ -47,7 +49,6 @@ import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;
/**
* Encapsulates one HTTP/1.1 request/response exchange.
@ -78,7 +79,7 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();
/** The write publisher, responsible for writing the complete request ( both
* headers and body ( if any ). */
* headers and body ( if any )). */
private final Http1Publisher writePublisher = new Http1Publisher();
/** Completed when the header have been published, or there is an error */
@ -86,8 +87,10 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
/** Completed when the body has been published, or there is an error */
private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();
/** The subscriber to the request's body published. Maybe null. */
private volatile Http1BodySubscriber bodySubscriber;
/** The subscriber to the request's body published. May be null. */
private volatile Http1RequestBodySubscriber bodySubscriber;
/** The subscriber to the response's body received. May be null. */
private volatile BodySubscriber<T> responseSubscriber;
enum State { INITIAL,
HEADERS,
@ -117,12 +120,12 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
* concrete implementations: {@link Http1Request.StreamSubscriber}, and
* {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
* fixed length bodies, respectively. */
abstract static class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
abstract static class Http1RequestBodySubscriber implements Flow.Subscriber<ByteBuffer> {
final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
private volatile Flow.Subscription subscription;
volatile boolean complete;
private final Logger debug;
Http1BodySubscriber(Logger debug) {
Http1RequestBodySubscriber(Logger debug) {
assert debug != null;
this.debug = debug;
}
@ -159,8 +162,8 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
}
}
static Http1BodySubscriber completeSubscriber(Logger debug) {
return new Http1BodySubscriber(debug) {
static Http1RequestBodySubscriber completeSubscriber(Logger debug) {
return new Http1RequestBodySubscriber(debug) {
@Override public void onSubscribe(Flow.Subscription subscription) { error(); }
@Override public void onNext(ByteBuffer item) { error(); }
@Override public void onError(Throwable throwable) { error(); }
@ -173,6 +176,34 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
}
}
/**
* The Http1AsyncReceiver ensures that all calls to
* the subscriber, including onSubscribe, occur sequentially.
* There could however be some race conditions that could happen
* in case of unexpected errors thrown at unexpected places, which
* may cause onError to be called multiple times.
* The Http1BodySubscriber will ensure that the user subscriber
* is actually completed only once - and only after it is
* subscribed.
* @param <U> The type of response.
*/
static final class Http1ResponseBodySubscriber<U> extends HttpBodySubscriberWrapper<U> {
final Http1Exchange<U> exchange;
Http1ResponseBodySubscriber(BodySubscriber<U> userSubscriber, Http1Exchange<U> exchange) {
super(userSubscriber);
this.exchange = exchange;
}
@Override
protected void complete(Throwable t) {
try {
exchange.responseSubscriberCompleted(this);
} finally {
super.complete(t);
}
}
}
@Override
public String toString() {
return "HTTP/1.1 " + request.toString();
@ -217,6 +248,28 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
asyncReceiver.subscriber());
}
// The Http1ResponseBodySubscriber is registered with the HttpClient
// to ensure that it gets completed if the SelectorManager aborts due
// to unexpected exceptions.
void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
Throwable failed = null;
synchronized (lock) {
failed = this.failed;
if (failed == null) {
this.responseSubscriber = subscriber;
}
}
if (failed != null) {
subscriber.onError(failed);
} else {
client.registerSubscriber(subscriber);
}
}
void responseSubscriberCompleted(HttpBodySubscriberWrapper<T> subscriber) {
client.subscriberCompleted(subscriber);
}
@Override
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
// create the response before sending the request headers, so that
@ -321,12 +374,12 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
if (debug.on()) debug.log("bodySubscriber is %s",
bodySubscriber == null ? null : bodySubscriber.getClass());
if (bodySubscriber == null) {
bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
appendToOutgoing(Http1BodySubscriber.COMPLETED);
bodySubscriber = Http1RequestBodySubscriber.completeSubscriber(debug);
appendToOutgoing(Http1RequestBodySubscriber.COMPLETED);
} else {
// start
bodySubscriber.whenSubscribed
.thenAccept((s) -> cancelIfFailed(s))
.thenAccept(this::cancelIfFailed)
.thenAccept((s) -> requestMoreBody());
}
} catch (Throwable t) {
@ -370,15 +423,24 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
boolean returnConnectionToPool,
Executor executor)
{
BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),
response.responseHeaders(),
HTTP_1_1));
var responseInfo = new ResponseInfoImpl(response.responseCode(),
response.responseHeaders(), HTTP_1_1);
BodySubscriber<T> bs = createResponseSubscriber(handler, responseInfo);
CompletableFuture<T> bodyCF = response.readBody(bs,
returnConnectionToPool,
executor);
return bodyCF;
}
@Override
Http1ResponseBodySubscriber<T> createResponseSubscriber(BodyHandler<T> handler, ResponseInfo response) {
BodySubscriber<T> subscriber = handler.apply(response);
Http1ResponseBodySubscriber<T> bs =
new Http1ResponseBodySubscriber<T>(subscriber, this);
registerResponseSubscriber(bs);
return bs;
}
@Override
CompletableFuture<Void> ignoreBody() {
return response.ignoreBody(executor);
@ -430,8 +492,10 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
private void cancelImpl(Throwable cause) {
LinkedList<CompletableFuture<?>> toComplete = null;
int count = 0;
Throwable error;
Throwable error = null;
BodySubscriber<?> subscriber;
synchronized (lock) {
subscriber = responseSubscriber;
if ((error = failed) == null) {
failed = error = cause;
}
@ -464,6 +528,15 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
operations.clear();
}
}
// complete subscriber if needed
if (subscriber != null && error != null) {
var failure = error;
if (client.isSelectorThread()) {
executor.execute(() -> subscriber.onError(failure));
} else subscriber.onError(failure);
}
try {
Log.logError("Http1Exchange.cancel: count=" + count);
if (toComplete != null) {
@ -598,7 +671,7 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
headersSentCF.completeAsync(() -> this, exec);
break;
case BODY:
if (dp.data == Http1BodySubscriber.COMPLETED) {
if (dp.data == Http1RequestBodySubscriber.COMPLETED) {
synchronized (lock) {
state = State.COMPLETING;
}
@ -709,7 +782,7 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
writeScheduler.stop();
} else {
List<ByteBuffer> data = dp.data;
if (data == Http1BodySubscriber.COMPLETED) {
if (data == Http1RequestBodySubscriber.COMPLETED) {
synchronized (lock) {
assert state == State.COMPLETING : "Unexpected state:" + state;
state = State.COMPLETED;
@ -754,7 +827,8 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
}
}
HttpClient client() {
@Override
final HttpClientImpl client() {
return client;
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -38,7 +38,7 @@ import java.util.concurrent.Flow;
import java.util.function.BiPredicate;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import jdk.internal.net.http.Http1Exchange.Http1BodySubscriber;
import jdk.internal.net.http.Http1Exchange.Http1RequestBodySubscriber;
import jdk.internal.net.http.common.HttpHeadersBuilder;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
@ -314,8 +314,8 @@ class Http1Request {
return List.of(b);
}
Http1BodySubscriber continueRequest() {
Http1BodySubscriber subscriber;
Http1RequestBodySubscriber continueRequest() {
Http1RequestBodySubscriber subscriber;
if (streaming) {
subscriber = new StreamSubscriber();
requestPublisher.subscribe(subscriber);
@ -329,7 +329,7 @@ class Http1Request {
return subscriber;
}
final class StreamSubscriber extends Http1BodySubscriber {
final class StreamSubscriber extends Http1RequestBodySubscriber {
StreamSubscriber() { super(debug); }
@ -392,7 +392,7 @@ class Http1Request {
}
}
final class FixedContentSubscriber extends Http1BodySubscriber {
final class FixedContentSubscriber extends Http1RequestBodySubscriber {
private volatile long contentWritten;
FixedContentSubscriber() { super(debug); }

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -27,13 +27,10 @@ package jdk.internal.net.http;
import java.io.EOFException;
import java.lang.System.Logger.Level;
import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
@ -41,7 +38,6 @@ import java.net.http.HttpHeaders;
import java.net.http.HttpResponse;
import jdk.internal.net.http.ResponseContent.BodyParser;
import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser;
import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
@ -74,7 +70,7 @@ class Http1Response<T> {
private static final int MAX_IGNORE = 1024;
// Revisit: can we get rid of this?
static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
private volatile State readProgress = State.INITIAL;
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
@ -123,7 +119,7 @@ class Http1Response<T> {
// state & 0x02 != 0 => tryRelease called
byte state;
public synchronized void acquire() {
public synchronized boolean acquire() {
if (state == 0) {
// increment the reference count on the HttpClientImpl
// to prevent the SelectorManager thread from exiting
@ -132,11 +128,13 @@ class Http1Response<T> {
debug.log("Operation started: incrementing ref count for %s", client);
client.reference();
state = 0x01;
return true;
} else {
if (debug.on())
debug.log("Operation ref count for %s is already %s",
client, ((state & 0x2) == 0x2) ? "released." : "incremented!" );
assert (state & 0x01) == 0 : "reference count already incremented";
return false;
}
}
@ -277,119 +275,6 @@ class Http1Response<T> {
}
}
static final Flow.Subscription NOP = new Flow.Subscription() {
@Override
public void request(long n) { }
public void cancel() { }
};
/**
* The Http1AsyncReceiver ensures that all calls to
* the subscriber, including onSubscribe, occur sequentially.
* There could however be some race conditions that could happen
* in case of unexpected errors thrown at unexpected places, which
* may cause onError to be called multiple times.
* The Http1BodySubscriber will ensure that the user subscriber
* is actually completed only once - and only after it is
* subscribed.
* @param <U> The type of response.
*/
static final class Http1BodySubscriber<U> implements TrustedSubscriber<U> {
final HttpResponse.BodySubscriber<U> userSubscriber;
final AtomicBoolean completed = new AtomicBoolean();
volatile Throwable withError;
volatile boolean subscribed;
Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) {
this.userSubscriber = userSubscriber;
}
@Override
public boolean needsExecutor() {
return TrustedSubscriber.needsExecutor(userSubscriber);
}
// propagate the error to the user subscriber, even if not
// subscribed yet.
private void propagateError(Throwable t) {
assert t != null;
try {
// if unsubscribed at this point, it will not
// get subscribed later - so do it now and
// propagate the error
if (subscribed == false) {
subscribed = true;
userSubscriber.onSubscribe(NOP);
}
} finally {
// if onError throws then there is nothing to do
// here: let the caller deal with it by logging
// and closing the connection.
userSubscriber.onError(t);
}
}
// complete the subscriber, either normally or exceptionally
// ensure that the subscriber is completed only once.
private void complete(Throwable t) {
if (completed.compareAndSet(false, true)) {
t = withError = Utils.getCompletionCause(t);
if (t == null) {
assert subscribed;
try {
userSubscriber.onComplete();
} catch (Throwable x) {
// Simply propagate the error by calling
// onError on the user subscriber, and let the
// connection be reused since we should have received
// and parsed all the bytes when we reach here.
// If onError throws in turn, then we will simply
// let that new exception flow up to the caller
// and let it deal with it.
// (i.e: log and close the connection)
// Note that rethrowing here could introduce a
// race that might cause the next send() operation to
// fail as the connection has already been put back
// into the cache when we reach here.
propagateError(t = withError = Utils.getCompletionCause(x));
}
} else {
propagateError(t);
}
}
}
@Override
public CompletionStage<U> getBody() {
return userSubscriber.getBody();
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (!subscribed) {
subscribed = true;
userSubscriber.onSubscribe(subscription);
} else {
// could be already subscribed and completed
// if an unexpected error occurred before the actual
// subscription - though that's not supposed
// happen.
assert completed.get();
}
}
@Override
public void onNext(List<ByteBuffer> item) {
assert !completed.get();
userSubscriber.onNext(item);
}
@Override
public void onError(Throwable throwable) {
complete(throwable);
}
@Override
public void onComplete() {
complete(null);
}
}
public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
boolean return2Cache,
@ -398,12 +283,13 @@ class Http1Response<T> {
debug.log("readBody: return2Cache: " + return2Cache);
if (request.isWebSocket() && return2Cache && connection != null) {
debug.log("websocket connection will be returned to cache: "
+ connection.getClass() + "/" + connection );
+ connection.getClass() + "/" + connection);
}
}
assert !return2Cache || !request.isWebSocket();
this.return2Cache = return2Cache;
final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p);
final BodySubscriber<U> subscriber = p;
final CompletableFuture<U> cf = new MinimalFuture<>();
@ -420,6 +306,7 @@ class Http1Response<T> {
// tracker has been incremented.
connection.client().reference();
executor.execute(() -> {
boolean acquired = false;
try {
content = new ResponseContent(
connection, clen, headers, subscriber,
@ -433,7 +320,8 @@ class Http1Response<T> {
// increment the reference count on the HttpClientImpl
// to prevent the SelectorManager thread from exiting until
// the body is fully read.
refCountTracker.acquire();
acquired = refCountTracker.acquire();
assert acquired == true;
bodyParser = content.getBodyParser(
(t) -> {
try {
@ -457,7 +345,7 @@ class Http1Response<T> {
assert bodyReaderCF != null : "parsing not started";
// Make sure to keep a reference to asyncReceiver from
// within this
CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> {
CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s, t) -> {
t = Utils.getCompletionCause(t);
try {
if (t == null) {
@ -479,11 +367,12 @@ class Http1Response<T> {
});
connection.addTrailingOperation(trailingOp);
} catch (Throwable t) {
if (debug.on()) debug.log("Failed reading body: " + t);
if (debug.on()) debug.log("Failed reading body: " + t);
try {
subscriber.onError(t);
cf.completeExceptionally(t);
} finally {
if (acquired) refCountTracker.tryRelease();
asyncReceiver.onReadError(t);
}
} finally {
@ -492,6 +381,7 @@ class Http1Response<T> {
});
ResponseSubscribers.getBodyAsync(executor, p, cf, (t) -> {
subscriber.onError(t);
cf.completeExceptionally(t);
asyncReceiver.setRetryOnError(false);
asyncReceiver.onReadError(t);
@ -752,12 +642,14 @@ class Http1Response<T> {
@Override
public final void onReadError(Throwable t) {
if (t instanceof EOFException && bodyParser != null &&
bodyParser instanceof UnknownLengthBodyParser) {
((UnknownLengthBodyParser)bodyParser).complete();
BodyParser parser = bodyParser;
if (t instanceof EOFException && parser != null &&
parser instanceof UnknownLengthBodyParser ulBodyParser) {
ulBodyParser.complete();
return;
}
t = wrapWithExtraDetail(t, parser::currentStateMessage);
parser.onError(t);
Http1Response.this.onReadError(t);
}
@ -824,6 +716,20 @@ class Http1Response<T> {
cf.complete(State.READING_BODY);
}
}
if (error != null) {
// makes sure the parser gets the error
BodyParser parser = this.parser;
if (parser != null) {
if (debug.on()) {
debug.log("propagating error to parser: " + error);
}
parser.onError(error);
} else {
if (debug.on()) {
debug.log("no parser - error not propagated: " + error);
}
}
}
}
@Override

@ -57,6 +57,7 @@ class Http2ClientImpl {
Utils.getDebugLogger("Http2ClientImpl"::toString, Utils.DEBUG);
private final HttpClientImpl client;
private volatile boolean stopping;
Http2ClientImpl(HttpClientImpl client) {
this.client = client;
@ -160,6 +161,11 @@ class Http2ClientImpl {
String key = c.key();
synchronized(this) {
if (stopping) {
if (debug.on()) debug.log("stopping - closing connection: %s", c);
close(c);
return false;
}
if (!c.isOpen()) {
if (debug.on())
debug.log("skipping offered closed or closing connection: %s", c);
@ -193,16 +199,24 @@ class Http2ClientImpl {
private EOFException STOPPED;
void stop() {
synchronized (this) {stopping = true;}
if (debug.on()) debug.log("stopping");
STOPPED = new EOFException("HTTP/2 client stopped");
STOPPED.setStackTrace(new StackTraceElement[0]);
connections.values().forEach(this::close);
connections.clear();
do {
connections.values().forEach(this::close);
} while (!connections.isEmpty());
}
private void close(Http2Connection h2c) {
// close all streams
try { h2c.closeAllStreams(); } catch (Throwable t) {}
// send GOAWAY
try { h2c.close(); } catch (Throwable t) {}
// attempt graceful shutdown
try { h2c.shutdown(STOPPED); } catch (Throwable t) {}
// double check and close any new streams
try { h2c.closeAllStreams(); } catch (Throwable t) {}
}
HttpClientImpl client() {

@ -968,16 +968,29 @@ class Http2Connection {
}
}
// This method is called when the HTTP/2 client is being
// stopped. Do not call it from anywhere else.
void closeAllStreams() {
for (var streamId : streams.keySet()) {
// safe to call without locking - see Stream::deRegister
decrementStreamsCount(streamId);
closeStream(streamId);
}
}
void closeStream(int streamid) {
if (debug.on()) debug.log("Closed stream %d", streamid);
boolean isClient = (streamid % 2) == 1;
Stream<?> s = streams.remove(streamid);
if (s != null) {
// decrement the reference count on the HttpClientImpl
// to allow the SelectorManager thread to exit if no
// other operation is pending and the facade is no
// longer referenced.
client().streamUnreference();
Stream<?> s;
synchronized (this) {
s = streams.remove(streamid);
if (s != null) {
// decrement the reference count on the HttpClientImpl
// to allow the SelectorManager thread to exit if no
// other operation is pending and the facade is no
// longer referenced.
client().streamUnreference();
}
}
// ## Remove s != null. It is a hack for delayed cancellation,reset
if (s != null && !(s instanceof Stream.PushedStream)) {
@ -1153,8 +1166,19 @@ class Http2Connection {
// increment the reference count on the HttpClientImpl
// to prevent the SelectorManager thread from exiting until
// the stream is closed.
client().streamReference();
streams.put(streamid, stream);
synchronized (this) {
if (!closed) {
if (debug.on()) {
debug.log("Opened stream %d", streamid);
}
client().streamReference();
streams.put(streamid, stream);
return;
}
}
if (debug.on()) debug.log("connection closed: closing stream %d", stream);
stream.cancel();
}
/**

@ -42,6 +42,7 @@ import java.net.http.HttpTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@ -62,13 +63,18 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import java.net.http.HttpClient;
@ -77,9 +83,12 @@ import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.PushPromiseHandler;
import java.net.http.WebSocket;
import jdk.internal.net.http.common.BufferSupplier;
import jdk.internal.net.http.common.HttpBodySubscriberWrapper;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Pair;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.common.OperationTrackers.Trackable;
@ -100,6 +109,7 @@ final class HttpClientImpl extends HttpClient implements Trackable {
final Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
final Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
static final AtomicLong CLIENT_IDS = new AtomicLong();
private final AtomicLong CONNECTION_IDS = new AtomicLong();
// Define the default factory as a static inner class
// that embeds all the necessary logic to avoid
@ -138,24 +148,39 @@ final class HttpClientImpl extends HttpClient implements Trackable {
static final class DelegatingExecutor implements Executor {
private final BooleanSupplier isInSelectorThread;
private final Executor delegate;
DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
private final BiConsumer<Runnable, Throwable> errorHandler;
DelegatingExecutor(BooleanSupplier isInSelectorThread,
Executor delegate,
BiConsumer<Runnable, Throwable> errorHandler) {
this.isInSelectorThread = isInSelectorThread;
this.delegate = delegate;
this.errorHandler = errorHandler;
}
Executor delegate() {
return delegate;
}
@Override
public void execute(Runnable command) {
if (isInSelectorThread.getAsBoolean()) {
delegate.execute(command);
ensureExecutedAsync(command);
} else {
command.run();
}
}
public void ensureExecutedAsync(Runnable command) {
try {
delegate.execute(command);
} catch (Throwable t) {
errorHandler.accept(command, t);
ASYNC_POOL.execute(command);
}
}
@SuppressWarnings("removal")
private void shutdown() {
if (delegate instanceof ExecutorService service) {
@ -167,7 +192,129 @@ final class HttpClientImpl extends HttpClient implements Trackable {
new RuntimePermission("modifyThread"));
}
}
}
// We maintain a list of pending requests that will be aborted if ever
// the selector manager thread exists abnormally.
// The request, its id, and its completable future, are stored in a record-like
// PendingRequest object added to the pending requests set (pendingRequests).
//
// When the request's cf completes, either normally or abnormally, a dependent action
// will remove the PendingRequest object from the pending requests set.
// If the SelectorManager threads exits abnormally, all pending requests in the
// pending requests set will be completed exceptionally, in the ASYNC_POOL.
//
// HttpClientImpl::registerPending(id, req, cf, client) is called from sendAsync
// to register the pending request in the pending requests set before returning
// the completable future to the caller.
//
// HttpClientImpl::abortPendingRequests(client, Throwable reason) is called from
// the SelectorManager when a throwable is caught just before exiting.
//
// A dependent action is registered with the pending request's cf to make sure
// that the pending request will be removed from the pending requests set if,
// or after, the cf is completed.
//
private final Set<PendingRequest> pendingRequests;
// an id to ensure total order of pending request objects
private final AtomicLong pendingRequestId = new AtomicLong();
private static final class PendingRequest implements Comparable<PendingRequest> {
final long id;
final HttpRequest request;
final CompletableFuture<?> cf;
final HttpClientImpl client;
final MultiExchange<?> mex;
Object ref;
private PendingRequest(long id,
HttpRequest request,
CompletableFuture<?> cf,
MultiExchange<?> mex,
HttpClientImpl client) {
this.id = id;
this.request = request;
this.cf = cf;
this.mex = mex;
this.client = client;
}
public void abort(Throwable t) {
try {
if (client.isSelectorThread()) {
var done = cf.exceptionally((e) -> null);
ASYNC_POOL.execute(() -> completeExceptionally(t));
// special case for when this method is called in the SelectorManager thread:
// we want to wait until all futures are completed before proceeding to
// shutdown. This ensures that the caller receive the actual `reason`
// and not something like "HTTP/2 client closed"...
done.join();
} else {
cf.completeExceptionally(t);
}
} finally {
mex.cancel(Utils.getIOException(t));
}
}
private void completeExceptionally(Throwable t) {
if (client.debug.on()) {
client.debug.log("aborting %s with %s", this, t);
}
try { cf.completeExceptionally(t); } catch (Throwable e) {
client.debug.log("Failed to complete cf for [%s]: %s", this, e);
}
}
@Override
public int compareTo(PendingRequest o) {
if (o == null) return 1;
return Long.compare(id, o.id);
}
public String toString() {
return id + ": " + request.toString();
}
}
static void registerPending(PendingRequest pending) {
// shortcut if cf is already completed: no need to go through the trouble of
// registering it
if (pending.cf.isDone()) return;
var client = pending.client;
var cf = pending.cf;
var id = pending.id;
boolean added = client.pendingRequests.add(pending);
// this may immediately remove `pending` from the set is the cf is already completed
pending.ref = cf.whenComplete((r,t) -> client.pendingRequests.remove(pending));
assert added : "request %d was already added".formatted(id);
// should not happen, unless the selector manager has already
// exited abnormally
if (client.selmgr.isClosed()) {
pending.abort(client.selmgr.selectorClosedException());
}
}
static void abortPendingRequests(HttpClientImpl client, Throwable reason) {
reason = Utils.getCompletionCause(reason);
if (client.debug.on()) {
var msg = reason instanceof RejectedExecutionException
? reason.getClass() : reason;
client.debug.log("aborting pending requests due to: %s", msg);
}
closeSubscribers(client, reason);
var pendingRequests = client.pendingRequests;
while (!pendingRequests.isEmpty()) {
var pendings = pendingRequests.iterator();
while (pendings.hasNext()) {
var pending = pendings.next();
try {
pending.abort(reason);
} finally {
pendings.remove();
}
}
}
}
private final CookieHandler cookieHandler;
@ -206,6 +353,11 @@ final class HttpClientImpl extends HttpClient implements Trackable {
// have completed.
private final WeakReference<HttpClientFacade> facadeRef;
private final ConcurrentSkipListSet<PlainHttpConnection> openedConnections
= new ConcurrentSkipListSet<>(HttpConnection.COMPARE_BY_ID);
private final ConcurrentSkipListSet<HttpBodySubscriberWrapper<?>> subscribers
= new ConcurrentSkipListSet<>(HttpBodySubscriberWrapper.COMPARE_BY_ID);
// This counter keeps track of the number of operations pending
// on the HttpClient. The SelectorManager thread will wait
// until there are no longer any pending operations and the
@ -237,8 +389,11 @@ final class HttpClientImpl extends HttpClient implements Trackable {
// the response has been fully received or the web socket is closed.
private final AtomicLong pendingOperationCount = new AtomicLong();
private final AtomicLong pendingWebSocketCount = new AtomicLong();
private final AtomicLong pendingHttpOperationsCount = new AtomicLong();
private final AtomicLong pendingHttpRequestCount = new AtomicLong();
private final AtomicLong pendingHttp2StreamCount = new AtomicLong();
private final AtomicLong pendingTCPConnectionCount = new AtomicLong();
private final AtomicBoolean isAlive = new AtomicBoolean();
/** A Set of, deadline first, ordered timeout events. */
private final TreeSet<TimeoutEvent> timeouts;
@ -292,7 +447,9 @@ final class HttpClientImpl extends HttpClient implements Trackable {
} else {
isDefaultExecutor = false;
}
delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
pendingRequests = new ConcurrentSkipListSet<>();
delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex,
this::onSubmitFailure);
facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
client2 = new Http2ClientImpl(this);
cookieHandler = builder.cookieHandler;
@ -331,6 +488,10 @@ final class HttpClientImpl extends HttpClient implements Trackable {
assert facadeRef.get() != null;
}
void onSubmitFailure(Runnable command, Throwable failure) {
selmgr.abort(failure);
}
private void start() {
selmgr.start();
}
@ -344,10 +505,48 @@ final class HttpClientImpl extends HttpClient implements Trackable {
connections.stop();
// Clears HTTP/2 cache and close its connections.
client2.stop();
// make sure all subscribers are completed
closeSubscribers();
// close TCP connection if any are still opened
openedConnections.forEach(this::closeConnection);
// shutdown the executor if needed
if (isDefaultExecutor) delegatingExecutor.shutdown();
}
private void closeSubscribers() {
if (subscribers.isEmpty()) return;
IOException io = selmgr.selectorClosedException();
closeSubscribers(this, io);
}
private static void closeSubscribers(HttpClientImpl client, Throwable t) {
client.subscribers.forEach(s -> s.onError(t));
}
public void registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
if (!selmgr.isClosed()) {
synchronized (selmgr) {
if (!selmgr.isClosed()) {
subscribers.add(subscriber);
return;
}
}
}
subscriber.onError(selmgr.selectorClosedException());
}
public void subscriberCompleted(HttpBodySubscriberWrapper<?> subscriber) {
subscribers.remove(subscriber);
}
private void closeConnection(HttpConnection conn) {
try { conn.close(); } catch (Throwable e) {
if (Log.channel()) {
Log.logChannel("Failed to close connection: " + e);
}
}
}
private static SSLParameters getDefaultParams(SSLContext ctx) {
SSLParameters params = ctx.getDefaultSSLParameters();
return params;
@ -365,16 +564,46 @@ final class HttpClientImpl extends HttpClient implements Trackable {
return facadeRef.get();
}
// Increments the pendingOperationCount.
final long reference() {
public long newConnectionId() {
return CONNECTION_IDS.incrementAndGet();
}
// Increments the pendingTCPConnectionCount
public void connectionOpened(PlainHttpConnection plainHttpConnection) {
if (openedConnections.add(plainHttpConnection)) {
pendingTCPConnectionCount.incrementAndGet();
}
}
// Decrements the pendingTCPConnectionCount
public void connectionClosed(PlainHttpConnection plainHttpConnection) {
if (openedConnections.remove(plainHttpConnection)) {
pendingTCPConnectionCount.decrementAndGet();
}
}
// Increments the pendingHttpRequestCount and pendingOperationCount.
final long requestReference() {
pendingHttpRequestCount.incrementAndGet();
return reference();
}
// Decrements the pendingHttpRequestCount and pendingOperationCount.
final long requestUnreference() {
pendingHttpRequestCount.decrementAndGet();
return unreference();
}
// Increments the pendingHttpOperationsCount and pendingOperationCount.
final long reference() {
pendingHttpOperationsCount.incrementAndGet();
return pendingOperationCount.incrementAndGet();
}
// Decrements the pendingOperationCount.
// Decrements the pendingHttpOperationsCount and pendingOperationCount.
final long unreference() {
final long count = pendingOperationCount.decrementAndGet();
final long httpCount = pendingHttpRequestCount.decrementAndGet();
final long httpCount = pendingHttpOperationsCount.decrementAndGet();
final long http2Count = pendingHttp2StreamCount.get();
final long webSocketCount = pendingWebSocketCount.get();
if (count == 0 && facade() == null) {
@ -387,17 +616,17 @@ final class HttpClientImpl extends HttpClient implements Trackable {
return count;
}
// Increments the pendingOperationCount.
// Increments the pendingHttp2StreamCount and pendingOperationCount.
final long streamReference() {
pendingHttp2StreamCount.incrementAndGet();
return pendingOperationCount.incrementAndGet();
}
// Decrements the pendingOperationCount.
// Decrements the pendingHttp2StreamCount and pendingOperationCount.
final long streamUnreference() {
final long count = pendingOperationCount.decrementAndGet();
final long http2Count = pendingHttp2StreamCount.decrementAndGet();
final long httpCount = pendingHttpRequestCount.get();
final long httpCount = pendingHttpOperationsCount.get();
final long webSocketCount = pendingWebSocketCount.get();
if (count == 0 && facade() == null) {
selmgr.wakeupSelector();
@ -409,17 +638,17 @@ final class HttpClientImpl extends HttpClient implements Trackable {
return count;
}
// Increments the pendingOperationCount.
// Increments the pendingWebSocketCount and pendingOperationCount.
final long webSocketOpen() {
pendingWebSocketCount.incrementAndGet();
return pendingOperationCount.incrementAndGet();
}
// Decrements the pendingOperationCount.
// Decrements the pendingWebSocketCount and pendingOperationCount.
final long webSocketClose() {
final long count = pendingOperationCount.decrementAndGet();
final long webSocketCount = pendingWebSocketCount.decrementAndGet();
final long httpCount = pendingHttpRequestCount.get();
final long httpCount = pendingHttpOperationsCount.get();
final long http2Count = pendingHttp2StreamCount.get();
if (count == 0 && facade() == null) {
selmgr.wakeupSelector();
@ -432,28 +661,41 @@ final class HttpClientImpl extends HttpClient implements Trackable {
}
// Returns the pendingOperationCount.
// Incremented with any operation, whether it's HTTP/1.1, HTTP/2, or WebSocket
final long referenceCount() {
return pendingOperationCount.get();
}
// Trackers are used in test to verify that an instance of
// HttpClient has shutdown correctly, and that all operations
// have terminated.
static final class HttpClientTracker implements Tracker {
final AtomicLong requestCount;
final AtomicLong httpCount;
final AtomicLong http2Count;
final AtomicLong websocketCount;
final AtomicLong operationsCount;
final AtomicLong connnectionsCount;
final Reference<?> reference;
final AtomicBoolean isAlive;
final String name;
HttpClientTracker(AtomicLong http,
HttpClientTracker(AtomicLong request,
AtomicLong http,
AtomicLong http2,
AtomicLong ws,
AtomicLong ops,
AtomicLong conns,
Reference<?> ref,
AtomicBoolean isAlive,
String name) {
this.requestCount = request;
this.httpCount = http;
this.http2Count = http2;
this.websocketCount = ws;
this.operationsCount = ops;
this.connnectionsCount = conns;
this.reference = ref;
this.isAlive = isAlive;
this.name = name;
}
@Override
@ -461,6 +703,12 @@ final class HttpClientImpl extends HttpClient implements Trackable {
return operationsCount.get();
}
@Override
public long getOutstandingHttpRequests() {
return requestCount.get();
}
@Override
public long getOutstandingTcpConnections() { return connnectionsCount.get();}
@Override
public long getOutstandingHttpOperations() {
return httpCount.get();
}
@ -475,23 +723,29 @@ final class HttpClientImpl extends HttpClient implements Trackable {
return reference.get() != null;
}
@Override
public boolean isSelectorAlive() { return isAlive.get(); }
@Override
public String getName() {
return name;
}
}
public Tracker getOperationsTracker() {
return new HttpClientTracker(pendingHttpRequestCount,
return new HttpClientTracker(
pendingHttpRequestCount,
pendingHttpOperationsCount,
pendingHttp2StreamCount,
pendingWebSocketCount,
pendingOperationCount,
pendingTCPConnectionCount,
facadeRef,
isAlive,
dbgTag);
}
// Called by the SelectorManager thread to figure out whether it's time
// to terminate.
final boolean isReferenced() {
boolean isReferenced() {
HttpClient facade = facade();
return facade != null || referenceCount() > 0;
}
@ -523,6 +777,14 @@ final class HttpClientImpl extends HttpClient implements Trackable {
return Thread.currentThread() == selmgr;
}
boolean isSelectorClosed() {
return selmgr.isClosed();
}
IOException selectorClosedException() {
return selmgr.selectorClosedException();
}
Http2ClientImpl client2() {
return client2;
}
@ -613,6 +875,12 @@ final class HttpClientImpl extends HttpClient implements Trackable {
Objects.requireNonNull(userRequest);
Objects.requireNonNull(responseHandler);
// should not happen, unless the selector manager has
// exited abnormally
if (selmgr.isClosed()) {
return MinimalFuture.failedFuture(selmgr.selectorClosedException());
}
AccessControlContext acc = null;
if (System.getSecurityManager() != null)
acc = AccessController.getContext();
@ -622,8 +890,9 @@ final class HttpClientImpl extends HttpClient implements Trackable {
if (requestImpl.method().equals("CONNECT"))
throw new IllegalArgumentException("Unsupported method CONNECT");
long id = pendingRequestId.incrementAndGet();
long start = DEBUGELAPSED ? System.nanoTime() : 0;
reference();
requestReference();
try {
if (debugelapsed.on())
debugelapsed.log("ClientImpl (async) send %s", userRequest);
@ -646,8 +915,8 @@ final class HttpClientImpl extends HttpClient implements Trackable {
responseHandler,
pushPromiseHandler,
acc);
CompletableFuture<HttpResponse<T>> res =
mex.responseAsync(executor).whenComplete((b,t) -> unreference());
CompletableFuture<HttpResponse<T>> mexCf = mex.responseAsync(executor);
CompletableFuture<HttpResponse<T>> res = mexCf.whenComplete((b,t) -> requestUnreference());
if (DEBUGELAPSED) {
res = res.whenComplete(
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
@ -659,9 +928,14 @@ final class HttpClientImpl extends HttpClient implements Trackable {
if (exchangeExecutor != null) {
res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
}
// The mexCf is the Cf we need to abort if the SelectorManager thread
// is aborted.
PendingRequest pending = new PendingRequest(id, requestImpl, mexCf, mex, this);
registerPending(pending);
return res;
} catch(Throwable t) {
unreference();
requestUnreference();
debugCompleted("ClientImpl (async)", start, userRequest);
throw t;
}
@ -701,8 +975,9 @@ final class HttpClientImpl extends HttpClient implements Trackable {
private final List<AsyncTriggerEvent> deregistrations;
private final Logger debug;
private final Logger debugtimeout;
HttpClientImpl owner;
ConnectionPool pool;
private final HttpClientImpl owner;
private final ConnectionPool pool;
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
SelectorManager(HttpClientImpl ref) throws IOException {
super(null, null,
@ -717,13 +992,22 @@ final class HttpClientImpl extends HttpClient implements Trackable {
selector = Selector.open();
}
IOException selectorClosedException() {
var io = new IOException("selector manager closed");
var cause = errorRef.get();
if (cause != null) {
io.initCause(cause);
}
return io;
}
void eventUpdated(AsyncEvent e) throws ClosedChannelException {
if (Thread.currentThread() == this) {
SelectionKey key = e.channel().keyFor(selector);
if (key != null && key.isValid()) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
sa.register(e);
} else if (e.interestOps() != 0){
} else if (e.interestOps() != 0) {
// We don't care about paused events.
// These are actually handled by
// SelectorAttachment::resetInterestOps later on.
@ -740,6 +1024,7 @@ final class HttpClientImpl extends HttpClient implements Trackable {
// This returns immediately. So caller not allowed to send/receive
// on connection.
synchronized void register(AsyncEvent e) {
if (closed) e.abort(selectorClosedException());
registrations.add(e);
selector.wakeup();
}
@ -756,6 +1041,48 @@ final class HttpClientImpl extends HttpClient implements Trackable {
selector.wakeup();
}
void abort(Throwable t) {
boolean closed = this.closed;
errorRef.compareAndSet(null, t);
if (debug.on()) {
debug.log("aborting selector manager(closed=%s): " + t, closed);
}
t = errorRef.get();
boolean inSelectorThread = owner.isSelectorThread();
if (!inSelectorThread) {
// abort anything pending, then close
abortPendingRequests(owner, t);
}
Set<SelectionKey> keys = new HashSet<>();
Set<AsyncEvent> toAbort = new HashSet<>();
synchronized (this) {
if (closed = this.closed) return;
this.closed = true;
try {
keys.addAll(selector.keys());
} catch (ClosedSelectorException ce) {
// OK - nothing to do...
}
toAbort.addAll(this.registrations);
toAbort.addAll(this.deregistrations);
this.registrations.clear();
this.deregistrations.clear();
}
// double check after closing
abortPendingRequests(owner, t);
IOException io = toAbort.isEmpty()
? null : selectorClosedException();
for (AsyncEvent e : toAbort) {
try {
e.abort(io);
} catch (Throwable x) {
debug.log("Failed to abort event: " + x);
}
}
if (!inSelectorThread) selector.wakeup();
}
synchronized void shutdown() {
Log.logTrace("{0}: shutting down", getName());
if (debug.on()) debug.log("SelectorManager shutting down");
@ -768,14 +1095,19 @@ final class HttpClientImpl extends HttpClient implements Trackable {
}
}
boolean isClosed() {
return closed;
}
@Override
public void run() {
List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
List<AsyncEvent> readyList = new ArrayList<>();
List<Runnable> resetList = new ArrayList<>();
owner.isAlive.set(true);
try {
if (Log.channel()) Log.logChannel(getName() + ": starting");
while (!Thread.currentThread().isInterrupted()) {
while (!Thread.currentThread().isInterrupted() && !closed) {
synchronized (this) {
assert errorList.isEmpty();
assert readyList.isEmpty();
@ -808,7 +1140,7 @@ final class HttpClientImpl extends HttpClient implements Trackable {
// may throw IOE if channel closed: that's OK
sa.register(event);
if (!chan.isOpen()) {
throw new IOException("Channel closed");
throw new ClosedChannelException();
}
} catch (IOException e) {
Log.logTrace("{0}: {1}", getName(), e);
@ -927,7 +1259,8 @@ final class HttpClientImpl extends HttpClient implements Trackable {
selector.selectedKeys().clear();
// handle selected events
readyList.forEach((e) -> handleEvent(e, null));
IOException ioe = closed ? selectorClosedException() : null;
readyList.forEach((e) -> handleEvent(e, ioe));
readyList.clear();
// handle errors (closed channels etc...)
@ -940,19 +1273,26 @@ final class HttpClientImpl extends HttpClient implements Trackable {
}
} catch (Throwable e) {
errorRef.compareAndSet(null, e);
if (!closed) {
closed = true; // set closed early so that new requests are rejected
// This terminates thread. So, better just print stack trace
String err = Utils.stackTrace(e);
Log.logError("{0}: {1}: {2}", getName(),
"HttpClientImpl shutting down due to fatal error", err);
}
abortPendingRequests(owner, selectorClosedException());
if (debug.on()) debug.log("shutting down", e);
if (Utils.ASSERTIONSENABLED && !debug.on()) {
e.printStackTrace(System.err); // always print the stack
}
} finally {
if (Log.channel()) Log.logChannel(getName() + ": stopping");
shutdown();
try {
shutdown();
} finally {
owner.isAlive.set(false);
}
}
}
@ -969,7 +1309,10 @@ final class HttpClientImpl extends HttpClient implements Trackable {
/** Handles the given event. The given ioe may be null. */
void handleEvent(AsyncEvent event, IOException ioe) {
if (closed || ioe != null) {
if (ioe == null && closed) {
ioe = selectorClosedException();
}
if (ioe != null) {
event.abort(ioe);
} else {
event.handle();

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -31,10 +31,10 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Comparator;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
@ -69,16 +69,29 @@ abstract class HttpConnection implements Closeable {
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
static final Logger DEBUG_LOGGER = Utils.getDebugLogger(
() -> "HttpConnection(SocketTube(?))", Utils.DEBUG);
public static final Comparator<HttpConnection> COMPARE_BY_ID
= Comparator.comparing(HttpConnection::id);
/** The address this connection is connected to. Could be a server or a proxy. */
final InetSocketAddress address;
private final HttpClientImpl client;
private final TrailingOperations trailingOperations;
private final long id;
HttpConnection(InetSocketAddress address, HttpClientImpl client) {
this.address = address;
this.client = client;
trailingOperations = new TrailingOperations();
this.id = newConnectionId(client);
}
// This is overridden in tests
long newConnectionId(HttpClientImpl client) {
return client.newConnectionId();
}
private long id() {
return id;
}
private static final class TrailingOperations {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -169,9 +169,10 @@ class MultiExchange<T> implements Cancelable {
this.responseHandler = responseHandler;
if (pushPromiseHandler != null) {
Executor ensureExecutedAsync = this.executor::ensureExecutedAsync;
Executor executor = acc == null
? this.executor.delegate()
: new PrivilegedExecutor(this.executor.delegate(), acc);
? ensureExecutedAsync
: new PrivilegedExecutor(ensureExecutedAsync, acc);
this.pushGroup = new PushGroup<>(pushPromiseHandler, request, executor);
} else {
pushGroup = null;

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -140,15 +140,18 @@ class PlainHttpConnection extends HttpConnection {
debug.log("ConnectEvent: connect finished: %s, cancelled: %s, Local addr: %s",
finished, exchange.multi.requestCancelled(), chan.getLocalAddress());
assert finished || exchange.multi.requestCancelled() : "Expected channel to be connected";
client().connectionOpened(PlainHttpConnection.this);
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> ConnectState.SUCCESS, client().theExecutor());
} catch (Throwable e) {
if (canRetryConnect(e)) {
unsuccessfulAttempts++;
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> ConnectState.RETRY, client().theExecutor());
return;
}
Throwable t = Utils.toConnectException(e);
// complete async since the event runs on the SelectorManager thread
client().theExecutor().execute( () -> cf.completeExceptionally(t));
close();
}
@ -156,6 +159,7 @@ class PlainHttpConnection extends HttpConnection {
@Override
public void abort(IOException ioe) {
// complete async since the event runs on the SelectorManager thread
client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
close();
}
@ -188,6 +192,7 @@ class PlainHttpConnection extends HttpConnection {
}
if (finished) {
if (debug.on()) debug.log("connect finished without blocking");
client().connectionOpened(this);
cf.complete(ConnectState.SUCCESS);
} else {
if (debug.on()) debug.log("registering connect event");
@ -197,6 +202,9 @@ class PlainHttpConnection extends HttpConnection {
} catch (Throwable throwable) {
cf.completeExceptionally(Utils.toConnectException(throwable));
try {
if (Log.channel()) {
Log.logChannel("Closing connection: connect failed due to: " + throwable);
}
close();
} catch (Exception x) {
if (debug.on())
@ -368,8 +376,15 @@ class PlainHttpConnection extends HttpConnection {
debug.log("Closing channel: " + client().debugInterestOps(chan));
if (connectTimerEvent != null)
client().cancelTimer(connectTimerEvent);
chan.close();
tube.signalClosed();
if (Log.channel()) {
Log.logChannel("Closing channel: " + chan);
}
try {
chan.close();
tube.signalClosed();
} finally {
client().connectionClosed(this);
}
} catch (IOException e) {
Log.logTrace("Closing resulted in " + e);
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -104,6 +104,7 @@ class ResponseContent {
// A current-state message suitable for inclusion in an exception
// detail message.
String currentStateMessage();
void onError(Throwable t);
}
// Returns a parser that will take care of parsing the received byte
@ -178,6 +179,12 @@ class ResponseContent {
pusher.onSubscribe(this.sub = sub);
}
@Override
public void onError(Throwable t) {
closedExceptionally = t;
onComplete.accept(t);
}
@Override
public String currentStateMessage() {
return format("chunked transfer encoding, state: %s", state);
@ -477,6 +484,12 @@ class ResponseContent {
pusher.onSubscribe(this.sub = sub);
}
@Override
public void onError(Throwable t) {
closedExceptionally = t;
onComplete.accept(t);
}
@Override
public String currentStateMessage() {
return format("http1_0 content, bytes received: %d", breceived);
@ -563,6 +576,16 @@ class ResponseContent {
}
}
@Override
public void onError(Throwable t) {
if (contentLength != 0) {
closedExceptionally = t;
onComplete.accept(t);
} else {
onComplete.accept(null);
}
}
@Override
public String currentStateMessage() {
return format("fixed content-length: %d, bytes received: %d",

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -63,6 +63,7 @@ import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.HttpClientImpl.DelegatingExecutor;
import static java.nio.charset.StandardCharsets.UTF_8;
public class ResponseSubscribers {
@ -1141,8 +1142,8 @@ public class ResponseSubscribers {
assert cf != null;
if (TrustedSubscriber.needsExecutor(bs)) {
e = (e instanceof HttpClientImpl.DelegatingExecutor)
? ((HttpClientImpl.DelegatingExecutor) e).delegate() : e;
e = (e instanceof DelegatingExecutor exec)
? exec::ensureExecutedAsync : e;
}
e.execute(() -> {
@ -1155,12 +1156,14 @@ public class ResponseSubscribers {
}
});
} catch (Throwable t) {
// the errorHandler will complete the CF
errorHandler.accept(t);
}
});
return cf;
} catch (Throwable t) {
// the errorHandler will complete the CF
errorHandler.accept(t);
}
return cf;

@ -341,6 +341,10 @@ final class SocketTube implements FlowTube {
void tryFlushCurrent(boolean inSelectorThread) {
List<ByteBuffer> bufs = current;
if (bufs == null) return;
if (client.isSelectorClosed()) {
signalError(client.selectorClosedException());
return;
}
try {
assert inSelectorThread == client.isSelectorThread() :
"should " + (inSelectorThread ? "" : "not ")
@ -354,6 +358,10 @@ final class SocketTube implements FlowTube {
if (remaining - written == 0) {
current = null;
if (writeDemand.tryDecrement()) {
if (client.isSelectorClosed()) {
signalError(client.selectorClosedException());
return;
}
Runnable requestMore = this::requestMore;
if (inSelectorThread) {
assert client.isSelectorThread();

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -31,6 +31,8 @@ import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.net.URI;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.ResponseInfo;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@ -299,6 +301,9 @@ class Stream<T> extends ExchangeImpl<T> {
return endStream;
}
// This method is called by Http2Connection::decrementStreamCount in order
// to make sure that the stream count is decremented only once for
// a given stream.
boolean deRegister() {
return DEREGISTERED.compareAndSet(this, false, true);
}
@ -311,7 +316,8 @@ class Stream<T> extends ExchangeImpl<T> {
try {
Log.logTrace("Reading body on stream {0}", streamid);
debug.log("Getting BodySubscriber for: " + response);
BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));
Http2StreamResponseSubscriber<T> bodySubscriber =
createResponseSubscriber(handler, new ResponseInfoImpl(response));
CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
PushGroup<?> pg = exchange.getPushGroup();
@ -327,6 +333,25 @@ class Stream<T> extends ExchangeImpl<T> {
}
}
@Override
Http2StreamResponseSubscriber<T> createResponseSubscriber(BodyHandler<T> handler, ResponseInfo response) {
Http2StreamResponseSubscriber<T> subscriber =
new Http2StreamResponseSubscriber<>(handler.apply(response));
registerResponseSubscriber(subscriber);
return subscriber;
}
// The Http2StreamResponseSubscriber is registered with the HttpClient
// to ensure that it gets completed if the SelectorManager aborts due
// to unexpected exceptions.
private void registerResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
client().registerSubscriber(subscriber);
}
private void subscriberCompleted(Http2StreamResponseSubscriber<?> subscriber) {
client().subscriberCompleted(subscriber);
}
@Override
public String toString() {
return "streamid: " + streamid;
@ -384,10 +409,13 @@ class Stream<T> extends ExchangeImpl<T> {
if (isCanceled()) {
Throwable t = getCancelCause();
responseBodyCF.completeExceptionally(t);
} else {
pendingResponseSubscriber = bodySubscriber;
sched.runOrSchedule(); // in case data waiting already to be processed
}
// ensure that the body subscriber will be subsribed and onError() is
// invoked
pendingResponseSubscriber = bodySubscriber;
sched.runOrSchedule(); // in case data waiting already to be processed, or error
return responseBodyCF;
}
@ -1484,6 +1512,21 @@ class Stream<T> extends ExchangeImpl<T> {
}
}
final class Http2StreamResponseSubscriber<U> extends HttpBodySubscriberWrapper<U> {
Http2StreamResponseSubscriber(BodySubscriber<U> subscriber) {
super(subscriber);
}
@Override
protected void complete(Throwable t) {
try {
Stream.this.subscriberCompleted(this);
} finally {
super.complete(t);
}
}
}
private static final VarHandle STREAM_STATE;
private static final VarHandle DEREGISTERED;
static {

@ -0,0 +1,174 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http.common;
import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber;
/**
* A class that wraps a user supplied {@link BodySubscriber}, but on
* which {@link #onError(Throwable)} can be invoked at any time,
* even before {@link #onSubscribe(Subscription)} has not been called
* yet.
* @param <T> the type of the response body
*/
public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
public static final Comparator<HttpBodySubscriberWrapper<?>> COMPARE_BY_ID
= Comparator.comparing(HttpBodySubscriberWrapper::id);
public static final Flow.Subscription NOP = new Flow.Subscription() {
@Override
public void request(long n) { }
public void cancel() { }
};
static final AtomicLong IDS = new AtomicLong();
final long id = IDS.incrementAndGet();
final BodySubscriber<T> userSubscriber;
final AtomicBoolean completed = new AtomicBoolean();
final AtomicBoolean subscribed = new AtomicBoolean();
volatile Subscription subscription;
volatile Throwable withError;
public HttpBodySubscriberWrapper(BodySubscriber<T> userSubscriber) {
this.userSubscriber = userSubscriber;
}
final long id() { return id; }
@Override
public boolean needsExecutor() {
return TrustedSubscriber.needsExecutor(userSubscriber);
}
// propagate the error to the user subscriber, even if not
// subscribed yet.
private void propagateError(Throwable t) {
assert t != null;
try {
// if unsubscribed at this point, it will not
// get subscribed later - so do it now and
// propagate the error
// Race condition with onSubscribe: we need to wait until
// subscription is finished before calling onError;
synchronized (this) {
if (subscribed.compareAndSet(false, true)) {
userSubscriber.onSubscribe(NOP);
}
}
} finally {
// if onError throws then there is nothing to do
// here: let the caller deal with it by logging
// and closing the connection.
userSubscriber.onError(t);
}
}
/**
* Complete the subscriber, either normally or exceptionally
* ensure that the subscriber is completed only once.
* @param t a throwable, or {@code null}
*/
protected void complete(Throwable t) {
if (completed.compareAndSet(false, true)) {
t = withError = Utils.getCompletionCause(t);
if (t == null) {
try {
assert subscribed.get();
userSubscriber.onComplete();
} catch (Throwable x) {
// Simply propagate the error by calling
// onError on the user subscriber, and let the
// connection be reused since we should have received
// and parsed all the bytes when we reach here.
// If onError throws in turn, then we will simply
// let that new exception flow up to the caller
// and let it deal with it.
// (i.e: log and close the connection)
// Note that rethrowing here could introduce a
// race that might cause the next send() operation to
// fail as the connection has already been put back
// into the cache when we reach here.
propagateError(t = withError = Utils.getCompletionCause(x));
}
} else {
propagateError(t);
}
}
}
@Override
public CompletionStage<T> getBody() {
return userSubscriber.getBody();
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
// race condition with propagateError: we need to wait until
// subscription is finished before calling onError;
synchronized (this) {
if (subscribed.compareAndSet(false, true)) {
userSubscriber.onSubscribe(subscription);
} else {
// could be already subscribed and completed
// if an unexpected error occurred before the actual
// subscription - though that's not supposed
// happen.
assert completed.get();
}
}
}
@Override
public void onNext(List<ByteBuffer> item) {
if (completed.get()) {
if (subscription != null) {
subscription.cancel();
}
} else {
userSubscriber.onNext(item);
}
}
@Override
public void onError(Throwable throwable) {
complete(throwable);
}
@Override
public void onComplete() {
complete(null);
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -43,6 +43,10 @@ public final class OperationTrackers {
public interface Tracker {
// The total number of outstanding operations
long getOutstandingOperations();
// The number of outstandanding requests: this is
// the number of CF returned by send/sendAsync which
// have not been completed.
long getOutstandingHttpRequests();
// The number of outstanding HTTP/1.1 operations.
// A single HTTP/1.1 request may increment this counter
// multiple times, so the value returned will be >= to
@ -53,11 +57,16 @@ public final class OperationTrackers {
long getOutstandingHttp2Streams();
// The number of active WebSockets
long getOutstandingWebSocketOperations();
// number of TCP connections still opened
long getOutstandingTcpConnections();
// Whether the facade returned to the
// user is still referenced
boolean isFacadeReferenced();
// whether the Selector Manager thread is still running
boolean isSelectorAlive();
// The name of the object being tracked.
String getName();
}
/**

@ -0,0 +1,421 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @bug 8277969
* @summary Test for edge case where the executor is not accepting
* new tasks while the client is still running
* @modules java.base/sun.net.www.http
* java.net.http/jdk.internal.net.http.common
* java.net.http/jdk.internal.net.http.frame
* java.net.http/jdk.internal.net.http.hpack
* java.logging
* jdk.httpserver
* @library /test/lib http2/server
* @build Http2TestServer
* @build jdk.test.lib.net.SimpleSSLContext ReferenceTracker
* @run testng/othervm
* -Djdk.internal.httpclient.debug=true
* -Djdk.httpclient.HttpClient.log=trace,headers,requests
* AsyncExecutorShutdown
*/
// -Djdk.internal.httpclient.debug=true
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsServer;
import jdk.test.lib.RandomFactory;
import jdk.test.lib.net.SimpleSSLContext;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static java.lang.System.out;
import static java.net.http.HttpClient.Builder.NO_PROXY;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
public class AsyncExecutorShutdown implements HttpServerAdapters {
static {
HttpServerAdapters.enableServerLogging();
}
static final Random RANDOM = RandomFactory.getRandom();
SSLContext sslContext;
HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ]
HttpTestServer httpsTestServer; // HTTPS/1.1
HttpTestServer http2TestServer; // HTTP/2 ( h2c )
HttpTestServer https2TestServer; // HTTP/2 ( h2 )
String httpURI;
String httpsURI;
String http2URI;
String https2URI;
static final String MESSAGE = "AsyncExecutorShutdown message body";
static final int ITERATIONS = 3;
@DataProvider(name = "positive")
public Object[][] positive() {
return new Object[][] {
{ httpURI, },
{ httpsURI, },
{ http2URI, },
{ https2URI, },
};
}
static final AtomicLong requestCounter = new AtomicLong();
final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
static Throwable getCause(Throwable t) {
while (t instanceof CompletionException || t instanceof ExecutionException) {
t = t.getCause();
}
return t;
}
static String readBody(InputStream in) {
try {
return new String(in.readAllBytes(), StandardCharsets.UTF_8);
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}
static void checkCause(String what, Throwable cause) {
Throwable t = cause;
Throwable accepted = null;
while (t != null) {
out.println(what + ": checking " + t);
if (t instanceof RejectedExecutionException) {
out.println(what + ": Got expected RejectedExecutionException in cause: " + t);
return;
} else if (t instanceof ClosedChannelException) {
out.println(what + ": Accepting ClosedChannelException as a valid cause: " + t);
accepted = t;
}
t = t.getCause();
}
if (accepted != null) {
out.println(what + ": Didn't find expected RejectedExecutionException, " +
"but accepting " + accepted.getClass().getSimpleName()
+ " as a valid cause: " + accepted);
return;
}
throw new AssertionError(what + ": Unexpected exception: " + cause, cause);
}
@Test(dataProvider = "positive")
void testConcurrent(String uriString) throws Exception {
out.printf("%n---- starting (%s) ----%n", uriString);
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService readerService = Executors.newCachedThreadPool();
HttpClient client = HttpClient.newBuilder()
.proxy(NO_PROXY)
.followRedirects(Redirect.ALWAYS)
.executor(executorService)
.sslContext(sslContext)
.build();
TRACKER.track(client);
assert client.executor().isPresent();
int step = RANDOM.nextInt(ITERATIONS);
try {
List<CompletableFuture<String>> bodies = new ArrayList<>();
for (int i = 0; i < ITERATIONS; i++) {
URI uri = URI.create(uriString + "/concurrent/iteration-" + i);
HttpRequest request = HttpRequest.newBuilder(uri)
.header("X-uuid", "uuid-" + requestCounter.incrementAndGet())
.build();
out.printf("Iteration %d request: %s%n", i, request.uri());
CompletableFuture<HttpResponse<InputStream>> responseCF;
CompletableFuture<String> bodyCF;
final int si = i;
try {
responseCF = client.sendAsync(request, BodyHandlers.ofInputStream())
.thenApply((response) -> {
out.println(si + ": Got response: " + response);
assertEquals(response.statusCode(), 200);
return response;
});
bodyCF = responseCF.thenApplyAsync(HttpResponse::body, readerService)
.thenApply(AsyncExecutorShutdown::readBody)
.thenApply((s) -> { assertEquals(s, MESSAGE); return s;});
} catch (RejectedExecutionException x) {
out.println(i + ": Got expected exception: " + x);
continue;
}
long sleep = RANDOM.nextLong(5);
if (sleep > 0) {
out.printf("%d: sleeping %d ms%n", i, sleep);
Thread.sleep(sleep);
}
if (i == step) {
out.printf("%d: shutting down executor now%n", i, sleep);
executorService.shutdownNow();
}
var cf = bodyCF.exceptionally((t) -> {
Throwable cause = getCause(t);
out.println(si + ": Got expected exception: " + cause);
if (UncheckedIOException.class.isAssignableFrom(cause.getClass())) {
if (cause.getCause() != null) {
out.println(si + ": Got expected exception: " + cause);
cause = cause.getCause();
}
}
checkCause(String.valueOf(si), cause);
return null;
});
bodies.add(cf);
}
CompletableFuture.allOf(bodies.toArray(new CompletableFuture<?>[0])).get();
} finally {
client = null;
executorService.awaitTermination(2000, TimeUnit.MILLISECONDS);
readerService.shutdown();
readerService.awaitTermination(2000, TimeUnit.MILLISECONDS);
}
}
@Test(dataProvider = "positive")
void testSequential(String uriString) throws Exception {
out.printf("%n---- starting (%s) ----%n", uriString);
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService readerService = Executors.newCachedThreadPool();
HttpClient client = HttpClient.newBuilder()
.proxy(NO_PROXY)
.followRedirects(Redirect.ALWAYS)
.executor(executorService)
.sslContext(sslContext)
.build();
TRACKER.track(client);
assert client.executor().isPresent();
int step = RANDOM.nextInt(ITERATIONS);
out.printf("will shutdown executor in step %d%n", step);
try {
for (int i = 0; i < ITERATIONS; i++) {
URI uri = URI.create(uriString + "/sequential/iteration-" + i);
HttpRequest request = HttpRequest.newBuilder(uri)
.header("X-uuid", "uuid-" + requestCounter.incrementAndGet())
.build();
out.printf("Iteration %d request: %s%n", i, request.uri());
final int si = i;
CompletableFuture<HttpResponse<InputStream>> responseCF;
CompletableFuture<String> bodyCF;
try {
responseCF = client.sendAsync(request, BodyHandlers.ofInputStream())
.thenApply((response) -> {
out.println(si + ": Got response: " + response);
assertEquals(response.statusCode(), 200);
return response;
});
bodyCF = responseCF.thenApplyAsync(HttpResponse::body, readerService)
.thenApply(AsyncExecutorShutdown::readBody)
.thenApply((s) -> {assertEquals(s, MESSAGE); return s;})
.thenApply((s) -> {out.println(si + ": Got body: " + s); return s;});
} catch (RejectedExecutionException x) {
out.println(i + ": Got expected exception: " + x);
continue;
}
long sleep = RANDOM.nextLong(5);
if (sleep > 0) {
out.printf("%d: sleeping %d ms%n", i, sleep);
Thread.sleep(sleep);
}
if (i == step) {
out.printf("%d: shutting down executor now%n", i, sleep);
executorService.shutdownNow();
}
bodyCF.handle((r,t) -> {
if (t != null) {
try {
Throwable cause = getCause(t);
out.println(si + ": Got expected exception: " + cause);
if (UncheckedIOException.class.isAssignableFrom(cause.getClass())) {
if (cause.getCause() != null) {
out.println(si + ": Got expected exception: " + cause);
cause = cause.getCause();
}
}
checkCause(String.valueOf(si), cause);
} catch (Throwable ase) {
return CompletableFuture.failedFuture(ase);
}
return CompletableFuture.completedFuture(null);
} else {
return CompletableFuture.completedFuture(r);
}
}).thenCompose((c) -> c).get();
}
} finally {
client = null;
executorService.awaitTermination(2000, TimeUnit.MILLISECONDS);
readerService.shutdown();
readerService.awaitTermination(2000, TimeUnit.MILLISECONDS);
}
}
// -- Infrastructure
@BeforeTest
public void setup() throws Exception {
out.println("\n**** Setup ****\n");
sslContext = new SimpleSSLContext().get();
if (sslContext == null)
throw new AssertionError("Unexpected null sslContext");
InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));
httpTestServer.addHandler(new ServerRequestHandler(), "/http1/exec/");
httpURI = "http://" + httpTestServer.serverAuthority() + "/http1/exec/retry";
HttpsServer httpsServer = HttpsServer.create(sa, 0);
httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
httpsTestServer = HttpTestServer.of(httpsServer);
httpsTestServer.addHandler(new ServerRequestHandler(),"/https1/exec/");
httpsURI = "https://" + httpsTestServer.serverAuthority() + "/https1/exec/retry";
http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
http2TestServer.addHandler(new ServerRequestHandler(), "/http2/exec/");
http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/exec/retry";
https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));
https2TestServer.addHandler(new ServerRequestHandler(), "/https2/exec/");
https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/exec/retry";
httpTestServer.start();
httpsTestServer.start();
http2TestServer.start();
https2TestServer.start();
}
@AfterTest
public void teardown() throws Exception {
Thread.sleep(100);
AssertionError fail = TRACKER.checkShutdown(5000);
try {
httpTestServer.stop();
httpsTestServer.stop();
http2TestServer.stop();
https2TestServer.stop();
} finally {
if (fail != null) throw fail;
}
}
static class ServerRequestHandler implements HttpTestHandler {
ConcurrentHashMap<String,String> closedRequests = new ConcurrentHashMap<>();
@java.lang.Override
public void handle(HttpTestExchange t) throws IOException {
out.println("ServerRequestHandler for: " + t.getRequestURI());
List<String> uuids = t.getRequestHeaders().get("X-uuid");
if (uuids == null || uuids.size() != 1) {
readAllRequestData(t);
try (OutputStream os = t.getResponseBody()) {
String msg = "Incorrect uuid header values:[" + uuids + "]";
(new RuntimeException(msg)).printStackTrace();
t.sendResponseHeaders(500, -1);
os.write(msg.getBytes(UTF_8));
}
return;
}
String uuid = uuids.get(0);
// retrying
if (closedRequests.putIfAbsent(uuid, t.getRequestURI().toString()) == null) {
if (t.getExchangeVersion() == HttpClient.Version.HTTP_1_1) {
// Throwing an exception here only causes a retry
// with HTTP_1_1 - where it forces the server to close
// the connection.
// For HTTP/2 then throwing an IOE would cause the server
// to close the stream, and throwing anything else would
// cause it to close the connection, but neither would
// cause the client to retry.
// So we simply do not try to retry with HTTP/2.
out.println("Server will close connection, client will retry: "
+ t.getRequestURI().toString());
throw new IOException("Closing on first request");
}
}
// not retrying
readAllRequestData(t);
try (OutputStream os = t.getResponseBody()) {
byte[] bytes = MESSAGE.getBytes(UTF_8);
t.sendResponseHeaders(200, bytes.length);
for (int i=0; i<bytes.length; i++) {
os.write(bytes, i, 1);
os.flush();
try {
Thread.sleep(RANDOM.nextInt(5));
} catch (InterruptedException x) { }
}
}
closedRequests.remove(uuid);
}
}
static void readAllRequestData(HttpTestExchange t) throws IOException {
try (InputStream is = t.getRequestBody()) {
is.readAllBytes();
}
}
}

@ -0,0 +1,384 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @bug 8277969
* @summary Test for edge case where the executor is not accepting
* new tasks while the client is still running
* @modules java.base/sun.net.www.http
* java.net.http/jdk.internal.net.http.common
* java.net.http/jdk.internal.net.http.frame
* java.net.http/jdk.internal.net.http.hpack
* java.logging
* jdk.httpserver
* @library /test/lib http2/server
* @build Http2TestServer
* @build jdk.test.lib.net.SimpleSSLContext ReferenceTracker
* @run testng/othervm
* -Djdk.internal.httpclient.debug=true
* -Djdk.httpclient.HttpClient.log=trace,headers,requests
* ExecutorShutdown
*/
// -Djdk.internal.httpclient.debug=true
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsServer;
import jdk.test.lib.RandomFactory;
import jdk.test.lib.net.SimpleSSLContext;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static java.lang.System.out;
import static java.net.http.HttpClient.Builder.NO_PROXY;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
public class ExecutorShutdown implements HttpServerAdapters {
static {
HttpServerAdapters.enableServerLogging();
}
static final Random RANDOM = RandomFactory.getRandom();
SSLContext sslContext;
HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ]
HttpTestServer httpsTestServer; // HTTPS/1.1
HttpTestServer http2TestServer; // HTTP/2 ( h2c )
HttpTestServer https2TestServer; // HTTP/2 ( h2 )
String httpURI;
String httpsURI;
String http2URI;
String https2URI;
static final String MESSAGE = "ExecutorShutdown message body";
static final int ITERATIONS = 3;
@DataProvider(name = "positive")
public Object[][] positive() {
return new Object[][] {
{ httpURI, },
{ httpsURI, },
{ http2URI, },
{ https2URI, },
};
}
static final AtomicLong requestCounter = new AtomicLong();
final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
static Throwable getCause(Throwable t) {
while (t instanceof CompletionException || t instanceof ExecutionException) {
t = t.getCause();
}
return t;
}
static void checkCause(String what, Throwable cause) {
Throwable t = cause;
Throwable accepted = null;
while (t != null) {
out.println(what + ": checking " + t);
if (t instanceof RejectedExecutionException) {
out.println(what + ": Got expected RejectedExecutionException in cause: " + t);
return;
} else if (t instanceof ClosedChannelException) {
out.println(what + ": Accepting ClosedChannelException as a valid cause: " + t);
accepted = t;
}
t = t.getCause();
}
if (accepted != null) {
out.println(what + ": Didn't find expected RejectedExecutionException, " +
"but accepting " + accepted.getClass().getSimpleName()
+ " as a valid cause: " + accepted);
return;
}
throw new AssertionError(what + ": Unexpected exception: " + cause, cause);
}
@Test(dataProvider = "positive")
void testConcurrent(String uriString) throws Exception {
out.printf("%n---- starting (%s) ----%n", uriString);
ExecutorService executorService = Executors.newCachedThreadPool();
HttpClient client = HttpClient.newBuilder()
.proxy(NO_PROXY)
.followRedirects(Redirect.ALWAYS)
.executor(executorService)
.sslContext(sslContext)
.build();
TRACKER.track(client);
assert client.executor().isPresent();
int step = RANDOM.nextInt(ITERATIONS);
try {
List<CompletableFuture<HttpResponse<String>>> responses = new ArrayList<>();
for (int i = 0; i < ITERATIONS; i++) {
URI uri = URI.create(uriString + "/concurrent/iteration-" + i);
HttpRequest request = HttpRequest.newBuilder(uri)
.header("X-uuid", "uuid-" + requestCounter.incrementAndGet())
.build();
out.printf("Iteration %d request: %s%n", i, request.uri());
CompletableFuture<HttpResponse<String>> responseCF;
try {
responseCF = client.sendAsync(request, BodyHandlers.ofString());
} catch (RejectedExecutionException x) {
out.println(i + ": Got expected exception: " + x);
continue;
}
long sleep = RANDOM.nextLong(5);
if (sleep > 0) {
out.printf("%d: sleeping %d ms%n", i, sleep);
Thread.sleep(sleep);
}
if (i == step) {
out.printf("%d: shutting down executor now%n", i, sleep);
executorService.shutdownNow();
}
final int si = i;
var cf = responseCF.thenApply((response) -> {
out.println(si + ": Got response: " + response);
out.println(si + ": Got body Path: " + response.body());
assertEquals(response.statusCode(), 200);
assertEquals(response.body(), MESSAGE);
return response;
}).exceptionally((t) -> {
Throwable cause = getCause(t);
out.println(si + ": Got expected exception: " + cause);
checkCause(String.valueOf(si), cause);
return null;
});
responses.add(cf);
}
CompletableFuture.allOf(responses.toArray(new CompletableFuture<?>[0])).get();
} finally {
client = null;
executorService.awaitTermination(2000, TimeUnit.MILLISECONDS);
}
}
@Test(dataProvider = "positive")
void testSequential(String uriString) throws Exception {
out.printf("%n---- starting (%s) ----%n", uriString);
ExecutorService executorService = Executors.newCachedThreadPool();
HttpClient client = HttpClient.newBuilder()
.proxy(NO_PROXY)
.followRedirects(Redirect.ALWAYS)
.executor(executorService)
.sslContext(sslContext)
.build();
TRACKER.track(client);
assert client.executor().isPresent();
int step = RANDOM.nextInt(ITERATIONS);
out.printf("will shutdown executor in step %d%n", step);
try {
for (int i = 0; i < ITERATIONS; i++) {
URI uri = URI.create(uriString + "/sequential/iteration-" + i);
HttpRequest request = HttpRequest.newBuilder(uri)
.header("X-uuid", "uuid-" + requestCounter.incrementAndGet())
.build();
out.printf("Iteration %d request: %s%n", i, request.uri());
CompletableFuture<HttpResponse<String>> responseCF;
try {
responseCF = client.sendAsync(request, BodyHandlers.ofString());
} catch (RejectedExecutionException x) {
out.println(i + ": Got expected exception: " + x);
continue;
}
long sleep = RANDOM.nextLong(5);
if (sleep > 0) {
out.printf("%d: sleeping %d ms%n", i, sleep);
Thread.sleep(sleep);
}
if (i == step) {
out.printf("%d: shutting down executor now%n", i, sleep);
executorService.shutdownNow();
}
final int si = i;
responseCF.thenApply((response) -> {
out.println(si + ": Got response: " + response);
out.println(si + ": Got body Path: " + response.body());
assertEquals(response.statusCode(), 200);
assertEquals(response.body(), MESSAGE);
return response;
}).handle((r,t) -> {
if (t != null) {
try {
Throwable cause = getCause(t);
out.println(si + ": Got expected exception: " + cause);
checkCause(String.valueOf(si), cause);
} catch (Throwable ase) {
return CompletableFuture.failedFuture(ase);
}
return CompletableFuture.completedFuture(null);
} else {
return CompletableFuture.completedFuture(r);
}
}).thenCompose((c) -> c).get();
}
} finally {
client = null;
executorService.awaitTermination(2000, TimeUnit.MILLISECONDS);
}
}
// -- Infrastructure
@BeforeTest
public void setup() throws Exception {
out.println("\n**** Setup ****\n");
sslContext = new SimpleSSLContext().get();
if (sslContext == null)
throw new AssertionError("Unexpected null sslContext");
InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));
httpTestServer.addHandler(new ServerRequestHandler(), "/http1/exec/");
httpURI = "http://" + httpTestServer.serverAuthority() + "/http1/exec/retry";
HttpsServer httpsServer = HttpsServer.create(sa, 0);
httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
httpsTestServer = HttpTestServer.of(httpsServer);
httpsTestServer.addHandler(new ServerRequestHandler(),"/https1/exec/");
httpsURI = "https://" + httpsTestServer.serverAuthority() + "/https1/exec/retry";
http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
http2TestServer.addHandler(new ServerRequestHandler(), "/http2/exec/");
http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/exec/retry";
https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));
https2TestServer.addHandler(new ServerRequestHandler(), "/https2/exec/");
https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/exec/retry";
httpTestServer.start();
httpsTestServer.start();
http2TestServer.start();
https2TestServer.start();
}
@AfterTest
public void teardown() throws Exception {
Thread.sleep(100);
AssertionError fail = TRACKER.check(5000);
try {
httpTestServer.stop();
httpsTestServer.stop();
http2TestServer.stop();
https2TestServer.stop();
} finally {
if (fail != null) throw fail;
}
}
static class ServerRequestHandler implements HttpTestHandler {
ConcurrentHashMap<String,String> closedRequests = new ConcurrentHashMap<>();
@java.lang.Override
public void handle(HttpTestExchange t) throws IOException {
out.println("ServerRequestHandler for: " + t.getRequestURI());
List<String> uuids = t.getRequestHeaders().get("X-uuid");
if (uuids == null || uuids.size() != 1) {
readAllRequestData(t);
try (OutputStream os = t.getResponseBody()) {
String msg = "Incorrect uuid header values:[" + uuids + "]";
(new RuntimeException(msg)).printStackTrace();
t.sendResponseHeaders(500, -1);
os.write(msg.getBytes(UTF_8));
}
return;
}
String uuid = uuids.get(0);
// retrying
if (closedRequests.putIfAbsent(uuid, t.getRequestURI().toString()) == null) {
if (t.getExchangeVersion() == HttpClient.Version.HTTP_1_1) {
// Throwing an exception here only causes a retry
// with HTTP_1_1 - where it forces the server to close
// the connection.
// For HTTP/2 then throwing an IOE would cause the server
// to close the stream, and throwing anything else would
// cause it to close the connection, but neither would
// cause the client to retry.
// So we simply do not try to retry with HTTP/2.
out.println("Server will close connection, client will retry: "
+ t.getRequestURI().toString());
throw new IOException("Closing on first request");
}
}
// not retrying
readAllRequestData(t);
try (OutputStream os = t.getResponseBody()) {
byte[] bytes = MESSAGE.getBytes(UTF_8);
t.sendResponseHeaders(200, bytes.length);
for (int i=0; i<bytes.length; i++) {
os.write(bytes, i, 1);
os.flush();
try {
Thread.sleep(RANDOM.nextInt(5));
} catch (InterruptedException x) { }
}
}
closedRequests.remove(uuid);
}
}
static void readAllRequestData(HttpTestExchange t) throws IOException {
try (InputStream is = t.getRequestBody()) {
is.readAllBytes();
}
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -24,8 +24,12 @@
import jdk.internal.net.http.common.OperationTrackers;
import jdk.internal.net.http.common.OperationTrackers.Tracker;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.net.http.HttpClient;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@ -51,8 +55,12 @@ public class ReferenceTracker {
}
public StringBuilder diagnose(StringBuilder warnings) {
return diagnose(warnings, (t) -> t.getOutstandingHttpOperations() > 0);
}
public StringBuilder diagnose(StringBuilder warnings, Predicate<Tracker> hasOutstanding) {
for (Tracker tracker : TRACKERS) {
checkOutstandingOperations(warnings, tracker);
checkOutstandingOperations(warnings, tracker, hasOutstanding);
}
return warnings;
}
@ -76,22 +84,54 @@ public class ReferenceTracker {
}
public AssertionError check(long graceDelayMs) {
return check(graceDelayMs,
(t) -> t.getOutstandingHttpOperations() > 0,
"outstanding operations", true);
}
private void printThreads(String why, PrintStream out) {
out.println(why);
Arrays.stream(ManagementFactory.getThreadMXBean()
.dumpAllThreads(true, true))
.forEach(out::println);
}
public AssertionError check(long graceDelayMs,
Predicate<Tracker> hasOutstanding,
String description,
boolean printThreads) {
AssertionError fail = null;
if (hasOutstandingOperations()) {
try {
Thread.sleep(graceDelayMs);
} catch (InterruptedException x) {
// OK
}
StringBuilder warnings = diagnose(new StringBuilder());
graceDelayMs = Math.max(graceDelayMs, 100);
long delay = Math.min(graceDelayMs, 500);
var count = delay > 0 ? graceDelayMs / delay : 1;
for (int i = 0; i < count; i++) {
if (TRACKERS.stream().anyMatch(hasOutstanding)) {
System.gc();
try {
System.out.println("Waiting for HTTP operations to terminate...");
Thread.sleep(Math.min(graceDelayMs, Math.max(delay, 1)));
} catch (InterruptedException x) {
// OK
}
} else break;
}
if (TRACKERS.stream().anyMatch(hasOutstanding)) {
StringBuilder warnings = diagnose(new StringBuilder(), hasOutstanding);
addSummary(warnings);
if (hasOutstandingOperations()) {
if (TRACKERS.stream().anyMatch(hasOutstanding)) {
fail = new AssertionError(warnings.toString());
}
} else {
System.out.println("PASSED: No outstanding operations found in "
System.out.println("PASSED: No " + description + " found in "
+ getTrackedClientCount() + " clients");
}
if (fail != null) {
Predicate<Tracker> isAlive = Tracker::isSelectorAlive;
if (printThreads && TRACKERS.stream().anyMatch(isAlive)) {
printThreads("Some selector manager threads are still alive: ", System.out);
printThreads("Some selector manager threads are still alive: ", System.err);
}
}
return fail;
}
@ -108,23 +148,49 @@ public class ReferenceTracker {
.append(" operations still pending out of ")
.append(tracked)
.append(" tracked clients.");
System.out.println(warning.toString().substring(pos));
System.err.println(warning.toString().substring(pos));
System.out.println(warning.substring(pos));
System.err.println(warning.substring(pos));
}
private static void checkOutstandingOperations(StringBuilder warning, Tracker tracker) {
if (tracker.getOutstandingOperations() > 0) {
private static void checkOutstandingOperations(StringBuilder warning,
Tracker tracker,
Predicate<Tracker> hasOutsanding) {
if (hasOutsanding.test(tracker)) {
if (warning.length() > 0) warning.append("\n");
int pos = warning.length();
warning.append("WARNING: tracker for " + tracker.getName() + " has outstanding operations:");
warning.append("\n\tPending HTTP Requests: " + tracker.getOutstandingHttpRequests());
warning.append("\n\tPending HTTP/1.1 operations: " + tracker.getOutstandingHttpOperations());
warning.append("\n\tPending HTTP/2 streams: " + tracker.getOutstandingHttp2Streams());
warning.append("\n\tPending WebSocket operations: " + tracker.getOutstandingWebSocketOperations());
warning.append("\n\tPending TCP connections: " + tracker.getOutstandingTcpConnections());
warning.append("\n\tTotal pending operations: " + tracker.getOutstandingOperations());
warning.append("\n\tFacade referenced: " + tracker.isFacadeReferenced());
System.out.println(warning.toString().substring(pos));
System.err.println(warning.toString().substring(pos));
warning.append("\n\tSelector alive: " + tracker.isSelectorAlive());
System.out.println(warning.substring(pos));
System.err.println(warning.substring(pos));
}
}
private boolean isSelectorManager(Thread t) {
String name = t.getName();
if (name == null) return false;
return name.contains("SelectorManager");
}
// This is a slightly more permissive check than the default checks,
// it only verifies that all CFs returned by send/sendAsync have been
// completed, and that all opened channels have been closed, and that
// the selector manager thread has exited.
// It doesn't check that all refcounts have reached 0.
// This is typically useful to only check that resources have been released.
public AssertionError checkShutdown(long graceDelayMs) {
Predicate<Tracker> isAlive = Tracker::isSelectorAlive;
Predicate<Tracker> hasPendingRequests = (t) -> t.getOutstandingHttpRequests() > 0;
Predicate<Tracker> hasPendingConnections = (t) -> t.getOutstandingTcpConnections() > 0;
AssertionError failed = check(graceDelayMs,
isAlive.or(hasPendingRequests).or(hasPendingConnections),
"outstanding unclosed resources", true);
return failed;
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -24,17 +24,16 @@
import java.io.IOException;
import java.net.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SNIServerName;
import jdk.internal.net.http.frame.ErrorFrame;
/**
@ -45,16 +44,19 @@ import jdk.internal.net.http.frame.ErrorFrame;
* obtained from the supplied ExecutorService.
*/
public class Http2TestServer implements AutoCloseable {
static final AtomicLong IDS = new AtomicLong();
final long id = IDS.incrementAndGet();
final ServerSocket server;
final boolean supportsHTTP11;
volatile boolean secure;
final ExecutorService exec;
volatile boolean stopping = false;
private volatile boolean stopping = false;
final Map<String,Http2Handler> handlers;
final SSLContext sslContext;
final String serverName;
final HashMap<InetSocketAddress,Http2TestServerConnection> connections;
final Set<Http2TestServerConnection> connections;
final Properties properties;
final String name;
private static ThreadFactory defaultThreadFac =
(Runnable r) -> {
@ -172,6 +174,7 @@ public class Http2TestServer implements AutoCloseable {
boolean supportsHTTP11)
throws Exception
{
this.name = "TestServer(%d)".formatted(id);
this.serverName = serverName;
this.supportsHTTP11 = supportsHTTP11;
if (secure) {
@ -188,7 +191,7 @@ public class Http2TestServer implements AutoCloseable {
this.exec = exec == null ? getDefaultExecutor() : exec;
this.handlers = Collections.synchronizedMap(new HashMap<>());
this.properties = properties;
this.connections = new HashMap<>();
this.connections = ConcurrentHashMap.newKeySet();
}
/**
@ -227,7 +230,7 @@ public class Http2TestServer implements AutoCloseable {
Http2Handler handler = href.get();
if (handler == null)
throw new RuntimeException("No handler found for path " + path);
System.err.println("Using handler for: " + bestMatch.get());
System.err.println(name + ": Using handler for: " + bestMatch.get());
return handler;
}
@ -241,8 +244,8 @@ public class Http2TestServer implements AutoCloseable {
public synchronized void stop() {
// TODO: clean shutdown GoAway
stopping = true;
System.err.printf("Server stopping %d connections\n", connections.size());
for (Http2TestServerConnection connection : connections.values()) {
System.err.printf("%s: stopping %d connections\n", name, connections.size());
for (Http2TestServerConnection connection : connections) {
connection.close(ErrorFrame.NO_ERROR);
}
try {
@ -279,11 +282,56 @@ public class Http2TestServer implements AutoCloseable {
private synchronized void putConnection(InetSocketAddress addr, Http2TestServerConnection c) {
if (!stopping)
connections.put(addr, c);
connections.add(c);
}
private synchronized void removeConnection(InetSocketAddress addr, Http2TestServerConnection c) {
connections.remove(addr, c);
connections.remove(c);
}
record AcceptedConnection(Http2TestServer server,
Socket socket) {
void startConnection() {
String name = server.name;
Http2TestServerConnection c = null;
InetSocketAddress addr = null;
try {
addr = (InetSocketAddress) socket.getRemoteSocketAddress();
System.err.println(name + ": creating connection");
c = server.createConnection(server, socket, server.exchangeSupplier);
server.putConnection(addr, c);
System.err.println(name + ": starting connection");
c.run();
System.err.println(name + ": connection started");
} catch (Throwable e) {
boolean stopping = server.stopping;
if (!stopping) {
System.err.println(name + ": unexpected exception: " + e);
e.printStackTrace();
}
// we should not reach here, but if we do
// the connection might not have been closed
// and if so then the client might wait
// forever.
if (c != null) {
server.removeConnection(addr, c);
}
try {
if (c != null) c.close(ErrorFrame.PROTOCOL_ERROR);
} catch (Exception x) {
if (!stopping)
System.err.println(name + ": failed to close connection: " + e);
} finally {
try {
socket.close();
} catch (IOException x) {
if (!stopping)
System.err.println(name + ": failed to close socket: " + e);
}
}
System.err.println(name + ": failed to start connection: " + e);
}
}
}
/**
@ -293,38 +341,37 @@ public class Http2TestServer implements AutoCloseable {
exec.submit(() -> {
try {
while (!stopping) {
System.err.println(name + ": accepting connections");
Socket socket = server.accept();
Http2TestServerConnection c = null;
InetSocketAddress addr = null;
System.err.println(name + ": connection accepted");
try {
addr = (InetSocketAddress) socket.getRemoteSocketAddress();
c = createConnection(this, socket, exchangeSupplier);
putConnection(addr, c);
c.run();
var accepted = new AcceptedConnection(this, socket);
exec.submit(accepted::startConnection);
} catch (Throwable e) {
if (!stopping) {
System.err.println(name + ": unexpected exception: " + e);
e.printStackTrace();
}
// we should not reach here, but if we do
// the connection might not have been closed
// and if so then the client might wait
// forever.
if (c != null) {
removeConnection(addr, c);
c.close(ErrorFrame.PROTOCOL_ERROR);
} else {
socket.close();
}
System.err.println("TestServer: start exception: " + e);
System.err.println(name + ": start exception: " + e);
}
System.err.println(name + ": stopping is: " + stopping);
}
} catch (SecurityException se) {
System.err.println("TestServer: terminating, caught " + se);
System.err.println(name + ": terminating, caught " + se);
se.printStackTrace();
stopping = true;
try { server.close(); } catch (IOException ioe) { /* ignore */}
} catch (Throwable e) {
if (!stopping) {
System.err.println("TestServer: terminating, caught " + e);
System.err.println(name + ": terminating, caught " + e);
e.printStackTrace();
}
} finally {
System.err.println(name + ": finished");
}
});
}
@ -338,6 +385,7 @@ public class Http2TestServer implements AutoCloseable {
@Override
public void close() throws Exception {
System.err.println(name + ": closing");
stop();
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -47,6 +47,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
@ -62,7 +63,6 @@ import jdk.internal.net.http.common.FlowTube;
* connection deadlines and purges the right connections
* from the cache.
* @bug 8187044 8187111 8221395
* @author danielfuchs
*/
public class ConnectionPoolTest {
@ -441,6 +441,8 @@ public class ConnectionPoolTest {
// Emulates an HttpConnection that has a strong reference to its HttpClient.
static class HttpConnectionStub extends HttpConnection {
static final AtomicLong IDS = new AtomicLong();
public HttpConnectionStub(
HttpClient client,
InetSocketAddress address,
@ -473,6 +475,12 @@ public class ConnectionPoolTest {
final SocketChannel channel;
volatile boolean closed, finished;
// Called from within super constructor
@Override
long newConnectionId(HttpClientImpl client) {
return IDS.incrementAndGet();
}
// Used for testing closeOrReturnToPool.
void finish(boolean finished) { this.finished = finished; }
void reopen() { closed = finished = false;}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -299,12 +299,12 @@ public class SSLEchoTubeTest extends AbstractSSLTubeTest {
Flow.Subscription s = subscription;
if (s == null || cancelled.get()) return;
long unfulfilled = queue.size() + --requested;
if (unfulfilled <= maxQueueSize/2) {
if (unfulfilled <= maxQueueSize / 2) {
long req = maxQueueSize - unfulfilled;
requested += req;
s.request(req);
System.out.printf("EchoTube request: %s [requested:%s, queue:%s, unfulfilled:%s]%n",
req, requested-req, queue.size(), unfulfilled );
req, requested - req, queue.size(), unfulfilled);
}
}