8164983: Improve CountedCompleter code samples; add corresponding tests
Reviewed-by: martin, psandoz, shade
This commit is contained in:
parent
3cb67af31f
commit
d7f3695a90
@ -120,102 +120,114 @@ import java.lang.invoke.VarHandle;
|
||||
* to complete for some elements than others, either because of
|
||||
* intrinsic variation (for example I/O) or auxiliary effects such as
|
||||
* garbage collection. Because CountedCompleters provide their own
|
||||
* continuations, other threads need not block waiting to perform
|
||||
* them.
|
||||
* continuations, other tasks need not block waiting to perform them.
|
||||
*
|
||||
* <p>For example, here is an initial version of a class that uses
|
||||
* divide-by-two recursive decomposition to divide work into single
|
||||
* pieces (leaf tasks). Even when work is split into individual calls,
|
||||
* tree-based techniques are usually preferable to directly forking
|
||||
* leaf tasks, because they reduce inter-thread communication and
|
||||
* improve load balancing. In the recursive case, the second of each
|
||||
* pair of subtasks to finish triggers completion of its parent
|
||||
* <p>For example, here is an initial version of a utility method that
|
||||
* uses divide-by-two recursive decomposition to divide work into
|
||||
* single pieces (leaf tasks). Even when work is split into individual
|
||||
* calls, tree-based techniques are usually preferable to directly
|
||||
* forking leaf tasks, because they reduce inter-thread communication
|
||||
* and improve load balancing. In the recursive case, the second of
|
||||
* each pair of subtasks to finish triggers completion of their parent
|
||||
* (because no result combination is performed, the default no-op
|
||||
* implementation of method {@code onCompletion} is not overridden).
|
||||
* A static utility method sets up the base task and invokes it
|
||||
* (here, implicitly using the {@link ForkJoinPool#commonPool()}).
|
||||
* The utility method sets up the root task and invokes it (here,
|
||||
* implicitly using the {@link ForkJoinPool#commonPool()}). It is
|
||||
* straightforward and reliable (but not optimal) to always set the
|
||||
* pending count to the number of child tasks and call {@code
|
||||
* tryComplete()} immediately before returning.
|
||||
*
|
||||
* <pre> {@code
|
||||
* class MyOperation<E> { void apply(E e) { ... } }
|
||||
*
|
||||
* class ForEach<E> extends CountedCompleter<Void> {
|
||||
*
|
||||
* public static <E> void forEach(E[] array, MyOperation<E> op) {
|
||||
* new ForEach<E>(null, array, op, 0, array.length).invoke();
|
||||
* }
|
||||
*
|
||||
* final E[] array; final MyOperation<E> op; final int lo, hi;
|
||||
* ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
|
||||
* super(p);
|
||||
* this.array = array; this.op = op; this.lo = lo; this.hi = hi;
|
||||
* }
|
||||
*
|
||||
* public void compute() { // version 1
|
||||
* if (hi - lo >= 2) {
|
||||
* int mid = (lo + hi) >>> 1;
|
||||
* setPendingCount(2); // must set pending count before fork
|
||||
* new ForEach(this, array, op, mid, hi).fork(); // right child
|
||||
* new ForEach(this, array, op, lo, mid).fork(); // left child
|
||||
* public static <E> void forEach(E[] array, Consumer<E> action) {
|
||||
* class Task extends CountedCompleter<Void> {
|
||||
* final int lo, hi;
|
||||
* Task(Task parent, int lo, int hi) {
|
||||
* super(parent); this.lo = lo; this.hi = hi;
|
||||
* }
|
||||
*
|
||||
* public void compute() {
|
||||
* if (hi - lo >= 2) {
|
||||
* int mid = (lo + hi) >>> 1;
|
||||
* // must set pending count before fork
|
||||
* setPendingCount(2);
|
||||
* new Task(this, mid, hi).fork(); // right child
|
||||
* new Task(this, lo, mid).fork(); // left child
|
||||
* }
|
||||
* else if (hi > lo)
|
||||
* action.accept(array[lo]);
|
||||
* tryComplete();
|
||||
* }
|
||||
* else if (hi > lo)
|
||||
* op.apply(array[lo]);
|
||||
* tryComplete();
|
||||
* }
|
||||
* new Task(null, 0, array.length).invoke();
|
||||
* }}</pre>
|
||||
*
|
||||
* This design can be improved by noticing that in the recursive case,
|
||||
* the task has nothing to do after forking its right task, so can
|
||||
* directly invoke its left task before returning. (This is an analog
|
||||
* of tail recursion removal.) Also, because the task returns upon
|
||||
* executing its left task (rather than falling through to invoke
|
||||
* {@code tryComplete}) the pending count is set to one:
|
||||
* of tail recursion removal.) Also, when the last action in a task
|
||||
* is to fork or invoke a subtask (a "tail call"), the call to {@code
|
||||
* tryComplete()} can be optimized away, at the cost of making the
|
||||
* pending count look "off by one".
|
||||
*
|
||||
* <pre> {@code
|
||||
* class ForEach<E> ... {
|
||||
* ...
|
||||
* public void compute() { // version 2
|
||||
* if (hi - lo >= 2) {
|
||||
* int mid = (lo + hi) >>> 1;
|
||||
* setPendingCount(1); // only one pending
|
||||
* new ForEach(this, array, op, mid, hi).fork(); // right child
|
||||
* new ForEach(this, array, op, lo, mid).compute(); // direct invoke
|
||||
* }
|
||||
* else {
|
||||
* if (hi > lo)
|
||||
* op.apply(array[lo]);
|
||||
* tryComplete();
|
||||
* }
|
||||
* }
|
||||
* }}</pre>
|
||||
* public void compute() {
|
||||
* if (hi - lo >= 2) {
|
||||
* int mid = (lo + hi) >>> 1;
|
||||
* setPendingCount(1); // looks off by one, but correct!
|
||||
* new Task(this, mid, hi).fork(); // right child
|
||||
* new Task(this, lo, mid).compute(); // direct invoke
|
||||
* } else {
|
||||
* if (hi > lo)
|
||||
* action.accept(array[lo]);
|
||||
* tryComplete();
|
||||
* }
|
||||
* }}</pre>
|
||||
*
|
||||
* As a further optimization, notice that the left task need not even exist.
|
||||
* Instead of creating a new one, we can iterate using the original task,
|
||||
* Instead of creating a new one, we can continue using the original task,
|
||||
* and add a pending count for each fork. Additionally, because no task
|
||||
* in this tree implements an {@link #onCompletion(CountedCompleter)} method,
|
||||
* {@code tryComplete()} can be replaced with {@link #propagateCompletion}.
|
||||
* {@code tryComplete} can be replaced with {@link #propagateCompletion}.
|
||||
*
|
||||
* <pre> {@code
|
||||
* class ForEach<E> ... {
|
||||
* ...
|
||||
* public void compute() { // version 3
|
||||
* int l = lo, h = hi;
|
||||
* while (h - l >= 2) {
|
||||
* int mid = (l + h) >>> 1;
|
||||
* addToPendingCount(1);
|
||||
* new ForEach(this, array, op, mid, h).fork(); // right child
|
||||
* h = mid;
|
||||
* public void compute() {
|
||||
* int n = hi - lo;
|
||||
* for (; n >= 2; n /= 2) {
|
||||
* addToPendingCount(1);
|
||||
* new Task(this, lo + n/2, lo + n).fork();
|
||||
* }
|
||||
* if (n > 0)
|
||||
* action.accept(array[lo]);
|
||||
* propagateCompletion();
|
||||
* }}</pre>
|
||||
*
|
||||
* When pending counts can be precomputed, they can be established in
|
||||
* the constructor:
|
||||
*
|
||||
* <pre> {@code
|
||||
* public static <E> void forEach(E[] array, Consumer<E> action) {
|
||||
* class Task extends CountedCompleter<Void> {
|
||||
* final int lo, hi;
|
||||
* Task(Task parent, int lo, int hi) {
|
||||
* super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo));
|
||||
* this.lo = lo; this.hi = hi;
|
||||
* }
|
||||
*
|
||||
* public void compute() {
|
||||
* for (int n = hi - lo; n >= 2; n /= 2)
|
||||
* new Task(this, lo + n/2, lo + n).fork();
|
||||
* action.accept(array[lo]);
|
||||
* propagateCompletion();
|
||||
* }
|
||||
* if (h > l)
|
||||
* op.apply(array[l]);
|
||||
* propagateCompletion();
|
||||
* }
|
||||
* if (array.length > 0)
|
||||
* new Task(null, 0, array.length).invoke();
|
||||
* }}</pre>
|
||||
*
|
||||
* Additional optimizations of such classes might entail precomputing
|
||||
* pending counts so that they can be established in constructors,
|
||||
* specializing classes for leaf steps, subdividing by say, four,
|
||||
* instead of two per iteration, and using an adaptive threshold
|
||||
* instead of always subdividing down to single elements.
|
||||
* Additional optimizations of such classes might entail specializing
|
||||
* classes for leaf steps, subdividing by say, four, instead of two
|
||||
* per iteration, and using an adaptive threshold instead of always
|
||||
* subdividing down to single elements.
|
||||
*
|
||||
* <p><b>Searching.</b> A tree of CountedCompleters can search for a
|
||||
* value or property in different parts of a data structure, and
|
||||
|
@ -40,9 +40,12 @@ import java.util.concurrent.CountedCompleter;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.ForkJoinTask;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import junit.framework.Test;
|
||||
import junit.framework.TestSuite;
|
||||
@ -1869,4 +1872,115 @@ public class CountedCompleterTest extends JSR166TestCase {
|
||||
testInvokeOnPool(singletonPool(), a);
|
||||
}
|
||||
|
||||
/** CountedCompleter class javadoc code sample, version 1. */
|
||||
public static <E> void forEach1(E[] array, Consumer<E> action) {
|
||||
class Task extends CountedCompleter<Void> {
|
||||
final int lo, hi;
|
||||
Task(Task parent, int lo, int hi) {
|
||||
super(parent); this.lo = lo; this.hi = hi;
|
||||
}
|
||||
|
||||
public void compute() {
|
||||
if (hi - lo >= 2) {
|
||||
int mid = (lo + hi) >>> 1;
|
||||
// must set pending count before fork
|
||||
setPendingCount(2);
|
||||
new Task(this, mid, hi).fork(); // right child
|
||||
new Task(this, lo, mid).fork(); // left child
|
||||
}
|
||||
else if (hi > lo)
|
||||
action.accept(array[lo]);
|
||||
tryComplete();
|
||||
}
|
||||
}
|
||||
new Task(null, 0, array.length).invoke();
|
||||
}
|
||||
|
||||
/** CountedCompleter class javadoc code sample, version 2. */
|
||||
public static <E> void forEach2(E[] array, Consumer<E> action) {
|
||||
class Task extends CountedCompleter<Void> {
|
||||
final int lo, hi;
|
||||
Task(Task parent, int lo, int hi) {
|
||||
super(parent); this.lo = lo; this.hi = hi;
|
||||
}
|
||||
|
||||
public void compute() {
|
||||
if (hi - lo >= 2) {
|
||||
int mid = (lo + hi) >>> 1;
|
||||
setPendingCount(1); // looks off by one, but correct!
|
||||
new Task(this, mid, hi).fork(); // right child
|
||||
new Task(this, lo, mid).compute(); // direct invoke
|
||||
} else {
|
||||
if (hi > lo)
|
||||
action.accept(array[lo]);
|
||||
tryComplete();
|
||||
}
|
||||
}
|
||||
}
|
||||
new Task(null, 0, array.length).invoke();
|
||||
}
|
||||
|
||||
/** CountedCompleter class javadoc code sample, version 3. */
|
||||
public static <E> void forEach3(E[] array, Consumer<E> action) {
|
||||
class Task extends CountedCompleter<Void> {
|
||||
final int lo, hi;
|
||||
Task(Task parent, int lo, int hi) {
|
||||
super(parent); this.lo = lo; this.hi = hi;
|
||||
}
|
||||
|
||||
public void compute() {
|
||||
int n = hi - lo;
|
||||
for (; n >= 2; n /= 2) {
|
||||
addToPendingCount(1);
|
||||
new Task(this, lo + n/2, lo + n).fork();
|
||||
}
|
||||
if (n > 0)
|
||||
action.accept(array[lo]);
|
||||
propagateCompletion();
|
||||
}
|
||||
}
|
||||
new Task(null, 0, array.length).invoke();
|
||||
}
|
||||
|
||||
/** CountedCompleter class javadoc code sample, version 4. */
|
||||
public static <E> void forEach4(E[] array, Consumer<E> action) {
|
||||
class Task extends CountedCompleter<Void> {
|
||||
final int lo, hi;
|
||||
Task(Task parent, int lo, int hi) {
|
||||
super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo));
|
||||
this.lo = lo; this.hi = hi;
|
||||
}
|
||||
|
||||
public void compute() {
|
||||
for (int n = hi - lo; n >= 2; n /= 2)
|
||||
new Task(this, lo + n/2, lo + n).fork();
|
||||
action.accept(array[lo]);
|
||||
propagateCompletion();
|
||||
}
|
||||
}
|
||||
if (array.length > 0)
|
||||
new Task(null, 0, array.length).invoke();
|
||||
}
|
||||
|
||||
void testRecursiveDecomposition(
|
||||
BiConsumer<Integer[], Consumer<Integer>> action) {
|
||||
int n = ThreadLocalRandom.current().nextInt(8);
|
||||
Integer[] a = new Integer[n];
|
||||
for (int i = 0; i < n; i++) a[i] = i + 1;
|
||||
AtomicInteger ai = new AtomicInteger(0);
|
||||
action.accept(a, (x) -> ai.addAndGet(x));
|
||||
assertEquals(n * (n + 1) / 2, ai.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Variants of divide-by-two recursive decomposition into leaf tasks,
|
||||
* as described in the CountedCompleter class javadoc code samples
|
||||
*/
|
||||
public void testRecursiveDecomposition() {
|
||||
testRecursiveDecomposition(CountedCompleterTest::forEach1);
|
||||
testRecursiveDecomposition(CountedCompleterTest::forEach2);
|
||||
testRecursiveDecomposition(CountedCompleterTest::forEach3);
|
||||
testRecursiveDecomposition(CountedCompleterTest::forEach4);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user