8299338: AssertionError in ResponseSubscribers$HttpResponseInputStream::onSubscribe

Reviewed-by: jpai
This commit is contained in:
Daniel Fuchs 2023-02-22 12:43:55 +00:00
parent 63ef214328
commit 575484806c
6 changed files with 36 additions and 33 deletions
src/java.net.http/share/classes/jdk/internal/net/http
test/jdk/java/net/httpclient

@ -266,7 +266,7 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
// The Http1ResponseBodySubscriber is registered with the HttpClient
// to ensure that it gets completed if the SelectorManager aborts due
// to unexpected exceptions.
private void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
private boolean registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
Throwable failed = null;
synchronized (lock) {
failed = this.failed;
@ -276,13 +276,14 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
}
if (failed != null) {
subscriber.onError(failed);
return false;
} else {
client.registerSubscriber(subscriber);
return client.registerSubscriber(subscriber);
}
}
private void unregisterResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
client.unregisterSubscriber(subscriber);
private boolean unregisterResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
return client.unregisterSubscriber(subscriber);
}
@Override

@ -557,7 +557,14 @@ final class HttpClientImpl extends HttpClient implements Trackable {
client.subscribers.forEach(s -> s.onError(t));
}
public void registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
/**
* Adds the given subscriber to the subscribers list, or call
* its {@linkplain HttpBodySubscriberWrapper#onError onError}
* method if the client is shutting down.
* @param subscriber the subscriber
* @return true if the subscriber was added to the list.
*/
public boolean registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
if (!selmgr.isClosed()) {
synchronized (selmgr) {
if (!selmgr.isClosed()) {
@ -567,20 +574,28 @@ final class HttpClientImpl extends HttpClient implements Trackable {
debug.log("body subscriber registered: " + count);
}
}
return;
return true;
}
}
}
subscriber.onError(selmgr.selectorClosedException());
return false;
}
public void unregisterSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
/**
* Remove the given subscriber from the subscribers list.
* @param subscriber the subscriber
* @return true if the subscriber was found and removed from the list.
*/
public boolean unregisterSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
if (subscribers.remove(subscriber)) {
long count = pendingSubscribersCount.decrementAndGet();
if (debug.on()) {
debug.log("body subscriber unregistered: " + count);
}
return true;
}
return false;
}
private void closeConnection(HttpConnection conn) {

@ -545,7 +545,7 @@ public class ResponseSubscribers {
@Override
public void onSubscribe(Flow.Subscription s) {
Objects.requireNonNull(s);
if (debug.on()) debug.log("onSubscribed called");
if (debug.on()) debug.log("onSubscribe called");
try {
if (!subscribed.compareAndSet(false, true)) {
if (debug.on()) debug.log("Already subscribed: canceling");
@ -559,10 +559,12 @@ public class ResponseSubscribers {
closed = this.closed;
if (!closed) {
this.subscription = s;
// should contain at least 2
assert buffers.remainingCapacity() > 1
// should contain at least 2, unless closed or failed.
assert buffers.remainingCapacity() > 1 || failed != null
: "buffers capacity: " + buffers.remainingCapacity()
+ " closed: " + closed + " failed: " + failed;
+ ", closed: " + closed + ", terminated: "
+ buffers.contains(LAST_LIST)
+ ", failed: " + failed;
}
}
if (closed) {
@ -578,7 +580,7 @@ public class ResponseSubscribers {
} catch (Throwable t) {
failed = t;
if (debug.on())
debug.log("onSubscribed failed", t);
debug.log("onSubscribe failed", t);
try {
close();
} catch (IOException x) {

@ -351,12 +351,12 @@ class Stream<T> extends ExchangeImpl<T> {
// The Http2StreamResponseSubscriber is registered with the HttpClient
// to ensure that it gets completed if the SelectorManager aborts due
// to unexpected exceptions.
private void registerResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
client().registerSubscriber(subscriber);
private boolean registerResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
return client().registerSubscriber(subscriber);
}
private void unregisterResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
client().unregisterSubscriber(subscriber);
private boolean unregisterResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
return client().unregisterSubscriber(subscriber);
}
@Override

@ -274,21 +274,6 @@ public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
tryUnregister();
}
/**
* Called right before the userSubscriber::onSubscribe is called.
* @apiNote
* This method may be used by subclasses to perform cleanup
* related actions after a subscription has been successfully
* accepted.
* This method is called while holding a subscription
* lock.
* @implSpec
* This method calls {@link #tryRegister()}
*/
protected void onSubscribed() {
tryRegister();
}
/**
* Complete the subscriber, either normally or exceptionally
* ensure that the subscriber is completed only once.
@ -381,8 +366,8 @@ public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
// subscription is finished before calling onError;
subscriptionLock.lock();
try {
tryRegister();
if (markSubscribed()) {
onSubscribed();
SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription);
userSubscriber.onSubscribe(this.subscription = wrapped);
} else {

@ -23,7 +23,7 @@
/*
* @test
* @bug 8277969
* @bug 8277969 8299338
* @summary Test for edge case where the executor is not accepting
* new tasks while the client is still running
* @library /test/lib /test/jdk/java/net/httpclient/lib