diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/ConnectionPool.java b/src/java.net.http/share/classes/jdk/internal/net/http/ConnectionPool.java index 0ad7b9d5992..1d8cb013295 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/ConnectionPool.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/ConnectionPool.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -44,6 +44,7 @@ import java.util.stream.Collectors; import jdk.internal.net.http.common.Deadline; import jdk.internal.net.http.common.FlowTube; +import jdk.internal.net.http.common.Log; import jdk.internal.net.http.common.Logger; import jdk.internal.net.http.common.TimeLine; import jdk.internal.net.http.common.TimeSource; @@ -492,13 +493,13 @@ final class ConnectionPool { // Remove a connection from the pool. // should only be called while holding the ConnectionPool stateLock. - private void removeFromPool(HttpConnection c) { + private boolean removeFromPool(HttpConnection c) { assert stateLock.isHeldByCurrentThread(); if (c instanceof PlainHttpConnection) { - removeFromPool(c, plainPool); + return removeFromPool(c, plainPool); } else { assert c.isSecure() : "connection " + c + " is not secure!"; - removeFromPool(c, sslPool); + return removeFromPool(c, sslPool); } } @@ -524,18 +525,36 @@ final class ConnectionPool { return false; } - void cleanup(HttpConnection c, Throwable error) { + void cleanup(HttpConnection c, long pendingData, Throwable error) { if (debug.on()) debug.log("%s : ConnectionPool.cleanup(%s)", String.valueOf(c.getConnectionFlow()), error); stateLock.lock(); + boolean removed; try { - removeFromPool(c); + removed = removeFromPool(c); expiryList.remove(c); } finally { stateLock.unlock(); } - c.close(); + if (!removed && pendingData != 0) { + // this should not happen; the cleanup may have consumed + // some data that wasn't supposed to be consumed, so + // the only thing we can do is log it and close the + // connection. + if (Log.errors()) { + Log.logError("WARNING: CleanupTrigger triggered for" + + " a connection not found in the pool: closing {0}", c.dbgString()); + } + if (debug.on()) { + debug.log("WARNING: CleanupTrigger triggered for" + + " a connection not found in the pool: closing %s", c.dbgString()); + } + Throwable cause = new IOException("Unexpected cleanup triggered for non pooled connection", error); + c.close(cause); + } else { + c.close(); + } } /** @@ -549,6 +568,7 @@ final class ConnectionPool { private final HttpConnection connection; private volatile boolean done; + private volatile boolean dropped; public CleanupTrigger(HttpConnection connection) { this.connection = connection; @@ -556,9 +576,12 @@ final class ConnectionPool { public boolean isDone() { return done;} - private void triggerCleanup(Throwable error) { + private void triggerCleanup(long pendingData, Throwable error) { done = true; - cleanup(connection, error); + if (debug.on()) { + debug.log("Cleanup triggered for %s: pendingData:%s error:%s", this, pendingData, error); + } + cleanup(connection, pendingData, error); } @Override public void request(long n) {} @@ -566,15 +589,16 @@ final class ConnectionPool { @Override public void onSubscribe(Flow.Subscription subscription) { + if (dropped || done) return; subscription.request(1); } @Override - public void onError(Throwable error) { triggerCleanup(error); } + public void onError(Throwable error) { triggerCleanup(0, error); } @Override - public void onComplete() { triggerCleanup(null); } + public void onComplete() { triggerCleanup(0, null); } @Override public void onNext(List item) { - triggerCleanup(new IOException("Data received while in pool")); + triggerCleanup(Utils.remaining(item), new IOException("Data received while in pool")); } @Override @@ -586,5 +610,10 @@ final class ConnectionPool { public String toString() { return "CleanupTrigger(" + connection.getConnectionFlow() + ")"; } + + @Override + public void dropSubscription() { + dropped = true; + } } } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java index cbdf6633576..d0e30806d53 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -29,6 +29,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -557,7 +558,7 @@ final class SocketTube implements FlowTube { implements Flow.Publisher> { private final InternalReadSubscription subscriptionImpl = new InternalReadSubscription(); - AtomicReference pendingSubscription = new AtomicReference<>(); + ConcurrentLinkedQueue pendingSubscriptions = new ConcurrentLinkedQueue<>(); private volatile ReadSubscription subscription; @Override @@ -565,14 +566,14 @@ final class SocketTube implements FlowTube { Objects.requireNonNull(s); TubeSubscriber sub = FlowTube.asTubeSubscriber(s); - ReadSubscription target = new ReadSubscription(subscriptionImpl, sub); - ReadSubscription previous = pendingSubscription.getAndSet(target); - - if (previous != null && previous != target) { + ReadSubscription previous; + while ((previous = pendingSubscriptions.poll()) != null) { if (debug.on()) debug.log("read publisher: dropping pending subscriber: " + previous.subscriber); previous.errorRef.compareAndSet(null, errorRef.get()); + // make sure no data will be routed to the old subscriber. + previous.stopReading(); previous.signalOnSubscribe(); if (subscriptionImpl.completed) { previous.signalCompletion(); @@ -580,8 +581,10 @@ final class SocketTube implements FlowTube { previous.subscriber.dropSubscription(); } } + ReadSubscription target = new ReadSubscription(subscriptionImpl, sub); + pendingSubscriptions.offer(target); - if (debug.on()) debug.log("read publisher got subscriber"); + if (debug.on()) debug.log("read publisher got new subscriber: " + s); subscriptionImpl.signalSubscribe(); debugState("leaving read.subscribe: "); } @@ -606,6 +609,7 @@ final class SocketTube implements FlowTube { volatile boolean subscribed; volatile boolean cancelled; volatile boolean completed; + private volatile boolean stopped; public ReadSubscription(InternalReadSubscription impl, TubeSubscriber subscriber) { @@ -623,11 +627,12 @@ final class SocketTube implements FlowTube { @Override public void request(long n) { - if (!cancelled) { + if (!cancelled && !stopped) { + // should be safe to not synchronize here. impl.request(n); } else { if (debug.on()) - debug.log("subscription cancelled, ignoring request %d", n); + debug.log("subscription stopped or cancelled, ignoring request %d", n); } } @@ -661,6 +666,32 @@ final class SocketTube implements FlowTube { signalCompletion(); } } + + /** + * Called when switching subscriber on the {@link InternalReadSubscription}. + * This subscriber is the old subscriber. Demand on the internal + * subscription will be reset and reading will be paused until the + * new subscriber is subscribed. + * This should ensure that no data is routed to this subscriber + * until the new subscriber is subscribed. + */ + synchronized void stopReading() { + stopped = true; + impl.demand.reset(); + } + + synchronized boolean tryDecrementDemand() { + if (stopped) return false; + return impl.demand.tryDecrement(); + } + + synchronized boolean isStopped() { + return stopped; + } + + synchronized void increaseDemand(long n) { + if (!stopped) impl.demand.increase(n); + } } final class InternalReadSubscription implements Flow.Subscription { @@ -835,7 +866,7 @@ final class SocketTube implements FlowTube { // If we reach here then we must be in the selector thread. assert client.isSelectorThread(); - if (demand.tryDecrement()) { + if (current.tryDecrementDemand()) { // we have demand. try { List bytes = readAvailable(current.bufferSource); @@ -881,8 +912,10 @@ final class SocketTube implements FlowTube { // event. This ensures that this loop is // executed again when the socket becomes // readable again. - demand.increase(1); - resumeReadEvent(); + if (!current.isStopped()) { + current.increaseDemand(1); + resumeReadEvent(); + } if (errorRef.get() != null) continue; debugState("leaving read() loop with no bytes"); return; @@ -922,30 +955,35 @@ final class SocketTube implements FlowTube { } boolean handlePending() { - ReadSubscription pending = pendingSubscription.getAndSet(null); - if (pending == null) return false; - if (debug.on()) - debug.log("handling pending subscription for %s", + ReadSubscription pending; + boolean subscribed = false; + while ((pending = pendingSubscriptions.poll()) != null) { + subscribed = true; + if (debug.on()) + debug.log("handling pending subscription for %s", pending.subscriber); - ReadSubscription current = subscription; - if (current != null && current != pending && !completed) { - current.subscriber.dropSubscription(); - } - if (debug.on()) debug.log("read demand reset to 0"); - subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to. - pending.errorRef.compareAndSet(null, errorRef.get()); - if (!readScheduler.isStopped()) { - subscription = pending; - } else { - if (debug.on()) debug.log("socket tube is already stopped"); - } - if (debug.on()) debug.log("calling onSubscribe"); - pending.signalOnSubscribe(); - if (completed) { + ReadSubscription current = subscription; + if (current != null && current != pending && !completed) { + debug.log("dropping pending subscription for current %s", + current.subscriber); + current.subscriber.dropSubscription(); + } + if (debug.on()) debug.log("read demand reset to 0"); + subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to. pending.errorRef.compareAndSet(null, errorRef.get()); - pending.signalCompletion(); + if (!readScheduler.isStopped()) { + subscription = pending; + } else { + if (debug.on()) debug.log("socket tube is already stopped"); + } + if (debug.on()) debug.log("calling onSubscribe on " + pending.subscriber); + pending.signalOnSubscribe(); + if (completed) { + pending.errorRef.compareAndSet(null, errorRef.get()); + pending.signalCompletion(); + } } - return true; + return subscribed; } } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java index 023086e3dee..b87c4601763 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -180,6 +180,10 @@ public interface FlowTube extends public void onComplete() { delegate.onComplete(); } + @Override + public String toString() { + return "TubeSubscriberWrapper("+delegate.toString()+")"; + } } } diff --git a/test/jdk/java/net/httpclient/DigestEchoClient.java b/test/jdk/java/net/httpclient/DigestEchoClient.java index 3b6d1a1773f..7038d82d91f 100644 --- a/test/jdk/java/net/httpclient/DigestEchoClient.java +++ b/test/jdk/java/net/httpclient/DigestEchoClient.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -64,7 +64,7 @@ import static java.lang.String.format; * @test * @summary this test verifies that a client may provides authorization * headers directly when connecting with a server. - * @bug 8087112 + * @bug 8087112 8336655 8338569 * @library /test/lib /test/jdk/java/net/httpclient/lib * @build jdk.httpclient.test.lib.common.HttpServerAdapters jdk.test.lib.net.SimpleSSLContext * DigestEchoServer ReferenceTracker DigestEchoClient diff --git a/test/jdk/java/net/httpclient/ShutdownNow.java b/test/jdk/java/net/httpclient/ShutdownNow.java index de2279277c7..77504b5052a 100644 --- a/test/jdk/java/net/httpclient/ShutdownNow.java +++ b/test/jdk/java/net/httpclient/ShutdownNow.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2023, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -210,12 +210,16 @@ public class ShutdownNow implements HttpServerAdapters { } CompletableFuture.allOf(responses.toArray(new CompletableFuture[0])).get(); } finally { - if (client.awaitTermination(Duration.ofMillis(2000))) { + if (client.awaitTermination(Duration.ofMillis(2500))) { out.println("Client terminated within expected delay"); + assertTrue(client.isTerminated()); } else { - throw new AssertionError("client still running"); + client = null; + var error = TRACKER.check(500); + if (error != null) throw error; + throw new AssertionError("client was still running, but exited after further delay: " + + "timeout should be adjusted"); } - assertTrue(client.isTerminated()); } } @@ -272,12 +276,16 @@ public class ShutdownNow implements HttpServerAdapters { }).thenCompose((c) -> c).get(); } } finally { - if (client.awaitTermination(Duration.ofMillis(2000))) { + if (client.awaitTermination(Duration.ofMillis(2500))) { out.println("Client terminated within expected delay"); + assertTrue(client.isTerminated()); } else { - throw new AssertionError("client still running"); + client = null; + var error = TRACKER.check(500); + if (error != null) throw error; + throw new AssertionError("client was still running, but exited after further delay: " + + "timeout should be adjusted"); } - assertTrue(client.isTerminated()); } } diff --git a/test/jdk/java/net/httpclient/SmokeTest.java b/test/jdk/java/net/httpclient/SmokeTest.java index cff2cee4d80..56294e8f02f 100644 --- a/test/jdk/java/net/httpclient/SmokeTest.java +++ b/test/jdk/java/net/httpclient/SmokeTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -23,7 +23,7 @@ /* * @test - * @bug 8087112 8178699 + * @bug 8087112 8178699 8338569 * @modules java.net.http * java.logging * jdk.httpserver @@ -54,6 +54,8 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.net.InetSocketAddress; import java.net.PasswordAuthentication; @@ -572,10 +574,12 @@ public class SmokeTest { System.out.print("test7: " + target); Path requestBody = getTempFile(128 * 1024); // First test - URI uri = new URI(target); - HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build(); + AtomicInteger count = new AtomicInteger(); for (int i=0; i<4; i++) { + URI uri = new URI(target+"?get-sync;count="+count.incrementAndGet()); + System.out.println("Sending " + uri); + HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build(); HttpResponse r = client.send(request, BodyHandlers.ofString()); String body = r.body(); if (!body.equals("OK")) { @@ -584,12 +588,15 @@ public class SmokeTest { } // Second test: 4 x parallel - request = HttpRequest.newBuilder() - .uri(uri) - .POST(BodyPublishers.ofFile(requestBody)) - .build(); + List> futures = new LinkedList<>(); for (int i=0; i<4; i++) { + URI uri = new URI(target+"?post-async;count="+count.incrementAndGet()); + System.out.println("Sending " + uri); + HttpRequest request = HttpRequest.newBuilder() + .uri(uri) + .POST(BodyPublishers.ofFile(requestBody)) + .build(); futures.add(client.sendAsync(request, BodyHandlers.ofString()) .thenApply((response) -> { if (response.statusCode() == 200) @@ -610,11 +617,17 @@ public class SmokeTest { } // Third test: Multiple of 4 parallel requests - request = HttpRequest.newBuilder(uri).GET().build(); BlockingQueue q = new LinkedBlockingQueue<>(); + Set inFlight = ConcurrentHashMap.newKeySet(); for (int i=0; i<4; i++) { + URI uri = new URI(target+"?get-async;count="+count.incrementAndGet()); + inFlight.add(uri.getQuery()); + System.out.println("Sending " + uri); + HttpRequest request = HttpRequest.newBuilder(uri).GET().build(); client.sendAsync(request, BodyHandlers.ofString()) .thenApply((HttpResponse resp) -> { + inFlight.remove(uri.getQuery()); + System.out.println("Got response for: " + uri); String body = resp.body(); putQ(q, body); return body; @@ -630,8 +643,15 @@ public class SmokeTest { if (!body.equals("OK")) { throw new RuntimeException(body); } + URI uri = new URI(target+"?get-async-next;count="+count.incrementAndGet()); + inFlight.add(uri.getQuery()); + System.out.println("Sending " + uri); + HttpRequest request = HttpRequest.newBuilder(uri).GET().build(); client.sendAsync(request, BodyHandlers.ofString()) .thenApply((resp) -> { + inFlight.remove(uri.getQuery()); + System.out.println("Got response for: " + uri); + System.out.println("In flight: " + inFlight); if (resp.statusCode() == 200) putQ(q, resp.body()); else @@ -639,9 +659,13 @@ public class SmokeTest { return null; }); } + System.out.println("Waiting: In flight: " + inFlight); + System.out.println("Queue size: " + q.size()); // should be four left for (int i=0; i<4; i++) { takeQ(q); + System.out.println("Waiting: In flight: " + inFlight); + System.out.println("Queue size: " + q.size()); } System.out.println(" OK"); }