8020040: Improve and generalize the F/J tasks to handle right or left-balanced trees

Co-authored-by: Doug Lea <dl@cs.oswego.edu>
Reviewed-by: briangoetz
Author: Paul Sandoz
Date:   2013-07-10 10:24:38 +02:00
parent b47a003232
commit cfbed70a62
4 changed files with 171 additions and 150 deletions

AbstractShortCircuitTask.java

@@ -92,21 +92,50 @@ abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,
      */
     protected abstract R getEmptyResult();
 
+    /**
+     * Overrides AbstractTask version to include checks for early
+     * exits while splitting or computing.
+     */
     @Override
-    protected boolean canCompute() {
-        // Have we already found an answer?
-        if (sharedResult.get() != null) {
-            tryComplete();
-            return false;
-        } else if (taskCanceled()) {
-            setLocalResult(getEmptyResult());
-            tryComplete();
-            return false;
-        }
-        else {
-            return true;
-        }
+    public void compute() {
+        Spliterator<P_IN> rs = spliterator, ls;
+        long sizeEstimate = rs.estimateSize();
+        long sizeThreshold = getTargetSize(sizeEstimate);
+        boolean forkRight = false;
+        @SuppressWarnings("unchecked") K task = (K) this;
+        AtomicReference<R> sr = sharedResult;
+        R result;
+        while ((result = sr.get()) == null) {
+            if (task.taskCanceled()) {
+                result = task.getEmptyResult();
+                break;
+            }
+            if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {
+                result = task.doLeaf();
+                break;
+            }
+            K leftChild, rightChild, taskToFork;
+            task.leftChild = leftChild = task.makeChild(ls);
+            task.rightChild = rightChild = task.makeChild(rs);
+            task.setPendingCount(1);
+            if (forkRight) {
+                forkRight = false;
+                rs = ls;
+                task = leftChild;
+                taskToFork = rightChild;
+            }
+            else {
+                forkRight = true;
+                task = rightChild;
+                taskToFork = leftChild;
+            }
+            taskToFork.fork();
+            sizeEstimate = rs.estimateSize();
+        }
+        task.setLocalResult(result);
+        task.tryComplete();
     }
 
     /**
      * Declares that a globally valid result has been found. If another task has
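The pattern above is easier to follow outside the stream internals. Below is a hypothetical, self-contained sketch (FindNegativeTask, THRESHOLD and findAnyNegative are invented names, not JDK code) of the same idea: a CountedCompleter whose split loop re-reads a shared result on every iteration, alternates which child it forks versus continues as, and skips its leaf work once any task has published an answer.

    import java.util.concurrent.CountedCompleter;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative sketch only -- not part of the JDK sources.
    class FindNegativeTask extends CountedCompleter<Void> {
        private static final int THRESHOLD = 1 << 10;   // leaf size, chosen arbitrarily
        private final long[] array;
        private final int lo, hi;
        private final AtomicReference<Long> sharedResult;

        FindNegativeTask(CountedCompleter<?> parent, long[] array,
                         int lo, int hi, AtomicReference<Long> sharedResult) {
            super(parent);
            this.array = array;
            this.lo = lo;
            this.hi = hi;
            this.sharedResult = sharedResult;
        }

        @Override
        public void compute() {
            int l = lo, h = hi;
            boolean forkRight = false;
            FindNegativeTask task = this;
            // Split until the range is small enough or some task has already found a result.
            while (sharedResult.get() == null && h - l > THRESHOLD) {
                int mid = (l + h) >>> 1;
                FindNegativeTask leftChild = new FindNegativeTask(task, array, l, mid, sharedResult);
                FindNegativeTask rightChild = new FindNegativeTask(task, array, mid, h, sharedResult);
                task.setPendingCount(1);
                FindNegativeTask taskToFork;
                if (forkRight) {            // continue as the left child, fork the right one
                    forkRight = false;
                    h = mid;
                    task = leftChild;
                    taskToFork = rightChild;
                }
                else {                      // continue as the right child, fork the left one
                    forkRight = true;
                    l = mid;
                    task = rightChild;
                    taskToFork = leftChild;
                }
                taskToFork.fork();
            }
            // Leaf work, skipped entirely if another task already published a result.
            if (sharedResult.get() == null) {
                for (int i = l; i < h; i++) {
                    if (array[i] < 0) {
                        sharedResult.compareAndSet(null, array[i]);
                        break;
                    }
                }
            }
            task.tryComplete();
        }

        static Long findAnyNegative(long[] array) {
            AtomicReference<Long> result = new AtomicReference<>();
            ForkJoinPool.commonPool().invoke(
                    new FindNegativeTask(null, array, 0, array.length, result));
            return result.get();            // null if no negative element exists
        }
    }

Calling FindNegativeTask.findAnyNegative(data) runs the search on the common pool and returns null when no negative element exists; the JDK version additionally records an empty local result for cancelled tasks, which the sketch omits.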

AbstractTask.java

@@ -102,7 +102,7 @@ abstract class AbstractTask<P_IN, P_OUT, R,
     protected Spliterator<P_IN> spliterator;
 
     /** Target leaf size, common to all tasks in a computation */
-    protected final long targetSize;
+    protected long targetSize; // may be lazily initialized
 
     /**
      * The left child.
@@ -134,7 +134,7 @@ abstract class AbstractTask<P_IN, P_OUT, R,
         super(null);
         this.helper = helper;
         this.spliterator = spliterator;
-        this.targetSize = suggestTargetSize(spliterator.estimateSize());
+        this.targetSize = 0L;
     }
 
     /**
@@ -182,27 +182,13 @@ abstract class AbstractTask<P_IN, P_OUT, R,
     }
 
     /**
-     * Returns a suggestion whether it is advisable to split the provided
-     * spliterator based on target size and other considerations, such as pool
-     * state.
-     *
-     * @return {@code true} if a split is advised otherwise {@code false}
+     * Returns the targetSize, initializing it via the supplied
+     * size estimate if not already initialized.
      */
-    public static boolean suggestSplit(Spliterator spliterator,
-                                       long targetSize) {
-        long remaining = spliterator.estimateSize();
-        return (remaining > targetSize);
-        // @@@ May additionally want to fold in pool characteristics such as surplus task count
-    }
-
-    /**
-     * Returns a suggestion whether it is adviseable to split this task based on
-     * target size and other considerations.
-     *
-     * @return {@code true} if a split is advised otherwise {@code false}
-     */
-    public boolean suggestSplit() {
-        return suggestSplit(spliterator, targetSize);
+    protected final long getTargetSize(long sizeEstimate) {
+        long s;
+        return ((s = targetSize) != 0 ? s :
+                (targetSize = suggestTargetSize(sizeEstimate)));
     }
 
     /**
@@ -285,43 +271,46 @@ abstract class AbstractTask<P_IN, P_OUT, R,
     }
 
     /**
-     * Decides whether or not to split a task further or compute it directly. If
-     * computing directly, call {@code doLeaf} and pass the result to
-     * {@code setRawResult}. If splitting, set up the child-related fields,
-     * create the child tasks, fork the leftmost (prefix) child tasks, and
-     * compute the rightmost (remaining) child tasks.
+     * Decides whether or not to split a task further or compute it
+     * directly. If computing directly, calls {@code doLeaf} and passes
+     * the result to {@code setRawResult}. Otherwise splits off
+     * subtasks, forking one and continuing as the other.
      *
-     * <p>
-     * Computing will continue for rightmost tasks while a task can be computed
-     * as determined by {@link #canCompute()} and that task should and can be
-     * split into left and right tasks.
-     *
-     * <p>
-     * The rightmost tasks are computed in a loop rather than recursively to
-     * avoid potential stack overflows when computing with a right-balanced
-     * tree, such as that produced when splitting with a {@link Spliterator}
-     * created from an {@link java.util.Iterator}.
+     * <p> The method is structured to conserve resources across a
+     * range of uses. The loop continues with one of the child tasks
+     * when split, to avoid deep recursion. To cope with spliterators
+     * that may be systematically biased toward left-heavy or
+     * right-heavy splits, we alternate which child is forked versus
+     * continued in the loop.
      */
     @Override
-    public final void compute() {
-        @SuppressWarnings("unchecked")
-        K task = (K) this;
-        while (task.canCompute()) {
-            Spliterator<P_IN> split;
-            if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
-                task.setLocalResult(task.doLeaf());
-                task.tryComplete();
-                return;
-            }
-            else {
-                K l = task.leftChild = task.makeChild(split);
-                K r = task.rightChild = task.makeChild(task.spliterator);
-                task.spliterator = null;
-                task.setPendingCount(1);
-                l.fork();
-                task = r;
-            }
-        }
+    public void compute() {
+        Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
+        long sizeEstimate = rs.estimateSize();
+        long sizeThreshold = getTargetSize(sizeEstimate);
+        boolean forkRight = false;
+        @SuppressWarnings("unchecked") K task = (K) this;
+        while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
+            K leftChild, rightChild, taskToFork;
+            task.leftChild = leftChild = task.makeChild(ls);
+            task.rightChild = rightChild = task.makeChild(rs);
+            task.setPendingCount(1);
+            if (forkRight) {
+                forkRight = false;
+                rs = ls;
+                task = leftChild;
+                taskToFork = rightChild;
+            }
+            else {
+                forkRight = true;
+                task = rightChild;
+                taskToFork = leftChild;
+            }
+            taskToFork.fork();
+            sizeEstimate = rs.estimateSize();
+        }
+        task.setLocalResult(task.doLeaf());
+        task.tryComplete();
     }
 
     /**
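The new javadoc's premise, that spliterators can be systematically biased toward left-heavy or right-heavy splits, is easy to observe with an iterator-backed spliterator, whose trySplit() peels off only a small prefix batch and leaves nearly everything on the right. The snippet below is illustrative only (class name and sizes are arbitrary):

    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.stream.LongStream;

    // Illustrative only -- prints how lopsided iterator-based splits are.
    public class BiasedSplitDemo {
        public static void main(String[] args) {
            long n = 1_000_000;
            Spliterator<Long> right =
                    Spliterators.spliterator(LongStream.range(0, n).boxed().iterator(), n, 0);
            for (int i = 0; i < 4; i++) {
                Spliterator<Long> left = right.trySplit();   // peels off a prefix batch
                System.out.println("split " + i + ": left=" + left.estimateSize()
                                   + " right=" + right.estimateSize());
            }
        }
    }

The left pieces stay small (a batch at a time) while the right side keeps almost all remaining elements; an array-backed spliterator splits roughly in half, and other sources can be skewed toward the left, which is why the loop above alternates the side it forks rather than always forking the same one.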
@@ -338,21 +327,6 @@ abstract class AbstractTask<P_IN, P_OUT, R,
         leftChild = rightChild = null;
     }
 
-    /**
-     * Determines if the task can be computed.
-     *
-     * @implSpec The default always returns true
-     *
-     * @return {@code true} if this task can be computed to either calculate the
-     *         leaf via {@link #doLeaf()} or split, otherwise false if this task
-     *         cannot be computed, for example if this task has been canceled
-     *         and/or a result for the computation has been found by another
-     *         task.
-     */
-    protected boolean canCompute() {
-        return true;
-    }
-
     /**
      * Returns whether this node is a "leftmost" node -- whether the path from
      * the root to this node involves only traversing leftmost child links. For

ForEachOps.java

@@ -28,6 +28,7 @@ import java.util.Objects;
 import java.util.Spliterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountedCompleter;
+import java.util.concurrent.ForkJoinTask;
 import java.util.function.Consumer;
 import java.util.function.DoubleConsumer;
 import java.util.function.IntConsumer;
@@ -128,7 +129,7 @@ final class ForEachOps {
      *
      * @param <T> the output type of the stream pipeline
      */
-    private static abstract class ForEachOp<T>
+    static abstract class ForEachOp<T>
             implements TerminalOp<T, Void>, TerminalSink<T, Void> {
         private final boolean ordered;
@@ -169,7 +170,7 @@
     // Implementations
 
     /** Implementation class for reference streams */
-    private static class OfRef<T> extends ForEachOp<T> {
+    static final class OfRef<T> extends ForEachOp<T> {
         final Consumer<? super T> consumer;
 
         OfRef(Consumer<? super T> consumer, boolean ordered) {
@@ -184,7 +185,7 @@
     }
 
     /** Implementation class for {@code IntStream} */
-    private static class OfInt extends ForEachOp<Integer>
+    static final class OfInt extends ForEachOp<Integer>
             implements Sink.OfInt {
         final IntConsumer consumer;
@@ -205,7 +206,7 @@
     }
 
     /** Implementation class for {@code LongStream} */
-    private static class OfLong extends ForEachOp<Long>
+    static final class OfLong extends ForEachOp<Long>
             implements Sink.OfLong {
         final LongConsumer consumer;
@@ -226,7 +227,7 @@
     }
 
     /** Implementation class for {@code DoubleStream} */
-    private static class OfDouble extends ForEachOp<Double>
+    static final class OfDouble extends ForEachOp<Double>
             implements Sink.OfDouble {
         final DoubleConsumer consumer;
@@ -248,20 +249,20 @@
     }
 
     /** A {@code ForkJoinTask} for performing a parallel for-each operation */
-    private static class ForEachTask<S, T> extends CountedCompleter<Void> {
+    static final class ForEachTask<S, T> extends CountedCompleter<Void> {
         private Spliterator<S> spliterator;
         private final Sink<S> sink;
         private final PipelineHelper<T> helper;
-        private final long targetSize;
+        private long targetSize;
 
         ForEachTask(PipelineHelper<T> helper,
                     Spliterator<S> spliterator,
                     Sink<S> sink) {
             super(null);
-            this.spliterator = spliterator;
             this.sink = sink;
-            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
             this.helper = helper;
+            this.spliterator = spliterator;
+            this.targetSize = 0L;
         }
 
         ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
@@ -272,28 +273,40 @@
             this.helper = parent.helper;
         }
 
+        // Similar to AbstractTask but doesn't need to track child tasks
        public void compute() {
-            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
-            while (true) {
-                if (isShortCircuit && sink.cancellationRequested()) {
-                    propagateCompletion();
-                    spliterator = null;
-                    return;
+            Spliterator<S> rightSplit = spliterator, leftSplit;
+            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
+            if ((sizeThreshold = targetSize) == 0L)
+                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
+            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
+            boolean forkRight = false;
+            Sink<S> taskSink = sink;
+            ForEachTask<S, T> task = this;
+            while (!isShortCircuit || !taskSink.cancellationRequested()) {
+                if (sizeEstimate <= sizeThreshold ||
+                    (leftSplit = rightSplit.trySplit()) == null) {
+                    task.helper.copyInto(taskSink, rightSplit);
+                    break;
                 }
-
-                Spliterator<S> split;
-                if (!AbstractTask.suggestSplit(spliterator, targetSize)
-                    || (split = spliterator.trySplit()) == null) {
-                    helper.copyInto(sink, spliterator);
-                    propagateCompletion();
-                    spliterator = null;
-                    return;
+                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
+                task.addToPendingCount(1);
+                ForEachTask<S, T> taskToFork;
+                if (forkRight) {
+                    forkRight = false;
+                    rightSplit = leftSplit;
+                    taskToFork = task;
+                    task = leftTask;
                 }
                 else {
-                    addToPendingCount(1);
-                    new ForEachTask<>(this, split).fork();
+                    forkRight = true;
+                    taskToFork = leftTask;
                 }
+                taskToFork.fork();
+                sizeEstimate = rightSplit.estimateSize();
             }
+            task.spliterator = null;
+            task.propagateCompletion();
         }
     }
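One detail of the new ForEachTask.compute() deserves a note: since nothing is joined, there are no child fields at all, and on a fork-right iteration the loop forks task itself (whose spliterator field is, at that point, the remaining right portion) while the current frame carries on as the freshly created left task. A stripped-down hypothetical analogue (invented names, no short-circuit check, targetSize passed in eagerly) of that loop:

    import java.util.Spliterator;
    import java.util.concurrent.CountedCompleter;
    import java.util.function.Consumer;

    // Illustrative sketch only -- not the JDK ForEachTask.
    class SimpleForEachTask<T> extends CountedCompleter<Void> {
        private final Spliterator<T> spliterator;
        private final Consumer<? super T> action;
        private final long targetSize;

        SimpleForEachTask(SimpleForEachTask<T> parent, Spliterator<T> spliterator,
                          Consumer<? super T> action, long targetSize) {
            super(parent);
            this.spliterator = spliterator;
            this.action = action;
            this.targetSize = targetSize;
        }

        @Override
        public void compute() {
            Spliterator<T> rightSplit = spliterator, leftSplit;
            long sizeEstimate = rightSplit.estimateSize();
            boolean forkRight = false;
            SimpleForEachTask<T> task = this;
            while (sizeEstimate > targetSize &&
                   (leftSplit = rightSplit.trySplit()) != null) {
                SimpleForEachTask<T> leftTask =
                        new SimpleForEachTask<>(task, leftSplit, action, targetSize);
                task.addToPendingCount(1);
                SimpleForEachTask<T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;  // keep working on the left piece in this frame ...
                    taskToFork = task;       // ... and re-submit the task that owns the right piece
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;   // hand off the left piece, keep the right piece
                }
                taskToFork.fork();
                sizeEstimate = rightSplit.estimateSize();
            }
            rightSplit.forEachRemaining(action);  // leaf work for whatever remains
            task.propagateCompletion();
        }
    }

It could be started with new SimpleForEachTask<>(null, source.spliterator(), action, 1024).invoke(); the lazy targetSize initialization and the cancellation test are what the real ForEachTask layers on top of this loop.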
@@ -301,7 +314,7 @@
      * A {@code ForkJoinTask} for performing a parallel for-each operation
      * which visits the elements in encounter order
      */
-    private static class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
+    static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
         private final PipelineHelper<T> helper;
         private Spliterator<S> spliterator;
         private final long targetSize;
@@ -343,25 +356,15 @@
         }
 
         private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
-            while (true) {
-                Spliterator<S> split;
-                if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
-                    || (split = task.spliterator.trySplit()) == null) {
-                    if (task.getPendingCount() == 0) {
-                        task.helper.wrapAndCopyInto(task.action, task.spliterator);
-                    }
-                    else {
-                        Node.Builder<T> nb = task.helper.makeNodeBuilder(
-                                task.helper.exactOutputSizeIfKnown(task.spliterator),
-                                size -> (T[]) new Object[size]);
-                        task.node = task.helper.wrapAndCopyInto(nb, task.spliterator).build();
-                    }
-                    task.tryComplete();
-                    return;
-                }
-                else {
-                    ForEachOrderedTask<S, T> leftChild = new ForEachOrderedTask<>(task, split, task.leftPredecessor);
-                    ForEachOrderedTask<S, T> rightChild = new ForEachOrderedTask<>(task, task.spliterator, leftChild);
+            Spliterator<S> rightSplit = task.spliterator, leftSplit;
+            long sizeThreshold = task.targetSize;
+            boolean forkRight = false;
+            while (rightSplit.estimateSize() > sizeThreshold &&
+                   (leftSplit = rightSplit.trySplit()) != null) {
+                ForEachOrderedTask<S, T> leftChild =
+                    new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
+                ForEachOrderedTask<S, T> rightChild =
+                    new ForEachOrderedTask<>(task, rightSplit, leftChild);
                 task.completionMap.put(leftChild, rightChild);
                 task.addToPendingCount(1); // forking
                 rightChild.addToPendingCount(1); // right pending on left child
@@ -372,10 +375,30 @@
                 else
                     leftChild.addToPendingCount(-1); // left child is ready to go when ready
                 }
-                    leftChild.fork();
-                    task = rightChild;
-                }
-            }
+                ForEachOrderedTask<S, T> taskToFork;
+                if (forkRight) {
+                    forkRight = false;
+                    rightSplit = leftSplit;
+                    task = leftChild;
+                    taskToFork = rightChild;
+                }
+                else {
+                    forkRight = true;
+                    task = rightChild;
+                    taskToFork = leftChild;
+                }
+                taskToFork.fork();
+            }
+            if (task.getPendingCount() == 0) {
+                task.helper.wrapAndCopyInto(task.action, rightSplit);
+            }
+            else {
+                Node.Builder<T> nb = task.helper.makeNodeBuilder(
+                        task.helper.exactOutputSizeIfKnown(rightSplit),
+                        size -> (T[]) new Object[size]);
+                task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
+            }
+            task.tryComplete();
         }
 
         @Override

Nodes.java

@@ -1829,25 +1829,20 @@
         @Override
         public void compute() {
             SizedCollectorTask<P_IN, P_OUT, T_SINK, K> task = this;
-            while (true) {
-                Spliterator<P_IN> leftSplit;
-                if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
-                    || ((leftSplit = task.spliterator.trySplit()) == null)) {
-                    if (task.offset + task.length >= MAX_ARRAY_SIZE)
-                        throw new IllegalArgumentException("Stream size exceeds max array size");
-                    T_SINK sink = (T_SINK) task;
-                    task.helper.wrapAndCopyInto(sink, task.spliterator);
-                    task.propagateCompletion();
-                    return;
-                }
-                else {
-                    task.setPendingCount(1);
-                    long leftSplitSize = leftSplit.estimateSize();
-                    task.makeChild(leftSplit, task.offset, leftSplitSize).fork();
-                    task = task.makeChild(task.spliterator, task.offset + leftSplitSize,
-                                          task.length - leftSplitSize);
-                }
-            }
+            Spliterator<P_IN> rightSplit = spliterator, leftSplit;
+            while (rightSplit.estimateSize() > task.targetSize &&
+                   (leftSplit = rightSplit.trySplit()) != null) {
+                task.setPendingCount(1);
+                long leftSplitSize = leftSplit.estimateSize();
+                task.makeChild(leftSplit, task.offset, leftSplitSize).fork();
+                task = task.makeChild(rightSplit, task.offset + leftSplitSize,
+                                      task.length - leftSplitSize);
+            }
+            if (task.offset + task.length >= MAX_ARRAY_SIZE)
+                throw new IllegalArgumentException("Stream size exceeds max array size");
+            T_SINK sink = (T_SINK) task;
+            task.helper.wrapAndCopyInto(sink, rightSplit);
+            task.propagateCompletion();
         }
 
         abstract K makeChild(Spliterator<P_IN> spliterator, long offset, long size);
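SizedCollectorTask adds one extra piece of bookkeeping to the same loop: the destination array is pre-sized, so every left child receives an explicit [offset, offset + leftSplitSize) slice and the continuing task keeps [offset + leftSplitSize, offset + length). A hypothetical stand-alone analogue (all names invented; it assumes a SIZED and SUBSIZED spliterator so the estimates used as offsets are exact):

    import java.util.Spliterator;
    import java.util.concurrent.CountedCompleter;

    // Illustrative sketch only -- not the JDK SizedCollectorTask.
    class CopyIntoArrayTask<T> extends CountedCompleter<Void> {
        private final Spliterator<T> spliterator;
        private final T[] target;
        private final long offset, length, targetSize;

        CopyIntoArrayTask(CopyIntoArrayTask<T> parent, Spliterator<T> spliterator, T[] target,
                          long offset, long length, long targetSize) {
            super(parent);
            this.spliterator = spliterator;
            this.target = target;
            this.offset = offset;
            this.length = length;
            this.targetSize = targetSize;
        }

        @Override
        public void compute() {
            CopyIntoArrayTask<T> task = this;
            Spliterator<T> rightSplit = spliterator, leftSplit;
            while (rightSplit.estimateSize() > task.targetSize &&
                   (leftSplit = rightSplit.trySplit()) != null) {
                task.setPendingCount(1);
                long leftSplitSize = leftSplit.estimateSize();   // exact for a SUBSIZED source
                new CopyIntoArrayTask<>(task, leftSplit, task.target,
                                        task.offset, leftSplitSize, task.targetSize).fork();
                task = new CopyIntoArrayTask<>(task, rightSplit, task.target,
                                               task.offset + leftSplitSize,
                                               task.length - leftSplitSize, task.targetSize);
            }
            // Leaf work: copy the remaining elements into the slice starting at task.offset.
            T[] a = task.target;
            int[] index = { (int) task.offset };
            rightSplit.forEachRemaining(t -> a[index[0]++] = t);
            task.propagateCompletion();
        }
    }

For a List<String> source it could be invoked as new CopyIntoArrayTask<>(null, list.spliterator(), new String[list.size()], 0, list.size(), 1024).invoke(); the exact-size assumption is the same one the real task relies on when it writes elements at computed offsets.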