From cfbed70a62d0ec843f6be24e96bb1f2da9443a39 Mon Sep 17 00:00:00 2001 From: Paul Sandoz Date: Wed, 10 Jul 2013 10:24:38 +0200 Subject: [PATCH] 8020040: Improve and generalize the F/J tasks to handle right or left-balanced trees Co-authored-by: Doug Lea Reviewed-by: briangoetz --- .../util/stream/AbstractShortCircuitTask.java | 53 +++++-- .../java/util/stream/AbstractTask.java | 108 ++++++--------- .../classes/java/util/stream/ForEachOps.java | 129 +++++++++++------- .../share/classes/java/util/stream/Nodes.java | 31 ++--- 4 files changed, 171 insertions(+), 150 deletions(-) diff --git a/jdk/src/share/classes/java/util/stream/AbstractShortCircuitTask.java b/jdk/src/share/classes/java/util/stream/AbstractShortCircuitTask.java index 06da65485dd..7a102e80d8e 100644 --- a/jdk/src/share/classes/java/util/stream/AbstractShortCircuitTask.java +++ b/jdk/src/share/classes/java/util/stream/AbstractShortCircuitTask.java @@ -92,22 +92,51 @@ abstract class AbstractShortCircuitTask rs = spliterator, ls; + long sizeEstimate = rs.estimateSize(); + long sizeThreshold = getTargetSize(sizeEstimate); + boolean forkRight = false; + @SuppressWarnings("unchecked") K task = (K) this; + AtomicReference sr = sharedResult; + R result; + while ((result = sr.get()) == null) { + if (task.taskCanceled()) { + result = task.getEmptyResult(); + break; + } + if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) { + result = task.doLeaf(); + break; + } + K leftChild, rightChild, taskToFork; + task.leftChild = leftChild = task.makeChild(ls); + task.rightChild = rightChild = task.makeChild(rs); + task.setPendingCount(1); + if (forkRight) { + forkRight = false; + rs = ls; + task = leftChild; + taskToFork = rightChild; + } + else { + forkRight = true; + task = rightChild; + taskToFork = leftChild; + } + taskToFork.fork(); + sizeEstimate = rs.estimateSize(); } + task.setLocalResult(result); + task.tryComplete(); } + /** * Declares that a globally valid result has been found. If another task has * not already found the answer, the result is installed in diff --git a/jdk/src/share/classes/java/util/stream/AbstractTask.java b/jdk/src/share/classes/java/util/stream/AbstractTask.java index 75fc3a11f60..b8344e84c59 100644 --- a/jdk/src/share/classes/java/util/stream/AbstractTask.java +++ b/jdk/src/share/classes/java/util/stream/AbstractTask.java @@ -102,7 +102,7 @@ abstract class AbstractTask spliterator; /** Target leaf size, common to all tasks in a computation */ - protected final long targetSize; + protected long targetSize; // may be laziliy initialized /** * The left child. @@ -134,7 +134,7 @@ abstract class AbstractTask targetSize); - // @@@ May additionally want to fold in pool characteristics such as surplus task count - } - - /** - * Returns a suggestion whether it is adviseable to split this task based on - * target size and other considerations. - * - * @return {@code true} if a split is advised otherwise {@code false} - */ - public boolean suggestSplit() { - return suggestSplit(spliterator, targetSize); + protected final long getTargetSize(long sizeEstimate) { + long s; + return ((s = targetSize) != 0 ? s : + (targetSize = suggestTargetSize(sizeEstimate))); } /** @@ -285,43 +271,46 @@ abstract class AbstractTask - * Computing will continue for rightmost tasks while a task can be computed - * as determined by {@link #canCompute()} and that task should and can be - * split into left and right tasks. - * - *

- * The rightmost tasks are computed in a loop rather than recursively to - * avoid potential stack overflows when computing with a right-balanced - * tree, such as that produced when splitting with a {@link Spliterator} - * created from an {@link java.util.Iterator}. + *

The method is structured to conserve resources across a + * range of uses. The loop continues with one of the child tasks + * when split, to avoid deep recursion. To cope with spliterators + * that may be systematically biased toward left-heavy or + * right-heavy splits, we alternate which child is forked versus + * continued in the loop. */ @Override - public final void compute() { - @SuppressWarnings("unchecked") - K task = (K) this; - while (task.canCompute()) { - Spliterator split; - if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) { - task.setLocalResult(task.doLeaf()); - task.tryComplete(); - return; + public void compute() { + Spliterator rs = spliterator, ls; // right, left spliterators + long sizeEstimate = rs.estimateSize(); + long sizeThreshold = getTargetSize(sizeEstimate); + boolean forkRight = false; + @SuppressWarnings("unchecked") K task = (K) this; + while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { + K leftChild, rightChild, taskToFork; + task.leftChild = leftChild = task.makeChild(ls); + task.rightChild = rightChild = task.makeChild(rs); + task.setPendingCount(1); + if (forkRight) { + forkRight = false; + rs = ls; + task = leftChild; + taskToFork = rightChild; } else { - K l = task.leftChild = task.makeChild(split); - K r = task.rightChild = task.makeChild(task.spliterator); - task.spliterator = null; - task.setPendingCount(1); - l.fork(); - task = r; + forkRight = true; + task = rightChild; + taskToFork = leftChild; } + taskToFork.fork(); + sizeEstimate = rs.estimateSize(); } + task.setLocalResult(task.doLeaf()); + task.tryComplete(); } /** @@ -338,21 +327,6 @@ abstract class AbstractTask the output type of the stream pipeline */ - private static abstract class ForEachOp + static abstract class ForEachOp implements TerminalOp, TerminalSink { private final boolean ordered; @@ -169,7 +170,7 @@ final class ForEachOps { // Implementations /** Implementation class for reference streams */ - private static class OfRef extends ForEachOp { + static final class OfRef extends ForEachOp { final Consumer consumer; OfRef(Consumer consumer, boolean ordered) { @@ -184,7 +185,7 @@ final class ForEachOps { } /** Implementation class for {@code IntStream} */ - private static class OfInt extends ForEachOp + static final class OfInt extends ForEachOp implements Sink.OfInt { final IntConsumer consumer; @@ -205,7 +206,7 @@ final class ForEachOps { } /** Implementation class for {@code LongStream} */ - private static class OfLong extends ForEachOp + static final class OfLong extends ForEachOp implements Sink.OfLong { final LongConsumer consumer; @@ -226,7 +227,7 @@ final class ForEachOps { } /** Implementation class for {@code DoubleStream} */ - private static class OfDouble extends ForEachOp + static final class OfDouble extends ForEachOp implements Sink.OfDouble { final DoubleConsumer consumer; @@ -248,20 +249,20 @@ final class ForEachOps { } /** A {@code ForkJoinTask} for performing a parallel for-each operation */ - private static class ForEachTask extends CountedCompleter { + static final class ForEachTask extends CountedCompleter { private Spliterator spliterator; private final Sink sink; private final PipelineHelper helper; - private final long targetSize; + private long targetSize; ForEachTask(PipelineHelper helper, Spliterator spliterator, Sink sink) { super(null); - this.spliterator = spliterator; this.sink = sink; - this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); this.helper = helper; + this.spliterator = spliterator; + this.targetSize = 0L; } ForEachTask(ForEachTask parent, Spliterator spliterator) { @@ -272,28 +273,40 @@ final class ForEachOps { this.helper = parent.helper; } + // Similar to AbstractTask but doesn't need to track child tasks public void compute() { + Spliterator rightSplit = spliterator, leftSplit; + long sizeEstimate = rightSplit.estimateSize(), sizeThreshold; + if ((sizeThreshold = targetSize) == 0L) + targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate); boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags()); - while (true) { - if (isShortCircuit && sink.cancellationRequested()) { - propagateCompletion(); - spliterator = null; - return; + boolean forkRight = false; + Sink taskSink = sink; + ForEachTask task = this; + while (!isShortCircuit || !taskSink.cancellationRequested()) { + if (sizeEstimate <= sizeThreshold || + (leftSplit = rightSplit.trySplit()) == null) { + task.helper.copyInto(taskSink, rightSplit); + break; } - - Spliterator split; - if (!AbstractTask.suggestSplit(spliterator, targetSize) - || (split = spliterator.trySplit()) == null) { - helper.copyInto(sink, spliterator); - propagateCompletion(); - spliterator = null; - return; + ForEachTask leftTask = new ForEachTask<>(task, leftSplit); + task.addToPendingCount(1); + ForEachTask taskToFork; + if (forkRight) { + forkRight = false; + rightSplit = leftSplit; + taskToFork = task; + task = leftTask; } else { - addToPendingCount(1); - new ForEachTask<>(this, split).fork(); + forkRight = true; + taskToFork = leftTask; } + taskToFork.fork(); + sizeEstimate = rightSplit.estimateSize(); } + task.spliterator = null; + task.propagateCompletion(); } } @@ -301,7 +314,7 @@ final class ForEachOps { * A {@code ForkJoinTask} for performing a parallel for-each operation * which visits the elements in encounter order */ - private static class ForEachOrderedTask extends CountedCompleter { + static final class ForEachOrderedTask extends CountedCompleter { private final PipelineHelper helper; private Spliterator spliterator; private final long targetSize; @@ -343,39 +356,49 @@ final class ForEachOps { } private static void doCompute(ForEachOrderedTask task) { - while (true) { - Spliterator split; - if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize) - || (split = task.spliterator.trySplit()) == null) { - if (task.getPendingCount() == 0) { - task.helper.wrapAndCopyInto(task.action, task.spliterator); - } - else { - Node.Builder nb = task.helper.makeNodeBuilder( - task.helper.exactOutputSizeIfKnown(task.spliterator), - size -> (T[]) new Object[size]); - task.node = task.helper.wrapAndCopyInto(nb, task.spliterator).build(); - } - task.tryComplete(); - return; + Spliterator rightSplit = task.spliterator, leftSplit; + long sizeThreshold = task.targetSize; + boolean forkRight = false; + while (rightSplit.estimateSize() > sizeThreshold && + (leftSplit = rightSplit.trySplit()) != null) { + ForEachOrderedTask leftChild = + new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor); + ForEachOrderedTask rightChild = + new ForEachOrderedTask<>(task, rightSplit, leftChild); + task.completionMap.put(leftChild, rightChild); + task.addToPendingCount(1); // forking + rightChild.addToPendingCount(1); // right pending on left child + if (task.leftPredecessor != null) { + leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine + if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) + task.addToPendingCount(-1); // transfer my "right child" count to my left child + else + leftChild.addToPendingCount(-1); // left child is ready to go when ready + } + ForEachOrderedTask taskToFork; + if (forkRight) { + forkRight = false; + rightSplit = leftSplit; + task = leftChild; + taskToFork = rightChild; } else { - ForEachOrderedTask leftChild = new ForEachOrderedTask<>(task, split, task.leftPredecessor); - ForEachOrderedTask rightChild = new ForEachOrderedTask<>(task, task.spliterator, leftChild); - task.completionMap.put(leftChild, rightChild); - task.addToPendingCount(1); // forking - rightChild.addToPendingCount(1); // right pending on left child - if (task.leftPredecessor != null) { - leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine - if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) - task.addToPendingCount(-1); // transfer my "right child" count to my left child - else - leftChild.addToPendingCount(-1); // left child is ready to go when ready - } - leftChild.fork(); + forkRight = true; task = rightChild; + taskToFork = leftChild; } + taskToFork.fork(); } + if (task.getPendingCount() == 0) { + task.helper.wrapAndCopyInto(task.action, rightSplit); + } + else { + Node.Builder nb = task.helper.makeNodeBuilder( + task.helper.exactOutputSizeIfKnown(rightSplit), + size -> (T[]) new Object[size]); + task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build(); + } + task.tryComplete(); } @Override diff --git a/jdk/src/share/classes/java/util/stream/Nodes.java b/jdk/src/share/classes/java/util/stream/Nodes.java index f79a88c470b..4edf324a84c 100644 --- a/jdk/src/share/classes/java/util/stream/Nodes.java +++ b/jdk/src/share/classes/java/util/stream/Nodes.java @@ -1829,25 +1829,20 @@ final class Nodes { @Override public void compute() { SizedCollectorTask task = this; - while (true) { - Spliterator leftSplit; - if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize) - || ((leftSplit = task.spliterator.trySplit()) == null)) { - if (task.offset + task.length >= MAX_ARRAY_SIZE) - throw new IllegalArgumentException("Stream size exceeds max array size"); - T_SINK sink = (T_SINK) task; - task.helper.wrapAndCopyInto(sink, task.spliterator); - task.propagateCompletion(); - return; - } - else { - task.setPendingCount(1); - long leftSplitSize = leftSplit.estimateSize(); - task.makeChild(leftSplit, task.offset, leftSplitSize).fork(); - task = task.makeChild(task.spliterator, task.offset + leftSplitSize, - task.length - leftSplitSize); - } + Spliterator rightSplit = spliterator, leftSplit; + while (rightSplit.estimateSize() > task.targetSize && + (leftSplit = rightSplit.trySplit()) != null) { + task.setPendingCount(1); + long leftSplitSize = leftSplit.estimateSize(); + task.makeChild(leftSplit, task.offset, leftSplitSize).fork(); + task = task.makeChild(rightSplit, task.offset + leftSplitSize, + task.length - leftSplitSize); } + if (task.offset + task.length >= MAX_ARRAY_SIZE) + throw new IllegalArgumentException("Stream size exceeds max array size"); + T_SINK sink = (T_SINK) task; + task.helper.wrapAndCopyInto(sink, rightSplit); + task.propagateCompletion(); } abstract K makeChild(Spliterator spliterator, long offset, long size);