diff --git a/src/java.base/share/classes/java/util/stream/AbstractPipeline.java b/src/java.base/share/classes/java/util/stream/AbstractPipeline.java index db75384e4d3..8afe5083cc7 100644 --- a/src/java.base/share/classes/java/util/stream/AbstractPipeline.java +++ b/src/java.base/share/classes/java/util/stream/AbstractPipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -433,6 +433,20 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> return result; } + /** + * Returns whether any of the stages in the (entire) pipeline is short-circuiting + * or not. + * @return {@code true} if any stage in this pipeline is short-circuiting, + * {@code false} if not. + */ + protected final boolean isShortCircuitingPipeline() { + for (var u = sourceStage.nextStage; u != null; u = u.nextStage) { + if (StreamOpFlag.SHORT_CIRCUIT.isKnown(u.combinedFlags)) + return true; + } + return false; + } + /** * Get the source spliterator for this pipeline stage. For a sequential or * stateless parallel pipeline, this is the source spliterator. For a diff --git a/src/java.base/share/classes/java/util/stream/DoublePipeline.java b/src/java.base/share/classes/java/util/stream/DoublePipeline.java index 47d9e017bb7..22b3bf67ef4 100644 --- a/src/java.base/share/classes/java/util/stream/DoublePipeline.java +++ b/src/java.base/share/classes/java/util/stream/DoublePipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -39,6 +39,7 @@ import java.util.function.DoublePredicate; import java.util.function.DoubleToIntFunction; import java.util.function.DoubleToLongFunction; import java.util.function.DoubleUnaryOperator; +import java.util.function.IntConsumer; import java.util.function.IntFunction; import java.util.function.ObjDoubleConsumer; import java.util.function.Supplier; @@ -263,43 +264,46 @@ abstract class DoublePipeline<E_IN> StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink<Double> opWrapSink(int flags, Sink<Double> sink) { - return new Sink.ChainedDouble<>(sink) { - // true if cancellationRequested() has been called - boolean cancellationRequestedCalled; + final DoubleConsumer fastPath = + isShortCircuitingPipeline() + ? null + : (sink instanceof DoubleConsumer dc) + ? dc + : sink::accept; + final class FlatMap implements Sink.OfDouble, DoublePredicate { + boolean cancel; - // cache the consumer to avoid creation on every accepted element - DoubleConsumer downstreamAsDouble = downstream::accept; + @Override public void begin(long size) { sink.begin(-1); } + @Override public void end() { sink.end(); } @Override - public void begin(long size) { - downstream.begin(-1); - } - - @Override - public void accept(double t) { - try (DoubleStream result = mapper.apply(t)) { + public void accept(double e) { + try (DoubleStream result = mapper.apply(e)) { if (result != null) { - if (!cancellationRequestedCalled) { - result.sequential().forEach(downstreamAsDouble); - } else { - var s = result.sequential().spliterator(); - do { - } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble)); - } + if (fastPath == null) + result.sequential().allMatch(this); + else + result.sequential().forEach(fastPath); } } } @Override public boolean cancellationRequested() { - // If this method is called then an operation within the stream - // pipeline is short-circuiting (see AbstractPipeline.copyInto). - // Note that we cannot differentiate between an upstream or - // downstream operation - cancellationRequestedCalled = true; - return downstream.cancellationRequested(); + return cancel || (cancel |= sink.cancellationRequested()); } - }; + + @Override + public boolean test(double output) { + if (!cancel) { + sink.accept(output); + return !(cancel |= sink.cancellationRequested()); + } else { + return false; + } + } + } + return new FlatMap(); } }; } diff --git a/src/java.base/share/classes/java/util/stream/GathererOp.java b/src/java.base/share/classes/java/util/stream/GathererOp.java index 096ac4ea3a5..8a7073f63ef 100644 --- a/src/java.base/share/classes/java/util/stream/GathererOp.java +++ b/src/java.base/share/classes/java/util/stream/GathererOp.java @@ -28,23 +28,15 @@ import jdk.internal.vm.annotation.ForceInline; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; -import java.util.Comparator; -import java.util.Iterator; -import java.util.Optional; import java.util.Spliterator; import java.util.concurrent.CountedCompleter; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntFunction; -import java.util.function.Predicate; import java.util.function.Supplier; -import java.util.function.ToDoubleFunction; -import java.util.function.ToIntFunction; -import java.util.function.ToLongFunction; import java.util.stream.Gatherer.Integrator; /** diff --git a/src/java.base/share/classes/java/util/stream/IntPipeline.java b/src/java.base/share/classes/java/util/stream/IntPipeline.java index 83a978d6f68..e78bc9fecc9 100644 --- a/src/java.base/share/classes/java/util/stream/IntPipeline.java +++ b/src/java.base/share/classes/java/util/stream/IntPipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -297,43 +297,46 @@ abstract class IntPipeline<E_IN> StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { - return new Sink.ChainedInt<>(sink) { - // true if cancellationRequested() has been called - boolean cancellationRequestedCalled; + final IntConsumer fastPath = + isShortCircuitingPipeline() + ? null + : (sink instanceof IntConsumer ic) + ? ic + : sink::accept; + final class FlatMap implements Sink.OfInt, IntPredicate { + boolean cancel; - // cache the consumer to avoid creation on every accepted element - IntConsumer downstreamAsInt = downstream::accept; + @Override public void begin(long size) { sink.begin(-1); } + @Override public void end() { sink.end(); } @Override - public void begin(long size) { - downstream.begin(-1); - } - - @Override - public void accept(int t) { - try (IntStream result = mapper.apply(t)) { + public void accept(int e) { + try (IntStream result = mapper.apply(e)) { if (result != null) { - if (!cancellationRequestedCalled) { - result.sequential().forEach(downstreamAsInt); - } else { - var s = result.sequential().spliterator(); - do { - } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt)); - } + if (fastPath == null) + result.sequential().allMatch(this); + else + result.sequential().forEach(fastPath); } } } @Override public boolean cancellationRequested() { - // If this method is called then an operation within the stream - // pipeline is short-circuiting (see AbstractPipeline.copyInto). - // Note that we cannot differentiate between an upstream or - // downstream operation - cancellationRequestedCalled = true; - return downstream.cancellationRequested(); + return cancel || (cancel |= sink.cancellationRequested()); } - }; + + @Override + public boolean test(int output) { + if (!cancel) { + sink.accept(output); + return !(cancel |= sink.cancellationRequested()); + } else { + return false; + } + } + } + return new FlatMap(); } }; } diff --git a/src/java.base/share/classes/java/util/stream/LongPipeline.java b/src/java.base/share/classes/java/util/stream/LongPipeline.java index b5d43c2ef13..c8c296824ed 100644 --- a/src/java.base/share/classes/java/util/stream/LongPipeline.java +++ b/src/java.base/share/classes/java/util/stream/LongPipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -33,6 +33,7 @@ import java.util.Spliterator; import java.util.Spliterators; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; +import java.util.function.IntConsumer; import java.util.function.IntFunction; import java.util.function.LongBinaryOperator; import java.util.function.LongConsumer; @@ -279,43 +280,46 @@ abstract class LongPipeline<E_IN> StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink<Long> opWrapSink(int flags, Sink<Long> sink) { - return new Sink.ChainedLong<>(sink) { - // true if cancellationRequested() has been called - boolean cancellationRequestedCalled; + final LongConsumer fastPath = + isShortCircuitingPipeline() + ? null + : (sink instanceof LongConsumer lc) + ? lc + : sink::accept; + final class FlatMap implements Sink.OfLong, LongPredicate { + boolean cancel; - // cache the consumer to avoid creation on every accepted element - LongConsumer downstreamAsLong = downstream::accept; + @Override public void begin(long size) { sink.begin(-1); } + @Override public void end() { sink.end(); } @Override - public void begin(long size) { - downstream.begin(-1); - } - - @Override - public void accept(long t) { - try (LongStream result = mapper.apply(t)) { + public void accept(long e) { + try (LongStream result = mapper.apply(e)) { if (result != null) { - if (!cancellationRequestedCalled) { - result.sequential().forEach(downstreamAsLong); - } else { - var s = result.sequential().spliterator(); - do { - } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong)); - } + if (fastPath == null) + result.sequential().allMatch(this); + else + result.sequential().forEach(fastPath); } } } @Override public boolean cancellationRequested() { - // If this method is called then an operation within the stream - // pipeline is short-circuiting (see AbstractPipeline.copyInto). - // Note that we cannot differentiate between an upstream or - // downstream operation - cancellationRequestedCalled = true; - return downstream.cancellationRequested(); + return cancel || (cancel |= sink.cancellationRequested()); } - }; + + @Override + public boolean test(long output) { + if (!cancel) { + sink.accept(output); + return !(cancel |= sink.cancellationRequested()); + } else { + return false; + } + } + } + return new FlatMap(); } }; } diff --git a/src/java.base/share/classes/java/util/stream/ReferencePipeline.java b/src/java.base/share/classes/java/util/stream/ReferencePipeline.java index 5058440335f..3133b20b03d 100644 --- a/src/java.base/share/classes/java/util/stream/ReferencePipeline.java +++ b/src/java.base/share/classes/java/util/stream/ReferencePipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -36,10 +36,13 @@ import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.DoubleConsumer; +import java.util.function.DoublePredicate; import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.IntFunction; +import java.util.function.IntPredicate; import java.util.function.LongConsumer; +import java.util.function.LongPredicate; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.function.ToDoubleFunction; @@ -274,40 +277,41 @@ abstract class ReferencePipeline<P_IN, P_OUT> StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { - return new Sink.ChainedReference<>(sink) { - // true if cancellationRequested() has been called - boolean cancellationRequestedCalled; + boolean shorts = isShortCircuitingPipeline(); + final class FlatMap implements Sink<P_OUT>, Predicate<R> { + boolean cancel; + + @Override public void begin(long size) { sink.begin(-1); } + @Override public void end() { sink.end(); } @Override - public void begin(long size) { - downstream.begin(-1); - } - - @Override - public void accept(P_OUT u) { - try (Stream<? extends R> result = mapper.apply(u)) { + public void accept(P_OUT e) { + try (Stream<? extends R> result = mapper.apply(e)) { if (result != null) { - if (!cancellationRequestedCalled) { - result.sequential().forEach(downstream); - } else { - var s = result.sequential().spliterator(); - do { - } while (!downstream.cancellationRequested() && s.tryAdvance(downstream)); - } + if (shorts) + result.sequential().allMatch(this); + else + result.sequential().forEach(sink); } } } @Override public boolean cancellationRequested() { - // If this method is called then an operation within the stream - // pipeline is short-circuiting (see AbstractPipeline.copyInto). - // Note that we cannot differentiate between an upstream or - // downstream operation - cancellationRequestedCalled = true; - return downstream.cancellationRequested(); + return cancel || (cancel |= sink.cancellationRequested()); } - }; + + @Override + public boolean test(R output) { + if (!cancel) { + sink.accept(output); + return !(cancel |= sink.cancellationRequested()); + } else { + return false; + } + } + } + return new FlatMap(); } }; } @@ -319,39 +323,46 @@ abstract class ReferencePipeline<P_IN, P_OUT> StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { - return new Sink.ChainedReference<>(sink) { - // true if cancellationRequested() has been called - boolean cancellationRequestedCalled; + IntConsumer fastPath = + isShortCircuitingPipeline() + ? null + : (sink instanceof IntConsumer ic) + ? ic + : sink::accept; + final class FlatMap implements Sink<P_OUT>, IntPredicate { + boolean cancel; - // cache the consumer to avoid creation on every accepted element - IntConsumer downstreamAsInt = downstream::accept; + @Override public void begin(long size) { sink.begin(-1); } + @Override public void end() { sink.end(); } @Override - public void begin(long size) { - downstream.begin(-1); - } - - @Override - public void accept(P_OUT u) { - try (IntStream result = mapper.apply(u)) { + public void accept(P_OUT e) { + try (IntStream result = mapper.apply(e)) { if (result != null) { - if (!cancellationRequestedCalled) { - result.sequential().forEach(downstreamAsInt); - } else { - var s = result.sequential().spliterator(); - do { - } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt)); - } + if (fastPath == null) + result.sequential().allMatch(this); + else + result.sequential().forEach(fastPath); } } } @Override public boolean cancellationRequested() { - cancellationRequestedCalled = true; - return downstream.cancellationRequested(); + return cancel || (cancel |= sink.cancellationRequested()); } - }; + + @Override + public boolean test(int output) { + if (!cancel) { + sink.accept(output); + return !(cancel |= sink.cancellationRequested()); + } else { + return false; + } + } + } + return new FlatMap(); } }; } @@ -363,39 +374,46 @@ abstract class ReferencePipeline<P_IN, P_OUT> StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { - return new Sink.ChainedReference<>(sink) { - // true if cancellationRequested() has been called - boolean cancellationRequestedCalled; + DoubleConsumer fastPath = + isShortCircuitingPipeline() + ? null + : (sink instanceof DoubleConsumer dc) + ? dc + : sink::accept; + final class FlatMap implements Sink<P_OUT>, DoublePredicate { + boolean cancel; - // cache the consumer to avoid creation on every accepted element - DoubleConsumer downstreamAsDouble = downstream::accept; + @Override public void begin(long size) { sink.begin(-1); } + @Override public void end() { sink.end(); } @Override - public void begin(long size) { - downstream.begin(-1); - } - - @Override - public void accept(P_OUT u) { - try (DoubleStream result = mapper.apply(u)) { + public void accept(P_OUT e) { + try (DoubleStream result = mapper.apply(e)) { if (result != null) { - if (!cancellationRequestedCalled) { - result.sequential().forEach(downstreamAsDouble); - } else { - var s = result.sequential().spliterator(); - do { - } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble)); - } + if (fastPath == null) + result.sequential().allMatch(this); + else + result.sequential().forEach(fastPath); } } } @Override public boolean cancellationRequested() { - cancellationRequestedCalled = true; - return downstream.cancellationRequested(); + return cancel || (cancel |= sink.cancellationRequested()); } - }; + + @Override + public boolean test(double output) { + if (!cancel) { + sink.accept(output); + return !(cancel |= sink.cancellationRequested()); + } else { + return false; + } + } + } + return new FlatMap(); } }; } @@ -408,39 +426,46 @@ abstract class ReferencePipeline<P_IN, P_OUT> StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { - return new Sink.ChainedReference<>(sink) { - // true if cancellationRequested() has been called - boolean cancellationRequestedCalled; + LongConsumer fastPath = + isShortCircuitingPipeline() + ? null + : (sink instanceof LongConsumer lc) + ? lc + : sink::accept; + final class FlatMap implements Sink<P_OUT>, LongPredicate { + boolean cancel; - // cache the consumer to avoid creation on every accepted element - LongConsumer downstreamAsLong = downstream::accept; + @Override public void begin(long size) { sink.begin(-1); } + @Override public void end() { sink.end(); } @Override - public void begin(long size) { - downstream.begin(-1); - } - - @Override - public void accept(P_OUT u) { - try (LongStream result = mapper.apply(u)) { + public void accept(P_OUT e) { + try (LongStream result = mapper.apply(e)) { if (result != null) { - if (!cancellationRequestedCalled) { - result.sequential().forEach(downstreamAsLong); - } else { - var s = result.sequential().spliterator(); - do { - } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong)); - } + if (fastPath == null) + result.sequential().allMatch(this); + else + result.sequential().forEach(fastPath); } } } @Override public boolean cancellationRequested() { - cancellationRequestedCalled = true; - return downstream.cancellationRequested(); + return cancel || (cancel |= sink.cancellationRequested()); } - }; + + @Override + public boolean test(long output) { + if (!cancel) { + sink.accept(output); + return !(cancel |= sink.cancellationRequested()); + } else { + return false; + } + } + } + return new FlatMap(); } }; } diff --git a/test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java b/test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java index ab8daa732cd..36227de9c3a 100644 --- a/test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java +++ b/test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2020, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -24,7 +24,7 @@ /* * @test * @summary flat-map operations - * @bug 8044047 8076458 8075939 + * @bug 8044047 8076458 8075939 8196106 */ package org.openjdk.tests.java.util.stream; @@ -273,4 +273,31 @@ public class FlatMapOpTest extends OpTestCase { limit(10).toArray(); assertEquals(count.get(), 10); } + + @Test + public void testTerminationOfNestedInfiniteStreams() { + var refExpected = Stream.generate(() -> "").limit(5).toList(); + var refResult = Stream.generate(() -> "") + .flatMap(c -> Stream.generate(() -> c).flatMap(x -> Stream.generate(() -> x))) + .limit(5).toList(); + assertEquals(refResult, refExpected); + + var intExpected = IntStream.generate(() -> 1).limit(5).sum(); + var intResult = IntStream.generate(() -> 1) + .flatMap(c -> IntStream.generate(() -> c).flatMap(x -> IntStream.generate(() -> x))) + .limit(5).sum(); + assertEquals(intResult, intExpected); + + var longExpected = LongStream.generate(() -> 1L).limit(5).sum(); + var longResult = LongStream.generate(() -> 1L) + .flatMap(c -> LongStream.generate(() -> c).flatMap(x -> LongStream.generate(() -> x))) + .limit(5).sum(); + assertEquals(longResult, longExpected); + + var doubleExpected = DoubleStream.generate(() -> 0d).limit(5).sum(); + var doubleResult = DoubleStream.generate(() -> 0d) + .flatMap(c -> DoubleStream.generate(() -> c).flatMap(x -> DoubleStream.generate(() -> x))) + .limit(5).sum(); + assertEquals(doubleResult, doubleExpected); + } } diff --git a/test/micro/org/openjdk/bench/java/util/stream/ops/DoubleAccumulator.java b/test/micro/org/openjdk/bench/java/util/stream/ops/DoubleAccumulator.java new file mode 100644 index 00000000000..0a828834633 --- /dev/null +++ b/test/micro/org/openjdk/bench/java/util/stream/ops/DoubleAccumulator.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package org.openjdk.bench.java.util.stream.ops; + +public class DoubleAccumulator { + + double acc; + + public DoubleAccumulator() { + acc = 0; + } + + public void add(double v) { + acc += v; + } + + public void merge(DoubleAccumulator other) { + acc += other.acc; + } + + public double get() { + return acc; + } + +} diff --git a/test/micro/org/openjdk/bench/java/util/stream/ops/ref/FlatMap.java b/test/micro/org/openjdk/bench/java/util/stream/ops/ref/FlatMap.java new file mode 100644 index 00000000000..ddd70ec18d8 --- /dev/null +++ b/test/micro/org/openjdk/bench/java/util/stream/ops/ref/FlatMap.java @@ -0,0 +1,239 @@ +/* + * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package org.openjdk.bench.java.util.stream.ops.ref; + +import org.openjdk.bench.java.util.stream.ops.DoubleAccumulator; +import org.openjdk.bench.java.util.stream.ops.LongAccumulator; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.LongFunction; +import java.util.function.DoubleFunction; +import java.util.stream.Stream; +import java.util.stream.DoubleStream; +import java.util.stream.LongStream; +import java.util.stream.IntStream; +import java.util.Arrays; + +/** + * Benchmark for flatMap() operation. + */ +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 4, time = 2, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 4, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 3) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +public class FlatMap { + + /** + * Implementation notes: + * - parallel version requires thread-safe sink, we use the same for sequential version for better comparison + * - operations are explicit inner classes to untangle unwanted lambda effects + * - the result of applying consecutive operations is the same, in order to have the same number of elements in sink + */ + + @Param({"10", "100", "1000"}) + private int size; + + private Function<Long, Stream<Long>> funArrayStream; + private Function<Long, Stream<Long>> funIterateStream; + private LongFunction<LongStream> funLongStream; + private LongFunction<LongStream> funIterateLongStream; + private IntFunction<IntStream> funIntStream; + private IntFunction<IntStream> funIterateIntStream; + private DoubleFunction<DoubleStream> funDoubleStream; + private DoubleFunction<DoubleStream> funIterateDoubleStream; + + private Long[] cachedRefArray; + private int[] cachedIntArray; + private long[] cachedLongArray; + private double[] cachedDoubleArray; + + @Setup + public void setup() { + final int cachedSize = size; + cachedRefArray = new Long[cachedSize]; + cachedIntArray = new int[cachedSize]; + cachedLongArray = new long[cachedSize]; + cachedDoubleArray = new double[cachedSize]; + for(int i = 0;i < cachedRefArray.length;++i) { + cachedRefArray[i] = Long.valueOf(i); + cachedIntArray[i] = i; + cachedLongArray[i] = i; + cachedDoubleArray[i] = i; + } + + funArrayStream = new Function<Long, Stream<Long>>() { @Override public Stream<Long> apply(Long l) { + return Arrays.stream(cachedRefArray); + } }; + funIterateStream = new Function<Long, Stream<Long>>() { @Override public Stream<Long> apply(Long l) { + return Stream.iterate(0L, i -> i + 1).limit(cachedSize); } }; + funLongStream = new LongFunction<LongStream>() { @Override public LongStream apply(long l) { + return Arrays.stream(cachedLongArray); } }; + funIterateLongStream = new LongFunction<LongStream>() { @Override public LongStream apply(long l) { + return LongStream.iterate(0L, i -> i + 1).limit(cachedSize); } }; + funIntStream = new IntFunction<IntStream>() { @Override public IntStream apply(int i) { + return Arrays.stream(cachedIntArray); } }; + funIterateIntStream = new IntFunction<IntStream>() { @Override public IntStream apply(int i) { + return IntStream.iterate(0, ii -> ii + 1).limit(cachedSize); } }; + funDoubleStream = new DoubleFunction<DoubleStream>() { @Override public DoubleStream apply(double d) { + return Arrays.stream(cachedDoubleArray); } }; + funIterateDoubleStream = new DoubleFunction<DoubleStream>() { @Override public DoubleStream apply(double d) { + return DoubleStream.iterate(0d, i -> i + 1d).limit(cachedSize); } }; + } + + @Benchmark + public long seq_array_ref() { + return funArrayStream.apply(0L) + .flatMap(funArrayStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public long par_array_ref() { + return funArrayStream.apply(0L) + .parallel() + .flatMap(funArrayStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public long seq_array_long() { + return funLongStream.apply(0L) + .flatMap(funLongStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public long par_array_long() { + return funLongStream.apply(0L) + .parallel() + .flatMap(funLongStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public long seq_array_int() { + return funIntStream.apply(0) + .flatMap(funIntStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public long par_array_int() { + return funIntStream.apply(0) + .parallel() + .flatMap(funIntStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public double seq_array_double() { + return funDoubleStream.apply(0d) + .flatMap(funDoubleStream) + .collect(DoubleAccumulator::new, DoubleAccumulator::add, DoubleAccumulator::merge).get(); + } + + @Benchmark + public double par_array_double() { + return funDoubleStream.apply(0d) + .parallel() + .flatMap(funDoubleStream) + .collect(DoubleAccumulator::new, DoubleAccumulator::add, DoubleAccumulator::merge).get(); + } + + @Benchmark + public long seq_iterate_ref() { + return funIterateStream.apply(0L) + .flatMap(funIterateStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public long par_iterate_ref() { + return funIterateStream.apply(0L) + .parallel() + .flatMap(funIterateStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + + @Benchmark + public long seq_iterate_long() { + return funIterateLongStream.apply(0L) + .flatMap(funIterateLongStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public long par_iterate_long() { + return funIterateLongStream.apply(0L) + .parallel() + .flatMap(funIterateLongStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public long seq_iterate_int() { + return funIterateIntStream.apply(0) + .flatMap(funIterateIntStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public long par_iterate_int() { + return funIterateIntStream.apply(0) + .parallel() + .flatMap(funIterateIntStream) + .collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get(); + } + + @Benchmark + public double seq_iterate_double() { + return funIterateDoubleStream.apply(0d) + .flatMap(funIterateDoubleStream) + .collect(DoubleAccumulator::new, DoubleAccumulator::add, DoubleAccumulator::merge).get(); + } + + @Benchmark + public double par_iterate_double() { + return funIterateDoubleStream.apply(0d) + .parallel() + .flatMap(funIterateDoubleStream) + .collect(DoubleAccumulator::new, DoubleAccumulator::add, DoubleAccumulator::merge).get(); + } + +}