8338569: HTTP/1.1 CleanupTrigger may be triggerred after the next exchange started

Reviewed-by: jpai
This commit is contained in:
Daniel Fuchs 2024-08-29 08:54:02 +00:00
parent 362f9ce077
commit 723588a4e7
6 changed files with 167 additions and 64 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -44,6 +44,7 @@ import java.util.stream.Collectors;
import jdk.internal.net.http.common.Deadline;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.TimeLine;
import jdk.internal.net.http.common.TimeSource;
@ -492,13 +493,13 @@ final class ConnectionPool {
// Remove a connection from the pool.
// should only be called while holding the ConnectionPool stateLock.
private void removeFromPool(HttpConnection c) {
private boolean removeFromPool(HttpConnection c) {
assert stateLock.isHeldByCurrentThread();
if (c instanceof PlainHttpConnection) {
removeFromPool(c, plainPool);
return removeFromPool(c, plainPool);
} else {
assert c.isSecure() : "connection " + c + " is not secure!";
removeFromPool(c, sslPool);
return removeFromPool(c, sslPool);
}
}
@ -524,18 +525,36 @@ final class ConnectionPool {
return false;
}
void cleanup(HttpConnection c, Throwable error) {
void cleanup(HttpConnection c, long pendingData, Throwable error) {
if (debug.on())
debug.log("%s : ConnectionPool.cleanup(%s)",
String.valueOf(c.getConnectionFlow()), error);
stateLock.lock();
boolean removed;
try {
removeFromPool(c);
removed = removeFromPool(c);
expiryList.remove(c);
} finally {
stateLock.unlock();
}
c.close();
if (!removed && pendingData != 0) {
// this should not happen; the cleanup may have consumed
// some data that wasn't supposed to be consumed, so
// the only thing we can do is log it and close the
// connection.
if (Log.errors()) {
Log.logError("WARNING: CleanupTrigger triggered for" +
" a connection not found in the pool: closing {0}", c.dbgString());
}
if (debug.on()) {
debug.log("WARNING: CleanupTrigger triggered for" +
" a connection not found in the pool: closing %s", c.dbgString());
}
Throwable cause = new IOException("Unexpected cleanup triggered for non pooled connection", error);
c.close(cause);
} else {
c.close();
}
}
/**
@ -549,6 +568,7 @@ final class ConnectionPool {
private final HttpConnection connection;
private volatile boolean done;
private volatile boolean dropped;
public CleanupTrigger(HttpConnection connection) {
this.connection = connection;
@ -556,9 +576,12 @@ final class ConnectionPool {
public boolean isDone() { return done;}
private void triggerCleanup(Throwable error) {
private void triggerCleanup(long pendingData, Throwable error) {
done = true;
cleanup(connection, error);
if (debug.on()) {
debug.log("Cleanup triggered for %s: pendingData:%s error:%s", this, pendingData, error);
}
cleanup(connection, pendingData, error);
}
@Override public void request(long n) {}
@ -566,15 +589,16 @@ final class ConnectionPool {
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (dropped || done) return;
subscription.request(1);
}
@Override
public void onError(Throwable error) { triggerCleanup(error); }
public void onError(Throwable error) { triggerCleanup(0, error); }
@Override
public void onComplete() { triggerCleanup(null); }
public void onComplete() { triggerCleanup(0, null); }
@Override
public void onNext(List<ByteBuffer> item) {
triggerCleanup(new IOException("Data received while in pool"));
triggerCleanup(Utils.remaining(item), new IOException("Data received while in pool"));
}
@Override
@ -586,5 +610,10 @@ final class ConnectionPool {
public String toString() {
return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
}
@Override
public void dropSubscription() {
dropped = true;
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -29,6 +29,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -557,7 +558,7 @@ final class SocketTube implements FlowTube {
implements Flow.Publisher<List<ByteBuffer>> {
private final InternalReadSubscription subscriptionImpl
= new InternalReadSubscription();
AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
ConcurrentLinkedQueue<ReadSubscription> pendingSubscriptions = new ConcurrentLinkedQueue<>();
private volatile ReadSubscription subscription;
@Override
@ -565,14 +566,14 @@ final class SocketTube implements FlowTube {
Objects.requireNonNull(s);
TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
ReadSubscription previous = pendingSubscription.getAndSet(target);
if (previous != null && previous != target) {
ReadSubscription previous;
while ((previous = pendingSubscriptions.poll()) != null) {
if (debug.on())
debug.log("read publisher: dropping pending subscriber: "
+ previous.subscriber);
previous.errorRef.compareAndSet(null, errorRef.get());
// make sure no data will be routed to the old subscriber.
previous.stopReading();
previous.signalOnSubscribe();
if (subscriptionImpl.completed) {
previous.signalCompletion();
@ -580,8 +581,10 @@ final class SocketTube implements FlowTube {
previous.subscriber.dropSubscription();
}
}
ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
pendingSubscriptions.offer(target);
if (debug.on()) debug.log("read publisher got subscriber");
if (debug.on()) debug.log("read publisher got new subscriber: " + s);
subscriptionImpl.signalSubscribe();
debugState("leaving read.subscribe: ");
}
@ -606,6 +609,7 @@ final class SocketTube implements FlowTube {
volatile boolean subscribed;
volatile boolean cancelled;
volatile boolean completed;
private volatile boolean stopped;
public ReadSubscription(InternalReadSubscription impl,
TubeSubscriber subscriber) {
@ -623,11 +627,12 @@ final class SocketTube implements FlowTube {
@Override
public void request(long n) {
if (!cancelled) {
if (!cancelled && !stopped) {
// should be safe to not synchronize here.
impl.request(n);
} else {
if (debug.on())
debug.log("subscription cancelled, ignoring request %d", n);
debug.log("subscription stopped or cancelled, ignoring request %d", n);
}
}
@ -661,6 +666,32 @@ final class SocketTube implements FlowTube {
signalCompletion();
}
}
/**
* Called when switching subscriber on the {@link InternalReadSubscription}.
* This subscriber is the old subscriber. Demand on the internal
* subscription will be reset and reading will be paused until the
* new subscriber is subscribed.
* This should ensure that no data is routed to this subscriber
* until the new subscriber is subscribed.
*/
synchronized void stopReading() {
stopped = true;
impl.demand.reset();
}
synchronized boolean tryDecrementDemand() {
if (stopped) return false;
return impl.demand.tryDecrement();
}
synchronized boolean isStopped() {
return stopped;
}
synchronized void increaseDemand(long n) {
if (!stopped) impl.demand.increase(n);
}
}
final class InternalReadSubscription implements Flow.Subscription {
@ -835,7 +866,7 @@ final class SocketTube implements FlowTube {
// If we reach here then we must be in the selector thread.
assert client.isSelectorThread();
if (demand.tryDecrement()) {
if (current.tryDecrementDemand()) {
// we have demand.
try {
List<ByteBuffer> bytes = readAvailable(current.bufferSource);
@ -881,8 +912,10 @@ final class SocketTube implements FlowTube {
// event. This ensures that this loop is
// executed again when the socket becomes
// readable again.
demand.increase(1);
resumeReadEvent();
if (!current.isStopped()) {
current.increaseDemand(1);
resumeReadEvent();
}
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no bytes");
return;
@ -922,30 +955,35 @@ final class SocketTube implements FlowTube {
}
boolean handlePending() {
ReadSubscription pending = pendingSubscription.getAndSet(null);
if (pending == null) return false;
if (debug.on())
debug.log("handling pending subscription for %s",
ReadSubscription pending;
boolean subscribed = false;
while ((pending = pendingSubscriptions.poll()) != null) {
subscribed = true;
if (debug.on())
debug.log("handling pending subscription for %s",
pending.subscriber);
ReadSubscription current = subscription;
if (current != null && current != pending && !completed) {
current.subscriber.dropSubscription();
}
if (debug.on()) debug.log("read demand reset to 0");
subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
pending.errorRef.compareAndSet(null, errorRef.get());
if (!readScheduler.isStopped()) {
subscription = pending;
} else {
if (debug.on()) debug.log("socket tube is already stopped");
}
if (debug.on()) debug.log("calling onSubscribe");
pending.signalOnSubscribe();
if (completed) {
ReadSubscription current = subscription;
if (current != null && current != pending && !completed) {
debug.log("dropping pending subscription for current %s",
current.subscriber);
current.subscriber.dropSubscription();
}
if (debug.on()) debug.log("read demand reset to 0");
subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
pending.errorRef.compareAndSet(null, errorRef.get());
pending.signalCompletion();
if (!readScheduler.isStopped()) {
subscription = pending;
} else {
if (debug.on()) debug.log("socket tube is already stopped");
}
if (debug.on()) debug.log("calling onSubscribe on " + pending.subscriber);
pending.signalOnSubscribe();
if (completed) {
pending.errorRef.compareAndSet(null, errorRef.get());
pending.signalCompletion();
}
}
return true;
return subscribed;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -180,6 +180,10 @@ public interface FlowTube extends
public void onComplete() {
delegate.onComplete();
}
@Override
public String toString() {
return "TubeSubscriberWrapper("+delegate.toString()+")";
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -64,7 +64,7 @@ import static java.lang.String.format;
* @test
* @summary this test verifies that a client may provides authorization
* headers directly when connecting with a server.
* @bug 8087112
* @bug 8087112 8336655 8338569
* @library /test/lib /test/jdk/java/net/httpclient/lib
* @build jdk.httpclient.test.lib.common.HttpServerAdapters jdk.test.lib.net.SimpleSSLContext
* DigestEchoServer ReferenceTracker DigestEchoClient

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -210,12 +210,16 @@ public class ShutdownNow implements HttpServerAdapters {
}
CompletableFuture.allOf(responses.toArray(new CompletableFuture<?>[0])).get();
} finally {
if (client.awaitTermination(Duration.ofMillis(2000))) {
if (client.awaitTermination(Duration.ofMillis(2500))) {
out.println("Client terminated within expected delay");
assertTrue(client.isTerminated());
} else {
throw new AssertionError("client still running");
client = null;
var error = TRACKER.check(500);
if (error != null) throw error;
throw new AssertionError("client was still running, but exited after further delay: "
+ "timeout should be adjusted");
}
assertTrue(client.isTerminated());
}
}
@ -272,12 +276,16 @@ public class ShutdownNow implements HttpServerAdapters {
}).thenCompose((c) -> c).get();
}
} finally {
if (client.awaitTermination(Duration.ofMillis(2000))) {
if (client.awaitTermination(Duration.ofMillis(2500))) {
out.println("Client terminated within expected delay");
assertTrue(client.isTerminated());
} else {
throw new AssertionError("client still running");
client = null;
var error = TRACKER.check(500);
if (error != null) throw error;
throw new AssertionError("client was still running, but exited after further delay: "
+ "timeout should be adjusted");
}
assertTrue(client.isTerminated());
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -23,7 +23,7 @@
/*
* @test
* @bug 8087112 8178699
* @bug 8087112 8178699 8338569
* @modules java.net.http
* java.logging
* jdk.httpserver
@ -54,6 +54,8 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.net.InetSocketAddress;
import java.net.PasswordAuthentication;
@ -572,10 +574,12 @@ public class SmokeTest {
System.out.print("test7: " + target);
Path requestBody = getTempFile(128 * 1024);
// First test
URI uri = new URI(target);
HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();
AtomicInteger count = new AtomicInteger();
for (int i=0; i<4; i++) {
URI uri = new URI(target+"?get-sync;count="+count.incrementAndGet());
System.out.println("Sending " + uri);
HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();
HttpResponse<String> r = client.send(request, BodyHandlers.ofString());
String body = r.body();
if (!body.equals("OK")) {
@ -584,12 +588,15 @@ public class SmokeTest {
}
// Second test: 4 x parallel
request = HttpRequest.newBuilder()
.uri(uri)
.POST(BodyPublishers.ofFile(requestBody))
.build();
List<CompletableFuture<String>> futures = new LinkedList<>();
for (int i=0; i<4; i++) {
URI uri = new URI(target+"?post-async;count="+count.incrementAndGet());
System.out.println("Sending " + uri);
HttpRequest request = HttpRequest.newBuilder()
.uri(uri)
.POST(BodyPublishers.ofFile(requestBody))
.build();
futures.add(client.sendAsync(request, BodyHandlers.ofString())
.thenApply((response) -> {
if (response.statusCode() == 200)
@ -610,11 +617,17 @@ public class SmokeTest {
}
// Third test: Multiple of 4 parallel requests
request = HttpRequest.newBuilder(uri).GET().build();
BlockingQueue<String> q = new LinkedBlockingQueue<>();
Set<String> inFlight = ConcurrentHashMap.newKeySet();
for (int i=0; i<4; i++) {
URI uri = new URI(target+"?get-async;count="+count.incrementAndGet());
inFlight.add(uri.getQuery());
System.out.println("Sending " + uri);
HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
client.sendAsync(request, BodyHandlers.ofString())
.thenApply((HttpResponse<String> resp) -> {
inFlight.remove(uri.getQuery());
System.out.println("Got response for: " + uri);
String body = resp.body();
putQ(q, body);
return body;
@ -630,8 +643,15 @@ public class SmokeTest {
if (!body.equals("OK")) {
throw new RuntimeException(body);
}
URI uri = new URI(target+"?get-async-next;count="+count.incrementAndGet());
inFlight.add(uri.getQuery());
System.out.println("Sending " + uri);
HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
client.sendAsync(request, BodyHandlers.ofString())
.thenApply((resp) -> {
inFlight.remove(uri.getQuery());
System.out.println("Got response for: " + uri);
System.out.println("In flight: " + inFlight);
if (resp.statusCode() == 200)
putQ(q, resp.body());
else
@ -639,9 +659,13 @@ public class SmokeTest {
return null;
});
}
System.out.println("Waiting: In flight: " + inFlight);
System.out.println("Queue size: " + q.size());
// should be four left
for (int i=0; i<4; i++) {
takeQ(q);
System.out.println("Waiting: In flight: " + inFlight);
System.out.println("Queue size: " + q.size());
}
System.out.println(" OK");
}