8202788: Explicitly reclaim cached thread-local direct buffers at thread exit
Add internal TerminatingThreadLocal and use it to free cached thread-local direct buffers and nio-fs native buffers Reviewed-by: tonyp, alanb
This commit is contained in:
parent
578576f523
commit
6ec2cfcc49
@ -36,6 +36,8 @@ import java.util.HashMap;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.locks.LockSupport;
|
import java.util.concurrent.locks.LockSupport;
|
||||||
|
|
||||||
|
import jdk.internal.misc.TerminatingThreadLocal;
|
||||||
import sun.nio.ch.Interruptible;
|
import sun.nio.ch.Interruptible;
|
||||||
import jdk.internal.reflect.CallerSensitive;
|
import jdk.internal.reflect.CallerSensitive;
|
||||||
import jdk.internal.reflect.Reflection;
|
import jdk.internal.reflect.Reflection;
|
||||||
@ -838,6 +840,9 @@ class Thread implements Runnable {
|
|||||||
* a chance to clean up before it actually exits.
|
* a chance to clean up before it actually exits.
|
||||||
*/
|
*/
|
||||||
private void exit() {
|
private void exit() {
|
||||||
|
if (TerminatingThreadLocal.REGISTRY.isPresent()) {
|
||||||
|
TerminatingThreadLocal.threadTerminated();
|
||||||
|
}
|
||||||
if (group != null) {
|
if (group != null) {
|
||||||
group.threadTerminated(this);
|
group.threadTerminated(this);
|
||||||
group = null;
|
group = null;
|
||||||
|
@ -24,6 +24,8 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
package java.lang;
|
package java.lang;
|
||||||
|
import jdk.internal.misc.TerminatingThreadLocal;
|
||||||
|
|
||||||
import java.lang.ref.*;
|
import java.lang.ref.*;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
@ -170,6 +172,19 @@ public class ThreadLocal<T> {
|
|||||||
return setInitialValue();
|
return setInitialValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if there is a value in the current thread's copy of
|
||||||
|
* this thread-local variable, even if that values is {@code null}.
|
||||||
|
*
|
||||||
|
* @return {@code true} if current thread has associated value in this
|
||||||
|
* thread-local variable; {@code false} if not
|
||||||
|
*/
|
||||||
|
boolean isPresent() {
|
||||||
|
Thread t = Thread.currentThread();
|
||||||
|
ThreadLocalMap map = getMap(t);
|
||||||
|
return map != null && map.getEntry(this) != null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Variant of set() to establish initialValue. Used instead
|
* Variant of set() to establish initialValue. Used instead
|
||||||
* of set() in case user has overridden the set() method.
|
* of set() in case user has overridden the set() method.
|
||||||
@ -180,10 +195,14 @@ public class ThreadLocal<T> {
|
|||||||
T value = initialValue();
|
T value = initialValue();
|
||||||
Thread t = Thread.currentThread();
|
Thread t = Thread.currentThread();
|
||||||
ThreadLocalMap map = getMap(t);
|
ThreadLocalMap map = getMap(t);
|
||||||
if (map != null)
|
if (map != null) {
|
||||||
map.set(this, value);
|
map.set(this, value);
|
||||||
else
|
} else {
|
||||||
createMap(t, value);
|
createMap(t, value);
|
||||||
|
}
|
||||||
|
if (this instanceof TerminatingThreadLocal) {
|
||||||
|
TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
|
||||||
|
}
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -199,10 +218,11 @@ public class ThreadLocal<T> {
|
|||||||
public void set(T value) {
|
public void set(T value) {
|
||||||
Thread t = Thread.currentThread();
|
Thread t = Thread.currentThread();
|
||||||
ThreadLocalMap map = getMap(t);
|
ThreadLocalMap map = getMap(t);
|
||||||
if (map != null)
|
if (map != null) {
|
||||||
map.set(this, value);
|
map.set(this, value);
|
||||||
else
|
} else {
|
||||||
createMap(t, value);
|
createMap(t, value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -218,8 +238,9 @@ public class ThreadLocal<T> {
|
|||||||
*/
|
*/
|
||||||
public void remove() {
|
public void remove() {
|
||||||
ThreadLocalMap m = getMap(Thread.currentThread());
|
ThreadLocalMap m = getMap(Thread.currentThread());
|
||||||
if (m != null)
|
if (m != null) {
|
||||||
m.remove(this);
|
m.remove(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -0,0 +1,105 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
|
||||||
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||||
|
*
|
||||||
|
* This code is free software; you can redistribute it and/or modify it
|
||||||
|
* under the terms of the GNU General Public License version 2 only, as
|
||||||
|
* published by the Free Software Foundation. Oracle designates this
|
||||||
|
* particular file as subject to the "Classpath" exception as provided
|
||||||
|
* by Oracle in the LICENSE file that accompanied this code.
|
||||||
|
*
|
||||||
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
* version 2 for more details (a copy is included in the LICENSE file that
|
||||||
|
* accompanied this code).
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License version
|
||||||
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*
|
||||||
|
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||||
|
* or visit www.oracle.com if you need additional information or have any
|
||||||
|
* questions.
|
||||||
|
*/
|
||||||
|
package jdk.internal.misc;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.IdentityHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A thread-local variable that is notified when a thread terminates and
|
||||||
|
* it has been initialized in the terminating thread (even if it was
|
||||||
|
* initialized with a null value).
|
||||||
|
*/
|
||||||
|
public class TerminatingThreadLocal<T> extends ThreadLocal<T> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void set(T value) {
|
||||||
|
super.set(value);
|
||||||
|
register(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
super.remove();
|
||||||
|
unregister(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoked by a thread when terminating and this thread-local has an associated
|
||||||
|
* value for the terminating thread (even if that value is null), so that any
|
||||||
|
* native resources maintained by the value can be released.
|
||||||
|
*
|
||||||
|
* @param value current thread's value of this thread-local variable
|
||||||
|
* (may be null but only if null value was explicitly initialized)
|
||||||
|
*/
|
||||||
|
protected void threadTerminated(T value) {
|
||||||
|
}
|
||||||
|
|
||||||
|
// following methods and field are implementation details and should only be
|
||||||
|
// called from the corresponding code int Thread/ThreadLocal class.
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invokes the TerminatingThreadLocal's {@link #threadTerminated()} method
|
||||||
|
* on all instances registered in current thread.
|
||||||
|
*/
|
||||||
|
public static void threadTerminated() {
|
||||||
|
for (TerminatingThreadLocal<?> ttl : REGISTRY.get()) {
|
||||||
|
ttl._threadTerminated();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void _threadTerminated() { threadTerminated(get()); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register given TerminatingThreadLocal
|
||||||
|
*
|
||||||
|
* @param tl the ThreadLocal to register
|
||||||
|
*/
|
||||||
|
public static void register(TerminatingThreadLocal<?> tl) {
|
||||||
|
REGISTRY.get().add(tl);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unregister given TerminatingThreadLocal
|
||||||
|
*
|
||||||
|
* @param tl the ThreadLocal to unregister
|
||||||
|
*/
|
||||||
|
private static void unregister(TerminatingThreadLocal<?> tl) {
|
||||||
|
REGISTRY.get().remove(tl);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* a per-thread registry of TerminatingThreadLocal(s) that have been registered
|
||||||
|
* but later not unregistered in a particular thread.
|
||||||
|
*/
|
||||||
|
public static final ThreadLocal<Collection<TerminatingThreadLocal<?>>> REGISTRY =
|
||||||
|
new ThreadLocal<>() {
|
||||||
|
@Override
|
||||||
|
protected Collection<TerminatingThreadLocal<?>> initialValue() {
|
||||||
|
return Collections.newSetFromMap(new IdentityHashMap<>(4));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
@ -26,6 +26,7 @@
|
|||||||
package sun.nio.ch;
|
package sun.nio.ch;
|
||||||
|
|
||||||
import java.io.FileDescriptor;
|
import java.io.FileDescriptor;
|
||||||
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
@ -35,9 +36,10 @@ import java.security.PrivilegedAction;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import jdk.internal.misc.TerminatingThreadLocal;
|
||||||
import jdk.internal.misc.Unsafe;
|
import jdk.internal.misc.Unsafe;
|
||||||
import sun.security.action.GetPropertyAction;
|
import sun.security.action.GetPropertyAction;
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
public class Util {
|
public class Util {
|
||||||
|
|
||||||
@ -50,13 +52,18 @@ public class Util {
|
|||||||
private static final long MAX_CACHED_BUFFER_SIZE = getMaxCachedBufferSize();
|
private static final long MAX_CACHED_BUFFER_SIZE = getMaxCachedBufferSize();
|
||||||
|
|
||||||
// Per-thread cache of temporary direct buffers
|
// Per-thread cache of temporary direct buffers
|
||||||
private static ThreadLocal<BufferCache> bufferCache =
|
private static ThreadLocal<BufferCache> bufferCache = new TerminatingThreadLocal<>() {
|
||||||
new ThreadLocal<BufferCache>()
|
|
||||||
{
|
|
||||||
@Override
|
@Override
|
||||||
protected BufferCache initialValue() {
|
protected BufferCache initialValue() {
|
||||||
return new BufferCache();
|
return new BufferCache();
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
|
protected void threadTerminated(BufferCache cache) { // will never be null
|
||||||
|
while (!cache.isEmpty()) {
|
||||||
|
ByteBuffer bb = cache.removeFirst();
|
||||||
|
free(bb);
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
|
|
||||||
package sun.nio.fs;
|
package sun.nio.fs;
|
||||||
|
|
||||||
|
import jdk.internal.misc.TerminatingThreadLocal;
|
||||||
import jdk.internal.misc.Unsafe;
|
import jdk.internal.misc.Unsafe;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -37,8 +38,21 @@ class NativeBuffers {
|
|||||||
private static final Unsafe unsafe = Unsafe.getUnsafe();
|
private static final Unsafe unsafe = Unsafe.getUnsafe();
|
||||||
|
|
||||||
private static final int TEMP_BUF_POOL_SIZE = 3;
|
private static final int TEMP_BUF_POOL_SIZE = 3;
|
||||||
private static ThreadLocal<NativeBuffer[]> threadLocal =
|
private static ThreadLocal<NativeBuffer[]> threadLocal = new TerminatingThreadLocal<>() {
|
||||||
new ThreadLocal<NativeBuffer[]>();
|
@Override
|
||||||
|
protected void threadTerminated(NativeBuffer[] buffers) {
|
||||||
|
// threadLocal may be initialized but with initialValue of null
|
||||||
|
if (buffers != null) {
|
||||||
|
for (int i = 0; i < TEMP_BUF_POOL_SIZE; i++) {
|
||||||
|
NativeBuffer buffer = buffers[i];
|
||||||
|
if (buffer != null) {
|
||||||
|
buffer.free();
|
||||||
|
buffers[i] = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocates a native buffer, of at least the given size, from the heap.
|
* Allocates a native buffer, of at least the given size, from the heap.
|
||||||
|
@ -0,0 +1,91 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
|
||||||
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||||
|
*
|
||||||
|
* This code is free software; you can redistribute it and/or modify it
|
||||||
|
* under the terms of the GNU General Public License version 2 only, as
|
||||||
|
* published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
* version 2 for more details (a copy is included in the LICENSE file that
|
||||||
|
* accompanied this code).
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License version
|
||||||
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*
|
||||||
|
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||||
|
* or visit www.oracle.com if you need additional information or have any
|
||||||
|
* questions.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
import java.lang.management.BufferPoolMXBean;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
|
||||||
|
import static java.nio.file.StandardOpenOption.CREATE;
|
||||||
|
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
|
||||||
|
import static java.nio.file.StandardOpenOption.WRITE;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @test
|
||||||
|
* @bug 8202788
|
||||||
|
* @summary Test reclamation of thread-local temporary direct byte buffers at thread exit
|
||||||
|
* @modules java.management
|
||||||
|
* @run main/othervm TempDirectBuffersReclamation
|
||||||
|
*/
|
||||||
|
public class TempDirectBuffersReclamation {
|
||||||
|
|
||||||
|
public static void main(String[] args) throws IOException {
|
||||||
|
|
||||||
|
BufferPoolMXBean dbPool = ManagementFactory
|
||||||
|
.getPlatformMXBeans(BufferPoolMXBean.class)
|
||||||
|
.stream()
|
||||||
|
.filter(bp -> bp.getName().equals("direct"))
|
||||||
|
.findFirst()
|
||||||
|
.orElseThrow(() -> new RuntimeException("Can't obtain direct BufferPoolMXBean"));
|
||||||
|
|
||||||
|
long count0 = dbPool.getCount();
|
||||||
|
long memoryUsed0 = dbPool.getMemoryUsed();
|
||||||
|
|
||||||
|
Thread thread = new Thread(TempDirectBuffersReclamation::doFileChannelWrite);
|
||||||
|
thread.start();
|
||||||
|
try {
|
||||||
|
thread.join();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
long count1 = dbPool.getCount();
|
||||||
|
long memoryUsed1 = dbPool.getMemoryUsed();
|
||||||
|
|
||||||
|
if (count0 != count1 || memoryUsed0 != memoryUsed1) {
|
||||||
|
throw new AssertionError(
|
||||||
|
"Direct BufferPool not same before thread activity and after thread exit.\n" +
|
||||||
|
"Before: # of buffers: " + count0 + ", memory used: " + memoryUsed0 + "\n" +
|
||||||
|
" After: # of buffers: " + count1 + ", memory used: " + memoryUsed1 + "\n"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doFileChannelWrite() {
|
||||||
|
try {
|
||||||
|
Path file = Files.createTempFile("test", ".tmp");
|
||||||
|
try (FileChannel fc = FileChannel.open(file, CREATE, WRITE, TRUNCATE_EXISTING)) {
|
||||||
|
fc.write(ByteBuffer.wrap("HELLO".getBytes(StandardCharsets.UTF_8)));
|
||||||
|
} finally {
|
||||||
|
Files.delete(file);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,90 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
|
||||||
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||||
|
*
|
||||||
|
* This code is free software; you can redistribute it and/or modify it
|
||||||
|
* under the terms of the GNU General Public License version 2 only, as
|
||||||
|
* published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
* version 2 for more details (a copy is included in the LICENSE file that
|
||||||
|
* accompanied this code).
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License version
|
||||||
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*
|
||||||
|
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||||
|
* or visit www.oracle.com if you need additional information or have any
|
||||||
|
* questions.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import jdk.internal.misc.TerminatingThreadLocal;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @test
|
||||||
|
* @bug 8202788
|
||||||
|
* @summary TerminatingThreadLocal unit test
|
||||||
|
* @modules java.base/jdk.internal.misc
|
||||||
|
* @run main TestTerminatingThreadLocal
|
||||||
|
*/
|
||||||
|
public class TestTerminatingThreadLocal {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
ttlTestSet(42, 112);
|
||||||
|
ttlTestSet(null, 112);
|
||||||
|
ttlTestSet(42, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
static <T> void ttlTestSet(T v0, T v1) {
|
||||||
|
ttlTest(v0, ttl -> { } );
|
||||||
|
ttlTest(v0, ttl -> { ttl.get(); }, v0);
|
||||||
|
ttlTest(v0, ttl -> { ttl.get(); ttl.remove(); } );
|
||||||
|
ttlTest(v0, ttl -> { ttl.get(); ttl.set(v1); }, v1);
|
||||||
|
ttlTest(v0, ttl -> { ttl.set(v1); }, v1);
|
||||||
|
ttlTest(v0, ttl -> { ttl.set(v1); ttl.remove(); } );
|
||||||
|
ttlTest(v0, ttl -> { ttl.set(v1); ttl.remove(); ttl.get(); }, v0);
|
||||||
|
ttlTest(v0, ttl -> { ttl.get(); ttl.remove(); ttl.set(v1); }, v1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SafeVarargs
|
||||||
|
static <T> void ttlTest(T initialValue,
|
||||||
|
Consumer<? super TerminatingThreadLocal<T>> ttlOps,
|
||||||
|
T... expectedTerminatedValues)
|
||||||
|
{
|
||||||
|
List<T> terminatedValues = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
TerminatingThreadLocal<T> ttl = new TerminatingThreadLocal<>() {
|
||||||
|
@Override
|
||||||
|
protected void threadTerminated(T value) {
|
||||||
|
terminatedValues.add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected T initialValue() {
|
||||||
|
return initialValue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Thread thread = new Thread(() -> ttlOps.accept(ttl));
|
||||||
|
thread.start();
|
||||||
|
try {
|
||||||
|
thread.join();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!terminatedValues.equals(Arrays.asList(expectedTerminatedValues))) {
|
||||||
|
throw new AssertionError("Expected terminated values: " +
|
||||||
|
Arrays.toString(expectedTerminatedValues) +
|
||||||
|
" but got: " + terminatedValues);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -39,6 +39,7 @@ import static java.nio.file.StandardOpenOption.WRITE;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* @test
|
* @test
|
||||||
@ -93,6 +94,7 @@ public class TestMaxCachedBufferSize {
|
|||||||
// by setting the jdk.nio.maxCachedBufferSize property.
|
// by setting the jdk.nio.maxCachedBufferSize property.
|
||||||
private static class Worker implements Runnable {
|
private static class Worker implements Runnable {
|
||||||
private final int id;
|
private final int id;
|
||||||
|
private final CountDownLatch finishLatch, exitLatch;
|
||||||
private final Random random = new Random();
|
private final Random random = new Random();
|
||||||
private long smallBufferCount = 0;
|
private long smallBufferCount = 0;
|
||||||
private long largeBufferCount = 0;
|
private long largeBufferCount = 0;
|
||||||
@ -152,6 +154,13 @@ public class TestMaxCachedBufferSize {
|
|||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new Error("I/O error", e);
|
throw new Error("I/O error", e);
|
||||||
|
} finally {
|
||||||
|
finishLatch.countDown();
|
||||||
|
try {
|
||||||
|
exitLatch.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -160,8 +169,10 @@ public class TestMaxCachedBufferSize {
|
|||||||
loop();
|
loop();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Worker(int id) {
|
public Worker(int id, CountDownLatch finishLatch, CountDownLatch exitLatch) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
|
this.finishLatch = finishLatch;
|
||||||
|
this.exitLatch = exitLatch;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -171,10 +182,6 @@ public class TestMaxCachedBufferSize {
|
|||||||
System.out.printf("Direct %d / %dK\n",
|
System.out.printf("Direct %d / %dK\n",
|
||||||
directCount, directTotalCapacity / 1024);
|
directCount, directTotalCapacity / 1024);
|
||||||
|
|
||||||
// Note that directCount could be < expectedCount. This can
|
|
||||||
// happen if a GC occurs after one of the worker threads exits
|
|
||||||
// since its thread-local DirectByteBuffer could be cleaned up
|
|
||||||
// before we reach here.
|
|
||||||
if (directCount > expectedCount) {
|
if (directCount > expectedCount) {
|
||||||
throw new Error(String.format(
|
throw new Error(String.format(
|
||||||
"inconsistent direct buffer total count, expected = %d, found = %d",
|
"inconsistent direct buffer total count, expected = %d, found = %d",
|
||||||
@ -208,46 +215,57 @@ public class TestMaxCachedBufferSize {
|
|||||||
threadNum, iters, maxBufferSize);
|
threadNum, iters, maxBufferSize);
|
||||||
System.out.println();
|
System.out.println();
|
||||||
|
|
||||||
|
final CountDownLatch finishLatch = new CountDownLatch(threadNum);
|
||||||
|
final CountDownLatch exitLatch = new CountDownLatch(1);
|
||||||
final Thread[] threads = new Thread[threadNum];
|
final Thread[] threads = new Thread[threadNum];
|
||||||
for (int i = 0; i < threadNum; i += 1) {
|
for (int i = 0; i < threadNum; i += 1) {
|
||||||
threads[i] = new Thread(new Worker(i));
|
threads[i] = new Thread(new Worker(i, finishLatch, exitLatch));
|
||||||
threads[i].start();
|
threads[i].start();
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (int i = 0; i < threadNum; i += 1) {
|
try {
|
||||||
threads[i].join();
|
finishLatch.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new Error("finishLatch.await() interrupted!", e);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new Error("join() interrupted!", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
// There is an assumption here that, at this point, only the
|
// There is an assumption here that, at this point, only the
|
||||||
// cached DirectByteBuffers should be active. Given we
|
// cached DirectByteBuffers should be active. Given we
|
||||||
// haven't used any other DirectByteBuffers in this test, this
|
// haven't used any other DirectByteBuffers in this test, this
|
||||||
// should hold.
|
// should hold.
|
||||||
//
|
//
|
||||||
// Also note that we can only do the sanity checking at the
|
// Also note that we can only do the sanity checking at the
|
||||||
// end and not during the run given that, at any time, there
|
// end and not during the run given that, at any time, there
|
||||||
// could be buffers currently in use by some of the workers
|
// could be buffers currently in use by some of the workers
|
||||||
// that will not be cached.
|
// that will not be cached.
|
||||||
|
|
||||||
System.out.println();
|
System.out.println();
|
||||||
if (maxBufferSize < SMALL_BUFFER_MAX_SIZE) {
|
if (maxBufferSize < SMALL_BUFFER_MAX_SIZE) {
|
||||||
// The max buffer size is smaller than all buffers that
|
// The max buffer size is smaller than all buffers that
|
||||||
// were allocated. No buffers should have been cached.
|
// were allocated. No buffers should have been cached.
|
||||||
checkDirectBuffers(0, 0);
|
checkDirectBuffers(0, 0);
|
||||||
} else if (maxBufferSize < LARGE_BUFFER_MIN_SIZE) {
|
} else if (maxBufferSize < LARGE_BUFFER_MIN_SIZE) {
|
||||||
// The max buffer size is larger than all small buffers
|
// The max buffer size is larger than all small buffers
|
||||||
// but smaller than all large buffers that were
|
// but smaller than all large buffers that were
|
||||||
// allocated. Only small buffers could have been cached.
|
// allocated. Only small buffers could have been cached.
|
||||||
checkDirectBuffers(threadNum,
|
checkDirectBuffers(threadNum,
|
||||||
(long) threadNum * (long) SMALL_BUFFER_MAX_SIZE);
|
(long) threadNum * (long) SMALL_BUFFER_MAX_SIZE);
|
||||||
} else {
|
} else {
|
||||||
// The max buffer size is larger than all buffers that
|
// The max buffer size is larger than all buffers that
|
||||||
// were allocated. All buffers could have been cached.
|
// were allocated. All buffers could have been cached.
|
||||||
checkDirectBuffers(threadNum,
|
checkDirectBuffers(threadNum,
|
||||||
(long) threadNum * (long) LARGE_BUFFER_MAX_SIZE);
|
(long) threadNum * (long) LARGE_BUFFER_MAX_SIZE);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
exitLatch.countDown();
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < threadNum; i += 1) {
|
||||||
|
threads[i].join();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user