8187044: HttpClient ConnectionPool may spawn several concurrent CacheCleaner and prevent early GC of HttpClient
Fixes CacheCleaner creation logic in ConnectionPool. Reviewed-by: chegar
This commit is contained in:
parent
b874c676ac
commit
b85726c71e
@ -25,11 +25,14 @@
|
||||
|
||||
package jdk.incubator.http;
|
||||
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import jdk.incubator.http.internal.common.Utils;
|
||||
|
||||
/**
|
||||
@ -37,6 +40,21 @@ import jdk.incubator.http.internal.common.Utils;
|
||||
*/
|
||||
final class ConnectionPool {
|
||||
|
||||
// These counters are used to distribute ids for debugging
|
||||
// The ACTIVE_CLEANER_COUNTER will tell how many CacheCleaner
|
||||
// are active at a given time. It will increase when a new
|
||||
// CacheCleaner is started and decrease when it exits.
|
||||
static final AtomicLong ACTIVE_CLEANER_COUNTER = new AtomicLong();
|
||||
// The POOL_IDS_COUNTER increases each time a new ConnectionPool
|
||||
// is created. It may wrap and become negative but will never be
|
||||
// decremented.
|
||||
static final AtomicLong POOL_IDS_COUNTER = new AtomicLong();
|
||||
// The cleanerCounter is used to name cleaner threads within a
|
||||
// a connection pool, and increments monotically.
|
||||
// It may wrap and become negative but will never be
|
||||
// decremented.
|
||||
final AtomicLong cleanerCounter = new AtomicLong();
|
||||
|
||||
static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
|
||||
"jdk.httpclient.keepalive.timeout", 1200); // seconds
|
||||
|
||||
@ -44,7 +62,12 @@ final class ConnectionPool {
|
||||
|
||||
final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
|
||||
final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
|
||||
CacheCleaner cleaner;
|
||||
// A monotically increasing id for this connection pool.
|
||||
// It may be negative (that's OK)
|
||||
// Mostly used for debugging purposes when looking at thread dumps.
|
||||
// Global scope.
|
||||
final long poolID = POOL_IDS_COUNTER.incrementAndGet();
|
||||
final AtomicReference<CacheCleaner> cleanerRef;
|
||||
|
||||
/**
|
||||
* Entries in connection pool are keyed by destination address and/or
|
||||
@ -105,6 +128,7 @@ final class ConnectionPool {
|
||||
plainPool = new HashMap<>();
|
||||
sslPool = new HashMap<>();
|
||||
expiryList = new LinkedList<>();
|
||||
cleanerRef = new AtomicReference<>();
|
||||
}
|
||||
|
||||
void start() {
|
||||
@ -143,7 +167,7 @@ final class ConnectionPool {
|
||||
findConnection(CacheKey key,
|
||||
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
|
||||
LinkedList<HttpConnection> l = pool.get(key);
|
||||
if (l == null || l.size() == 0) {
|
||||
if (l == null || l.isEmpty()) {
|
||||
return null;
|
||||
} else {
|
||||
HttpConnection c = l.removeFirst();
|
||||
@ -175,19 +199,36 @@ final class ConnectionPool {
|
||||
l.add(c);
|
||||
}
|
||||
|
||||
// only runs while entries exist in cache
|
||||
static String makeCleanerName(long poolId, long cleanerId) {
|
||||
return "HTTP-Cache-cleaner-" + poolId + "-" + cleanerId;
|
||||
}
|
||||
|
||||
final class CacheCleaner extends Thread {
|
||||
// only runs while entries exist in cache
|
||||
final static class CacheCleaner extends Thread {
|
||||
|
||||
volatile boolean stopping;
|
||||
// A monotically increasing id. May wrap and become negative (that's OK)
|
||||
// Mostly used for debugging purposes when looking at thread dumps.
|
||||
// Scoped per connection pool.
|
||||
final long cleanerID;
|
||||
// A reference to the owning ConnectionPool.
|
||||
// This reference's referent may become null if the HttpClientImpl
|
||||
// that owns this pool is GC'ed.
|
||||
final WeakReference<ConnectionPool> ownerRef;
|
||||
|
||||
CacheCleaner() {
|
||||
super(null, null, "HTTP-Cache-cleaner", 0, false);
|
||||
CacheCleaner(ConnectionPool owner) {
|
||||
this(owner, owner.cleanerCounter.incrementAndGet());
|
||||
}
|
||||
|
||||
CacheCleaner(ConnectionPool owner, long cleanerID) {
|
||||
super(null, null, makeCleanerName(owner.poolID, cleanerID), 0, false);
|
||||
this.cleanerID = cleanerID;
|
||||
this.ownerRef = new WeakReference<>(owner);
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
synchronized boolean stopping() {
|
||||
return stopping;
|
||||
return stopping || ownerRef.get() == null;
|
||||
}
|
||||
|
||||
synchronized void stopCleaner() {
|
||||
@ -196,11 +237,19 @@ final class ConnectionPool {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!stopping()) {
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException e) {}
|
||||
cleanCache();
|
||||
ACTIVE_CLEANER_COUNTER.incrementAndGet();
|
||||
try {
|
||||
while (!stopping()) {
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException e) {}
|
||||
ConnectionPool owner = ownerRef.get();
|
||||
if (owner == null) return;
|
||||
owner.cleanCache(this);
|
||||
owner = null;
|
||||
}
|
||||
} finally {
|
||||
ACTIVE_CLEANER_COUNTER.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -217,13 +266,15 @@ final class ConnectionPool {
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (expiryList.isEmpty()) {
|
||||
CacheCleaner cleaner = this.cleanerRef.get();
|
||||
if (expiryList.isEmpty() && cleaner != null) {
|
||||
this.cleanerRef.compareAndSet(cleaner, null);
|
||||
cleaner.stopCleaner();
|
||||
cleaner = null;
|
||||
cleaner.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanCache() {
|
||||
private void cleanCache(CacheCleaner cleaner) {
|
||||
long now = System.currentTimeMillis() / 1000;
|
||||
LinkedList<HttpConnection> closelist = new LinkedList<>();
|
||||
|
||||
@ -242,6 +293,10 @@ final class ConnectionPool {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (expiryList.isEmpty() && cleaner != null) {
|
||||
this.cleanerRef.compareAndSet(cleaner, null);
|
||||
cleaner.stopCleaner();
|
||||
}
|
||||
}
|
||||
for (HttpConnection c : closelist) {
|
||||
//System.out.println ("KAC: closing " + c);
|
||||
@ -252,10 +307,13 @@ final class ConnectionPool {
|
||||
private synchronized void addToExpiryList(HttpConnection conn) {
|
||||
long now = System.currentTimeMillis() / 1000;
|
||||
long then = now + KEEP_ALIVE;
|
||||
|
||||
if (expiryList.isEmpty()) {
|
||||
cleaner = new CacheCleaner();
|
||||
cleaner.start();
|
||||
CacheCleaner cleaner = new CacheCleaner(this);
|
||||
if (this.cleanerRef.compareAndSet(null, cleaner)) {
|
||||
cleaner.start();
|
||||
}
|
||||
expiryList.add(new ExpiryEntry(conn, then));
|
||||
return;
|
||||
}
|
||||
|
||||
ListIterator<ExpiryEntry> li = expiryList.listIterator();
|
||||
|
@ -23,9 +23,10 @@
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 8151299 8164704
|
||||
* @modules jdk.incubator.httpclient
|
||||
* @bug 8151299 8164704 8187044
|
||||
* @modules jdk.incubator.httpclient java.management
|
||||
* @run testng jdk.incubator.httpclient/jdk.incubator.http.SelectorTest
|
||||
* @run testng jdk.incubator.httpclient/jdk.incubator.http.RawChannelTest
|
||||
* @run testng jdk.incubator.httpclient/jdk.incubator.http.ResponseHeadersTest
|
||||
* @run main/othervm --add-reads jdk.incubator.httpclient=java.management jdk.incubator.httpclient/jdk.incubator.http.ConnectionPoolTest
|
||||
*/
|
||||
|
@ -0,0 +1,252 @@
|
||||
/*
|
||||
* Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
package jdk.incubator.http;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.ref.Reference;
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.net.Authenticator;
|
||||
import java.net.CookieManager;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ProxySelector;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLParameters;
|
||||
import jdk.incubator.http.internal.common.ByteBufferReference;
|
||||
|
||||
/**
|
||||
* @summary Verifies that the ConnectionPool won't prevent an HttpClient
|
||||
* from being GC'ed. Verifies that the ConnectionPool has at most
|
||||
* one CacheCleaner thread running.
|
||||
* @bug 8187044
|
||||
* @author danielfuchs
|
||||
*/
|
||||
public class ConnectionPoolTest {
|
||||
|
||||
static long getActiveCleaners() throws ClassNotFoundException {
|
||||
// ConnectionPool.ACTIVE_CLEANER_COUNTER.get()
|
||||
// ConnectionPoolTest.class.getModule().addReads(
|
||||
// Class.forName("java.lang.management.ManagementFactory").getModule());
|
||||
return java.util.stream.Stream.of(ManagementFactory.getThreadMXBean()
|
||||
.dumpAllThreads(false, false))
|
||||
.filter(t -> t.getThreadName().startsWith("HTTP-Cache-cleaner"))
|
||||
.count();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
testCacheCleaners();
|
||||
}
|
||||
|
||||
public static void testCacheCleaners() throws Exception {
|
||||
ConnectionPool pool = new ConnectionPool();
|
||||
HttpClient client = new HttpClientStub(pool);
|
||||
InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
|
||||
System.out.println("Adding 10 connections to pool");
|
||||
for (int i=0; i<10; i++) {
|
||||
InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
|
||||
HttpConnection c1 = new HttpConnectionStub(client, addr, proxy, true);
|
||||
pool.returnToPool(c1);
|
||||
}
|
||||
while (getActiveCleaners() == 0) {
|
||||
System.out.println("Waiting for cleaner to start");
|
||||
Thread.sleep(10);
|
||||
}
|
||||
System.out.println("Active CacheCleaners: " + getActiveCleaners());
|
||||
if (getActiveCleaners() > 1) {
|
||||
throw new RuntimeException("Too many CacheCleaner active: "
|
||||
+ getActiveCleaners());
|
||||
}
|
||||
System.out.println("Removing 9 connections from pool");
|
||||
for (int i=0; i<9; i++) {
|
||||
InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
|
||||
HttpConnection c2 = pool.getConnection(true, addr, proxy);
|
||||
if (c2 == null) {
|
||||
throw new RuntimeException("connection not found for " + addr);
|
||||
}
|
||||
}
|
||||
System.out.println("Active CacheCleaners: " + getActiveCleaners());
|
||||
if (getActiveCleaners() != 1) {
|
||||
throw new RuntimeException("Wrong number of CacheCleaner active: "
|
||||
+ getActiveCleaners());
|
||||
}
|
||||
System.out.println("Removing last connection from pool");
|
||||
for (int i=9; i<10; i++) {
|
||||
InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
|
||||
HttpConnection c2 = pool.getConnection(true, addr, proxy);
|
||||
if (c2 == null) {
|
||||
throw new RuntimeException("connection not found for " + addr);
|
||||
}
|
||||
}
|
||||
System.out.println("Active CacheCleaners: " + getActiveCleaners()
|
||||
+ " (may be 0 or may still be 1)");
|
||||
if (getActiveCleaners() > 1) {
|
||||
throw new RuntimeException("Too many CacheCleaner active: "
|
||||
+ getActiveCleaners());
|
||||
}
|
||||
InetSocketAddress addr = InetSocketAddress.createUnresolved("foo", 80);
|
||||
HttpConnection c = new HttpConnectionStub(client, addr, proxy, true);
|
||||
System.out.println("Adding/Removing one connection from pool 20 times in a loop");
|
||||
for (int i=0; i<20; i++) {
|
||||
pool.returnToPool(c);
|
||||
HttpConnection c2 = pool.getConnection(true, addr, proxy);
|
||||
if (c2 == null) {
|
||||
throw new RuntimeException("connection not found for " + addr);
|
||||
}
|
||||
if (c2 != c) {
|
||||
throw new RuntimeException("wrong connection found for " + addr);
|
||||
}
|
||||
}
|
||||
if (getActiveCleaners() > 1) {
|
||||
throw new RuntimeException("Too many CacheCleaner active: "
|
||||
+ getActiveCleaners());
|
||||
}
|
||||
ReferenceQueue<HttpClient> queue = new ReferenceQueue<>();
|
||||
WeakReference<HttpClient> weak = new WeakReference<>(client, queue);
|
||||
System.gc();
|
||||
Reference.reachabilityFence(pool);
|
||||
client = null; pool = null; c = null;
|
||||
while (true) {
|
||||
long cleaners = getActiveCleaners();
|
||||
System.out.println("Waiting for GC to release stub HttpClient;"
|
||||
+ " active cache cleaners: " + cleaners);
|
||||
System.gc();
|
||||
Reference<?> ref = queue.remove(1000);
|
||||
if (ref == weak) {
|
||||
System.out.println("Stub HttpClient GC'ed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
while (getActiveCleaners() > 0) {
|
||||
System.out.println("Waiting for CacheCleaner to stop");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
System.out.println("Active CacheCleaners: "
|
||||
+ getActiveCleaners());
|
||||
|
||||
if (getActiveCleaners() > 0) {
|
||||
throw new RuntimeException("Too many CacheCleaner active: "
|
||||
+ getActiveCleaners());
|
||||
}
|
||||
}
|
||||
static <T> T error() {
|
||||
throw new InternalError("Should not reach here: wrong test assumptions!");
|
||||
}
|
||||
|
||||
// Emulates an HttpConnection that has a strong reference to its HttpClient.
|
||||
static class HttpConnectionStub extends HttpConnection {
|
||||
|
||||
public HttpConnectionStub(HttpClient client,
|
||||
InetSocketAddress address,
|
||||
InetSocketAddress proxy,
|
||||
boolean secured) {
|
||||
super(address, null);
|
||||
this.key = ConnectionPool.cacheKey(address, proxy);
|
||||
this.address = address;
|
||||
this.proxy = proxy;
|
||||
this.secured = secured;
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
InetSocketAddress proxy;
|
||||
InetSocketAddress address;
|
||||
boolean secured;
|
||||
ConnectionPool.CacheKey key;
|
||||
HttpClient client;
|
||||
|
||||
// All these return something
|
||||
@Override boolean connected() {return true;}
|
||||
@Override boolean isSecure() {return secured;}
|
||||
@Override boolean isProxied() {return proxy!=null;}
|
||||
@Override ConnectionPool.CacheKey cacheKey() {return key;}
|
||||
@Override public void close() {}
|
||||
@Override void shutdownInput() throws IOException {}
|
||||
@Override void shutdownOutput() throws IOException {}
|
||||
public String toString() {
|
||||
return "HttpConnectionStub: " + address + " proxy: " + proxy;
|
||||
}
|
||||
|
||||
// All these throw errors
|
||||
@Override
|
||||
public void connect() throws IOException, InterruptedException {error();}
|
||||
@Override public CompletableFuture<Void> connectAsync() {return error();}
|
||||
@Override SocketChannel channel() {return error();}
|
||||
@Override void flushAsync() throws IOException {error();}
|
||||
@Override
|
||||
protected ByteBuffer readImpl() throws IOException {return error();}
|
||||
@Override CompletableFuture<Void> whenReceivingResponse() {return error();}
|
||||
@Override
|
||||
long write(ByteBuffer[] buffers, int start, int number) throws IOException {
|
||||
throw (Error)error();
|
||||
}
|
||||
@Override
|
||||
long write(ByteBuffer buffer) throws IOException {throw (Error)error();}
|
||||
@Override
|
||||
void writeAsync(ByteBufferReference[] buffers) throws IOException {
|
||||
error();
|
||||
}
|
||||
@Override
|
||||
void writeAsyncUnordered(ByteBufferReference[] buffers)
|
||||
throws IOException {
|
||||
error();
|
||||
}
|
||||
}
|
||||
// Emulates an HttpClient that has a strong reference to its connection pool.
|
||||
static class HttpClientStub extends HttpClient {
|
||||
public HttpClientStub(ConnectionPool pool) {
|
||||
this.pool = pool;
|
||||
}
|
||||
final ConnectionPool pool;
|
||||
@Override public Optional<CookieManager> cookieManager() {return error();}
|
||||
@Override public HttpClient.Redirect followRedirects() {return error();}
|
||||
@Override public Optional<ProxySelector> proxy() {return error();}
|
||||
@Override public SSLContext sslContext() {return error();}
|
||||
@Override public Optional<SSLParameters> sslParameters() {return error();}
|
||||
@Override public Optional<Authenticator> authenticator() {return error();}
|
||||
@Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;}
|
||||
@Override public Executor executor() {return error();}
|
||||
@Override
|
||||
public <T> HttpResponse<T> send(HttpRequest req,
|
||||
HttpResponse.BodyHandler<T> responseBodyHandler)
|
||||
throws IOException, InterruptedException {
|
||||
return error();
|
||||
}
|
||||
@Override
|
||||
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
|
||||
HttpResponse.BodyHandler<T> responseBodyHandler) {
|
||||
return error();
|
||||
}
|
||||
@Override
|
||||
public <U, T> CompletableFuture<U> sendAsync(HttpRequest req,
|
||||
HttpResponse.MultiProcessor<U, T> multiProcessor) {
|
||||
return error();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user