8296889: Race condition when cancelling a request

Reviewed-by: jpai
This commit is contained in:
Daniel Fuchs 2022-11-15 18:36:45 +00:00
parent 87530e66dd
commit 7357a1a379
4 changed files with 25 additions and 6 deletions

View File

@ -208,6 +208,11 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
this.exchange = exchange; this.exchange = exchange;
} }
@Override
public void onSubscribed() {
exchange.registerResponseSubscriber(this);
}
@Override @Override
protected void complete(Throwable t) { protected void complete(Throwable t) {
try { try {
@ -459,7 +464,6 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
BodySubscriber<T> subscriber = handler.apply(response); BodySubscriber<T> subscriber = handler.apply(response);
Http1ResponseBodySubscriber<T> bs = Http1ResponseBodySubscriber<T> bs =
new Http1ResponseBodySubscriber<T>(subscriber, this); new Http1ResponseBodySubscriber<T>(subscriber, this);
registerResponseSubscriber(bs);
return bs; return bs;
} }

View File

@ -344,7 +344,6 @@ class Stream<T> extends ExchangeImpl<T> {
Http2StreamResponseSubscriber<T> createResponseSubscriber(BodyHandler<T> handler, ResponseInfo response) { Http2StreamResponseSubscriber<T> createResponseSubscriber(BodyHandler<T> handler, ResponseInfo response) {
Http2StreamResponseSubscriber<T> subscriber = Http2StreamResponseSubscriber<T> subscriber =
new Http2StreamResponseSubscriber<>(handler.apply(response)); new Http2StreamResponseSubscriber<>(handler.apply(response));
registerResponseSubscriber(subscriber);
return subscriber; return subscriber;
} }
@ -1543,17 +1542,22 @@ class Stream<T> extends ExchangeImpl<T> {
super(subscriber); super(subscriber);
} }
@Override
public void onSubscribed() {
registerResponseSubscriber(this);
}
@Override @Override
protected void complete(Throwable t) { protected void complete(Throwable t) {
try { try {
Stream.this.unregisterResponseSubscriber(this); unregisterResponseSubscriber(this);
} finally { } finally {
super.complete(t); super.complete(t);
} }
} }
@Override @Override
protected void onCancel() { protected void onCancel() {
Stream.this.unregisterResponseSubscriber(this); unregisterResponseSubscriber(this);
} }
} }

View File

@ -127,6 +127,15 @@ public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
*/ */
protected void onCancel() { } protected void onCancel() { }
/**
* Called right after the userSubscriber::onSubscribe is called.
* @apiNote
* This method may be used by subclasses to perform cleanup
* related actions after a subscription has been succesfully
* accepted.
*/
protected void onSubscribed() { }
/** /**
* Complete the subscriber, either normally or exceptionally * Complete the subscriber, either normally or exceptionally
* ensure that the subscriber is completed only once. * ensure that the subscriber is completed only once.
@ -169,8 +178,9 @@ public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
public void onSubscribe(Flow.Subscription subscription) { public void onSubscribe(Flow.Subscription subscription) {
// race condition with propagateError: we need to wait until // race condition with propagateError: we need to wait until
// subscription is finished before calling onError; // subscription is finished before calling onError;
boolean onSubscribed;
synchronized (this) { synchronized (this) {
if (subscribed.compareAndSet(false, true)) { if ((onSubscribed = subscribed.compareAndSet(false, true))) {
SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription); SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription);
userSubscriber.onSubscribe(this.subscription = wrapped); userSubscriber.onSubscribe(this.subscription = wrapped);
} else { } else {
@ -181,6 +191,7 @@ public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
assert completed.get(); assert completed.get();
} }
} }
if (onSubscribed) onSubscribed();
} }
@Override @Override

View File

@ -23,7 +23,7 @@
/* /*
* @test * @test
* @bug 8245462 8229822 8254786 * @bug 8245462 8229822 8254786 8296889
* @summary Tests cancelling the request. * @summary Tests cancelling the request.
* @library /test/lib http2/server * @library /test/lib http2/server
* @key randomness * @key randomness