8339979: VirtualThreadSchedulerMXBeanTest.testReduceParallelism fails intermittently

Reviewed-by: kevinw
This commit is contained in:
Alan Bateman 2024-10-03 14:02:40 +00:00
parent 21f8ccf4a9
commit eb93e6952b

@ -34,6 +34,8 @@
import java.lang.management.ManagementFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntPredicate;
import java.util.function.LongPredicate;
import java.util.stream.Stream;
import java.util.stream.IntStream;
import javax.management.MBeanServer;
@ -140,23 +142,37 @@ class VirtualThreadSchedulerMXBeanTest {
try {
// increase parallelism + saturate
int newParallelism = parallelism + 4;
bean.setParallelism(newParallelism);
IntStream.range(0, newParallelism).forEach(_ -> executor.submit(busyTask));
awaitMountedVirtualThreadCountGte(bean, newParallelism);
int highParallelism = parallelism + 4;
bean.setParallelism(highParallelism);
IntStream.range(0, highParallelism).forEach(_ -> executor.submit(busyTask));
// reduce parallelism and workload
newParallelism = Math.clamp(parallelism / 2, 1, parallelism);
bean.setParallelism(newParallelism);
sleep.set(true);
// mounted virtual thread count should reduce
awaitMountedVirtualThreadCountLte(bean, newParallelism);
// increase workload, the mounted virtual thread count should not increase
sleep.set(false);
// mounted virtual thread count should increase to highParallelism.
// Sample the count at highParallelism a few times.
for (int i = 0; i < 5; i++) {
Thread.sleep(100);
assertTrue(bean.getMountedVirtualThreadCount() <= newParallelism);
awaitMountedVirtualThreadCountEq(bean, highParallelism);
}
// reduce parallelism and workload
int lowParallelism = Math.clamp(parallelism / 2, 1, parallelism);
bean.setParallelism(lowParallelism);
sleep.set(true);
// mounted virtual thread count should reduce to lowParallelism or less.
// Sample the count at lowParallelism or less a few times.
for (int i = 0; i < 5; i++) {
Thread.sleep(100);
awaitMountedVirtualThreadCountLte(bean, lowParallelism);
}
// increase workload
sleep.set(false);
// mounted virtual thread count should not exceed lowParallelism.
// Sample the count at lowParallelism a few times.
for (int i = 0; i < 5; i++) {
Thread.sleep(100);
awaitMountedVirtualThreadCountEq(bean, lowParallelism);
}
} finally {
@ -230,31 +246,64 @@ class VirtualThreadSchedulerMXBeanTest {
* Waits for pool size >= target to be true.
*/
void awaitPoolSizeGte(VirtualThreadSchedulerMXBean bean, int target) throws InterruptedException {
System.err.format("await pool size >= %d ...%n", target);
while (bean.getPoolSize() < target) {
Thread.sleep(10);
}
awaitPoolSize(bean, ps -> ps >= target, ">= " + target);
}
/**
* Waits for the mounted virtual thread count >= target to be true.
*/
void awaitMountedVirtualThreadCountGte(VirtualThreadSchedulerMXBean bean,
int target) throws InterruptedException {
System.err.format("await mounted virtual thread count >= %d ...%n", target);
while (bean.getMountedVirtualThreadCount() < target) {
Thread.sleep(10);
}
long target) throws InterruptedException {
awaitMountedVirtualThreadCount(bean, c -> c >= target, ">= " + target);
}
/**
* Waits for the mounted virtual thread count <= target to be true.
*/
void awaitMountedVirtualThreadCountLte(VirtualThreadSchedulerMXBean bean,
int target) throws InterruptedException {
System.err.format("await mounted virtual thread count <= %d ...%n", target);
while (bean.getMountedVirtualThreadCount() > target) {
Thread.sleep(10);
long target) throws InterruptedException {
awaitMountedVirtualThreadCount(bean, c -> c <= target, "<= " + target);
}
/**
* Waits for the mounted virtual thread count == target to be true.
*/
void awaitMountedVirtualThreadCountEq(VirtualThreadSchedulerMXBean bean,
long target) throws InterruptedException {
awaitMountedVirtualThreadCount(bean, c -> c == target, "== " + target);
}
/**
* Waits until evaluating the given predicte on the pool size is true.
*/
void awaitPoolSize(VirtualThreadSchedulerMXBean bean,
IntPredicate predicate,
String reason) throws InterruptedException {
int poolSize = bean.getPoolSize();
if (!predicate.test(poolSize)) {
System.err.format("poolSize = %d, await %s ...%n", poolSize, reason);
while (!predicate.test(poolSize)) {
Thread.sleep(10);
poolSize = bean.getPoolSize();
}
System.err.format("poolSize = %d%n", poolSize);
}
}
/**
* Waits until evaluating the given predicte on the mounted thread count is true.
*/
void awaitMountedVirtualThreadCount(VirtualThreadSchedulerMXBean bean,
LongPredicate predicate,
String reason) throws InterruptedException {
long count = bean.getMountedVirtualThreadCount();
if (!predicate.test(count)) {
System.err.format("mountedVirtualThreadCount = %d, await %s ...%n", count, reason);
while (!predicate.test(count)) {
Thread.sleep(10);
count = bean.getMountedVirtualThreadCount();
}
System.err.format("mountedVirtualThreadCount = %d%n", count);
}
}
}