8286294: ForkJoinPool.commonPool().close() spins

Reviewed-by: alanb
This commit is contained in:
Doug Lea 2022-05-09 11:09:34 +00:00
parent d4474b5816
commit 4f5d73f2d4
3 changed files with 113 additions and 23 deletions

View File

@ -3529,6 +3529,54 @@ public class ForkJoinPool extends AbstractExecutorService {
return (helpQuiescePool(this, unit.toNanos(timeout), false) > 0);
}
/**
* Unless this is the {@link #commonPool()}, initiates an orderly
* shutdown in which previously submitted tasks are executed, but
* no new tasks will be accepted, and waits until all tasks have
* completed execution and the executor has terminated.
*
* <p> If already terminated, or this is the {@link
* #commonPool()}, this method has no effect on execution, and
* does not wait. Otherwise, if interrupted while waiting, this
* method stops all executing tasks as if by invoking {@link
* #shutdownNow()}. It then continues to wait until all actively
* executing tasks have completed. Tasks that were awaiting
* execution are not executed. The interrupt status will be
* re-asserted before this method returns.
*
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
* @since 19
*/
@Override
public void close() {
if ((config & ISCOMMON) == 0) {
boolean terminated = tryTerminate(false, false);
if (!terminated) {
shutdown();
boolean interrupted = false;
while (!terminated) {
try {
terminated = awaitTermination(1L, TimeUnit.DAYS);
} catch (InterruptedException e) {
if (!interrupted) {
shutdownNow();
interrupted = true;
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.

View File

@ -55,7 +55,7 @@ public class ForkJoinPool19Test extends JSR166TestCase {
}
public static Test suite() {
return new TestSuite(ForkJoinPool8Test.class);
return new TestSuite(ForkJoinPool19Test.class);
}
/**
@ -264,21 +264,29 @@ public class ForkJoinPool19Test extends JSR166TestCase {
* lazySubmit submits a task that is not executed until new
* workers are created or it is explicitly joined by a worker.
*/
@SuppressWarnings("removal")
public void testLazySubmit() {
ForkJoinPool p;
try {
p = new ForkJoinPool();
} catch (java.security.AccessControlException e) {
return;
}
FibAction f = new FibAction(8);
RecursiveAction j = new RecursiveAction() {
protected void compute() {
f.join();
}};
RecursiveAction a = new CheckedRecursiveAction() {
protected void realCompute() {
final ForkJoinPool p = mainPool();
FibAction f = new FibAction(8);
p.invoke(new FibAction(8));
p.lazySubmit(f);
checkNotDone(f);
FibAction g = new FibAction(8);
p.submit(g);
g.join();
f.join();
p.invoke(new FibAction(8));
p.invoke(j);
assertEquals(21, f.result);
checkCompletedNormally(f);
}};
testInvokeOnPool(mainPool(), a);
testInvokeOnPool(p, a);
}
/**
@ -492,4 +500,40 @@ public class ForkJoinPool19Test extends JSR166TestCase {
}
}
/**
* Implicitly closing a new pool using try-with-resources terminates it
*/
public void testClose() {
ForkJoinTask f = new FibAction(8);
ForkJoinPool pool = null;
try (ForkJoinPool p = new ForkJoinPool()) {
pool = p;
p.execute(f);
}
checkCompletedNormally(f);
assertTrue(pool != null && pool.isTerminated());
}
/**
* Implicitly closing common pool using try-with-resources has no effect.
*/
public void testCloseCommonPool() {
ForkJoinTask f = new FibAction(8);
ForkJoinPool pool;
try (ForkJoinPool p = pool = ForkJoinPool.commonPool()) {
p.execute(f);
}
assertFalse(pool.isShutdown());
assertFalse(pool.isTerminating());
assertFalse(pool.isTerminated());
String prop = System.getProperty(
"java.util.concurrent.ForkJoinPool.common.parallelism");
if (! "0".equals(prop)) {
f.join();
checkCompletedNormally(f);
}
}
}

View File

@ -1721,19 +1721,17 @@ public class ForkJoinTaskTest extends JSR166TestCase {
}
}
// adaptInterruptible deferred to its own independent change
// https://bugs.openjdk.java.net/browse/JDK-8246587
// /**
// * adaptInterruptible(callable).toString() contains toString of wrapped task
// */
// public void testAdaptInterruptible_Callable_toString() {
// if (testImplementationDetails) {
// Callable<String> c = () -> "";
// ForkJoinTask<String> task = ForkJoinTask.adaptInterruptible(c);
// assertEquals(
// identityString(task) + "[Wrapped task = " + c.toString() + "]",
// task.toString());
// }
// }
/**
* adaptInterruptible(callable).toString() contains toString of wrapped task
*/
public void testAdaptInterruptible_Callable_toString() {
if (testImplementationDetails) {
Callable<String> c = () -> "";
ForkJoinTask<String> task = ForkJoinTask.adaptInterruptible(c);
assertEquals(
identityString(task) + "[Wrapped task = " + c.toString() + "]",
task.toString());
}
}
}