8026155: Enhance ForkJoin pool

Reviewed-by: chegar, alanb, ahgross
This commit is contained in:
Doug Lea 2013-12-19 10:31:59 +00:00
parent 9a9add8825
commit 3e91b8de35
2 changed files with 201 additions and 20 deletions

View File

@ -49,6 +49,9 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.security.AccessControlContext;
import java.security.ProtectionDomain;
import java.security.Permissions;
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
@ -140,6 +143,9 @@ import java.util.concurrent.TimeUnit;
* <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
* - the class name of a {@link UncaughtExceptionHandler}
* </ul>
* If a {@link SecurityManager} is present and no factory is
* specified, then the default pool uses a factory supplying
* threads that have no {@link Permissions} enabled.
* The system class loader is used to load these classes.
* Upon any error in establishing these settings, default parameters
* are used. It is possible to disable or limit the use of threads in
@ -501,6 +507,16 @@ public class ForkJoinPool extends AbstractExecutorService {
* task status checks) in inapplicable cases amounts to an odd
* form of limited spin-wait before blocking in ForkJoinTask.join.
*
* As a more appropriate default in managed environments, unless
* overridden by system properties, we use workers of subclass
* InnocuousForkJoinWorkerThread when there is a SecurityManager
* present. These workers have no permissions set, do not belong
* to any user-defined ThreadGroup, and erase all ThreadLocals
* after executing any top-level task (see WorkQueue.runTask). The
* associated mechanics (mainly in ForkJoinWorkerThread) may be
* JVM-dependent and must access particular Thread class fields to
* achieve this effect.
*
* Style notes
* ===========
*
@ -882,6 +898,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void runTask(ForkJoinTask<?> task) {
if ((currentSteal = task) != null) {
ForkJoinWorkerThread thread;
task.doExec();
ForkJoinTask<?>[] a = array;
int md = mode;
@ -899,6 +916,8 @@ public class ForkJoinPool extends AbstractExecutorService {
t.doExec();
}
}
if ((thread = owner) != null) // no need to do in finally clause
thread.afterTopLevelExec();
}
}
@ -1155,7 +1174,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* Increment for seed generators. See class ThreadLocal for
* explanation.
*/
private static final int SEED_INCREMENT = 0x61c88647;
private static final int SEED_INCREMENT = 0x9e3779b9;
/*
* Bits and masks for control variables
@ -2084,12 +2103,10 @@ public class ForkJoinPool extends AbstractExecutorService {
((c & ~AC_MASK) |
((c & AC_MASK) + AC_UNIT))));
}
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
(w.currentSteal = t).doExec();
w.currentSteal = ps;
}
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
w.runTask(t);
}
else if (active) { // decrement active count without queuing
else if (active) { // decrement active count without queuing
long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
if ((int)(nc >> AC_SHIFT) + parallelism == 0)
break; // bypass decrement-then-increment
@ -3282,8 +3299,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory
= defaultForkJoinWorkerThreadFactory;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
@ -3302,7 +3318,12 @@ public class ForkJoinPool extends AbstractExecutorService {
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
@ -3312,4 +3333,38 @@ public class ForkJoinPool extends AbstractExecutorService {
"ForkJoinPool.commonPool-worker-");
}
/**
* Factory for innocuous worker threads
*/
static final class InnocuousForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
/**
* An ACC to restrict permissions for the factory itself.
* The constructed workers have no permissions set.
*/
private static final AccessControlContext innocuousAcc;
static {
Permissions innocuousPerms = new Permissions();
innocuousPerms.add(modifyThreadPermission);
innocuousPerms.add(new RuntimePermission(
"enableContextClassLoaderOverride"));
innocuousPerms.add(new RuntimePermission(
"modifyThreadGroup"));
innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
new ProtectionDomain(null, innocuousPerms)
});
}
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
java.security.AccessController.doPrivileged(
new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
public ForkJoinWorkerThread run() {
return new ForkJoinWorkerThread.
InnocuousForkJoinWorkerThread(pool);
}}, innocuousAcc);
}
}
}

View File

@ -35,6 +35,9 @@
package java.util.concurrent;
import java.security.AccessControlContext;
import java.security.ProtectionDomain;
/**
* A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s.
@ -61,6 +64,10 @@ public class ForkJoinWorkerThread extends Thread {
* completes. This leads to a visibility race, that is tolerated
* by requiring that the workQueue field is only accessed by the
* owning thread.
*
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
* requires that we break quite a lot of encapulation (via Unsafe)
* both here and in the subclass to access and set Thread fields.
*/
final ForkJoinPool pool; // the pool this thread works in
@ -79,6 +86,18 @@ public class ForkJoinWorkerThread extends Thread {
this.workQueue = pool.registerWorker(this);
}
/**
* Version for InnocuousForkJoinWorkerThread
*/
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
AccessControlContext acc) {
super(threadGroup, null, "aForkJoinWorkerThread");
U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
eraseThreadLocals(); // clear before registering
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
/**
* Returns the pool hosting this thread.
*
@ -131,21 +150,128 @@ public class ForkJoinWorkerThread extends Thread {
* {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onTermination(exception);
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
/**
* Erases ThreadLocals by nulling out Thread maps
*/
final void eraseThreadLocals() {
U.putObject(this, THREADLOCALS, null);
U.putObject(this, INHERITABLETHREADLOCALS, null);
}
/**
* Non-public hook method for InnocuousForkJoinWorkerThread
*/
void afterTopLevelExec() {
}
// Set up to allow setting thread fields in constructor
private static final sun.misc.Unsafe U;
private static final long THREADLOCALS;
private static final long INHERITABLETHREADLOCALS;
private static final long INHERITEDACCESSCONTROLCONTEXT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
THREADLOCALS = U.objectFieldOffset
(tk.getDeclaredField("threadLocals"));
INHERITABLETHREADLOCALS = U.objectFieldOffset
(tk.getDeclaredField("inheritableThreadLocals"));
INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset
(tk.getDeclaredField("inheritedAccessControlContext"));
} catch (Exception e) {
throw new Error(e);
}
}
/**
* A worker thread that has no permissions, is not a member of any
* user-defined ThreadGroup, and erases all ThreadLocals after
* running each top-level task.
*/
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
/** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
private static final ThreadGroup innocuousThreadGroup =
createThreadGroup();
/** An AccessControlContext supporting no privileges */
private static final AccessControlContext INNOCUOUS_ACC =
new AccessControlContext(
new ProtectionDomain[] {
new ProtectionDomain(null, null)
});
InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
}
@Override // to erase ThreadLocals
void afterTopLevelExec() {
eraseThreadLocals();
}
@Override // to always report system loader
public ClassLoader getContextClassLoader() {
return ClassLoader.getSystemClassLoader();
}
@Override // to silently fail
public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
@Override // paranoically
public void setContextClassLoader(ClassLoader cl) {
throw new SecurityException("setContextClassLoader");
}
/**
* Returns a new group with the system ThreadGroup (the
* topmost, parentless group) as parent. Uses Unsafe to
* traverse Thread group and ThreadGroup parent fields.
*/
private static ThreadGroup createThreadGroup() {
try {
sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
Class<?> gk = ThreadGroup.class;
long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
ThreadGroup group = (ThreadGroup)
u.getObject(Thread.currentThread(), tg);
while (group != null) {
ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
if (parent == null)
return new ThreadGroup(group,
"InnocuousForkJoinWorkerThreadGroup");
group = parent;
}
} catch (Exception e) {
throw new Error(e);
}
// fall through if null as cannot-happen safeguard
throw new Error("Cannot create ThreadGroup");
}
}
}