8293940: Some tests for virtual threads take too long

Reviewed-by: dfuchs
This commit is contained in:
Alan Bateman 2022-09-30 16:41:33 +00:00
parent 1d26c4b149
commit b8f9a915a2
11 changed files with 371 additions and 305 deletions
test/jdk
java
jdk
incubator/concurrent/StructuredTaskScope
internal/misc/ThreadFlock

@ -21,23 +21,23 @@
* questions.
*/
/**
* @test
/*
* @test id=default
* @bug 8284161 8286788
* @summary Test Thread API with virtual threads
* @enablePreview
* @modules java.base/java.lang:+open
* @library /test/lib
* @run testng/othervm/timeout=300 ThreadAPI
* @run testng ThreadAPI
*/
/**
* @test
/*
* @test id=no-vmcontinuations
* @requires vm.continuations
* @enablePreview
* @modules java.base/java.lang:+open
* @library /test/lib
* @run testng/othervm/timeout=300 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations ThreadAPI
* @run testng/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations ThreadAPI
*/
import java.time.Duration;
@ -52,6 +52,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -63,12 +64,41 @@ import java.nio.channels.Selector;
import jdk.test.lib.thread.VThreadRunner;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
public class ThreadAPI {
private static final Object lock = new Object();
// used for scheduling thread interrupt
private ScheduledExecutorService scheduler;
@BeforeClass
public void setUp() throws Exception {
ThreadFactory factory = (task) -> {
Thread thread = new Thread(task);
thread.setDaemon(true);
return thread;
};
scheduler = Executors.newSingleThreadScheduledExecutor(factory);
}
@AfterClass
public void tearDown() {
scheduler.shutdown();
}
/**
* An operation that does not return a result but may throw an exception.
*/
@FunctionalInterface
interface ThrowingRunnable {
void run() throws Exception;
}
/**
* Test Thread.currentThread before/after park.
*/
@ -81,7 +111,7 @@ public class ThreadAPI {
LockSupport.park();
after.set(Thread.currentThread());
});
Thread.sleep(100); // give time for virtual thread to park
awaitParked(thread);
LockSupport.unpark(thread);
thread.join();
assertTrue(before.get() == thread);
@ -105,7 +135,7 @@ public class ThreadAPI {
});
synchronized (lock) {
thread.start();
Thread.sleep(100); // give time for virtual thread to block
awaitBlocked(thread);
}
thread.join();
assertTrue(ref1.get() == thread);
@ -135,7 +165,7 @@ public class ThreadAPI {
lock.lock();
try {
thread.start();
Thread.sleep(100); // give time for virtual thread to block
awaitParked(thread);
} finally {
lock.unlock();
}
@ -206,7 +236,7 @@ public class ThreadAPI {
try {
assertTrue(vthread.isVirtual());
// Thread.currentThread() returned the virtual thread
// Thread.currentThread() returned by the virtual thread
Thread current;
while ((current = ref.get()) == null) {
Thread.sleep(10);
@ -341,13 +371,13 @@ public class ThreadAPI {
public void testJoin3() throws Exception {
var thread = Thread.ofVirtual().start(LockSupport::park);
try {
thread.join(100);
thread.join(100, 0);
thread.join(100, 100);
thread.join(0, 100);
assertFalse(thread.join(Duration.ofMillis(-100)));
thread.join(20);
thread.join(20, 0);
thread.join(20, 20);
thread.join(0, 20);
assertFalse(thread.join(Duration.ofMillis(-20)));
assertFalse(thread.join(Duration.ofMillis(0)));
assertFalse(thread.join(Duration.ofMillis(100)));
assertFalse(thread.join(Duration.ofMillis(20)));
assertTrue(thread.isAlive());
} finally {
LockSupport.unpark(thread);
@ -414,7 +444,7 @@ public class ThreadAPI {
public void testJoin11() throws Exception {
var thread = Thread.ofVirtual().start(() -> {
try {
Thread.sleep(100);
Thread.sleep(50);
} catch (InterruptedException e) { }
});
assertTrue(thread.join(Duration.ofSeconds(10)));
@ -632,7 +662,7 @@ public class ThreadAPI {
*/
@Test
public void testJoin28() throws Exception {
long nanos = TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS);
long nanos = TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS);
VThreadRunner.run(() -> {
var thread = new Thread(() -> LockSupport.parkNanos(nanos));
thread.start();
@ -658,7 +688,7 @@ public class ThreadAPI {
Thread.currentThread().interrupt();
try {
thread.join(Duration.ofSeconds(Integer.MAX_VALUE));
fail();
fail("join not interrupted");
} catch (InterruptedException expected) {
assertFalse(Thread.interrupted());
} finally {
@ -685,7 +715,7 @@ public class ThreadAPI {
scheduleInterrupt(Thread.currentThread(), 100);
try {
thread.join(Duration.ofSeconds(Integer.MAX_VALUE));
fail();
fail("join not interrupted");
} catch (InterruptedException expected) {
assertFalse(Thread.interrupted());
} finally {
@ -704,8 +734,8 @@ public class ThreadAPI {
public void testJoin31() throws Exception {
Thread thread = Thread.ofVirtual().start(() -> {
synchronized (lock) {
for (int i=0; i<10; i++) {
LockSupport.parkNanos(Duration.ofMillis(100).toNanos());
for (int i = 0; i < 10; i++) {
LockSupport.parkNanos(Duration.ofMillis(20).toNanos());
}
}
});
@ -732,12 +762,12 @@ public class ThreadAPI {
Thread thread = Thread.ofVirtual().start(() -> {
synchronized (lock) {
while (!done.get()) {
LockSupport.parkNanos(Duration.ofMillis(100).toNanos());
LockSupport.parkNanos(Duration.ofMillis(20).toNanos());
}
}
});
try {
assertFalse(thread.join(Duration.ofSeconds(1)));
assertFalse(thread.join(Duration.ofMillis(100)));
} finally {
done.set(true);
}
@ -856,7 +886,7 @@ public class ThreadAPI {
try {
try {
Thread.sleep(60*1000);
fail();
fail("sleep not interrupted");
} catch (InterruptedException e) {
// interrupt status should be reset
assertFalse(Thread.interrupted());
@ -865,7 +895,7 @@ public class ThreadAPI {
exception.set(e);
}
});
Thread.sleep(100); // give time for thread to block
awaitParked(thread);
thread.interrupt();
thread.join();
assertTrue(exception.get() == null);
@ -885,14 +915,14 @@ public class ThreadAPI {
exception.set(e);
}
});
Thread.sleep(100); // give time for thread to block
awaitParked(thread);
thread.interrupt();
thread.join();
assertTrue(exception.get() == null);
}
/**
* Test trying to park, wait or block with interrupt status set.
* Test trying to park with interrupt status set.
*/
@Test
public void testInterrupt8() throws Exception {
@ -902,20 +932,32 @@ public class ThreadAPI {
LockSupport.park();
assertTrue(Thread.interrupted());
});
}
/**
* Test trying to wait with interrupt status set.
*/
@Test
public void testInterrupt9() throws Exception {
VThreadRunner.run(() -> {
Thread me = Thread.currentThread();
me.interrupt();
synchronized (lock) {
try {
lock.wait();
fail();
fail("wait not interrupted");
} catch (InterruptedException expected) {
assertFalse(Thread.interrupted());
}
}
});
}
/**
* Test trying to block with interrupt status set.
*/
@Test
public void testInterrupt10() throws Exception {
VThreadRunner.run(() -> {
Thread me = Thread.currentThread();
me.interrupt();
@ -1185,147 +1227,106 @@ public class ThreadAPI {
}
/**
* Test Thread.sleep(2000), thread should sleep.
* Tasks that sleep for 1 second.
*/
@Test
public void testSleep3() throws Exception {
@DataProvider(name = "oneSecondSleepers")
public Object[][] oneSecondSleepers() {
ThrowingRunnable[] sleepers = {
() -> Thread.sleep(1000),
() -> Thread.sleep(Duration.ofSeconds(1))
};
return Arrays.stream(sleepers)
.map(s -> new Object[] { s })
.toArray(Object[][]::new);
}
/**
* Test Thread.sleep duration.
*/
@Test(dataProvider = "oneSecondSleepers")
public void testSleep3(ThrowingRunnable sleeper) throws Exception {
VThreadRunner.run(() -> {
long start = millisTime();
Thread.sleep(2000);
expectDuration(start, /*min*/1900, /*max*/4000);
});
VThreadRunner.run(() -> {
long start = millisTime();
Thread.sleep(2000, 0);
expectDuration(start, /*min*/1900, /*max*/4000);
});
VThreadRunner.run(() -> {
long start = millisTime();
Thread.sleep(Duration.ofMillis(2000));
expectDuration(start, /*min*/1900, /*max*/4000);
sleeper.run();
expectDuration(start, /*min*/900, /*max*/4000);
});
}
/**
* Tasks that sleep for zero or longer duration.
*/
@DataProvider(name = "sleepers")
public Object[][] sleepers() {
ThrowingRunnable[] sleepers = {
() -> Thread.sleep(0),
() -> Thread.sleep(0, 0),
() -> Thread.sleep(1000),
() -> Thread.sleep(1000, 0),
() -> Thread.sleep(Duration.ofMillis(0)),
() -> Thread.sleep(Duration.ofMillis(1000)),
};
return Arrays.stream(sleepers)
.map(s -> new Object[] { s })
.toArray(Object[][]::new);
}
/**
* Test Thread.sleep with interrupt status set.
*/
@Test(dataProvider = "sleepers")
public void testSleep4(ThrowingRunnable sleeper) throws Exception {
VThreadRunner.run(() -> {
Thread me = Thread.currentThread();
me.interrupt();
try {
sleeper.run();
fail("sleep was not interrupted");
} catch (InterruptedException e) {
// expected
assertFalse(me.isInterrupted());
}
});
}
/**
* Test Thread.sleep with interrupt status set and a negative duration.
*/
@Test
public void testSleep4() throws Exception {
VThreadRunner.run(() -> {
Thread me = Thread.currentThread();
me.interrupt();
try {
Thread.sleep(0);
fail();
} catch (InterruptedException e) {
// expected
assertFalse(me.isInterrupted());
}
});
VThreadRunner.run(() -> {
Thread me = Thread.currentThread();
me.interrupt();
try {
Thread.sleep(0, 0);
fail();
} catch (InterruptedException e) {
// expected
assertFalse(me.isInterrupted());
}
});
VThreadRunner.run(() -> {
Thread me = Thread.currentThread();
me.interrupt();
try {
Thread.sleep(1000);
fail();
} catch (InterruptedException e) {
// expected
assertFalse(me.isInterrupted());
}
});
VThreadRunner.run(() -> {
Thread me = Thread.currentThread();
me.interrupt();
try {
Thread.sleep(1000, 0);
fail();
} catch (InterruptedException e) {
// expected
assertFalse(me.isInterrupted());
}
});
VThreadRunner.run(() -> {
Thread me = Thread.currentThread();
me.interrupt();
Thread.sleep(Duration.ofMillis(-1000)); // does nothing
assertTrue(me.isInterrupted());
});
VThreadRunner.run(() -> {
Thread me = Thread.currentThread();
me.interrupt();
try {
Thread.sleep(Duration.ofMillis(0));
fail();
} catch (InterruptedException e) {
// expected
assertFalse(me.isInterrupted());
}
});
VThreadRunner.run(() -> {
Thread me = Thread.currentThread();
me.interrupt();
try {
Thread.sleep(Duration.ofMillis(1000));
fail();
} catch (InterruptedException e) {
// expected
assertFalse(me.isInterrupted());
}
});
}
/**
* Test interrupting Thread.sleep
* Tasks that sleep for a long time.
*/
@Test
public void testSleep5() throws Exception {
VThreadRunner.run(() -> {
Thread t = Thread.currentThread();
scheduleInterrupt(t, 2000);
try {
Thread.sleep(20*1000);
fail();
} catch (InterruptedException e) {
// interrupt status should be cleared
assertFalse(t.isInterrupted());
}
});
@DataProvider(name = "longSleepers")
public Object[][] longSleepers() {
ThrowingRunnable[] sleepers = {
() -> Thread.sleep(20_000),
() -> Thread.sleep(20_000, 0),
() -> Thread.sleep(Duration.ofSeconds(20)),
};
return Arrays.stream(sleepers)
.map(s -> new Object[] { s })
.toArray(Object[][]::new);
}
/**
* Test interrupting Thread.sleep.
*/
@Test(dataProvider = "longSleepers")
public void testSleep5(ThrowingRunnable sleeper) throws Exception {
VThreadRunner.run(() -> {
Thread t = Thread.currentThread();
scheduleInterrupt(t, 2000);
scheduleInterrupt(t, 100);
try {
Thread.sleep(20*1000, 0);
fail();
} catch (InterruptedException e) {
// interrupt status should be cleared
assertFalse(t.isInterrupted());
}
});
VThreadRunner.run(() -> {
Thread t = Thread.currentThread();
scheduleInterrupt(t, 2000);
try {
Thread.sleep(Duration.ofSeconds(20));
fail();
sleeper.run();
fail("sleep was not interrupted");
} catch (InterruptedException e) {
// interrupt status should be cleared
assertFalse(t.isInterrupted());
@ -1334,7 +1335,7 @@ public class ThreadAPI {
}
/**
* Test that Thread.sleep should not disrupt parking permit.
* Test that Thread.sleep does not disrupt parking permit.
*/
@Test
public void testSleep6() throws Exception {
@ -1342,8 +1343,8 @@ public class ThreadAPI {
LockSupport.unpark(Thread.currentThread());
long start = millisTime();
Thread.sleep(2000);
expectDuration(start, /*min*/1900, /*max*/4000);
Thread.sleep(1000);
expectDuration(start, /*min*/900, /*max*/4000);
// check that parking permit was not consumed
LockSupport.park();
@ -1357,20 +1358,17 @@ public class ThreadAPI {
public void testSleep7() throws Exception {
AtomicReference<Exception> exc = new AtomicReference<>();
var thread = Thread.ofVirtual().start(() -> {
long start = millisTime();
try {
Thread.sleep(2000);
long elapsed = millisTime() - start;
if (elapsed < 1900) {
exc.set(new RuntimeException("sleep too short"));
}
} catch (InterruptedException e) {
long start = millisTime();
Thread.sleep(1000);
expectDuration(start, /*min*/900, /*max*/4000);
} catch (Exception e) {
exc.set(e);
}
});
// attempt to disrupt sleep
for (int i=0; i<5; i++) {
for (int i = 0; i < 5; i++) {
Thread.sleep(20);
LockSupport.unpark(thread);
}
@ -1382,21 +1380,21 @@ public class ThreadAPI {
}
/**
* Test Thread.sleep when pinned
* Test Thread.sleep when pinned.
*/
@Test
public void testSleep8() throws Exception {
VThreadRunner.run(() -> {
long start = millisTime();
synchronized (lock) {
Thread.sleep(2000);
Thread.sleep(1000);
}
expectDuration(start, /*min*/1900, /*max*/4000);
expectDuration(start, /*min*/900, /*max*/4000);
});
}
/**
* Test Thread.sleep when pinned and with interrupt status set
* Test Thread.sleep when pinned and with interrupt status set.
*/
@Test
public void testSleep9() throws Exception {
@ -1407,7 +1405,7 @@ public class ThreadAPI {
synchronized (lock) {
Thread.sleep(2000);
}
fail();
fail("sleep not interrupted");
} catch (InterruptedException e) {
// expected
assertFalse(me.isInterrupted());
@ -1416,18 +1414,18 @@ public class ThreadAPI {
}
/**
* Test interrupting Thread.sleep when pinned
* Test interrupting Thread.sleep when pinned.
*/
@Test
public void testSleep10() throws Exception {
VThreadRunner.run(() -> {
Thread t = Thread.currentThread();
scheduleInterrupt(t, 2000);
scheduleInterrupt(t, 100);
try {
synchronized (lock) {
Thread.sleep(20 * 1000);
}
fail();
fail("sleep not interrupted");
} catch (InterruptedException e) {
// interrupt status should be cleared
assertFalse(t.isInterrupted());
@ -2293,18 +2291,34 @@ public class ThreadAPI {
assertTrue(thread.toString().contains("fred"));
}
/**
* Waits for the given thread to park.
*/
static void awaitParked(Thread thread) throws InterruptedException {
Thread.State state = thread.getState();
while (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
assertTrue(state != Thread.State.TERMINATED, "Thread has terminated");
Thread.sleep(10);
state = thread.getState();
}
}
/**
* Waits for the given thread to block waiting on a monitor.
*/
static void awaitBlocked(Thread thread) throws InterruptedException {
Thread.State state = thread.getState();
while (state != Thread.State.BLOCKED) {
assertTrue(state != Thread.State.TERMINATED, "Thread has terminated");
Thread.sleep(10);
state = thread.getState();
}
}
/**
* Schedule a thread to be interrupted after a delay.
*/
private static void scheduleInterrupt(Thread thread, long delay) {
Runnable interruptTask = () -> {
try {
Thread.sleep(delay);
thread.interrupt();
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(interruptTask).start();
private void scheduleInterrupt(Thread thread, long delayInMillis) {
scheduler.schedule(thread::interrupt, delayInMillis, TimeUnit.MILLISECONDS);
}
}

@ -21,19 +21,19 @@
* questions.
*/
/**
/*
* @test
* @requires vm.debug != true
* @compile --enable-preview -source ${jdk.version} PinALot.java
* @run main/othervm --enable-preview PinALot
* @summary Stress test timed park when pinned
* @requires vm.debug != true
* @enablePreview
* @run main PinALot 500000
*/
/**
/*
* @test
* @requires vm.debug == true
* @compile --enable-preview -source ${jdk.version} PinALot.java
* @run main/othervm/timeout=300 --enable-preview PinALot 200000
* @enablePreview
* @run main/othervm/timeout=300 PinALot 200000
*/
import java.time.Duration;

@ -21,13 +21,20 @@
* questions.
*/
/**
* @test
* @summary Stress test virtual threads with SynchronousQueue and LinkedTransferQueue
/*
* @test id=sq
* @summary Stress test virtual threads with a SynchronousQueue
* @requires vm.debug != true
* @compile --enable-preview -source ${jdk.version} PingPong.java
* @run main/othervm --enable-preview PingPong SQ 1000000
* @run main/othervm --enable-preview PingPong LTQ 1000000
* @enablePreview
* @run main PingPong SQ 500000
*/
/*
* @test id=ltq
* @summary Stress test virtual threads with a LinkedTransferQueue
* @requires vm.debug != true
* @enablePreview
* @run main PingPong LTQ 500000
*/
import java.time.Duration;

@ -21,20 +21,20 @@
* questions.
*/
/**
/*
* @test
* @summary Stress test virtual threads with a variation of the Skynet 1M benchmark
* @requires vm.continuations
* @compile --enable-preview -source ${jdk.version} Skynet.java
* @run main/othervm/timeout=300 --enable-preview Skynet
* @enablePreview
* @run main/othervm/timeout=300 Skynet
*/
/**
/*
* @test
* @requires vm.debug == true & vm.continuations
* @requires vm.gc.Z
* @compile --enable-preview -source ${jdk.version} Skynet.java
* @run main/othervm/timeout=300 --enable-preview -XX:+UnlockDiagnosticVMOptions
* @enablePreview
* @run main/othervm/timeout=300 -XX:+UnlockDiagnosticVMOptions
* -XX:+ZVerifyViews -XX:ZCollectionInterval=0.01 Skynet
*/
@ -43,10 +43,9 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
public class Skynet {
public static final int ITERATIONS = 10;
public static void main(String[] args) {
for (int i = 0; i < ITERATIONS; i++) {
int iterations = (args.length > 0) ? Integer.parseInt(args[0]) : 10;
for (int i = 0; i < iterations; i++) {
skynet(1_000_000, 499999500000L);
}
}

@ -21,19 +21,19 @@
* questions.
*/
/**
/*
* @test
* @summary Stress test Thread.sleep
* @requires vm.debug != true & vm.continuations
* @compile --enable-preview -source ${jdk.version} SleepALot.java
* @run main/othervm --enable-preview SleepALot
* @enablePreview
* @run main/othervm SleepALot 500000
*/
/**
/*
* @test
* @requires vm.debug == true & vm.continuations
* @compile --enable-preview -source ${jdk.version} SleepALot.java
* @run main/othervm/timeout=300 --enable-preview SleepALot 200000
* @enablePreview
* @run main/othervm/timeout=300 SleepALot 200000
*/
import java.time.Duration;

@ -21,12 +21,12 @@
* questions.
*/
/**
/*
* @test
* @summary Stress parking with CompletableFuture timed get
* @requires vm.debug != true & vm.continuations
* @compile --enable-preview -source ${jdk.version} TimedGet.java
* @run main/othervm -Xmx1g --enable-preview TimedGet
* @enablePreview
* @run main/othervm -Xmx1g TimedGet 100000
*/
import java.time.Duration;

@ -21,19 +21,19 @@
* questions.
*/
/**
/*
* @test
* @summary Stress test Thread.yield
* @requires vm.debug != true
* @compile --enable-preview -source ${jdk.version} YieldALot.java
* @run main/othervm --enable-preview YieldALot
* @enablePreview
* @run main YieldALot 500000
*/
/**
/*
* @test
* @requires vm.debug == true
* @compile --enable-preview -source ${jdk.version} YieldALot.java
* @run main/othervm/timeout=360 --enable-preview YieldALot 200000
* @enablePreview
* @run main YieldALot 200000
*/
import java.time.Duration;

@ -25,8 +25,8 @@
* @test
* @summary Test ExecutorService.close, including default implementation
* @library ../lib
* @compile --enable-preview -source ${jdk.version} CloseTest.java
* @run testng/othervm --enable-preview CloseTest
* @enablePreview
* @run testng CloseTest
*/
import java.time.Duration;
@ -81,14 +81,14 @@ public class CloseTest {
@Test(dataProvider = "executors")
public void testCloseWithRunningTasks(ExecutorService executor) throws Exception {
Future<?> future = executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
Thread.sleep(Duration.ofMillis(100));
return "foo";
});
executor.close(); // waits for task to complete
assertTrue(executor.isShutdown());
assertTrue(executor.isTerminated());
assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS));
assertEquals(future.get(), "foo");
assertEquals(future.resultNow(), "foo");
}
/**
@ -99,7 +99,7 @@ public class CloseTest {
Phaser phaser = new Phaser(2);
Future<?> future = executor.submit(() -> {
phaser.arriveAndAwaitAdvance();
Thread.sleep(Duration.ofSeconds(1));
Thread.sleep(Duration.ofMillis(100));
return "foo";
});
phaser.arriveAndAwaitAdvance(); // wait for task to start
@ -110,7 +110,7 @@ public class CloseTest {
assertTrue(executor.isShutdown());
assertTrue(executor.isTerminated());
assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS));
assertEquals(future.get(), "foo");
assertEquals(future.resultNow(), "foo");
}
/**

@ -22,13 +22,20 @@
*/
/*
* @test
* @test id=platform
* @summary Basic tests for new thread-per-task executors
* @compile --enable-preview -source ${jdk.version} ThreadPerTaskExecutorTest.java
* @run testng/othervm/timeout=300 --enable-preview ThreadPerTaskExecutorTest
* @enablePreview
* @run testng/othervm -DthreadFactory=platform ThreadPerTaskExecutorTest
*/
/*
* @test id=virtual
* @enablePreview
* @run testng/othervm -DthreadFactory=virtual ThreadPerTaskExecutorTest
*/
import java.time.Duration;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -54,6 +61,7 @@ public class ThreadPerTaskExecutorTest {
};
private ScheduledExecutorService scheduler;
private Object[][] threadFactories;
@BeforeClass
public void setUp() throws Exception {
@ -62,7 +70,19 @@ public class ThreadPerTaskExecutorTest {
thread.setDaemon(true);
return thread;
};
scheduler = Executors.newSingleThreadScheduledExecutor(factory);
this.scheduler = Executors.newSingleThreadScheduledExecutor(factory);
// thread factories
String value = System.getProperty("threadFactory");
List<ThreadFactory> list = new ArrayList<>();
if (value == null || value.equals("platform"))
list.add(Thread.ofPlatform().factory());
if (value == null || value.equals("virtual"))
list.add(Thread.ofVirtual().factory());
assertTrue(list.size() > 0, "No thread factories for tests");
this.threadFactories = list.stream()
.map(f -> new Object[] { f })
.toArray(Object[][]::new);
}
@AfterClass
@ -72,20 +92,15 @@ public class ThreadPerTaskExecutorTest {
@DataProvider(name = "factories")
public Object[][] factories() {
return new Object[][] {
{ Executors.defaultThreadFactory(), },
{ Thread.ofVirtual().factory(), },
};
return threadFactories;
}
@DataProvider(name = "executors")
public Object[][] executors() {
var defaultThreadFactory = Executors.defaultThreadFactory();
var virtualThreadFactory = Thread.ofVirtual().factory();
return new Object[][] {
{ Executors.newThreadPerTaskExecutor(defaultThreadFactory), },
{ Executors.newThreadPerTaskExecutor(virtualThreadFactory), },
};
return Arrays.stream(threadFactories)
.map(f -> Executors.newThreadPerTaskExecutor((ThreadFactory) f[0]))
.map(e -> new Object[] { e })
.toArray(Object[][]::new);
}
/**
@ -216,7 +231,7 @@ public class ThreadPerTaskExecutorTest {
Future<String> future;
try (executor) {
future = executor.submit(() -> {
Thread.sleep(Duration.ofMillis(500));
Thread.sleep(Duration.ofMillis(50));
return "foo";
});
}
@ -401,7 +416,7 @@ public class ThreadPerTaskExecutorTest {
Callable<String> task2 = () -> { throw new FooException(); };
try {
executor.invokeAny(Set.of(task1, task2));
fail();
fail("invokeAny did not throw");
} catch (ExecutionException e) {
Throwable cause = e.getCause();
assertTrue(cause instanceof FooException);
@ -419,12 +434,12 @@ public class ThreadPerTaskExecutorTest {
class FooException extends Exception { }
Callable<String> task1 = () -> { throw new FooException(); };
Callable<String> task2 = () -> {
Thread.sleep(Duration.ofMillis(500));
Thread.sleep(Duration.ofMillis(50));
throw new FooException();
};
try {
executor.invokeAny(Set.of(task1, task2));
fail();
fail("invokeAny did not throw");
} catch (ExecutionException e) {
Throwable cause = e.getCause();
assertTrue(cause instanceof FooException);
@ -448,14 +463,14 @@ public class ThreadPerTaskExecutorTest {
/**
* Test invokeAny where some, not all, tasks complete normally. The
* completion of the last task is delayed.
* completion of the first task to complete normally is delayed.
*/
@Test(dataProvider = "executors")
public void testInvokeAny6(ExecutorService executor) throws Exception {
try (executor) {
class FooException extends Exception { }
Callable<String> task1 = () -> {
Thread.sleep(Duration.ofMillis(500));
Thread.sleep(Duration.ofMillis(50));
return "foo";
};
Callable<String> task2 = () -> { throw new FooException(); };
@ -558,7 +573,7 @@ public class ThreadPerTaskExecutorTest {
Thread.currentThread().interrupt();
try {
executor.invokeAny(Set.of(task1, task2));
fail();
fail("invokeAny did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.currentThread().isInterrupted());
} finally {
@ -584,7 +599,7 @@ public class ThreadPerTaskExecutorTest {
scheduleInterrupt(Thread.currentThread(), Duration.ofMillis(500));
try {
executor.invokeAny(Set.of(task1, task2));
fail();
fail("invokeAny did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.currentThread().isInterrupted());
} finally {
@ -657,7 +672,7 @@ public class ThreadPerTaskExecutorTest {
try (executor) {
Callable<String> task1 = () -> "foo";
Callable<String> task2 = () -> {
Thread.sleep(Duration.ofMillis(500));
Thread.sleep(Duration.ofMillis(50));
return "bar";
};
@ -684,7 +699,7 @@ public class ThreadPerTaskExecutorTest {
class BarException extends Exception { }
Callable<String> task1 = () -> { throw new FooException(); };
Callable<String> task2 = () -> {
Thread.sleep(Duration.ofMillis(500));
Thread.sleep(Duration.ofMillis(50));
throw new BarException();
};
@ -711,7 +726,7 @@ public class ThreadPerTaskExecutorTest {
try (executor) {
Callable<String> task1 = () -> "foo";
Callable<String> task2 = () -> {
Thread.sleep(Duration.ofMillis(500));
Thread.sleep(Duration.ofMillis(50));
return "bar";
};
@ -781,7 +796,7 @@ public class ThreadPerTaskExecutorTest {
Thread.currentThread().interrupt();
try {
executor.invokeAll(List.of(task1, task2));
fail();
fail("invokeAll did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.currentThread().isInterrupted());
} finally {
@ -805,7 +820,7 @@ public class ThreadPerTaskExecutorTest {
Thread.currentThread().interrupt();
try {
executor.invokeAll(List.of(task1, task2), 1, TimeUnit.SECONDS);
fail();
fail("invokeAll did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.currentThread().isInterrupted());
} finally {
@ -815,7 +830,7 @@ public class ThreadPerTaskExecutorTest {
}
/**
* Test interrupt with thread blocked in invokeAll
* Test interrupt with thread blocked in invokeAll.
*/
@Test(dataProvider = "executors")
public void testInvokeAllInterrupt4(ExecutorService executor) throws Exception {
@ -825,7 +840,7 @@ public class ThreadPerTaskExecutorTest {
scheduleInterrupt(Thread.currentThread(), Duration.ofMillis(500));
try {
executor.invokeAll(Set.of(task1, task2));
fail();
fail("invokeAll did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.currentThread().isInterrupted());
@ -841,7 +856,7 @@ public class ThreadPerTaskExecutorTest {
}
/**
* Test interrupt with thread blocked in timed-invokeAll
* Test interrupt with thread blocked in timed-invokeAll.
*/
@Test(dataProvider = "executors")
public void testInvokeAllInterrupt6(ExecutorService executor) throws Exception {
@ -851,7 +866,7 @@ public class ThreadPerTaskExecutorTest {
scheduleInterrupt(Thread.currentThread(), Duration.ofMillis(500));
try {
executor.invokeAll(Set.of(task1, task2), 1, TimeUnit.DAYS);
fail();
fail("invokeAll did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.currentThread().isInterrupted());

@ -22,12 +22,19 @@
*/
/*
* @test
* @test id=platform
* @bug 8284199
* @summary Basic tests for StructuredTaskScope
* @enablePreview
* @modules jdk.incubator.concurrent
* @run testng/othervm StructuredTaskScopeTest
* @run testng/othervm -DthreadFactory=platform StructuredTaskScopeTest
*/
/*
* @test id=virtual
* @enablePreview
* @modules jdk.incubator.concurrent
* @run testng/othervm -DthreadFactory=virtual StructuredTaskScopeTest
*/
import jdk.incubator.concurrent.StructuredTaskScope;
@ -37,6 +44,7 @@ import jdk.incubator.concurrent.StructureViolationException;
import java.time.Duration;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.NoSuchElementException;
import java.util.List;
import java.util.Set;
@ -65,6 +73,7 @@ import static org.testng.Assert.*;
public class StructuredTaskScopeTest {
private ScheduledExecutorService scheduler;
private Object[][] threadFactories;
@BeforeClass
public void setUp() throws Exception {
@ -73,7 +82,19 @@ public class StructuredTaskScopeTest {
thread.setDaemon(true);
return thread;
};
scheduler = Executors.newSingleThreadScheduledExecutor(factory);
this.scheduler = Executors.newSingleThreadScheduledExecutor(factory);
// thread factories
String value = System.getProperty("threadFactory");
List<ThreadFactory> list = new ArrayList<>();
if (value == null || value.equals("platform"))
list.add(Thread.ofPlatform().factory());
if (value == null || value.equals("virtual"))
list.add(Thread.ofVirtual().factory());
assertTrue(list.size() > 0, "No thread factories for tests");
this.threadFactories = list.stream()
.map(f -> new Object[] { f })
.toArray(Object[][]::new);
}
@AfterClass
@ -86,12 +107,7 @@ public class StructuredTaskScopeTest {
*/
@DataProvider
public Object[][] factories() {
var defaultThreadFactory = Executors.defaultThreadFactory();
var virtualThreadFactory = Thread.ofVirtual().factory();
return new Object[][] {
{ defaultThreadFactory, },
{ virtualThreadFactory, },
};
return threadFactories;
}
/**
@ -284,9 +300,9 @@ public class StructuredTaskScopeTest {
return null;
});
// start a second task to shutdown the scope after 500ms
// start a second task to shutdown the scope after a short delay
Future<String> future2 = scope.fork(() -> {
Thread.sleep(Duration.ofMillis(500));
Thread.sleep(Duration.ofMillis(100));
scope.shutdown();
return null;
});
@ -319,7 +335,7 @@ public class StructuredTaskScopeTest {
public void testJoinWithThreads(ThreadFactory factory) throws Exception {
try (var scope = new StructuredTaskScope(null, factory)) {
Future<String> future = scope.fork(() -> {
Thread.sleep(Duration.ofMillis(500));
Thread.sleep(Duration.ofMillis(50));
return "foo";
});
scope.join();
@ -362,7 +378,7 @@ public class StructuredTaskScopeTest {
public void testInterruptJoin1(ThreadFactory factory) throws Exception {
try (var scope = new StructuredTaskScope(null, factory)) {
Future<String> future = scope.fork(() -> {
Thread.sleep(Duration.ofSeconds(3));
Thread.sleep(Duration.ofMillis(100));
return "foo";
});
@ -370,7 +386,7 @@ public class StructuredTaskScopeTest {
Thread.currentThread().interrupt();
try {
scope.join();
fail();
fail("join did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.interrupted()); // interrupt status should be clear
}
@ -396,7 +412,7 @@ public class StructuredTaskScopeTest {
scheduleInterrupt(Thread.currentThread(), Duration.ofMillis(500));
try {
scope.join();
fail();
fail("join did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.interrupted()); // interrupt status should be clear
}
@ -446,7 +462,7 @@ public class StructuredTaskScopeTest {
return "foo";
});
Future<String> future2 = scope.fork(() -> {
Thread.sleep(Duration.ofMillis(500));
Thread.sleep(Duration.ofMillis(50));
return null;
});
scope.join();
@ -542,8 +558,8 @@ public class StructuredTaskScopeTest {
try {
for (int i = 0; i < 3; i++) {
try {
scope.joinUntil(Instant.now().plusSeconds(1));
fail();
scope.joinUntil(Instant.now().plusMillis(50));
fail("joinUntil did not throw");
} catch (TimeoutException expected) {
assertFalse(future.isDone());
}
@ -572,7 +588,7 @@ public class StructuredTaskScopeTest {
// now
try {
scope.joinUntil(Instant.now());
fail();
fail("joinUntil did not throw");
} catch (TimeoutException expected) {
assertFalse(future.isDone());
}
@ -580,7 +596,7 @@ public class StructuredTaskScopeTest {
// in the past
try {
scope.joinUntil(Instant.now().minusSeconds(1));
fail();
fail("joinUntil did not throw");
} catch (TimeoutException expected) {
assertFalse(future.isDone());
}
@ -598,15 +614,15 @@ public class StructuredTaskScopeTest {
public void testInterruptJoinUntil1(ThreadFactory factory) throws Exception {
try (var scope = new StructuredTaskScope(null, factory)) {
Future<String> future = scope.fork(() -> {
Thread.sleep(Duration.ofSeconds(3));
Thread.sleep(Duration.ofMillis(100));
return "foo";
});
// join should throw
// joinUntil should throw
Thread.currentThread().interrupt();
try {
scope.joinUntil(Instant.now().plusSeconds(10));
fail();
fail("joinUntil did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.interrupted()); // interrupt status should be clear
}
@ -618,7 +634,7 @@ public class StructuredTaskScopeTest {
}
/**
* Test interrupt of thread blocked in joinUntil
* Test interrupt of thread blocked in joinUntil.
*/
@Test(dataProvider = "factories")
public void testInterruptJoinUntil2(ThreadFactory factory) throws Exception {
@ -628,11 +644,11 @@ public class StructuredTaskScopeTest {
return "foo";
});
// join should throw
// joinUntil should throw
scheduleInterrupt(Thread.currentThread(), Duration.ofMillis(500));
try {
scope.joinUntil(Instant.now().plusSeconds(10));
fail();
fail("joinUntil did not throw");
} catch (InterruptedException expected) {
assertFalse(Thread.interrupted()); // interrupt status should be clear
}
@ -787,7 +803,7 @@ public class StructuredTaskScopeTest {
scope.join();
// release task after a delay
scheduler.schedule(latch::countDown, 1, TimeUnit.SECONDS);
scheduler.schedule(latch::countDown, 100, TimeUnit.MILLISECONDS);
// invoke close with interrupt status set
Thread.currentThread().interrupt();
@ -846,7 +862,7 @@ public class StructuredTaskScopeTest {
scope1.join();
try {
scope1.close();
fail();
fail("close did not throw");
} catch (StructureViolationException expected) { }
// underlying flock should be closed, fork should return a cancelled task
@ -870,7 +886,7 @@ public class StructuredTaskScopeTest {
try (var scope = new StructuredTaskScope(null, factory)) {
Future<String> future = scope.fork(() -> {
Thread.sleep(Duration.ofMillis(100));
Thread.sleep(Duration.ofMillis(20));
return "foo";
});
@ -890,7 +906,7 @@ public class StructuredTaskScopeTest {
try (var scope = new StructuredTaskScope(null, factory)) {
Future<String> future = scope.fork(() -> {
Thread.sleep(Duration.ofMillis(100));
Thread.sleep(Duration.ofMillis(20));
throw new FooException();
});
@ -917,8 +933,8 @@ public class StructuredTaskScopeTest {
// timed-get, should timeout
try {
future.get(100, TimeUnit.MICROSECONDS);
fail();
future.get(20, TimeUnit.MILLISECONDS);
fail("Future.get did not throw");
} catch (TimeoutException expected) { }
future.cancel(true);

@ -22,11 +22,18 @@
*/
/*
* @test
* @test id=platform
* @summary Basic tests for ThreadFlock
* @modules java.base/jdk.internal.misc
* @compile --enable-preview -source ${jdk.version} ThreadFlockTest.java
* @run testng/othervm --enable-preview ThreadFlockTest
* @enablePreview
* @run testng/othervm -DthreadFactory=platform ThreadFlockTest
*/
/*
* @test id=virtual
* @modules java.base/jdk.internal.misc
* @enablePreview
* @run testng/othervm -DthreadFactory=virtual ThreadFlockTest
*/
import java.time.Duration;
@ -46,6 +53,7 @@ import static org.testng.Assert.*;
public class ThreadFlockTest {
private ScheduledExecutorService scheduler;
private Object[][] threadFactories;
@BeforeClass
public void setUp() throws Exception {
@ -54,7 +62,19 @@ public class ThreadFlockTest {
thread.setDaemon(true);
return thread;
};
scheduler = Executors.newSingleThreadScheduledExecutor(factory);
this.scheduler = Executors.newSingleThreadScheduledExecutor(factory);
// thread factories
String value = System.getProperty("threadFactory");
List<ThreadFactory> list = new ArrayList<>();
if (value == null || value.equals("platform"))
list.add(Thread.ofPlatform().factory());
if (value == null || value.equals("virtual"))
list.add(Thread.ofVirtual().factory());
assertTrue(list.size() > 0, "No thread factories for tests");
this.threadFactories = list.stream()
.map(f -> new Object[] { f })
.toArray(Object[][]::new);
}
@AfterClass
@ -64,12 +84,7 @@ public class ThreadFlockTest {
@DataProvider(name = "factories")
public Object[][] factories() {
var defaultThreadFactory = Executors.defaultThreadFactory();
var virtualThreadFactory = Thread.ofVirtual().factory();
return new Object[][] {
{ defaultThreadFactory, },
{ virtualThreadFactory, },
};
return threadFactories;
}
/**
@ -381,7 +396,7 @@ public class ThreadFlockTest {
AtomicBoolean done = new AtomicBoolean();
Runnable task = () -> {
try {
Thread.sleep(Duration.ofSeconds(1));
Thread.sleep(Duration.ofMillis(50));
done.set(true);
} catch (InterruptedException e) { }
};
@ -431,7 +446,7 @@ public class ThreadFlockTest {
long startMillis = millisTime();
try {
flock.awaitAll(Duration.ofSeconds(2));
fail();
fail("awaitAll did not throw");
} catch (TimeoutException e) {
checkDuration(startMillis, 1900, 4000);
}
@ -458,8 +473,8 @@ public class ThreadFlockTest {
try {
for (int i = 0; i < 3; i++) {
try {
flock.awaitAll(Duration.ofSeconds(1));
fail();
flock.awaitAll(Duration.ofMillis(50));
fail("awaitAll did not throw");
} catch (TimeoutException expected) { }
}
} finally {
@ -488,11 +503,11 @@ public class ThreadFlockTest {
try {
try {
flock.awaitAll(Duration.ofSeconds(0));
fail();
fail("awaitAll did not throw");
} catch (TimeoutException expected) { }
try {
flock.awaitAll(Duration.ofSeconds(-1));
fail();
fail("awaitAll did not throw");
} catch (TimeoutException expected) { }
} finally {
thread.interrupt();
@ -527,7 +542,7 @@ public class ThreadFlockTest {
Thread.currentThread().interrupt();
try {
flock.awaitAll();
fail();
fail("awaitAll did not throw");
} catch (InterruptedException e) {
// interrupt status should be clear
assertFalse(Thread.currentThread().isInterrupted());
@ -537,9 +552,9 @@ public class ThreadFlockTest {
Thread.currentThread().interrupt();
try {
flock.awaitAll(Duration.ofSeconds(30));
fail();
fail("awaitAll did not throw");
} catch (TimeoutException e) {
fail();
fail("TimeoutException not expected");
} catch (InterruptedException e) {
// interrupt status should be clear
assertFalse(Thread.currentThread().isInterrupted());
@ -579,7 +594,7 @@ public class ThreadFlockTest {
scheduleInterrupt(Thread.currentThread(), Duration.ofMillis(500));
try {
flock.awaitAll();
fail();
fail("awaitAll did not throw");
} catch (InterruptedException e) {
// interrupt status should be clear
assertFalse(Thread.currentThread().isInterrupted());
@ -588,9 +603,9 @@ public class ThreadFlockTest {
scheduleInterrupt(Thread.currentThread(), Duration.ofMillis(500));
try {
flock.awaitAll(Duration.ofSeconds(30));
fail();
fail("awaitAll did not throw");
} catch (TimeoutException e) {
fail();
fail("TimeoutException not expected");
} catch (InterruptedException e) {
// interrupt status should be clear
assertFalse(Thread.currentThread().isInterrupted());
@ -706,7 +721,7 @@ public class ThreadFlockTest {
// schedule thread to invoke wakeup
Thread thread2 = factory.newThread(() -> {
try { Thread.sleep(Duration.ofSeconds(1)); } catch (Exception e) { }
try { Thread.sleep(Duration.ofMillis(500)); } catch (Exception e) { }
flock.wakeup();
});
flock.start(thread2);
@ -782,7 +797,7 @@ public class ThreadFlockTest {
var exception = new AtomicReference<Exception>();
Runnable sleepTask = () -> {
try {
Thread.sleep(Duration.ofSeconds(1));
Thread.sleep(Duration.ofMillis(50));
} catch (Exception e) {
exception.set(e);
}
@ -958,7 +973,7 @@ public class ThreadFlockTest {
try (var flock2 = ThreadFlock.open("flock2")) {
try {
flock1.close();
fail();
fail("close did not throw");
} catch (RuntimeException e) {
assertTrue(e.toString().contains("Structure"));
}