8321270: Virtual Thread.yield consumes parking permit

Reviewed-by: sspitsyn
This commit is contained in:
Alan Bateman 2023-12-07 11:41:41 +00:00
parent 0b0fa47f84
commit 29d7a22348
4 changed files with 139 additions and 79 deletions

View File

@ -1986,26 +1986,27 @@ int java_lang_VirtualThread::state(oop vthread) {
JavaThreadStatus java_lang_VirtualThread::map_state_to_thread_status(int state) { JavaThreadStatus java_lang_VirtualThread::map_state_to_thread_status(int state) {
JavaThreadStatus status = JavaThreadStatus::NEW; JavaThreadStatus status = JavaThreadStatus::NEW;
switch (state & ~SUSPENDED) { switch (state & ~SUSPENDED) {
case NEW : case NEW:
status = JavaThreadStatus::NEW; status = JavaThreadStatus::NEW;
break; break;
case STARTED : case STARTED:
case RUNNABLE : case RUNNING:
case RUNNING : case PARKING:
case PARKING :
case TIMED_PARKING: case TIMED_PARKING:
case YIELDING : case UNPARKED:
case YIELDING:
case YIELDED:
status = JavaThreadStatus::RUNNABLE; status = JavaThreadStatus::RUNNABLE;
break; break;
case PARKED : case PARKED:
case PINNED : case PINNED:
status = JavaThreadStatus::PARKED; status = JavaThreadStatus::PARKED;
break; break;
case TIMED_PARKED: case TIMED_PARKED:
case TIMED_PINNED: case TIMED_PINNED:
status = JavaThreadStatus::PARKED_TIMED; status = JavaThreadStatus::PARKED_TIMED;
break; break;
case TERMINATED : case TERMINATED:
status = JavaThreadStatus::TERMINATED; status = JavaThreadStatus::TERMINATED;
break; break;
default: default:

View File

@ -523,15 +523,16 @@ class java_lang_VirtualThread : AllStatic {
enum { enum {
NEW = 0, NEW = 0,
STARTED = 1, STARTED = 1,
RUNNABLE = 2, RUNNING = 2,
RUNNING = 3, PARKING = 3,
PARKING = 4, PARKED = 4,
PARKED = 5, PINNED = 5,
PINNED = 6, TIMED_PARKING = 6,
TIMED_PARKING = 7, TIMED_PARKED = 7,
TIMED_PARKED = 8, TIMED_PINNED = 8,
TIMED_PINNED = 9, UNPARKED = 9,
YIELDING = 10, YIELDING = 10,
YIELDED = 11,
TERMINATED = 99, TERMINATED = 99,
// additional state bits // additional state bits

View File

@ -96,37 +96,37 @@ final class VirtualThread extends BaseVirtualThread {
* RUNNING -> PARKING // Thread parking with LockSupport.park * RUNNING -> PARKING // Thread parking with LockSupport.park
* PARKING -> PARKED // cont.yield successful, parked indefinitely * PARKING -> PARKED // cont.yield successful, parked indefinitely
* PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier
* PARKED -> RUNNABLE // unparked, schedule to continue * PARKED -> UNPARKED // unparked, may be scheduled to continue
* PINNED -> RUNNING // unparked, continue execution on same carrier * PINNED -> RUNNING // unparked, continue execution on same carrier
* UNPARKED -> RUNNING // continue execution after park
* *
* RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
* TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
* TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier
* TIMED_PARKED -> RUNNABLE // unparked, schedule to continue * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
* TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
* *
* RUNNABLE -> RUNNING // continue execution
*
* RUNNING -> YIELDING // Thread.yield * RUNNING -> YIELDING // Thread.yield
* YIELDING -> RUNNABLE // yield successful * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
* YIELDING -> RUNNING // yield failed * YIELDING -> RUNNING // cont.yield failed
* YIELDED -> RUNNING // continue execution after Thread.yield
*/ */
private static final int NEW = 0; private static final int NEW = 0;
private static final int STARTED = 1; private static final int STARTED = 1;
private static final int RUNNABLE = 2; // runnable-unmounted private static final int RUNNING = 2; // runnable-mounted
private static final int RUNNING = 3; // runnable-mounted
// untimed parking // untimed and timed parking
private static final int PARKING = 4; private static final int PARKING = 3;
private static final int PARKED = 5; // unmounted private static final int PARKED = 4; // unmounted
private static final int PINNED = 6; // mounted private static final int PINNED = 5; // mounted
private static final int TIMED_PARKING = 6;
private static final int TIMED_PARKED = 7; // unmounted
private static final int TIMED_PINNED = 8; // mounted
private static final int UNPARKED = 9; // unmounted but runnable
// timed parking // Thread.yield
private static final int TIMED_PARKING = 7; private static final int YIELDING = 10;
private static final int TIMED_PARKED = 8; private static final int YIELDED = 11; // unmounted but runnable
private static final int TIMED_PINNED = 9;
private static final int YIELDING = 10; // Thread.yield
private static final int TERMINATED = 99; // final state private static final int TERMINATED = 99; // final state
@ -218,11 +218,15 @@ final class VirtualThread extends BaseVirtualThread {
// set state to RUNNING // set state to RUNNING
int initialState = state(); int initialState = state();
if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) { if (initialState == STARTED || initialState == UNPARKED || initialState == YIELDED) {
// first run // newly started or continue after parking/blocking/Thread.yield
} else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) { if (!compareAndSetState(initialState, RUNNING)) {
// consume parking permit return;
setParkPermit(false); }
// consume parking permit when continuing after parking
if (initialState == UNPARKED) {
setParkPermit(false);
}
} else { } else {
// not runnable // not runnable
return; return;
@ -244,8 +248,7 @@ final class VirtualThread extends BaseVirtualThread {
/** /**
* Submits the runContinuation task to the scheduler. For the default scheduler, * Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue, * and calling it on a worker thread, the task will be pushed to the local queue,
* otherwise it will be pushed to a submission queue. * otherwise it will be pushed to an external submission queue.
*
* @throws RejectedExecutionException * @throws RejectedExecutionException
*/ */
private void submitRunContinuation() { private void submitRunContinuation() {
@ -258,7 +261,7 @@ final class VirtualThread extends BaseVirtualThread {
} }
/** /**
* Submits the runContinuation task to the scheduler with a lazy submit. * Submits the runContinuation task to given scheduler with a lazy submit.
* @throws RejectedExecutionException * @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask) * @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/ */
@ -272,7 +275,7 @@ final class VirtualThread extends BaseVirtualThread {
} }
/** /**
* Submits the runContinuation task to the scheduler as an external submit. * Submits the runContinuation task to the given scheduler as an external submit.
* @throws RejectedExecutionException * @throws RejectedExecutionException
* @see ForkJoinPool#externalSubmit(ForkJoinTask) * @see ForkJoinPool#externalSubmit(ForkJoinTask)
*/ */
@ -457,7 +460,7 @@ final class VirtualThread extends BaseVirtualThread {
setState(newState); setState(newState);
// may have been unparked while parking // may have been unparked while parking
if (parkPermit && compareAndSetState(newState, RUNNABLE)) { if (parkPermit && compareAndSetState(newState, UNPARKED)) {
// lazy submit to continue on the current thread as carrier if possible // lazy submit to continue on the current thread as carrier if possible
if (currentThread() instanceof CarrierThread ct) { if (currentThread() instanceof CarrierThread ct) {
lazySubmitRunContinuation(ct.getPool()); lazySubmitRunContinuation(ct.getPool());
@ -471,7 +474,7 @@ final class VirtualThread extends BaseVirtualThread {
// Thread.yield // Thread.yield
if (s == YIELDING) { if (s == YIELDING) {
setState(RUNNABLE); setState(YIELDED);
// external submit if there are no tasks in the local task queue // external submit if there are no tasks in the local task queue
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
@ -618,7 +621,7 @@ final class VirtualThread extends BaseVirtualThread {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
boolean yielded = false; boolean yielded = false;
Future<?> unparker = scheduleUnpark(this::unpark, nanos); Future<?> unparker = scheduleUnpark(nanos); // may throw OOME
setState(TIMED_PARKING); setState(TIMED_PARKING);
try { try {
yielded = yieldContinuation(); // may throw yielded = yieldContinuation(); // may throw
@ -683,14 +686,15 @@ final class VirtualThread extends BaseVirtualThread {
} }
/** /**
* Schedule an unpark task to run after a given delay. * Schedule this virtual thread to be unparked after a given delay.
*/ */
@ChangesCurrentThread @ChangesCurrentThread
private Future<?> scheduleUnpark(Runnable unparker, long nanos) { private Future<?> scheduleUnpark(long nanos) {
assert Thread.currentThread() == this;
// need to switch to current carrier thread to avoid nested parking // need to switch to current carrier thread to avoid nested parking
switchToCarrierThread(); switchToCarrierThread();
try { try {
return UNPARKER.schedule(unparker, nanos, NANOSECONDS); return UNPARKER.schedule(this::unpark, nanos, NANOSECONDS);
} finally { } finally {
switchToVirtualThread(this); switchToVirtualThread(this);
} }
@ -726,7 +730,7 @@ final class VirtualThread extends BaseVirtualThread {
if (!getAndSetParkPermit(true) && currentThread != this) { if (!getAndSetParkPermit(true) && currentThread != this) {
int s = state(); int s = state();
boolean parked = (s == PARKED) || (s == TIMED_PARKED); boolean parked = (s == PARKED) || (s == TIMED_PARKED);
if (parked && compareAndSetState(s, RUNNABLE)) { if (parked && compareAndSetState(s, UNPARKED)) {
if (currentThread instanceof VirtualThread vthread) { if (currentThread instanceof VirtualThread vthread) {
vthread.switchToCarrierThread(); vthread.switchToCarrierThread();
try { try {
@ -738,7 +742,7 @@ final class VirtualThread extends BaseVirtualThread {
submitRunContinuation(); submitRunContinuation();
} }
} else if ((s == PINNED) || (s == TIMED_PINNED)) { } else if ((s == PINNED) || (s == TIMED_PINNED)) {
// unpark carrier thread when pinned. // unpark carrier thread when pinned
synchronized (carrierThreadAccessLock()) { synchronized (carrierThreadAccessLock()) {
Thread carrier = carrierThread; Thread carrier = carrierThread;
if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
@ -889,7 +893,8 @@ final class VirtualThread extends BaseVirtualThread {
} else { } else {
return Thread.State.RUNNABLE; return Thread.State.RUNNABLE;
} }
case RUNNABLE: case UNPARKED:
case YIELDED:
// runnable, not mounted // runnable, not mounted
return Thread.State.RUNNABLE; return Thread.State.RUNNABLE;
case RUNNING: case RUNNING:
@ -905,7 +910,7 @@ final class VirtualThread extends BaseVirtualThread {
case PARKING: case PARKING:
case TIMED_PARKING: case TIMED_PARKING:
case YIELDING: case YIELDING:
// runnable, mounted, not yet waiting // runnable, in transition
return Thread.State.RUNNABLE; return Thread.State.RUNNABLE;
case PARKED: case PARKED:
case PINNED: case PINNED:
@ -947,35 +952,58 @@ final class VirtualThread extends BaseVirtualThread {
/** /**
* Returns the stack trace for this virtual thread if it is unmounted. * Returns the stack trace for this virtual thread if it is unmounted.
* Returns null if the thread is in another state. * Returns null if the thread is mounted or in transition.
*/ */
private StackTraceElement[] tryGetStackTrace() { private StackTraceElement[] tryGetStackTrace() {
int initialState = state(); int initialState = state();
return switch (initialState) { switch (initialState) {
case RUNNABLE, PARKED, TIMED_PARKED -> { case NEW, STARTED, TERMINATED -> {
int suspendedState = initialState | SUSPENDED; return new StackTraceElement[0]; // unmounted, empty stack
if (compareAndSetState(initialState, suspendedState)) {
try {
yield cont.getStackTrace();
} finally {
assert state == suspendedState;
setState(initialState);
// re-submit if runnable
// re-submit if unparked while suspended
if (initialState == RUNNABLE
|| (parkPermit && compareAndSetState(initialState, RUNNABLE))) {
try {
submitRunContinuation();
} catch (RejectedExecutionException ignore) { }
}
}
}
yield null;
} }
case NEW, STARTED, TERMINATED -> new StackTraceElement[0]; // empty stack case RUNNING, PINNED -> {
default -> null; return null; // mounted
}
case PARKED, TIMED_PARKED -> {
// unmounted, not runnable
}
case UNPARKED, YIELDED -> {
// unmounted, runnable
}
case PARKING, TIMED_PARKING, YIELDING -> {
return null; // in transition
}
default -> throw new InternalError();
}
// thread is unmounted, prevent it from continuing
int suspendedState = initialState | SUSPENDED;
if (!compareAndSetState(initialState, suspendedState)) {
return null;
}
// get stack trace and restore state
StackTraceElement[] stack;
try {
stack = cont.getStackTrace();
} finally {
assert state == suspendedState;
setState(initialState);
}
boolean resubmit = switch (initialState) {
case UNPARKED, YIELDED -> {
// resubmit as task may have run while suspended
yield true;
}
case PARKED, TIMED_PARKED -> {
// resubmit if unparked while suspended
yield parkPermit && compareAndSetState(initialState, UNPARKED);
}
default -> throw new InternalError();
}; };
if (resubmit) {
submitRunContinuation();
}
return stack;
} }
@Override @Override

View File

@ -23,7 +23,7 @@
/* /*
* @test id=default * @test id=default
* @bug 8284161 8286788 * @bug 8284161 8286788 8321270
* @summary Test Thread API with virtual threads * @summary Test Thread API with virtual threads
* @modules java.base/java.lang:+open * @modules java.base/java.lang:+open
* @library /test/lib * @library /test/lib
@ -1191,6 +1191,36 @@ class ThreadAPI {
assertEquals(List.of("A", "A", "B"), list); assertEquals(List.of("A", "A", "B"), list);
} }
/**
* Test that Thread.yield does not consume the thread's parking permit.
*/
@Test
void testYield3() throws Exception {
var thread = Thread.ofVirtual().start(() -> {
LockSupport.unpark(Thread.currentThread());
Thread.yield();
LockSupport.park(); // should not park
});
thread.join();
}
/**
* Test that Thread.yield does not make available the thread's parking permit.
*/
@Test
void testYield4() throws Exception {
var thread = Thread.ofVirtual().start(() -> {
Thread.yield();
LockSupport.park(); // should park
});
try {
await(thread, Thread.State.WAITING);
} finally {
LockSupport.unpark(thread);
thread.join();
}
}
/** /**
* Test Thread.onSpinWait. * Test Thread.onSpinWait.
*/ */