8308310: HttpClient: Avoid logging or locking from within synchronized blocks

Reviewed-by: jpai
This commit is contained in:
Daniel Fuchs 2023-05-24 14:23:24 +00:00
parent 7764f46e9e
commit 736b90d54b
32 changed files with 794 additions and 406 deletions

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -57,6 +57,7 @@ public class BufferingSubscriber<T> implements TrustedSubscriber<T>
private volatile DownstreamSubscription downstreamSubscription;
/** Must be held when accessing the internal buffers. */
// using a monitor here is fine: no logging while holding it
private final Object buffersLock = new Object();
/** The internal buffers holding the buffered data. */
private ArrayList<ByteBuffer> internalBuffers;

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -40,6 +40,7 @@ import java.util.ListIterator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
@ -57,11 +58,12 @@ final class ConnectionPool {
// Pools of idle connections
private final ReentrantLock stateLock = new ReentrantLock();
private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
private final ExpiryList expiryList;
private final String dbgTag; // used for debug
boolean stopped;
volatile boolean stopped;
/**
* Entries in connection pool are keyed by destination address and/or
@ -139,7 +141,7 @@ final class ConnectionPool {
return dbgTag;
}
synchronized void start() {
void start() {
assert !stopped : "Already stopped";
}
@ -149,9 +151,21 @@ final class ConnectionPool {
return new CacheKey(secure, destination, proxy);
}
synchronized HttpConnection getConnection(boolean secure,
InetSocketAddress addr,
InetSocketAddress proxy) {
HttpConnection getConnection(boolean secure,
InetSocketAddress addr,
InetSocketAddress proxy) {
if (stopped) return null;
stateLock.lock();
try {
return getConnection0(secure, addr, proxy);
} finally {
stateLock.unlock();
}
}
private HttpConnection getConnection0(boolean secure,
InetSocketAddress addr,
InetSocketAddress proxy) {
if (stopped) return null;
// for plain (unsecure) proxy connection the destination address is irrelevant.
addr = secure || proxy == null ? addr : null;
@ -185,7 +199,8 @@ final class ConnectionPool {
// it's possible that cleanup may have been called.
HttpConnection toClose = null;
synchronized(this) {
stateLock.lock();
try {
if (cleanup.isDone()) {
return;
} else if (stopped) {
@ -203,6 +218,8 @@ final class ConnectionPool {
putConnection(conn, sslPool);
}
expiryList.add(conn, now, keepAlive);
} finally {
stateLock.unlock();
}
if (toClose != null) {
if (debug.on()) {
@ -243,7 +260,7 @@ final class ConnectionPool {
removeFromPool(HttpConnection c,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
//System.out.println("cacheCleaner removing: " + c);
assert Thread.holdsLock(this);
assert stateLock.isHeldByCurrentThread();
CacheKey k = c.cacheKey();
List<HttpConnection> l = pool.get(k);
if (l == null || l.isEmpty()) {
@ -288,7 +305,8 @@ final class ConnectionPool {
if (!expiryList.purgeMaybeRequired()) return nextPurge;
List<HttpConnection> closelist;
synchronized (this) {
stateLock.lock();
try {
closelist = expiryList.purgeUntil(now);
for (HttpConnection c : closelist) {
if (c instanceof PlainHttpConnection) {
@ -302,6 +320,8 @@ final class ConnectionPool {
nextPurge = now.until(
expiryList.nextExpiryDeadline().orElse(now),
ChronoUnit.MILLIS);
} finally {
stateLock.unlock();
}
closelist.forEach(this::close);
return nextPurge;
@ -316,7 +336,8 @@ final class ConnectionPool {
void stop() {
List<HttpConnection> closelist = Collections.emptyList();
try {
synchronized (this) {
stateLock.lock();
try {
stopped = true;
closelist = expiryList.stream()
.map(e -> e.connection)
@ -324,6 +345,8 @@ final class ConnectionPool {
expiryList.clear();
plainPool.clear();
sslPool.clear();
} finally {
stateLock.unlock();
}
} finally {
closelist.forEach(this::close);
@ -354,28 +377,25 @@ final class ConnectionPool {
// A loosely accurate boolean whose value is computed
// at the end of each operation performed on ExpiryList;
// Does not require synchronizing on the ConnectionPool.
// Does not require holding the ConnectionPool stateLock.
boolean purgeMaybeRequired() {
return mayContainEntries;
}
// Returns the next expiry deadline
// should only be called while holding a synchronization
// lock on the ConnectionPool
// should only be called while holding the ConnectionPool stateLock.
Optional<Instant> nextExpiryDeadline() {
if (list.isEmpty()) return Optional.empty();
else return Optional.of(list.getLast().expiry);
}
// should only be called while holding a synchronization
// lock on the ConnectionPool
// should only be called while holding the ConnectionPool stateLock.
HttpConnection removeOldest() {
ExpiryEntry entry = list.pollLast();
return entry == null ? null : entry.connection;
}
// should only be called while holding a synchronization
// lock on the ConnectionPool
// should only be called while holding the ConnectionPool stateLock.
void add(HttpConnection conn) {
add(conn, Instant.now(), KEEP_ALIVE_TIMEOUT);
}
@ -408,8 +428,7 @@ final class ConnectionPool {
mayContainEntries = true;
}
// should only be called while holding a synchronization
// lock on the ConnectionPool
// should only be called while holding the ConnectionPool stateLock.
void remove(HttpConnection c) {
if (c == null || list.isEmpty()) return;
ListIterator<ExpiryEntry> li = list.listIterator();
@ -423,8 +442,7 @@ final class ConnectionPool {
}
}
// should only be called while holding a synchronization
// lock on the ConnectionPool.
// should only be called while holding the ConnectionPool stateLock.
// Purge all elements whose deadline is before now (now included).
List<HttpConnection> purgeUntil(Instant now) {
if (list.isEmpty()) return Collections.emptyList();
@ -450,14 +468,12 @@ final class ConnectionPool {
return closelist;
}
// should only be called while holding a synchronization
// lock on the ConnectionPool
// should only be called while holding the ConnectionPool stateLock.
java.util.stream.Stream<ExpiryEntry> stream() {
return list.stream();
}
// should only be called while holding a synchronization
// lock on the ConnectionPool
// should only be called while holding the ConnectionPool stateLock.
void clear() {
list.clear();
mayContainEntries = false;
@ -465,10 +481,9 @@ final class ConnectionPool {
}
// Remove a connection from the pool.
// should only be called while holding a synchronization
// lock on the ConnectionPool
// should only be called while holding the ConnectionPool stateLock.
private void removeFromPool(HttpConnection c) {
assert Thread.holdsLock(this);
assert stateLock.isHeldByCurrentThread();
if (c instanceof PlainHttpConnection) {
removeFromPool(c, plainPool);
} else {
@ -478,7 +493,16 @@ final class ConnectionPool {
}
// Used by tests
synchronized boolean contains(HttpConnection c) {
boolean contains(HttpConnection c) {
stateLock.lock();
try {
return contains0(c);
} finally {
stateLock.unlock();
}
}
private boolean contains0(HttpConnection c) {
final CacheKey key = c.cacheKey();
List<HttpConnection> list;
if ((list = plainPool.get(key)) != null) {
@ -494,9 +518,12 @@ final class ConnectionPool {
if (debug.on())
debug.log("%s : ConnectionPool.cleanup(%s)",
String.valueOf(c.getConnectionFlow()), error);
synchronized(this) {
stateLock.lock();
try {
removeFromPool(c);
expiryList.remove(c);
} finally {
stateLock.unlock();
}
c.close();
}

@ -288,7 +288,7 @@ final class Exchange<T> {
IOException cause = null;
CompletableFuture<? extends ExchangeImpl<T>> cf = null;
if (failed != null) {
synchronized(this) {
synchronized (this) {
cause = failed;
impl = exchImpl;
cf = exchangeCF;
@ -371,7 +371,7 @@ final class Exchange<T> {
// instead - as we need CAS semantics.
synchronized (this) { exchangeCF = cf; };
res = cf.whenComplete((r,x) -> {
synchronized(Exchange.this) {
synchronized (Exchange.this) {
if (exchangeCF == cf) exchangeCF = null;
}
});

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -419,7 +419,7 @@ class Http1AsyncReceiver {
}
void subscribe(Http1AsyncDelegate delegate) {
synchronized(this) {
synchronized (this) {
pendingDelegateRef.set(delegate);
}
if (queue.isEmpty()) {
@ -443,12 +443,16 @@ class Http1AsyncReceiver {
}
void unsubscribe(Http1AsyncDelegate delegate) {
synchronized(this) {
boolean unsubscribed = false;
synchronized (this) {
if (this.delegate == delegate) {
if (debug.on()) debug.log("Unsubscribed %s", delegate);
this.delegate = null;
unsubscribed = true;
}
}
if (unsubscribed) {
if (debug.on()) debug.log("Unsubscribed %s", delegate);
}
}
// Callback: Consumer of ByteBuffer

@ -39,7 +39,8 @@ import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.HttpBodySubscriberWrapper;
@ -68,11 +69,11 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
/** Records a possible cancellation raised before any operation
* has been initiated, or an error received while sending the request. */
private Throwable failed;
private final AtomicReference<Throwable> failedRef = new AtomicReference<>();
private final List<CompletableFuture<?>> operations; // used for cancel
/** Must be held when operating on any internal state or data. */
private final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
/** Holds the outgoing data, either the headers or a request body part. Or
* an error from the request body publisher. At most there can be ~2 pieces
@ -268,11 +269,14 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
// to unexpected exceptions.
private boolean registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
Throwable failed = null;
synchronized (lock) {
failed = this.failed;
lock.lock();
try {
failed = failedRef.get();
if (failed == null) {
this.responseSubscriber = subscriber;
}
} finally {
lock.unlock();
}
if (failed != null) {
subscriber.onError(failed);
@ -308,10 +312,13 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
connectCF = connection.connectAsync(exchange)
.thenCompose(unused -> connection.finishConnect());
Throwable cancelled;
synchronized (lock) {
if ((cancelled = failed) == null) {
lock.lock();
try {
if ((cancelled = failedRef.get()) == null) {
operations.add(connectCF);
}
} finally {
lock.unlock();
}
if (cancelled != null) {
if (client.isSelectorThread()) {
@ -342,9 +349,7 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
if (debug.on()) debug.log("requestAction.headers");
List<ByteBuffer> data = requestAction.headers();
synchronized (lock) {
state = State.HEADERS;
}
switchState(State.HEADERS);
if (debug.on()) debug.log("setting outgoing with headers");
assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
appendToOutgoing(data);
@ -410,10 +415,12 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
if (debug.on()) debug.log("reading headers");
CompletableFuture<Response> cf = response.readHeadersAsync(executor);
Throwable cause;
synchronized (lock) {
lock.lock();
try {
operations.add(cf);
cause = failed;
failed = null;
cause = failedRef.compareAndExchange(failedRef.get(), null);
} finally {
lock.unlock();
}
if (cause != null) {
@ -469,9 +476,12 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
ByteBuffer drainLeftOverBytes() {
synchronized (lock) {
lock.lock();
try {
asyncReceiver.stop();
return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER);
} finally {
lock.unlock();
}
}
@ -518,11 +528,11 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
int count = 0;
Throwable error = null;
BodySubscriber<?> subscriber;
synchronized (lock) {
lock.lock();
try {
subscriber = responseSubscriber;
if ((error = failed) == null) {
failed = error = cause;
}
failedRef.compareAndSet(null, cause);
error = failedRef.get();
if (debug.on()) {
debug.log(request.uri() + ": " + error);
}
@ -551,6 +561,8 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
}
operations.clear();
}
} finally {
lock.unlock();
}
// complete subscriber if needed
@ -601,16 +613,12 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
/** Returns true if this exchange was canceled. */
boolean isCanceled() {
synchronized (lock) {
return failed != null;
}
return failedRef.get() != null;
}
/** Returns the cause for which this exchange was canceled, if available. */
Throwable getCancelCause() {
synchronized (lock) {
return failed;
}
return failedRef.get();
}
/** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */
@ -674,9 +682,7 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
return null;
if (dp.throwable != null) {
synchronized (lock) {
state = State.ERROR;
}
switchState(State.ERROR);
exec.execute(() -> {
headersSentCF.completeExceptionally(dp.throwable);
bodySentCF.completeExceptionally(dp.throwable);
@ -687,18 +693,14 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
switch (state) {
case HEADERS:
synchronized (lock) {
state = State.BODY;
}
switchState(State.BODY);
// completeAsync, since dependent tasks should run in another thread
if (debug.on()) debug.log("initiating completion of headersSentCF");
headersSentCF.completeAsync(() -> this, exec);
break;
case BODY:
if (dp.data == Http1RequestBodySubscriber.COMPLETED) {
synchronized (lock) {
state = State.COMPLETING;
}
switchState(State.COMPLETING);
if (debug.on()) debug.log("initiating completion of bodySentCF");
bodySentCF.completeAsync(() -> this, exec);
} else {
@ -716,6 +718,25 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
return dp;
}
State switchState(State newState) {
lock.lock();
try {
return state = newState;
} finally {
lock.unlock();
}
}
State switchAssertState(State expected, State newState) {
lock.lock();
try {
assert state == expected : "Unexpected state:" + state + ", expected: " + expected;
return state = newState;
} finally {
lock.unlock();
}
}
/** A Publisher of HTTP/1.1 headers and request body. */
final class Http1Publisher implements FlowTube.TubePublisher {
@ -808,10 +829,7 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
} else {
List<ByteBuffer> data = dp.data;
if (data == Http1RequestBodySubscriber.COMPLETED) {
synchronized (lock) {
assert state == State.COMPLETING : "Unexpected state:" + state;
state = State.COMPLETED;
}
switchAssertState(State.COMPLETING, State.COMPLETED);
if (debug.on())
debug.log("completed, stopping %s", writeScheduler);
writeScheduler.stop();

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -27,6 +27,8 @@ package jdk.internal.net.http;
import java.io.EOFException;
import java.lang.System.Logger.Level;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@ -114,21 +116,26 @@ class Http1Response<T> {
// of a pending operation. Although there usually is a single
// point where the operation starts, it may terminate at
// different places.
private final class ClientRefCountTracker {
final HttpClientImpl client = connection.client();
private static final class ClientRefCountTracker {
final HttpClientImpl client;
final Logger debug;
// state & 0x01 != 0 => acquire called
// state & 0x02 != 0 => tryRelease called
byte state;
volatile byte state;
public synchronized boolean acquire() {
if (state == 0) {
ClientRefCountTracker(HttpClientImpl client, Logger logger) {
this.client = client;
this.debug = logger;
}
public boolean acquire() {
if (STATE.compareAndSet(this, (byte) 0, (byte) 0x01)) {
// increment the reference count on the HttpClientImpl
// to prevent the SelectorManager thread from exiting
// until our operation is complete.
if (debug.on())
debug.log("Operation started: incrementing ref count for %s", client);
client.reference();
state = 0x01;
return true;
} else {
if (debug.on())
@ -139,8 +146,8 @@ class Http1Response<T> {
}
}
public synchronized void tryRelease() {
if (state == 0x01) {
public void tryRelease() {
if (STATE.compareAndSet(this, (byte) 0x01, (byte) 0x03)) {
// decrement the reference count on the HttpClientImpl
// to allow the SelectorManager thread to exit if no
// other operation is pending and the facade is no
@ -150,12 +157,21 @@ class Http1Response<T> {
client.unreference();
} else if (state == 0) {
if (debug.on())
debug.log("Operation finished: releasing ref count for %s", client);
debug.log("Operation not started: releasing ref count for %s", client);
} else if ((state & 0x02) == 0x02) {
if (debug.on())
debug.log("ref count for %s already released", client);
}
state |= 0x02;
}
private static final VarHandle STATE;
static {
try {
STATE = MethodHandles.lookup().findVarHandle(
ClientRefCountTracker.class, "state", byte.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}
}
@ -301,7 +317,7 @@ class Http1Response<T> {
// if we reach here, we must reset the headersReader state.
asyncReceiver.unsubscribe(headersReader);
headersReader.reset();
ClientRefCountTracker refCountTracker = new ClientRefCountTracker();
ClientRefCountTracker refCountTracker = new ClientRefCountTracker(connection.client(), debug);
// We need to keep hold on the client facade until the
// tracker has been incremented.

@ -36,6 +36,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
@ -66,9 +68,11 @@ class Http2ClientImpl {
/* Map key is "scheme:host:port" */
private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
// only accessed from within synchronized blocks
// only accessed from within lock protected blocks
private final Set<String> failures = new HashSet<>();
private final ReentrantLock lock = new ReentrantLock();
/**
* When HTTP/2 requested only. The following describes the aggregate behavior including the
* calling code. In all cases, the HTTP2 connection cache
@ -98,7 +102,8 @@ class Http2ClientImpl {
InetSocketAddress proxy = req.proxy();
String key = Http2Connection.keyFor(uri, proxy);
synchronized (this) {
lock.lock();
try {
Http2Connection connection = connections.get(key);
if (connection != null) {
try {
@ -124,11 +129,14 @@ class Http2ClientImpl {
if (debug.on()) debug.log("not found in connection pool");
return MinimalFuture.completedFuture(null);
}
} finally {
lock.unlock();
}
return Http2Connection
.createAsync(req, this, exchange)
.whenComplete((conn, t) -> {
synchronized (Http2ClientImpl.this) {
lock.lock();
try {
if (conn != null) {
try {
conn.reserveStream(true);
@ -141,6 +149,8 @@ class Http2ClientImpl {
if (cause instanceof Http2Connection.ALPNException)
failures.add(key);
}
} finally {
lock.unlock();
}
});
}
@ -161,7 +171,8 @@ class Http2ClientImpl {
}
String key = c.key();
synchronized(this) {
lock.lock();
try {
if (stopping) {
if (debug.on()) debug.log("stopping - closing connection: %s", c);
close(c);
@ -182,17 +193,22 @@ class Http2ClientImpl {
if (debug.on())
debug.log("put in the connection pool: %s", c);
return true;
} finally {
lock.unlock();
}
}
void deleteConnection(Http2Connection c) {
if (debug.on())
debug.log("removing from the connection pool: %s", c);
synchronized (this) {
lock.lock();
try {
if (connections.remove(c.key(), c)) {
if (debug.on())
debug.log("removed from the connection pool: %s", c);
}
} finally {
lock.unlock();
}
}
@ -201,7 +217,12 @@ class Http2ClientImpl {
if (debug.on()) debug.log("stopping");
STOPPED = new EOFException("HTTP/2 client stopped");
STOPPED.setStackTrace(new StackTraceElement[0]);
synchronized (this) {stopping = true;}
lock.lock();
try {
stopping = true;
} finally {
lock.unlock();
}
do {
connections.values().forEach(this::close);
} while (!connections.isEmpty());

@ -40,7 +40,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.Objects;
@ -48,6 +47,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.net.ssl.SSLEngine;
@ -58,7 +59,6 @@ import jdk.internal.net.http.HttpConnection.HttpPublisher;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
import jdk.internal.net.http.common.HeaderDecoder;
import jdk.internal.net.http.common.HttpHeadersBuilder;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
@ -143,7 +143,7 @@ class Http2Connection {
* expire when all its still open streams (which could be many) eventually
* complete.
*/
private boolean finalStream;
private volatile boolean finalStream;
/*
* ByteBuffer pooling strategy for HTTP/2 protocol.
@ -233,7 +233,8 @@ class Http2Connection {
if (!prefaceSent) {
if (debug.on())
debug.log("Preface not sent: buffering %d", buf.remaining());
synchronized (this) {
stateLock.lock();
try {
if (!prefaceSent) {
if (pending == null) pending = new ArrayList<>();
pending.add(buf);
@ -243,6 +244,8 @@ class Http2Connection {
);
return false;
}
} finally {
stateLock.unlock();
}
}
@ -252,7 +255,7 @@ class Http2Connection {
// concurrently while we're here.
// This ensures that later incoming buffers will not
// be processed before we have flushed the pending queue.
// No additional synchronization is therefore necessary here.
// No additional locking is therefore necessary here.
List<ByteBuffer> pending = this.pending;
this.pending = null;
if (pending != null) {
@ -274,15 +277,20 @@ class Http2Connection {
// Mark that the connection preface is sent
void markPrefaceSent() {
assert !prefaceSent;
synchronized (this) {
stateLock.lock();
try {
prefaceSent = true;
} finally {
stateLock.unlock();
}
}
}
private static final int HALF_CLOSED_LOCAL = 1;
private static final int HALF_CLOSED_REMOTE = 2;
private static final int SHUTDOWN_REQUESTED = 4;
private final Lock stateLock = new ReentrantLock();
volatile int closedState;
//-------------------------------------
@ -469,8 +477,17 @@ class Http2Connection {
// if false returned then a new Http2Connection is required
// if true, the stream may be assigned to this connection
// for server push, if false returned, then the stream should be cancelled
synchronized boolean reserveStream(boolean clientInitiated) throws IOException {
if (finalStream) {
boolean reserveStream(boolean clientInitiated) throws IOException {
stateLock.lock();
try {
return reserveStream0(clientInitiated);
} finally {
stateLock.unlock();
}
}
private boolean reserveStream0(boolean clientInitiated) throws IOException {
if (finalStream()) {
return false;
}
if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) {
@ -556,7 +573,7 @@ class Http2Connection {
.thenCompose(checkAlpnCF);
}
synchronized boolean finalStream() {
boolean finalStream() {
return finalStream;
}
@ -564,7 +581,7 @@ class Http2Connection {
* Mark this connection so no more streams created on it and it will close when
* all are complete.
*/
synchronized void setFinalStream() {
void setFinalStream() {
finalStream = true;
}
@ -1001,7 +1018,16 @@ class Http2Connection {
}
// reduce count of streams by 1 if stream still exists
synchronized void decrementStreamsCount(int streamid) {
void decrementStreamsCount(int streamid) {
stateLock.lock();
try {
decrementStreamsCount0(streamid);
} finally {
stateLock.unlock();
}
}
private void decrementStreamsCount0(int streamid) {
Stream<?> s = streams.get(streamid);
if (s == null || !s.deRegister())
return;
@ -1033,7 +1059,8 @@ class Http2Connection {
if (debug.on()) debug.log("Closed stream %d", streamid);
Stream<?> s;
synchronized (this) {
stateLock.lock();
try {
s = streams.remove(streamid);
if (s != null) {
// decrement the reference count on the HttpClientImpl
@ -1042,6 +1069,8 @@ class Http2Connection {
// longer referenced.
client().streamUnreference();
}
} finally {
stateLock.unlock();
}
// ## Remove s != null. It is a hack for delayed cancellation,reset
if (s != null && !(s instanceof Stream.PushedStream)) {
@ -1054,8 +1083,9 @@ class Http2Connection {
close();
} else {
// Start timer if property present and not already created
synchronized (this) {
// idleConnectionTimerEvent is always accessed within a synchronized block
stateLock.lock();
try {
// idleConnectionTimerEvent is always accessed within a lock protected block
if (streams.isEmpty() && idleConnectionTimeoutEvent == null) {
idleConnectionTimeoutEvent = client().idleConnectionTimeout()
.map(IdleConnectionTimeoutEvent::new).orElse(null);
@ -1063,6 +1093,8 @@ class Http2Connection {
client().registerTimer(idleConnectionTimeoutEvent);
}
}
} finally {
stateLock.unlock();
}
}
}
@ -1248,20 +1280,23 @@ class Http2Connection {
// increment the reference count on the HttpClientImpl
// to prevent the SelectorManager thread from exiting until
// the stream is closed.
synchronized (this) {
stateLock.lock();
try {
if (!isMarked(closedState, SHUTDOWN_REQUESTED)) {
if (debug.on()) {
debug.log("Opened stream %d", streamid);
}
client().streamReference();
streams.put(streamid, stream);
// idleConnectionTimerEvent is always accessed within a synchronized block
// idleConnectionTimerEvent is always accessed within a lock protected block
if (idleConnectionTimeoutEvent != null) {
client().cancelTimer(idleConnectionTimeoutEvent);
idleConnectionTimeoutEvent = null;
}
return;
}
} finally {
stateLock.unlock();
}
if (debug.on()) debug.log("connection closed: closing stream %d", stream);
stream.cancel();
@ -1391,12 +1426,13 @@ class Http2Connection {
}
}
private final Object sendlock = new Object();
private final Lock sendlock = new ReentrantLock();
void sendFrame(Http2Frame frame) {
try {
HttpPublisher publisher = publisher();
synchronized (sendlock) {
sendlock.lock();
try {
if (frame instanceof OutgoingHeaders) {
@SuppressWarnings("unchecked")
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
@ -1408,6 +1444,8 @@ class Http2Connection {
} else {
publisher.enqueue(encodeFrame(frame));
}
} finally {
sendlock.unlock();
}
publisher.signalEnqueued();
} catch (IOException e) {
@ -1445,7 +1483,7 @@ class Http2Connection {
}
/*
* Direct call of the method bypasses synchronization on "sendlock" and
* Direct call of the method bypasses locking on "sendlock" and
* allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
* prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
*/
@ -1562,8 +1600,13 @@ class Http2Connection {
}
}
synchronized boolean isActive() {
return numReservedClientStreams > 0 || numReservedServerStreams > 0;
boolean isActive() {
stateLock.lock();
try {
return numReservedClientStreams > 0 || numReservedServerStreams > 0;
} finally {
stateLock.unlock();
}
}
@Override

@ -75,6 +75,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
@ -566,7 +567,8 @@ final class HttpClientImpl extends HttpClient implements Trackable {
*/
public boolean registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
if (!selmgr.isClosed()) {
synchronized (selmgr) {
selmgr.lock();
try {
if (!selmgr.isClosed()) {
if (subscribers.add(subscriber)) {
long count = pendingSubscribersCount.incrementAndGet();
@ -576,6 +578,8 @@ final class HttpClientImpl extends HttpClient implements Trackable {
}
return true;
}
} finally {
selmgr.unlock();
}
}
subscriber.onError(selmgr.selectorClosedException());
@ -1103,6 +1107,7 @@ final class HttpClientImpl extends HttpClient implements Trackable {
private final HttpClientImpl owner;
private final ConnectionPool pool;
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
private final ReentrantLock lock = new ReentrantLock();
SelectorManager(HttpClientImpl ref) throws IOException {
super(null, null,
@ -1151,11 +1156,14 @@ final class HttpClientImpl extends HttpClient implements Trackable {
void register(AsyncEvent e) {
var closed = this.closed;
if (!closed) {
synchronized (this) {
lock.lock();
try {
closed = this.closed;
if (!closed) {
registrations.add(e);
}
} finally {
lock.unlock();
}
}
if (closed) {
@ -1183,7 +1191,8 @@ final class HttpClientImpl extends HttpClient implements Trackable {
}
Set<SelectionKey> keys = new HashSet<>();
Set<AsyncEvent> toAbort = new HashSet<>();
synchronized (this) {
lock.lock();
try {
if (closed = this.closed) return;
this.closed = true;
try {
@ -1195,6 +1204,8 @@ final class HttpClientImpl extends HttpClient implements Trackable {
toAbort.addAll(this.deregistrations);
this.registrations.clear();
this.deregistrations.clear();
} finally {
lock.unlock();
}
// double check after closing
abortPendingRequests(owner, t);
@ -1214,11 +1225,14 @@ final class HttpClientImpl extends HttpClient implements Trackable {
// Only called by the selector manager thread
private void shutdown() {
try {
synchronized (this) {
lock.lock();
try {
Log.logTrace("{0}: shutting down", getName());
if (debug.on()) debug.log("SelectorManager shutting down");
closed = true;
selector.close();
} finally {
lock.unlock();
}
} catch (IOException ignored) {
} finally {
@ -1240,7 +1254,8 @@ final class HttpClientImpl extends HttpClient implements Trackable {
try {
if (Log.channel()) Log.logChannel(getName() + ": starting");
while (!Thread.currentThread().isInterrupted() && !closed) {
synchronized (this) {
lock.lock();
try {
assert errorList.isEmpty();
assert readyList.isEmpty();
assert resetList.isEmpty();
@ -1290,6 +1305,8 @@ final class HttpClientImpl extends HttpClient implements Trackable {
}
registrations.clear();
selector.selectedKeys().clear();
} finally {
lock.unlock();
}
for (AsyncEvent event : readyList) {
@ -1450,6 +1467,14 @@ final class HttpClientImpl extends HttpClient implements Trackable {
event.handle();
}
}
void lock() {
lock.lock();
}
void unlock() {
lock.unlock();
}
}
final String debugInterestOps(SelectableChannel channel) {
@ -1692,15 +1717,19 @@ final class HttpClientImpl extends HttpClient implements Trackable {
// Timer controls.
// Timers are implemented through timed Selector.select() calls.
synchronized void registerTimer(TimeoutEvent event) {
void registerTimer(TimeoutEvent event) {
Log.logTrace("Registering timer {0}", event);
timeouts.add(event);
selmgr.wakeupSelector();
synchronized (this) {
timeouts.add(event);
selmgr.wakeupSelector();
}
}
synchronized void cancelTimer(TimeoutEvent event) {
void cancelTimer(TimeoutEvent event) {
Log.logTrace("Canceling timer {0}", event);
timeouts.remove(event);
synchronized (this) {
timeouts.remove(event);
}
}
/**
@ -1713,7 +1742,7 @@ final class HttpClientImpl extends HttpClient implements Trackable {
List<TimeoutEvent> toHandle = null;
int remaining = 0;
// enter critical section to retrieve the timeout event to handle
synchronized(this) {
synchronized (this) {
if (timeouts.isEmpty()) return 0L;
Instant now = Instant.now();

@ -98,13 +98,13 @@ abstract class HttpConnection implements Closeable {
private final Map<CompletionStage<?>, Boolean> operations =
new IdentityHashMap<>();
void add(CompletionStage<?> cf) {
synchronized(operations) {
synchronized (operations) {
operations.put(cf, Boolean.TRUE);
cf.whenComplete((r,t)-> remove(cf));
}
}
boolean remove(CompletionStage<?> cf) {
synchronized(operations) {
synchronized (operations) {
return operations.remove(cf);
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -29,7 +29,7 @@ import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.net.ssl.SSLSession;
import java.net.http.HttpClient;
@ -123,7 +123,7 @@ class HttpResponseImpl<T> implements HttpResponse<T>, RawChannel.Provider {
* the channel.
*/
@Override
public synchronized RawChannel rawChannel() throws IOException {
public RawChannel rawChannel() throws IOException {
if (rawChannelProvider == null) {
throw new UnsupportedOperationException(
"RawChannel is only supported for WebSocket creation");
@ -147,7 +147,7 @@ class HttpResponseImpl<T> implements HttpResponse<T>, RawChannel.Provider {
* the channel.
*/
@Override
public synchronized void closeRawChannel() throws IOException {
public void closeRawChannel() throws IOException {
if (rawChannelProvider == null) {
throw new UnsupportedOperationException(
"RawChannel is only supported for WebSocket creation");
@ -180,6 +180,7 @@ class HttpResponseImpl<T> implements HttpResponse<T>, RawChannel.Provider {
private final HttpConnection connection;
private final Exchange<?> exchange;
private RawChannel rawchan;
private final ReentrantLock stateLock = new ReentrantLock();
RawChannelProvider(HttpConnection conn, Exchange<?> exch) {
connection = conn;
exchange = exch;
@ -193,7 +194,16 @@ class HttpResponseImpl<T> implements HttpResponse<T>, RawChannel.Provider {
}
@Override
public synchronized RawChannel rawChannel() {
public RawChannel rawChannel() {
stateLock.lock();
try {
return rawChannel0();
} finally {
stateLock.unlock();
}
}
private RawChannel rawChannel0() {
if (rawchan == null) {
ExchangeImpl<?> exchImpl = exchangeImpl();
if (!(exchImpl instanceof Http1Exchange)) {
@ -213,11 +223,16 @@ class HttpResponseImpl<T> implements HttpResponse<T>, RawChannel.Provider {
return rawchan;
}
public synchronized void closeRawChannel() throws IOException {
public void closeRawChannel() throws IOException {
// close the rawChannel, if created, or the
// connection, if not.
if (rawchan != null) rawchan.close();
else connection.close();
stateLock.lock();
try {
if (rawchan != null) rawchan.close();
else connection.close();
} finally {
stateLock.unlock();
}
}
private static HttpConnection connection(Response resp, Exchange<?> exch) {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -211,12 +211,16 @@ class MultiExchange<T> implements Cancelable {
return vers;
}
private synchronized void setExchange(Exchange<T> exchange) {
if (this.exchange != null && exchange != this.exchange) {
this.exchange.released();
if (cancelled) exchange.cancel();
private void setExchange(Exchange<T> exchange) {
Exchange<T> previousExchange;
synchronized (this) {
previousExchange = this.exchange;
this.exchange = exchange;
}
this.exchange = exchange;
if (previousExchange != null && exchange != previousExchange) {
previousExchange.released();
}
if (cancelled) exchange.cancel();
}
public Optional<Duration> remainingConnectTimeout() {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -52,12 +52,11 @@ import jdk.internal.net.http.common.Utils;
*/
class PlainHttpConnection extends HttpConnection {
private final Object reading = new Object();
protected final SocketChannel chan;
private final SocketTube tube; // need SocketTube to call signalClosed().
private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
private final PlainHttpPublisher writePublisher = new PlainHttpPublisher();
private volatile boolean connected;
private boolean closed;
private volatile boolean closed;
private volatile ConnectTimerEvent connectTimerEvent; // may be null
private volatile int unsuccessfulAttempts;
@ -243,9 +242,9 @@ class PlainHttpConnection extends HttpConnection {
if (closed) return false;
synchronized (this) {
closed = this.closed;
if (!closed) {
client().connectionOpened(this);
}
}
if (!closed) {
client().connectionOpened(this);
}
return !closed;
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,7 +25,6 @@
package jdk.internal.net.http;
import java.security.AccessControlContext;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.net.http.HttpRequest;
@ -33,6 +32,7 @@ import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.PushPromiseHandler;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Log;
@ -42,9 +42,10 @@ import jdk.internal.net.http.common.Log;
* Streams. This keeps track of all common state associated with the pushes.
*/
class PushGroup<T> {
private final HttpRequest initiatingRequest;
final CompletableFuture<Void> noMorePushesCF;
private final ReentrantLock stateLock = new ReentrantLock();
private final HttpRequest initiatingRequest;
volatile Throwable error; // any exception that occurred during pushes
@ -55,7 +56,6 @@ class PushGroup<T> {
int numberOfPushes;
int remainingPushes;
boolean noMorePushes = false;
PushGroup(PushPromiseHandler<T> pushPromiseHandler,
HttpRequestImpl initiatingRequest,
@ -68,7 +68,6 @@ class PushGroup<T> {
HttpRequestImpl initiatingRequest,
CompletableFuture<HttpResponse<T>> mainResponse,
Executor executor) {
this.noMorePushesCF = new MinimalFuture<>();
this.pushPromiseHandler = pushPromiseHandler;
this.initiatingRequest = initiatingRequest;
this.executor = executor;
@ -117,55 +116,51 @@ class PushGroup<T> {
throw t;
}
synchronized (this) {
stateLock.lock();
try {
if (acceptor.accepted()) {
numberOfPushes++;
remainingPushes++;
}
return acceptor;
} finally {
stateLock.unlock();
}
}
// This is called when the main body response completes because it means
// no more PUSH_PROMISEs are possible
synchronized void noMorePushes(boolean noMore) {
noMorePushes = noMore;
checkIfCompleted();
noMorePushesCF.complete(null);
void pushCompleted() {
stateLock.lock();
try {
remainingPushes--;
checkIfCompleted();
} finally {
stateLock.unlock();
}
}
synchronized CompletableFuture<Void> pushesCF() {
return noMorePushesCF;
}
synchronized boolean noMorePushes() {
return noMorePushes;
}
synchronized void pushCompleted() {
remainingPushes--;
checkIfCompleted();
}
synchronized void checkIfCompleted() {
private void checkIfCompleted() {
assert stateLock.isHeldByCurrentThread();
if (Log.trace()) {
Log.logTrace("PushGroup remainingPushes={0} error={1} noMorePushes={2}",
Log.logTrace("PushGroup remainingPushes={0} error={1}",
remainingPushes,
(error==null)?error:error.getClass().getSimpleName(),
noMorePushes);
(error==null)?error:error.getClass().getSimpleName());
}
if (remainingPushes == 0 && error == null && noMorePushes) {
if (remainingPushes == 0 && error == null) {
if (Log.trace()) {
Log.logTrace("push completed");
}
}
}
synchronized void pushError(Throwable t) {
void pushError(Throwable t) {
if (t == null) {
return;
}
this.error = t;
stateLock.lock();
try {
this.error = t;
} finally {
stateLock.unlock();
}
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -30,11 +30,14 @@ import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.util.concurrent.locks.ReentrantLock;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Utils;
class RedirectFilter implements HeaderFilter {
private final ReentrantLock stateLock = new ReentrantLock();
HttpRequestImpl request;
HttpClientImpl client;
HttpClient.Redirect policy;
@ -58,19 +61,29 @@ class RedirectFilter implements HeaderFilter {
public RedirectFilter() {}
@Override
public synchronized void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
this.request = r;
this.client = e.client();
this.policy = client.followRedirects();
public void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
stateLock.lock();
try {
this.request = r;
this.client = e.client();
this.policy = client.followRedirects();
this.method = r.method();
this.uri = r.uri();
this.exchange = e;
this.method = r.method();
this.uri = r.uri();
this.exchange = e;
} finally {
stateLock.unlock();
}
}
@Override
public synchronized HttpRequestImpl response(Response r) throws IOException {
return handleResponse(r);
public HttpRequestImpl response(Response r) throws IOException {
stateLock.lock();
try {
return handleResponse(r);
} finally {
stateLock.unlock();
}
}
private static String redirectedMethod(int statusCode, String orig) {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -53,6 +53,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;
@ -195,7 +196,7 @@ public final class RequestPublishers {
@Override
public long contentLength() {
if (contentLength == 0) {
synchronized(this) {
synchronized (this) {
if (contentLength == 0) {
contentLength = computeLength(content);
}
@ -387,6 +388,7 @@ public final class RequestPublishers {
volatile ByteBuffer nextBuffer;
volatile boolean need2Read = true;
volatile boolean haveNext;
final ReentrantLock stateLock = new ReentrantLock();
StreamIterator(InputStream is) {
this(is, Utils::getBuffer);
@ -433,7 +435,16 @@ public final class RequestPublishers {
}
@Override
public synchronized boolean hasNext() {
public boolean hasNext() {
stateLock.lock();
try {
return hasNext0();
} finally {
stateLock.unlock();
}
}
private boolean hasNext0() {
if (need2Read) {
try {
haveNext = read() != -1;
@ -454,12 +465,17 @@ public final class RequestPublishers {
}
@Override
public synchronized ByteBuffer next() {
if (!hasNext()) {
throw new NoSuchElementException();
public ByteBuffer next() {
stateLock.lock();
try {
if (!hasNext()) {
throw new NoSuchElementException();
}
need2Read = true;
return nextBuffer;
} finally {
stateLock.unlock();
}
need2Read = true;
return nextBuffer;
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -495,9 +495,9 @@ final class SocketTube implements FlowTube {
}
void dropSubscription() {
if (debug.on()) debug.log("write: resetting demand to 0");
synchronized (InternalWriteSubscriber.this) {
cancelled = true;
if (debug.on()) debug.log("write: resetting demand to 0");
writeDemand.reset();
}
}
@ -1263,7 +1263,7 @@ final class SocketTube implements FlowTube {
private void resumeEvent(SocketFlowEvent event,
Consumer<Throwable> errorSignaler) {
boolean registrationRequired;
synchronized(lock) {
synchronized (lock) {
registrationRequired = !event.registered();
event.resume();
}
@ -1280,7 +1280,7 @@ final class SocketTube implements FlowTube {
private void pauseEvent(SocketFlowEvent event,
Consumer<Throwable> errorSignaler) {
synchronized(lock) {
synchronized (lock) {
event.pause();
}
try {

@ -44,6 +44,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
@ -148,7 +150,8 @@ class Stream<T> extends ExchangeImpl<T> {
private boolean requestSent, responseReceived;
// send lock: prevent sending DataFrames after reset occurred.
private final Object sendLock = new Object();
private final Lock sendLock = new ReentrantLock();
private final Lock stateLock = new ReentrantLock();
/**
* A reference to this Stream's connection Send Window controller. The
@ -391,18 +394,24 @@ class Stream<T> extends ExchangeImpl<T> {
*/
int markStream(int code) {
if (code == 0) return streamState;
synchronized (sendLock) {
sendLock.lock();
try {
return (int) STREAM_STATE.compareAndExchange(this, 0, code);
} finally {
sendLock.unlock();
}
}
private void sendDataFrame(DataFrame frame) {
synchronized (sendLock) {
sendLock.lock();
try {
// must not send DataFrame after reset.
if (streamState == 0) {
connection.sendDataFrame(frame);
}
}
} finally {
sendLock.unlock();
}
}
// pushes entire response body into response subscriber
@ -585,12 +594,15 @@ class Stream<T> extends ExchangeImpl<T> {
void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
if (!closed) {
synchronized (this) {
stateLock.lock();
try {
if (closed) {
if (debug.on()) debug.log("Stream already closed: ignoring RESET");
return;
}
closed = true;
} finally {
stateLock.lock();
}
try {
int error = frame.getErrorCode();
@ -1118,13 +1130,15 @@ class Stream<T> extends ExchangeImpl<T> {
*/
final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
final Lock response_cfs_lock = new ReentrantLock();
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
CompletableFuture<Response> cf;
// The code below deals with race condition that can be caused when
// completeResponse() is being called before getResponseAsync()
synchronized (response_cfs) {
response_cfs_lock.lock();
try {
if (!response_cfs.isEmpty()) {
// This CompletableFuture was created by completeResponse().
// it will be already completed.
@ -1139,6 +1153,8 @@ class Stream<T> extends ExchangeImpl<T> {
cf = new MinimalFuture<>();
response_cfs.add(cf);
}
} finally {
response_cfs_lock.unlock();
}
if (executor != null && !cf.isDone()) {
// protect from executing later chain of CompletableFuture operations from SelectorManager thread
@ -1158,7 +1174,8 @@ class Stream<T> extends ExchangeImpl<T> {
* uncompleted CF then creates one (completes it) and adds to list
*/
void completeResponse(Response resp) {
synchronized (response_cfs) {
response_cfs_lock.lock();
try {
CompletableFuture<Response> cf;
int cfs_len = response_cfs.size();
for (int i=0; i<cfs_len; i++) {
@ -1179,32 +1196,44 @@ class Stream<T> extends ExchangeImpl<T> {
if (debug.on())
debug.log("Adding completed responseCF(0) with response headers");
response_cfs.add(cf);
} finally {
response_cfs_lock.unlock();
}
}
// methods to update state and remove stream when finished
synchronized void requestSent() {
requestSent = true;
if (responseReceived) {
if (debug.on()) debug.log("requestSent: streamid=%d", streamid);
close();
} else {
if (debug.on()) {
debug.log("requestSent: streamid=%d but response not received", streamid);
void requestSent() {
stateLock.lock();
try {
requestSent = true;
if (responseReceived) {
if (debug.on()) debug.log("requestSent: streamid=%d", streamid);
close();
} else {
if (debug.on()) {
debug.log("requestSent: streamid=%d but response not received", streamid);
}
}
} finally {
stateLock.unlock();
}
}
synchronized void responseReceived() {
responseReceived = true;
if (requestSent) {
if (debug.on()) debug.log("responseReceived: streamid=%d", streamid);
close();
} else {
if (debug.on()) {
debug.log("responseReceived: streamid=%d but request not sent", streamid);
void responseReceived() {
stateLock.lock();
try {
responseReceived = true;
if (requestSent) {
if (debug.on()) debug.log("responseReceived: streamid=%d", streamid);
close();
} else {
if (debug.on()) {
debug.log("responseReceived: streamid=%d but request not sent", streamid);
}
}
} finally {
stateLock.unlock();
}
}
@ -1212,7 +1241,8 @@ class Stream<T> extends ExchangeImpl<T> {
* same as above but for errors
*/
void completeResponseExceptionally(Throwable t) {
synchronized (response_cfs) {
response_cfs_lock.lock();
try {
// use index to avoid ConcurrentModificationException
// caused by removing the CF from within the loop.
for (int i = 0; i < response_cfs.size(); i++) {
@ -1224,6 +1254,8 @@ class Stream<T> extends ExchangeImpl<T> {
}
}
response_cfs.add(MinimalFuture.failedFuture(t));
} finally {
response_cfs_lock.unlock();
}
}
@ -1308,10 +1340,13 @@ class Stream<T> extends ExchangeImpl<T> {
}
boolean closing;
if (closing = !closed) { // assigning closing to !closed
synchronized (this) {
stateLock.lock();
try {
if (closing = !closed) { // assigning closing to !closed
closed=true;
}
} finally {
stateLock.unlock();
}
}
@ -1368,9 +1403,12 @@ class Stream<T> extends ExchangeImpl<T> {
// This method doesn't send any frame
void close() {
if (closed) return;
synchronized(this) {
stateLock.lock();
try {
if (closed) return;
closed = true;
} finally {
stateLock.unlock();
}
if (debug.on()) debug.log("close stream %d", streamid);
Log.logTrace("Closing stream {0}", streamid);
@ -1399,7 +1437,7 @@ class Stream<T> extends ExchangeImpl<T> {
final CompletableFuture<Response> pushCF;
CompletableFuture<HttpResponse<T>> responseCF;
final HttpRequestImpl pushReq;
HttpResponse.BodyHandler<T> pushHandler;
volatile HttpResponse.BodyHandler<T> pushHandler;
private volatile boolean finalPushResponseCodeReceived;
PushedStream(PushGroup<T> pushGroup,
@ -1418,11 +1456,11 @@ class Stream<T> extends ExchangeImpl<T> {
return responseCF;
}
synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
this.pushHandler = pushHandler;
}
synchronized HttpResponse.BodyHandler<T> getPushHandler() {
HttpResponse.BodyHandler<T> getPushHandler() {
// ignored parameters to function can be used as BodyHandler
return this.pushHandler;
}
@ -1488,11 +1526,6 @@ class Stream<T> extends ExchangeImpl<T> {
pushCF.completeExceptionally(t);
}
// @Override
// synchronized void responseReceived() {
// super.responseReceived();
// }
// create and return the PushResponseImpl
@Override
protected void handleResponse() {
@ -1570,7 +1603,7 @@ class Stream<T> extends ExchangeImpl<T> {
* Returns true if this exchange was canceled.
* @return true if this exchange was canceled.
*/
synchronized boolean isCanceled() {
boolean isCanceled() {
return errorRef.get() != null;
}
@ -1578,7 +1611,7 @@ class Stream<T> extends ExchangeImpl<T> {
* Returns the cause for which this exchange was canceled, if available.
* @return the cause for which this exchange was canceled, if available.
*/
synchronized Throwable getCancelCause() {
Throwable getCancelCause() {
return errorRef.get();
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -31,6 +31,7 @@ import jdk.internal.net.http.frame.SettingsFrame;
import jdk.internal.net.http.frame.WindowUpdateFrame;
import jdk.internal.net.http.common.Utils;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
abstract class WindowUpdateSender {
@ -40,6 +41,7 @@ abstract class WindowUpdateSender {
final int limit;
final Http2Connection connection;
final AtomicInteger received = new AtomicInteger();
final ReentrantLock sendLock = new ReentrantLock();
WindowUpdateSender(Http2Connection connection) {
this(connection, connection.clientSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE));
@ -70,12 +72,15 @@ abstract class WindowUpdateSender {
int rcv = received.addAndGet(delta);
if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit);
if (rcv > limit) {
synchronized (this) {
sendLock.lock();
try {
int tosend = received.get();
if( tosend > limit) {
received.getAndAdd(-tosend);
sendWindowUpdate(tosend);
}
} finally {
sendLock.unlock();
}
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -49,6 +49,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.IntBinaryOperator;
@ -259,7 +260,7 @@ public class SSLFlowDelegate {
final SequentialScheduler scheduler;
volatile ByteBuffer readBuf;
volatile boolean completing;
final Object readBufferLock = new Object();
final ReentrantLock readBufferLock = new ReentrantLock();
final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
private final class ReaderDownstreamPusher implements Runnable {
@ -340,7 +341,8 @@ public class SSLFlowDelegate {
// readBuf is kept ready for reading outside of this method
private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
assert Utils.remaining(buffers) > 0 || buffers.isEmpty();
synchronized (readBufferLock) {
readBufferLock.lock();
try {
for (ByteBuffer buf : buffers) {
readBuf.compact();
while (readBuf.remaining() < buf.remaining())
@ -357,6 +359,8 @@ public class SSLFlowDelegate {
this.completing = complete;
minBytesRequired = 0;
}
} finally {
readBufferLock.unlock();
}
}
@ -423,7 +427,8 @@ public class SSLFlowDelegate {
boolean handshaking = false;
try {
EngineResult result;
synchronized (readBufferLock) {
readBufferLock.lock();
try {
complete = this.completing;
if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining());
// Unless there is a BUFFER_UNDERFLOW, we should try to
@ -436,6 +441,8 @@ public class SSLFlowDelegate {
debugr.log("Unwrapped: result: %s", result.result);
debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
}
} finally {
readBufferLock.unlock();
}
if (result.bytesProduced() > 0) {
if (debugr.on())
@ -448,7 +455,8 @@ public class SSLFlowDelegate {
// not enough data in the read buffer...
// no need to try to unwrap again unless we get more bytes
// than minBytesRequired = len in the read buffer.
synchronized (readBufferLock) {
readBufferLock.lock();
try {
minBytesRequired = len;
// more bytes could already have been added...
assert readBuf.remaining() >= len;
@ -465,6 +473,8 @@ public class SSLFlowDelegate {
throw new IOException("BUFFER_UNDERFLOW with EOF, "
+ len + " bytes non decrypted.");
}
} finally {
readBufferLock.unlock();
}
// request more data and return.
requestMore();
@ -500,8 +510,11 @@ public class SSLFlowDelegate {
}
}
if (!complete) {
synchronized (readBufferLock) {
readBufferLock.lock();
try {
complete = this.completing && !readBuf.hasRemaining();
} finally {
readBufferLock.unlock();
}
}
if (complete) {
@ -754,7 +767,7 @@ public class SSLFlowDelegate {
}
private boolean hsTriggered() {
synchronized(writeList) {
synchronized (writeList) {
for (ByteBuffer b : writeList)
if (b == HS_TRIGGER)
return true;
@ -1173,8 +1186,12 @@ public class SSLFlowDelegate {
// won't be able to send the acknowledgement.
// Nothing more will come from the socket either,
// so mark the reader as completed.
synchronized (reader.readBufferLock) {
var readerLock = reader.readBufferLock;
readerLock.lock();
try {
reader.completing = true;
} finally {
readerLock.unlock();
}
}
}

@ -529,7 +529,7 @@ public class SSLTube implements FlowTube {
public void onComplete() {
assert !finished && !onCompleteReceived;
DelegateWrapper subscriberImpl;
synchronized(this) {
synchronized (this) {
subscriberImpl = subscribed;
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -156,29 +156,6 @@ public final class SequentialScheduler {
protected abstract void run();
}
/**
* A task that runs its main loop within a synchronized block to provide
* memory visibility between runs. Since the main loop can't run concurrently,
* the lock shouldn't be contended and no deadlock should ever be possible.
*/
public static final class SynchronizedRestartableTask
extends CompleteRestartableTask {
private final Runnable mainLoop;
private final Object lock = new Object();
public SynchronizedRestartableTask(Runnable mainLoop) {
this.mainLoop = mainLoop;
}
@Override
protected void run() {
synchronized(lock) {
mainLoop.run();
}
}
}
/**
* A task that runs its main loop within a block protected by a lock to provide
* memory visibility between runs. Since the main loop can't run concurrently,
@ -376,22 +353,6 @@ public final class SequentialScheduler {
state.set(STOP);
}
/**
* Returns a new {@code SequentialScheduler} that executes the provided
* {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
*
* @apiNote This is equivalent to calling
* {@code new SequentialScheduler(new SynchronizedRestartableTask(mainLoop))}
* The main loop must not perform any blocking operation.
*
* @param mainLoop The main loop of the new sequential scheduler
* @return a new {@code SequentialScheduler} that executes the provided
* {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
*/
public static SequentialScheduler synchronizedScheduler(Runnable mainLoop) {
return new SequentialScheduler(new SynchronizedRestartableTask(mainLoop));
}
/**
* Returns a new {@code SequentialScheduler} that executes the provided
* {@code mainLoop} from within a {@link LockingRestartableTask}.

@ -282,7 +282,7 @@ public abstract class SubscriberWrapper
// If there was an error, send it downstream.
Throwable error = errorRef.get();
if (error != null && outputQ.isEmpty()) {
synchronized(this) {
synchronized (this) {
if (downstreamCompleted)
return;
downstreamCompleted = true;

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -72,7 +72,7 @@ public class SubscriptionBase implements Flow.Subscription {
}
@Override
public synchronized String toString() {
public String toString() {
return "SubscriptionBase: window = " + demand.get() +
" cancelled = " + cancelled.toString();
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -33,13 +33,13 @@ import jdk.internal.net.http.common.SequentialScheduler.CompleteRestartableTask;
import jdk.internal.net.http.common.Utils;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
@ -73,7 +73,7 @@ public class TransportImpl implements Transport {
private final Demand demand = new Demand();
private final SequentialScheduler receiveScheduler;
private final RawChannel channel;
private final Object closeLock = new Object();
private final ReentrantLock closeLock = new ReentrantLock();
private final RawChannel.RawEvent writeEvent = new WriteEvent();
private final RawChannel.RawEvent readEvent = new ReadEvent();
private final AtomicReference<ChannelState> writeState
@ -302,7 +302,8 @@ public class TransportImpl implements Transport {
if (debug.on()) {
debug.log("closeOutput");
}
synchronized (closeLock) {
closeLock.lock();
try {
if (!outputClosed) {
outputClosed = true;
try {
@ -314,6 +315,8 @@ public class TransportImpl implements Transport {
}
}
}
} finally {
closeLock.unlock();
}
ChannelState s = writeState.get();
assert s == CLOSED : s;
@ -329,7 +332,8 @@ public class TransportImpl implements Transport {
if (debug.on()) {
debug.log("closeInput");
}
synchronized (closeLock) {
closeLock.lock();
try {
if (!inputClosed) {
inputClosed = true;
try {
@ -343,6 +347,8 @@ public class TransportImpl implements Transport {
}
}
}
} finally {
closeLock.unlock();
}
}

@ -32,9 +32,7 @@ import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import jdk.httpclient.test.lib.common.HttpServerAdapters;
import jdk.httpclient.test.lib.http2.Http2TestServer;
import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsServer;
import org.testng.annotations.AfterClass;
@ -55,6 +53,7 @@ import static java.net.http.HttpClient.Version.HTTP_2;
* @build jdk.httpclient.test.lib.common.HttpServerAdapters jdk.test.lib.net.SimpleSSLContext
* DigestEchoServer
* @run testng/othervm -Dtest.requiresHost=true
* -Djdk.tracePinnedThreads=full
* -Djdk.httpclient.HttpClient.log=headers
* -Djdk.internal.httpclient.debug=false
* AuthFilterCacheTest
@ -90,7 +89,9 @@ public class AuthFilterCacheTest implements HttpServerAdapters {
ProxySelector proxySelector;
MyAuthenticator auth;
HttpClient client;
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService serverExecutor = Executors.newCachedThreadPool();
ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
.name("HttpClient-Worker", 0).factory());
@DataProvider(name = "uris")
Object[][] testURIs() {
@ -108,6 +109,7 @@ public class AuthFilterCacheTest implements HttpServerAdapters {
public HttpClient newHttpClient(ProxySelector ps, Authenticator auth) {
HttpClient.Builder builder = HttpClient
.newBuilder()
.executor(virtualExecutor)
.sslContext(context)
.authenticator(auth)
.proxy(ps);
@ -122,7 +124,7 @@ public class AuthFilterCacheTest implements HttpServerAdapters {
auth = new MyAuthenticator();
// HTTP/1.1
http1Server = HttpTestServer.create(HTTP_1_1, null, executor);
http1Server = HttpTestServer.create(HTTP_1_1, null, serverExecutor);
http1Server.addHandler(new TestHandler(), "/AuthFilterCacheTest/http1/");
http1Server.start();
http1URI = new URI("http://" + http1Server.serverAuthority()
@ -130,7 +132,7 @@ public class AuthFilterCacheTest implements HttpServerAdapters {
// HTTPS/1.1
HttpsServer sserver1 = HttpsServer.create(sa, 100);
sserver1.setExecutor(executor);
sserver1.setExecutor(serverExecutor);
sserver1.setHttpsConfigurator(new HttpsConfigurator(context));
https1Server = HttpTestServer.of(sserver1);
https1Server.addHandler(new TestHandler(), "/AuthFilterCacheTest/https1/");
@ -175,7 +177,8 @@ public class AuthFilterCacheTest implements HttpServerAdapters {
https1Server = stop(https1Server, HttpTestServer::stop);
http2Server = stop(http2Server, HttpTestServer::stop);
https2Server = stop(https2Server, HttpTestServer::stop);
client = null;
client.close();
virtualExecutor.close();
System.out.println("Teardown: done");
}
@ -267,7 +270,7 @@ public class AuthFilterCacheTest implements HttpServerAdapters {
CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])).join();
}
static class MyAuthenticator extends Authenticator {
static final class MyAuthenticator extends Authenticator {
private int count = 0;
MyAuthenticator() {
@ -275,11 +278,31 @@ public class AuthFilterCacheTest implements HttpServerAdapters {
}
public PasswordAuthentication getPasswordAuthentication() {
System.out.println("Authenticator called: " + ++count);
return (new PasswordAuthentication("user" + count,
("passwordNotCheckedAnyway" + count).toCharArray()));
}
@Override
public PasswordAuthentication requestPasswordAuthenticationInstance(String host,
InetAddress addr,
int port,
String protocol,
String prompt,
String scheme,
URL url,
RequestorType reqType) {
PasswordAuthentication passwordAuthentication;
int count;
synchronized (this) {
count = ++this.count;
passwordAuthentication = super.requestPasswordAuthenticationInstance(
host, addr, port, protocol, prompt, scheme, url, reqType);
}
// log outside of synchronized block
System.out.println("Authenticator called: " + count);
return passwordAuthentication;
}
public int getCount() {
return count;
}

@ -29,6 +29,7 @@
* @library /test/lib /test/jdk/java/net/httpclient/lib
* @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext
* @run testng/othervm
* -Djdk.tracePinnedThreads=full
* -Djdk.httpclient.HttpClient.log=headers,errors,channel
* ConcurrentResponses
*/
@ -45,7 +46,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import javax.net.ssl.SSLContext;
import com.sun.net.httpserver.HttpExchange;
@ -60,7 +64,6 @@ import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.BodySubscribers;
import jdk.httpclient.test.lib.common.HttpServerAdapters;
import jdk.httpclient.test.lib.http2.Http2TestServer;
import jdk.httpclient.test.lib.http2.Http2TestExchange;
import jdk.httpclient.test.lib.http2.Http2Handler;
@ -86,6 +89,7 @@ public class ConcurrentResponses {
String http2FixedURI, https2FixedURI, http2VariableURI, https2VariableURI;
static final int CONCURRENT_REQUESTS = 13;
static final AtomicInteger IDS = new AtomicInteger();
static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
static final int ALPHABET_LENGTH = ALPHABET.length();
@ -149,52 +153,72 @@ public class ConcurrentResponses {
// into the byte buffers it is given.
@Test(dataProvider = "uris")
void testAsString(String uri) throws Exception {
HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
int id = IDS.getAndIncrement();
ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
.name("HttpClient-" + id + "-Worker", 0).factory());
HttpClient client = HttpClient.newBuilder()
.sslContext(sslContext)
.executor(virtualExecutor)
.build();
try {
Map<HttpRequest, String> requests = new HashMap<>();
for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i))
.build();
requests.put(request, BODIES[i]);
}
Map<HttpRequest, String> requests = new HashMap<>();
for (int i=0;i<CONCURRENT_REQUESTS; i++) {
HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i))
.build();
requests.put(request, BODIES[i]);
// initial connection to seed the cache so next parallel connections reuse it
client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discarding()).join();
// will reuse connection cached from the previous request ( when HTTP/2 )
CompletableFuture.allOf(requests.keySet().parallelStream()
.map(request -> client.sendAsync(request, BodyHandlers.ofString()))
.map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode))
.map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request()))))
.toArray(CompletableFuture<?>[]::new))
.join();
} finally {
client.close();
virtualExecutor.close();
}
// initial connection to seed the cache so next parallel connections reuse it
client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discarding()).join();
// will reuse connection cached from the previous request ( when HTTP/2 )
CompletableFuture.allOf(requests.keySet().parallelStream()
.map(request -> client.sendAsync(request, BodyHandlers.ofString()))
.map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode))
.map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request()))))
.toArray(CompletableFuture<?>[]::new))
.join();
}
// The custom subscriber aggressively attacks any area, between the limit
// and the capacity, in the byte buffers it is given, by writing 'X' into it.
@Test(dataProvider = "uris")
void testWithCustomSubscriber(String uri) throws Exception {
HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
int id = IDS.getAndIncrement();
ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
.name("HttpClient-" + id + "-Worker", 0).factory());
HttpClient client = HttpClient.newBuilder()
.executor(virtualExecutor)
.sslContext(sslContext).build();
try {
Map<HttpRequest, String> requests = new HashMap<>();
for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i))
.build();
requests.put(request, BODIES[i]);
}
Map<HttpRequest, String> requests = new HashMap<>();
for (int i=0;i<CONCURRENT_REQUESTS; i++) {
HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i))
.build();
requests.put(request, BODIES[i]);
// initial connection to seed the cache so next parallel connections reuse it
client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discarding()).join();
// will reuse connection cached from the previous request ( when HTTP/2 )
CompletableFuture.allOf(requests.keySet().parallelStream()
.map(request -> client.sendAsync(request, CustomSubscriber.handler))
.map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode))
.map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request()))))
.toArray(CompletableFuture<?>[]::new))
.join();
} finally {
client.close();
virtualExecutor.close();
}
// initial connection to seed the cache so next parallel connections reuse it
client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discarding()).join();
// will reuse connection cached from the previous request ( when HTTP/2 )
CompletableFuture.allOf(requests.keySet().parallelStream()
.map(request -> client.sendAsync(request, CustomSubscriber.handler))
.map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode))
.map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request()))))
.toArray(CompletableFuture<?>[]::new))
.join();
}
/**
* A subscriber that wraps ofString, but mucks with any data between limit
* and capacity, if the client mistakenly passes it any that is should not.

@ -31,12 +31,10 @@
* @bug 8286171
* @library /test/lib /test/jdk/java/net/httpclient/lib
* @build jdk.httpclient.test.lib.common.HttpServerAdapters
* @run testng/othervm ExpectContinueTest
* @run testng/othervm -Djdk.internal.httpclient.debug=err ExpectContinueTest
*/
import com.sun.net.httpserver.HttpServer;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
@ -63,7 +61,6 @@ import java.net.http.HttpResponse;
import java.util.StringTokenizer;
import java.util.concurrent.CompletableFuture;
import jdk.httpclient.test.lib.common.HttpServerAdapters;
import jdk.httpclient.test.lib.http2.Http2TestServer;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpClient.Version.HTTP_2;

@ -21,9 +21,6 @@
* questions.
*/
import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsServer;
import jdk.test.lib.net.IPSupport;
import jdk.test.lib.net.SimpleSSLContext;
import org.testng.Assert;
@ -33,22 +30,23 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import javax.net.ssl.SSLContext;
import java.io.Closeable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import jdk.httpclient.test.lib.common.HttpServerAdapters;
import jdk.httpclient.test.lib.http2.Http2TestServer;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpClient.Version.HTTP_2;
@ -72,6 +70,7 @@ import static java.net.http.HttpClient.Version.HTTP_2;
* -Djdk.httpclient.HttpClient.log=frames,ssl,requests,responses,errors
* -Djdk.internal.httpclient.debug=true
* -Dsun.net.httpserver.idleInterval=50000
* -Djdk.tracePinnedThreads=full
* HttpClientLocalAddrTest
*
*/
@ -86,6 +85,7 @@ public class HttpClientLocalAddrTest implements HttpServerAdapters {
private static URI http2URI;
private static HttpServerAdapters.HttpTestServer https2_Server;
private static URI https2URI;
private static final AtomicInteger IDS = new AtomicInteger();
// start various HTTP/HTTPS servers that will be invoked against in the tests
@BeforeClass
@ -185,13 +185,13 @@ public class HttpClientLocalAddrTest implements HttpServerAdapters {
final var configureClientSSL = requiresSSLContext.test(requestURI);
// no localAddr set
testMethodParams.add(new Object[]{
newBuilder(configureClientSSL).build(),
newBuilder(configureClientSSL).provider(),
requestURI,
null
});
// null localAddr set
testMethodParams.add(new Object[]{
newBuilder(configureClientSSL).localAddress(null).build(),
newBuilder(configureClientSSL).localAddress(null).provider(),
requestURI,
null
});
@ -200,7 +200,7 @@ public class HttpClientLocalAddrTest implements HttpServerAdapters {
testMethodParams.add(new Object[]{
newBuilder(configureClientSSL)
.localAddress(loopbackAddr)
.build(),
.provider(),
requestURI,
loopbackAddr
});
@ -211,7 +211,7 @@ public class HttpClientLocalAddrTest implements HttpServerAdapters {
testMethodParams.add(new Object[]{
newBuilder(configureClientSSL)
.localAddress(localAddr)
.build(),
.provider(),
requestURI,
localAddr
});
@ -222,7 +222,7 @@ public class HttpClientLocalAddrTest implements HttpServerAdapters {
testMethodParams.add(new Object[]{
newBuilder(configureClientSSL)
.localAddress(localAddr)
.build(),
.provider(),
requestURI,
localAddr
});
@ -231,7 +231,92 @@ public class HttpClientLocalAddrTest implements HttpServerAdapters {
return testMethodParams.stream().toArray(Object[][]::new);
}
private static HttpClient.Builder newBuilder(boolean configureClientSSL) {
// An object that holds a client and that can be closed
// Used when closing the client might require closing additional
// resources, such as an executor
sealed interface ClientCloseable extends Closeable {
HttpClient client();
@Override
void close();
// a reusable client that does nothing when close() is called,
// so that the underlying client can be reused
record ReusableClient(HttpClient client) implements ClientCloseable {
// do not close the client so that it can be reused
@Override
public void close() { }
}
// a client configured with an executor, that closes both the client
// and the executor when close() is called
record ClientWithExecutor(HttpClient client, ExecutorService service)
implements ClientCloseable {
// close both the client and executor
@Override
public void close() {
client.close();
service.close();
}
}
static ReusableClient reusable(HttpClient client) {
return new ReusableClient(client);
}
static ClientWithExecutor withExecutor(HttpClient client, ExecutorService service) {
return new ClientWithExecutor(client, service);
}
}
// A supplier of ClientCloseable
sealed interface ClientProvider extends Supplier<ClientCloseable> {
ClientCloseable get();
// a ClientProvider that returns reusable clients wrapping the given clieny
record ReusableClientProvider(HttpClient client) implements ClientProvider {
@Override
public ClientCloseable get() {
return ClientCloseable.reusable(client);
}
}
// A ClientProvider that builds a new ClientWithExecutor for every call to get()
record ClientBuilder(HttpClient.Builder builder) implements ClientProvider {
ClientCloseable build() {
int id = IDS.getAndIncrement();
ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
.name("HttpClient-" + id + "-Worker", 0).factory());
builder.executor(virtualExecutor);
return ClientCloseable.withExecutor(builder.build(), virtualExecutor);
}
public ClientBuilder localAddress(InetAddress localAddress) {
builder.localAddress(localAddress);
return this;
}
public ClientProvider provider() { return this; }
@Override
public ClientCloseable get() { return build(); }
}
static ReusableClientProvider reusable(HttpClient client) {
return new ReusableClientProvider(client);
}
static ClientBuilder builder(HttpClient.Builder builder) {
return new ClientBuilder(builder);
}
}
private static ClientProvider.ClientBuilder newBuilder(boolean configureClientSSL) {
var builder = HttpClient.newBuilder();
// don't let proxies interfere with the client addresses received on the
// HTTP request, by the server side handler used in this test.
@ -239,7 +324,7 @@ public class HttpClientLocalAddrTest implements HttpServerAdapters {
if (configureClientSSL) {
builder.sslContext(sslContext);
}
return builder;
return ClientProvider.builder(builder);
}
/**
@ -249,17 +334,20 @@ public class HttpClientLocalAddrTest implements HttpServerAdapters {
* {@code client}
*/
@Test(dataProvider = "params")
public void testSend(HttpClient client, URI requestURI, InetAddress localAddress) throws Exception {
System.out.println("Testing using a HTTP client " + client.version() + " with local address " + localAddress
+ " against request URI " + requestURI);
// GET request
var req = HttpRequest.newBuilder(requestURI).build();
var resp = client.send(req, HttpResponse.BodyHandlers.ofByteArray());
Assert.assertEquals(resp.statusCode(), 200, "Unexpected status code");
// verify the address only if a specific one was set on the client
if (localAddress != null && !localAddress.isAnyLocalAddress()) {
Assert.assertEquals(resp.body(), localAddress.getAddress(),
"Unexpected client address seen by the server handler");
public void testSend(ClientProvider clientProvider, URI requestURI, InetAddress localAddress) throws Exception {
try (var c = clientProvider.get()) {
HttpClient client = c.client();
System.out.println("Testing using a HTTP client " + client.version() + " with local address " + localAddress
+ " against request URI " + requestURI);
// GET request
var req = HttpRequest.newBuilder(requestURI).build();
var resp = client.send(req, HttpResponse.BodyHandlers.ofByteArray());
Assert.assertEquals(resp.statusCode(), 200, "Unexpected status code");
// verify the address only if a specific one was set on the client
if (localAddress != null && !localAddress.isAnyLocalAddress()) {
Assert.assertEquals(resp.body(), localAddress.getAddress(),
"Unexpected client address seen by the server handler");
}
}
}
@ -270,34 +358,42 @@ public class HttpClientLocalAddrTest implements HttpServerAdapters {
* {@code client}
*/
@Test(dataProvider = "params")
public void testSendAsync(HttpClient client, URI requestURI, InetAddress localAddress) throws Exception {
System.out.println("Testing using a HTTP client " + client.version() + " with local address " + localAddress
+ " against request URI " + requestURI);
// GET request
var req = HttpRequest.newBuilder(requestURI).build();
var cf = client.sendAsync(req,
HttpResponse.BodyHandlers.ofByteArray());
var resp = cf.get();
Assert.assertEquals(resp.statusCode(), 200, "Unexpected status code");
// verify the address only if a specific one was set on the client
if (localAddress != null && !localAddress.isAnyLocalAddress()) {
Assert.assertEquals(resp.body(), localAddress.getAddress(),
"Unexpected client address seen by the server handler");
public void testSendAsync(ClientProvider clientProvider, URI requestURI, InetAddress localAddress) throws Exception {
try (var c = clientProvider.get()) {
HttpClient client = c.client();
System.out.println("Testing using a HTTP client " + client.version()
+ " with local address " + localAddress
+ " against request URI " + requestURI);
// GET request
var req = HttpRequest.newBuilder(requestURI).build();
var cf = client.sendAsync(req,
HttpResponse.BodyHandlers.ofByteArray());
var resp = cf.get();
Assert.assertEquals(resp.statusCode(), 200, "Unexpected status code");
// verify the address only if a specific one was set on the client
if (localAddress != null && !localAddress.isAnyLocalAddress()) {
Assert.assertEquals(resp.body(), localAddress.getAddress(),
"Unexpected client address seen by the server handler");
}
}
}
/**
* Invokes the {@link #testSend(HttpClient)} and {@link #testSendAsync(HttpClient)}
* Invokes the {@link #testSend} and {@link #testSendAsync}
* tests, concurrently in multiple threads to verify that the correct local address
* is used when multiple concurrent threads are involved in sending requests from
* the {@code client}
*/
@Test(dataProvider = "params")
public void testMultiSendRequests(HttpClient client, URI requestURI, InetAddress localAddress) throws Exception {
public void testMultiSendRequests(ClientProvider clientProvider,
URI requestURI,
InetAddress localAddress) throws Exception {
int numThreads = 4;
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
List<Future<Void>> taskResults = new ArrayList<>();
try {
try (var c = clientProvider.get()) {
// prevents testSend/testSendAsync from closing the client
ClientProvider client = ClientProvider.reusable(c.client());
for (int i = 0; i < numThreads; i++) {
final var currentIdx = i;
var f = executor.submit(new Callable<Void>() {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -32,10 +32,14 @@
* @compile ../../../com/sun/net/httpserver/LogFilter.java
* @compile ../../../com/sun/net/httpserver/EchoHandler.java
* @compile ../../../com/sun/net/httpserver/FileServerHandler.java
* @run main/othervm/timeout=40 -Djdk.httpclient.HttpClient.log=ssl,channel ManyRequests
* @run main/othervm/timeout=40 -Djdk.httpclient.HttpClient.log=channel -Dtest.insertDelay=true ManyRequests
* @run main/othervm/timeout=40 -Djdk.httpclient.HttpClient.log=channel -Dtest.chunkSize=64 ManyRequests
* @run main/othervm/timeout=40 -Djdk.httpclient.HttpClient.log=channel -Dtest.insertDelay=true -Dtest.chunkSize=64 ManyRequests
* @run main/othervm/timeout=40 -Djdk.tracePinnedThreads=full
* -Djdk.httpclient.HttpClient.log=ssl,channel ManyRequests
* @run main/othervm/timeout=40 -Djdk.tracePinnedThreads=full
* -Djdk.httpclient.HttpClient.log=channel -Dtest.insertDelay=true ManyRequests
* @run main/othervm/timeout=40 -Djdk.tracePinnedThreads=full
* -Djdk.httpclient.HttpClient.log=channel -Dtest.chunkSize=64 ManyRequests
* @run main/othervm/timeout=40 -Djdk.tracePinnedThreads=full
* -Djdk.httpclient.HttpClient.log=channel -Dtest.insertDelay=true -Dtest.chunkSize=64 ManyRequests
* @summary Send a large number of requests asynchronously
*/
// * @run main/othervm/timeout=40 -Djdk.httpclient.HttpClient.log=ssl,channel ManyRequests
@ -84,7 +88,7 @@ import jdk.test.lib.net.URIBuilder;
public class ManyRequests {
static final int MAX_COUNT = 20;
static final int MAX_COUNT = 50;
static final int MAX_LIMIT = 40;
static final AtomicInteger COUNT = new AtomicInteger();
static final AtomicInteger LIMIT = new AtomicInteger(MAX_LIMIT);
@ -95,6 +99,8 @@ public class ManyRequests {
logger.setLevel(Level.ALL);
logger.info("TEST");
Stream.of(Logger.getLogger("").getHandlers()).forEach((h) -> h.setLevel(Level.ALL));
String osName = System.getProperty("os.name", "");
System.out.println("Running on: " + osName);
System.out.println("Sending " + REQUESTS
+ " requests; delay=" + INSERT_DELAY
+ ", chunks=" + CHUNK_SIZE
@ -106,17 +112,22 @@ public class ManyRequests {
ExecutorService executor = executorFor("HTTPS/1.1 Server Thread");
server.setHttpsConfigurator(new Configurator(ctx));
server.setExecutor(executor);
ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
.name("HttpClient-Worker", 0).factory());
HttpClient client = HttpClient.newBuilder()
.proxy(Builder.NO_PROXY)
.sslContext(ctx)
.executor(virtualExecutor)
.connectTimeout(Duration.ofMillis(120_000)) // 2mins
.build();
try {
test(server, client);
System.out.println("OK");
} finally {
client.close();
server.stop(0);
virtualExecutor.close();
executor.shutdownNow();
}
}
@ -260,13 +271,13 @@ public class ManyRequests {
done = true;
} catch (CompletionException e) {
if (!Platform.isWindows()) throw e;
if (LIMIT.get() < REQUESTS) throw e;
if (LIMIT.get() < MAX_LIMIT) throw e;
Throwable cause = e;
while ((cause = cause.getCause()) != null) {
if (cause instanceof ConnectException) {
// try again, limit concurrency by half
COUNT.set(0);
LIMIT.set(REQUESTS/2);
LIMIT.set(LIMIT.get()/2);
System.out.println("*** Retrying due to " + cause);
continue LOOP;
}
@ -326,7 +337,7 @@ public class ManyRequests {
if ((blocked && number <= maxnumber / 2) ||
(!blocked && waiters.size() > 0)) {
int toRelease = Math.min(maxnumber - number, waiters.size());
for (int i=0; i<toRelease; i++) {
for (int i=0; i<toRelease && !waiters.isEmpty(); i++) {
CompletableFuture<Void> f = waiters.remove();
number ++;
f.complete(null);

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -34,12 +34,16 @@
* @compile ../../../com/sun/net/httpserver/FileServerHandler.java
* @build ManyRequests ManyRequests2
* @run main/othervm/timeout=40 -Dtest.XFixed=true
* -Djdk.tracePinnedThreads=full
* -Djdk.httpclient.HttpClient.log=channel ManyRequests2
* @run main/othervm/timeout=40 -Dtest.XFixed=true -Dtest.insertDelay=true
* -Djdk.tracePinnedThreads=full
* -Djdk.httpclient.HttpClient.log=channel ManyRequests2
* @run main/othervm/timeout=40 -Dtest.XFixed=true -Dtest.chunkSize=64
* -Djdk.tracePinnedThreads=full
* -Djdk.httpclient.HttpClient.log=channel ManyRequests2
* @run main/othervm/timeout=40 -Djdk.internal.httpclient.debug=true
* @run main/othervm/timeout=400 -Djdk.internal.httpclient.debug=true
* -Djdk.tracePinnedThreads=full
* -Djdk.httpclient.HttpClient.log=channel
* -Dtest.XFixed=true -Dtest.insertDelay=true
* -Dtest.chunkSize=64 ManyRequests2

@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -43,6 +43,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static java.net.Proxy.NO_PROXY;
@ -53,7 +56,7 @@ import static java.net.Proxy.NO_PROXY;
* verifying that the remote address of the HTTP exchange (on the fake proxy server)
* is always the same InetSocketAddress.
* @modules jdk.httpserver
* @run main/othervm PlainProxyConnectionTest
* @run main/othervm -Djdk.tracePinnedThreads=full PlainProxyConnectionTest
* @author danielfuchs
*/
public class PlainProxyConnectionTest {
@ -61,6 +64,7 @@ public class PlainProxyConnectionTest {
static final String RESPONSE = "<html><body><p>Hello World!</body></html>";
static final String PATH = "/foo/";
static final ConcurrentLinkedQueue<InetSocketAddress> connections = new ConcurrentLinkedQueue<>();
private static final AtomicInteger IDS = new AtomicInteger();
// For convenience the server is used both as a plain server and as a plain proxy.
// When used as a proxy, it serves the request itself instead of forwarding it
@ -196,6 +200,18 @@ public class PlainProxyConnectionTest {
performSanityTest(server, uri, proxiedURI);
int id = IDS.getAndIncrement();
ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
.name("HttpClient-" + id + "-Worker", 0).factory());
CountingProxySelector ps = CountingProxySelector.of(
InetSocketAddress.createUnresolved(
server.getAddress().getAddress().getHostAddress(),
server.getAddress().getPort()));
HttpClient client = HttpClient.newBuilder()
.version(version)
.executor(virtualExecutor)
.proxy(ps)
.build();
try {
connections.clear();
System.out.println("\nReal test begins here.");
@ -206,14 +222,6 @@ public class PlainProxyConnectionTest {
// to the fake `proxiedURI` at
// http://some.host.that.does.not.exist:4242/foo/x
//
CountingProxySelector ps = CountingProxySelector.of(
InetSocketAddress.createUnresolved(
server.getAddress().getAddress().getHostAddress(),
server.getAddress().getPort()));
HttpClient client = HttpClient.newBuilder()
.version(version)
.proxy(ps)
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(proxiedURI)
.GET()
@ -268,6 +276,8 @@ public class PlainProxyConnectionTest {
}
} finally {
connections.clear();
client.close();
virtualExecutor.close();
}
}
}