8317808: HTTP/2 stream cancelImpl may leave subscriber registered

Reviewed-by: michaelm, djelinski
This commit is contained in:
Daniel Fuchs 2023-10-13 14:35:50 +00:00
parent 7d31146f4d
commit 6273ab97dc
4 changed files with 25 additions and 7 deletions

View File

@ -578,9 +578,10 @@ final class HttpClientImpl extends HttpClient implements Trackable {
if (debug.on()) { if (debug.on()) {
debug.log("body subscriber registered: " + count); debug.log("body subscriber registered: " + count);
} }
}
return true; return true;
} }
return false;
}
} finally { } finally {
selmgr.unlock(); selmgr.unlock();
} }

View File

@ -183,7 +183,7 @@ class Stream<T> extends ExchangeImpl<T> {
if (debug.on()) debug.log("subscribing user subscriber"); if (debug.on()) debug.log("subscribing user subscriber");
subscriber.onSubscribe(userSubscription); subscriber.onSubscribe(userSubscription);
} }
while (!inputQ.isEmpty()) { while (!inputQ.isEmpty() && errorRef.get() == null) {
Http2Frame frame = inputQ.peek(); Http2Frame frame = inputQ.peek();
if (frame instanceof ResetFrame rf) { if (frame instanceof ResetFrame rf) {
inputQ.remove(); inputQ.remove();
@ -417,6 +417,10 @@ class Stream<T> extends ExchangeImpl<T> {
// pushes entire response body into response subscriber // pushes entire response body into response subscriber
// blocking when required by local or remote flow control // blocking when required by local or remote flow control
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) { CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
// ensure that the body subscriber will be subscribed and onError() is
// invoked
pendingResponseSubscriber = bodySubscriber;
// We want to allow the subscriber's getBody() method to block so it // We want to allow the subscriber's getBody() method to block so it
// can work with InputStreams. So, we offload execution. // can work with InputStreams. So, we offload execution.
responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber, responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber,
@ -427,9 +431,6 @@ class Stream<T> extends ExchangeImpl<T> {
responseBodyCF.completeExceptionally(t); responseBodyCF.completeExceptionally(t);
} }
// ensure that the body subscriber will be subscribed and onError() is
// invoked
pendingResponseSubscriber = bodySubscriber;
sched.runOrSchedule(); // in case data waiting already to be processed, or error sched.runOrSchedule(); // in case data waiting already to be processed, or error
return responseBodyCF; return responseBodyCF;

View File

@ -474,6 +474,7 @@ public abstract class AbstractThrowingSubscribers implements HttpServerAdapters
if (response != null) { if (response != null) {
finisher.finish(where, response, thrower); finisher.finish(where, response, thrower);
} }
var tracker = TRACKER.getTracker(client);
if (!sameClient) { if (!sameClient) {
// Wait for the client to be garbage collected. // Wait for the client to be garbage collected.
// we use the ReferenceTracker API rather than HttpClient::close here, // we use the ReferenceTracker API rather than HttpClient::close here,
@ -482,7 +483,6 @@ public abstract class AbstractThrowingSubscribers implements HttpServerAdapters
// By using the ReferenceTracker, we will get some diagnosis about what // By using the ReferenceTracker, we will get some diagnosis about what
// is keeping the client alive if it doesn't get GC'ed within the // is keeping the client alive if it doesn't get GC'ed within the
// expected time frame. // expected time frame.
var tracker = TRACKER.getTracker(client);
client = null; client = null;
System.gc(); System.gc();
System.out.println(now() + "waiting for client to shutdown: " + tracker.getName()); System.out.println(now() + "waiting for client to shutdown: " + tracker.getName());
@ -491,6 +491,14 @@ public abstract class AbstractThrowingSubscribers implements HttpServerAdapters
if (error != null) throw error; if (error != null) throw error;
System.out.println(now() + "client shutdown normally: " + tracker.getName()); System.out.println(now() + "client shutdown normally: " + tracker.getName());
System.err.println(now() + "client shutdown normally: " + tracker.getName()); System.err.println(now() + "client shutdown normally: " + tracker.getName());
} else {
System.out.println(now() + "waiting for operation to finish: " + tracker.getName());
System.err.println(now() + "waiting for operation to finish: " + tracker.getName());
var error = TRACKER.checkFinished(tracker, 10000);
if (error != null) throw error;
System.out.println(now() + "operation finished normally: " + tracker.getName());
System.err.println(now() + "operation finished normally: " + tracker.getName());
} }
} }
} }
@ -800,7 +808,7 @@ public abstract class AbstractThrowingSubscribers implements HttpServerAdapters
sharedClient == null ? null : sharedClient.toString(); sharedClient == null ? null : sharedClient.toString();
sharedClient = null; sharedClient = null;
Thread.sleep(100); Thread.sleep(100);
AssertionError fail = TRACKER.check(500); AssertionError fail = TRACKER.check(5000);
try { try {
httpTestServer.stop(); httpTestServer.stop();
httpsTestServer.stop(); httpsTestServer.stop();

View File

@ -115,6 +115,14 @@ public class ReferenceTracker {
"outstanding operations or unreleased resources", true); "outstanding operations or unreleased resources", true);
} }
public AssertionError checkFinished(Tracker tracker, long graceDelayMs) {
Predicate<Tracker> hasOperations = (t) -> t.getOutstandingOperations() > 0;
Predicate<Tracker> hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0;
return check(tracker, graceDelayMs,
hasOperations.or(hasSubscribers),
"outstanding operations or unreleased resources", false);
}
public AssertionError check(long graceDelayMs) { public AssertionError check(long graceDelayMs) {
Predicate<Tracker> hasOperations = (t) -> t.getOutstandingOperations() > 0; Predicate<Tracker> hasOperations = (t) -> t.getOutstandingOperations() > 0;
Predicate<Tracker> hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0; Predicate<Tracker> hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0;