8297149: REDO JDK-8296889: Race condition when cancelling a request
8297075: java/net/httpclient/CancelStreamedBodyTest.java fails with "java.lang.AssertionError: WARNING: tracker for HttpClientImpl(1) has outstanding operations" Reviewed-by: jpai
This commit is contained in:
parent
4120db13d4
commit
134acab5a4
src/java.net.http/share/classes/jdk/internal/net/http
test/jdk/java/net/httpclient
@ -208,6 +208,11 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
|
||||
this.exchange = exchange;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onSubscribed() {
|
||||
exchange.registerResponseSubscriber(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void complete(Throwable t) {
|
||||
try {
|
||||
@ -459,7 +464,6 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
|
||||
BodySubscriber<T> subscriber = handler.apply(response);
|
||||
Http1ResponseBodySubscriber<T> bs =
|
||||
new Http1ResponseBodySubscriber<T>(subscriber, this);
|
||||
registerResponseSubscriber(bs);
|
||||
return bs;
|
||||
}
|
||||
|
||||
|
@ -344,7 +344,6 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
Http2StreamResponseSubscriber<T> createResponseSubscriber(BodyHandler<T> handler, ResponseInfo response) {
|
||||
Http2StreamResponseSubscriber<T> subscriber =
|
||||
new Http2StreamResponseSubscriber<>(handler.apply(response));
|
||||
registerResponseSubscriber(subscriber);
|
||||
return subscriber;
|
||||
}
|
||||
|
||||
@ -1543,17 +1542,22 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
super(subscriber);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onSubscribed() {
|
||||
registerResponseSubscriber(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void complete(Throwable t) {
|
||||
try {
|
||||
Stream.this.unregisterResponseSubscriber(this);
|
||||
unregisterResponseSubscriber(this);
|
||||
} finally {
|
||||
super.complete(t);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
protected void onCancel() {
|
||||
Stream.this.unregisterResponseSubscriber(this);
|
||||
unregisterResponseSubscriber(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,11 +30,13 @@ import java.nio.ByteBuffer;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.Flow;
|
||||
import java.util.concurrent.Flow.Subscription;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber;
|
||||
|
||||
@ -62,6 +64,7 @@ public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
|
||||
final BodySubscriber<T> userSubscriber;
|
||||
final AtomicBoolean completed = new AtomicBoolean();
|
||||
final AtomicBoolean subscribed = new AtomicBoolean();
|
||||
final ReentrantLock subscriptionLock = new ReentrantLock();
|
||||
volatile SubscriptionWrapper subscription;
|
||||
volatile Throwable withError;
|
||||
public HttpBodySubscriberWrapper(BodySubscriber<T> userSubscriber) {
|
||||
@ -100,16 +103,20 @@ public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
|
||||
// subscribed yet.
|
||||
private void propagateError(Throwable t) {
|
||||
assert t != null;
|
||||
assert completed.get();
|
||||
try {
|
||||
// if unsubscribed at this point, it will not
|
||||
// get subscribed later - so do it now and
|
||||
// propagate the error
|
||||
// Race condition with onSubscribe: we need to wait until
|
||||
// subscription is finished before calling onError;
|
||||
synchronized (this) {
|
||||
subscriptionLock.lock();
|
||||
try {
|
||||
if (subscribed.compareAndSet(false, true)) {
|
||||
userSubscriber.onSubscribe(NOP);
|
||||
}
|
||||
} finally {
|
||||
subscriptionLock.unlock();
|
||||
}
|
||||
} finally {
|
||||
// if onError throws then there is nothing to do
|
||||
@ -127,6 +134,15 @@ public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
|
||||
*/
|
||||
protected void onCancel() { }
|
||||
|
||||
/**
|
||||
* Called right before the userSubscriber::onSubscribe is called.
|
||||
* @apiNote
|
||||
* This method may be used by subclasses to perform cleanup
|
||||
* related actions after a subscription has been succesfully
|
||||
* accepted.
|
||||
*/
|
||||
protected void onSubscribed() { }
|
||||
|
||||
/**
|
||||
* Complete the subscriber, either normally or exceptionally
|
||||
* ensure that the subscriber is completed only once.
|
||||
@ -169,22 +185,23 @@ public class HttpBodySubscriberWrapper<T> implements TrustedSubscriber<T> {
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
// race condition with propagateError: we need to wait until
|
||||
// subscription is finished before calling onError;
|
||||
synchronized (this) {
|
||||
subscriptionLock.lock();
|
||||
try {
|
||||
if (subscribed.compareAndSet(false, true)) {
|
||||
onSubscribed();
|
||||
SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription);
|
||||
userSubscriber.onSubscribe(this.subscription = wrapped);
|
||||
} else {
|
||||
// could be already subscribed and completed
|
||||
// if an unexpected error occurred before the actual
|
||||
// subscription - though that's not supposed
|
||||
// happen.
|
||||
assert completed.get();
|
||||
subscription.cancel();
|
||||
}
|
||||
} finally {
|
||||
subscriptionLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(List<ByteBuffer> item) {
|
||||
assert subscribed.get();
|
||||
if (completed.get()) {
|
||||
SubscriptionWrapper subscription = this.subscription;
|
||||
if (subscription != null) {
|
||||
|
@ -23,7 +23,7 @@
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 8245462 8229822 8254786
|
||||
* @bug 8245462 8229822 8254786 8297075 8297149
|
||||
* @summary Tests cancelling the request.
|
||||
* @library /test/lib http2/server
|
||||
* @key randomness
|
||||
|
@ -23,7 +23,7 @@
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 8294916
|
||||
* @bug 8294916 8297075 8297149
|
||||
* @summary Tests that closing a streaming handler (ofInputStream()/ofLines())
|
||||
* without reading all the bytes unregisters the underlying subscriber.
|
||||
* @library /test/lib http2/server
|
||||
|
Loading…
x
Reference in New Issue
Block a user