diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index a08a6651a9a..c4a7abe2816 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -161,6 +161,9 @@ class Stream extends ExchangeImpl { private final WindowController windowController; private final WindowUpdateSender windowUpdater; + // Only accessed in all method calls from incoming(), no need for volatile + private boolean endStreamSeen; + @Override HttpConnection connection() { return connection.connection; @@ -175,6 +178,8 @@ class Stream extends ExchangeImpl { HttpResponse.BodySubscriber subscriber = responseSubscriber; try { if (subscriber == null) { + // pendingResponseSubscriber will be null until response headers have been received and + // readBodyAsync is called. subscriber = responseSubscriber = pendingResponseSubscriber; if (subscriber == null) { // can't process anything yet @@ -187,7 +192,13 @@ class Stream extends ExchangeImpl { Http2Frame frame = inputQ.peek(); if (frame instanceof ResetFrame rf) { inputQ.remove(); - handleReset(rf, subscriber); + if (endStreamReceived() && rf.getErrorCode() == ResetFrame.NO_ERROR) { + // If END_STREAM is already received, complete the requestBodyCF successfully + // and stop sending any request data. + requestBodyCF.complete(null); + } else { + handleReset(rf, subscriber); + } return; } DataFrame df = (DataFrame)frame; @@ -201,7 +212,6 @@ class Stream extends ExchangeImpl { connection.ensureWindowUpdated(df); // must update connection window Log.logTrace("responseSubscriber.onComplete"); if (debug.on()) debug.log("incoming: onComplete"); - sched.stop(); connection.decrementStreamsCount(streamid); subscriber.onComplete(); onCompleteCalled = true; @@ -220,7 +230,6 @@ class Stream extends ExchangeImpl { if (consumed(df)) { Log.logTrace("responseSubscriber.onComplete"); if (debug.on()) debug.log("incoming: onComplete"); - sched.stop(); connection.decrementStreamsCount(streamid); subscriber.onComplete(); onCompleteCalled = true; @@ -479,10 +488,12 @@ class Stream extends ExchangeImpl { handleResponse(hf); } if (hf.getFlag(HeaderFrame.END_STREAM)) { + endStreamSeen = true; if (debug.on()) debug.log("handling END_STREAM: %d", streamid); receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of())); } } else if (frame instanceof DataFrame df) { + if (df.getFlag(DataFrame.END_STREAM)) endStreamSeen = true; if (cancelled) { if (debug.on()) { debug.log("request cancelled or stream closed: dropping data frame"); @@ -568,46 +579,50 @@ class Stream extends ExchangeImpl { void incoming_reset(ResetFrame frame) { Log.logTrace("Received RST_STREAM on stream {0}", streamid); - if (endStreamReceived()) { + // responseSubscriber will be null if readBodyAsync has not yet been called + Flow.Subscriber subscriber = responseSubscriber; + if (subscriber == null) subscriber = pendingResponseSubscriber; + // See RFC 9113 sec 5.1 Figure 2, life-cycle of a stream + if (endStreamReceived() && requestBodyCF.isDone()) { + // Stream is in a half closed or fully closed state, the RST_STREAM is ignored and logged. Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); } else if (closed) { + // Stream is in a fully closed state, the RST_STREAM is ignored and logged. Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); + } else if (subscriber == null && !endStreamSeen) { + // subscriber is null and the reader has not seen an END_STREAM flag, handle reset immediately + handleReset(frame, null); + } else if (!requestBodyCF.isDone()) { + // Not done sending the body, complete exceptionally or normally based on RST_STREAM error code + incompleteRequestBodyReset(frame, subscriber); + } else if (response == null || !finalResponseCodeReceived) { + // Complete response has not been received, handle reset immediately + handleReset(frame, null); } else { - Flow.Subscriber subscriber = - responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber; - if (!requestBodyCF.isDone()) { - // If a RST_STREAM is received, complete the requestBody. This will allow the - // response to be read before the Reset is handled in the case where the client's - // input stream is partially consumed or not consumed at all by the server. - if (frame.getErrorCode() != ResetFrame.NO_ERROR) { - if (debug.on()) { - debug.log("completing requestBodyCF exceptionally due to received" + - " RESET(%s) (stream=%s)", frame.getErrorCode(), streamid); - } - requestBodyCF.completeExceptionally(new IOException("RST_STREAM received")); - } else { - if (debug.on()) { - debug.log("completing requestBodyCF normally due to received" + - " RESET(NO_ERROR) (stream=%s)", streamid); - } - requestBodyCF.complete(null); - } + // Put ResetFrame into inputQ. Any frames already in the queue will be processed before the ResetFrame. + receiveResetFrame(frame); + Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); + } + } + + void incompleteRequestBodyReset(ResetFrame frame, Flow.Subscriber subscriber) { + if (frame.getErrorCode() != ResetFrame.NO_ERROR) { + if (debug.on()) { + debug.log("completing requestBodyCF exceptionally due to received" + + " RESET(%s) (stream=%s)", frame.getErrorCode(), streamid); } - if ((response == null || !finalResponseCodeReceived) && subscriber == null) { - // we haven't received the headers yet, and won't receive any! - // handle reset now. - handleReset(frame, null); + requestBodyCF.completeExceptionally(new IOException("RST_STREAM received")); + } else { + if (debug.on()) { + debug.log("completing requestBodyCF normally due to received" + + " RESET(NO_ERROR) (stream=%s)", streamid); + } + if (!endStreamSeen || !finalResponseCodeReceived) { + // If no END_STREAM flag seen or the final response code has not been received, any RST_STREAM + // should be handled here immediately + handleReset(frame, subscriber); } else { - // put it in the input queue in order to read all - // pending data frames first. Indeed, a server may send - // RST_STREAM after sending END_STREAM, in which case we should - // ignore it. However, we won't know if we have received END_STREAM - // or not until all pending data frames are read. - receiveResetFrame(frame); - // RST_STREAM was pushed to the queue. It will be handled by - // asyncReceive after all pending data frames have been - // processed. - Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); + requestBodyCF.complete(null); } } } @@ -1376,14 +1391,21 @@ class Stream extends ExchangeImpl { } if (closing) { // true if the stream has not been closed yet - if (responseSubscriber != null || pendingResponseSubscriber != null) { + var subscriber = this.responseSubscriber; + if (subscriber == null) subscriber = this.pendingResponseSubscriber; + if (subscriber != null) { if (debug.on()) debug.log("stream %s closing due to %s", streamid, (Object)errorRef.get()); sched.runOrSchedule(); + if (subscriber instanceof Http2StreamResponseSubscriber rs) { + // make sure the subscriber is stopped. + if (debug.on()) debug.log("closing response subscriber stream %s", streamid); + rs.complete(errorRef.get()); + } } else { if (debug.on()) debug.log("stream %s closing due to %s before subscriber registered", - streamid, (Object)errorRef.get()); + streamid, (Object)errorRef.get()); } } else { if (debug.on()) { diff --git a/test/jdk/java/net/httpclient/ExpectContinueTest.java b/test/jdk/java/net/httpclient/ExpectContinueTest.java index c85d7d13e2c..2996d6e252b 100644 --- a/test/jdk/java/net/httpclient/ExpectContinueTest.java +++ b/test/jdk/java/net/httpclient/ExpectContinueTest.java @@ -23,22 +23,30 @@ /* * @test - * @summary Tests that when the httpclient sends a 100 Expect Continue header and receives - * a response code of 417 Expectation Failed, that the client does not hang - * indefinitely and closes the connection. + * @summary Tests basic handling of Partial Responses by the HttpClient * @bug 8286171 8307648 * @library /test/lib /test/jdk/java/net/httpclient/lib * @build jdk.httpclient.test.lib.common.HttpServerAdapters - * @run testng/othervm -Djdk.internal.httpclient.debug=err ExpectContinueTest + * @run testng/othervm -Djdk.internal.httpclient.debug=true -Djdk.httpclient.HttpClient.log=errors ExpectContinueTest */ +import jdk.httpclient.test.lib.http2.BodyOutputStream; +import jdk.httpclient.test.lib.http2.Http2Handler; +import jdk.httpclient.test.lib.http2.Http2TestExchange; +import jdk.httpclient.test.lib.http2.Http2TestExchangeImpl; +import jdk.httpclient.test.lib.http2.Http2TestServer; +import jdk.httpclient.test.lib.http2.Http2TestServerConnection; +import jdk.internal.net.http.common.HttpHeadersBuilder; +import jdk.internal.net.http.frame.HeaderFrame; +import org.testng.TestException; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLSession; import java.io.BufferedReader; import java.io.Closeable; import java.io.IOException; @@ -46,6 +54,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; +import java.io.PrintStream; import java.io.PrintWriter; import java.io.Writer; import java.net.InetAddress; @@ -55,65 +64,107 @@ import java.net.Socket; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpClient.Builder; +import java.net.http.HttpHeaders; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.util.StringTokenizer; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + import jdk.httpclient.test.lib.common.HttpServerAdapters; import static java.net.http.HttpClient.Version.HTTP_1_1; import static java.net.http.HttpClient.Version.HTTP_2; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.testng.Assert.assertEquals; +import static org.testng.Assert.*; public class ExpectContinueTest implements HttpServerAdapters { HttpTestServer http1TestServer; // HTTP/1.1 Http1HangServer http1HangServer; - HttpTestServer http2TestServer; // HTTP/2 + Http2TestServer http2TestServer; // HTTP/2 - URI getUri; - URI postUri; - URI hangUri; - URI h2getUri; - URI h2postUri; - URI h2hangUri; + URI getUri, postUri, hangUri; + URI h2postUri, h2hangUri, h2endStreamUri, h2warmupURI; + static PrintStream err = new PrintStream(System.err); + static PrintStream out = new PrintStream(System.out); static final String EXPECTATION_FAILED_417 = "417 Expectation Failed"; + @DataProvider(name = "uris") + public Object[][] urisData() { + return new Object[][]{ + // URI, Expected Status Code, Will finish with Exception, Protocol Version + { postUri, 200, false, HTTP_1_1 }, + { hangUri, 417, false, HTTP_1_1}, + { h2postUri, 200, false, HTTP_2 }, + { h2hangUri, 417, false, HTTP_2 }, + { h2endStreamUri, 200, true, HTTP_2 }, // Error + }; + } + @Test(dataProvider = "uris") + public void test(URI uri, int expectedStatusCode, boolean exceptionally, HttpClient.Version version) + throws CancellationException, InterruptedException, ExecutionException, IOException { + + err.printf("\nTesting with Version: %s, URI: %s, exceptionally: %b\n", version, uri, exceptionally); + try (HttpClient client = HttpClient.newBuilder().proxy(Builder.NO_PROXY).version(version).build()) { + HttpResponse resp = null; + Throwable testThrowable = null; + if (!version.equals(HTTP_1_1)) { + err.printf("Performing warmup request to %s", h2warmupURI); + client.send(HttpRequest.newBuilder(h2warmupURI).GET().version(version).build(), HttpResponse.BodyHandlers.discarding()); + } + HttpRequest postRequest = HttpRequest.newBuilder(uri) + .version(version) + .POST(HttpRequest.BodyPublishers.ofString("Sample Post")) + .expectContinue(true) + .build(); + err.printf("Sending request (%s): %s%n", version, postRequest); + CompletableFuture> cf = client.sendAsync(postRequest, HttpResponse.BodyHandlers.ofString()); + try { + resp = cf.get(); + } catch (Exception e) { + testThrowable = e.getCause(); + } + verifyRequest(expectedStatusCode, resp, exceptionally, testThrowable); + } + } + @BeforeTest public void setup() throws Exception { InetSocketAddress saHang = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); - http1TestServer = HttpTestServer.create(HTTP_1_1); http1TestServer.addHandler(new GetHandler(), "/http1/get"); http1TestServer.addHandler(new PostHandler(), "/http1/post"); getUri = URI.create("http://" + http1TestServer.serverAuthority() + "/http1/get"); postUri = URI.create("http://" + http1TestServer.serverAuthority() + "/http1/post"); - // Due to limitations of the above Http1 Server, a manual approach is taken to test the hanging with the + // Due to limitations of the above Http1 Test Server, a manual approach is taken to test the hanging with the // httpclient using Http1 so that the correct response header can be returned for the test case http1HangServer = new Http1HangServer(saHang); hangUri = URI.create("http://" + http1HangServer.ia.getCanonicalHostName() + ":" + http1HangServer.port + "/http1/hang"); + http2TestServer = new Http2TestServer(false, 0); + http2TestServer.setExchangeSupplier(ExpectContinueTestExchangeImpl::new); + http2TestServer.addHandler(new GetHandler().toHttp2Handler(), "/http2/warmup"); + http2TestServer.addHandler(new PostHandler().toHttp2Handler(), "/http2/post"); + http2TestServer.addHandler(new PostHandlerCantContinue().toHttp2Handler(), "/http2/hang"); + http2TestServer.addHandler(new PostHandlerHttp2(), "/http2/endStream"); - http2TestServer = HttpTestServer.create(HTTP_2); - http2TestServer.addHandler(new GetHandler(), "/http2/get"); - http2TestServer.addHandler(new PostHandler(), "/http2/post"); - http2TestServer.addHandler(new PostHandlerCantContinue(), "/http2/hang"); - h2getUri = URI.create("http://" + http2TestServer.serverAuthority() + "/http2/get"); + h2warmupURI = new URI("http://" + http2TestServer.serverAuthority() + "/http2/warmup"); h2postUri = URI.create("http://" + http2TestServer.serverAuthority() + "/http2/post"); h2hangUri = URI.create("http://" + http2TestServer.serverAuthority() + "/http2/hang"); + h2endStreamUri = URI.create("http://" + http2TestServer.serverAuthority() + "/http2/endStream"); - System.out.println("HTTP/1.1 server listening at: " + http1TestServer.serverAuthority()); - System.out.println("HTTP/1.1 hang server listening at: " + hangUri.getRawAuthority()); - System.out.println("HTTP/2 clear server listening at: " + http2TestServer.serverAuthority()); + out.printf("HTTP/1.1 server listening at: %s", http1TestServer.serverAuthority()); + out.printf("HTTP/1.1 hang server listening at: %s", hangUri.getRawAuthority()); + out.printf("HTTP/2 clear server listening at: %s", http2TestServer.serverAuthority()); http1TestServer.start(); http1HangServer.start(); http2TestServer.start(); } - @AfterTest public void teardown() throws IOException { http1TestServer.stop(); @@ -125,13 +176,11 @@ public class ExpectContinueTest implements HttpServerAdapters { @Override public void handle(HttpTestExchange exchange) throws IOException { - try (InputStream is = exchange.getRequestBody(); - OutputStream os = exchange.getResponseBody()) { - System.err.println("Server reading body"); - is.readAllBytes(); - byte[] bytes = "RESPONSE_BODY".getBytes(UTF_8); - System.err.println("Server sending 200 (length="+bytes.length+")"); + try (OutputStream os = exchange.getResponseBody()) { + byte[] bytes = "Response Body".getBytes(UTF_8); + err.printf("Server sending 200 (length=%s)", bytes.length); exchange.sendResponseHeaders(200, bytes.length); + err.println("Server sending Response Body"); os.write(bytes); } } @@ -144,36 +193,63 @@ public class ExpectContinueTest implements HttpServerAdapters { // Http1 server has already sent 100 response at this point but not Http2 server if (exchange.getExchangeVersion().equals(HttpClient.Version.HTTP_2)) { // Send 100 Headers, tell client that we're ready for body - System.err.println("Server sending 100 (length = 0)"); + err.println("Server sending 100 (length = 0)"); exchange.sendResponseHeaders(100, 0); } // Read body from client and acknowledge with 200 - try (InputStream is = exchange.getRequestBody(); - OutputStream os = exchange.getResponseBody()) { - System.err.println("Server reading body"); + try (InputStream is = exchange.getRequestBody()) { + err.println("Server reading body"); is.readAllBytes(); - System.err.println("Server send 200 (length=0)"); + err.println("Server send 200 (length=0)"); exchange.sendResponseHeaders(200, 0); } } } - static class PostHandlerCantContinue implements HttpTestHandler { + static class PostHandlerHttp2 implements Http2Handler { + @Override + public void handle(Http2TestExchange exchange) throws IOException { + if (exchange instanceof ExpectContinueTestExchangeImpl impl) { + impl.sendEndStreamHeaders(); + } + } + } + + static class PostHandlerCantContinue implements HttpTestHandler { @Override public void handle(HttpTestExchange exchange) throws IOException { //Send 417 Headers, tell client to not send body - try (InputStream is = exchange.getRequestBody(); - OutputStream os = exchange.getResponseBody()) { + try (OutputStream os = exchange.getResponseBody()) { byte[] bytes = EXPECTATION_FAILED_417.getBytes(); - System.err.println("Server send 417 (length="+bytes.length+")"); + err.println("Server send 417 (length="+bytes.length+")"); exchange.sendResponseHeaders(417, bytes.length); + err.println("Server sending Response Body"); os.write(bytes); } } } + static class ExpectContinueTestExchangeImpl extends Http2TestExchangeImpl { + + public ExpectContinueTestExchangeImpl(int streamid, String method, HttpHeaders reqheaders, HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is, SSLSession sslSession, BodyOutputStream os, Http2TestServerConnection conn, boolean pushAllowed) { + super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed); + } + + private void sendEndStreamHeaders() throws IOException { + this.responseLength = 0; + rspheadersBuilder.setHeader(":status", Integer.toString(100)); + HttpHeaders headers = rspheadersBuilder.build(); + Http2TestServerConnection.ResponseHeaders response + = new Http2TestServerConnection.ResponseHeaders(headers); + response.streamid(streamid); + response.setFlag(HeaderFrame.END_HEADERS); + response.setFlag(HeaderFrame.END_STREAM); + sendResponseHeaders(response); + } + } + static class Http1HangServer extends Thread implements Closeable { final ServerSocket ss; @@ -222,7 +298,7 @@ public class ExpectContinueTest implements HttpServerAdapters { String version = tokenizer.nextToken(); boolean validRequest = method.equals("POST") && path.equals("/http1/hang") - && version.equals("HTTP/1.1"); + && version.equals("HTTP/1.1"); // If correct request, send 417 reply. Otherwise, wait for correct one if (validRequest) { System.err.println("Http1HangServer sending 417"); @@ -261,59 +337,17 @@ public class ExpectContinueTest implements HttpServerAdapters { } } - @DataProvider(name = "uris") - public Object[][] urisData() { - return new Object[][]{ - { getUri, postUri, hangUri, HTTP_1_1 }, - { h2getUri, h2postUri, h2hangUri, HTTP_2 } - }; + private void verifyRequest(int expectedStatusCode, HttpResponse resp, boolean exceptionally, Throwable testThrowable) { + if (exceptionally && testThrowable != null) { + err.println(testThrowable); + assertEquals(IOException.class, testThrowable.getClass()); + } else if (exceptionally) { + throw new TestException("Expected case to finish with an IOException but testException is null"); + } else if (resp != null) { + assertEquals(resp.statusCode(), expectedStatusCode); + err.println("Request completed successfully"); + err.println("Response Headers: " + resp.headers()); + err.println("Response Status Code: " + resp.statusCode()); + } } - - @Test(dataProvider = "uris") - public void test(URI getUri, URI postUri, URI hangUri, HttpClient.Version version) throws IOException, InterruptedException { - System.out.println("Testing with version: " + version); - HttpClient client = HttpClient.newBuilder() - .proxy(Builder.NO_PROXY) - .version(version) - .build(); - - HttpRequest getRequest = HttpRequest.newBuilder(getUri) - .GET() - .build(); - - HttpRequest postRequest = HttpRequest.newBuilder(postUri) - .POST(HttpRequest.BodyPublishers.ofString("Sample Post")) - .expectContinue(true) - .build(); - - HttpRequest hangRequest = HttpRequest.newBuilder(hangUri) - .POST(HttpRequest.BodyPublishers.ofString("Sample Post")) - .expectContinue(true) - .build(); - - System.out.printf("Sending request (%s): %s%n", version, getRequest); - System.err.println("Sending request: " + getRequest); - CompletableFuture> cf = client.sendAsync(getRequest, HttpResponse.BodyHandlers.ofString()); - HttpResponse resp = cf.join(); - System.err.println("Response Headers: " + resp.headers()); - System.err.println("Response Status Code: " + resp.statusCode()); - assertEquals(resp.statusCode(), 200); - - System.out.printf("Sending request (%s): %s%n", version, postRequest); - System.err.println("Sending request: " + postRequest); - cf = client.sendAsync(postRequest, HttpResponse.BodyHandlers.ofString()); - resp = cf.join(); - System.err.println("Response Headers: " + resp.headers()); - System.err.println("Response Status Code: " + resp.statusCode()); - assertEquals(resp.statusCode(), 200); - - System.out.printf("Sending request (%s): %s%n", version, hangRequest); - System.err.println("Sending request: " + hangRequest); - cf = client.sendAsync(hangRequest, HttpResponse.BodyHandlers.ofString()); - resp = cf.join(); - System.err.println("Response Headers: " + resp.headers()); - System.err.println("Response Status Code: " + resp.statusCode()); - assertEquals(resp.statusCode(), 417); - } - } diff --git a/test/jdk/java/net/httpclient/http2/ExpectContinueResetTest.java b/test/jdk/java/net/httpclient/http2/ExpectContinueResetTest.java new file mode 100644 index 00000000000..faab08e2862 --- /dev/null +++ b/test/jdk/java/net/httpclient/http2/ExpectContinueResetTest.java @@ -0,0 +1,229 @@ +/* + * Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @summary Verifies that the client reacts correctly to receiving RST_STREAM at various stages of + * a Partial Response. + * @bug 8309118 + * @library /test/lib /test/jdk/java/net/httpclient/lib + * @build jdk.httpclient.test.lib.common.HttpServerAdapters + * @run testng/othervm/timeout=40 -Djdk.internal.httpclient.debug=true -Djdk.httpclient.HttpClient.log=trace,errors,headers + * ExpectContinueResetTest + */ + +import jdk.httpclient.test.lib.common.HttpServerAdapters; +import jdk.httpclient.test.lib.http2.BodyOutputStream; +import jdk.httpclient.test.lib.http2.Http2Handler; +import jdk.httpclient.test.lib.http2.Http2TestExchange; +import jdk.httpclient.test.lib.http2.Http2TestExchangeImpl; +import jdk.httpclient.test.lib.http2.Http2TestServer; +import jdk.httpclient.test.lib.http2.Http2TestServerConnection; + +import jdk.internal.net.http.common.HttpHeadersBuilder; +import jdk.internal.net.http.frame.ResetFrame; +import org.testng.TestException; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import javax.net.ssl.SSLSession; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Iterator; +import java.util.concurrent.ExecutionException; + +import static java.net.http.HttpClient.Version.HTTP_2; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.*; + +public class ExpectContinueResetTest { + + Http2TestServer http2TestServer; + // "NoError" urls complete with an exception. "NoError" or "Error" here refers to the error code in the RST_STREAM frame + // and not the outcome of the test. + URI warmup, partialResponseResetNoError, partialResponseResetError, fullResponseResetNoError, fullResponseResetError; + + static PrintStream err = new PrintStream(System.err); + + @DataProvider(name = "testData") + public Object[][] testData() { + // Not consuming the InputStream in the server's handler results in different handling of RST_STREAM client-side + return new Object[][] { + { partialResponseResetNoError }, + { partialResponseResetError }, // Checks RST_STREAM is processed if client sees no END_STREAM + { fullResponseResetNoError }, + { fullResponseResetError } + }; + } + + + @Test(dataProvider = "testData") + public void test(URI uri) { + err.printf("\nTesting with Version: %s, URI: %s\n", HTTP_2, uri.toASCIIString()); + Iterable iterable = EndlessDataChunks::new; + HttpRequest.BodyPublisher testPub = HttpRequest.BodyPublishers.ofByteArrays(iterable); + Throwable testThrowable = null; + try { + performRequest(testPub, uri); + } catch (Exception e) { + testThrowable = e.getCause(); + } + assertNotNull(testThrowable, "Request should have completed exceptionally but testThrowable is null"); + assertEquals(testThrowable.getClass(), IOException.class, "Test should have closed with an IOException"); + testThrowable.printStackTrace(); + } + + static public class EndlessDataChunks implements Iterator { + + byte[] data = new byte[16]; + @Override + public boolean hasNext() { + return true; + } + @Override + public byte[] next() { + return data; + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + @BeforeTest + public void setup() throws Exception { + http2TestServer = new Http2TestServer(false, 0); + http2TestServer.setExchangeSupplier(ExpectContinueResetTestExchangeImpl::new); + http2TestServer.addHandler(new GetHandler().toHttp2Handler(), "/warmup"); + http2TestServer.addHandler(new NoEndStreamOnPartialResponse(), "/partialResponse/codeNoError"); + http2TestServer.addHandler(new NoEndStreamOnPartialResponse(), "/partialResponse/codeError"); + http2TestServer.addHandler(new NoEndStreamOnFullResponse(), "/fullResponse/codeNoError"); + http2TestServer.addHandler(new NoEndStreamOnFullResponse(), "/fullResponse/codeError"); + + warmup = URI.create("http://" + http2TestServer.serverAuthority() + "/warmup"); + partialResponseResetNoError = URI.create("http://" + http2TestServer.serverAuthority() + "/partialResponse/codeNoError"); + partialResponseResetError = URI.create("http://" + http2TestServer.serverAuthority() + "/partialResponse/codeError"); + fullResponseResetNoError = URI.create("http://" + http2TestServer.serverAuthority() + "/fullResponse/codeNoError"); + fullResponseResetError = URI.create("http://" + http2TestServer.serverAuthority() + "/fullResponse/codeError"); + http2TestServer.start(); + } + + @AfterTest + public void teardown() { + http2TestServer.stop(); + } + + private void performRequest(HttpRequest.BodyPublisher bodyPublisher, URI uri) + throws IOException, InterruptedException, ExecutionException { + try (HttpClient client = HttpClient.newBuilder().proxy(HttpClient.Builder.NO_PROXY).version(HTTP_2).build()) { + err.printf("Performing warmup request to %s", warmup); + client.send(HttpRequest.newBuilder(warmup).GET().version(HTTP_2).build(), HttpResponse.BodyHandlers.discarding()); + HttpRequest postRequest = HttpRequest.newBuilder(uri) + .version(HTTP_2) + .POST(bodyPublisher) + .expectContinue(true) + .build(); + err.printf("Sending request (%s): %s%n", HTTP_2, postRequest); + // TODO: when test is stable and complete, see then if fromSubscriber makes our subscriber non null + client.sendAsync(postRequest, HttpResponse.BodyHandlers.ofString()).get(); + } + } + + static class GetHandler implements HttpServerAdapters.HttpTestHandler { + + @Override + public void handle(HttpServerAdapters.HttpTestExchange exchange) throws IOException { + try (OutputStream os = exchange.getResponseBody()) { + byte[] bytes = "Response Body".getBytes(UTF_8); + err.printf("Server sending 200 (length=%s)", bytes.length); + exchange.sendResponseHeaders(200, bytes.length); + err.println("Server sending Response Body"); + os.write(bytes); + } + } + } + + static class NoEndStreamOnPartialResponse implements Http2Handler { + + @Override + public void handle(Http2TestExchange exchange) throws IOException { + err.println("Sending 100"); + exchange.sendResponseHeaders(100, 0); + if (exchange instanceof ExpectContinueResetTestExchangeImpl testExchange) { + err.println("Sending Reset"); + err.println(exchange.getRequestURI().getPath()); + switch (exchange.getRequestURI().getPath()) { + case "/partialResponse/codeNoError" -> testExchange.addResetToOutputQ(ResetFrame.NO_ERROR); + case "/partialResponse/codeError" -> testExchange.addResetToOutputQ(ResetFrame.PROTOCOL_ERROR); + default -> throw new TestException("Invalid Request Path"); + } + } else { + throw new TestException("Wrong Exchange type used"); + } + } + } + + static class NoEndStreamOnFullResponse implements Http2Handler { + + @Override + public void handle(Http2TestExchange exchange) throws IOException { + err.println("Sending 100"); + exchange.sendResponseHeaders(100, 0); + err.println("Sending 200"); + exchange.sendResponseHeaders(200, 0); + if (exchange instanceof ExpectContinueResetTestExchangeImpl testExchange) { + err.println("Sending Reset"); + switch (exchange.getRequestURI().getPath()) { + case "/fullResponse/codeNoError" -> testExchange.addResetToOutputQ(ResetFrame.NO_ERROR); + case "/fullResponse/codeError" -> testExchange.addResetToOutputQ(ResetFrame.PROTOCOL_ERROR); + default -> throw new TestException("Invalid Request Path"); + } + } else { + throw new TestException("Wrong Exchange type used"); + } + } + } + + static class ExpectContinueResetTestExchangeImpl extends Http2TestExchangeImpl { + + public ExpectContinueResetTestExchangeImpl(int streamid, String method, HttpHeaders reqheaders, HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is, SSLSession sslSession, BodyOutputStream os, Http2TestServerConnection conn, boolean pushAllowed) { + super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed); + } + + public void addResetToOutputQ(int code) throws IOException { + ResetFrame rf = new ResetFrame(streamid, code); + this.conn.addToOutputQ(rf); + } + } +} diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestExchangeImpl.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestExchangeImpl.java index 28ea572a018..63a4de6ef90 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestExchangeImpl.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestExchangeImpl.java @@ -23,6 +23,7 @@ package jdk.httpclient.test.lib.http2; +import jdk.httpclient.test.lib.http2.Http2TestServerConnection.ResponseHeaders; import jdk.internal.net.http.common.HttpHeadersBuilder; import jdk.internal.net.http.frame.HeaderFrame; import jdk.internal.net.http.frame.HeadersFrame; @@ -145,15 +146,15 @@ public class Http2TestExchangeImpl implements Http2TestExchange { rspheadersBuilder.setHeader(":status", Integer.toString(rCode)); HttpHeaders headers = rspheadersBuilder.build(); - Http2TestServerConnection.ResponseHeaders response - = new Http2TestServerConnection.ResponseHeaders(headers); + ResponseHeaders response + = new ResponseHeaders(headers); response.streamid(streamid); response.setFlag(HeaderFrame.END_HEADERS); if (responseLength < 0 || rCode == 204) { response.setFlag(HeadersFrame.END_STREAM); - conn.outputQ.put(response); + sendResponseHeaders(response); // Put a reset frame on the outputQ if there is still unconsumed data in the input stream and output stream // is going to be marked closed. if (is instanceof BodyInputStream bis && bis.unconsumed()) { @@ -161,12 +162,16 @@ public class Http2TestExchangeImpl implements Http2TestExchange { } os.markClosed(); } else { - conn.outputQ.put(response); + sendResponseHeaders(response); } os.goodToGo(); System.err.println("Sent response headers " + rCode); } + public void sendResponseHeaders(ResponseHeaders response) throws IOException { + conn.outputQ.put(response); + } + @Override public InetSocketAddress getRemoteAddress() { return (InetSocketAddress) conn.socket.getRemoteSocketAddress(); diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java index e93669685dc..ff10d4087e9 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java @@ -1241,10 +1241,10 @@ public class Http2TestServerConnection { // simplified output headers class. really just a type safe container // for the hashmap. - static class ResponseHeaders extends Http2Frame { + public static class ResponseHeaders extends Http2Frame { HttpHeaders headers; - ResponseHeaders(HttpHeaders headers) { + public ResponseHeaders(HttpHeaders headers) { super(0, 0); this.headers = headers; }