8312433: HttpClient request fails due to connection being considered idle and closed

Reviewed-by: djelinski
This commit is contained in:
Jaikiran Pai 2023-07-27 12:14:14 +00:00
parent 271417a0e1
commit 486c7844f9
3 changed files with 309 additions and 44 deletions
src/java.net.http/share/classes/jdk/internal/net/http
test/jdk/java/net/httpclient/http2

@ -28,8 +28,6 @@ package jdk.internal.net.http;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Base64;
import java.util.HashSet;
import java.util.Map;
@ -71,7 +69,8 @@ class Http2ClientImpl {
// only accessed from within lock protected blocks
private final Set<String> failures = new HashSet<>();
private final ReentrantLock lock = new ReentrantLock();
// used when dealing with connections in the pool
private final ReentrantLock connectionPoolLock = new ReentrantLock();
/**
* When HTTP/2 requested only. The following describes the aggregate behavior including the
@ -100,15 +99,16 @@ class Http2ClientImpl {
Exchange<?> exchange) {
String key = Http2Connection.keyFor(req);
lock.lock();
connectionPoolLock.lock();
try {
Http2Connection connection = connections.get(key);
if (connection != null) {
try {
if (!connection.isOpen() || !connection.reserveStream(true)) {
if (!connection.tryReserveForPoolCheckout() || !connection.reserveStream(true)) {
if (debug.on())
debug.log("removing found closed or closing connection: %s", connection);
deleteConnection(connection);
debug.log("removing connection from pool since it couldn't be" +
" reserved for use: %s", connection);
removeFromPool(connection);
} else {
// fast path if connection already exists
if (debug.on())
@ -128,12 +128,12 @@ class Http2ClientImpl {
return MinimalFuture.completedFuture(null);
}
} finally {
lock.unlock();
connectionPoolLock.unlock();
}
return Http2Connection
.createAsync(req, this, exchange)
.whenComplete((conn, t) -> {
lock.lock();
connectionPoolLock.lock();
try {
if (conn != null) {
try {
@ -148,7 +148,7 @@ class Http2ClientImpl {
failures.add(key);
}
} finally {
lock.unlock();
connectionPoolLock.unlock();
}
});
}
@ -169,7 +169,7 @@ class Http2ClientImpl {
}
String key = c.key();
lock.lock();
connectionPoolLock.lock();
try {
if (stopping) {
if (debug.on()) debug.log("stopping - closing connection: %s", c);
@ -192,21 +192,27 @@ class Http2ClientImpl {
debug.log("put in the connection pool: %s", c);
return true;
} finally {
lock.unlock();
connectionPoolLock.unlock();
}
}
void deleteConnection(Http2Connection c) {
/**
* Removes the connection from the pool (if it was in the pool).
* This method doesn't close the connection.
*
* @param c the connection to remove from the pool
*/
void removeFromPool(Http2Connection c) {
if (debug.on())
debug.log("removing from the connection pool: %s", c);
lock.lock();
connectionPoolLock.lock();
try {
if (connections.remove(c.key(), c)) {
if (debug.on())
debug.log("removed from the connection pool: %s", c);
}
} finally {
lock.unlock();
connectionPoolLock.unlock();
}
}
@ -215,11 +221,11 @@ class Http2ClientImpl {
if (debug.on()) debug.log("stopping");
STOPPED = new EOFException("HTTP/2 client stopped");
STOPPED.setStackTrace(new StackTraceElement[0]);
lock.lock();
connectionPoolLock.lock();
try {
stopping = true;
} finally {
lock.unlock();
connectionPoolLock.unlock();
}
do {
connections.values().forEach(this::close);

@ -131,7 +131,8 @@ class Http2Connection {
private static final int MAX_CLIENT_STREAM_ID = Integer.MAX_VALUE; // 2147483647
private static final int MAX_SERVER_STREAM_ID = Integer.MAX_VALUE - 1; // 2147483646
private IdleConnectionTimeoutEvent idleConnectionTimeoutEvent; // may be null
// may be null; must be accessed/updated with the stateLock held
private IdleConnectionTimeoutEvent idleConnectionTimeoutEvent;
/**
* Flag set when no more streams to be opened on this connection.
@ -196,31 +197,65 @@ class Http2Connection {
// and has not sent the final stream flag
final class IdleConnectionTimeoutEvent extends TimeoutEvent {
private boolean fired;
// expected to be accessed/updated with "stateLock" being held
private boolean cancelled;
IdleConnectionTimeoutEvent(Duration duration) {
super(duration);
fired = false;
}
/**
* {@link #shutdown(Throwable) Shuts down} the connection, unless this event is
* {@link #cancelled}
*/
@Override
public void handle() {
fired = true;
if (debug.on()) {
debug.log("HTTP connection idle for too long");
// first check if the connection is still idle.
// must be done with the "stateLock" held, to allow for synchronizing actions like
// closing the connection and checking out from connection pool (which too is expected
// to use this same lock)
stateLock.lock();
try {
if (cancelled) {
if (debug.on()) {
debug.log("Not initiating idle connection shutdown");
}
return;
}
if (!markIdleShutdownInitiated()) {
if (debug.on()) {
debug.log("Unexpected state %s, skipping idle connection shutdown",
describeClosedState(closedState));
}
return;
}
} finally {
stateLock.unlock();
}
HttpConnectTimeoutException hte = new HttpConnectTimeoutException("HTTP connection idle, no active streams. Shutting down.");
if (debug.on()) {
debug.log("Initiating shutdown of HTTP connection which is idle for too long");
}
HttpConnectTimeoutException hte = new HttpConnectTimeoutException(
"HTTP connection idle, no active streams. Shutting down.");
shutdown(hte);
}
/**
* Cancels this event. Should be called with stateLock held
*/
void cancel() {
assert stateLock.isHeldByCurrentThread() : "Current thread doesn't hold " + stateLock;
// mark as cancelled to prevent potentially already triggered event from actually
// doing the shutdown
this.cancelled = true;
// cancel the timer to prevent the event from being triggered (if it hasn't already)
client().cancelTimer(this);
}
@Override
public String toString() {
return "IdleConnectionTimeoutEvent, " + super.toString();
}
public boolean isFired() {
return fired;
}
}
// A small class that allows to control frames with respect to the state of
@ -294,8 +329,11 @@ class Http2Connection {
private static final int HALF_CLOSED_LOCAL = 1;
private static final int HALF_CLOSED_REMOTE = 2;
private static final int SHUTDOWN_REQUESTED = 4;
private final Lock stateLock = new ReentrantLock();
volatile int closedState;
// state when idle connection management initiates a shutdown of the connection, after
// which the connection will go into SHUTDOWN_REQUESTED state
private static final int IDLE_SHUTDOWN_INITIATED = 8;
private final ReentrantLock stateLock = new ReentrantLock();
private volatile int closedState;
//-------------------------------------
final HttpConnection connection;
@ -496,11 +534,11 @@ class Http2Connection {
}
if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) {
setFinalStream();
client2.deleteConnection(this);
client2.removeFromPool(this);
return false;
} else if (!clientInitiated && (lastReservedServerStreamid + 2) >= MAX_SERVER_STREAM_ID) {
setFinalStream();
client2.deleteConnection(this);
client2.removeFromPool(this);
return false;
}
if (clientInitiated)
@ -775,7 +813,7 @@ class Http2Connection {
Log.logError("Shutting down connection");
}
}
client2.deleteConnection(this);
client2.removeFromPool(this);
for (Stream<?> s : streams.values()) {
try {
s.connectionClosing(t);
@ -994,8 +1032,7 @@ class Http2Connection {
}
boolean isOpen() {
return !isMarked(closedState, SHUTDOWN_REQUESTED)
&& connection.channel().isOpen();
return !isMarkedForShutdown() && connection.channel().isOpen();
}
void resetStream(int streamid, int code) {
@ -1092,10 +1129,11 @@ class Http2Connection {
// Start timer if property present and not already created
stateLock.lock();
try {
// idleConnectionTimerEvent is always accessed within a lock protected block
// idleConnectionTimeoutEvent is always accessed within a lock protected block
if (streams.isEmpty() && idleConnectionTimeoutEvent == null) {
idleConnectionTimeoutEvent = client().idleConnectionTimeout()
.map(IdleConnectionTimeoutEvent::new).orElse(null);
.map(IdleConnectionTimeoutEvent::new)
.orElse(null);
if (idleConnectionTimeoutEvent != null) {
client().registerTimer(idleConnectionTimeoutEvent);
}
@ -1283,23 +1321,53 @@ class Http2Connection {
return new Stream.PushedStream<>(pg, this, pushEx);
}
/**
* Attempts to notify the idle connection management that this connection should
* be considered "in use". This way the idle connection management doesn't close
* this connection during the time the connection is handed out from the pool and any
* new stream created on that connection.
* @return true if the connection has been successfully reserved and is {@link #isOpen()}. false
* otherwise; in which case the connection must not be handed out from the pool.
*/
boolean tryReserveForPoolCheckout() {
// must be done with "stateLock" held to co-ordinate idle connection management
stateLock.lock();
try {
cancelIdleShutdownEvent();
// consider the reservation successful only if the connection's state hasn't moved
// to "being closed"
return isOpen();
} finally {
stateLock.unlock();
}
}
/**
* Cancels any event that might have been scheduled to shutdown this connection. Must be called
* with the stateLock held.
*/
private void cancelIdleShutdownEvent() {
assert stateLock.isHeldByCurrentThread() : "Current thread doesn't hold " + stateLock;
if (idleConnectionTimeoutEvent == null) {
return;
}
idleConnectionTimeoutEvent.cancel();
idleConnectionTimeoutEvent = null;
}
<T> void putStream(Stream<T> stream, int streamid) {
// increment the reference count on the HttpClientImpl
// to prevent the SelectorManager thread from exiting until
// the stream is closed.
stateLock.lock();
try {
if (!isMarked(closedState, SHUTDOWN_REQUESTED)) {
if (!isMarkedForShutdown()) {
if (debug.on()) {
debug.log("Opened stream %d", streamid);
}
client().streamReference();
streams.put(streamid, stream);
// idleConnectionTimerEvent is always accessed within a lock protected block
if (idleConnectionTimeoutEvent != null) {
client().cancelTimer(idleConnectionTimeoutEvent);
idleConnectionTimeoutEvent = null;
}
cancelIdleShutdownEvent();
return;
}
} finally {
@ -1663,6 +1731,12 @@ class Http2Connection {
return (state & mask) == mask;
}
private boolean isMarkedForShutdown() {
final int closedSt = closedState;
return isMarked(closedSt, IDLE_SHUTDOWN_INITIATED)
|| isMarked(closedSt, SHUTDOWN_REQUESTED);
}
private boolean markShutdownRequested() {
return markClosedState(SHUTDOWN_REQUESTED);
}
@ -1675,6 +1749,10 @@ class Http2Connection {
return markClosedState(HALF_CLOSED_REMOTE);
}
private boolean markIdleShutdownInitiated() {
return markClosedState(IDLE_SHUTDOWN_INITIATED);
}
private boolean markClosedState(int flag) {
int state, desired;
do {
@ -1688,8 +1766,11 @@ class Http2Connection {
String describeClosedState(int state) {
if (state == 0) return "active";
String desc = null;
if (isMarked(state, IDLE_SHUTDOWN_INITIATED)) {
desc = "idle-shutdown-initiated";
}
if (isMarked(state, SHUTDOWN_REQUESTED)) {
desc = "shutdown";
desc = desc == null ? "shutdown" : desc + "+shutdown";
}
if (isMarked(state, HALF_CLOSED_LOCAL | HALF_CLOSED_REMOTE)) {
if (desc == null) return "closed";

@ -0,0 +1,178 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import jdk.httpclient.test.lib.common.HttpServerAdapters;
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestExchange;
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer;
import jdk.internal.net.http.common.Utils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static java.net.http.HttpClient.Builder.NO_PROXY;
import static java.net.http.HttpClient.Version.HTTP_2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/*
* @test
* @bug 8312433
* @summary verify that the HttpClient's HTTP2 idle connection management doesn't close a connection
* when that connection has been handed out from the pool to a caller
* @library /test/jdk/java/net/httpclient/lib
* @build jdk.httpclient.test.lib.common.HttpServerAdapters
*
* @run junit/othervm -Djdk.internal.httpclient.debug=true
* -Djdk.httpclient.keepalive.timeout.h2=3
* IdlePooledConnectionTest
*/
public class IdlePooledConnectionTest {
private static final String ALL_OK_PATH = "/allOK";
private static HttpTestServer h2Server;
private static URI allOKUri;
private static final String H2_KEEPALIVE_TIMEOUT_PROP = "jdk.httpclient.keepalive.timeout.h2";
private static final String KEEPALIVE_TIMEOUT_PROP = "jdk.httpclient.keepalive.timeout";
@BeforeAll
static void beforeAll() throws Exception {
h2Server = HttpTestServer.create(HTTP_2);
h2Server.addHandler(new AllOKHandler(), ALL_OK_PATH);
h2Server.start();
System.err.println("Started H2 server at " + h2Server.serverAuthority());
allOKUri = new URI("http://" + h2Server.serverAuthority() + ALL_OK_PATH);
}
@AfterAll
static void afterAll() throws Exception {
if (h2Server != null) {
System.err.println("Stopping h2 server: " + h2Server.serverAuthority());
h2Server.stop();
}
}
// just returns a 200 HTTP response for all requests
private static final class AllOKHandler implements HttpServerAdapters.HttpTestHandler {
@Override
public void handle(final HttpTestExchange exchange) throws IOException {
System.err.println("Responding with 200 response code for request "
+ exchange.getRequestURI());
exchange.sendResponseHeaders(200, 0);
}
}
/*
* Issues a HTTP2 request against a server and expects it to succeed.
* The connection that was used is internally pooled by the HttpClient implementation.
* Then waits for the H2 idle connection timeout, before again firing several concurrent HTTP2
* requests against the same server. It is expected that all these requests complete
* successfully without running into a race condition where the H2 idle connection management
* closes the (pooled) connection during the time connection has been handed out to a caller
* and a new stream hasn't yet been created.
*/
@Test
public void testPooledConnection() throws Exception {
final Duration h2TimeoutDuration = getEffectiveH2IdleTimeoutDuration();
assertNotNull(h2TimeoutDuration, "H2 idle connection timeout cannot be null");
// the wait time, which represents the time to wait before firing off additional requests,
// is intentionally a few milliseconds smaller than the h2 idle connection timeout,
// to allow for the requests to reach the place where connection checkout from the pool
// happens and thus allow the code to race with the idle connection timer task
// closing the connection.
final long waitTimeMillis = TimeUnit.of(ChronoUnit.MILLIS).convert(h2TimeoutDuration) - 5;
try (final HttpClient client = HttpClient.newBuilder().proxy(NO_PROXY).build()) {
final HttpRequest request = HttpRequest.newBuilder(allOKUri)
.GET().version(HTTP_2).build();
// keep ready the additional concurrent requests that we will fire later.
// we do this now so that when it's time to fire off these additional requests,
// this main thread does as little work as possible to increase the chances of a
// race condition in idle connection management closing a pooled connection
// and new requests being fired
final Callable<HttpResponse<Void>> task = () -> client.send(request,
BodyHandlers.discarding());
final List<Callable<HttpResponse<Void>>> tasks = new ArrayList<>();
final int numAdditionalReqs = 20;
for (int i = 0; i < numAdditionalReqs; i++) {
tasks.add(task);
}
// issue the first request
System.err.println("issuing first request: " + request);
final HttpResponse<Void> firstResp = client.send(request, BodyHandlers.discarding());
assertEquals(200, firstResp.statusCode(), "unexpected response code for request "
+ request);
System.err.println("waiting for " + waitTimeMillis + " milli seconds" +
" before issuing additional requests");
Thread.sleep(waitTimeMillis);
// issue additional concurrent requests
final List<Future<HttpResponse<Void>>> responses;
try (final ExecutorService executor = Executors.newFixedThreadPool(numAdditionalReqs)) {
responses = executor.invokeAll(tasks);
}
System.err.println("All " + responses.size() + " requests completed, now" +
" verifying each response");
// verify all requests succeeded
for (final Future<HttpResponse<Void>> future : responses) {
final HttpResponse<Void> rsp = future.get();
assertEquals(200, rsp.statusCode(), "unexpected response code for request "
+ request);
}
}
}
// returns the effective idle timeout duration of a HTTP2 connection
private static Duration getEffectiveH2IdleTimeoutDuration() {
final long keepAliveTimeoutInSecs = getNetProp(KEEPALIVE_TIMEOUT_PROP, 30);
final long h2TimeoutInSecs = getNetProp(H2_KEEPALIVE_TIMEOUT_PROP, keepAliveTimeoutInSecs);
return Duration.of(h2TimeoutInSecs, ChronoUnit.SECONDS);
}
private static long getNetProp(final String prop, final long def) {
final String s = Utils.getNetProperty(prop);
if (s == null) {
return def;
}
try {
final long timeoutVal = Long.parseLong(s);
return timeoutVal >= 0 ? timeoutVal : def;
} catch (NumberFormatException ignored) {
return def;
}
}
}