8342075: HttpClient: improve HTTP/2 flow control checks
Reviewed-by: jpai
This commit is contained in:
parent
85774b713e
commit
b0ac633b2d
@ -40,6 +40,8 @@ import jdk.internal.net.http.common.Logger;
|
||||
import jdk.internal.net.http.common.MinimalFuture;
|
||||
import jdk.internal.net.http.common.Utils;
|
||||
import jdk.internal.net.http.frame.SettingsFrame;
|
||||
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_CONNECTION_WINDOW_SIZE;
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.ENABLE_PUSH;
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE;
|
||||
@ -291,9 +293,13 @@ class Http2ClientImpl {
|
||||
// and the connection window size.
|
||||
int defaultValue = Math.max(streamWindow, K*K*32);
|
||||
|
||||
// The min value is the max between the streamWindow and
|
||||
// the initial connection window size
|
||||
int minValue = Math.max(INITIAL_CONNECTION_WINDOW_SIZE, streamWindow);
|
||||
|
||||
return getParameter(
|
||||
"jdk.httpclient.connectionWindowSize",
|
||||
streamWindow, Integer.MAX_VALUE, defaultValue);
|
||||
minValue, Integer.MAX_VALUE, defaultValue);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -92,6 +92,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.DEFAULT_INITIAL_WINDOW_SIZE;
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.ENABLE_PUSH;
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE;
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_CONNECTION_WINDOW_SIZE;
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.MAX_CONCURRENT_STREAMS;
|
||||
import static jdk.internal.net.http.frame.SettingsFrame.MAX_FRAME_SIZE;
|
||||
@ -1080,6 +1081,34 @@ class Http2Connection {
|
||||
return null;
|
||||
}
|
||||
|
||||
// This method is called when a DataFrame that was added
|
||||
// to a Stream::inputQ is later dropped from the queue
|
||||
// without being consumed.
|
||||
//
|
||||
// Before adding a frame to the queue, the Stream calls
|
||||
// connection.windowUpdater.canBufferUnprocessedBytes(), which
|
||||
// increases the count of unprocessed bytes in the connection.
|
||||
// After consuming the frame, it calls connection.windowUpdater::processed,
|
||||
// which decrements the count of unprocessed bytes, and possibly
|
||||
// sends a window update to the peer.
|
||||
//
|
||||
// This method is called when connection.windowUpdater::processed
|
||||
// will not be called, which can happen when consuming the frame
|
||||
// fails, or when an empty DataFrame terminates the stream,
|
||||
// or when the stream is cancelled while data is still
|
||||
// sitting in its inputQ. In the later case, it is called for
|
||||
// each frame that is dropped from the queue.
|
||||
final void releaseUnconsumed(DataFrame df) {
|
||||
windowUpdater.released(df.payloadLength());
|
||||
dropDataFrame(df);
|
||||
}
|
||||
|
||||
// This method can be called directly when a DataFrame is dropped
|
||||
// before/without having been added to any Stream::inputQ.
|
||||
// In that case, the number of unprocessed bytes hasn't been incremented
|
||||
// by the stream, and does not need to be decremented.
|
||||
// Otherwise, if the frame is dropped after having been added to the
|
||||
// inputQ, releaseUnconsumed above should be called.
|
||||
final void dropDataFrame(DataFrame df) {
|
||||
if (isMarked(closedState, SHUTDOWN_REQUESTED)) return;
|
||||
if (debug.on()) {
|
||||
@ -1465,11 +1494,12 @@ class Http2Connection {
|
||||
// Note that the default initial window size, not to be confused
|
||||
// with the initial window size, is defined by RFC 7540 as
|
||||
// 64K -1.
|
||||
final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
|
||||
if (len != 0) {
|
||||
final int len = windowUpdater.initialWindowSize - INITIAL_CONNECTION_WINDOW_SIZE;
|
||||
assert len >= 0;
|
||||
if (len > 0) {
|
||||
if (Log.channel()) {
|
||||
Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
|
||||
len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
|
||||
len, windowUpdater.initialWindowSize, INITIAL_CONNECTION_WINDOW_SIZE);
|
||||
}
|
||||
windowUpdater.sendWindowUpdate(len);
|
||||
}
|
||||
@ -1924,6 +1954,19 @@ class Http2Connection {
|
||||
int getStreamId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean windowSizeExceeded(long received) {
|
||||
if (connection.isOpen()) {
|
||||
try {
|
||||
connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR,
|
||||
"connection window exceeded");
|
||||
} catch (IOException io) {
|
||||
connection.shutdown(io);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -160,14 +160,13 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
// send lock: prevent sending DataFrames after reset occurred.
|
||||
private final Lock sendLock = new ReentrantLock();
|
||||
private final Lock stateLock = new ReentrantLock();
|
||||
|
||||
/**
|
||||
* A reference to this Stream's connection Send Window controller. The
|
||||
* stream MUST acquire the appropriate amount of Send Window before
|
||||
* sending any data. Will be null for PushStreams, as they cannot send data.
|
||||
*/
|
||||
private final WindowController windowController;
|
||||
private final WindowUpdateSender windowUpdater;
|
||||
private final WindowUpdateSender streamWindowUpdater;
|
||||
|
||||
// Only accessed in all method calls from incoming(), no need for volatile
|
||||
private boolean endStreamSeen;
|
||||
@ -217,7 +216,8 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
int size = Utils.remaining(dsts, Integer.MAX_VALUE);
|
||||
if (size == 0 && finished) {
|
||||
inputQ.remove();
|
||||
connection.ensureWindowUpdated(df); // must update connection window
|
||||
// consumed will not be called
|
||||
connection.releaseUnconsumed(df); // must update connection window
|
||||
Log.logTrace("responseSubscriber.onComplete");
|
||||
if (debug.on()) debug.log("incoming: onComplete");
|
||||
connection.decrementStreamsCount(streamid);
|
||||
@ -232,7 +232,11 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
try {
|
||||
subscriber.onNext(dsts);
|
||||
} catch (Throwable t) {
|
||||
connection.dropDataFrame(df); // must update connection window
|
||||
// Data frames that have been added to the inputQ
|
||||
// must be released using releaseUnconsumed() to
|
||||
// account for the amount of unprocessed bytes
|
||||
// tracked by the connection.windowUpdater.
|
||||
connection.releaseUnconsumed(df);
|
||||
throw t;
|
||||
}
|
||||
if (consumed(df)) {
|
||||
@ -283,8 +287,12 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
private void drainInputQueue() {
|
||||
Http2Frame frame;
|
||||
while ((frame = inputQ.poll()) != null) {
|
||||
if (frame instanceof DataFrame) {
|
||||
connection.dropDataFrame((DataFrame)frame);
|
||||
if (frame instanceof DataFrame df) {
|
||||
// Data frames that have been added to the inputQ
|
||||
// must be released using releaseUnconsumed() to
|
||||
// account for the amount of unprocessed bytes
|
||||
// tracked by the connection.windowUpdater.
|
||||
connection.releaseUnconsumed(df);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -310,12 +318,13 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
boolean endStream = df.getFlag(DataFrame.END_STREAM);
|
||||
if (len == 0) return endStream;
|
||||
|
||||
connection.windowUpdater.update(len);
|
||||
|
||||
connection.windowUpdater.processed(len);
|
||||
if (!endStream) {
|
||||
streamWindowUpdater.processed(len);
|
||||
} else {
|
||||
// Don't send window update on a stream which is
|
||||
// closed or half closed.
|
||||
windowUpdater.update(len);
|
||||
streamWindowUpdater.released(len);
|
||||
}
|
||||
|
||||
// true: end of stream; false: more data coming
|
||||
@ -385,8 +394,21 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
}
|
||||
|
||||
private void receiveDataFrame(DataFrame df) {
|
||||
inputQ.add(df);
|
||||
sched.runOrSchedule();
|
||||
try {
|
||||
int len = df.payloadLength();
|
||||
if (len > 0) {
|
||||
// we return from here if the connection is being closed.
|
||||
if (!connection.windowUpdater.canBufferUnprocessedBytes(len)) return;
|
||||
// we return from here if the stream is being closed.
|
||||
if (closed || !streamWindowUpdater.canBufferUnprocessedBytes(len)) {
|
||||
connection.releaseUnconsumed(df);
|
||||
return;
|
||||
}
|
||||
}
|
||||
inputQ.add(df);
|
||||
} finally {
|
||||
sched.runOrSchedule();
|
||||
}
|
||||
}
|
||||
|
||||
/** Handles a RESET frame. RESET is always handled inline in the queue. */
|
||||
@ -470,7 +492,7 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
this.responseHeadersBuilder = new HttpHeadersBuilder();
|
||||
this.rspHeadersConsumer = new HeadersConsumer();
|
||||
this.requestPseudoHeaders = createPseudoHeaders(request);
|
||||
this.windowUpdater = new StreamWindowUpdateSender(connection);
|
||||
this.streamWindowUpdater = new StreamWindowUpdateSender(connection);
|
||||
}
|
||||
|
||||
private boolean checkRequestCancelled() {
|
||||
@ -506,6 +528,8 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
if (debug.on()) {
|
||||
debug.log("request cancelled or stream closed: dropping data frame");
|
||||
}
|
||||
// Data frames that have not been added to the inputQ
|
||||
// can be released using dropDataFrame
|
||||
connection.dropDataFrame(df);
|
||||
} else {
|
||||
receiveDataFrame(df);
|
||||
@ -1427,12 +1451,18 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
|
||||
@Override
|
||||
void onProtocolError(final IOException cause) {
|
||||
onProtocolError(cause, ResetFrame.PROTOCOL_ERROR);
|
||||
}
|
||||
|
||||
void onProtocolError(final IOException cause, int code) {
|
||||
if (debug.on()) {
|
||||
debug.log("cancelling exchange on stream %d due to protocol error: %s", streamid, cause.getMessage());
|
||||
debug.log("cancelling exchange on stream %d due to protocol error [%s]: %s",
|
||||
streamid, ErrorFrame.stringForCode(code),
|
||||
cause.getMessage());
|
||||
}
|
||||
Log.logError("cancelling exchange on stream {0} due to protocol error: {1}\n", streamid, cause);
|
||||
// send a RESET frame and close the stream
|
||||
cancelImpl(cause, ResetFrame.PROTOCOL_ERROR);
|
||||
cancelImpl(cause, code);
|
||||
}
|
||||
|
||||
void connectionClosing(Throwable cause) {
|
||||
@ -1736,6 +1766,13 @@ class Stream<T> extends ExchangeImpl<T> {
|
||||
return dbgString = dbg;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean windowSizeExceeded(long received) {
|
||||
onProtocolError(new ProtocolException("stream %s flow control window exceeded"
|
||||
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -31,16 +31,31 @@ import jdk.internal.net.http.frame.SettingsFrame;
|
||||
import jdk.internal.net.http.frame.WindowUpdateFrame;
|
||||
import jdk.internal.net.http.common.Utils;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* A class that tracks the amount of flow controlled
|
||||
* data received on an HTTP/2 connection
|
||||
*/
|
||||
abstract class WindowUpdateSender {
|
||||
|
||||
final Logger debug =
|
||||
Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
|
||||
|
||||
// The threshold at which window updates are sent in bytes
|
||||
final int limit;
|
||||
// The flow control window in bytes
|
||||
final int windowSize;
|
||||
final Http2Connection connection;
|
||||
final AtomicInteger received = new AtomicInteger();
|
||||
// The amount of flow controlled data received and processed, in bytes,
|
||||
// since the start of the window.
|
||||
// The window is exhausted when received + unprocessed >= windowSize
|
||||
final AtomicLong received = new AtomicLong();
|
||||
// The amount of flow controlled data received and unprocessed, in bytes,
|
||||
// since the start of the window.
|
||||
// The window is exhausted when received + unprocessed >= windowSize
|
||||
final AtomicLong unprocessed = new AtomicLong();
|
||||
final ReentrantLock sendLock = new ReentrantLock();
|
||||
|
||||
WindowUpdateSender(Http2Connection connection) {
|
||||
@ -53,6 +68,7 @@ abstract class WindowUpdateSender {
|
||||
|
||||
WindowUpdateSender(Http2Connection connection, int maxFrameSize, int initWindowSize) {
|
||||
this.connection = connection;
|
||||
this.windowSize = initWindowSize;
|
||||
int v0 = Math.max(0, initWindowSize - maxFrameSize);
|
||||
int v1 = (initWindowSize + (maxFrameSize - 1)) / maxFrameSize;
|
||||
v1 = v1 * maxFrameSize / 2;
|
||||
@ -66,16 +82,119 @@ abstract class WindowUpdateSender {
|
||||
maxFrameSize, initWindowSize, limit);
|
||||
}
|
||||
|
||||
// O for the connection window, > 0 for a stream window
|
||||
abstract int getStreamId();
|
||||
|
||||
|
||||
/**
|
||||
* {@return {@code true} if buffering the given amount of
|
||||
* flow controlled data would not exceed the flow control
|
||||
* window}
|
||||
* <p>
|
||||
* This method is called before buffering and processing
|
||||
* a DataFrame. The count of unprocessed bytes is incremented
|
||||
* by the given amount, and checked against the number of
|
||||
* available bytes in the flow control window.
|
||||
* <p>
|
||||
* This method returns {@code true} if the bytes can be buffered
|
||||
* without exceeding the flow control window, {@code false}
|
||||
* if the flow control window is exceeded and corrective
|
||||
* action (close/reset) has been taken.
|
||||
* <p>
|
||||
* When this method returns true, either {@link #processed(int)}
|
||||
* or {@link #released(int)} must eventually be called to release
|
||||
* the bytes from the flow control window.
|
||||
*
|
||||
* @implSpec
|
||||
* an HTTP/2 endpoint may disable its own flow control
|
||||
* (see <a href="https://www.rfc-editor.org/rfc/rfc9113.html#section-5.2.1">
|
||||
* RFC 9113, section 5.2.1</a>), in which case this
|
||||
* method may return true even if the flow control window would
|
||||
* be exceeded: that is, the flow control window is exceeded but
|
||||
* the endpoint decided to take no corrective action.
|
||||
*
|
||||
* @param len a number of unprocessed bytes, which
|
||||
* the caller wants to buffer.
|
||||
*/
|
||||
boolean canBufferUnprocessedBytes(int len) {
|
||||
return !checkWindowSizeExceeded(unprocessed.addAndGet(len));
|
||||
}
|
||||
|
||||
// adds the provided amount to the amount of already
|
||||
// received and processed bytes and checks whether the
|
||||
// flow control window is exceeded. If so, take
|
||||
// corrective actions and return true.
|
||||
private boolean checkWindowSizeExceeded(long len) {
|
||||
// because windowSize is bound by Integer.MAX_VALUE
|
||||
// we will never reach the point where received.get() + len
|
||||
// could overflow
|
||||
long rcv = received.get() + len;
|
||||
return rcv > windowSize && windowSizeExceeded(rcv);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after unprocessed buffered bytes have been
|
||||
* processed, to release part of the flow control window
|
||||
*
|
||||
* @apiNote this method is called only when releasing bytes
|
||||
* that where buffered after calling
|
||||
* {@link #canBufferUnprocessedBytes(int)}.
|
||||
*
|
||||
* @param delta the amount of processed bytes to release
|
||||
*/
|
||||
void processed(int delta) {
|
||||
long rest = unprocessed.addAndGet(-delta);
|
||||
assert rest >= 0;
|
||||
update(delta);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when it is desired to release unprocessed bytes
|
||||
* without processing them, or without triggering the
|
||||
* sending of a window update. This method can be called
|
||||
* instead of calling {@link #processed(int)}.
|
||||
* When this method is called instead of calling {@link #processed(int)},
|
||||
* it should generally be followed by a call to {@link #update(int)},
|
||||
* unless the stream or connection is being closed.
|
||||
*
|
||||
* @apiNote this method should only be called to release bytes that
|
||||
* have been buffered after calling {@link
|
||||
* #canBufferUnprocessedBytes(int)}.
|
||||
*
|
||||
* @param delta the amount of bytes to release from the window
|
||||
*
|
||||
* @return the amount of remaining unprocessed bytes
|
||||
*/
|
||||
long released(int delta) {
|
||||
long rest = unprocessed.addAndGet(-delta);
|
||||
assert rest >= 0;
|
||||
return rest;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is called to update the flow control window,
|
||||
* and possibly send a window update
|
||||
*
|
||||
* @apiNote this method can be called directly if a frame is
|
||||
* dropped before calling {@link #canBufferUnprocessedBytes(int)}.
|
||||
* Otherwise, either {@link #processed(int)} or {@link #released(int)}
|
||||
* should be called, depending on whether sending a window update
|
||||
* is desired or not. It is typically not desired to send an update
|
||||
* if the stream or connection is being closed.
|
||||
*
|
||||
* @param delta the amount of bytes released from the window.
|
||||
*/
|
||||
void update(int delta) {
|
||||
int rcv = received.addAndGet(delta);
|
||||
long rcv = received.addAndGet(delta);
|
||||
if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit);
|
||||
if (rcv > windowSize && windowSizeExceeded(rcv)) {
|
||||
return;
|
||||
}
|
||||
if (rcv > limit) {
|
||||
sendLock.lock();
|
||||
try {
|
||||
int tosend = received.get();
|
||||
if( tosend > limit) {
|
||||
int tosend = (int)Math.min(received.get(), Integer.MAX_VALUE);
|
||||
if (tosend > limit) {
|
||||
received.getAndAdd(-tosend);
|
||||
sendWindowUpdate(tosend);
|
||||
}
|
||||
@ -87,6 +206,7 @@ abstract class WindowUpdateSender {
|
||||
|
||||
void sendWindowUpdate(int delta) {
|
||||
if (debug.on()) debug.log("sending window update: %d", delta);
|
||||
assert delta > 0 : "illegal window update delta: " + delta;
|
||||
connection.sendUnorderedFrame(new WindowUpdateFrame(getStreamId(), delta));
|
||||
}
|
||||
|
||||
@ -104,4 +224,16 @@ abstract class WindowUpdateSender {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the flow control window size is exceeded
|
||||
* This method may return false if flow control is disabled
|
||||
* in this endpoint.
|
||||
*
|
||||
* @param received the amount of data received, which is greater
|
||||
* than {@code windowSize}
|
||||
* @return {@code true} if the error was reported to the peer
|
||||
* and no further window update should be sent.
|
||||
*/
|
||||
protected abstract boolean windowSizeExceeded(long received);
|
||||
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -463,6 +463,16 @@ public class FramesDecoder {
|
||||
int val = getInt();
|
||||
if (id > 0 && id <= SettingsFrame.MAX_PARAM) {
|
||||
// a known parameter. Ignore otherwise
|
||||
if (id == SettingsFrame.INITIAL_WINDOW_SIZE && val < 0) {
|
||||
return new MalformedFrame(ErrorFrame.FLOW_CONTROL_ERROR,
|
||||
"SettingsFrame with INITIAL_WINDOW_SIZE > 2^31 -1: "
|
||||
+ (val & 0xffffffffL));
|
||||
}
|
||||
if (id == SettingsFrame.MAX_FRAME_SIZE && (val < 16384 || val > 16777215)) {
|
||||
return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
|
||||
"SettingsFrame with MAX_FRAME_SIZE out of range: "
|
||||
+ (val & 0xffffffffL));
|
||||
}
|
||||
sf.setParameter(id, val); // TODO parameters validation
|
||||
}
|
||||
}
|
||||
@ -530,7 +540,12 @@ public class FramesDecoder {
|
||||
return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR,
|
||||
"WindowUpdateFrame length is "+ frameLength+", expected 4");
|
||||
}
|
||||
return new WindowUpdateFrame(streamid, getInt() & 0x7fffffff);
|
||||
int update = getInt();
|
||||
if (update < 0) {
|
||||
return new MalformedFrame(ErrorFrame.FLOW_CONTROL_ERROR,
|
||||
"WindowUpdateFrame with value > 2^31 -1 " + (update & 0xffffffffL));
|
||||
}
|
||||
return new WindowUpdateFrame(streamid, update & 0x7fffffff);
|
||||
}
|
||||
|
||||
private Http2Frame parseContinuationFrame(int frameLength, int streamid, int flags) {
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -166,6 +166,11 @@ public class SettingsFrame extends Http2Frame {
|
||||
// The initial value is 2^14 (16,384) octets.
|
||||
public static final int DEFAULT_MAX_FRAME_SIZE = 16 * K;
|
||||
|
||||
// Initial connection window size. This cannot be updated using the
|
||||
// SETTINGS frame.
|
||||
public static final int INITIAL_CONNECTION_WINDOW_SIZE = DEFAULT_INITIAL_WINDOW_SIZE;
|
||||
|
||||
|
||||
public static SettingsFrame defaultRFCSettings() {
|
||||
SettingsFrame f = new SettingsFrame();
|
||||
f.setParameter(ENABLE_PUSH, DEFAULT_ENABLE_PUSH);
|
||||
|
@ -57,9 +57,11 @@
|
||||
* means that the cache is unbounded.
|
||||
* </li>
|
||||
* <li><p><b>{@systemProperty jdk.httpclient.connectionWindowSize}</b> (default: 2^26)<br>
|
||||
* The HTTP/2 client connection window size in bytes. The maximum size is 2^31-1. This value
|
||||
* cannot be smaller than the stream window size, which can be configured through the
|
||||
* {@code jdk.httpclient.windowsize} system property.
|
||||
* The HTTP/2 client connection window size in bytes. Valid values are in the range
|
||||
* [2^16-1, 2^31-1]. If an invalid value is provided, the default value is used.
|
||||
* The implementation guarantees that the actual value will be no smaller than the stream
|
||||
* window size, which can be configured through the {@code jdk.httpclient.windowsize}
|
||||
* system property.
|
||||
* </li>
|
||||
* <li><p><b>{@systemProperty jdk.httpclient.disableRetryConnect}</b> (default: false)<br>
|
||||
* Whether automatic retry of connection failures is disabled. If false, then retries are
|
||||
@ -150,7 +152,8 @@
|
||||
* or 16kB)<br>The buffer size used by the web socket implementation for socket writes.
|
||||
* </li>
|
||||
* <li><p><b>{@systemProperty jdk.httpclient.windowsize}</b> (default: 16777216 or 16 MB)<br>
|
||||
* The HTTP/2 client stream window size in bytes.
|
||||
* The HTTP/2 client stream window size in bytes. Valid values are in the range [2^14, 2^31-1].
|
||||
* If an invalid value is provided, the default value is used.
|
||||
* </li>
|
||||
* <li><p><b>{@systemProperty jdk.httpclient.auth.retrylimit}</b> (default: 3)<br>
|
||||
* The number of attempts the Basic authentication filter will attempt to retry a failed
|
||||
|
@ -26,7 +26,7 @@
|
||||
* @bug 8217264
|
||||
* @summary Tests that you can map an InputStream to a GZIPInputStream
|
||||
* @library /test/lib /test/jdk/java/net/httpclient/lib
|
||||
* @build jdk.test.lib.net.SimpleSSLContext jdk.httpclient.test.lib.common.HttpServerAdapters
|
||||
* @build jdk.test.lib.net.SimpleSSLContext jdk.httpclient.test.lib.common.HttpServerAdapters ReferenceTracker
|
||||
* @run testng/othervm GZIPInputStreamTest
|
||||
*/
|
||||
|
||||
|
@ -376,7 +376,7 @@ public class ProxySelectorTest implements HttpServerAdapters {
|
||||
public void teardown() throws Exception {
|
||||
client = null;
|
||||
Thread.sleep(100);
|
||||
AssertionError fail = TRACKER.check(500);
|
||||
AssertionError fail = TRACKER.check(1500);
|
||||
try {
|
||||
proxy.stop();
|
||||
authproxy.stop();
|
||||
|
@ -0,0 +1,365 @@
|
||||
/*
|
||||
* Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 8342075
|
||||
* @summary checks connection flow control
|
||||
* @library /test/lib /test/jdk/java/net/httpclient/lib
|
||||
* @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext
|
||||
* @run testng/othervm -Djdk.internal.httpclient.debug=true
|
||||
* -Djdk.httpclient.connectionWindowSize=65535
|
||||
* -Djdk.httpclient.windowsize=16384
|
||||
* ConnectionFlowControlTest
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.ProtocolException;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpHeaders;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpRequest.BodyPublishers;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.net.http.HttpResponse.BodyHandler;
|
||||
import java.net.http.HttpResponse.BodyHandlers;
|
||||
import java.net.http.HttpResponse.BodySubscriber;
|
||||
import java.net.http.HttpResponse.ResponseInfo;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Flow.Subscription;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLSession;
|
||||
|
||||
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer;
|
||||
import jdk.httpclient.test.lib.http2.BodyOutputStream;
|
||||
import jdk.httpclient.test.lib.http2.Http2Handler;
|
||||
import jdk.httpclient.test.lib.http2.Http2TestExchange;
|
||||
import jdk.httpclient.test.lib.http2.Http2TestExchangeImpl;
|
||||
import jdk.httpclient.test.lib.http2.Http2TestServer;
|
||||
import jdk.httpclient.test.lib.http2.Http2TestServerConnection;
|
||||
import jdk.internal.net.http.common.HttpHeadersBuilder;
|
||||
import jdk.internal.net.http.frame.ContinuationFrame;
|
||||
import jdk.internal.net.http.frame.HeaderFrame;
|
||||
import jdk.internal.net.http.frame.HeadersFrame;
|
||||
import jdk.internal.net.http.frame.Http2Frame;
|
||||
import jdk.internal.net.http.frame.SettingsFrame;
|
||||
import jdk.test.lib.Utils;
|
||||
import jdk.test.lib.net.SimpleSSLContext;
|
||||
import org.testng.annotations.AfterTest;
|
||||
import org.testng.annotations.BeforeTest;
|
||||
import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static java.util.List.of;
|
||||
import static java.util.Map.entry;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.assertNotEquals;
|
||||
import static org.testng.Assert.assertTrue;
|
||||
import static org.testng.Assert.fail;
|
||||
|
||||
public class ConnectionFlowControlTest {
|
||||
|
||||
SSLContext sslContext;
|
||||
HttpTestServer http2TestServer; // HTTP/2 ( h2c )
|
||||
HttpTestServer https2TestServer; // HTTP/2 ( h2 )
|
||||
String http2URI;
|
||||
String https2URI;
|
||||
final AtomicInteger reqid = new AtomicInteger();
|
||||
|
||||
|
||||
@DataProvider(name = "variants")
|
||||
public Object[][] variants() {
|
||||
return new Object[][] {
|
||||
{ http2URI },
|
||||
{ https2URI },
|
||||
};
|
||||
}
|
||||
|
||||
@Test(dataProvider = "variants")
|
||||
void test(String uri) throws Exception {
|
||||
System.out.printf("%ntesting %s%n", uri);
|
||||
ConcurrentHashMap<String, CompletableFuture<String>> responseSent = new ConcurrentHashMap<>();
|
||||
ConcurrentHashMap<String, HttpResponse<InputStream>> responses = new ConcurrentHashMap<>();
|
||||
FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s));
|
||||
int connectionWindowSize = Math.max(Integer.getInteger(
|
||||
"jdk.httpclient.connectionWindowSize", 65535), 65535);
|
||||
int windowSize = Math.max(Integer.getInteger(
|
||||
"jdk.httpclient.windowsize", 65535), 16384);
|
||||
int max = connectionWindowSize / windowSize + 2;
|
||||
System.out.printf("connection window: %s, stream window: %s, will make %s requests%n",
|
||||
connectionWindowSize, windowSize, max);
|
||||
|
||||
try (HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build()) {
|
||||
String label = null;
|
||||
|
||||
Throwable t = null;
|
||||
try {
|
||||
String[] keys = new String[max];
|
||||
for (int i = 0; i < max; i++) {
|
||||
String query = "reqId=" + reqid.incrementAndGet();
|
||||
keys[i] = query;
|
||||
URI uriWithQuery = URI.create(uri + "?" + query);
|
||||
CompletableFuture<String> sent = new CompletableFuture<>();
|
||||
responseSent.put(query, sent);
|
||||
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
|
||||
.POST(BodyPublishers.ofString("Hello there!"))
|
||||
.build();
|
||||
System.out.println("\nSending request:" + uriWithQuery);
|
||||
final HttpClient cc = client;
|
||||
var response = cc.send(request, BodyHandlers.ofInputStream());
|
||||
responses.put(query, response);
|
||||
String ckey = response.headers().firstValue("X-Connection-Key").get();
|
||||
if (label == null) label = ckey;
|
||||
try {
|
||||
if (i < max - 1) {
|
||||
// the connection window might be exceeded at i == max - 2, which
|
||||
// means that the last request could go on a new connection.
|
||||
assertEquals(ckey, label, "Unexpected key for " + query);
|
||||
}
|
||||
} catch (AssertionError ass) {
|
||||
// since we won't pull all responses, the client
|
||||
// will not exit unless we ask it to shutdown now.
|
||||
client.shutdownNow();
|
||||
throw ass;
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ie) {
|
||||
// ignore
|
||||
}
|
||||
CompletableFuture<?> allsent = CompletableFuture.allOf(responseSent.values().stream()
|
||||
.toArray(CompletableFuture<?>[]::new));
|
||||
allsent.get();
|
||||
for (int i = 0; i < max; i++) {
|
||||
try {
|
||||
String query = keys[i];
|
||||
var response = responses.get(keys[i]);
|
||||
String ckey = response.headers().firstValue("X-Connection-Key").get();
|
||||
if (label == null) label = ckey;
|
||||
assertEquals(ckey, label, "Unexpected key for " + query);
|
||||
int wait = uri.startsWith("https://") ? 500 : 250;
|
||||
try (InputStream is = response.body()) {
|
||||
Thread.sleep(Utils.adjustTimeout(wait));
|
||||
is.readAllBytes();
|
||||
}
|
||||
System.out.printf("%s did not fail: %s%n", query, response.statusCode());
|
||||
} catch (AssertionError t1) {
|
||||
// since we won't pull all responses, the client
|
||||
// will not exit unless we ask it to shutdown now.
|
||||
client.shutdownNow();
|
||||
throw t1;
|
||||
} catch (Throwable t0) {
|
||||
System.out.println("Got EXPECTED: " + t0);
|
||||
if (t0 instanceof ExecutionException) {
|
||||
t0 = t0.getCause();
|
||||
}
|
||||
t = t0;
|
||||
try {
|
||||
assertDetailMessage(t0, i);
|
||||
} catch (AssertionError e) {
|
||||
// since we won't pull all responses, the client
|
||||
// will not exit unless we ask it to shutdown now.
|
||||
client.shutdownNow();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable t0) {
|
||||
System.out.println("Got EXPECTED: " + t0);
|
||||
if (t0 instanceof ExecutionException) {
|
||||
t0 = t0.getCause();
|
||||
}
|
||||
t = t0;
|
||||
}
|
||||
if (t == null) {
|
||||
// we could fail here if we haven't waited long enough
|
||||
fail("Expected exception, got all responses, should sleep time be raised?");
|
||||
} else {
|
||||
assertDetailMessage(t, max);
|
||||
}
|
||||
String query = "reqId=" + reqid.incrementAndGet();
|
||||
URI uriWithQuery = URI.create(uri + "?" + query);
|
||||
CompletableFuture<String> sent = new CompletableFuture<>();
|
||||
responseSent.put(query, sent);
|
||||
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
|
||||
.POST(BodyPublishers.ofString("Hello there!"))
|
||||
.build();
|
||||
System.out.println("\nSending last request:" + uriWithQuery);
|
||||
var response = client.send(request, BodyHandlers.ofString());
|
||||
if (label != null) {
|
||||
String ckey = response.headers().firstValue("X-Connection-Key").get();
|
||||
assertNotEquals(ckey, label);
|
||||
System.out.printf("last request %s sent on different connection as expected:" +
|
||||
"\n\tlast: %s\n\tprevious: %s%n", query, ckey, label);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Assertions based on implementation specific detail messages. Keep in
|
||||
// sync with implementation.
|
||||
static void assertDetailMessage(Throwable throwable, int iterationIndex) {
|
||||
try {
|
||||
Throwable cause = throwable;
|
||||
while (cause != null) {
|
||||
if (cause instanceof ProtocolException) {
|
||||
if (cause.getMessage().contains("connection window exceeded")) {
|
||||
System.out.println("Found expected exception: " + cause);
|
||||
return;
|
||||
}
|
||||
}
|
||||
cause = cause.getCause();
|
||||
}
|
||||
throw new AssertionError(
|
||||
"ProtocolException(\"protocol error: connection window exceeded\") not found",
|
||||
throwable);
|
||||
} catch (AssertionError e) {
|
||||
System.out.println("Exception does not match expectation: " + throwable);
|
||||
throwable.printStackTrace(System.out);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeTest
|
||||
public void setup() throws Exception {
|
||||
sslContext = new SimpleSSLContext().get();
|
||||
if (sslContext == null)
|
||||
throw new AssertionError("Unexpected null sslContext");
|
||||
|
||||
var http2TestServer = new Http2TestServer("localhost", false, 0);
|
||||
http2TestServer.addHandler(new Http2TestHandler(), "/http2/");
|
||||
this.http2TestServer = HttpTestServer.of(http2TestServer);
|
||||
http2URI = "http://" + this.http2TestServer.serverAuthority() + "/http2/x";
|
||||
|
||||
var https2TestServer = new Http2TestServer("localhost", true, sslContext);
|
||||
https2TestServer.addHandler(new Http2TestHandler(), "/https2/");
|
||||
this.https2TestServer = HttpTestServer.of(https2TestServer);
|
||||
https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x";
|
||||
|
||||
// Override the default exchange supplier with a custom one to enable
|
||||
// particular test scenarios
|
||||
http2TestServer.setExchangeSupplier(FCHttp2TestExchange::new);
|
||||
https2TestServer.setExchangeSupplier(FCHttp2TestExchange::new);
|
||||
|
||||
this.http2TestServer.start();
|
||||
this.https2TestServer.start();
|
||||
}
|
||||
|
||||
@AfterTest
|
||||
public void teardown() throws Exception {
|
||||
http2TestServer.stop();
|
||||
https2TestServer.stop();
|
||||
}
|
||||
|
||||
static class Http2TestHandler implements Http2Handler {
|
||||
|
||||
@Override
|
||||
public void handle(Http2TestExchange t) throws IOException {
|
||||
String query = t.getRequestURI().getRawQuery();
|
||||
|
||||
try (InputStream is = t.getRequestBody();
|
||||
OutputStream os = t.getResponseBody()) {
|
||||
|
||||
byte[] bytes = is.readAllBytes();
|
||||
System.out.println("Server " + t.getLocalAddress() + " received:\n"
|
||||
+ t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8));
|
||||
t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey());
|
||||
|
||||
if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8);
|
||||
int window = Math.max(16384, Integer.getInteger("jdk.httpclient.windowsize", 2*16*1024));
|
||||
final int maxChunkSize;
|
||||
if (t instanceof FCHttp2TestExchange fct) {
|
||||
maxChunkSize = Math.min(window, fct.conn.getMaxFrameSize());
|
||||
} else {
|
||||
maxChunkSize = Math.min(window, SettingsFrame.MAX_FRAME_SIZE);
|
||||
}
|
||||
byte[] resp = bytes.length < maxChunkSize
|
||||
? bytes
|
||||
: Arrays.copyOfRange(bytes, 0, maxChunkSize);
|
||||
int max = (window / resp.length);
|
||||
// send in chunks
|
||||
t.sendResponseHeaders(200, 0);
|
||||
int sent = 0;
|
||||
for (int i=0; i<=max; i++) {
|
||||
int len = Math.min(resp.length, window - sent);
|
||||
if (len <= 0) break;
|
||||
if (os instanceof BodyOutputStream bos) {
|
||||
try {
|
||||
// we don't wait for the stream window, but we want
|
||||
// to wait for the connection window
|
||||
bos.waitForStreamWindow(len);
|
||||
} catch (InterruptedException ie) {
|
||||
// ignore and continue...
|
||||
}
|
||||
}
|
||||
((BodyOutputStream) os).writeUncontrolled(resp, 0, len);
|
||||
sent += len;
|
||||
}
|
||||
if (sent != window) fail("should have sent %s, sent %s".formatted(window, sent));
|
||||
}
|
||||
if (t instanceof FCHttp2TestExchange fct) {
|
||||
fct.responseSent(query);
|
||||
} else {
|
||||
fail("Exchange is not %s but %s"
|
||||
.formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A custom Http2TestExchangeImpl that overrides sendResponseHeaders to
|
||||
// allow headers to be sent with a number of CONTINUATION frames.
|
||||
static class FCHttp2TestExchange extends Http2TestExchangeImpl {
|
||||
static volatile Consumer<String> responseSentCB;
|
||||
static void setResponseSentCB(Consumer<String> responseSentCB) {
|
||||
FCHttp2TestExchange.responseSentCB = responseSentCB;
|
||||
}
|
||||
|
||||
final Http2TestServerConnection conn;
|
||||
FCHttp2TestExchange(int streamid, String method, HttpHeaders reqheaders,
|
||||
HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is,
|
||||
SSLSession sslSession, BodyOutputStream os,
|
||||
Http2TestServerConnection conn, boolean pushAllowed) {
|
||||
super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed);
|
||||
this.conn = conn;
|
||||
}
|
||||
public void responseSent(String query) {
|
||||
System.out.println("Server: response sent for " + query);
|
||||
responseSentCB.accept(query);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
342
test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java
Normal file
342
test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java
Normal file
@ -0,0 +1,342 @@
|
||||
/*
|
||||
* Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 8342075
|
||||
* @library /test/lib /test/jdk/java/net/httpclient/lib
|
||||
* @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext
|
||||
* @run testng/othervm -Djdk.internal.httpclient.debug=true
|
||||
* -Djdk.httpclient.connectionWindowSize=65535
|
||||
* -Djdk.httpclient.windowsize=16384
|
||||
* StreamFlowControlTest
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.ProtocolException;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpHeaders;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpRequest.BodyPublishers;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.net.http.HttpResponse.BodyHandlers;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLSession;
|
||||
|
||||
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer;
|
||||
import jdk.httpclient.test.lib.http2.BodyOutputStream;
|
||||
import jdk.httpclient.test.lib.http2.Http2Handler;
|
||||
import jdk.httpclient.test.lib.http2.Http2TestExchange;
|
||||
import jdk.httpclient.test.lib.http2.Http2TestExchangeImpl;
|
||||
import jdk.httpclient.test.lib.http2.Http2TestServer;
|
||||
import jdk.httpclient.test.lib.http2.Http2TestServerConnection;
|
||||
import jdk.internal.net.http.common.HttpHeadersBuilder;
|
||||
import jdk.internal.net.http.frame.SettingsFrame;
|
||||
import jdk.test.lib.Utils;
|
||||
import jdk.test.lib.net.SimpleSSLContext;
|
||||
import org.testng.annotations.AfterTest;
|
||||
import org.testng.annotations.BeforeTest;
|
||||
import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.fail;
|
||||
|
||||
public class StreamFlowControlTest {
|
||||
|
||||
SSLContext sslContext;
|
||||
HttpTestServer http2TestServer; // HTTP/2 ( h2c )
|
||||
HttpTestServer https2TestServer; // HTTP/2 ( h2 )
|
||||
String http2URI;
|
||||
String https2URI;
|
||||
final AtomicInteger reqid = new AtomicInteger();
|
||||
|
||||
|
||||
@DataProvider(name = "variants")
|
||||
public Object[][] variants() {
|
||||
return new Object[][] {
|
||||
{ http2URI, false },
|
||||
{ https2URI, false },
|
||||
{ http2URI, true },
|
||||
{ https2URI, true },
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@Test(dataProvider = "variants")
|
||||
void test(String uri,
|
||||
boolean sameClient)
|
||||
throws Exception
|
||||
{
|
||||
System.out.printf("%ntesting test(%s, %s)%n", uri, sameClient);
|
||||
ConcurrentHashMap<String, CompletableFuture<String>> responseSent = new ConcurrentHashMap<>();
|
||||
FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s));
|
||||
|
||||
HttpClient client = null;
|
||||
try {
|
||||
int max = sameClient ? 10 : 3;
|
||||
String label = null;
|
||||
for (int i = 0; i < max; i++) {
|
||||
if (!sameClient || client == null)
|
||||
client = HttpClient.newBuilder().sslContext(sslContext).build();
|
||||
|
||||
String query = "reqId=" + reqid.incrementAndGet();
|
||||
URI uriWithQuery = URI.create(uri + "?" + query);
|
||||
CompletableFuture<String> sent = new CompletableFuture<>();
|
||||
responseSent.put(query, sent);
|
||||
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
|
||||
.POST(BodyPublishers.ofString("Hello there!"))
|
||||
.build();
|
||||
System.out.println("\nSending request:" + uriWithQuery);
|
||||
final HttpClient cc = client;
|
||||
try {
|
||||
HttpResponse<InputStream> response = cc.send(request, BodyHandlers.ofInputStream());
|
||||
if (sameClient) {
|
||||
String key = response.headers().firstValue("X-Connection-Key").get();
|
||||
if (label == null) label = key;
|
||||
assertEquals(key, label, "Unexpected key for " + query);
|
||||
}
|
||||
sent.join();
|
||||
// we have to pull to get the exception, but slow enough
|
||||
// so that DataFrames are buffered up to the point that
|
||||
// the window is exceeded...
|
||||
int wait = uri.startsWith("https://") ? 500 : 350;
|
||||
try (InputStream is = response.body()) {
|
||||
Thread.sleep(Utils.adjustTimeout(wait));
|
||||
is.readAllBytes();
|
||||
}
|
||||
// we could fail here if we haven't waited long enough
|
||||
fail("Expected exception, got :" + response + ", should sleep time be raised?");
|
||||
} catch (IOException ioe) {
|
||||
System.out.println("Got EXPECTED: " + ioe);
|
||||
assertDetailMessage(ioe, i);
|
||||
} finally {
|
||||
if (!sameClient && client != null) {
|
||||
client.close();
|
||||
client = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (sameClient && client != null) client.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test(dataProvider = "variants")
|
||||
void testAsync(String uri,
|
||||
boolean sameClient)
|
||||
{
|
||||
System.out.printf("%ntesting testAsync(%s, %s)%n", uri, sameClient);
|
||||
ConcurrentHashMap<String, CompletableFuture<String>> responseSent = new ConcurrentHashMap<>();
|
||||
FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s));
|
||||
|
||||
HttpClient client = null;
|
||||
try {
|
||||
int max = sameClient ? 5 : 3;
|
||||
String label = null;
|
||||
for (int i = 0; i < max; i++) {
|
||||
if (!sameClient || client == null)
|
||||
client = HttpClient.newBuilder().sslContext(sslContext).build();
|
||||
|
||||
String query = "reqId=" + reqid.incrementAndGet();
|
||||
URI uriWithQuery = URI.create(uri + "?" + query);
|
||||
CompletableFuture<String> sent = new CompletableFuture<>();
|
||||
responseSent.put(query, sent);
|
||||
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
|
||||
.POST(BodyPublishers.ofString("Hello there!"))
|
||||
.build();
|
||||
System.out.println("\nSending request:" + uriWithQuery);
|
||||
final HttpClient cc = client;
|
||||
|
||||
Throwable t = null;
|
||||
try {
|
||||
HttpResponse<InputStream> response = cc.sendAsync(request, BodyHandlers.ofInputStream()).get();
|
||||
if (sameClient) {
|
||||
String key = response.headers().firstValue("X-Connection-Key").get();
|
||||
if (label == null) label = key;
|
||||
assertEquals(key, label, "Unexpected key for " + query);
|
||||
}
|
||||
sent.join();
|
||||
int wait = uri.startsWith("https://") ? 600 : 300;
|
||||
try (InputStream is = response.body()) {
|
||||
Thread.sleep(Utils.adjustTimeout(wait));
|
||||
is.readAllBytes();
|
||||
}
|
||||
// we could fail here if we haven't waited long enough
|
||||
fail("Expected exception, got :" + response + ", should sleep time be raised?");
|
||||
} catch (Throwable t0) {
|
||||
System.out.println("Got EXPECTED: " + t0);
|
||||
if (t0 instanceof ExecutionException) {
|
||||
t0 = t0.getCause();
|
||||
}
|
||||
t = t0;
|
||||
} finally {
|
||||
if (!sameClient && client != null) {
|
||||
client.close();
|
||||
client = null;
|
||||
}
|
||||
}
|
||||
assertDetailMessage(t, i);
|
||||
}
|
||||
} finally {
|
||||
if (sameClient && client != null) client.close();
|
||||
}
|
||||
}
|
||||
|
||||
// Assertions based on implementation specific detail messages. Keep in
|
||||
// sync with implementation.
|
||||
static void assertDetailMessage(Throwable throwable, int iterationIndex) {
|
||||
try {
|
||||
Throwable cause = throwable;
|
||||
while (cause != null) {
|
||||
if (cause instanceof ProtocolException) {
|
||||
if (cause.getMessage().matches("stream [0-9]+ flow control window exceeded")) {
|
||||
System.out.println("Found expected exception: " + cause);
|
||||
return;
|
||||
}
|
||||
}
|
||||
cause = cause.getCause();
|
||||
}
|
||||
throw new AssertionError(
|
||||
"ProtocolException(\"stream X flow control window exceeded\") not found",
|
||||
throwable);
|
||||
} catch (AssertionError e) {
|
||||
System.out.println("Exception does not match expectation: " + throwable);
|
||||
throwable.printStackTrace(System.out);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeTest
|
||||
public void setup() throws Exception {
|
||||
sslContext = new SimpleSSLContext().get();
|
||||
if (sslContext == null)
|
||||
throw new AssertionError("Unexpected null sslContext");
|
||||
|
||||
var http2TestServer = new Http2TestServer("localhost", false, 0);
|
||||
http2TestServer.addHandler(new Http2TestHandler(), "/http2/");
|
||||
this.http2TestServer = HttpTestServer.of(http2TestServer);
|
||||
http2URI = "http://" + this.http2TestServer.serverAuthority() + "/http2/x";
|
||||
|
||||
var https2TestServer = new Http2TestServer("localhost", true, sslContext);
|
||||
https2TestServer.addHandler(new Http2TestHandler(), "/https2/");
|
||||
this.https2TestServer = HttpTestServer.of(https2TestServer);
|
||||
https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x";
|
||||
|
||||
// Override the default exchange supplier with a custom one to enable
|
||||
// particular test scenarios
|
||||
http2TestServer.setExchangeSupplier(FCHttp2TestExchange::new);
|
||||
https2TestServer.setExchangeSupplier(FCHttp2TestExchange::new);
|
||||
|
||||
this.http2TestServer.start();
|
||||
this.https2TestServer.start();
|
||||
}
|
||||
|
||||
@AfterTest
|
||||
public void teardown() throws Exception {
|
||||
http2TestServer.stop();
|
||||
https2TestServer.stop();
|
||||
}
|
||||
|
||||
static class Http2TestHandler implements Http2Handler {
|
||||
|
||||
@Override
|
||||
public void handle(Http2TestExchange t) throws IOException {
|
||||
String query = t.getRequestURI().getRawQuery();
|
||||
|
||||
try (InputStream is = t.getRequestBody();
|
||||
OutputStream os = t.getResponseBody()) {
|
||||
|
||||
byte[] bytes = is.readAllBytes();
|
||||
System.out.println("Server " + t.getLocalAddress() + " received:\n"
|
||||
+ t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8));
|
||||
t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey());
|
||||
|
||||
if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8);
|
||||
int window = Integer.getInteger("jdk.httpclient.windowsize", 2 * 16 * 1024);
|
||||
final int maxChunkSize;
|
||||
if (t instanceof FCHttp2TestExchange fct) {
|
||||
maxChunkSize = Math.min(window, fct.conn.getMaxFrameSize());
|
||||
} else {
|
||||
maxChunkSize = Math.min(window, SettingsFrame.MAX_FRAME_SIZE);
|
||||
}
|
||||
byte[] resp = bytes.length <= maxChunkSize
|
||||
? bytes
|
||||
: Arrays.copyOfRange(bytes, 0, maxChunkSize);
|
||||
int max = (window / resp.length) + 2;
|
||||
// send in chunks
|
||||
t.sendResponseHeaders(200, 0);
|
||||
for (int i = 0; i <= max; i++) {
|
||||
if (t instanceof FCHttp2TestExchange fct) {
|
||||
try {
|
||||
// we don't wait for the stream window, but we want
|
||||
// to wait for the connection window
|
||||
fct.conn.obtainConnectionWindow(resp.length);
|
||||
} catch (InterruptedException ie) {
|
||||
// ignore and continue...
|
||||
}
|
||||
}
|
||||
((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length);
|
||||
}
|
||||
}
|
||||
if (t instanceof FCHttp2TestExchange fct) {
|
||||
fct.responseSent(query);
|
||||
} else fail("Exchange is not %s but %s"
|
||||
.formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName()));
|
||||
}
|
||||
}
|
||||
|
||||
// A custom Http2TestExchangeImpl that overrides sendResponseHeaders to
|
||||
// allow headers to be sent with a number of CONTINUATION frames.
|
||||
static class FCHttp2TestExchange extends Http2TestExchangeImpl {
|
||||
static volatile Consumer<String> responseSentCB;
|
||||
static void setResponseSentCB(Consumer<String> responseSentCB) {
|
||||
FCHttp2TestExchange.responseSentCB = responseSentCB;
|
||||
}
|
||||
|
||||
final Http2TestServerConnection conn;
|
||||
FCHttp2TestExchange(int streamid, String method, HttpHeaders reqheaders,
|
||||
HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is,
|
||||
SSLSession sslSession, BodyOutputStream os,
|
||||
Http2TestServerConnection conn, boolean pushAllowed) {
|
||||
super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed);
|
||||
this.conn = conn;
|
||||
}
|
||||
public void responseSent(String query) {
|
||||
System.out.println("Server: response sent for " + query);
|
||||
responseSentCB.accept(query);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2016, 2023, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -25,6 +25,7 @@ package jdk.httpclient.test.lib.http2;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Objects;
|
||||
|
||||
import jdk.internal.net.http.frame.DataFrame;
|
||||
import jdk.internal.net.http.frame.ResetFrame;
|
||||
@ -65,6 +66,10 @@ public class BodyOutputStream extends OutputStream {
|
||||
// first wait for the connection window
|
||||
conn.obtainConnectionWindow(demand);
|
||||
// now wait for the stream window
|
||||
waitForStreamWindow(demand);
|
||||
}
|
||||
|
||||
public void waitForStreamWindow(int demand) throws InterruptedException {
|
||||
synchronized (this) {
|
||||
while (demand > 0) {
|
||||
int n = Math.min(demand, window);
|
||||
@ -83,6 +88,7 @@ public class BodyOutputStream extends OutputStream {
|
||||
|
||||
@Override
|
||||
public void write(byte[] buf, int offset, int len) throws IOException {
|
||||
Objects.checkFromIndexSize(offset, len, buf.length);
|
||||
if (closed) {
|
||||
throw new IOException("closed");
|
||||
}
|
||||
@ -104,6 +110,34 @@ public class BodyOutputStream extends OutputStream {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method pushes frames onto the stack without checking
|
||||
* for flow control, allowing the sender to bypass flow
|
||||
* control for testing purposes
|
||||
* @param buf data to send
|
||||
* @param offset offset at which the data starts
|
||||
* @param len length of the data to send
|
||||
* @throws IOException if an I/O error occurs
|
||||
*/
|
||||
public void writeUncontrolled(byte[] buf, int offset, int len)
|
||||
throws IOException {
|
||||
Objects.checkFromIndexSize(offset, len, buf.length);
|
||||
if (closed) {
|
||||
throw new IOException("closed");
|
||||
}
|
||||
|
||||
if (!goodToGo) {
|
||||
throw new IllegalStateException("sendResponseHeaders must be called first");
|
||||
}
|
||||
int max = conn.getMaxFrameSize();
|
||||
while (len > 0) {
|
||||
int n = len > max ? max : len;
|
||||
send(buf, offset, n, 0);
|
||||
offset += n;
|
||||
len -= n;
|
||||
}
|
||||
}
|
||||
|
||||
private void send(byte[] buf, int offset, int len, int flags) throws IOException {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(len);
|
||||
buffer.put(buf, offset, len);
|
||||
|
@ -1374,7 +1374,7 @@ public class Http2TestServerConnection {
|
||||
*
|
||||
* @param amount
|
||||
*/
|
||||
synchronized void obtainConnectionWindow(int amount) throws InterruptedException {
|
||||
public synchronized void obtainConnectionWindow(int amount) throws InterruptedException {
|
||||
while (amount > 0) {
|
||||
int n = Math.min(amount, sendWindow);
|
||||
amount -= n;
|
||||
@ -1384,9 +1384,13 @@ public class Http2TestServerConnection {
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void updateConnectionWindow(int amount) {
|
||||
sendWindow += amount;
|
||||
notifyAll();
|
||||
void updateConnectionWindow(int amount) {
|
||||
System.out.printf("sendWindow (window=%s, amount=%s) is now: %s%n",
|
||||
sendWindow, amount, sendWindow + amount);
|
||||
synchronized (this) {
|
||||
sendWindow += amount;
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
// simplified output headers class. really just a type safe container
|
||||
|
Loading…
Reference in New Issue
Block a user