8298340: java/net/httpclient/CancelRequestTest.java fails with AssertionError: Found some subscribers for testPostInterrupt

Reviewed-by: jpai
This commit is contained in:
Daniel Fuchs 2022-12-09 15:23:22 +00:00
parent 5a92bee185
commit 05d67f69e3
4 changed files with 50 additions and 5 deletions
src/java.net.http/share/classes/jdk/internal/net/http
test/jdk/java/net/httpclient

@ -1008,6 +1008,7 @@ class Http2Connection {
// This method is called when the HTTP/2 client is being
// stopped. Do not call it from anywhere else.
void closeAllStreams() {
if (debug.on()) debug.log("Close all streams");
for (var streamId : streams.keySet()) {
// safe to call without locking - see Stream::deRegister
decrementStreamsCount(streamId);

@ -273,6 +273,7 @@ class Stream<T> extends ExchangeImpl<T> {
if (debug.on()) debug.log("nullBody: streamid=%d", streamid);
// We should have an END_STREAM data frame waiting in the inputQ.
// We need a subscriber to force the scheduler to process it.
assert pendingResponseSubscriber == null;
pendingResponseSubscriber = HttpResponse.BodySubscribers.replacing(null);
sched.runOrSchedule();
}
@ -472,8 +473,14 @@ class Stream<T> extends ExchangeImpl<T> {
receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
}
} else if (frame instanceof DataFrame) {
if (cancelled) connection.dropDataFrame((DataFrame) frame);
else receiveDataFrame((DataFrame) frame);
if (cancelled) {
if (debug.on()) {
debug.log("request cancelled or stream closed: dropping data frame");
}
connection.dropDataFrame((DataFrame) frame);
} else {
receiveDataFrame((DataFrame) frame);
}
} else {
if (!cancelled) otherFrame(frame);
}
@ -1283,10 +1290,24 @@ class Stream<T> extends ExchangeImpl<T> {
}
}
}
if (closing) { // true if the stream has not been closed yet
if (responseSubscriber != null || pendingResponseSubscriber != null)
if (responseSubscriber != null || pendingResponseSubscriber != null) {
if (debug.on())
debug.log("stream %s closing due to %s", streamid, (Object)errorRef.get());
sched.runOrSchedule();
} else {
if (debug.on())
debug.log("stream %s closing due to %s before subscriber registered",
streamid, (Object)errorRef.get());
}
} else {
if (debug.on()) {
debug.log("stream %s already closed due to %s",
streamid, (Object)errorRef.get());
}
}
completeResponseExceptionally(e);
if (!requestBodyCF.isDone()) {
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
@ -1330,6 +1351,20 @@ class Stream<T> extends ExchangeImpl<T> {
if (debug.on()) debug.log("close stream %d", streamid);
Log.logTrace("Closing stream {0}", streamid);
connection.closeStream(streamid);
var s = responseSubscriber == null
? pendingResponseSubscriber
: responseSubscriber;
if (debug.on()) debug.log("subscriber is %s", s);
if (s instanceof Http2StreamResponseSubscriber<?> sw) {
if (debug.on()) debug.log("closing response subscriber stream %s", streamid);
// if the subscriber has already completed,
// there is nothing to do...
if (!sw.completed()) {
// otherwise make sure it will be completed
var cause = errorRef.get();
sw.complete(cause == null ? new IOException("stream closed") : cause);
}
}
Log.logTrace("Stream {0} closed", streamid);
}
@ -1554,10 +1589,12 @@ class Stream<T> extends ExchangeImpl<T> {
super.complete(t);
}
}
@Override
protected void onCancel() {
unregisterResponseSubscriber(this);
}
}
private static final VarHandle STREAM_STATE;

@ -30,7 +30,6 @@ import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
@ -176,6 +175,14 @@ public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
}
}
/**
* {@return true if this subscriber has already completed, either normally
* or abnormally}
*/
public boolean completed() {
return completed.get();
}
@Override
public CompletionStage<T> getBody() {
return userSubscriber.getBody();

@ -23,7 +23,7 @@
/*
* @test
* @bug 8245462 8229822 8254786 8297075 8297149
* @bug 8245462 8229822 8254786 8297075 8297149 8298340
* @summary Tests cancelling the request.
* @library /test/lib http2/server
* @key randomness