8284585: PushPromiseContinuation test fails intermittently in timeout

Reviewed-by: dfuchs
This commit is contained in:
Conor Cleary 2022-05-16 11:14:34 +00:00 committed by Daniel Fuchs
parent 652044d82b
commit 65da38d844
4 changed files with 94 additions and 17 deletions

View File

@ -763,7 +763,7 @@ class Http2Connection {
} }
Stream<?> stream = getStream(streamid); Stream<?> stream = getStream(streamid);
if (stream == null) { if (stream == null && pushContinuationState == null) {
// Should never receive a frame with unknown stream id // Should never receive a frame with unknown stream id
if (frame instanceof HeaderFrame) { if (frame instanceof HeaderFrame) {
@ -803,7 +803,11 @@ class Http2Connection {
if (pushContinuationState != null) { if (pushContinuationState != null) {
if (frame instanceof ContinuationFrame cf) { if (frame instanceof ContinuationFrame cf) {
try { try {
handlePushContinuation(stream, cf); if (streamid == pushContinuationState.pushContFrame.streamid())
handlePushContinuation(stream, cf);
else
protocolError(ErrorFrame.PROTOCOL_ERROR, "Received a Continuation Frame with an " +
"unexpected stream id");
} catch (UncheckedIOException e) { } catch (UncheckedIOException e) {
debug.log("Error handling Push Promise with Continuation: " + e.getMessage(), e); debug.log("Error handling Push Promise with Continuation: " + e.getMessage(), e);
protocolError(ErrorFrame.PROTOCOL_ERROR, e.getMessage()); protocolError(ErrorFrame.PROTOCOL_ERROR, e.getMessage());
@ -890,8 +894,6 @@ class Http2Connection {
private <T> void completePushPromise(int promisedStreamid, Stream<T> parent, HttpHeaders headers) private <T> void completePushPromise(int promisedStreamid, Stream<T> parent, HttpHeaders headers)
throws IOException { throws IOException {
// Perhaps the following checks could be moved to handlePushPromise()
// to reset the PushPromise stream earlier?
HttpRequestImpl parentReq = parent.request; HttpRequestImpl parentReq = parent.request;
if (promisedStreamid != nextPushStream) { if (promisedStreamid != nextPushStream) {
resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);

View File

@ -61,12 +61,13 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.function.BiPredicate; import java.util.function.BiPredicate;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.*;
public class PushPromiseContinuation { public class PushPromiseContinuation {
@ -97,7 +98,7 @@ public class PushPromiseContinuation {
// Need to have a custom exchange supplier to manage the server's push // Need to have a custom exchange supplier to manage the server's push
// promise with continuation flow // promise with continuation flow
server.setExchangeSupplier(Http2LPPTestExchangeImpl::new); server.setExchangeSupplier(Http2PushPromiseContinuationExchangeImpl::new);
System.err.println("PushPromiseContinuation: Server listening on port " + server.getAddress().getPort()); System.err.println("PushPromiseContinuation: Server listening on port " + server.getAddress().getPort());
server.start(); server.start();
@ -166,6 +167,31 @@ public class PushPromiseContinuation {
verify(resp); verify(resp);
} }
@Test
public void testSendHeadersOnPushPromiseStream() throws Exception {
// This test server sends a push promise that should be followed by a continuation but
// incorrectly sends on Response Headers while the client awaits the continuation.
Http2TestServer faultyServer = new Http2TestServer(false, 0);
faultyServer.addHandler(new ServerPushHandler(), "/");
faultyServer.setExchangeSupplier(Http2PushPromiseHeadersExchangeImpl::new);
System.err.println("PushPromiseContinuation: FaultyServer listening on port " + faultyServer.getAddress().getPort());
faultyServer.start();
int faultyPort = faultyServer.getAddress().getPort();
URI faultyUri = new URI("http://localhost:" + faultyPort + "/");
HttpClient client = HttpClient.newHttpClient();
// Server is making a request to an incorrect URI
HttpRequest hreq = HttpRequest.newBuilder(faultyUri).version(HttpClient.Version.HTTP_2).GET().build();
CompletableFuture<HttpResponse<String>> cf =
client.sendAsync(hreq, HttpResponse.BodyHandlers.ofString(UTF_8), pph);
CompletionException t = expectThrows(CompletionException.class, () -> cf.join());
assertEquals(t.getCause().getClass(), IOException.class, "Expected an IOException but got " + t.getCause());
System.err.println("Client received the following expected exception: " + t.getCause());
faultyServer.stop();
}
private void verify(HttpResponse<String> resp) { private void verify(HttpResponse<String> resp) {
assertEquals(resp.statusCode(), 200); assertEquals(resp.statusCode(), 200);
assertEquals(resp.body(), mainResponseBody); assertEquals(resp.body(), mainResponseBody);
@ -186,15 +212,46 @@ public class PushPromiseContinuation {
} }
} }
static class Http2LPPTestExchangeImpl extends Http2TestExchangeImpl { static class Http2PushPromiseHeadersExchangeImpl extends Http2TestExchangeImpl {
Http2PushPromiseHeadersExchangeImpl(int streamid, String method, HttpHeaders reqheaders, HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is, SSLSession sslSession, BodyOutputStream os, Http2TestServerConnection conn, boolean pushAllowed) {
super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed);
}
@Override
public void serverPush(URI uri, HttpHeaders headers, InputStream content) {
HttpHeadersBuilder headersBuilder = new HttpHeadersBuilder();
headersBuilder.setHeader(":method", "GET");
headersBuilder.setHeader(":scheme", uri.getScheme());
headersBuilder.setHeader(":authority", uri.getAuthority());
headersBuilder.setHeader(":path", uri.getPath());
for (Map.Entry<String,List<String>> entry : headers.map().entrySet()) {
for (String value : entry.getValue())
headersBuilder.addHeader(entry.getKey(), value);
}
HttpHeaders combinedHeaders = headersBuilder.build();
OutgoingPushPromise pp = new OutgoingPushPromise(streamid, uri, combinedHeaders, content);
// Indicates to the client that a continuation should be expected
pp.setFlag(0x0);
try {
conn.outputQ.put(pp);
// writeLoop will spin up thread to read the InputStream
} catch (IOException ex) {
System.err.println("TestServer: pushPromise exception: " + ex);
}
}
}
static class Http2PushPromiseContinuationExchangeImpl extends Http2TestExchangeImpl {
HttpHeadersBuilder pushPromiseHeadersBuilder; HttpHeadersBuilder pushPromiseHeadersBuilder;
List<ContinuationFrame> cfs; List<ContinuationFrame> cfs;
Http2LPPTestExchangeImpl(int streamid, String method, HttpHeaders reqheaders, Http2PushPromiseContinuationExchangeImpl(int streamid, String method, HttpHeaders reqheaders,
HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is, HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is,
SSLSession sslSession, BodyOutputStream os, SSLSession sslSession, BodyOutputStream os,
Http2TestServerConnection conn, boolean pushAllowed) { Http2TestServerConnection conn, boolean pushAllowed) {
super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed); super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed);
} }
@ -248,7 +305,8 @@ public class PushPromiseContinuation {
HttpHeaders pushPromiseHeaders = pushPromiseHeadersBuilder.build(); HttpHeaders pushPromiseHeaders = pushPromiseHeadersBuilder.build();
testHeaders = testHeadersBuilder.build(); testHeaders = testHeadersBuilder.build();
// Create the Push Promise Frame // Create the Push Promise Frame
OutgoingPushPromise pp = new OutgoingPushPromise(streamid, uri, pushPromiseHeaders, content); OutgoingPushPromise pp = new OutgoingPushPromise(streamid, uri, pushPromiseHeaders, content, cfs);
// Indicates to the client that a continuation should be expected // Indicates to the client that a continuation should be expected
pp.setFlag(0x0); pp.setFlag(0x0);
@ -256,10 +314,6 @@ public class PushPromiseContinuation {
// Schedule push promise and continuation for sending // Schedule push promise and continuation for sending
conn.outputQ.put(pp); conn.outputQ.put(pp);
System.err.println("Server: Scheduled a Push Promise to Send"); System.err.println("Server: Scheduled a Push Promise to Send");
for (ContinuationFrame cf : cfs) {
conn.outputQ.put(cf);
System.err.println("Server: Scheduled a Continuation to Send");
}
} catch (IOException ex) { } catch (IOException ex) {
System.err.println("Server: pushPromise exception: " + ex); System.err.println("Server: pushPromise exception: " + ex);
} }

View File

@ -945,6 +945,11 @@ public class Http2TestServerConnection {
nextPushStreamId += 2; nextPushStreamId += 2;
pp.streamid(op.parentStream); pp.streamid(op.parentStream);
writeFrame(pp); writeFrame(pp);
// No need to check for END_HEADERS flag here to allow for tests to simulate bad server side
// behavior i.e Continuation Frames included with END_HEADERS flag set
for (Http2Frame cf : op.getContinuations())
writeFrame(cf);
final InputStream ii = op.is; final InputStream ii = op.is;
final BodyOutputStream oo = new BodyOutputStream( final BodyOutputStream oo = new BodyOutputStream(
promisedStreamid, promisedStreamid,

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2016, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -24,6 +24,9 @@
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.net.http.HttpHeaders; import java.net.http.HttpHeaders;
import java.util.List;
import jdk.internal.net.http.frame.ContinuationFrame;
import jdk.internal.net.http.frame.Http2Frame; import jdk.internal.net.http.frame.Http2Frame;
// will be converted to a PushPromiseFrame in the writeLoop // will be converted to a PushPromiseFrame in the writeLoop
@ -33,16 +36,29 @@ class OutgoingPushPromise extends Http2Frame {
final URI uri; final URI uri;
final InputStream is; final InputStream is;
final int parentStream; // not the pushed streamid final int parentStream; // not the pushed streamid
private final List<Http2Frame> continuations;
public OutgoingPushPromise(int parentStream, public OutgoingPushPromise(int parentStream,
URI uri, URI uri,
HttpHeaders headers, HttpHeaders headers,
InputStream is) { InputStream is) {
this(parentStream, uri, headers, is, List.of());
}
public OutgoingPushPromise(int parentStream,
URI uri,
HttpHeaders headers,
InputStream is,
List<ContinuationFrame> continuations) {
super(0,0); super(0,0);
this.uri = uri; this.uri = uri;
this.headers = headers; this.headers = headers;
this.is = is; this.is = is;
this.parentStream = parentStream; this.parentStream = parentStream;
this.continuations = List.copyOf(continuations);
} }
public List<Http2Frame> getContinuations() {
return continuations;
}
} }