diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index bbb63718dc6..f3eedf8fe6e 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -160,6 +160,10 @@ class Stream extends ExchangeImpl { // send lock: prevent sending DataFrames after reset occurred. private final Lock sendLock = new ReentrantLock(); private final Lock stateLock = new ReentrantLock(); + // inputQ lock: methods that take from the inputQ + // must not run concurrently. + private final Lock inputQLock = new ReentrantLock(); + /** * A reference to this Stream's connection Send Window controller. The * stream MUST acquire the appropriate amount of Send Window before @@ -183,6 +187,8 @@ class Stream extends ExchangeImpl { private void schedule() { boolean onCompleteCalled = false; HttpResponse.BodySubscriber subscriber = responseSubscriber; + // prevents drainInputQueue() from running concurrently + inputQLock.lock(); try { if (subscriber == null) { // pendingResponseSubscriber will be null until response headers have been received and @@ -199,7 +205,7 @@ class Stream extends ExchangeImpl { Http2Frame frame = inputQ.peek(); if (frame instanceof ResetFrame rf) { inputQ.remove(); - if (endStreamReceived() && rf.getErrorCode() == ResetFrame.NO_ERROR) { + if (endStreamReceived() && rf.getErrorCode() == ResetFrame.NO_ERROR) { // If END_STREAM is already received, complete the requestBodyCF successfully // and stop sending any request data. requestBodyCF.complete(null); @@ -208,7 +214,7 @@ class Stream extends ExchangeImpl { } return; } - DataFrame df = (DataFrame)frame; + DataFrame df = (DataFrame) frame; boolean finished = df.getFlag(DataFrame.END_STREAM); List buffers = df.getData(); @@ -256,6 +262,7 @@ class Stream extends ExchangeImpl { } catch (Throwable throwable) { errorRef.compareAndSet(null, throwable); } finally { + inputQLock.unlock(); if (sched.isStopped()) drainInputQueue(); } @@ -274,26 +281,36 @@ class Stream extends ExchangeImpl { } catch (Throwable x) { Log.logError("Subscriber::onError threw exception: {0}", t); } finally { + // cancelImpl will eventually call drainInputQueue(); cancelImpl(t); - drainInputQueue(); } } } - // must only be called from the scheduler schedule() loop. - // ensure that all received data frames are accounted for + // Called from the scheduler schedule() loop, + // or after resetting the stream. + // Ensures that all received data frames are accounted for // in the connection window flow control if the scheduler // is stopped before all the data is consumed. + // The inputQLock is used to prevent concurrently taking + // from the queue. private void drainInputQueue() { Http2Frame frame; - while ((frame = inputQ.poll()) != null) { - if (frame instanceof DataFrame df) { - // Data frames that have been added to the inputQ - // must be released using releaseUnconsumed() to - // account for the amount of unprocessed bytes - // tracked by the connection.windowUpdater. - connection.releaseUnconsumed(df); + // will wait until schedule() has finished taking + // from the queue, if needed. + inputQLock.lock(); + try { + while ((frame = inputQ.poll()) != null) { + if (frame instanceof DataFrame df) { + // Data frames that have been added to the inputQ + // must be released using releaseUnconsumed() to + // account for the amount of unprocessed bytes + // tracked by the connection.windowUpdater. + connection.releaseUnconsumed(df); + } } + } finally { + inputQLock.unlock(); } } @@ -405,12 +422,38 @@ class Stream extends ExchangeImpl { return; } } - inputQ.add(df); + pushDataFrame(len, df); } finally { sched.runOrSchedule(); } } + // Ensures that no data frame is pushed on the inputQ + // after the stream is closed. + // Changes to the `closed` boolean are guarded by the + // stateLock. Contention should be low as only one + // thread at a time adds to the inputQ, and + // we can only contend when closing the stream. + // Note that this method can run concurrently with + // methods holding the inputQLock: that is OK. + // The inputQLock is there to ensure that methods + // taking from the queue are not running concurrently + // with each others, but concurrently adding at the + // end of the queue while peeking/polling at the head + // is OK. + private void pushDataFrame(int len, DataFrame df) { + boolean closed = false; + stateLock.lock(); + try { + if (!(closed = this.closed)) { + inputQ.add(df); + } + } finally { + stateLock.unlock(); + } + if (closed && len > 0) connection.releaseUnconsumed(df); + } + /** Handles a RESET frame. RESET is always handled inline in the queue. */ private void receiveResetFrame(ResetFrame frame) { inputQ.add(frame); @@ -1547,6 +1590,8 @@ class Stream extends ExchangeImpl { } } catch (Throwable ex) { Log.logError(ex); + } finally { + drainInputQueue(); } } @@ -1770,7 +1815,7 @@ class Stream extends ExchangeImpl { @Override protected boolean windowSizeExceeded(long received) { onProtocolError(new ProtocolException("stream %s flow control window exceeded" - .formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR); + .formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR); return true; } } diff --git a/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java b/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java index 6b0b3727ee2..30cc9122d9d 100644 --- a/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java +++ b/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java @@ -171,7 +171,11 @@ public class ConnectionFlowControlTest { var response = responses.get(keys[i]); String ckey = response.headers().firstValue("X-Connection-Key").get(); if (label == null) label = ckey; - assertEquals(ckey, label, "Unexpected key for " + query); + if (i < max - 1) { + // the connection window might be exceeded at i == max - 2, which + // means that the last request could go on a new connection. + assertEquals(ckey, label, "Unexpected key for " + query); + } int wait = uri.startsWith("https://") ? 500 : 250; try (InputStream is = response.body()) { Thread.sleep(Utils.adjustTimeout(wait)); diff --git a/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java b/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java index 36b727e3a22..39768b68dc5 100644 --- a/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java +++ b/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java @@ -23,7 +23,7 @@ /* * @test - * @bug 8342075 + * @bug 8342075 8343855 * @library /test/lib /test/jdk/java/net/httpclient/lib * @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext * @run testng/othervm -Djdk.internal.httpclient.debug=true @@ -40,7 +40,6 @@ import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpHeaders; import java.net.http.HttpRequest; -import java.net.http.HttpRequest.BodyPublishers; import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; import java.nio.charset.StandardCharsets; @@ -53,6 +52,7 @@ import java.util.function.Consumer; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; +import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpHeadOrGetHandler; import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer; import jdk.httpclient.test.lib.http2.BodyOutputStream; import jdk.httpclient.test.lib.http2.Http2Handler; @@ -69,6 +69,7 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @@ -92,6 +93,19 @@ public class StreamFlowControlTest { }; } + static void sleep(long wait) throws InterruptedException { + if (wait <= 0) return; + long remaining = Utils.adjustTimeout(wait); + long start = System.nanoTime(); + while (remaining > 0) { + Thread.sleep(remaining); + long end = System.nanoTime(); + remaining = remaining - NANOSECONDS.toMillis(end - start); + } + System.out.printf("Waited %s ms%n", + NANOSECONDS.toMillis(System.nanoTime() - start)); + } + @Test(dataProvider = "variants") void test(String uri, @@ -115,7 +129,7 @@ public class StreamFlowControlTest { CompletableFuture sent = new CompletableFuture<>(); responseSent.put(query, sent); HttpRequest request = HttpRequest.newBuilder(uriWithQuery) - .POST(BodyPublishers.ofString("Hello there!")) + .GET() .build(); System.out.println("\nSending request:" + uriWithQuery); final HttpClient cc = client; @@ -130,9 +144,9 @@ public class StreamFlowControlTest { // we have to pull to get the exception, but slow enough // so that DataFrames are buffered up to the point that // the window is exceeded... - int wait = uri.startsWith("https://") ? 500 : 350; + long wait = uri.startsWith("https://") ? 800 : 350; try (InputStream is = response.body()) { - Thread.sleep(Utils.adjustTimeout(wait)); + sleep(wait); is.readAllBytes(); } // we could fail here if we haven't waited long enough @@ -174,7 +188,7 @@ public class StreamFlowControlTest { CompletableFuture sent = new CompletableFuture<>(); responseSent.put(query, sent); HttpRequest request = HttpRequest.newBuilder(uriWithQuery) - .POST(BodyPublishers.ofString("Hello there!")) + .GET() .build(); System.out.println("\nSending request:" + uriWithQuery); final HttpClient cc = client; @@ -188,9 +202,9 @@ public class StreamFlowControlTest { assertEquals(key, label, "Unexpected key for " + query); } sent.join(); - int wait = uri.startsWith("https://") ? 600 : 300; + long wait = uri.startsWith("https://") ? 800 : 350; try (InputStream is = response.body()) { - Thread.sleep(Utils.adjustTimeout(wait)); + sleep(wait); is.readAllBytes(); } // we could fail here if we haven't waited long enough @@ -252,7 +266,9 @@ public class StreamFlowControlTest { var https2TestServer = new Http2TestServer("localhost", true, sslContext); https2TestServer.addHandler(new Http2TestHandler(), "/https2/"); this.https2TestServer = HttpTestServer.of(https2TestServer); + this.https2TestServer.addHandler(new HttpHeadOrGetHandler(), "/https2/head/"); https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x"; + String h2Head = "https://" + this.https2TestServer.serverAuthority() + "/https2/head/z"; // Override the default exchange supplier with a custom one to enable // particular test scenarios @@ -261,6 +277,13 @@ public class StreamFlowControlTest { this.http2TestServer.start(); this.https2TestServer.start(); + + // warmup to eliminate delay due to SSL class loading and initialization. + try (var client = HttpClient.newBuilder().sslContext(sslContext).build()) { + var request = HttpRequest.newBuilder(URI.create(h2Head)).HEAD().build(); + var resp = client.send(request, BodyHandlers.discarding()); + assertEquals(resp.statusCode(), 200); + } } @AfterTest @@ -279,11 +302,19 @@ public class StreamFlowControlTest { OutputStream os = t.getResponseBody()) { byte[] bytes = is.readAllBytes(); - System.out.println("Server " + t.getLocalAddress() + " received:\n" - + t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8)); + if (bytes.length != 0) { + System.out.println("Server " + t.getLocalAddress() + " received:\n" + + t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8)); + } else { + System.out.println("No request body for " + t.getRequestMethod()); + } + t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey()); - if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8); + if (bytes.length == 0) { + bytes = "no request body!" + .repeat(100).getBytes(StandardCharsets.UTF_8); + } int window = Integer.getInteger("jdk.httpclient.windowsize", 2 * 16 * 1024); final int maxChunkSize; if (t instanceof FCHttp2TestExchange fct) { @@ -307,13 +338,22 @@ public class StreamFlowControlTest { // ignore and continue... } } - ((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length); + try { + ((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length); + } catch (IOException x) { + if (t instanceof FCHttp2TestExchange fct) { + fct.conn.updateConnectionWindow(resp.length); + } + } + } + } finally { + if (t instanceof FCHttp2TestExchange fct) { + fct.responseSent(query); + } else { + fail("Exchange is not %s but %s" + .formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName())); } } - if (t instanceof FCHttp2TestExchange fct) { - fct.responseSent(query); - } else fail("Exchange is not %s but %s" - .formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName())); } } diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/common/HttpServerAdapters.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/common/HttpServerAdapters.java index 27dbe637b94..4b00c0eea7b 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/common/HttpServerAdapters.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/common/HttpServerAdapters.java @@ -49,6 +49,7 @@ import java.math.BigInteger; import java.net.InetSocketAddress; import java.net.URI; import java.net.http.HttpHeaders; +import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.List; import java.util.ListIterator; @@ -66,6 +67,9 @@ import java.util.stream.Stream; import javax.net.ssl.SSLContext; +import static java.net.http.HttpClient.Version.HTTP_1_1; +import static java.net.http.HttpClient.Version.HTTP_2; + /** * Defines an adaptation layers so that a test server handlers and filters * can be implemented independently of the underlying server version. @@ -268,9 +272,9 @@ public interface HttpServerAdapters { this.exchange = exch; } @Override - public Version getServerVersion() { return Version.HTTP_1_1; } + public Version getServerVersion() { return HTTP_1_1; } @Override - public Version getExchangeVersion() { return Version.HTTP_1_1; } + public Version getExchangeVersion() { return HTTP_1_1; } @Override public InputStream getRequestBody() { return exchange.getRequestBody(); @@ -330,9 +334,9 @@ public interface HttpServerAdapters { this.exchange = exch; } @Override - public Version getServerVersion() { return Version.HTTP_2; } + public Version getServerVersion() { return HTTP_2; } @Override - public Version getExchangeVersion() { return Version.HTTP_2; } + public Version getExchangeVersion() { return HTTP_2; } @Override public InputStream getRequestBody() { return exchange.getRequestBody(); @@ -421,6 +425,53 @@ public interface HttpServerAdapters { } } + /** + * An {@link HttpTestHandler} that handles only HEAD and GET + * requests. If another method is used 405 is returned with + * an empty body. + * The response is always returned with fixed length. + */ + public static class HttpHeadOrGetHandler implements HttpTestHandler { + final String responseBody; + public HttpHeadOrGetHandler() { + this("pâté de tête persillé"); + } + public HttpHeadOrGetHandler(String responseBody) { + this.responseBody = Objects.requireNonNull(responseBody); + } + + @Override + public void handle(HttpTestExchange t) throws IOException { + try (var exchg = t) { + exchg.getRequestBody().readAllBytes(); + String method = exchg.getRequestMethod(); + switch (method) { + case "HEAD" -> { + byte[] resp = responseBody.getBytes(StandardCharsets.UTF_8); + if (exchg.getExchangeVersion() != HTTP_1_1) { + // with HTTP/2 or HTTP/3 the server will not send content-length + exchg.getResponseHeaders() + .addHeader("Content-Length", String.valueOf(resp.length)); + } + exchg.sendResponseHeaders(200, resp.length); + exchg.getResponseBody().close(); + } + case "GET" -> { + byte[] resp = responseBody.getBytes(StandardCharsets.UTF_8); + exchg.sendResponseHeaders(200, resp.length); + try (var os = exchg.getResponseBody()) { + os.write(resp); + } + } + default -> { + exchg.sendResponseHeaders(405, 0); + exchg.getResponseBody().close(); + } + } + } + } + } + public static class HttpTestEchoHandler implements HttpTestHandler { @Override @@ -877,7 +928,7 @@ public interface HttpServerAdapters { return new InetSocketAddress(InetAddress.getLoopbackAddress(), impl.getAddress().getPort()); } - public Version getVersion() { return Version.HTTP_1_1; } + public Version getVersion() { return HTTP_1_1; } @Override public void setRequestApprover(final Predicate approver) { @@ -902,7 +953,7 @@ public interface HttpServerAdapters { public void setAuthenticator(com.sun.net.httpserver.Authenticator authenticator) { context.setAuthenticator(authenticator); } - @Override public Version getVersion() { return Version.HTTP_1_1; } + @Override public Version getVersion() { return HTTP_1_1; } } private static class Http2TestServerImpl extends HttpTestServer { @@ -933,7 +984,7 @@ public interface HttpServerAdapters { return new InetSocketAddress(InetAddress.getLoopbackAddress(), impl.getAddress().getPort()); } - public Version getVersion() { return Version.HTTP_2; } + public Version getVersion() { return HTTP_2; } @Override public void setRequestApprover(final Predicate approver) { @@ -971,7 +1022,7 @@ public interface HttpServerAdapters { "only BasicAuthenticator is supported on HTTP/2 context"); } } - @Override public Version getVersion() { return Version.HTTP_2; } + @Override public Version getVersion() { return HTTP_2; } } } diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java index 068d2a49e57..fb0ce92cd1d 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java @@ -1384,7 +1384,7 @@ public class Http2TestServerConnection { } } - void updateConnectionWindow(int amount) { + public void updateConnectionWindow(int amount) { System.out.printf("sendWindow (window=%s, amount=%s) is now: %s%n", sendWindow, amount, sendWindow + amount); synchronized (this) {