8280915: Better parallelization for AbstractSpliterator and IteratorSpliterator when size is unknown

Reviewed-by: psandoz
This commit is contained in:
Tagir F. Valeev 2022-04-15 15:18:38 +00:00
parent 53580b336a
commit 4cc8eccfca
3 changed files with 212 additions and 34 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -945,6 +945,7 @@ public final class Spliterators {
private int index; // current index, modified on advance/split
private final int fence; // one past last index
private final int characteristics;
private long estimatedSize; // estimated size, to help to split evenly
/**
* Creates a spliterator covering all of the given array.
@ -971,14 +972,27 @@ public final class Spliterators {
this.index = origin;
this.fence = fence;
this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
this.estimatedSize = -1;
}
private ArraySpliterator(Object[] array, int origin, int fence, int characteristics, long estimatedSize) {
this.array = array;
this.index = origin;
this.fence = fence;
this.characteristics = characteristics & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
this.estimatedSize = estimatedSize;
}
@Override
public Spliterator<T> trySplit() {
int lo = index, mid = (lo + fence) >>> 1;
return (lo >= mid)
? null
: new ArraySpliterator<>(array, lo, index = mid, characteristics);
if (lo >= mid) return null;
if (estimatedSize == -1) {
return new ArraySpliterator<>(array, lo, index = mid, characteristics);
}
long prefixEstimatedSize = estimatedSize >>> 1;
estimatedSize -= prefixEstimatedSize;
return new ArraySpliterator<>(array, lo, index = mid, characteristics, prefixEstimatedSize);
}
@SuppressWarnings("unchecked")
@ -1006,7 +1020,9 @@ public final class Spliterators {
}
@Override
public long estimateSize() { return (long)(fence - index); }
public long estimateSize() {
return estimatedSize >= 0 ? estimatedSize : (long)(fence - index);
}
@Override
public int characteristics() {
@ -1030,6 +1046,7 @@ public final class Spliterators {
private int index; // current index, modified on advance/split
private final int fence; // one past last index
private final int characteristics;
private long estimatedSize; // estimated size, to help to split evenly
/**
* Creates a spliterator covering all of the given array.
@ -1056,14 +1073,27 @@ public final class Spliterators {
this.index = origin;
this.fence = fence;
this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
this.estimatedSize = -1;
}
private IntArraySpliterator(int[] array, int origin, int fence, int characteristics, long estimatedSize) {
this.array = array;
this.index = origin;
this.fence = fence;
this.characteristics = characteristics & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
this.estimatedSize = estimatedSize;
}
@Override
public OfInt trySplit() {
int lo = index, mid = (lo + fence) >>> 1;
return (lo >= mid)
? null
: new IntArraySpliterator(array, lo, index = mid, characteristics);
if (lo >= mid) return null;
if (estimatedSize == -1) {
return new IntArraySpliterator(array, lo, index = mid, characteristics);
}
long prefixEstimatedSize = estimatedSize >>> 1;
estimatedSize -= prefixEstimatedSize;
return new IntArraySpliterator(array, lo, index = mid, characteristics, prefixEstimatedSize);
}
@Override
@ -1089,7 +1119,9 @@ public final class Spliterators {
}
@Override
public long estimateSize() { return (long)(fence - index); }
public long estimateSize() {
return estimatedSize >= 0 ? estimatedSize : (long)(fence - index);
}
@Override
public int characteristics() {
@ -1113,6 +1145,7 @@ public final class Spliterators {
private int index; // current index, modified on advance/split
private final int fence; // one past last index
private final int characteristics;
private long estimatedSize; // estimated size, to help to split evenly
/**
* Creates a spliterator covering all of the given array.
@ -1139,14 +1172,27 @@ public final class Spliterators {
this.index = origin;
this.fence = fence;
this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
this.estimatedSize = -1;
}
private LongArraySpliterator(long[] array, int origin, int fence, int characteristics, long estimatedSize) {
this.array = array;
this.index = origin;
this.fence = fence;
this.characteristics = characteristics & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
this.estimatedSize = estimatedSize;
}
@Override
public OfLong trySplit() {
int lo = index, mid = (lo + fence) >>> 1;
return (lo >= mid)
? null
: new LongArraySpliterator(array, lo, index = mid, characteristics);
if (lo >= mid) return null;
if (estimatedSize == -1) {
return new LongArraySpliterator(array, lo, index = mid, characteristics);
}
long prefixEstimatedSize = estimatedSize >>> 1;
estimatedSize -= prefixEstimatedSize;
return new LongArraySpliterator(array, lo, index = mid, characteristics, prefixEstimatedSize);
}
@Override
@ -1172,7 +1218,9 @@ public final class Spliterators {
}
@Override
public long estimateSize() { return (long)(fence - index); }
public long estimateSize() {
return estimatedSize >= 0 ? estimatedSize : (long)(fence - index);
}
@Override
public int characteristics() {
@ -1196,6 +1244,7 @@ public final class Spliterators {
private int index; // current index, modified on advance/split
private final int fence; // one past last index
private final int characteristics;
private long estimatedSize; // estimated size, to help to split evenly
/**
* Creates a spliterator covering all of the given array.
@ -1222,14 +1271,27 @@ public final class Spliterators {
this.index = origin;
this.fence = fence;
this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
this.estimatedSize = -1;
}
private DoubleArraySpliterator(double[] array, int origin, int fence, int characteristics, long estimatedSize) {
this.array = array;
this.index = origin;
this.fence = fence;
this.characteristics = characteristics & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
this.estimatedSize = estimatedSize;
}
@Override
public OfDouble trySplit() {
int lo = index, mid = (lo + fence) >>> 1;
return (lo >= mid)
? null
: new DoubleArraySpliterator(array, lo, index = mid, characteristics);
if (lo >= mid) return null;
if (estimatedSize == -1) {
return new DoubleArraySpliterator(array, lo, index = mid, characteristics);
}
long prefixEstimatedSize = estimatedSize >>> 1;
estimatedSize -= prefixEstimatedSize;
return new DoubleArraySpliterator(array, lo, index = mid, characteristics, prefixEstimatedSize);
}
@Override
@ -1255,7 +1317,9 @@ public final class Spliterators {
}
@Override
public long estimateSize() { return (long)(fence - index); }
public long estimateSize() {
return estimatedSize >= 0 ? estimatedSize : (long)(fence - index);
}
@Override
public int characteristics() {
@ -1362,9 +1426,11 @@ public final class Spliterators {
int j = 0;
do { a[j] = holder.value; } while (++j < n && tryAdvance(holder));
batch = j;
if (est != Long.MAX_VALUE)
if (est != Long.MAX_VALUE) {
est -= j;
return new ArraySpliterator<>(a, 0, j, characteristics());
return new ArraySpliterator<>(a, 0, j, characteristics);
}
return new ArraySpliterator<>(a, 0, j, characteristics, Long.MAX_VALUE / 2);
}
return null;
}
@ -1472,9 +1538,11 @@ public final class Spliterators {
int j = 0;
do { a[j] = holder.value; } while (++j < n && tryAdvance(holder));
batch = j;
if (est != Long.MAX_VALUE)
if (est != Long.MAX_VALUE) {
est -= j;
return new IntArraySpliterator(a, 0, j, characteristics());
return new IntArraySpliterator(a, 0, j, characteristics);
}
return new IntArraySpliterator(a, 0, j, characteristics, Long.MAX_VALUE / 2);
}
return null;
}
@ -1582,9 +1650,11 @@ public final class Spliterators {
int j = 0;
do { a[j] = holder.value; } while (++j < n && tryAdvance(holder));
batch = j;
if (est != Long.MAX_VALUE)
if (est != Long.MAX_VALUE) {
est -= j;
return new LongArraySpliterator(a, 0, j, characteristics());
return new LongArraySpliterator(a, 0, j, characteristics);
}
return new LongArraySpliterator(a, 0, j, characteristics, Long.MAX_VALUE / 2);
}
return null;
}
@ -1692,9 +1762,11 @@ public final class Spliterators {
int j = 0;
do { a[j] = holder.value; } while (++j < n && tryAdvance(holder));
batch = j;
if (est != Long.MAX_VALUE)
if (est != Long.MAX_VALUE) {
est -= j;
return new DoubleArraySpliterator(a, 0, j, characteristics());
return new DoubleArraySpliterator(a, 0, j, characteristics);
}
return new DoubleArraySpliterator(a, 0, j, characteristics, Long.MAX_VALUE / 2);
}
return null;
}
@ -1827,9 +1899,11 @@ public final class Spliterators {
int j = 0;
do { a[j] = i.next(); } while (++j < n && i.hasNext());
batch = j;
if (est != Long.MAX_VALUE)
if (est != Long.MAX_VALUE) {
est -= j;
return new ArraySpliterator<>(a, 0, j, characteristics);
return new ArraySpliterator<>(a, 0, j, characteristics);
}
return new ArraySpliterator<>(a, 0, j, characteristics, Long.MAX_VALUE / 2);
}
return null;
}
@ -1939,9 +2013,11 @@ public final class Spliterators {
int j = 0;
do { a[j] = i.nextInt(); } while (++j < n && i.hasNext());
batch = j;
if (est != Long.MAX_VALUE)
if (est != Long.MAX_VALUE) {
est -= j;
return new IntArraySpliterator(a, 0, j, characteristics);
return new IntArraySpliterator(a, 0, j, characteristics);
}
return new IntArraySpliterator(a, 0, j, characteristics, Long.MAX_VALUE / 2);
}
return null;
}
@ -2033,9 +2109,11 @@ public final class Spliterators {
int j = 0;
do { a[j] = i.nextLong(); } while (++j < n && i.hasNext());
batch = j;
if (est != Long.MAX_VALUE)
if (est != Long.MAX_VALUE) {
est -= j;
return new LongArraySpliterator(a, 0, j, characteristics);
return new LongArraySpliterator(a, 0, j, characteristics);
}
return new LongArraySpliterator(a, 0, j, characteristics, Long.MAX_VALUE / 2);
}
return null;
}
@ -2127,9 +2205,11 @@ public final class Spliterators {
int j = 0;
do { a[j] = i.nextDouble(); } while (++j < n && i.hasNext());
batch = j;
if (est != Long.MAX_VALUE)
if (est != Long.MAX_VALUE) {
est -= j;
return new DoubleArraySpliterator(a, 0, j, characteristics);
return new DoubleArraySpliterator(a, 0, j, characteristics);
}
return new DoubleArraySpliterator(a, 0, j, characteristics, Long.MAX_VALUE / 2);
}
return null;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -30,6 +30,7 @@ package org.openjdk.tests.java.util.stream;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collector;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
@ -97,4 +98,21 @@ public class IterateTest extends OpTestCase {
checkNPE(() -> DoubleStream.iterate(0, null, x -> x + 1));
checkNPE(() -> DoubleStream.iterate(0, x -> x < 10, null));
}
@Test
public void testParallelize() {
checkHasSplit(Stream.iterate(0, x -> x < 10, x -> x + 1));
checkHasSplit(IntStream.iterate(0, x -> x < 10, x -> x + 1).boxed());
checkHasSplit(LongStream.iterate(0, x -> x < 10, x -> x + 1).boxed());
checkHasSplit(DoubleStream.iterate(0, x -> x < 10, x -> x + 1).boxed());
}
private void checkHasSplit(Stream<?> stream) {
int[] numberOfNonEmptyParts = stream.parallel().collect(
Collector.of(() -> new int[1], (acc, e) -> acc[0] = 1, (acc1, acc2) -> {
acc1[0] += acc2[0];
return acc1;
}));
assertTrue(numberOfNonEmptyParts[0] >= 2);
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package org.openjdk.bench.java.util.stream;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import java.math.BigInteger;
import java.util.SplittableRandom;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Benchmark for checking Pattern::splitAsStream performance
* with CPU-bound downstream operation in sequential and parallel streams
*
* See JDK-8280915
*/
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5, time = 500, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS)
@Fork(3)
@State(Scope.Thread)
public class PatternSplit {
@Param({"10", "100", "1000", "10000"})
private int size;
private String input;
private static final Pattern PATTERN = Pattern.compile(",");
@Setup
public void setup() {
input = new SplittableRandom(1).ints(size, 1000, 2000)
.mapToObj(String::valueOf).collect(Collectors.joining(","));
}
@Benchmark
public BigInteger sumOf1000thPowers() {
return PATTERN.splitAsStream(input).map(BigInteger::new).map(v -> v.pow(1000))
.reduce(BigInteger.ZERO, BigInteger::add);
}
@Benchmark
public BigInteger sumOf1000thPowersParallel() {
return PATTERN.splitAsStream(input).parallel().map(BigInteger::new).map(v -> v.pow(1000))
.reduce(BigInteger.ZERO, BigInteger::add);
}
}