diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java index 022442b5371..92a48d901ff 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java @@ -40,6 +40,8 @@ import jdk.internal.net.http.common.Logger; import jdk.internal.net.http.common.MinimalFuture; import jdk.internal.net.http.common.Utils; import jdk.internal.net.http.frame.SettingsFrame; + +import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_CONNECTION_WINDOW_SIZE; import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_WINDOW_SIZE; import static jdk.internal.net.http.frame.SettingsFrame.ENABLE_PUSH; import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE; @@ -291,9 +293,13 @@ class Http2ClientImpl { // and the connection window size. int defaultValue = Math.max(streamWindow, K*K*32); + // The min value is the max between the streamWindow and + // the initial connection window size + int minValue = Math.max(INITIAL_CONNECTION_WINDOW_SIZE, streamWindow); + return getParameter( "jdk.httpclient.connectionWindowSize", - streamWindow, Integer.MAX_VALUE, defaultValue); + minValue, Integer.MAX_VALUE, defaultValue); } /** diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java index 080905222c3..2e6b4204984 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java @@ -92,6 +92,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static jdk.internal.net.http.frame.SettingsFrame.DEFAULT_INITIAL_WINDOW_SIZE; import static jdk.internal.net.http.frame.SettingsFrame.ENABLE_PUSH; import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE; +import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_CONNECTION_WINDOW_SIZE; import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_WINDOW_SIZE; import static jdk.internal.net.http.frame.SettingsFrame.MAX_CONCURRENT_STREAMS; import static jdk.internal.net.http.frame.SettingsFrame.MAX_FRAME_SIZE; @@ -1080,6 +1081,34 @@ class Http2Connection { return null; } + // This method is called when a DataFrame that was added + // to a Stream::inputQ is later dropped from the queue + // without being consumed. + // + // Before adding a frame to the queue, the Stream calls + // connection.windowUpdater.canBufferUnprocessedBytes(), which + // increases the count of unprocessed bytes in the connection. + // After consuming the frame, it calls connection.windowUpdater::processed, + // which decrements the count of unprocessed bytes, and possibly + // sends a window update to the peer. + // + // This method is called when connection.windowUpdater::processed + // will not be called, which can happen when consuming the frame + // fails, or when an empty DataFrame terminates the stream, + // or when the stream is cancelled while data is still + // sitting in its inputQ. In the later case, it is called for + // each frame that is dropped from the queue. + final void releaseUnconsumed(DataFrame df) { + windowUpdater.released(df.payloadLength()); + dropDataFrame(df); + } + + // This method can be called directly when a DataFrame is dropped + // before/without having been added to any Stream::inputQ. + // In that case, the number of unprocessed bytes hasn't been incremented + // by the stream, and does not need to be decremented. + // Otherwise, if the frame is dropped after having been added to the + // inputQ, releaseUnconsumed above should be called. final void dropDataFrame(DataFrame df) { if (isMarked(closedState, SHUTDOWN_REQUESTED)) return; if (debug.on()) { @@ -1465,11 +1494,12 @@ class Http2Connection { // Note that the default initial window size, not to be confused // with the initial window size, is defined by RFC 7540 as // 64K -1. - final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE; - if (len != 0) { + final int len = windowUpdater.initialWindowSize - INITIAL_CONNECTION_WINDOW_SIZE; + assert len >= 0; + if (len > 0) { if (Log.channel()) { Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})", - len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE); + len, windowUpdater.initialWindowSize, INITIAL_CONNECTION_WINDOW_SIZE); } windowUpdater.sendWindowUpdate(len); } @@ -1924,6 +1954,19 @@ class Http2Connection { int getStreamId() { return 0; } + + @Override + protected boolean windowSizeExceeded(long received) { + if (connection.isOpen()) { + try { + connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR, + "connection window exceeded"); + } catch (IOException io) { + connection.shutdown(io); + } + } + return true; + } } /** diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index dc41c3ab515..bbb63718dc6 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -160,14 +160,13 @@ class Stream extends ExchangeImpl { // send lock: prevent sending DataFrames after reset occurred. private final Lock sendLock = new ReentrantLock(); private final Lock stateLock = new ReentrantLock(); - /** * A reference to this Stream's connection Send Window controller. The * stream MUST acquire the appropriate amount of Send Window before * sending any data. Will be null for PushStreams, as they cannot send data. */ private final WindowController windowController; - private final WindowUpdateSender windowUpdater; + private final WindowUpdateSender streamWindowUpdater; // Only accessed in all method calls from incoming(), no need for volatile private boolean endStreamSeen; @@ -217,7 +216,8 @@ class Stream extends ExchangeImpl { int size = Utils.remaining(dsts, Integer.MAX_VALUE); if (size == 0 && finished) { inputQ.remove(); - connection.ensureWindowUpdated(df); // must update connection window + // consumed will not be called + connection.releaseUnconsumed(df); // must update connection window Log.logTrace("responseSubscriber.onComplete"); if (debug.on()) debug.log("incoming: onComplete"); connection.decrementStreamsCount(streamid); @@ -232,7 +232,11 @@ class Stream extends ExchangeImpl { try { subscriber.onNext(dsts); } catch (Throwable t) { - connection.dropDataFrame(df); // must update connection window + // Data frames that have been added to the inputQ + // must be released using releaseUnconsumed() to + // account for the amount of unprocessed bytes + // tracked by the connection.windowUpdater. + connection.releaseUnconsumed(df); throw t; } if (consumed(df)) { @@ -283,8 +287,12 @@ class Stream extends ExchangeImpl { private void drainInputQueue() { Http2Frame frame; while ((frame = inputQ.poll()) != null) { - if (frame instanceof DataFrame) { - connection.dropDataFrame((DataFrame)frame); + if (frame instanceof DataFrame df) { + // Data frames that have been added to the inputQ + // must be released using releaseUnconsumed() to + // account for the amount of unprocessed bytes + // tracked by the connection.windowUpdater. + connection.releaseUnconsumed(df); } } } @@ -310,12 +318,13 @@ class Stream extends ExchangeImpl { boolean endStream = df.getFlag(DataFrame.END_STREAM); if (len == 0) return endStream; - connection.windowUpdater.update(len); - + connection.windowUpdater.processed(len); if (!endStream) { + streamWindowUpdater.processed(len); + } else { // Don't send window update on a stream which is // closed or half closed. - windowUpdater.update(len); + streamWindowUpdater.released(len); } // true: end of stream; false: more data coming @@ -385,8 +394,21 @@ class Stream extends ExchangeImpl { } private void receiveDataFrame(DataFrame df) { - inputQ.add(df); - sched.runOrSchedule(); + try { + int len = df.payloadLength(); + if (len > 0) { + // we return from here if the connection is being closed. + if (!connection.windowUpdater.canBufferUnprocessedBytes(len)) return; + // we return from here if the stream is being closed. + if (closed || !streamWindowUpdater.canBufferUnprocessedBytes(len)) { + connection.releaseUnconsumed(df); + return; + } + } + inputQ.add(df); + } finally { + sched.runOrSchedule(); + } } /** Handles a RESET frame. RESET is always handled inline in the queue. */ @@ -470,7 +492,7 @@ class Stream extends ExchangeImpl { this.responseHeadersBuilder = new HttpHeadersBuilder(); this.rspHeadersConsumer = new HeadersConsumer(); this.requestPseudoHeaders = createPseudoHeaders(request); - this.windowUpdater = new StreamWindowUpdateSender(connection); + this.streamWindowUpdater = new StreamWindowUpdateSender(connection); } private boolean checkRequestCancelled() { @@ -506,6 +528,8 @@ class Stream extends ExchangeImpl { if (debug.on()) { debug.log("request cancelled or stream closed: dropping data frame"); } + // Data frames that have not been added to the inputQ + // can be released using dropDataFrame connection.dropDataFrame(df); } else { receiveDataFrame(df); @@ -1427,12 +1451,18 @@ class Stream extends ExchangeImpl { @Override void onProtocolError(final IOException cause) { + onProtocolError(cause, ResetFrame.PROTOCOL_ERROR); + } + + void onProtocolError(final IOException cause, int code) { if (debug.on()) { - debug.log("cancelling exchange on stream %d due to protocol error: %s", streamid, cause.getMessage()); + debug.log("cancelling exchange on stream %d due to protocol error [%s]: %s", + streamid, ErrorFrame.stringForCode(code), + cause.getMessage()); } Log.logError("cancelling exchange on stream {0} due to protocol error: {1}\n", streamid, cause); // send a RESET frame and close the stream - cancelImpl(cause, ResetFrame.PROTOCOL_ERROR); + cancelImpl(cause, code); } void connectionClosing(Throwable cause) { @@ -1736,6 +1766,13 @@ class Stream extends ExchangeImpl { return dbgString = dbg; } } + + @Override + protected boolean windowSizeExceeded(long received) { + onProtocolError(new ProtocolException("stream %s flow control window exceeded" + .formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR); + return true; + } } /** diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java index 0bccc24e498..0affadddf15 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -31,16 +31,31 @@ import jdk.internal.net.http.frame.SettingsFrame; import jdk.internal.net.http.frame.WindowUpdateFrame; import jdk.internal.net.http.common.Utils; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +/** + * A class that tracks the amount of flow controlled + * data received on an HTTP/2 connection + */ abstract class WindowUpdateSender { final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); + // The threshold at which window updates are sent in bytes final int limit; + // The flow control window in bytes + final int windowSize; final Http2Connection connection; - final AtomicInteger received = new AtomicInteger(); + // The amount of flow controlled data received and processed, in bytes, + // since the start of the window. + // The window is exhausted when received + unprocessed >= windowSize + final AtomicLong received = new AtomicLong(); + // The amount of flow controlled data received and unprocessed, in bytes, + // since the start of the window. + // The window is exhausted when received + unprocessed >= windowSize + final AtomicLong unprocessed = new AtomicLong(); final ReentrantLock sendLock = new ReentrantLock(); WindowUpdateSender(Http2Connection connection) { @@ -53,6 +68,7 @@ abstract class WindowUpdateSender { WindowUpdateSender(Http2Connection connection, int maxFrameSize, int initWindowSize) { this.connection = connection; + this.windowSize = initWindowSize; int v0 = Math.max(0, initWindowSize - maxFrameSize); int v1 = (initWindowSize + (maxFrameSize - 1)) / maxFrameSize; v1 = v1 * maxFrameSize / 2; @@ -66,16 +82,119 @@ abstract class WindowUpdateSender { maxFrameSize, initWindowSize, limit); } + // O for the connection window, > 0 for a stream window abstract int getStreamId(); + + /** + * {@return {@code true} if buffering the given amount of + * flow controlled data would not exceed the flow control + * window} + *

+ * This method is called before buffering and processing + * a DataFrame. The count of unprocessed bytes is incremented + * by the given amount, and checked against the number of + * available bytes in the flow control window. + *

+ * This method returns {@code true} if the bytes can be buffered + * without exceeding the flow control window, {@code false} + * if the flow control window is exceeded and corrective + * action (close/reset) has been taken. + *

+ * When this method returns true, either {@link #processed(int)} + * or {@link #released(int)} must eventually be called to release + * the bytes from the flow control window. + * + * @implSpec + * an HTTP/2 endpoint may disable its own flow control + * (see + * RFC 9113, section 5.2.1), in which case this + * method may return true even if the flow control window would + * be exceeded: that is, the flow control window is exceeded but + * the endpoint decided to take no corrective action. + * + * @param len a number of unprocessed bytes, which + * the caller wants to buffer. + */ + boolean canBufferUnprocessedBytes(int len) { + return !checkWindowSizeExceeded(unprocessed.addAndGet(len)); + } + + // adds the provided amount to the amount of already + // received and processed bytes and checks whether the + // flow control window is exceeded. If so, take + // corrective actions and return true. + private boolean checkWindowSizeExceeded(long len) { + // because windowSize is bound by Integer.MAX_VALUE + // we will never reach the point where received.get() + len + // could overflow + long rcv = received.get() + len; + return rcv > windowSize && windowSizeExceeded(rcv); + } + + /** + * Called after unprocessed buffered bytes have been + * processed, to release part of the flow control window + * + * @apiNote this method is called only when releasing bytes + * that where buffered after calling + * {@link #canBufferUnprocessedBytes(int)}. + * + * @param delta the amount of processed bytes to release + */ + void processed(int delta) { + long rest = unprocessed.addAndGet(-delta); + assert rest >= 0; + update(delta); + } + + /** + * Called when it is desired to release unprocessed bytes + * without processing them, or without triggering the + * sending of a window update. This method can be called + * instead of calling {@link #processed(int)}. + * When this method is called instead of calling {@link #processed(int)}, + * it should generally be followed by a call to {@link #update(int)}, + * unless the stream or connection is being closed. + * + * @apiNote this method should only be called to release bytes that + * have been buffered after calling {@link + * #canBufferUnprocessedBytes(int)}. + * + * @param delta the amount of bytes to release from the window + * + * @return the amount of remaining unprocessed bytes + */ + long released(int delta) { + long rest = unprocessed.addAndGet(-delta); + assert rest >= 0; + return rest; + } + + /** + * This method is called to update the flow control window, + * and possibly send a window update + * + * @apiNote this method can be called directly if a frame is + * dropped before calling {@link #canBufferUnprocessedBytes(int)}. + * Otherwise, either {@link #processed(int)} or {@link #released(int)} + * should be called, depending on whether sending a window update + * is desired or not. It is typically not desired to send an update + * if the stream or connection is being closed. + * + * @param delta the amount of bytes released from the window. + */ void update(int delta) { - int rcv = received.addAndGet(delta); + long rcv = received.addAndGet(delta); if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit); + if (rcv > windowSize && windowSizeExceeded(rcv)) { + return; + } if (rcv > limit) { sendLock.lock(); try { - int tosend = received.get(); - if( tosend > limit) { + int tosend = (int)Math.min(received.get(), Integer.MAX_VALUE); + if (tosend > limit) { received.getAndAdd(-tosend); sendWindowUpdate(tosend); } @@ -87,6 +206,7 @@ abstract class WindowUpdateSender { void sendWindowUpdate(int delta) { if (debug.on()) debug.log("sending window update: %d", delta); + assert delta > 0 : "illegal window update delta: " + delta; connection.sendUnorderedFrame(new WindowUpdateFrame(getStreamId(), delta)); } @@ -104,4 +224,16 @@ abstract class WindowUpdateSender { } } + /** + * Called when the flow control window size is exceeded + * This method may return false if flow control is disabled + * in this endpoint. + * + * @param received the amount of data received, which is greater + * than {@code windowSize} + * @return {@code true} if the error was reported to the peer + * and no further window update should be sent. + */ + protected abstract boolean windowSizeExceeded(long received); + } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java b/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java index 72c7750eb43..7ebfa090830 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -463,6 +463,16 @@ public class FramesDecoder { int val = getInt(); if (id > 0 && id <= SettingsFrame.MAX_PARAM) { // a known parameter. Ignore otherwise + if (id == SettingsFrame.INITIAL_WINDOW_SIZE && val < 0) { + return new MalformedFrame(ErrorFrame.FLOW_CONTROL_ERROR, + "SettingsFrame with INITIAL_WINDOW_SIZE > 2^31 -1: " + + (val & 0xffffffffL)); + } + if (id == SettingsFrame.MAX_FRAME_SIZE && (val < 16384 || val > 16777215)) { + return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, + "SettingsFrame with MAX_FRAME_SIZE out of range: " + + (val & 0xffffffffL)); + } sf.setParameter(id, val); // TODO parameters validation } } @@ -530,7 +540,12 @@ public class FramesDecoder { return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR, "WindowUpdateFrame length is "+ frameLength+", expected 4"); } - return new WindowUpdateFrame(streamid, getInt() & 0x7fffffff); + int update = getInt(); + if (update < 0) { + return new MalformedFrame(ErrorFrame.FLOW_CONTROL_ERROR, + "WindowUpdateFrame with value > 2^31 -1 " + (update & 0xffffffffL)); + } + return new WindowUpdateFrame(streamid, update & 0x7fffffff); } private Http2Frame parseContinuationFrame(int frameLength, int streamid, int flags) { diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java b/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java index 8a011959417..b3b8164598f 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -166,6 +166,11 @@ public class SettingsFrame extends Http2Frame { // The initial value is 2^14 (16,384) octets. public static final int DEFAULT_MAX_FRAME_SIZE = 16 * K; + // Initial connection window size. This cannot be updated using the + // SETTINGS frame. + public static final int INITIAL_CONNECTION_WINDOW_SIZE = DEFAULT_INITIAL_WINDOW_SIZE; + + public static SettingsFrame defaultRFCSettings() { SettingsFrame f = new SettingsFrame(); f.setParameter(ENABLE_PUSH, DEFAULT_ENABLE_PUSH); diff --git a/src/java.net.http/share/classes/module-info.java b/src/java.net.http/share/classes/module-info.java index 5303e818866..c95d80657b8 100644 --- a/src/java.net.http/share/classes/module-info.java +++ b/src/java.net.http/share/classes/module-info.java @@ -57,9 +57,11 @@ * means that the cache is unbounded. * *

  • {@systemProperty jdk.httpclient.connectionWindowSize} (default: 2^26)
    - * The HTTP/2 client connection window size in bytes. The maximum size is 2^31-1. This value - * cannot be smaller than the stream window size, which can be configured through the - * {@code jdk.httpclient.windowsize} system property. + * The HTTP/2 client connection window size in bytes. Valid values are in the range + * [2^16-1, 2^31-1]. If an invalid value is provided, the default value is used. + * The implementation guarantees that the actual value will be no smaller than the stream + * window size, which can be configured through the {@code jdk.httpclient.windowsize} + * system property. *

  • *
  • {@systemProperty jdk.httpclient.disableRetryConnect} (default: false)
    * Whether automatic retry of connection failures is disabled. If false, then retries are @@ -150,7 +152,8 @@ * or 16kB)
    The buffer size used by the web socket implementation for socket writes. *

  • *
  • {@systemProperty jdk.httpclient.windowsize} (default: 16777216 or 16 MB)
    - * The HTTP/2 client stream window size in bytes. + * The HTTP/2 client stream window size in bytes. Valid values are in the range [2^14, 2^31-1]. + * If an invalid value is provided, the default value is used. *

  • *
  • {@systemProperty jdk.httpclient.auth.retrylimit} (default: 3)
    * The number of attempts the Basic authentication filter will attempt to retry a failed diff --git a/test/jdk/java/net/httpclient/GZIPInputStreamTest.java b/test/jdk/java/net/httpclient/GZIPInputStreamTest.java index e2a382dfc5e..7db41cde5d6 100644 --- a/test/jdk/java/net/httpclient/GZIPInputStreamTest.java +++ b/test/jdk/java/net/httpclient/GZIPInputStreamTest.java @@ -26,7 +26,7 @@ * @bug 8217264 * @summary Tests that you can map an InputStream to a GZIPInputStream * @library /test/lib /test/jdk/java/net/httpclient/lib - * @build jdk.test.lib.net.SimpleSSLContext jdk.httpclient.test.lib.common.HttpServerAdapters + * @build jdk.test.lib.net.SimpleSSLContext jdk.httpclient.test.lib.common.HttpServerAdapters ReferenceTracker * @run testng/othervm GZIPInputStreamTest */ diff --git a/test/jdk/java/net/httpclient/ProxySelectorTest.java b/test/jdk/java/net/httpclient/ProxySelectorTest.java index fd8f85fa6d7..a5407801bcd 100644 --- a/test/jdk/java/net/httpclient/ProxySelectorTest.java +++ b/test/jdk/java/net/httpclient/ProxySelectorTest.java @@ -376,7 +376,7 @@ public class ProxySelectorTest implements HttpServerAdapters { public void teardown() throws Exception { client = null; Thread.sleep(100); - AssertionError fail = TRACKER.check(500); + AssertionError fail = TRACKER.check(1500); try { proxy.stop(); authproxy.stop(); diff --git a/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java b/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java new file mode 100644 index 00000000000..6b0b3727ee2 --- /dev/null +++ b/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java @@ -0,0 +1,365 @@ +/* + * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @bug 8342075 + * @summary checks connection flow control + * @library /test/lib /test/jdk/java/net/httpclient/lib + * @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext + * @run testng/othervm -Djdk.internal.httpclient.debug=true + * -Djdk.httpclient.connectionWindowSize=65535 + * -Djdk.httpclient.windowsize=16384 + * ConnectionFlowControlTest + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ProtocolException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandler; +import java.net.http.HttpResponse.BodyHandlers; +import java.net.http.HttpResponse.BodySubscriber; +import java.net.http.HttpResponse.ResponseInfo; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; + +import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer; +import jdk.httpclient.test.lib.http2.BodyOutputStream; +import jdk.httpclient.test.lib.http2.Http2Handler; +import jdk.httpclient.test.lib.http2.Http2TestExchange; +import jdk.httpclient.test.lib.http2.Http2TestExchangeImpl; +import jdk.httpclient.test.lib.http2.Http2TestServer; +import jdk.httpclient.test.lib.http2.Http2TestServerConnection; +import jdk.internal.net.http.common.HttpHeadersBuilder; +import jdk.internal.net.http.frame.ContinuationFrame; +import jdk.internal.net.http.frame.HeaderFrame; +import jdk.internal.net.http.frame.HeadersFrame; +import jdk.internal.net.http.frame.Http2Frame; +import jdk.internal.net.http.frame.SettingsFrame; +import jdk.test.lib.Utils; +import jdk.test.lib.net.SimpleSSLContext; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static java.util.List.of; +import static java.util.Map.entry; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class ConnectionFlowControlTest { + + SSLContext sslContext; + HttpTestServer http2TestServer; // HTTP/2 ( h2c ) + HttpTestServer https2TestServer; // HTTP/2 ( h2 ) + String http2URI; + String https2URI; + final AtomicInteger reqid = new AtomicInteger(); + + + @DataProvider(name = "variants") + public Object[][] variants() { + return new Object[][] { + { http2URI }, + { https2URI }, + }; + } + + @Test(dataProvider = "variants") + void test(String uri) throws Exception { + System.out.printf("%ntesting %s%n", uri); + ConcurrentHashMap> responseSent = new ConcurrentHashMap<>(); + ConcurrentHashMap> responses = new ConcurrentHashMap<>(); + FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s)); + int connectionWindowSize = Math.max(Integer.getInteger( + "jdk.httpclient.connectionWindowSize", 65535), 65535); + int windowSize = Math.max(Integer.getInteger( + "jdk.httpclient.windowsize", 65535), 16384); + int max = connectionWindowSize / windowSize + 2; + System.out.printf("connection window: %s, stream window: %s, will make %s requests%n", + connectionWindowSize, windowSize, max); + + try (HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build()) { + String label = null; + + Throwable t = null; + try { + String[] keys = new String[max]; + for (int i = 0; i < max; i++) { + String query = "reqId=" + reqid.incrementAndGet(); + keys[i] = query; + URI uriWithQuery = URI.create(uri + "?" + query); + CompletableFuture sent = new CompletableFuture<>(); + responseSent.put(query, sent); + HttpRequest request = HttpRequest.newBuilder(uriWithQuery) + .POST(BodyPublishers.ofString("Hello there!")) + .build(); + System.out.println("\nSending request:" + uriWithQuery); + final HttpClient cc = client; + var response = cc.send(request, BodyHandlers.ofInputStream()); + responses.put(query, response); + String ckey = response.headers().firstValue("X-Connection-Key").get(); + if (label == null) label = ckey; + try { + if (i < max - 1) { + // the connection window might be exceeded at i == max - 2, which + // means that the last request could go on a new connection. + assertEquals(ckey, label, "Unexpected key for " + query); + } + } catch (AssertionError ass) { + // since we won't pull all responses, the client + // will not exit unless we ask it to shutdown now. + client.shutdownNow(); + throw ass; + } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + // ignore + } + CompletableFuture allsent = CompletableFuture.allOf(responseSent.values().stream() + .toArray(CompletableFuture[]::new)); + allsent.get(); + for (int i = 0; i < max; i++) { + try { + String query = keys[i]; + var response = responses.get(keys[i]); + String ckey = response.headers().firstValue("X-Connection-Key").get(); + if (label == null) label = ckey; + assertEquals(ckey, label, "Unexpected key for " + query); + int wait = uri.startsWith("https://") ? 500 : 250; + try (InputStream is = response.body()) { + Thread.sleep(Utils.adjustTimeout(wait)); + is.readAllBytes(); + } + System.out.printf("%s did not fail: %s%n", query, response.statusCode()); + } catch (AssertionError t1) { + // since we won't pull all responses, the client + // will not exit unless we ask it to shutdown now. + client.shutdownNow(); + throw t1; + } catch (Throwable t0) { + System.out.println("Got EXPECTED: " + t0); + if (t0 instanceof ExecutionException) { + t0 = t0.getCause(); + } + t = t0; + try { + assertDetailMessage(t0, i); + } catch (AssertionError e) { + // since we won't pull all responses, the client + // will not exit unless we ask it to shutdown now. + client.shutdownNow(); + throw e; + } + } + } + } catch (Throwable t0) { + System.out.println("Got EXPECTED: " + t0); + if (t0 instanceof ExecutionException) { + t0 = t0.getCause(); + } + t = t0; + } + if (t == null) { + // we could fail here if we haven't waited long enough + fail("Expected exception, got all responses, should sleep time be raised?"); + } else { + assertDetailMessage(t, max); + } + String query = "reqId=" + reqid.incrementAndGet(); + URI uriWithQuery = URI.create(uri + "?" + query); + CompletableFuture sent = new CompletableFuture<>(); + responseSent.put(query, sent); + HttpRequest request = HttpRequest.newBuilder(uriWithQuery) + .POST(BodyPublishers.ofString("Hello there!")) + .build(); + System.out.println("\nSending last request:" + uriWithQuery); + var response = client.send(request, BodyHandlers.ofString()); + if (label != null) { + String ckey = response.headers().firstValue("X-Connection-Key").get(); + assertNotEquals(ckey, label); + System.out.printf("last request %s sent on different connection as expected:" + + "\n\tlast: %s\n\tprevious: %s%n", query, ckey, label); + } + } + } + + // Assertions based on implementation specific detail messages. Keep in + // sync with implementation. + static void assertDetailMessage(Throwable throwable, int iterationIndex) { + try { + Throwable cause = throwable; + while (cause != null) { + if (cause instanceof ProtocolException) { + if (cause.getMessage().contains("connection window exceeded")) { + System.out.println("Found expected exception: " + cause); + return; + } + } + cause = cause.getCause(); + } + throw new AssertionError( + "ProtocolException(\"protocol error: connection window exceeded\") not found", + throwable); + } catch (AssertionError e) { + System.out.println("Exception does not match expectation: " + throwable); + throwable.printStackTrace(System.out); + throw e; + } + } + + @BeforeTest + public void setup() throws Exception { + sslContext = new SimpleSSLContext().get(); + if (sslContext == null) + throw new AssertionError("Unexpected null sslContext"); + + var http2TestServer = new Http2TestServer("localhost", false, 0); + http2TestServer.addHandler(new Http2TestHandler(), "/http2/"); + this.http2TestServer = HttpTestServer.of(http2TestServer); + http2URI = "http://" + this.http2TestServer.serverAuthority() + "/http2/x"; + + var https2TestServer = new Http2TestServer("localhost", true, sslContext); + https2TestServer.addHandler(new Http2TestHandler(), "/https2/"); + this.https2TestServer = HttpTestServer.of(https2TestServer); + https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x"; + + // Override the default exchange supplier with a custom one to enable + // particular test scenarios + http2TestServer.setExchangeSupplier(FCHttp2TestExchange::new); + https2TestServer.setExchangeSupplier(FCHttp2TestExchange::new); + + this.http2TestServer.start(); + this.https2TestServer.start(); + } + + @AfterTest + public void teardown() throws Exception { + http2TestServer.stop(); + https2TestServer.stop(); + } + + static class Http2TestHandler implements Http2Handler { + + @Override + public void handle(Http2TestExchange t) throws IOException { + String query = t.getRequestURI().getRawQuery(); + + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + + byte[] bytes = is.readAllBytes(); + System.out.println("Server " + t.getLocalAddress() + " received:\n" + + t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8)); + t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey()); + + if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8); + int window = Math.max(16384, Integer.getInteger("jdk.httpclient.windowsize", 2*16*1024)); + final int maxChunkSize; + if (t instanceof FCHttp2TestExchange fct) { + maxChunkSize = Math.min(window, fct.conn.getMaxFrameSize()); + } else { + maxChunkSize = Math.min(window, SettingsFrame.MAX_FRAME_SIZE); + } + byte[] resp = bytes.length < maxChunkSize + ? bytes + : Arrays.copyOfRange(bytes, 0, maxChunkSize); + int max = (window / resp.length); + // send in chunks + t.sendResponseHeaders(200, 0); + int sent = 0; + for (int i=0; i<=max; i++) { + int len = Math.min(resp.length, window - sent); + if (len <= 0) break; + if (os instanceof BodyOutputStream bos) { + try { + // we don't wait for the stream window, but we want + // to wait for the connection window + bos.waitForStreamWindow(len); + } catch (InterruptedException ie) { + // ignore and continue... + } + } + ((BodyOutputStream) os).writeUncontrolled(resp, 0, len); + sent += len; + } + if (sent != window) fail("should have sent %s, sent %s".formatted(window, sent)); + } + if (t instanceof FCHttp2TestExchange fct) { + fct.responseSent(query); + } else { + fail("Exchange is not %s but %s" + .formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName())); + } + } + } + + // A custom Http2TestExchangeImpl that overrides sendResponseHeaders to + // allow headers to be sent with a number of CONTINUATION frames. + static class FCHttp2TestExchange extends Http2TestExchangeImpl { + static volatile Consumer responseSentCB; + static void setResponseSentCB(Consumer responseSentCB) { + FCHttp2TestExchange.responseSentCB = responseSentCB; + } + + final Http2TestServerConnection conn; + FCHttp2TestExchange(int streamid, String method, HttpHeaders reqheaders, + HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is, + SSLSession sslSession, BodyOutputStream os, + Http2TestServerConnection conn, boolean pushAllowed) { + super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed); + this.conn = conn; + } + public void responseSent(String query) { + System.out.println("Server: response sent for " + query); + responseSentCB.accept(query); + } + + } +} diff --git a/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java b/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java new file mode 100644 index 00000000000..36b727e3a22 --- /dev/null +++ b/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java @@ -0,0 +1,342 @@ +/* + * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @bug 8342075 + * @library /test/lib /test/jdk/java/net/httpclient/lib + * @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext + * @run testng/othervm -Djdk.internal.httpclient.debug=true + * -Djdk.httpclient.connectionWindowSize=65535 + * -Djdk.httpclient.windowsize=16384 + * StreamFlowControlTest + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ProtocolException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; + +import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer; +import jdk.httpclient.test.lib.http2.BodyOutputStream; +import jdk.httpclient.test.lib.http2.Http2Handler; +import jdk.httpclient.test.lib.http2.Http2TestExchange; +import jdk.httpclient.test.lib.http2.Http2TestExchangeImpl; +import jdk.httpclient.test.lib.http2.Http2TestServer; +import jdk.httpclient.test.lib.http2.Http2TestServerConnection; +import jdk.internal.net.http.common.HttpHeadersBuilder; +import jdk.internal.net.http.frame.SettingsFrame; +import jdk.test.lib.Utils; +import jdk.test.lib.net.SimpleSSLContext; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +public class StreamFlowControlTest { + + SSLContext sslContext; + HttpTestServer http2TestServer; // HTTP/2 ( h2c ) + HttpTestServer https2TestServer; // HTTP/2 ( h2 ) + String http2URI; + String https2URI; + final AtomicInteger reqid = new AtomicInteger(); + + + @DataProvider(name = "variants") + public Object[][] variants() { + return new Object[][] { + { http2URI, false }, + { https2URI, false }, + { http2URI, true }, + { https2URI, true }, + }; + } + + + @Test(dataProvider = "variants") + void test(String uri, + boolean sameClient) + throws Exception + { + System.out.printf("%ntesting test(%s, %s)%n", uri, sameClient); + ConcurrentHashMap> responseSent = new ConcurrentHashMap<>(); + FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s)); + + HttpClient client = null; + try { + int max = sameClient ? 10 : 3; + String label = null; + for (int i = 0; i < max; i++) { + if (!sameClient || client == null) + client = HttpClient.newBuilder().sslContext(sslContext).build(); + + String query = "reqId=" + reqid.incrementAndGet(); + URI uriWithQuery = URI.create(uri + "?" + query); + CompletableFuture sent = new CompletableFuture<>(); + responseSent.put(query, sent); + HttpRequest request = HttpRequest.newBuilder(uriWithQuery) + .POST(BodyPublishers.ofString("Hello there!")) + .build(); + System.out.println("\nSending request:" + uriWithQuery); + final HttpClient cc = client; + try { + HttpResponse response = cc.send(request, BodyHandlers.ofInputStream()); + if (sameClient) { + String key = response.headers().firstValue("X-Connection-Key").get(); + if (label == null) label = key; + assertEquals(key, label, "Unexpected key for " + query); + } + sent.join(); + // we have to pull to get the exception, but slow enough + // so that DataFrames are buffered up to the point that + // the window is exceeded... + int wait = uri.startsWith("https://") ? 500 : 350; + try (InputStream is = response.body()) { + Thread.sleep(Utils.adjustTimeout(wait)); + is.readAllBytes(); + } + // we could fail here if we haven't waited long enough + fail("Expected exception, got :" + response + ", should sleep time be raised?"); + } catch (IOException ioe) { + System.out.println("Got EXPECTED: " + ioe); + assertDetailMessage(ioe, i); + } finally { + if (!sameClient && client != null) { + client.close(); + client = null; + } + } + } + } finally { + if (sameClient && client != null) client.close(); + } + + } + + @Test(dataProvider = "variants") + void testAsync(String uri, + boolean sameClient) + { + System.out.printf("%ntesting testAsync(%s, %s)%n", uri, sameClient); + ConcurrentHashMap> responseSent = new ConcurrentHashMap<>(); + FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s)); + + HttpClient client = null; + try { + int max = sameClient ? 5 : 3; + String label = null; + for (int i = 0; i < max; i++) { + if (!sameClient || client == null) + client = HttpClient.newBuilder().sslContext(sslContext).build(); + + String query = "reqId=" + reqid.incrementAndGet(); + URI uriWithQuery = URI.create(uri + "?" + query); + CompletableFuture sent = new CompletableFuture<>(); + responseSent.put(query, sent); + HttpRequest request = HttpRequest.newBuilder(uriWithQuery) + .POST(BodyPublishers.ofString("Hello there!")) + .build(); + System.out.println("\nSending request:" + uriWithQuery); + final HttpClient cc = client; + + Throwable t = null; + try { + HttpResponse response = cc.sendAsync(request, BodyHandlers.ofInputStream()).get(); + if (sameClient) { + String key = response.headers().firstValue("X-Connection-Key").get(); + if (label == null) label = key; + assertEquals(key, label, "Unexpected key for " + query); + } + sent.join(); + int wait = uri.startsWith("https://") ? 600 : 300; + try (InputStream is = response.body()) { + Thread.sleep(Utils.adjustTimeout(wait)); + is.readAllBytes(); + } + // we could fail here if we haven't waited long enough + fail("Expected exception, got :" + response + ", should sleep time be raised?"); + } catch (Throwable t0) { + System.out.println("Got EXPECTED: " + t0); + if (t0 instanceof ExecutionException) { + t0 = t0.getCause(); + } + t = t0; + } finally { + if (!sameClient && client != null) { + client.close(); + client = null; + } + } + assertDetailMessage(t, i); + } + } finally { + if (sameClient && client != null) client.close(); + } + } + + // Assertions based on implementation specific detail messages. Keep in + // sync with implementation. + static void assertDetailMessage(Throwable throwable, int iterationIndex) { + try { + Throwable cause = throwable; + while (cause != null) { + if (cause instanceof ProtocolException) { + if (cause.getMessage().matches("stream [0-9]+ flow control window exceeded")) { + System.out.println("Found expected exception: " + cause); + return; + } + } + cause = cause.getCause(); + } + throw new AssertionError( + "ProtocolException(\"stream X flow control window exceeded\") not found", + throwable); + } catch (AssertionError e) { + System.out.println("Exception does not match expectation: " + throwable); + throwable.printStackTrace(System.out); + throw e; + } + } + + @BeforeTest + public void setup() throws Exception { + sslContext = new SimpleSSLContext().get(); + if (sslContext == null) + throw new AssertionError("Unexpected null sslContext"); + + var http2TestServer = new Http2TestServer("localhost", false, 0); + http2TestServer.addHandler(new Http2TestHandler(), "/http2/"); + this.http2TestServer = HttpTestServer.of(http2TestServer); + http2URI = "http://" + this.http2TestServer.serverAuthority() + "/http2/x"; + + var https2TestServer = new Http2TestServer("localhost", true, sslContext); + https2TestServer.addHandler(new Http2TestHandler(), "/https2/"); + this.https2TestServer = HttpTestServer.of(https2TestServer); + https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x"; + + // Override the default exchange supplier with a custom one to enable + // particular test scenarios + http2TestServer.setExchangeSupplier(FCHttp2TestExchange::new); + https2TestServer.setExchangeSupplier(FCHttp2TestExchange::new); + + this.http2TestServer.start(); + this.https2TestServer.start(); + } + + @AfterTest + public void teardown() throws Exception { + http2TestServer.stop(); + https2TestServer.stop(); + } + + static class Http2TestHandler implements Http2Handler { + + @Override + public void handle(Http2TestExchange t) throws IOException { + String query = t.getRequestURI().getRawQuery(); + + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + + byte[] bytes = is.readAllBytes(); + System.out.println("Server " + t.getLocalAddress() + " received:\n" + + t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8)); + t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey()); + + if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8); + int window = Integer.getInteger("jdk.httpclient.windowsize", 2 * 16 * 1024); + final int maxChunkSize; + if (t instanceof FCHttp2TestExchange fct) { + maxChunkSize = Math.min(window, fct.conn.getMaxFrameSize()); + } else { + maxChunkSize = Math.min(window, SettingsFrame.MAX_FRAME_SIZE); + } + byte[] resp = bytes.length <= maxChunkSize + ? bytes + : Arrays.copyOfRange(bytes, 0, maxChunkSize); + int max = (window / resp.length) + 2; + // send in chunks + t.sendResponseHeaders(200, 0); + for (int i = 0; i <= max; i++) { + if (t instanceof FCHttp2TestExchange fct) { + try { + // we don't wait for the stream window, but we want + // to wait for the connection window + fct.conn.obtainConnectionWindow(resp.length); + } catch (InterruptedException ie) { + // ignore and continue... + } + } + ((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length); + } + } + if (t instanceof FCHttp2TestExchange fct) { + fct.responseSent(query); + } else fail("Exchange is not %s but %s" + .formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName())); + } + } + + // A custom Http2TestExchangeImpl that overrides sendResponseHeaders to + // allow headers to be sent with a number of CONTINUATION frames. + static class FCHttp2TestExchange extends Http2TestExchangeImpl { + static volatile Consumer responseSentCB; + static void setResponseSentCB(Consumer responseSentCB) { + FCHttp2TestExchange.responseSentCB = responseSentCB; + } + + final Http2TestServerConnection conn; + FCHttp2TestExchange(int streamid, String method, HttpHeaders reqheaders, + HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is, + SSLSession sslSession, BodyOutputStream os, + Http2TestServerConnection conn, boolean pushAllowed) { + super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed); + this.conn = conn; + } + public void responseSent(String query) { + System.out.println("Server: response sent for " + query); + responseSentCB.accept(query); + } + + } +} diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java index c091b7ecf9b..db5778ec3d9 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,6 +25,7 @@ package jdk.httpclient.test.lib.http2; import java.io.*; import java.nio.ByteBuffer; +import java.util.Objects; import jdk.internal.net.http.frame.DataFrame; import jdk.internal.net.http.frame.ResetFrame; @@ -65,6 +66,10 @@ public class BodyOutputStream extends OutputStream { // first wait for the connection window conn.obtainConnectionWindow(demand); // now wait for the stream window + waitForStreamWindow(demand); + } + + public void waitForStreamWindow(int demand) throws InterruptedException { synchronized (this) { while (demand > 0) { int n = Math.min(demand, window); @@ -83,6 +88,7 @@ public class BodyOutputStream extends OutputStream { @Override public void write(byte[] buf, int offset, int len) throws IOException { + Objects.checkFromIndexSize(offset, len, buf.length); if (closed) { throw new IOException("closed"); } @@ -104,6 +110,34 @@ public class BodyOutputStream extends OutputStream { } } + /** + * This method pushes frames onto the stack without checking + * for flow control, allowing the sender to bypass flow + * control for testing purposes + * @param buf data to send + * @param offset offset at which the data starts + * @param len length of the data to send + * @throws IOException if an I/O error occurs + */ + public void writeUncontrolled(byte[] buf, int offset, int len) + throws IOException { + Objects.checkFromIndexSize(offset, len, buf.length); + if (closed) { + throw new IOException("closed"); + } + + if (!goodToGo) { + throw new IllegalStateException("sendResponseHeaders must be called first"); + } + int max = conn.getMaxFrameSize(); + while (len > 0) { + int n = len > max ? max : len; + send(buf, offset, n, 0); + offset += n; + len -= n; + } + } + private void send(byte[] buf, int offset, int len, int flags) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(len); buffer.put(buf, offset, len); diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java index c98e986ca85..068d2a49e57 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java @@ -1374,7 +1374,7 @@ public class Http2TestServerConnection { * * @param amount */ - synchronized void obtainConnectionWindow(int amount) throws InterruptedException { + public synchronized void obtainConnectionWindow(int amount) throws InterruptedException { while (amount > 0) { int n = Math.min(amount, sendWindow); amount -= n; @@ -1384,9 +1384,13 @@ public class Http2TestServerConnection { } } - synchronized void updateConnectionWindow(int amount) { - sendWindow += amount; - notifyAll(); + void updateConnectionWindow(int amount) { + System.out.printf("sendWindow (window=%s, amount=%s) is now: %s%n", + sendWindow, amount, sendWindow + amount); + synchronized (this) { + sendWindow += amount; + notifyAll(); + } } // simplified output headers class. really just a type safe container