8229822: ThrowingPushPromises tests sometimes fail due to EOF

SocketTube is fixed to cater for errors caused by pausing/resuming events on an asynchronously closed connection, from within the selector's manager thread. Http2Connection and Stream are fixed to prevent sending a DataFrame on a stream after Reset has been sent.

Reviewed-by: chegar
This commit is contained in:
Daniel Fuchs 2020-08-07 16:16:45 +01:00
parent 45c89daf72
commit 77c46ea911
10 changed files with 169 additions and 29 deletions

View File

@ -329,7 +329,11 @@ class Http2Connection {
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
Stream<?> initialStream = createStream(exchange);
initialStream.registerStream(1);
boolean opened = initialStream.registerStream(1, true);
if (debug.on() && !opened) {
debug.log("Initial stream was cancelled - but connection is maintained: " +
"reset frame will need to be sent later");
}
windowController.registerStream(1, getInitialSendWindowSize());
initialStream.requestSent();
// Upgrading:
@ -338,6 +342,11 @@ class Http2Connection {
this.initial = initial;
connectFlows(connection);
sendConnectionPreface();
if (!opened) {
debug.log("ensure reset frame is sent to cancel initial stream");
initialStream.sendCancelStreamFrame();
}
}
// Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
@ -849,7 +858,7 @@ class Http2Connection {
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
pushExch.exchImpl = pushStream;
pushStream.registerStream(promisedStreamid);
pushStream.registerStream(promisedStreamid, true);
parent.incoming_pushPromise(pushReq, pushStream);
}
@ -874,7 +883,7 @@ class Http2Connection {
}
}
void resetStream(int streamid, int code) throws IOException {
void resetStream(int streamid, int code) {
try {
if (connection.channel().isOpen()) {
// no need to try & send a reset frame if the
@ -882,6 +891,7 @@ class Http2Connection {
Log.logError(
"Resetting stream {0,number,integer} with error code {1,number,integer}",
streamid, code);
markStream(streamid, code);
ResetFrame frame = new ResetFrame(streamid, code);
sendFrame(frame);
} else if (debug.on()) {
@ -894,6 +904,11 @@ class Http2Connection {
}
}
private void markStream(int streamid, int code) {
Stream<?> s = streams.get(streamid);
if (s != null) s.markStream(code);
}
// reduce count of streams by 1 if stream still exists
synchronized void decrementStreamsCount(int streamid) {
Stream<?> s = streams.get(streamid);
@ -1192,12 +1207,19 @@ class Http2Connection {
Stream<?> stream = oh.getAttachment();
assert stream.streamid == 0;
int streamid = nextstreamid;
nextstreamid += 2;
stream.registerStream(streamid);
// set outgoing window here. This allows thread sending
// body to proceed.
windowController.registerStream(streamid, getInitialSendWindowSize());
return stream;
if (stream.registerStream(streamid, false)) {
// set outgoing window here. This allows thread sending
// body to proceed.
nextstreamid += 2;
windowController.registerStream(streamid, getInitialSendWindowSize());
return stream;
} else {
stream.cancelImpl(new IOException("Request cancelled"));
if (finalStream() && streams.isEmpty()) {
close();
}
return null;
}
}
private final Object sendlock = new Object();
@ -1211,7 +1233,9 @@ class Http2Connection {
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
Stream<?> stream = registerNewStream(oh);
// provide protection from inserting unordered frames between Headers and Continuation
publisher.enqueue(encodeHeaders(oh, stream));
if (stream != null) {
publisher.enqueue(encodeHeaders(oh, stream));
}
} else {
publisher.enqueue(encodeFrame(frame));
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -246,7 +246,7 @@ final class SocketTube implements FlowTube {
}
@Override
public final void abort(IOException error) {
debug().log(() -> "abort: " + error);
debug().log(() -> this.getClass().getSimpleName() + " abort: " + error);
pause(); // pause, then signal
signalError(error); // should not be resumed after abort (not checked)
}
@ -724,10 +724,12 @@ final class SocketTube implements FlowTube {
@Override
public final void cancel() {
pauseReadEvent();
if (debug.on()) debug.log("Read subscription cancelled");
if (Log.channel()) {
Log.logChannel("Read subscription cancelled for channel {0}",
channelDescr());
}
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
}
@ -748,6 +750,7 @@ final class SocketTube implements FlowTube {
}
final void signalError(Throwable error) {
if (debug.on()) debug.log("signal read error: " + error);
if (!errorRef.compareAndSet(null, error)) {
return;
}
@ -808,6 +811,7 @@ final class SocketTube implements FlowTube {
}
current.errorRef.compareAndSet(null, error);
current.signalCompletion();
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
debugState("leaving read() loop with error: ");
return;
@ -831,6 +835,7 @@ final class SocketTube implements FlowTube {
// anyway.
pauseReadEvent();
current.signalCompletion();
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
}
debugState("leaving read() loop after EOF: ");
@ -850,6 +855,7 @@ final class SocketTube implements FlowTube {
// waiting for this event to terminate.
// So resume the read event and return now...
resumeReadEvent();
if (errorRef.get() != null) continue;
debugState("leaving read() loop after onNext: ");
return;
} else {
@ -861,6 +867,7 @@ final class SocketTube implements FlowTube {
// readable again.
demand.increase(1);
resumeReadEvent();
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no bytes");
return;
}
@ -879,6 +886,7 @@ final class SocketTube implements FlowTube {
// Trying to pause the event here would actually
// introduce a race condition between this loop and
// request(n).
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no demand");
break;
}
@ -946,6 +954,7 @@ final class SocketTube implements FlowTube {
@Override
protected final void signalError(Throwable error) {
if (debug.on()) debug.log("signalError to %s (%s)", sub, error);
sub.signalError(error);
}

View File

@ -28,6 +28,8 @@ package jdk.internal.net.http;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -133,12 +135,18 @@ class Stream<T> extends ExchangeImpl<T> {
private volatile boolean remotelyClosed;
private volatile boolean closed;
private volatile boolean endStreamSent;
final AtomicBoolean deRegistered = new AtomicBoolean(false);
// Indicates the first reason that was invoked when sending a ResetFrame
// to the server. A streamState of 0 indicates that no reset was sent.
// (see markStream(int code)
private volatile int streamState; // assigned using STREAM_STATE varhandle.
private volatile boolean deRegistered; // assigned using DEREGISTERED varhandle.
// state flags
private boolean requestSent, responseReceived;
// send lock: prevent sending DataFrames after reset occurred.
private final Object sendLock = new Object();
/**
* A reference to this Stream's connection Send Window controller. The
* stream MUST acquire the appropriate amount of Send Window before
@ -292,7 +300,7 @@ class Stream<T> extends ExchangeImpl<T> {
}
boolean deRegister() {
return deRegistered.compareAndSet(false, true);
return DEREGISTERED.compareAndSet(this, false, true);
}
@Override
@ -338,6 +346,36 @@ class Stream<T> extends ExchangeImpl<T> {
sched.runOrSchedule();
}
/**
* Records the first reason which was invoked when sending a ResetFrame
* to the server in the streamState, and return the previous value
* of the streamState. This is an atomic operation.
* A possible use of this method would be to send a ResetFrame only
* if no previous reset frame has been sent.
* For instance: <pre>{@code
* if (markStream(ResetFrame.CANCEL) == 0) {
* connection.sendResetFrame(streamId, ResetFrame.CANCEL);
* }
* }</pre>
* @param code the reason code as per HTTP/2 protocol
* @return the previous value of the stream state.
*/
int markStream(int code) {
if (code == 0) return streamState;
synchronized (sendLock) {
return (int) STREAM_STATE.compareAndExchange(this, 0, code);
}
}
private void sendDataFrame(DataFrame frame) {
synchronized (sendLock) {
// must not send DataFrame after reset.
if (streamState == 0) {
connection.sendDataFrame(frame);
}
}
}
// pushes entire response body into response subscriber
// blocking when required by local or remote flow control
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
@ -384,6 +422,7 @@ class Stream<T> extends ExchangeImpl<T> {
*/
void incoming(Http2Frame frame) throws IOException {
if (debug.on()) debug.log("incoming: %s", frame);
var cancelled = closed || streamState != 0;
if ((frame instanceof HeaderFrame)) {
HeaderFrame hframe = (HeaderFrame)frame;
if (hframe.endHeaders()) {
@ -395,9 +434,10 @@ class Stream<T> extends ExchangeImpl<T> {
receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
}
} else if (frame instanceof DataFrame) {
receiveDataFrame((DataFrame)frame);
if (cancelled) connection.dropDataFrame((DataFrame) frame);
else receiveDataFrame((DataFrame) frame);
} else {
otherFrame(frame);
if (!cancelled) otherFrame(frame);
}
}
@ -720,6 +760,16 @@ class Stream<T> extends ExchangeImpl<T> {
} else {
requestContentLen = 0;
}
// At this point the stream doesn't have a streamid yet.
// It will be allocated if we send the request headers.
Throwable t = errorRef.get();
if (t != null) {
if (debug.on()) debug.log("stream already cancelled, headers not sent: %s", (Object)t);
return MinimalFuture.failedFuture(t);
}
// sending the headers will cause the allocation of the stream id
OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
connection.sendFrame(f);
CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
@ -745,10 +795,17 @@ class Stream<T> extends ExchangeImpl<T> {
// been already closed (or will be closed shortly after).
}
void registerStream(int id) {
this.streamid = id;
connection.putStream(this, streamid);
if (debug.on()) debug.log("Registered stream %d", id);
boolean registerStream(int id, boolean registerIfCancelled) {
boolean cancelled = closed;
if (!cancelled || registerIfCancelled) {
this.streamid = id;
connection.putStream(this, streamid);
if (debug.on()) {
debug.log("Stream %d registered (cancelled: %b, registerIfCancelled: %b)",
streamid, cancelled, registerIfCancelled);
}
}
return !cancelled;
}
void signalWindowUpdate() {
@ -853,6 +910,7 @@ class Stream<T> extends ExchangeImpl<T> {
cancelImpl(t);
return;
}
int state = streamState;
do {
// handle COMPLETED;
@ -865,7 +923,7 @@ class Stream<T> extends ExchangeImpl<T> {
}
// handle bytes to send downstream
while (item.hasRemaining()) {
while (item.hasRemaining() && state == 0) {
if (debug.on()) debug.log("trySend: %d", item.remaining());
assert !endStreamSent : "internal error, send data after END_STREAM flag";
DataFrame df = getDataFrame(item);
@ -884,6 +942,7 @@ class Stream<T> extends ExchangeImpl<T> {
+ "Too many bytes in request body. Expected: "
+ contentLength + ", got: "
+ (contentLength - remainingContentLength);
assert streamid > 0;
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
throw new IOException(msg);
} else if (remainingContentLength == 0) {
@ -891,15 +950,26 @@ class Stream<T> extends ExchangeImpl<T> {
endStreamSent = true;
}
}
if ((state = streamState) != 0) {
if (debug.on()) debug.log("trySend: cancelled: %s", String.valueOf(t));
break;
}
if (debug.on())
debug.log("trySend: sending: %d", df.getDataLength());
connection.sendDataFrame(df);
sendDataFrame(df);
}
if (state != 0) break;
assert !item.hasRemaining();
ByteBuffer b = outgoing.removeFirst();
assert b == item;
} while (outgoing.peekFirst() != null);
if (state != 0) {
t = errorRef.get();
if (t == null) t = new IOException(ResetFrame.stringForCode(streamState));
throw t;
}
if (debug.on()) debug.log("trySend: request 1");
subscription.request(1);
} catch (Throwable ex) {
@ -1102,7 +1172,11 @@ class Stream<T> extends ExchangeImpl<T> {
@Override
void cancel() {
cancel(new IOException("Stream " + streamid + " cancelled"));
if ((streamid == 0)) {
cancel(new IOException("Stream cancelled before streamid assigned"));
} else {
cancel(new IOException("Stream " + streamid + " cancelled"));
}
}
void onSubscriptionError(Throwable t) {
@ -1135,9 +1209,13 @@ class Stream<T> extends ExchangeImpl<T> {
// This method sends a RST_STREAM frame
void cancelImpl(Throwable e) {
errorRef.compareAndSet(null, e);
if (debug.on()) debug.log("cancelling stream {0}: {1}", streamid, e);
if (debug.on()) {
if (streamid == 0) debug.log("cancelling stream: %s", (Object)e);
else debug.log("cancelling stream %d: %s", streamid, e);
}
if (Log.trace()) {
Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
if (streamid == 0) Log.logTrace("cancelling stream: {0}\n", e);
else Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
}
boolean closing;
if (closing = !closed) { // assigning closing to !closed
@ -1160,14 +1238,15 @@ class Stream<T> extends ExchangeImpl<T> {
}
try {
// will send a RST_STREAM frame
if (streamid != 0) {
connection.decrementStreamsCount(streamid);
if (streamid != 0 && streamState == 0) {
e = Utils.getCompletionCause(e);
if (e instanceof EOFException) {
// read EOF: no need to try & send reset
connection.decrementStreamsCount(streamid);
connection.closeStream(streamid);
} else {
connection.resetStream(streamid, ResetFrame.CANCEL);
// no use to send CANCEL if already closed.
sendCancelStreamFrame();
}
}
} catch (Throwable ex) {
@ -1175,6 +1254,14 @@ class Stream<T> extends ExchangeImpl<T> {
}
}
void sendCancelStreamFrame() {
// do not reset a stream until it has a streamid.
if (streamid > 0 && markStream(ResetFrame.CANCEL) == 0) {
connection.resetStream(streamid, ResetFrame.CANCEL);
}
close();
}
// This method doesn't send any frame
void close() {
if (closed) return;
@ -1390,4 +1477,17 @@ class Stream<T> extends ExchangeImpl<T> {
}
}
}
private static final VarHandle STREAM_STATE;
private static final VarHandle DEREGISTERED;
static {
try {
STREAM_STATE = MethodHandles.lookup()
.findVarHandle(Stream.class, "streamState", int.class);
DEREGISTERED = MethodHandles.lookup()
.findVarHandle(Stream.class, "deRegistered", boolean.class);
} catch (Exception x) {
throw new ExceptionInInitializerError(x);
}
}
}

View File

@ -23,6 +23,7 @@
/*
* @test
* @bug 8229822
* @summary Tests what happens when push promise handlers and their
* response body handlers and subscribers throw unexpected exceptions.
* @library /test/lib http2/server

View File

@ -23,6 +23,7 @@
/*
* @test
* @bug 8229822
* @summary Tests what happens when push promise handlers and their
* response body handlers and subscribers throw unexpected exceptions.
* @library /test/lib http2/server

View File

@ -23,6 +23,7 @@
/*
* @test
* @bug 8229822
* @summary Tests what happens when push promise handlers and their
* response body handlers and subscribers throw unexpected exceptions.
* @library /test/lib http2/server

View File

@ -23,6 +23,7 @@
/*
* @test
* @bug 8229822
* @summary Tests what happens when push promise handlers and their
* response body handlers and subscribers throw unexpected exceptions.
* @library /test/lib http2/server

View File

@ -23,6 +23,7 @@
/*
* @test
* @bug 8229822
* @summary Tests what happens when push promise handlers and their
* response body handlers and subscribers throw unexpected exceptions.
* @library /test/lib http2/server

View File

@ -23,6 +23,7 @@
/*
* @test
* @bug 8229822
* @summary Tests what happens when push promise handlers and their
* response body handlers and subscribers throw unexpected exceptions.
* @library /test/lib http2/server

View File

@ -23,6 +23,7 @@
/*
* @test
* @bug 8229822
* @summary Tests what happens when push promise handlers and their
* response body handlers and subscribers throw unexpected exceptions.
* @library /test/lib http2/server