8343855: HTTP/2 ConnectionWindowUpdateSender may miss some unprocessed DataFrames from closed streams
Reviewed-by: jpai
This commit is contained in:
parent
c3776db498
commit
bd6152f596
@ -160,6 +160,10 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||||||
// send lock: prevent sending DataFrames after reset occurred.
|
// send lock: prevent sending DataFrames after reset occurred.
|
||||||
private final Lock sendLock = new ReentrantLock();
|
private final Lock sendLock = new ReentrantLock();
|
||||||
private final Lock stateLock = new ReentrantLock();
|
private final Lock stateLock = new ReentrantLock();
|
||||||
|
// inputQ lock: methods that take from the inputQ
|
||||||
|
// must not run concurrently.
|
||||||
|
private final Lock inputQLock = new ReentrantLock();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A reference to this Stream's connection Send Window controller. The
|
* A reference to this Stream's connection Send Window controller. The
|
||||||
* stream MUST acquire the appropriate amount of Send Window before
|
* stream MUST acquire the appropriate amount of Send Window before
|
||||||
@ -183,6 +187,8 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||||||
private void schedule() {
|
private void schedule() {
|
||||||
boolean onCompleteCalled = false;
|
boolean onCompleteCalled = false;
|
||||||
HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
|
HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
|
||||||
|
// prevents drainInputQueue() from running concurrently
|
||||||
|
inputQLock.lock();
|
||||||
try {
|
try {
|
||||||
if (subscriber == null) {
|
if (subscriber == null) {
|
||||||
// pendingResponseSubscriber will be null until response headers have been received and
|
// pendingResponseSubscriber will be null until response headers have been received and
|
||||||
@ -199,7 +205,7 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||||||
Http2Frame frame = inputQ.peek();
|
Http2Frame frame = inputQ.peek();
|
||||||
if (frame instanceof ResetFrame rf) {
|
if (frame instanceof ResetFrame rf) {
|
||||||
inputQ.remove();
|
inputQ.remove();
|
||||||
if (endStreamReceived() && rf.getErrorCode() == ResetFrame.NO_ERROR) {
|
if (endStreamReceived() && rf.getErrorCode() == ResetFrame.NO_ERROR) {
|
||||||
// If END_STREAM is already received, complete the requestBodyCF successfully
|
// If END_STREAM is already received, complete the requestBodyCF successfully
|
||||||
// and stop sending any request data.
|
// and stop sending any request data.
|
||||||
requestBodyCF.complete(null);
|
requestBodyCF.complete(null);
|
||||||
@ -208,7 +214,7 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
DataFrame df = (DataFrame)frame;
|
DataFrame df = (DataFrame) frame;
|
||||||
boolean finished = df.getFlag(DataFrame.END_STREAM);
|
boolean finished = df.getFlag(DataFrame.END_STREAM);
|
||||||
|
|
||||||
List<ByteBuffer> buffers = df.getData();
|
List<ByteBuffer> buffers = df.getData();
|
||||||
@ -256,6 +262,7 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||||||
} catch (Throwable throwable) {
|
} catch (Throwable throwable) {
|
||||||
errorRef.compareAndSet(null, throwable);
|
errorRef.compareAndSet(null, throwable);
|
||||||
} finally {
|
} finally {
|
||||||
|
inputQLock.unlock();
|
||||||
if (sched.isStopped()) drainInputQueue();
|
if (sched.isStopped()) drainInputQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -274,26 +281,36 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||||||
} catch (Throwable x) {
|
} catch (Throwable x) {
|
||||||
Log.logError("Subscriber::onError threw exception: {0}", t);
|
Log.logError("Subscriber::onError threw exception: {0}", t);
|
||||||
} finally {
|
} finally {
|
||||||
|
// cancelImpl will eventually call drainInputQueue();
|
||||||
cancelImpl(t);
|
cancelImpl(t);
|
||||||
drainInputQueue();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// must only be called from the scheduler schedule() loop.
|
// Called from the scheduler schedule() loop,
|
||||||
// ensure that all received data frames are accounted for
|
// or after resetting the stream.
|
||||||
|
// Ensures that all received data frames are accounted for
|
||||||
// in the connection window flow control if the scheduler
|
// in the connection window flow control if the scheduler
|
||||||
// is stopped before all the data is consumed.
|
// is stopped before all the data is consumed.
|
||||||
|
// The inputQLock is used to prevent concurrently taking
|
||||||
|
// from the queue.
|
||||||
private void drainInputQueue() {
|
private void drainInputQueue() {
|
||||||
Http2Frame frame;
|
Http2Frame frame;
|
||||||
while ((frame = inputQ.poll()) != null) {
|
// will wait until schedule() has finished taking
|
||||||
if (frame instanceof DataFrame df) {
|
// from the queue, if needed.
|
||||||
// Data frames that have been added to the inputQ
|
inputQLock.lock();
|
||||||
// must be released using releaseUnconsumed() to
|
try {
|
||||||
// account for the amount of unprocessed bytes
|
while ((frame = inputQ.poll()) != null) {
|
||||||
// tracked by the connection.windowUpdater.
|
if (frame instanceof DataFrame df) {
|
||||||
connection.releaseUnconsumed(df);
|
// Data frames that have been added to the inputQ
|
||||||
|
// must be released using releaseUnconsumed() to
|
||||||
|
// account for the amount of unprocessed bytes
|
||||||
|
// tracked by the connection.windowUpdater.
|
||||||
|
connection.releaseUnconsumed(df);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
inputQLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -405,12 +422,38 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
inputQ.add(df);
|
pushDataFrame(len, df);
|
||||||
} finally {
|
} finally {
|
||||||
sched.runOrSchedule();
|
sched.runOrSchedule();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensures that no data frame is pushed on the inputQ
|
||||||
|
// after the stream is closed.
|
||||||
|
// Changes to the `closed` boolean are guarded by the
|
||||||
|
// stateLock. Contention should be low as only one
|
||||||
|
// thread at a time adds to the inputQ, and
|
||||||
|
// we can only contend when closing the stream.
|
||||||
|
// Note that this method can run concurrently with
|
||||||
|
// methods holding the inputQLock: that is OK.
|
||||||
|
// The inputQLock is there to ensure that methods
|
||||||
|
// taking from the queue are not running concurrently
|
||||||
|
// with each others, but concurrently adding at the
|
||||||
|
// end of the queue while peeking/polling at the head
|
||||||
|
// is OK.
|
||||||
|
private void pushDataFrame(int len, DataFrame df) {
|
||||||
|
boolean closed = false;
|
||||||
|
stateLock.lock();
|
||||||
|
try {
|
||||||
|
if (!(closed = this.closed)) {
|
||||||
|
inputQ.add(df);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
stateLock.unlock();
|
||||||
|
}
|
||||||
|
if (closed && len > 0) connection.releaseUnconsumed(df);
|
||||||
|
}
|
||||||
|
|
||||||
/** Handles a RESET frame. RESET is always handled inline in the queue. */
|
/** Handles a RESET frame. RESET is always handled inline in the queue. */
|
||||||
private void receiveResetFrame(ResetFrame frame) {
|
private void receiveResetFrame(ResetFrame frame) {
|
||||||
inputQ.add(frame);
|
inputQ.add(frame);
|
||||||
@ -1547,6 +1590,8 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||||||
}
|
}
|
||||||
} catch (Throwable ex) {
|
} catch (Throwable ex) {
|
||||||
Log.logError(ex);
|
Log.logError(ex);
|
||||||
|
} finally {
|
||||||
|
drainInputQueue();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1770,7 +1815,7 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||||||
@Override
|
@Override
|
||||||
protected boolean windowSizeExceeded(long received) {
|
protected boolean windowSizeExceeded(long received) {
|
||||||
onProtocolError(new ProtocolException("stream %s flow control window exceeded"
|
onProtocolError(new ProtocolException("stream %s flow control window exceeded"
|
||||||
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
|
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -171,7 +171,11 @@ public class ConnectionFlowControlTest {
|
|||||||
var response = responses.get(keys[i]);
|
var response = responses.get(keys[i]);
|
||||||
String ckey = response.headers().firstValue("X-Connection-Key").get();
|
String ckey = response.headers().firstValue("X-Connection-Key").get();
|
||||||
if (label == null) label = ckey;
|
if (label == null) label = ckey;
|
||||||
assertEquals(ckey, label, "Unexpected key for " + query);
|
if (i < max - 1) {
|
||||||
|
// the connection window might be exceeded at i == max - 2, which
|
||||||
|
// means that the last request could go on a new connection.
|
||||||
|
assertEquals(ckey, label, "Unexpected key for " + query);
|
||||||
|
}
|
||||||
int wait = uri.startsWith("https://") ? 500 : 250;
|
int wait = uri.startsWith("https://") ? 500 : 250;
|
||||||
try (InputStream is = response.body()) {
|
try (InputStream is = response.body()) {
|
||||||
Thread.sleep(Utils.adjustTimeout(wait));
|
Thread.sleep(Utils.adjustTimeout(wait));
|
||||||
|
@ -23,7 +23,7 @@
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* @test
|
* @test
|
||||||
* @bug 8342075
|
* @bug 8342075 8343855
|
||||||
* @library /test/lib /test/jdk/java/net/httpclient/lib
|
* @library /test/lib /test/jdk/java/net/httpclient/lib
|
||||||
* @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext
|
* @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext
|
||||||
* @run testng/othervm -Djdk.internal.httpclient.debug=true
|
* @run testng/othervm -Djdk.internal.httpclient.debug=true
|
||||||
@ -40,7 +40,6 @@ import java.net.URI;
|
|||||||
import java.net.http.HttpClient;
|
import java.net.http.HttpClient;
|
||||||
import java.net.http.HttpHeaders;
|
import java.net.http.HttpHeaders;
|
||||||
import java.net.http.HttpRequest;
|
import java.net.http.HttpRequest;
|
||||||
import java.net.http.HttpRequest.BodyPublishers;
|
|
||||||
import java.net.http.HttpResponse;
|
import java.net.http.HttpResponse;
|
||||||
import java.net.http.HttpResponse.BodyHandlers;
|
import java.net.http.HttpResponse.BodyHandlers;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
@ -53,6 +52,7 @@ import java.util.function.Consumer;
|
|||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import javax.net.ssl.SSLSession;
|
import javax.net.ssl.SSLSession;
|
||||||
|
|
||||||
|
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpHeadOrGetHandler;
|
||||||
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer;
|
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer;
|
||||||
import jdk.httpclient.test.lib.http2.BodyOutputStream;
|
import jdk.httpclient.test.lib.http2.BodyOutputStream;
|
||||||
import jdk.httpclient.test.lib.http2.Http2Handler;
|
import jdk.httpclient.test.lib.http2.Http2Handler;
|
||||||
@ -69,6 +69,7 @@ import org.testng.annotations.BeforeTest;
|
|||||||
import org.testng.annotations.DataProvider;
|
import org.testng.annotations.DataProvider;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
||||||
import static org.testng.Assert.assertEquals;
|
import static org.testng.Assert.assertEquals;
|
||||||
import static org.testng.Assert.fail;
|
import static org.testng.Assert.fail;
|
||||||
|
|
||||||
@ -92,6 +93,19 @@ public class StreamFlowControlTest {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void sleep(long wait) throws InterruptedException {
|
||||||
|
if (wait <= 0) return;
|
||||||
|
long remaining = Utils.adjustTimeout(wait);
|
||||||
|
long start = System.nanoTime();
|
||||||
|
while (remaining > 0) {
|
||||||
|
Thread.sleep(remaining);
|
||||||
|
long end = System.nanoTime();
|
||||||
|
remaining = remaining - NANOSECONDS.toMillis(end - start);
|
||||||
|
}
|
||||||
|
System.out.printf("Waited %s ms%n",
|
||||||
|
NANOSECONDS.toMillis(System.nanoTime() - start));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(dataProvider = "variants")
|
@Test(dataProvider = "variants")
|
||||||
void test(String uri,
|
void test(String uri,
|
||||||
@ -115,7 +129,7 @@ public class StreamFlowControlTest {
|
|||||||
CompletableFuture<String> sent = new CompletableFuture<>();
|
CompletableFuture<String> sent = new CompletableFuture<>();
|
||||||
responseSent.put(query, sent);
|
responseSent.put(query, sent);
|
||||||
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
|
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
|
||||||
.POST(BodyPublishers.ofString("Hello there!"))
|
.GET()
|
||||||
.build();
|
.build();
|
||||||
System.out.println("\nSending request:" + uriWithQuery);
|
System.out.println("\nSending request:" + uriWithQuery);
|
||||||
final HttpClient cc = client;
|
final HttpClient cc = client;
|
||||||
@ -130,9 +144,9 @@ public class StreamFlowControlTest {
|
|||||||
// we have to pull to get the exception, but slow enough
|
// we have to pull to get the exception, but slow enough
|
||||||
// so that DataFrames are buffered up to the point that
|
// so that DataFrames are buffered up to the point that
|
||||||
// the window is exceeded...
|
// the window is exceeded...
|
||||||
int wait = uri.startsWith("https://") ? 500 : 350;
|
long wait = uri.startsWith("https://") ? 800 : 350;
|
||||||
try (InputStream is = response.body()) {
|
try (InputStream is = response.body()) {
|
||||||
Thread.sleep(Utils.adjustTimeout(wait));
|
sleep(wait);
|
||||||
is.readAllBytes();
|
is.readAllBytes();
|
||||||
}
|
}
|
||||||
// we could fail here if we haven't waited long enough
|
// we could fail here if we haven't waited long enough
|
||||||
@ -174,7 +188,7 @@ public class StreamFlowControlTest {
|
|||||||
CompletableFuture<String> sent = new CompletableFuture<>();
|
CompletableFuture<String> sent = new CompletableFuture<>();
|
||||||
responseSent.put(query, sent);
|
responseSent.put(query, sent);
|
||||||
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
|
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
|
||||||
.POST(BodyPublishers.ofString("Hello there!"))
|
.GET()
|
||||||
.build();
|
.build();
|
||||||
System.out.println("\nSending request:" + uriWithQuery);
|
System.out.println("\nSending request:" + uriWithQuery);
|
||||||
final HttpClient cc = client;
|
final HttpClient cc = client;
|
||||||
@ -188,9 +202,9 @@ public class StreamFlowControlTest {
|
|||||||
assertEquals(key, label, "Unexpected key for " + query);
|
assertEquals(key, label, "Unexpected key for " + query);
|
||||||
}
|
}
|
||||||
sent.join();
|
sent.join();
|
||||||
int wait = uri.startsWith("https://") ? 600 : 300;
|
long wait = uri.startsWith("https://") ? 800 : 350;
|
||||||
try (InputStream is = response.body()) {
|
try (InputStream is = response.body()) {
|
||||||
Thread.sleep(Utils.adjustTimeout(wait));
|
sleep(wait);
|
||||||
is.readAllBytes();
|
is.readAllBytes();
|
||||||
}
|
}
|
||||||
// we could fail here if we haven't waited long enough
|
// we could fail here if we haven't waited long enough
|
||||||
@ -252,7 +266,9 @@ public class StreamFlowControlTest {
|
|||||||
var https2TestServer = new Http2TestServer("localhost", true, sslContext);
|
var https2TestServer = new Http2TestServer("localhost", true, sslContext);
|
||||||
https2TestServer.addHandler(new Http2TestHandler(), "/https2/");
|
https2TestServer.addHandler(new Http2TestHandler(), "/https2/");
|
||||||
this.https2TestServer = HttpTestServer.of(https2TestServer);
|
this.https2TestServer = HttpTestServer.of(https2TestServer);
|
||||||
|
this.https2TestServer.addHandler(new HttpHeadOrGetHandler(), "/https2/head/");
|
||||||
https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x";
|
https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x";
|
||||||
|
String h2Head = "https://" + this.https2TestServer.serverAuthority() + "/https2/head/z";
|
||||||
|
|
||||||
// Override the default exchange supplier with a custom one to enable
|
// Override the default exchange supplier with a custom one to enable
|
||||||
// particular test scenarios
|
// particular test scenarios
|
||||||
@ -261,6 +277,13 @@ public class StreamFlowControlTest {
|
|||||||
|
|
||||||
this.http2TestServer.start();
|
this.http2TestServer.start();
|
||||||
this.https2TestServer.start();
|
this.https2TestServer.start();
|
||||||
|
|
||||||
|
// warmup to eliminate delay due to SSL class loading and initialization.
|
||||||
|
try (var client = HttpClient.newBuilder().sslContext(sslContext).build()) {
|
||||||
|
var request = HttpRequest.newBuilder(URI.create(h2Head)).HEAD().build();
|
||||||
|
var resp = client.send(request, BodyHandlers.discarding());
|
||||||
|
assertEquals(resp.statusCode(), 200);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterTest
|
@AfterTest
|
||||||
@ -279,11 +302,19 @@ public class StreamFlowControlTest {
|
|||||||
OutputStream os = t.getResponseBody()) {
|
OutputStream os = t.getResponseBody()) {
|
||||||
|
|
||||||
byte[] bytes = is.readAllBytes();
|
byte[] bytes = is.readAllBytes();
|
||||||
System.out.println("Server " + t.getLocalAddress() + " received:\n"
|
if (bytes.length != 0) {
|
||||||
+ t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8));
|
System.out.println("Server " + t.getLocalAddress() + " received:\n"
|
||||||
|
+ t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8));
|
||||||
|
} else {
|
||||||
|
System.out.println("No request body for " + t.getRequestMethod());
|
||||||
|
}
|
||||||
|
|
||||||
t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey());
|
t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey());
|
||||||
|
|
||||||
if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8);
|
if (bytes.length == 0) {
|
||||||
|
bytes = "no request body!"
|
||||||
|
.repeat(100).getBytes(StandardCharsets.UTF_8);
|
||||||
|
}
|
||||||
int window = Integer.getInteger("jdk.httpclient.windowsize", 2 * 16 * 1024);
|
int window = Integer.getInteger("jdk.httpclient.windowsize", 2 * 16 * 1024);
|
||||||
final int maxChunkSize;
|
final int maxChunkSize;
|
||||||
if (t instanceof FCHttp2TestExchange fct) {
|
if (t instanceof FCHttp2TestExchange fct) {
|
||||||
@ -307,13 +338,22 @@ public class StreamFlowControlTest {
|
|||||||
// ignore and continue...
|
// ignore and continue...
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length);
|
try {
|
||||||
|
((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length);
|
||||||
|
} catch (IOException x) {
|
||||||
|
if (t instanceof FCHttp2TestExchange fct) {
|
||||||
|
fct.conn.updateConnectionWindow(resp.length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (t instanceof FCHttp2TestExchange fct) {
|
||||||
|
fct.responseSent(query);
|
||||||
|
} else {
|
||||||
|
fail("Exchange is not %s but %s"
|
||||||
|
.formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (t instanceof FCHttp2TestExchange fct) {
|
|
||||||
fct.responseSent(query);
|
|
||||||
} else fail("Exchange is not %s but %s"
|
|
||||||
.formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,6 +49,7 @@ import java.math.BigInteger;
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.http.HttpHeaders;
|
import java.net.http.HttpHeaders;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ListIterator;
|
import java.util.ListIterator;
|
||||||
@ -66,6 +67,9 @@ import java.util.stream.Stream;
|
|||||||
|
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
|
|
||||||
|
import static java.net.http.HttpClient.Version.HTTP_1_1;
|
||||||
|
import static java.net.http.HttpClient.Version.HTTP_2;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defines an adaptation layers so that a test server handlers and filters
|
* Defines an adaptation layers so that a test server handlers and filters
|
||||||
* can be implemented independently of the underlying server version.
|
* can be implemented independently of the underlying server version.
|
||||||
@ -268,9 +272,9 @@ public interface HttpServerAdapters {
|
|||||||
this.exchange = exch;
|
this.exchange = exch;
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public Version getServerVersion() { return Version.HTTP_1_1; }
|
public Version getServerVersion() { return HTTP_1_1; }
|
||||||
@Override
|
@Override
|
||||||
public Version getExchangeVersion() { return Version.HTTP_1_1; }
|
public Version getExchangeVersion() { return HTTP_1_1; }
|
||||||
@Override
|
@Override
|
||||||
public InputStream getRequestBody() {
|
public InputStream getRequestBody() {
|
||||||
return exchange.getRequestBody();
|
return exchange.getRequestBody();
|
||||||
@ -330,9 +334,9 @@ public interface HttpServerAdapters {
|
|||||||
this.exchange = exch;
|
this.exchange = exch;
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public Version getServerVersion() { return Version.HTTP_2; }
|
public Version getServerVersion() { return HTTP_2; }
|
||||||
@Override
|
@Override
|
||||||
public Version getExchangeVersion() { return Version.HTTP_2; }
|
public Version getExchangeVersion() { return HTTP_2; }
|
||||||
@Override
|
@Override
|
||||||
public InputStream getRequestBody() {
|
public InputStream getRequestBody() {
|
||||||
return exchange.getRequestBody();
|
return exchange.getRequestBody();
|
||||||
@ -421,6 +425,53 @@ public interface HttpServerAdapters {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An {@link HttpTestHandler} that handles only HEAD and GET
|
||||||
|
* requests. If another method is used 405 is returned with
|
||||||
|
* an empty body.
|
||||||
|
* The response is always returned with fixed length.
|
||||||
|
*/
|
||||||
|
public static class HttpHeadOrGetHandler implements HttpTestHandler {
|
||||||
|
final String responseBody;
|
||||||
|
public HttpHeadOrGetHandler() {
|
||||||
|
this("pâté de tête persillé");
|
||||||
|
}
|
||||||
|
public HttpHeadOrGetHandler(String responseBody) {
|
||||||
|
this.responseBody = Objects.requireNonNull(responseBody);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(HttpTestExchange t) throws IOException {
|
||||||
|
try (var exchg = t) {
|
||||||
|
exchg.getRequestBody().readAllBytes();
|
||||||
|
String method = exchg.getRequestMethod();
|
||||||
|
switch (method) {
|
||||||
|
case "HEAD" -> {
|
||||||
|
byte[] resp = responseBody.getBytes(StandardCharsets.UTF_8);
|
||||||
|
if (exchg.getExchangeVersion() != HTTP_1_1) {
|
||||||
|
// with HTTP/2 or HTTP/3 the server will not send content-length
|
||||||
|
exchg.getResponseHeaders()
|
||||||
|
.addHeader("Content-Length", String.valueOf(resp.length));
|
||||||
|
}
|
||||||
|
exchg.sendResponseHeaders(200, resp.length);
|
||||||
|
exchg.getResponseBody().close();
|
||||||
|
}
|
||||||
|
case "GET" -> {
|
||||||
|
byte[] resp = responseBody.getBytes(StandardCharsets.UTF_8);
|
||||||
|
exchg.sendResponseHeaders(200, resp.length);
|
||||||
|
try (var os = exchg.getResponseBody()) {
|
||||||
|
os.write(resp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default -> {
|
||||||
|
exchg.sendResponseHeaders(405, 0);
|
||||||
|
exchg.getResponseBody().close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static class HttpTestEchoHandler implements HttpTestHandler {
|
public static class HttpTestEchoHandler implements HttpTestHandler {
|
||||||
@Override
|
@Override
|
||||||
@ -877,7 +928,7 @@ public interface HttpServerAdapters {
|
|||||||
return new InetSocketAddress(InetAddress.getLoopbackAddress(),
|
return new InetSocketAddress(InetAddress.getLoopbackAddress(),
|
||||||
impl.getAddress().getPort());
|
impl.getAddress().getPort());
|
||||||
}
|
}
|
||||||
public Version getVersion() { return Version.HTTP_1_1; }
|
public Version getVersion() { return HTTP_1_1; }
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setRequestApprover(final Predicate<String> approver) {
|
public void setRequestApprover(final Predicate<String> approver) {
|
||||||
@ -902,7 +953,7 @@ public interface HttpServerAdapters {
|
|||||||
public void setAuthenticator(com.sun.net.httpserver.Authenticator authenticator) {
|
public void setAuthenticator(com.sun.net.httpserver.Authenticator authenticator) {
|
||||||
context.setAuthenticator(authenticator);
|
context.setAuthenticator(authenticator);
|
||||||
}
|
}
|
||||||
@Override public Version getVersion() { return Version.HTTP_1_1; }
|
@Override public Version getVersion() { return HTTP_1_1; }
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Http2TestServerImpl extends HttpTestServer {
|
private static class Http2TestServerImpl extends HttpTestServer {
|
||||||
@ -933,7 +984,7 @@ public interface HttpServerAdapters {
|
|||||||
return new InetSocketAddress(InetAddress.getLoopbackAddress(),
|
return new InetSocketAddress(InetAddress.getLoopbackAddress(),
|
||||||
impl.getAddress().getPort());
|
impl.getAddress().getPort());
|
||||||
}
|
}
|
||||||
public Version getVersion() { return Version.HTTP_2; }
|
public Version getVersion() { return HTTP_2; }
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setRequestApprover(final Predicate<String> approver) {
|
public void setRequestApprover(final Predicate<String> approver) {
|
||||||
@ -971,7 +1022,7 @@ public interface HttpServerAdapters {
|
|||||||
"only BasicAuthenticator is supported on HTTP/2 context");
|
"only BasicAuthenticator is supported on HTTP/2 context");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@Override public Version getVersion() { return Version.HTTP_2; }
|
@Override public Version getVersion() { return HTTP_2; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1384,7 +1384,7 @@ public class Http2TestServerConnection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateConnectionWindow(int amount) {
|
public void updateConnectionWindow(int amount) {
|
||||||
System.out.printf("sendWindow (window=%s, amount=%s) is now: %s%n",
|
System.out.printf("sendWindow (window=%s, amount=%s) is now: %s%n",
|
||||||
sendWindow, amount, sendWindow + amount);
|
sendWindow, amount, sendWindow + amount);
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
Loading…
Reference in New Issue
Block a user