diff --git a/jdk/src/java.base/share/classes/java/util/stream/AbstractPipeline.java b/jdk/src/java.base/share/classes/java/util/stream/AbstractPipeline.java index d33b9083a01..84199462c80 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/AbstractPipeline.java +++ b/jdk/src/java.base/share/classes/java/util/stream/AbstractPipeline.java @@ -489,15 +489,17 @@ abstract class AbstractPipeline> @Override @SuppressWarnings("unchecked") - final void copyIntoWithCancel(Sink wrappedSink, Spliterator spliterator) { + final boolean copyIntoWithCancel(Sink wrappedSink, Spliterator spliterator) { @SuppressWarnings({"rawtypes","unchecked"}) AbstractPipeline p = AbstractPipeline.this; while (p.depth > 0) { p = p.previousStage; } + wrappedSink.begin(spliterator.getExactSizeIfKnown()); - p.forEachWithCancel(spliterator, wrappedSink); + boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink); wrappedSink.end(); + return cancelled; } @Override @@ -602,8 +604,9 @@ abstract class AbstractPipeline> * * @param spliterator the spliterator to pull elements from * @param sink the sink to push elements to + * @return true if the cancellation was requested */ - abstract void forEachWithCancel(Spliterator spliterator, Sink sink); + abstract boolean forEachWithCancel(Spliterator spliterator, Sink sink); /** * Make a node builder compatible with this stream shape. diff --git a/jdk/src/java.base/share/classes/java/util/stream/DoublePipeline.java b/jdk/src/java.base/share/classes/java/util/stream/DoublePipeline.java index eb1d97195e5..56a5f57cc57 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/DoublePipeline.java +++ b/jdk/src/java.base/share/classes/java/util/stream/DoublePipeline.java @@ -40,6 +40,7 @@ import java.util.function.DoubleToIntFunction; import java.util.function.DoubleToLongFunction; import java.util.function.DoubleUnaryOperator; import java.util.function.IntFunction; +import java.util.function.LongPredicate; import java.util.function.ObjDoubleConsumer; import java.util.function.Supplier; @@ -153,10 +154,12 @@ abstract class DoublePipeline } @Override - final void forEachWithCancel(Spliterator spliterator, Sink sink) { + final boolean forEachWithCancel(Spliterator spliterator, Sink sink) { Spliterator.OfDouble spl = adapt(spliterator); DoubleConsumer adaptedSink = adapt(sink); - do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); + boolean cancelled; + do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); + return cancelled; } @Override @@ -352,6 +355,16 @@ abstract class DoublePipeline } } + @Override + public final DoubleStream takeWhile(DoublePredicate predicate) { + return WhileOps.makeTakeWhileDouble(this, predicate); + } + + @Override + public final DoubleStream dropWhile(DoublePredicate predicate) { + return WhileOps.makeDropWhileDouble(this, predicate); + } + @Override public final DoubleStream sorted() { return SortedOps.makeDouble(this); diff --git a/jdk/src/java.base/share/classes/java/util/stream/DoubleStream.java b/jdk/src/java.base/share/classes/java/util/stream/DoubleStream.java index 8f272bf4b1d..862f81e9e64 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/DoubleStream.java +++ b/jdk/src/java.base/share/classes/java/util/stream/DoubleStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. 
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -24,18 +24,13 @@ */ package java.util.stream; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.Arrays; -import java.util.Collection; import java.util.DoubleSummaryStatistics; import java.util.Objects; import java.util.OptionalDouble; import java.util.PrimitiveIterator; import java.util.Spliterator; import java.util.Spliterators; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.function.DoubleBinaryOperator; import java.util.function.DoubleConsumer; @@ -279,6 +274,137 @@ public interface DoubleStream extends BaseStream { */ DoubleStream skip(long n); + /** + * Returns, if this stream is ordered, a stream consisting of the longest + * prefix of elements taken from this stream that match the given predicate. + * Otherwise returns, if this stream is unordered, a stream consisting of a + * subset of elements taken from this stream that match the given predicate. + * + *

If this stream is ordered then the longest prefix is a contiguous + * sequence of elements of this stream that match the given predicate. The + * first element of the sequence is the first element of this stream, and + * the element immediately following the last element of the sequence does + * not match the given predicate. + * + *

If this stream is unordered, and some (but not all) elements of this + * stream match the given predicate, then the behavior of this operation is + * nondeterministic; it is free to take any subset of matching elements + * (which includes the empty set). + * + *

Independent of whether this stream is ordered or unordered, if all + * elements of this stream match the given predicate then this operation + * takes all elements (the result is the same as the input), or if no + * elements of the stream match the given predicate then no elements are + * taken (the result is an empty stream). + * + *
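As an illustration of the longest-prefix semantics specified above, a usage sketch (not part of the patched sources):

    // Ordered source: taking stops at the first element that fails the
    // predicate (3.0), so the trailing 0.5 is not taken even though it matches.
    double[] prefix = DoubleStream.of(1.0, 2.0, 3.0, 0.5)
                                  .takeWhile(d -> d < 3.0)
                                  .toArray();              // [1.0, 2.0]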

This is a short-circuiting + * stateful intermediate operation. + * + * @implSpec + * The default implementation obtains the {@link #spliterator() spliterator} + * of this stream, wraps that spliterator so as to support the semantics + * of this operation on traversal, and returns a new stream associated with + * the wrapped spliterator. The returned stream preserves the execution + * characteristics of this stream (namely parallel or sequential execution + * as per {@link #isParallel()}) but the wrapped spliterator may choose to + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. + * + * @apiNote + * While {@code takeWhile()} is generally a cheap operation on sequential + * stream pipelines, it can be quite expensive on ordered parallel + * pipelines, since the operation is constrained to return not just any + * valid prefix, but the longest prefix of elements in the encounter order. + * Using an unordered stream source (such as + * {@link #generate(DoubleSupplier)}) or removing the ordering constraint + * with {@link #unordered()} may result in significant speedups of + * {@code takeWhile()} in parallel pipelines, if the semantics of your + * situation permit. If consistency with encounter order is required, and + * you are experiencing poor performance or memory utilization with + * {@code takeWhile()} in parallel pipelines, switching to sequential + * execution with {@link #sequential()} may improve performance. + * + * @param predicate a non-interfering, + * stateless + * predicate to apply to elements to determine the longest + * prefix of elements. + * @return the new stream + */ + default DoubleStream takeWhile(DoublePredicate predicate) { + Objects.requireNonNull(predicate); + // Reuses the unordered spliterator, which, when encounter is present, + // is safe to use as long as it configured not to split + return StreamSupport.doubleStream( + new WhileOps.UnorderedWhileSpliterator.OfDouble.Taking(spliterator(), true, predicate), + isParallel()).onClose(this::close); + } + + /** + * Returns, if this stream is ordered, a stream consisting of the remaining + * elements of this stream after dropping the longest prefix of elements + * that match the given predicate. Otherwise returns, if this stream is + * unordered, a stream consisting of the remaining elements of this stream + * after dropping a subset of elements that match the given predicate. + * + *

If this stream is ordered then the longest prefix is a contiguous + * sequence of elements of this stream that match the given predicate. The + * first element of the sequence is the first element of this stream, and + * the element immediately following the last element of the sequence does + * not match the given predicate. + * + *

If this stream is unordered, and some (but not all) elements of this + * stream match the given predicate, then the behavior of this operation is + * nondeterministic; it is free to drop any subset of matching elements + * (which includes the empty set). + * + *

Independent of whether this stream is ordered or unordered, if all + * elements of this stream match the given predicate then this operation + * drops all elements (the result is an empty stream), or if no elements of + * the stream match the given predicate then no elements are dropped (the + * result is the same as the input). + * + *
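The complementary dropWhile behaviour on the same ordered input, again a usage sketch rather than part of the patched sources:

    // Only the longest matching prefix is dropped; everything from the first
    // non-matching element (3.0) onwards is kept, including later matches.
    double[] rest = DoubleStream.of(1.0, 2.0, 3.0, 0.5)
                                .dropWhile(d -> d < 3.0)
                                .toArray();                // [3.0, 0.5]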

This is a stateful + * intermediate operation. + * + * @implSpec + * The default implementation obtains the {@link #spliterator() spliterator} + * of this stream, wraps that spliterator so as to support the semantics + * of this operation on traversal, and returns a new stream associated with + * the wrapped spliterator. The returned stream preserves the execution + * characteristics of this stream (namely parallel or sequential execution + * as per {@link #isParallel()}) but the wrapped spliterator may choose to + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. + * + * @apiNote + * While {@code dropWhile()} is generally a cheap operation on sequential + * stream pipelines, it can be quite expensive on ordered parallel + * pipelines, since the operation is constrained to return not just any + * valid prefix, but the longest prefix of elements in the encounter order. + * Using an unordered stream source (such as + * {@link #generate(DoubleSupplier)}) or removing the ordering constraint + * with {@link #unordered()} may result in significant speedups of + * {@code dropWhile()} in parallel pipelines, if the semantics of your + * situation permit. If consistency with encounter order is required, and + * you are experiencing poor performance or memory utilization with + * {@code dropWhile()} in parallel pipelines, switching to sequential + * execution with {@link #sequential()} may improve performance. + * + * @param predicate a non-interfering, + * stateless + * predicate to apply to elements to determine the longest + * prefix of elements. + * @return the new stream + */ + default DoubleStream dropWhile(DoublePredicate predicate) { + Objects.requireNonNull(predicate); + // Reuses the unordered spliterator, which, when encounter is present, + // is safe to use as long as it configured not to split + return StreamSupport.doubleStream( + new WhileOps.UnorderedWhileSpliterator.OfDouble.Dropping(spliterator(), true, predicate), + isParallel()).onClose(this::close); + } + /** * Performs an action for each element of this stream. 
* diff --git a/jdk/src/java.base/share/classes/java/util/stream/IntPipeline.java b/jdk/src/java.base/share/classes/java/util/stream/IntPipeline.java index 9c0162f1ce3..7cf0622ce89 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/IntPipeline.java +++ b/jdk/src/java.base/share/classes/java/util/stream/IntPipeline.java @@ -156,10 +156,12 @@ abstract class IntPipeline } @Override - final void forEachWithCancel(Spliterator spliterator, Sink sink) { + final boolean forEachWithCancel(Spliterator spliterator, Sink sink) { Spliterator.OfInt spl = adapt(spliterator); IntConsumer adaptedSink = adapt(sink); - do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); + boolean cancelled; + do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); + return cancelled; } @Override @@ -386,6 +388,16 @@ abstract class IntPipeline return SliceOps.makeInt(this, n, -1); } + @Override + public final IntStream takeWhile(IntPredicate predicate) { + return WhileOps.makeTakeWhileInt(this, predicate); + } + + @Override + public final IntStream dropWhile(IntPredicate predicate) { + return WhileOps.makeDropWhileInt(this, predicate); + } + @Override public final IntStream sorted() { return SortedOps.makeInt(this); diff --git a/jdk/src/java.base/share/classes/java/util/stream/IntStream.java b/jdk/src/java.base/share/classes/java/util/stream/IntStream.java index 4bb1ab5b97e..675b6fd307a 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/IntStream.java +++ b/jdk/src/java.base/share/classes/java/util/stream/IntStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -272,6 +272,135 @@ public interface IntStream extends BaseStream { */ IntStream skip(long n); + /** + * Returns, if this stream is ordered, a stream consisting of the longest + * prefix of elements taken from this stream that match the given predicate. + * Otherwise returns, if this stream is unordered, a stream consisting of a + * subset of elements taken from this stream that match the given predicate. + * + *

If this stream is ordered then the longest prefix is a contiguous + * sequence of elements of this stream that match the given predicate. The + * first element of the sequence is the first element of this stream, and + * the element immediately following the last element of the sequence does + * not match the given predicate. + * + *

If this stream is unordered, and some (but not all) elements of this + * stream match the given predicate, then the behavior of this operation is + * nondeterministic; it is free to take any subset of matching elements + * (which includes the empty set). + * + *

Independent of whether this stream is ordered or unordered, if all + * elements of this stream match the given predicate then this operation + * takes all elements (the result is the same as the input), or if no + * elements of the stream match the given predicate then no elements are + * taken (the result is an empty stream). + * + *
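A usage sketch (not part of the patched sources) contrasting the prefix semantics above with filter, which tests every element instead of stopping at the first failure:

    int[] source   = {1, 2, 3, 4, 1, 2};
    int[] taken    = IntStream.of(source).takeWhile(i -> i < 3).toArray(); // [1, 2]
    int[] filtered = IntStream.of(source).filter(i -> i < 3).toArray();    // [1, 2, 1, 2]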

This is a short-circuiting + * stateful intermediate operation. + * + * @implSpec + * The default implementation obtains the {@link #spliterator() spliterator} + * of this stream, wraps that spliterator so as to support the semantics + * of this operation on traversal, and returns a new stream associated with + * the wrapped spliterator. The returned stream preserves the execution + * characteristics of this stream (namely parallel or sequential execution + * as per {@link #isParallel()}) but the wrapped spliterator may choose to + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. + * + * @apiNote + * While {@code takeWhile()} is generally a cheap operation on sequential + * stream pipelines, it can be quite expensive on ordered parallel + * pipelines, since the operation is constrained to return not just any + * valid prefix, but the longest prefix of elements in the encounter order. + * Using an unordered stream source (such as {@link #generate(IntSupplier)}) + * or removing the ordering constraint with {@link #unordered()} may result + * in significant speedups of {@code takeWhile()} in parallel pipelines, if + * the semantics of your situation permit. If consistency with encounter + * order is required, and you are experiencing poor performance or memory + * utilization with {@code takeWhile()} in parallel pipelines, switching to + * sequential execution with {@link #sequential()} may improve performance. + * + * @param predicate a non-interfering, + * stateless + * predicate to apply to elements to determine the longest + * prefix of elements. + * @return the new stream + */ + default IntStream takeWhile(IntPredicate predicate) { + Objects.requireNonNull(predicate); + // Reuses the unordered spliterator, which, when encounter is present, + // is safe to use as long as it configured not to split + return StreamSupport.intStream( + new WhileOps.UnorderedWhileSpliterator.OfInt.Taking(spliterator(), true, predicate), + isParallel()).onClose(this::close); + } + + /** + * Returns, if this stream is ordered, a stream consisting of the remaining + * elements of this stream after dropping the longest prefix of elements + * that match the given predicate. Otherwise returns, if this stream is + * unordered, a stream consisting of the remaining elements of this stream + * after dropping a subset of elements that match the given predicate. + * + *

If this stream is ordered then the longest prefix is a contiguous + * sequence of elements of this stream that match the given predicate. The + * first element of the sequence is the first element of this stream, and + * the element immediately following the last element of the sequence does + * not match the given predicate. + * + *

If this stream is unordered, and some (but not all) elements of this + * stream match the given predicate, then the behavior of this operation is + * nondeterministic; it is free to drop any subset of matching elements + * (which includes the empty set). + * + *

Independent of whether this stream is ordered or unordered, if all + * elements of this stream match the given predicate then this operation + * drops all elements (the result is an empty stream), or if no elements of + * the stream match the given predicate then no elements are dropped (the + * result is the same as the input). + * + *
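On an ordered stream the two operations compose into a predicate-based slice; a usage sketch, not part of the patched sources:

    // Drop the values below 4, then take values until 8 is reached.
    int[] middle = IntStream.rangeClosed(1, 10)
                            .dropWhile(i -> i < 4)
                            .takeWhile(i -> i < 8)
                            .toArray();                    // [4, 5, 6, 7]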

This is a stateful + * intermediate operation. + * + * @implSpec + * The default implementation obtains the {@link #spliterator() spliterator} + * of this stream, wraps that spliterator so as to support the semantics + * of this operation on traversal, and returns a new stream associated with + * the wrapped spliterator. The returned stream preserves the execution + * characteristics of this stream (namely parallel or sequential execution + * as per {@link #isParallel()}) but the wrapped spliterator may choose to + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. + * + * @apiNote + * While {@code dropWhile()} is generally a cheap operation on sequential + * stream pipelines, it can be quite expensive on ordered parallel + * pipelines, since the operation is constrained to return not just any + * valid prefix, but the longest prefix of elements in the encounter order. + * Using an unordered stream source (such as {@link #generate(IntSupplier)}) + * or removing the ordering constraint with {@link #unordered()} may result + * in significant speedups of {@code dropWhile()} in parallel pipelines, if + * the semantics of your situation permit. If consistency with encounter + * order is required, and you are experiencing poor performance or memory + * utilization with {@code dropWhile()} in parallel pipelines, switching to + * sequential execution with {@link #sequential()} may improve performance. + * + * @param predicate a non-interfering, + * stateless + * predicate to apply to elements to determine the longest + * prefix of elements. + * @return the new stream + */ + default IntStream dropWhile(IntPredicate predicate) { + Objects.requireNonNull(predicate); + // Reuses the unordered spliterator, which, when encounter is present, + // is safe to use as long as it configured not to split + return StreamSupport.intStream( + new WhileOps.UnorderedWhileSpliterator.OfInt.Dropping(spliterator(), true, predicate), + isParallel()).onClose(this::close); + } + /** * Performs an action for each element of this stream. 
* diff --git a/jdk/src/java.base/share/classes/java/util/stream/LongPipeline.java b/jdk/src/java.base/share/classes/java/util/stream/LongPipeline.java index 7a84ff997e7..19097b7e630 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/LongPipeline.java +++ b/jdk/src/java.base/share/classes/java/util/stream/LongPipeline.java @@ -154,10 +154,12 @@ abstract class LongPipeline } @Override - final void forEachWithCancel(Spliterator spliterator, Sink sink) { + final boolean forEachWithCancel(Spliterator spliterator, Sink sink) { Spliterator.OfLong spl = adapt(spliterator); LongConsumer adaptedSink = adapt(sink); - do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); + boolean cancelled; + do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); + return cancelled; } @Override @@ -367,6 +369,16 @@ abstract class LongPipeline return SliceOps.makeLong(this, n, -1); } + @Override + public final LongStream takeWhile(LongPredicate predicate) { + return WhileOps.makeTakeWhileLong(this, predicate); + } + + @Override + public final LongStream dropWhile(LongPredicate predicate) { + return WhileOps.makeDropWhileLong(this, predicate); + } + @Override public final LongStream sorted() { return SortedOps.makeLong(this); diff --git a/jdk/src/java.base/share/classes/java/util/stream/LongStream.java b/jdk/src/java.base/share/classes/java/util/stream/LongStream.java index 4f9c72bef42..dc6009f65fd 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/LongStream.java +++ b/jdk/src/java.base/share/classes/java/util/stream/LongStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -24,11 +24,7 @@ */ package java.util.stream; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.Arrays; -import java.util.Collection; import java.util.LongSummaryStatistics; import java.util.Objects; import java.util.OptionalDouble; @@ -36,7 +32,6 @@ import java.util.OptionalLong; import java.util.PrimitiveIterator; import java.util.Spliterator; import java.util.Spliterators; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongBinaryOperator; @@ -277,6 +272,137 @@ public interface LongStream extends BaseStream { */ LongStream skip(long n); + /** + * Returns, if this stream is ordered, a stream consisting of the longest + * prefix of elements taken from this stream that match the given predicate. + * Otherwise returns, if this stream is unordered, a stream consisting of a + * subset of elements taken from this stream that match the given predicate. + * + *

If this stream is ordered then the longest prefix is a contiguous + * sequence of elements of this stream that match the given predicate. The + * first element of the sequence is the first element of this stream, and + * the element immediately following the last element of the sequence does + * not match the given predicate. + * + *

If this stream is unordered, and some (but not all) elements of this + * stream match the given predicate, then the behavior of this operation is + * nondeterministic; it is free to take any subset of matching elements + * (which includes the empty set). + * + *

Independent of whether this stream is ordered or unordered, if all + * elements of this stream match the given predicate then this operation + * takes all elements (the result is the same as the input), or if no + * elements of the stream match the given predicate then no elements are + * taken (the result is an empty stream). + * + *
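Because takeWhile is short-circuiting, it can bound an otherwise infinite ordered source; a usage sketch, not part of the patched sources:

    // iterate produces an ordered, infinite stream; takeWhile terminates the
    // traversal at the first value that fails the predicate.
    long[] powers = LongStream.iterate(1L, x -> x * 2)
                              .takeWhile(x -> x < 100)
                              .toArray();                  // [1, 2, 4, 8, 16, 32, 64]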

This is a short-circuiting + * stateful intermediate operation. + * + * @implSpec + * The default implementation obtains the {@link #spliterator() spliterator} + * of this stream, wraps that spliterator so as to support the semantics + * of this operation on traversal, and returns a new stream associated with + * the wrapped spliterator. The returned stream preserves the execution + * characteristics of this stream (namely parallel or sequential execution + * as per {@link #isParallel()}) but the wrapped spliterator may choose to + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. + * + * @apiNote + * While {@code takeWhile()} is generally a cheap operation on sequential + * stream pipelines, it can be quite expensive on ordered parallel + * pipelines, since the operation is constrained to return not just any + * valid prefix, but the longest prefix of elements in the encounter order. + * Using an unordered stream source (such as + * {@link #generate(LongSupplier)}) or removing the ordering constraint with + * {@link #unordered()} may result in significant speedups of + * {@code takeWhile()} in parallel pipelines, if the semantics of your + * situation permit. If consistency with encounter order is required, and + * you are experiencing poor performance or memory utilization with + * {@code takeWhile()} in parallel pipelines, switching to sequential + * execution with {@link #sequential()} may improve performance. + * + * @param predicate a non-interfering, + * stateless + * predicate to apply to elements to determine the longest + * prefix of elements. + * @return the new stream + */ + default LongStream takeWhile(LongPredicate predicate) { + Objects.requireNonNull(predicate); + // Reuses the unordered spliterator, which, when encounter is present, + // is safe to use as long as it configured not to split + return StreamSupport.longStream( + new WhileOps.UnorderedWhileSpliterator.OfLong.Taking(spliterator(), true, predicate), + isParallel()).onClose(this::close); + } + + /** + * Returns, if this stream is ordered, a stream consisting of the remaining + * elements of this stream after dropping the longest prefix of elements + * that match the given predicate. Otherwise returns, if this stream is + * unordered, a stream consisting of the remaining elements of this stream + * after dropping a subset of elements that match the given predicate. + * + *

If this stream is ordered then the longest prefix is a contiguous + * sequence of elements of this stream that match the given predicate. The + * first element of the sequence is the first element of this stream, and + * the element immediately following the last element of the sequence does + * not match the given predicate. + * + *

If this stream is unordered, and some (but not all) elements of this + * stream match the given predicate, then the behavior of this operation is + * nondeterministic; it is free to drop any subset of matching elements + * (which includes the empty set). + * + *

Independent of whether this stream is ordered or unordered, if all + * elements of this stream match the given predicate then this operation + * drops all elements (the result is an empty stream), or if no elements of + * the stream match the given predicate then no elements are dropped (the + * result is the same as the input). + * + *
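A usage sketch of the prefix-only dropping described above (not part of the patched sources). Note that dropWhile, unlike takeWhile, is not short-circuiting: if every element of an infinite stream matches the predicate, the pipeline will not terminate.

    // Only the leading run of zeros is dropped; the interior zero survives.
    long[] trimmed = LongStream.of(0, 0, 0, 5, 0, 7)
                               .dropWhile(x -> x == 0)
                               .toArray();                 // [5, 0, 7]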

This is a stateful + * intermediate operation. + * + * @implSpec + * The default implementation obtains the {@link #spliterator() spliterator} + * of this stream, wraps that spliterator so as to support the semantics + * of this operation on traversal, and returns a new stream associated with + * the wrapped spliterator. The returned stream preserves the execution + * characteristics of this stream (namely parallel or sequential execution + * as per {@link #isParallel()}) but the wrapped spliterator may choose to + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. + * + * @apiNote + * While {@code dropWhile()} is generally a cheap operation on sequential + * stream pipelines, it can be quite expensive on ordered parallel + * pipelines, since the operation is constrained to return not just any + * valid prefix, but the longest prefix of elements in the encounter order. + * Using an unordered stream source (such as + * {@link #generate(LongSupplier)}) or removing the ordering constraint with + * {@link #unordered()} may result in significant speedups of + * {@code dropWhile()} in parallel pipelines, if the semantics of your + * situation permit. If consistency with encounter order is required, and + * you are experiencing poor performance or memory utilization with + * {@code dropWhile()} in parallel pipelines, switching to sequential + * execution with {@link #sequential()} may improve performance. + * + * @param predicate a non-interfering, + * stateless + * predicate to apply to elements to determine the longest + * prefix of elements. + * @return the new stream + */ + default LongStream dropWhile(LongPredicate predicate) { + Objects.requireNonNull(predicate); + // Reuses the unordered spliterator, which, when encounter is present, + // is safe to use as long as it configured not to split + return StreamSupport.longStream( + new WhileOps.UnorderedWhileSpliterator.OfLong.Dropping(spliterator(), true, predicate), + isParallel()).onClose(this::close); + } + /** * Performs an action for each element of this stream. * diff --git a/jdk/src/java.base/share/classes/java/util/stream/Node.java b/jdk/src/java.base/share/classes/java/util/stream/Node.java index 2b4360bea57..131195944ee 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/Node.java +++ b/jdk/src/java.base/share/classes/java/util/stream/Node.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 
* * This code is free software; you can redistribute it and/or modify it @@ -125,7 +125,11 @@ interface Node { Node.Builder nodeBuilder = Nodes.builder(size, generator); nodeBuilder.begin(size); for (int i = 0; i < from && spliterator.tryAdvance(e -> { }); i++) { } - for (int i = 0; (i < size) && spliterator.tryAdvance(nodeBuilder); i++) { } + if (to == count()) { + spliterator.forEachRemaining(nodeBuilder); + } else { + for (int i = 0; i < size && spliterator.tryAdvance(nodeBuilder); i++) { } + } nodeBuilder.end(); return nodeBuilder.build(); } @@ -360,7 +364,11 @@ interface Node { Node.Builder.OfInt nodeBuilder = Nodes.intBuilder(size); nodeBuilder.begin(size); for (int i = 0; i < from && spliterator.tryAdvance((IntConsumer) e -> { }); i++) { } - for (int i = 0; (i < size) && spliterator.tryAdvance((IntConsumer) nodeBuilder); i++) { } + if (to == count()) { + spliterator.forEachRemaining((IntConsumer) nodeBuilder); + } else { + for (int i = 0; i < size && spliterator.tryAdvance((IntConsumer) nodeBuilder); i++) { } + } nodeBuilder.end(); return nodeBuilder.build(); } @@ -433,7 +441,11 @@ interface Node { Node.Builder.OfLong nodeBuilder = Nodes.longBuilder(size); nodeBuilder.begin(size); for (int i = 0; i < from && spliterator.tryAdvance((LongConsumer) e -> { }); i++) { } - for (int i = 0; (i < size) && spliterator.tryAdvance((LongConsumer) nodeBuilder); i++) { } + if (to == count()) { + spliterator.forEachRemaining((LongConsumer) nodeBuilder); + } else { + for (int i = 0; i < size && spliterator.tryAdvance((LongConsumer) nodeBuilder); i++) { } + } nodeBuilder.end(); return nodeBuilder.build(); } @@ -508,7 +520,11 @@ interface Node { Node.Builder.OfDouble nodeBuilder = Nodes.doubleBuilder(size); nodeBuilder.begin(size); for (int i = 0; i < from && spliterator.tryAdvance((DoubleConsumer) e -> { }); i++) { } - for (int i = 0; (i < size) && spliterator.tryAdvance((DoubleConsumer) nodeBuilder); i++) { } + if (to == count()) { + spliterator.forEachRemaining((DoubleConsumer) nodeBuilder); + } else { + for (int i = 0; i < size && spliterator.tryAdvance((DoubleConsumer) nodeBuilder); i++) { } + } nodeBuilder.end(); return nodeBuilder.build(); } diff --git a/jdk/src/java.base/share/classes/java/util/stream/Nodes.java b/jdk/src/java.base/share/classes/java/util/stream/Nodes.java index c18540c4e6e..8a517b0f263 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/Nodes.java +++ b/jdk/src/java.base/share/classes/java/util/stream/Nodes.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -69,6 +69,14 @@ final class Nodes { private static final Node.OfLong EMPTY_LONG_NODE = new EmptyNode.OfLong(); private static final Node.OfDouble EMPTY_DOUBLE_NODE = new EmptyNode.OfDouble(); + /** + * @return an array generator for an array whose elements are of type T. 
+ */ + @SuppressWarnings("unchecked") + static IntFunction castingArray() { + return size -> (T[]) new Object[size]; + } + // General shape-based node creation methods /** diff --git a/jdk/src/java.base/share/classes/java/util/stream/PipelineHelper.java b/jdk/src/java.base/share/classes/java/util/stream/PipelineHelper.java index 090469def00..081f68d1a0e 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/PipelineHelper.java +++ b/jdk/src/java.base/share/classes/java/util/stream/PipelineHelper.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -136,8 +136,9 @@ abstract class PipelineHelper { * * @param wrappedSink the destination {@code Sink} * @param spliterator the source {@code Spliterator} + * @return true if the cancellation was requested */ - abstract void copyIntoWithCancel(Sink wrappedSink, Spliterator spliterator); + abstract boolean copyIntoWithCancel(Sink wrappedSink, Spliterator spliterator); /** * Takes a {@code Sink} that accepts elements of the output type of the diff --git a/jdk/src/java.base/share/classes/java/util/stream/ReferencePipeline.java b/jdk/src/java.base/share/classes/java/util/stream/ReferencePipeline.java index 4402997958b..80b0714e6be 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/ReferencePipeline.java +++ b/jdk/src/java.base/share/classes/java/util/stream/ReferencePipeline.java @@ -122,8 +122,10 @@ abstract class ReferencePipeline } @Override - final void forEachWithCancel(Spliterator spliterator, Sink sink) { - do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)); + final boolean forEachWithCancel(Spliterator spliterator, Sink sink) { + boolean cancelled; + do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink)); + return cancelled; } @Override @@ -411,6 +413,16 @@ abstract class ReferencePipeline return SliceOps.makeRef(this, n, -1); } + @Override + public final Stream takeWhile(Predicate predicate) { + return WhileOps.makeTakeWhileRef(this, predicate); + } + + @Override + public final Stream dropWhile(Predicate predicate) { + return WhileOps.makeDropWhileRef(this, predicate); + } + // Terminal operations from Stream @Override diff --git a/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java b/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java index bfe053fca25..bdb13b4ff60 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java +++ b/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java @@ -96,11 +96,6 @@ final class SliceOps { } } - @SuppressWarnings("unchecked") - private static IntFunction castingArray() { - return size -> (T[]) new Object[size]; - } - /** * Appends a "slice" operation to the provided stream. The slice operation * may be may be skip-only, limit-only, or skip-and-limit. @@ -151,7 +146,7 @@ final class SliceOps { // cancellation will be more aggressive cancelling later tasks // if the target slice size has been reached from a given task, // cancellation should also clear local results if any - return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit). + return new SliceTask<>(this, helper, spliterator, Nodes.castingArray(), skip, limit). 
invoke().spliterator(); } } diff --git a/jdk/src/java.base/share/classes/java/util/stream/Stream.java b/jdk/src/java.base/share/classes/java/util/stream/Stream.java index e0e26ff385f..6e96ba88ad4 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/Stream.java +++ b/jdk/src/java.base/share/classes/java/util/stream/Stream.java @@ -24,7 +24,6 @@ */ package java.util.stream; -import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; @@ -480,6 +479,135 @@ public interface Stream extends BaseStream> { */ Stream skip(long n); + /** + * Returns, if this stream is ordered, a stream consisting of the longest + * prefix of elements taken from this stream that match the given predicate. + * Otherwise returns, if this stream is unordered, a stream consisting of a + * subset of elements taken from this stream that match the given predicate. + * + *

If this stream is ordered then the longest prefix is a contiguous + * sequence of elements of this stream that match the given predicate. The + * first element of the sequence is the first element of this stream, and + * the element immediately following the last element of the sequence does + * not match the given predicate. + * + *

If this stream is unordered, and some (but not all) elements of this + * stream match the given predicate, then the behavior of this operation is + * nondeterministic; it is free to take any subset of matching elements + * (which includes the empty set). + * + *

Independent of whether this stream is ordered or unordered, if all + * elements of this stream match the given predicate then this operation + * takes all elements (the result is the same as the input), or if no + * elements of the stream match the given predicate then no elements are + * taken (the result is an empty stream). + * + *
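A usage sketch of the ordered case (not part of the patched sources). For an unordered source, such as a stream over a HashSet, the specification above allows any subset of the matching elements to be taken instead:

    // Ordered source: stops at "ccc", the first element with three characters.
    Stream.of("a", "bb", "ccc", "d")
          .takeWhile(s -> s.length() < 3)
          .forEach(System.out::println);   // prints "a" then "bb"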

This is a short-circuiting + * stateful intermediate operation. + * + * @implSpec + * The default implementation obtains the {@link #spliterator() spliterator} + * of this stream, wraps that spliterator so as to support the semantics + * of this operation on traversal, and returns a new stream associated with + * the wrapped spliterator. The returned stream preserves the execution + * characteristics of this stream (namely parallel or sequential execution + * as per {@link #isParallel()}) but the wrapped spliterator may choose to + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. + * + * @apiNote + * While {@code takeWhile()} is generally a cheap operation on sequential + * stream pipelines, it can be quite expensive on ordered parallel + * pipelines, since the operation is constrained to return not just any + * valid prefix, but the longest prefix of elements in the encounter order. + * Using an unordered stream source (such as {@link #generate(Supplier)}) or + * removing the ordering constraint with {@link #unordered()} may result in + * significant speedups of {@code takeWhile()} in parallel pipelines, if the + * semantics of your situation permit. If consistency with encounter order + * is required, and you are experiencing poor performance or memory + * utilization with {@code takeWhile()} in parallel pipelines, switching to + * sequential execution with {@link #sequential()} may improve performance. + * + * @param predicate a non-interfering, + * stateless + * predicate to apply to elements to determine the longest + * prefix of elements. + * @return the new stream + */ + default Stream takeWhile(Predicate predicate) { + Objects.requireNonNull(predicate); + // Reuses the unordered spliterator, which, when encounter is present, + // is safe to use as long as it configured not to split + return StreamSupport.stream( + new WhileOps.UnorderedWhileSpliterator.OfRef.Taking<>(spliterator(), true, predicate), + isParallel()).onClose(this::close); + } + + /** + * Returns, if this stream is ordered, a stream consisting of the remaining + * elements of this stream after dropping the longest prefix of elements + * that match the given predicate. Otherwise returns, if this stream is + * unordered, a stream consisting of the remaining elements of this stream + * after dropping a subset of elements that match the given predicate. + * + *
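The close-handler behaviour promised by the @implSpec above can be observed directly; a usage sketch, not part of the patched sources:

    // The stream returned by the default takeWhile registers this::close, so
    // closing it also runs the close handlers of the source stream.
    Stream<String> source = Stream.of("a", "b", "c")
                                  .onClose(() -> System.out.println("source closed"));
    try (Stream<String> prefix = source.takeWhile(s -> !s.equals("c"))) {
        prefix.forEach(System.out::println);   // prints "a" then "b"
    }                                          // then prints "source closed"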

If this stream is ordered then the longest prefix is a contiguous + * sequence of elements of this stream that match the given predicate. The + * first element of the sequence is the first element of this stream, and + * the element immediately following the last element of the sequence does + * not match the given predicate. + * + *

If this stream is unordered, and some (but not all) elements of this + * stream match the given predicate, then the behavior of this operation is + * nondeterministic; it is free to drop any subset of matching elements + * (which includes the empty set). + * + *

Independent of whether this stream is ordered or unordered, if all + * elements of this stream match the given predicate then this operation + * drops all elements (the result is an empty stream), or if no elements of + * the stream match the given predicate then no elements are dropped (the + * result is the same as the input). + * + *
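A typical use of dropWhile is skipping a leading header block; a usage sketch, not part of the patched sources:

    // Drops the leading comment lines and keeps everything from the first
    // data line on, including the later comment line.
    Stream.of("# name", "# units", "1,2", "# note", "3,4")
          .dropWhile(line -> line.startsWith("#"))
          .forEach(System.out::println);   // prints "1,2", "# note", "3,4"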

This is a stateful + * intermediate operation. + * + * @implSpec + * The default implementation obtains the {@link #spliterator() spliterator} + * of this stream, wraps that spliterator so as to support the semantics + * of this operation on traversal, and returns a new stream associated with + * the wrapped spliterator. The returned stream preserves the execution + * characteristics of this stream (namely parallel or sequential execution + * as per {@link #isParallel()}) but the wrapped spliterator may choose to + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. + * + * @apiNote + * While {@code dropWhile()} is generally a cheap operation on sequential + * stream pipelines, it can be quite expensive on ordered parallel + * pipelines, since the operation is constrained to return not just any + * valid prefix, but the longest prefix of elements in the encounter order. + * Using an unordered stream source (such as {@link #generate(Supplier)}) or + * removing the ordering constraint with {@link #unordered()} may result in + * significant speedups of {@code dropWhile()} in parallel pipelines, if the + * semantics of your situation permit. If consistency with encounter order + * is required, and you are experiencing poor performance or memory + * utilization with {@code dropWhile()} in parallel pipelines, switching to + * sequential execution with {@link #sequential()} may improve performance. + * + * @param predicate a non-interfering, + * stateless + * predicate to apply to elements to determine the longest + * prefix of elements. + * @return the new stream + */ + default Stream dropWhile(Predicate predicate) { + Objects.requireNonNull(predicate); + // Reuses the unordered spliterator, which, when encounter is present, + // is safe to use as long as it configured not to split + return StreamSupport.stream( + new WhileOps.UnorderedWhileSpliterator.OfRef.Dropping<>(spliterator(), true, predicate), + isParallel()).onClose(this::close); + } + /** * Performs an action for each element of this stream. * diff --git a/jdk/src/java.base/share/classes/java/util/stream/WhileOps.java b/jdk/src/java.base/share/classes/java/util/stream/WhileOps.java new file mode 100644 index 00000000000..94705ec571b --- /dev/null +++ b/jdk/src/java.base/share/classes/java/util/stream/WhileOps.java @@ -0,0 +1,1394 @@ +/* + * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 
+ * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Comparator; +import java.util.Objects; +import java.util.Spliterator; +import java.util.concurrent.CountedCompleter; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.DoubleConsumer; +import java.util.function.DoublePredicate; +import java.util.function.IntConsumer; +import java.util.function.IntFunction; +import java.util.function.IntPredicate; +import java.util.function.LongConsumer; +import java.util.function.LongPredicate; +import java.util.function.Predicate; + +/** + * Factory for instances of a takeWhile and dropWhile operations + * that produce subsequences of their input stream. + * + * @since 1.9 + */ +final class WhileOps { + + static final int TAKE_FLAGS = StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SHORT_CIRCUIT; + + static final int DROP_FLAGS = StreamOpFlag.NOT_SIZED; + + /** + * Appends a "takeWhile" operation to the provided Stream. + * + * @param the type of both input and output elements + * @param upstream a reference stream with element type T + * @param predicate the predicate that returns false to halt taking. + */ + static Stream makeTakeWhileRef(AbstractPipeline upstream, + Predicate predicate) { + Objects.requireNonNull(predicate); + return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE, TAKE_FLAGS) { + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) + .spliterator(); + } + else { + return new UnorderedWhileSpliterator.OfRef.Taking<>( + helper.wrapSpliterator(spliterator), false, predicate); + } + } + + @Override + Node opEvaluateParallel(PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + return new TakeWhileTask<>(this, helper, spliterator, generator) + .invoke(); + } + + @Override + Sink opWrapSink(int flags, Sink sink) { + return new Sink.ChainedReference(sink) { + boolean take = true; + + @Override + public void begin(long size) { + downstream.begin(-1); + } + + @Override + public void accept(T t) { + if (take = predicate.test(t)) { + downstream.accept(t); + } + } + + @Override + public boolean cancellationRequested() { + return !take || downstream.cancellationRequested(); + } + }; + } + }; + } + + /** + * Appends a "takeWhile" operation to the provided IntStream. + * + * @param upstream a reference stream with element type T + * @param predicate the predicate that returns false to halt taking. 
+ */ + static IntStream makeTakeWhileInt(AbstractPipeline upstream, + IntPredicate predicate) { + Objects.requireNonNull(predicate); + return new IntPipeline.StatefulOp(upstream, StreamShape.INT_VALUE, TAKE_FLAGS) { + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return opEvaluateParallel(helper, spliterator, Integer[]::new) + .spliterator(); + } + else { + return new UnorderedWhileSpliterator.OfInt.Taking( + (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); + } + } + + @Override + Node opEvaluateParallel(PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + return new TakeWhileTask<>(this, helper, spliterator, generator) + .invoke(); + } + + @Override + Sink opWrapSink(int flags, Sink sink) { + return new Sink.ChainedInt(sink) { + boolean take = true; + + @Override + public void begin(long size) { + downstream.begin(-1); + } + + @Override + public void accept(int t) { + if (take = predicate.test(t)) { + downstream.accept(t); + } + } + + @Override + public boolean cancellationRequested() { + return !take || downstream.cancellationRequested(); + } + }; + } + }; + } + + /** + * Appends a "takeWhile" operation to the provided LongStream. + * + * @param upstream a reference stream with element type T + * @param predicate the predicate that returns false to halt taking. + */ + static LongStream makeTakeWhileLong(AbstractPipeline upstream, + LongPredicate predicate) { + Objects.requireNonNull(predicate); + return new LongPipeline.StatefulOp(upstream, StreamShape.LONG_VALUE, TAKE_FLAGS) { + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return opEvaluateParallel(helper, spliterator, Long[]::new) + .spliterator(); + } + else { + return new UnorderedWhileSpliterator.OfLong.Taking( + (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); + } + } + + @Override + Node opEvaluateParallel(PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + return new TakeWhileTask<>(this, helper, spliterator, generator) + .invoke(); + } + + @Override + Sink opWrapSink(int flags, Sink sink) { + return new Sink.ChainedLong(sink) { + boolean take = true; + + @Override + public void begin(long size) { + downstream.begin(-1); + } + + @Override + public void accept(long t) { + if (take = predicate.test(t)) { + downstream.accept(t); + } + } + + @Override + public boolean cancellationRequested() { + return !take || downstream.cancellationRequested(); + } + }; + } + }; + } + + /** + * Appends a "takeWhile" operation to the provided DoubleStream. + * + * @param upstream a reference stream with element type T + * @param predicate the predicate that returns false to halt taking. 
+ */ + static DoubleStream makeTakeWhileDouble(AbstractPipeline upstream, + DoublePredicate predicate) { + Objects.requireNonNull(predicate); + return new DoublePipeline.StatefulOp(upstream, StreamShape.DOUBLE_VALUE, TAKE_FLAGS) { + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return opEvaluateParallel(helper, spliterator, Double[]::new) + .spliterator(); + } + else { + return new UnorderedWhileSpliterator.OfDouble.Taking( + (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); + } + } + + @Override + Node opEvaluateParallel(PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + return new TakeWhileTask<>(this, helper, spliterator, generator) + .invoke(); + } + + @Override + Sink opWrapSink(int flags, Sink sink) { + return new Sink.ChainedDouble(sink) { + boolean take = true; + + @Override + public void begin(long size) { + downstream.begin(-1); + } + + @Override + public void accept(double t) { + if (take = predicate.test(t)) { + downstream.accept(t); + } + } + + @Override + public boolean cancellationRequested() { + return !take || downstream.cancellationRequested(); + } + }; + } + }; + } + + /** + * A specialization for the dropWhile operation that controls if + * elements to be dropped are counted and passed downstream. + *

+ * This specialization is utilized by the {@link TakeWhileTask} for + * pipelines that are ordered. In such cases elements cannot be dropped + * until all elements have been collected. + * + * @param the type of both input and output elements + */ + interface DropWhileOp { + /** + * Accepts a {@code Sink} which will receive the results of this + * dropWhile operation, and return a {@code DropWhileSink} which + * accepts + * elements and which performs the dropWhile operation passing the + * results to the provided {@code Sink}. + * + * @param sink sink to which elements should be sent after processing + * @param retainAndCountDroppedElements true if elements to be dropped + * are counted and passed to the sink, otherwise such elements + * are actually dropped and not passed to the sink. + * @return a dropWhile sink + */ + DropWhileSink opWrapSink(Sink sink, boolean retainAndCountDroppedElements); + } + + /** + * A specialization for a dropWhile sink. + * + * @param the type of both input and output elements + */ + interface DropWhileSink extends Sink { + /** + * @return the could of elements that would have been dropped and + * instead were passed downstream. + */ + long getDropCount(); + } + + /** + * Appends a "dropWhile" operation to the provided Stream. + * + * @param the type of both input and output elements + * @param upstream a reference stream with element type T + * @param predicate the predicate that returns false to halt dropping. + */ + static Stream makeDropWhileRef(AbstractPipeline upstream, + Predicate predicate) { + Objects.requireNonNull(predicate); + + class Op extends ReferencePipeline.StatefulOp implements DropWhileOp { + public Op(AbstractPipeline upstream, StreamShape inputShape, int opFlags) { + super(upstream, inputShape, opFlags); + } + + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) + .spliterator(); + } + else { + return new UnorderedWhileSpliterator.OfRef.Dropping<>( + helper.wrapSpliterator(spliterator), false, predicate); + } + } + + @Override + Node opEvaluateParallel(PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + return new DropWhileTask<>(this, helper, spliterator, generator) + .invoke(); + } + + @Override + Sink opWrapSink(int flags, Sink sink) { + return opWrapSink(sink, false); + } + + public DropWhileSink opWrapSink(Sink sink, boolean retainAndCountDroppedElements) { + class OpSink extends Sink.ChainedReference implements DropWhileSink { + long dropCount; + boolean take; + + OpSink() { + super(sink); + } + + @Override + public void accept(T t) { + boolean takeElement = take || (take = !predicate.test(t)); + + // If ordered and element is dropped increment index + // for possible future truncation + if (retainAndCountDroppedElements && !takeElement) + dropCount++; + + // If ordered need to process element, otherwise + // skip if element is dropped + if (retainAndCountDroppedElements || takeElement) + downstream.accept(t); + } + + @Override + public long getDropCount() { + return dropCount; + } + } + return new OpSink(); + } + } + return new Op(upstream, StreamShape.REFERENCE, DROP_FLAGS); + } + + /** + * Appends a "dropWhile" operation to the provided IntStream. + * + * @param upstream a reference stream with element type T + * @param predicate the predicate that returns false to halt dropping. 
+ */ + static IntStream makeDropWhileInt(AbstractPipeline upstream, + IntPredicate predicate) { + Objects.requireNonNull(predicate); + class Op extends IntPipeline.StatefulOp implements DropWhileOp { + public Op(AbstractPipeline upstream, StreamShape inputShape, int opFlags) { + super(upstream, inputShape, opFlags); + } + + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return opEvaluateParallel(helper, spliterator, Integer[]::new) + .spliterator(); + } + else { + return new UnorderedWhileSpliterator.OfInt.Dropping( + (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); + } + } + + @Override + Node opEvaluateParallel(PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + return new DropWhileTask<>(this, helper, spliterator, generator) + .invoke(); + } + + @Override + Sink opWrapSink(int flags, Sink sink) { + return opWrapSink(sink, false); + } + + public DropWhileSink opWrapSink(Sink sink, boolean retainAndCountDroppedElements) { + class OpSink extends Sink.ChainedInt implements DropWhileSink { + long dropCount; + boolean take; + + OpSink() { + super(sink); + } + + @Override + public void accept(int t) { + boolean takeElement = take || (take = !predicate.test(t)); + + // If ordered and element is dropped increment index + // for possible future truncation + if (retainAndCountDroppedElements && !takeElement) + dropCount++; + + // If ordered need to process element, otherwise + // skip if element is dropped + if (retainAndCountDroppedElements || takeElement) + downstream.accept(t); + } + + @Override + public long getDropCount() { + return dropCount; + } + } + return new OpSink(); + } + } + return new Op(upstream, StreamShape.INT_VALUE, DROP_FLAGS); + } + + /** + * Appends a "dropWhile" operation to the provided LongStream. + * + * @param upstream a reference stream with element type T + * @param predicate the predicate that returns false to halt dropping. 
+ */ + static LongStream makeDropWhileLong(AbstractPipeline upstream, + LongPredicate predicate) { + Objects.requireNonNull(predicate); + class Op extends LongPipeline.StatefulOp implements DropWhileOp { + public Op(AbstractPipeline upstream, StreamShape inputShape, int opFlags) { + super(upstream, inputShape, opFlags); + } + + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return opEvaluateParallel(helper, spliterator, Long[]::new) + .spliterator(); + } + else { + return new UnorderedWhileSpliterator.OfLong.Dropping( + (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); + } + } + + @Override + Node opEvaluateParallel(PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + return new DropWhileTask<>(this, helper, spliterator, generator) + .invoke(); + } + + @Override + Sink opWrapSink(int flags, Sink sink) { + return opWrapSink(sink, false); + } + + public DropWhileSink opWrapSink(Sink sink, boolean retainAndCountDroppedElements) { + class OpSink extends Sink.ChainedLong implements DropWhileSink { + long dropCount; + boolean take; + + OpSink() { + super(sink); + } + + @Override + public void accept(long t) { + boolean takeElement = take || (take = !predicate.test(t)); + + // If ordered and element is dropped increment index + // for possible future truncation + if (retainAndCountDroppedElements && !takeElement) + dropCount++; + + // If ordered need to process element, otherwise + // skip if element is dropped + if (retainAndCountDroppedElements || takeElement) + downstream.accept(t); + } + + @Override + public long getDropCount() { + return dropCount; + } + } + return new OpSink(); + } + } + return new Op(upstream, StreamShape.LONG_VALUE, DROP_FLAGS); + } + + /** + * Appends a "dropWhile" operation to the provided DoubleStream. + * + * @param upstream a reference stream with element type T + * @param predicate the predicate that returns false to halt dropping. 
+ */ + static DoubleStream makeDropWhileDouble(AbstractPipeline upstream, + DoublePredicate predicate) { + Objects.requireNonNull(predicate); + class Op extends DoublePipeline.StatefulOp implements DropWhileOp { + public Op(AbstractPipeline upstream, StreamShape inputShape, int opFlags) { + super(upstream, inputShape, opFlags); + } + + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, + Spliterator spliterator) { + if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + return opEvaluateParallel(helper, spliterator, Double[]::new) + .spliterator(); + } + else { + return new UnorderedWhileSpliterator.OfDouble.Dropping( + (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); + } + } + + @Override + Node opEvaluateParallel(PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + return new DropWhileTask<>(this, helper, spliterator, generator) + .invoke(); + } + + @Override + Sink opWrapSink(int flags, Sink sink) { + return opWrapSink(sink, false); + } + + public DropWhileSink opWrapSink(Sink sink, boolean retainAndCountDroppedElements) { + class OpSink extends Sink.ChainedDouble implements DropWhileSink { + long dropCount; + boolean take; + + OpSink() { + super(sink); + } + + @Override + public void accept(double t) { + boolean takeElement = take || (take = !predicate.test(t)); + + // If ordered and element is dropped increment index + // for possible future truncation + if (retainAndCountDroppedElements && !takeElement) + dropCount++; + + // If ordered need to process element, otherwise + // skip if element is dropped + if (retainAndCountDroppedElements || takeElement) + downstream.accept(t); + } + + @Override + public long getDropCount() { + return dropCount; + } + } + return new OpSink(); + } + } + return new Op(upstream, StreamShape.DOUBLE_VALUE, DROP_FLAGS); + } + + // + + /** + * A spliterator supporting takeWhile and dropWhile operations over an + * underlying spliterator whose covered elements have no encounter order. + *

+ * Concrete subclasses of this spliterator support reference and primitive + * types for takeWhile and dropWhile. + *

+ * For the takeWhile operation, if taking completes during traversal then + * taking is cancelled globally for the splitting and traversal of all + * related spliterators. + * Cancellation is governed by a shared {@link AtomicBoolean} instance. A + * spliterator in the process of taking when cancellation occurs will also + * be cancelled, but not necessarily immediately. To reduce contention on + * the {@link AtomicBoolean} instance, cancellation may be acted on after + * a small number of additional elements have been traversed. + *
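[Editor's illustration, not part of the patch; class name and element values are arbitrary.] A minimal usage sketch of the behaviour described above: on an ordered stream takeWhile yields the longest matching prefix, while on an unordered parallel stream it may yield any matching subset, because the shared cancellation flag is only checked periodically by each split.

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class TakeWhileUnorderedDemo {
        public static void main(String[] args) {
            // Ordered: deterministic longest prefix [0, 1, 2, 3, 4]
            List<Integer> ordered = IntStream.range(0, 100)
                    .boxed()
                    .takeWhile(i -> i < 5)
                    .collect(Collectors.toList());

            // Unordered + parallel: some subset of the elements < 5; the
            // size may vary from run to run because each split checks the
            // shared cancellation flag only every few elements.
            List<Integer> unordered = IntStream.range(0, 100)
                    .boxed()
                    .unordered()
                    .parallel()
                    .takeWhile(i -> i < 5)
                    .collect(Collectors.toList());

            System.out.println(ordered);
            System.out.println(unordered.size() + " element(s) taken");
        }
    }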

+ * For the dropWhile operation if during traversal dropping completes for + * some, but not all elements, then it is cancelled globally for the + * traversal of all related spliterators (splitting is not cancelled). + * Cancellation is governed in the same manner as for the takeWhile + * operation. + * + * @param the type of elements returned by this spliterator + * @param the type of the spliterator + */ + static abstract class UnorderedWhileSpliterator> implements Spliterator { + // Power of two constant minus one used for modulus of count + static final int CANCEL_CHECK_COUNT = (1 << 6) - 1; + + // The underlying spliterator + final T_SPLITR s; + // True if no splitting should be performed, if true then + // this spliterator may be used for an underlying spliterator whose + // covered elements have an encounter order + // See use in stream take/dropWhile default default methods + final boolean noSplitting; + // True when operations are cancelled for all related spliterators + // For taking, spliterators cannot split or traversed + // For dropping, spliterators cannot be traversed + final AtomicBoolean cancel; + // True while taking or dropping should be performed when traversing + boolean takeOrDrop = true; + // The count of elements traversed + int count; + + UnorderedWhileSpliterator(T_SPLITR s, boolean noSplitting) { + this.s = s; + this.noSplitting = noSplitting; + this.cancel = new AtomicBoolean(); + } + + UnorderedWhileSpliterator(T_SPLITR s, UnorderedWhileSpliterator parent) { + this.s = s; + this.noSplitting = parent.noSplitting; + this.cancel = parent.cancel; + } + + @Override + public long estimateSize() { + return s.estimateSize(); + } + + @Override + public int characteristics() { + // Size is not known + return s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED); + } + + @Override + public long getExactSizeIfKnown() { + return -1L; + } + + @Override + public Comparator getComparator() { + return s.getComparator(); + } + + @Override + public T_SPLITR trySplit() { + @SuppressWarnings("unchecked") + T_SPLITR ls = noSplitting ? null : (T_SPLITR) s.trySplit(); + return ls != null ? 
makeSpliterator(ls) : null; + } + + boolean checkCancelOnCount() { + return count != 0 || !cancel.get(); + } + + abstract T_SPLITR makeSpliterator(T_SPLITR s); + + static abstract class OfRef extends UnorderedWhileSpliterator> implements Consumer { + final Predicate p; + T t; + + OfRef(Spliterator s, boolean noSplitting, Predicate p) { + super(s, noSplitting); + this.p = p; + } + + OfRef(Spliterator s, OfRef parent) { + super(s, parent); + this.p = parent.p; + } + + @Override + public void accept(T t) { + count = (count + 1) & CANCEL_CHECK_COUNT; + this.t = t; + } + + static final class Taking extends OfRef { + Taking(Spliterator s, boolean noSplitting, Predicate p) { + super(s, noSplitting, p); + } + + Taking(Spliterator s, Taking parent) { + super(s, parent); + } + + @Override + public boolean tryAdvance(Consumer action) { + boolean test = true; + if (takeOrDrop && // If can take + checkCancelOnCount() && // and if not cancelled + s.tryAdvance(this) && // and if advanced one element + (test = p.test(t))) { // and test on element passes + action.accept(t); // then accept element + return true; + } + else { + // Taking is finished + takeOrDrop = false; + // Cancel all further traversal and splitting operations + // only if test of element failed (short-circuited) + if (!test) + cancel.set(true); + return false; + } + } + + @Override + public Spliterator trySplit() { + // Do not split if all operations are cancelled + return cancel.get() ? null : super.trySplit(); + } + + @Override + Spliterator makeSpliterator(Spliterator s) { + return new Taking<>(s, this); + } + } + + static final class Dropping extends OfRef { + Dropping(Spliterator s, boolean noSplitting, Predicate p) { + super(s, noSplitting, p); + } + + Dropping(Spliterator s, Dropping parent) { + super(s, parent); + } + + @Override + public boolean tryAdvance(Consumer action) { + if (takeOrDrop) { + takeOrDrop = false; + boolean adv; + boolean dropped = false; + while ((adv = s.tryAdvance(this)) && // If advanced one element + checkCancelOnCount() && // and if not cancelled + p.test(t)) { // and test on element passes + dropped = true; // then drop element + } + + // Report advanced element, if any + if (adv) { + // Cancel all further dropping if one or more elements + // were previously dropped + if (dropped) + cancel.set(true); + action.accept(t); + } + return adv; + } + else { + return s.tryAdvance(action); + } + } + + @Override + Spliterator makeSpliterator(Spliterator s) { + return new Dropping<>(s, this); + } + } + } + + static abstract class OfInt extends UnorderedWhileSpliterator implements IntConsumer, Spliterator.OfInt { + final IntPredicate p; + int t; + + OfInt(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { + super(s, noSplitting); + this.p = p; + } + + OfInt(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { + super(s, parent); + this.p = parent.p; + } + + @Override + public void accept(int t) { + count = (count + 1) & CANCEL_CHECK_COUNT; + this.t = t; + } + + static final class Taking extends UnorderedWhileSpliterator.OfInt { + Taking(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { + super(s, noSplitting, p); + } + + Taking(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { + super(s, parent); + } + + @Override + public boolean tryAdvance(IntConsumer action) { + boolean test = true; + if (takeOrDrop && // If can take + checkCancelOnCount() && // and if not cancelled + s.tryAdvance(this) && // and if advanced one element + (test = p.test(t))) { // and test on element 
passes + action.accept(t); // then accept element + return true; + } + else { + // Taking is finished + takeOrDrop = false; + // Cancel all further traversal and splitting operations + // only if test of element failed (short-circuited) + if (!test) + cancel.set(true); + return false; + } + } + + @Override + public Spliterator.OfInt trySplit() { + // Do not split if all operations are cancelled + return cancel.get() ? null : super.trySplit(); + } + + @Override + Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) { + return new Taking(s, this); + } + } + + static final class Dropping extends UnorderedWhileSpliterator.OfInt { + Dropping(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { + super(s, noSplitting, p); + } + + Dropping(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { + super(s, parent); + } + + @Override + public boolean tryAdvance(IntConsumer action) { + if (takeOrDrop) { + takeOrDrop = false; + boolean adv; + boolean dropped = false; + while ((adv = s.tryAdvance(this)) && // If advanced one element + checkCancelOnCount() && // and if not cancelled + p.test(t)) { // and test on element passes + dropped = true; // then drop element + } + + // Report advanced element, if any + if (adv) { + // Cancel all further dropping if one or more elements + // were previously dropped + if (dropped) + cancel.set(true); + action.accept(t); + } + return adv; + } + else { + return s.tryAdvance(action); + } + } + + @Override + Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) { + return new Dropping(s, this); + } + } + } + + static abstract class OfLong extends UnorderedWhileSpliterator implements LongConsumer, Spliterator.OfLong { + final LongPredicate p; + long t; + + OfLong(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) { + super(s, noSplitting); + this.p = p; + } + + OfLong(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) { + super(s, parent); + this.p = parent.p; + } + + @Override + public void accept(long t) { + count = (count + 1) & CANCEL_CHECK_COUNT; + this.t = t; + } + + static final class Taking extends UnorderedWhileSpliterator.OfLong { + Taking(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) { + super(s, noSplitting, p); + } + + Taking(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) { + super(s, parent); + } + + @Override + public boolean tryAdvance(LongConsumer action) { + boolean test = true; + if (takeOrDrop && // If can take + checkCancelOnCount() && // and if not cancelled + s.tryAdvance(this) && // and if advanced one element + (test = p.test(t))) { // and test on element passes + action.accept(t); // then accept element + return true; + } + else { + // Taking is finished + takeOrDrop = false; + // Cancel all further traversal and splitting operations + // only if test of element failed (short-circuited) + if (!test) + cancel.set(true); + return false; + } + } + + @Override + public Spliterator.OfLong trySplit() { + // Do not split if all operations are cancelled + return cancel.get() ? 
null : super.trySplit(); + } + + @Override + Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) { + return new Taking(s, this); + } + } + + static final class Dropping extends UnorderedWhileSpliterator.OfLong { + Dropping(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) { + super(s, noSplitting, p); + } + + Dropping(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) { + super(s, parent); + } + + @Override + public boolean tryAdvance(LongConsumer action) { + if (takeOrDrop) { + takeOrDrop = false; + boolean adv; + boolean dropped = false; + while ((adv = s.tryAdvance(this)) && // If advanced one element + checkCancelOnCount() && // and if not cancelled + p.test(t)) { // and test on element passes + dropped = true; // then drop element + } + + // Report advanced element, if any + if (adv) { + // Cancel all further dropping if one or more elements + // were previously dropped + if (dropped) + cancel.set(true); + action.accept(t); + } + return adv; + } + else { + return s.tryAdvance(action); + } + } + + @Override + Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) { + return new Dropping(s, this); + } + } + } + + static abstract class OfDouble extends UnorderedWhileSpliterator implements DoubleConsumer, Spliterator.OfDouble { + final DoublePredicate p; + double t; + + OfDouble(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) { + super(s, noSplitting); + this.p = p; + } + + OfDouble(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) { + super(s, parent); + this.p = parent.p; + } + + @Override + public void accept(double t) { + count = (count + 1) & CANCEL_CHECK_COUNT; + this.t = t; + } + + static final class Taking extends UnorderedWhileSpliterator.OfDouble { + Taking(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) { + super(s, noSplitting, p); + } + + Taking(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) { + super(s, parent); + } + + @Override + public boolean tryAdvance(DoubleConsumer action) { + boolean test = true; + if (takeOrDrop && // If can take + checkCancelOnCount() && // and if not cancelled + s.tryAdvance(this) && // and if advanced one element + (test = p.test(t))) { // and test on element passes + action.accept(t); // then accept element + return true; + } + else { + // Taking is finished + takeOrDrop = false; + // Cancel all further traversal and splitting operations + // only if test of element failed (short-circuited) + if (!test) + cancel.set(true); + return false; + } + } + + @Override + public Spliterator.OfDouble trySplit() { + // Do not split if all operations are cancelled + return cancel.get() ? 
null : super.trySplit(); + } + + @Override + Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) { + return new Taking(s, this); + } + } + + static final class Dropping extends UnorderedWhileSpliterator.OfDouble { + Dropping(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) { + super(s, noSplitting, p); + } + + Dropping(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) { + super(s, parent); + } + + @Override + public boolean tryAdvance(DoubleConsumer action) { + if (takeOrDrop) { + takeOrDrop = false; + boolean adv; + boolean dropped = false; + while ((adv = s.tryAdvance(this)) && // If advanced one element + checkCancelOnCount() && // and if not cancelled + p.test(t)) { // and test on element passes + dropped = true; // then drop element + } + + // Report advanced element, if any + if (adv) { + // Cancel all further dropping if one or more elements + // were previously dropped + if (dropped) + cancel.set(true); + action.accept(t); + } + return adv; + } + else { + return s.tryAdvance(action); + } + } + + @Override + Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) { + return new Dropping(s, this); + } + } + } + } + + + // + + /** + * {@code ForkJoinTask} implementing takeWhile computation. + *

+ * If the pipeline has encounter order then all tasks to the right of + * a task whose traversal was short-circuited are cancelled. + * The results of completed (and cancelled) tasks are discarded. + * The result of merging a short-circuited left task with a right task + * (which may or may not itself be short-circuited) is the result of the + * left task. + *
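[Editor's illustration, not part of the patch; class name is hypothetical.] A short sketch of the guarantee this merge rule gives for an ordered parallel pipeline: the parallel result must be exactly the sequential longest prefix, since tasks to the right of the short-circuit point are cancelled and their results discarded.

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class OrderedTakeWhileMergeDemo {
        public static void main(String[] args) {
            List<Integer> sequential = IntStream.range(0, 1_000_000).boxed()
                    .takeWhile(i -> i < 1234)
                    .collect(Collectors.toList());

            // Ordered parallel evaluation: the merged result is the same
            // longest prefix as in the sequential case.
            List<Integer> parallel = IntStream.range(0, 1_000_000).boxed()
                    .parallel()
                    .takeWhile(i -> i < 1234)
                    .collect(Collectors.toList());

            System.out.println(sequential.equals(parallel)); // true
            System.out.println(parallel.size());             // 1234
        }
    }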

+ * If the pipeline has no encounter order then all tasks to the right of + * a task where traversal was short-circuited are cancelled. + * The results of completed (and possibly cancelled) tasks are not + * discarded, as there is no need to throw away computed results. + * The result of merging does not change if a left task was + * short-circuited. + * No attempt is made, once a leaf task stopped taking, for it to cancel + * all other tasks, and further more, short-circuit the computation with its + * result. + * + * @param Input element type to the stream pipeline + * @param Output element type from the stream pipeline + */ + @SuppressWarnings("serial") + private static final class TakeWhileTask + extends AbstractShortCircuitTask, TakeWhileTask> { + private final AbstractPipeline op; + private final IntFunction generator; + private final boolean isOrdered; + private long thisNodeSize; + // True if a short-circuited + private boolean shortCircuited; + // True if completed, must be set after the local result + private volatile boolean completed; + + TakeWhileTask(AbstractPipeline op, + PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + super(helper, spliterator); + this.op = op; + this.generator = generator; + this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); + } + + TakeWhileTask(TakeWhileTask parent, Spliterator spliterator) { + super(parent, spliterator); + this.op = parent.op; + this.generator = parent.generator; + this.isOrdered = parent.isOrdered; + } + + @Override + protected TakeWhileTask makeChild(Spliterator spliterator) { + return new TakeWhileTask<>(this, spliterator); + } + + @Override + protected final Node getEmptyResult() { + return Nodes.emptyNode(op.getOutputShape()); + } + + @Override + protected final Node doLeaf() { + Node.Builder builder = helper.makeNodeBuilder(-1, generator); + Sink s = op.opWrapSink(helper.getStreamAndOpFlags(), builder); + + if (shortCircuited = helper.copyIntoWithCancel(helper.wrapSink(s), spliterator)) { + // Cancel later nodes if the predicate returned false + // during traversal + cancelLaterNodes(); + } + + Node node = builder.build(); + thisNodeSize = node.count(); + return node; + } + + @Override + public final void onCompletion(CountedCompleter caller) { + if (!isLeaf()) { + Node result; + shortCircuited = leftChild.shortCircuited | rightChild.shortCircuited; + if (isOrdered && canceled) { + thisNodeSize = 0; + result = getEmptyResult(); + } + else if (isOrdered && leftChild.shortCircuited) { + // If taking finished on the left node then + // use the left node result + thisNodeSize = leftChild.thisNodeSize; + result = leftChild.getLocalResult(); + } + else { + thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; + result = merge(); + } + + setLocalResult(result); + } + + completed = true; + super.onCompletion(caller); + } + + Node merge() { + if (leftChild.thisNodeSize == 0) { + // If the left node size is 0 then + // use the right node result + return rightChild.getLocalResult(); + } + else if (rightChild.thisNodeSize == 0) { + // If the right node size is 0 then + // use the left node result + return leftChild.getLocalResult(); + } + else { + // Combine the left and right nodes + return Nodes.conc(op.getOutputShape(), + leftChild.getLocalResult(), rightChild.getLocalResult()); + } + } + + @Override + protected void cancel() { + super.cancel(); + if (isOrdered && completed) + // If the task is completed then clear the result, if any + // to aid GC + 
setLocalResult(getEmptyResult()); + } + } + + /** + * {@code ForkJoinTask} implementing dropWhile computation. + *

+ * If the pipeline has encounter order then each leaf task will not + * drop elements but will instead obtain a count of the elements that + * would otherwise have been dropped. That count is used as an index to + * track elements to be dropped. Merging updates the index so that it + * corresponds to the end of the global prefix of elements to be dropped. + * The root node is truncated according to that index. + *
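[Editor's illustration, not part of the patch; class name is hypothetical.] A usage-level sketch of the invariant the index/truncation machinery preserves: for an ordered pipeline, parallel dropWhile must drop exactly the global prefix, matching the sequential result.

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class OrderedDropWhileDemo {
        public static void main(String[] args) {
            List<Integer> sequential = IntStream.range(0, 100_000).boxed()
                    .dropWhile(i -> i < 1000)
                    .collect(Collectors.toList());

            // Ordered parallel evaluation: each leaf retains and counts the
            // elements it would have dropped; the counts are merged into a
            // single prefix index and the root node is truncated once.
            List<Integer> parallel = IntStream.range(0, 100_000).boxed()
                    .parallel()
                    .dropWhile(i -> i < 1000)
                    .collect(Collectors.toList());

            System.out.println(sequential.equals(parallel)); // true
            System.out.println(parallel.get(0));             // 1000
        }
    }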

+ * If the pipeline has no encounter order then each leaf task will drop + * elements. Leaf tasks are ordinarily merged. No truncation of the root + * node is required. + * No attempt is made, once a leaf task stopped dropping, for it to cancel + * all other tasks, and further more, short-circuit the computation with + * its result. + * + * @param Input element type to the stream pipeline + * @param Output element type from the stream pipeline + */ + @SuppressWarnings("serial") + private static final class DropWhileTask + extends AbstractTask, DropWhileTask> { + private final AbstractPipeline op; + private final IntFunction generator; + private final boolean isOrdered; + private long thisNodeSize; + // The index from which elements of the node should be taken + // i.e. the node should be truncated from [takeIndex, thisNodeSize) + // Equivalent to the count of dropped elements + private long index; + + DropWhileTask(AbstractPipeline op, + PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + super(helper, spliterator); + assert op instanceof DropWhileOp; + this.op = op; + this.generator = generator; + this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); + } + + DropWhileTask(DropWhileTask parent, Spliterator spliterator) { + super(parent, spliterator); + this.op = parent.op; + this.generator = parent.generator; + this.isOrdered = parent.isOrdered; + } + + @Override + protected DropWhileTask makeChild(Spliterator spliterator) { + return new DropWhileTask<>(this, spliterator); + } + + @Override + protected final Node doLeaf() { + boolean isChild = !isRoot(); + // If this not the root and pipeline is ordered and size is known + // then pre-size the builder + long sizeIfKnown = isChild && isOrdered && StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags) + ? op.exactOutputSizeIfKnown(spliterator) + : -1; + Node.Builder builder = helper.makeNodeBuilder(sizeIfKnown, generator); + @SuppressWarnings("unchecked") + DropWhileOp dropOp = (DropWhileOp) op; + // If this leaf is the root then there is no merging on completion + // and there is no need to retain dropped elements + DropWhileSink s = dropOp.opWrapSink(builder, isOrdered && isChild); + helper.wrapAndCopyInto(s, spliterator); + + Node node = builder.build(); + thisNodeSize = node.count(); + index = s.getDropCount(); + return node; + } + + @Override + public final void onCompletion(CountedCompleter caller) { + if (!isLeaf()) { + if (isOrdered) { + index = leftChild.index; + // If a contiguous sequence of dropped elements + // include those of the right node, if any + if (index == leftChild.thisNodeSize) + index += rightChild.index; + } + + thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; + Node result = merge(); + setLocalResult(isRoot() ? doTruncate(result) : result); + } + + super.onCompletion(caller); + } + + private Node merge() { + if (leftChild.thisNodeSize == 0) { + // If the left node size is 0 then + // use the right node result + return rightChild.getLocalResult(); + } + else if (rightChild.thisNodeSize == 0) { + // If the right node size is 0 then + // use the left node result + return leftChild.getLocalResult(); + } + else { + // Combine the left and right nodes + return Nodes.conc(op.getOutputShape(), + leftChild.getLocalResult(), rightChild.getLocalResult()); + } + } + + private Node doTruncate(Node input) { + return isOrdered + ? 
input.truncate(index, input.count(), generator) + : input; + } + } +} diff --git a/jdk/test/java/util/stream/bootlib/java/util/stream/DefaultMethodStreams.java b/jdk/test/java/util/stream/bootlib/java/util/stream/DefaultMethodStreams.java new file mode 100644 index 00000000000..c75db568ef2 --- /dev/null +++ b/jdk/test/java/util/stream/bootlib/java/util/stream/DefaultMethodStreams.java @@ -0,0 +1,984 @@ +/* + * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.Comparator; +import java.util.DoubleSummaryStatistics; +import java.util.IntSummaryStatistics; +import java.util.Iterator; +import java.util.LongSummaryStatistics; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.PrimitiveIterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Consumer; +import java.util.function.DoubleBinaryOperator; +import java.util.function.DoubleConsumer; +import java.util.function.DoubleFunction; +import java.util.function.DoublePredicate; +import java.util.function.DoubleToIntFunction; +import java.util.function.DoubleToLongFunction; +import java.util.function.DoubleUnaryOperator; +import java.util.function.Function; +import java.util.function.IntBinaryOperator; +import java.util.function.IntConsumer; +import java.util.function.IntFunction; +import java.util.function.IntPredicate; +import java.util.function.IntToDoubleFunction; +import java.util.function.IntToLongFunction; +import java.util.function.IntUnaryOperator; +import java.util.function.LongBinaryOperator; +import java.util.function.LongConsumer; +import java.util.function.LongFunction; +import java.util.function.LongPredicate; +import java.util.function.LongToDoubleFunction; +import java.util.function.LongToIntFunction; +import java.util.function.LongUnaryOperator; +import java.util.function.ObjDoubleConsumer; +import java.util.function.ObjIntConsumer; +import java.util.function.ObjLongConsumer; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; + +import 
java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; + +import static java.util.stream.Collectors.*; + +public final class DefaultMethodStreams { + + static { + // Verify that default methods are not overridden + verify(DefaultMethodRefStream.class); + verify(DefaultMethodIntStream.class); + verify(DefaultMethodLongStream.class); + verify(DefaultMethodDoubleStream.class); + } + + static void verify(Class del) { + // Find the stream interface + Class s = Stream.of(del.getInterfaces()) + .filter(c -> BaseStream.class.isAssignableFrom(c)) + .findFirst().get(); + + // Get all default methods on the stream class + Set dms = Stream.of(s.getMethods()) + .filter(m -> !Modifier.isStatic(m.getModifiers())) + .filter(m -> !m.isBridge()) + .filter(Method::isDefault) + .map(Method::getName) + .collect(toSet()); + + // Get all methods on the delegating class + Set ims = Stream.of(del.getMethods()) + .filter(m -> !Modifier.isStatic(m.getModifiers())) + .filter(m -> m.getDeclaringClass() == del) + .map(Method::getName) + .collect(toSet()); + + if (ims.stream().anyMatch(dms::contains)) { + throw new AssertionError(String.format("%s overrides default methods of %s\n", del, s)); + } + } + + /** + * Creates a stream that for the next operation either delegates to + * a default method on {@link Stream}, if present for that operation, + * otherwise delegates to an underlying stream. + * + * @param s the underlying stream to be delegated to for non-default + * methods. + * @param the type of the stream elements + * @return the delegating stream + */ + public static Stream delegateTo(Stream s) { + return new DefaultMethodRefStream<>(s); + } + + /** + * Creates a stream that for the next operation either delegates to + * a default method on {@link IntStream}, if present for that operation, + * otherwise delegates to an underlying stream. + * + * @param s the underlying stream to be delegated to for non-default + * methods. + * @return the delegating stream + */ + public static IntStream delegateTo(IntStream s) { + return new DefaultMethodIntStream(s); + } + + /** + * Creates a stream that for the next operation either delegates to + * a default method on {@link LongStream}, if present for that operation, + * otherwise delegates to an underlying stream. + * + * @param s the underlying stream to be delegated to for non-default + * methods. + * @return the delegating stream + */ + public static LongStream delegateTo(LongStream s) { + return new DefaultMethodLongStream(s); + } + + /** + * Creates a stream that for the next operation either delegates to + * a default method on {@link DoubleStream}, if present for that operation, + * otherwise delegates to an underlying stream. + * + * @param s the underlying stream to be delegated to for non-default + * methods. + * @return the delegating stream + */ + public static DoubleStream delegateTo(DoubleStream s) { + return new DefaultMethodDoubleStream(s); + } + + /** + * A stream that delegates the next operation to a default method, if + * present, or to the same operation of an underlying stream. 
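[Editor's illustration, not part of the patch; class name is hypothetical and the test bootlib containing DefaultMethodStreams is assumed to be on the class path.] A minimal sketch of how the delegateTo factories are meant to be used: the wrapper forwards every non-default operation to the underlying stream, so the next takeWhile call exercises the default implementation declared on java.util.stream.Stream rather than the optimized pipeline implementation.

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.DefaultMethodStreams;
    import java.util.stream.Stream;

    public class DelegateToDemo {
        public static void main(String[] args) {
            // takeWhile here runs through the Stream default method; all
            // other operations are delegated to the wrapped stream.
            List<Integer> prefix = DefaultMethodStreams
                    .delegateTo(Stream.of(1, 2, 3, 10, 4))
                    .takeWhile(i -> i < 5)
                    .collect(Collectors.toList());

            System.out.println(prefix); // [1, 2, 3]
        }
    }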
+ * + * @param the type of the stream elements + */ + static final class DefaultMethodRefStream implements Stream { + final Stream s; + + DefaultMethodRefStream(Stream s) { + this.s = s; + } + + + // Delegating non-default methods + + @Override + public Stream filter(Predicate predicate) { + return s.filter(predicate); + } + + @Override + public Stream map(Function mapper) { + return s.map(mapper); + } + + @Override + public IntStream mapToInt(ToIntFunction mapper) { + return s.mapToInt(mapper); + } + + @Override + public LongStream mapToLong(ToLongFunction mapper) { + return s.mapToLong(mapper); + } + + @Override + public DoubleStream mapToDouble(ToDoubleFunction mapper) { + return s.mapToDouble(mapper); + } + + @Override + public Stream flatMap(Function> mapper) { + return s.flatMap(mapper); + } + + @Override + public IntStream flatMapToInt(Function mapper) { + return s.flatMapToInt(mapper); + } + + @Override + public LongStream flatMapToLong(Function mapper) { + return s.flatMapToLong(mapper); + } + + @Override + public DoubleStream flatMapToDouble(Function mapper) { + return s.flatMapToDouble(mapper); + } + + @Override + public Stream distinct() { + return s.distinct(); + } + + @Override + public Stream sorted() { + return s.sorted(); + } + + @Override + public Stream sorted(Comparator comparator) { + return s.sorted(comparator); + } + + @Override + public Stream peek(Consumer action) { + return s.peek(action); + } + + @Override + public Stream limit(long maxSize) { + return s.limit(maxSize); + } + + @Override + public Stream skip(long n) { + return s.skip(n); + } + + @Override + public void forEach(Consumer action) { + s.forEach(action); + } + + @Override + public void forEachOrdered(Consumer action) { + s.forEachOrdered(action); + } + + @Override + public Object[] toArray() { + return s.toArray(); + } + + @Override + public A[] toArray(IntFunction generator) { + return s.toArray(generator); + } + + @Override + public T reduce(T identity, BinaryOperator accumulator) { + return s.reduce(identity, accumulator); + } + + @Override + public Optional reduce(BinaryOperator accumulator) { + return s.reduce(accumulator); + } + + @Override + public U reduce(U identity, BiFunction accumulator, BinaryOperator combiner) { + return s.reduce(identity, accumulator, combiner); + } + + @Override + public R collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner) { + return s.collect(supplier, accumulator, combiner); + } + + @Override + public R collect(Collector collector) { + return s.collect(collector); + } + + @Override + public Optional min(Comparator comparator) { + return s.min(comparator); + } + + @Override + public Optional max(Comparator comparator) { + return s.max(comparator); + } + + @Override + public long count() { + return s.count(); + } + + @Override + public boolean anyMatch(Predicate predicate) { + return s.anyMatch(predicate); + } + + @Override + public boolean allMatch(Predicate predicate) { + return s.allMatch(predicate); + } + + @Override + public boolean noneMatch(Predicate predicate) { + return s.noneMatch(predicate); + } + + @Override + public Optional findFirst() { + return s.findFirst(); + } + + @Override + public Optional findAny() { + return s.findAny(); + } + + @Override + public Iterator iterator() { + return s.iterator(); + } + + @Override + public Spliterator spliterator() { + return s.spliterator(); + } + + @Override + public boolean isParallel() { + return s.isParallel(); + } + + @Override + public Stream sequential() { + return s.sequential(); + } + + 
@Override + public Stream parallel() { + return s.parallel(); + } + + @Override + public Stream unordered() { + return s.unordered(); + } + + @Override + public Stream onClose(Runnable closeHandler) { + return s.onClose(closeHandler); + } + + @Override + public void close() { + s.close(); + } + } + + static final class DefaultMethodIntStream implements IntStream { + final IntStream s; + + public DefaultMethodIntStream(IntStream s) { + this.s = s; + } + + + // Delegating non-default methods + + @Override + public IntStream filter(IntPredicate predicate) { + return s.filter(predicate); + } + + @Override + public IntStream map(IntUnaryOperator mapper) { + return s.map(mapper); + } + + @Override + public Stream mapToObj(IntFunction mapper) { + return s.mapToObj(mapper); + } + + @Override + public LongStream mapToLong(IntToLongFunction mapper) { + return s.mapToLong(mapper); + } + + @Override + public DoubleStream mapToDouble(IntToDoubleFunction mapper) { + return s.mapToDouble(mapper); + } + + @Override + public IntStream flatMap(IntFunction mapper) { + return s.flatMap(mapper); + } + + @Override + public IntStream distinct() { + return s.distinct(); + } + + @Override + public IntStream sorted() { + return s.sorted(); + } + + @Override + public IntStream peek(IntConsumer action) { + return s.peek(action); + } + + @Override + public IntStream limit(long maxSize) { + return s.limit(maxSize); + } + + @Override + public IntStream skip(long n) { + return s.skip(n); + } + + @Override + public void forEach(IntConsumer action) { + s.forEach(action); + } + + @Override + public void forEachOrdered(IntConsumer action) { + s.forEachOrdered(action); + } + + @Override + public int[] toArray() { + return s.toArray(); + } + + @Override + public int reduce(int identity, IntBinaryOperator op) { + return s.reduce(identity, op); + } + + @Override + public OptionalInt reduce(IntBinaryOperator op) { + return s.reduce(op); + } + + @Override + public R collect(Supplier supplier, ObjIntConsumer accumulator, BiConsumer combiner) { + return s.collect(supplier, accumulator, combiner); + } + + @Override + public int sum() { + return s.sum(); + } + + @Override + public OptionalInt min() { + return s.min(); + } + + @Override + public OptionalInt max() { + return s.max(); + } + + @Override + public long count() { + return s.count(); + } + + @Override + public OptionalDouble average() { + return s.average(); + } + + @Override + public IntSummaryStatistics summaryStatistics() { + return s.summaryStatistics(); + } + + @Override + public boolean anyMatch(IntPredicate predicate) { + return s.anyMatch(predicate); + } + + @Override + public boolean allMatch(IntPredicate predicate) { + return s.allMatch(predicate); + } + + @Override + public boolean noneMatch(IntPredicate predicate) { + return s.noneMatch(predicate); + } + + @Override + public OptionalInt findFirst() { + return s.findFirst(); + } + + @Override + public OptionalInt findAny() { + return s.findAny(); + } + + @Override + public LongStream asLongStream() { + return s.asLongStream(); + } + + @Override + public DoubleStream asDoubleStream() { + return s.asDoubleStream(); + } + + @Override + public Stream boxed() { + return s.boxed(); + } + + @Override + public IntStream sequential() { + return s.sequential(); + } + + @Override + public IntStream parallel() { + return s.parallel(); + } + + @Override + public PrimitiveIterator.OfInt iterator() { + return s.iterator(); + } + + @Override + public Spliterator.OfInt spliterator() { + return s.spliterator(); + } + + @Override + 
public boolean isParallel() { + return s.isParallel(); + } + + @Override + public IntStream unordered() { + return s.unordered(); + } + + @Override + public IntStream onClose(Runnable closeHandler) { + return s.onClose(closeHandler); + } + + @Override + public void close() { + s.close(); + } + } + + static final class DefaultMethodLongStream implements LongStream { + final LongStream s; + + public DefaultMethodLongStream(LongStream s) { + this.s = s; + } + + + // Delegating non-default methods + + @Override + public void forEach(LongConsumer action) { + s.forEach(action); + } + + @Override + public LongStream filter(LongPredicate predicate) { + return s.filter(predicate); + } + + @Override + public LongStream map(LongUnaryOperator mapper) { + return s.map(mapper); + } + + @Override + public Stream mapToObj(LongFunction mapper) { + return s.mapToObj(mapper); + } + + @Override + public IntStream mapToInt(LongToIntFunction mapper) { + return s.mapToInt(mapper); + } + + @Override + public DoubleStream mapToDouble(LongToDoubleFunction mapper) { + return s.mapToDouble(mapper); + } + + @Override + public LongStream flatMap(LongFunction mapper) { + return s.flatMap(mapper); + } + + @Override + public LongStream distinct() { + return s.distinct(); + } + + @Override + public LongStream sorted() { + return s.sorted(); + } + + @Override + public LongStream peek(LongConsumer action) { + return s.peek(action); + } + + @Override + public LongStream limit(long maxSize) { + return s.limit(maxSize); + } + + @Override + public LongStream skip(long n) { + return s.skip(n); + } + + @Override + public void forEachOrdered(LongConsumer action) { + s.forEachOrdered(action); + } + + @Override + public long[] toArray() { + return s.toArray(); + } + + @Override + public long reduce(long identity, LongBinaryOperator op) { + return s.reduce(identity, op); + } + + @Override + public OptionalLong reduce(LongBinaryOperator op) { + return s.reduce(op); + } + + @Override + public R collect(Supplier supplier, ObjLongConsumer accumulator, BiConsumer combiner) { + return s.collect(supplier, accumulator, combiner); + } + + @Override + public long sum() { + return s.sum(); + } + + @Override + public OptionalLong min() { + return s.min(); + } + + @Override + public OptionalLong max() { + return s.max(); + } + + @Override + public long count() { + return s.count(); + } + + @Override + public OptionalDouble average() { + return s.average(); + } + + @Override + public LongSummaryStatistics summaryStatistics() { + return s.summaryStatistics(); + } + + @Override + public boolean anyMatch(LongPredicate predicate) { + return s.anyMatch(predicate); + } + + @Override + public boolean allMatch(LongPredicate predicate) { + return s.allMatch(predicate); + } + + @Override + public boolean noneMatch(LongPredicate predicate) { + return s.noneMatch(predicate); + } + + @Override + public OptionalLong findFirst() { + return s.findFirst(); + } + + @Override + public OptionalLong findAny() { + return s.findAny(); + } + + @Override + public DoubleStream asDoubleStream() { + return s.asDoubleStream(); + } + + @Override + public Stream boxed() { + return s.boxed(); + } + + @Override + public LongStream sequential() { + return s.sequential(); + } + + @Override + public LongStream parallel() { + return s.parallel(); + } + + @Override + public PrimitiveIterator.OfLong iterator() { + return s.iterator(); + } + + @Override + public Spliterator.OfLong spliterator() { + return s.spliterator(); + } + + @Override + public boolean isParallel() { + return 
s.isParallel(); + } + + @Override + public LongStream unordered() { + return s.unordered(); + } + + @Override + public LongStream onClose(Runnable closeHandler) { + return s.onClose(closeHandler); + } + + @Override + public void close() { + s.close(); + } + } + + static final class DefaultMethodDoubleStream implements DoubleStream { + final DoubleStream s; + + public DefaultMethodDoubleStream(DoubleStream s) { + this.s = s; + } + + @Override + public DoubleStream filter(DoublePredicate predicate) { + return s.filter(predicate); + } + + @Override + public DoubleStream map(DoubleUnaryOperator mapper) { + return s.map(mapper); + } + + @Override + public Stream mapToObj(DoubleFunction mapper) { + return s.mapToObj(mapper); + } + + @Override + public IntStream mapToInt(DoubleToIntFunction mapper) { + return s.mapToInt(mapper); + } + + @Override + public LongStream mapToLong(DoubleToLongFunction mapper) { + return s.mapToLong(mapper); + } + + @Override + public DoubleStream flatMap(DoubleFunction mapper) { + return s.flatMap(mapper); + } + + @Override + public DoubleStream distinct() { + return s.distinct(); + } + + @Override + public DoubleStream sorted() { + return s.sorted(); + } + + @Override + public DoubleStream peek(DoubleConsumer action) { + return s.peek(action); + } + + @Override + public DoubleStream limit(long maxSize) { + return s.limit(maxSize); + } + + @Override + public DoubleStream skip(long n) { + return s.skip(n); + } + + @Override + public void forEach(DoubleConsumer action) { + s.forEach(action); + } + + @Override + public void forEachOrdered(DoubleConsumer action) { + s.forEachOrdered(action); + } + + @Override + public double[] toArray() { + return s.toArray(); + } + + @Override + public double reduce(double identity, DoubleBinaryOperator op) { + return s.reduce(identity, op); + } + + @Override + public OptionalDouble reduce(DoubleBinaryOperator op) { + return s.reduce(op); + } + + @Override + public R collect(Supplier supplier, ObjDoubleConsumer accumulator, BiConsumer combiner) { + return s.collect(supplier, accumulator, combiner); + } + + @Override + public double sum() { + return s.sum(); + } + + @Override + public OptionalDouble min() { + return s.min(); + } + + @Override + public OptionalDouble max() { + return s.max(); + } + + @Override + public long count() { + return s.count(); + } + + @Override + public OptionalDouble average() { + return s.average(); + } + + @Override + public DoubleSummaryStatistics summaryStatistics() { + return s.summaryStatistics(); + } + + @Override + public boolean anyMatch(DoublePredicate predicate) { + return s.anyMatch(predicate); + } + + @Override + public boolean allMatch(DoublePredicate predicate) { + return s.allMatch(predicate); + } + + @Override + public boolean noneMatch(DoublePredicate predicate) { + return s.noneMatch(predicate); + } + + @Override + public OptionalDouble findFirst() { + return s.findFirst(); + } + + @Override + public OptionalDouble findAny() { + return s.findAny(); + } + + @Override + public Stream boxed() { + return s.boxed(); + } + + @Override + public DoubleStream sequential() { + return s.sequential(); + } + + @Override + public DoubleStream parallel() { + return s.parallel(); + } + + @Override + public PrimitiveIterator.OfDouble iterator() { + return s.iterator(); + } + + @Override + public Spliterator.OfDouble spliterator() { + return s.spliterator(); + } + + @Override + public boolean isParallel() { + return s.isParallel(); + } + + @Override + public DoubleStream unordered() { + return s.unordered(); + 
} + + @Override + public DoubleStream onClose(Runnable closeHandler) { + return s.onClose(closeHandler); + } + + @Override + public void close() { + s.close(); + } + } +} \ No newline at end of file diff --git a/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestDataProvider.java b/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestDataProvider.java index cc98529df7f..6f772f391ee 100644 --- a/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestDataProvider.java +++ b/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestDataProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -119,7 +119,7 @@ public class StreamTestDataProvider { // Simple combination of numbers and null values, probably excessive but may catch // errors for initialization/termination/sequence - // @@@ This is separate from the other data for now until nulls are consitently supported by + // @@@ This is separate from the other data for now until nulls are consistently supported by // all operations { List list = new ArrayList<>(); diff --git a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpStatefulTest.java b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpStatefulTest.java new file mode 100644 index 00000000000..4c39ae6022d --- /dev/null +++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpStatefulTest.java @@ -0,0 +1,304 @@ +/* + * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. 
+ */ +package org.openjdk.tests.java.util.stream; + +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.DefaultMethodStreams; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.OpTestCase; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toCollection; + +/* + * @test + * @bug 8071597 + */ +@Test +public class WhileOpStatefulTest extends OpTestCase { + static final long COUNT_PERIOD = 100; + + static final long EXECUTION_TIME_LIMIT = TimeUnit.SECONDS.toMillis(10); + + static final long TAKE_WHILE_COUNT_LIMIT = 100_000; + + static final int DROP_SOURCE_SIZE = 10_000; + + static final long DROP_WHILE_COUNT_LIMIT = 5000; + + @Test + public void testTimedTakeWithCount() { + testTakeWhileMulti( + s -> { + BooleanSupplier isWithinTakePeriod = + within(System.currentTimeMillis(), COUNT_PERIOD); + s.takeWhile(e -> isWithinTakePeriod.getAsBoolean()) + .mapToLong(e -> 1).reduce(0, Long::sum); + }, + s -> { + BooleanSupplier isWithinTakePeriod = + within(System.currentTimeMillis(), COUNT_PERIOD); + s.takeWhile(e -> isWithinTakePeriod.getAsBoolean()) + .mapToLong(e -> 1).reduce(0, Long::sum); + }, + s -> { + BooleanSupplier isWithinTakePeriod = + within(System.currentTimeMillis(), COUNT_PERIOD); + s.takeWhile(e -> isWithinTakePeriod.getAsBoolean()) + .map(e -> 1).reduce(0, Long::sum); + }, + s -> { + BooleanSupplier isWithinTakePeriod = + within(System.currentTimeMillis(), COUNT_PERIOD); + s.takeWhile(e -> isWithinTakePeriod.getAsBoolean()) + .mapToLong(e -> 1).reduce(0, Long::sum); + }); + } + + @Test + public void testCountTakeWithCount() { + testTakeWhileMulti( + s -> { + AtomicLong c = new AtomicLong(); + long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) + .mapToLong(e -> 1).reduce(0, Long::sum); + assertTrue(rc <= c.get()); + }, + s -> { + AtomicLong c = new AtomicLong(); + long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) + .mapToLong(e -> 1).reduce(0, Long::sum); + assertTrue(rc <= c.get()); + }, + s -> { + AtomicLong c = new AtomicLong(); + long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) + .map(e -> 1).reduce(0, Long::sum); + assertTrue(rc <= c.get()); + }, + s -> { + AtomicLong c = new AtomicLong(); + long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) + .mapToLong(e -> 1).reduce(0, Long::sum); + assertTrue(rc <= c.get()); + }); + } + + @Test + public void testCountTakeWithToArray() { + testTakeWhileMulti( + s -> { + AtomicLong c = new AtomicLong(); + Object[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) + .toArray(); + assertTrue(ra.length <= c.get()); + }, + s -> { + AtomicLong c = new AtomicLong(); + int[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) + .toArray(); + assertTrue(ra.length <= c.get()); + }, + s -> { + AtomicLong c = new AtomicLong(); + long[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) + .toArray(); + assertTrue(ra.length <= c.get()); + }, + s -> { + AtomicLong c = new AtomicLong(); + double[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) + .toArray(); + 
assertTrue(ra.length <= c.get()); + }); + } + + + @Test + public void testCountDropWithCount() { + testDropWhileMulti( + s -> { + AtomicLong c = new AtomicLong(); + long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) + .mapToLong(e -> 1).reduce(0, Long::sum); + assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); + assertTrue(rc <= DROP_SOURCE_SIZE); + }, + s -> { + AtomicLong c = new AtomicLong(); + long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) + .mapToLong(e -> 1).reduce(0, Long::sum); + assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); + assertTrue(rc <= DROP_SOURCE_SIZE); + }, + s -> { + AtomicLong c = new AtomicLong(); + long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) + .map(e -> 1).reduce(0, Long::sum); + assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); + assertTrue(rc <= DROP_SOURCE_SIZE); + }, + s -> { + AtomicLong c = new AtomicLong(); + long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) + .mapToLong(e -> 1).reduce(0, Long::sum); + assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); + assertTrue(rc <= DROP_SOURCE_SIZE); + }); + } + + @Test + public void testCountDropWithToArray() { + testDropWhileMulti( + s -> { + AtomicLong c = new AtomicLong(); + Object[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) + .toArray(); + assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); + assertTrue(ra.length <= DROP_SOURCE_SIZE); + }, + s -> { + AtomicLong c = new AtomicLong(); + int[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) + .toArray(); + assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); + assertTrue(ra.length <= DROP_SOURCE_SIZE); + }, + s -> { + AtomicLong c = new AtomicLong(); + long[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) + .toArray(); + assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); + assertTrue(ra.length <= DROP_SOURCE_SIZE); + }, + s -> { + AtomicLong c = new AtomicLong(); + double[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) + .toArray(); + assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); + assertTrue(ra.length <= DROP_SOURCE_SIZE); + }); + } + + + private void testTakeWhileMulti(Consumer> mRef, + Consumer mInt, + Consumer mLong, + Consumer mDouble) { + Map>> sources = new HashMap<>(); + sources.put("Stream.generate()", () -> Stream.generate(() -> 1)); + sources.put("Stream.iterate()", () -> Stream.iterate(1, x -> 1)); + sources.put("Stream.iterate().unordered()", () -> Stream.iterate(1, x -> 1)); + testWhileMulti(sources, mRef, mInt, mLong, mDouble); + } + + private void testDropWhileMulti(Consumer> mRef, + Consumer mInt, + Consumer mLong, + Consumer mDouble) { + Map>> sources = new HashMap<>(); + sources.put("IntStream.range().boxed()", + () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed()); + sources.put("IntStream.range().boxed().unordered()", + () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed().unordered()); + sources.put("LinkedList.stream()", + () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed() + .collect(toCollection(LinkedList::new)) + .stream()); + sources.put("LinkedList.stream().unordered()", + () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed() + .collect(toCollection(LinkedList::new)) + .stream() + .unordered()); + testWhileMulti(sources, mRef, mInt, mLong, mDouble); + } + + private void testWhileMulti(Map>> sources, + Consumer> mRef, + Consumer mInt, + Consumer mLong, + Consumer mDouble) { + Map, Stream>> transforms = new HashMap<>(); + transforms.put("Stream.sequential()", s -> { + BooleanSupplier 
isWithinExecutionPeriod = within(System.currentTimeMillis(), + EXECUTION_TIME_LIMIT); + return s.peek(e -> { + if (!isWithinExecutionPeriod.getAsBoolean()) { + throw new RuntimeException(); + } + }); + }); + transforms.put("Stream.parallel()", s -> { + BooleanSupplier isWithinExecutionPeriod = within(System.currentTimeMillis(), + EXECUTION_TIME_LIMIT); + return s.parallel() + .peek(e -> { + if (!isWithinExecutionPeriod.getAsBoolean()) { + throw new RuntimeException(); + } + }); + }); + + Map>> actions = new HashMap<>(); + actions.put("Ref", mRef); + actions.put("Int", s -> mInt.accept(s.mapToInt(e -> e))); + actions.put("Long", s -> mLong.accept(s.mapToLong(e -> e))); + actions.put("Double", s -> mDouble.accept(s.mapToDouble(e -> e))); + actions.put("Ref using defaults", s -> mRef.accept(DefaultMethodStreams.delegateTo(s))); + actions.put("Int using defaults", s -> mInt.accept(DefaultMethodStreams.delegateTo(s.mapToInt(e -> e)))); + actions.put("Long using defaults", s -> mLong.accept(DefaultMethodStreams.delegateTo(s.mapToLong(e -> e)))); + actions.put("Double using defaults", s -> mDouble.accept(DefaultMethodStreams.delegateTo(s.mapToDouble(e -> e)))); + + for (Map.Entry>> s : sources.entrySet()) { + setContext("source", s.getKey()); + + for (Map.Entry, Stream>> t : transforms.entrySet()) { + setContext("transform", t.getKey()); + + for (Map.Entry>> a : actions.entrySet()) { + setContext("shape", a.getKey()); + + Stream stream = s.getValue().get(); + stream = t.getValue().apply(stream); + a.getValue().accept(stream); + } + } + } + } + + static BooleanSupplier within(long start, long durationInMillis) { + return () -> (System.currentTimeMillis() - start) < durationInMillis; + } +} diff --git a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java new file mode 100644 index 00000000000..b710791fb4e --- /dev/null +++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java @@ -0,0 +1,361 @@ +/* + * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. 
+ */
+package org.openjdk.tests.java.util.stream;
+
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.DefaultMethodStreams;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LambdaTestHelpers;
+import java.util.stream.LongStream;
+import java.util.stream.OpTestCase;
+import java.util.stream.Stream;
+import java.util.stream.StreamTestDataProvider;
+import java.util.stream.TestData;
+
+/*
+ * @test
+ * @bug 8071597
+ */
+@Test
+public class WhileOpTest extends OpTestCase {
+
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+    public void testTakeWhileOps(String name, TestData.OfRef<Integer> data) {
+        for (int size : sizes(data.size())) {
+            setContext("takeWhile", size);
+
+            testWhileMulti(data,
+                           whileResultAsserter(data, WhileOp.Take, e -> e < size),
+                           s -> s.takeWhile(e -> e < size),
+                           s -> s.takeWhile(e -> e < size),
+                           s -> s.takeWhile(e -> e < size),
+                           s -> s.takeWhile(e -> e < size));
+
+
+            testWhileMulti(data,
+                           whileResultAsserter(data, WhileOp.Take, e -> e < size / 2),
+                           s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2),
+                           s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2),
+                           s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2),
+                           s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2));
+        }
+    }
+
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+    public void testDropWhileOps(String name, TestData.OfRef<Integer> data) {
+        for (int size : sizes(data.size())) {
+            setContext("dropWhile", size);
+
+            testWhileMulti(data,
+                           whileResultAsserter(data, WhileOp.Drop, e -> e < size),
+                           s -> s.dropWhile(e -> e < size),
+                           s -> s.dropWhile(e -> e < size),
+                           s -> s.dropWhile(e -> e < size),
+                           s -> s.dropWhile(e -> e < size));
+
+            testWhileMulti(data,
+                           whileResultAsserter(data, WhileOp.Drop, e -> e < size),
+                           s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size),
+                           s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size),
+                           s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size),
+                           s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size));
+        }
+    }
+
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+    public void testDropTakeWhileOps(String name, TestData.OfRef<Integer> data) {
+        for (int size : sizes(data.size())) {
+            setContext("dropWhile", size);
+
+            testWhileMulti(data,
+                           whileResultAsserter(data, WhileOp.Undefined, null),
+                           s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size),
+                           s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size),
+                           s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size),
+                           s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size));
+        }
+    }
+
+    /**
+     * While operation type to be asserted on
+     */
+    enum WhileOp {
+        /**
+         * The takeWhile operation
+         */
+        Take,
+        /**
+         * The dropWhile operation
+         */
+        Drop,
+        /**
+         * The operation(s) are undefined
+         */
+        Undefined
+    }
+
+    /**
+     * Create a result asserter for takeWhile or dropWhile operations.
+     * <p>

+     * If the stream pipeline consists of the takeWhile operation
+     * ({@link WhileOp#Take}) or the dropWhile operation ({@link WhileOp#Drop})
+     * then specific assertions can be made on the actual result based on the
+     * input elements, {@code inputData}, and whether those elements match the
+     * predicate, {@code p}, of the operation.
+     * <p>
+     * If the input elements have an encounter order then the actual result
+     * is asserted against the result of operating sequentially on the input
+     * elements given the predicate and in accordance with the operation
+     * semantics. (The actual result, whether produced sequentially or in
+     * parallel, should be the same.)
+     * <p>
+     * If the input elements have no encounter order then an actual result
+     * is, for practical purposes, considered non-deterministic.
+     * Consider an input list of lists that contains all possible permutations
+     * of the input elements, and an output list of lists that is the result of
+     * applying the pipeline with the operation sequentially to each input
+     * list.
+     * Any list in the output lists is a valid result. It's not practical to
+     * test in such a manner.
+     * <p>
+     * For a takeWhile operation the following assertions can be made if
+     * only some of the input elements match the predicate (i.e. taking will
+     * short-circuit the pipeline):
+     * <ol>
+     * <li>The set of output elements is a subset of the set of matching
+     * input elements</li>
+     * <li>The set of output elements and the set of non-matching input
+     * elements are disjoint</li>
+     * </ol>
+     * For a dropWhile operation the following assertions can be made:
+     * <ol>
+     * <li>The set of non-matching input elements is a subset of the set of
+     * output elements</li>
+     * <li>The set of matching output elements is a subset of the set of
+     * matching input elements</li>
+     * </ol>
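+     * <p>
+     * As an illustrative sketch only (not part of the original test), the
+     * expected semantics on an ordered source are:
+     * <pre>{@code
+     * // takeWhile keeps the longest matching prefix; dropWhile discards it
+     * Stream.of(1, 2, 3, 4, 1).takeWhile(e -> e < 3).forEach(System.out::println);  // prints 1 2
+     * Stream.of(1, 2, 3, 4, 1).dropWhile(e -> e < 3).forEach(System.out::println);  // prints 3 4 1
+     * }</pre>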
+     *
+     * @param inputData the elements input into the stream pipeline
+     * @param op the operation of the stream pipeline, one of takeWhile,
+     *        dropWhile, or an undefined set of operations (possibly including
+     *        two or more takeWhile and/or dropWhile operations, or because
+     *        the predicate is not stateless).
+     * @param p the stateless predicate applied to the operation, ignored if
+     *        the operation is {@link WhileOp#Undefined}.
+     * @param <T> the type of elements
+     * @return a result asserter
+     */
+    private <T> ResultAsserter<Iterable<T>> whileResultAsserter(Iterable<T> inputData,
+                                                                WhileOp op,
+                                                                Predicate<T> p) {
+        return (act, exp, ord, par) -> {
+            if (par & !ord) {
+                List<T> input = new ArrayList<>();
+                inputData.forEach(input::add);
+
+                List<T> output = new ArrayList<>();
+                act.forEach(output::add);
+
+                if (op == WhileOp.Take) {
+                    List<T> matchingInput = new ArrayList<>();
+                    List<T> nonMatchingInput = new ArrayList<>();
+                    input.forEach(t -> {
+                        if (p.test(t))
+                            matchingInput.add(t);
+                        else
+                            nonMatchingInput.add(t);
+                    });
+
+                    // If some, not all, elements are taken
+                    if (matchingInput.size() < input.size()) {
+                        assertTrue(output.size() <= matchingInput.size(),
+                                   "Output is larger than the matching input");
+
+                        // The output must be a subset of the matching input
+                        assertTrue(matchingInput.containsAll(output),
+                                   "Output is not a subset of the matching input");
+
+                        // The output must not contain any non-matching elements
+                        for (T nonMatching : nonMatchingInput) {
+                            assertFalse(output.contains(nonMatching),
+                                        "Output and non-matching input are not disjoint");
+                        }
+                    }
+                }
+                else if (op == WhileOp.Drop) {
+                    List<T> matchingInput = new ArrayList<>();
+                    List<T> nonMatchingInput = new ArrayList<>();
+                    input.forEach(t -> {
+                        if (p.test(t))
+                            matchingInput.add(t);
+                        else
+                            nonMatchingInput.add(t);
+                    });
+
+                    // The non-matching input must be a subset of the output
+                    assertTrue(output.containsAll(nonMatchingInput),
+                               "Non-matching input is not a subset of the output");
+
+                    // The matching output must be a subset of the matching input
+                    List<T> matchingOutput = new ArrayList<>();
+                    output.forEach(i -> {
+                        if (p.test(i))
+                            matchingOutput.add(i);
+                    });
+                    assertTrue(matchingInput.containsAll(matchingOutput),
+                               "Matching output is not a subset of matching input");
+                }
+
+                // Note: if there is a combination of takeWhile and dropWhile then specific
+                // assertions cannot be performed.
+                // All that can be reliably asserted is the output is a subset of the input
+
+                assertTrue(input.containsAll(output));
+            }
+            else {
+                // For specific operations derive expected result from the input
+                if (op == WhileOp.Take) {
+                    List<T> takeInput = new ArrayList<>();
+                    for (T t : inputData) {
+                        if (p.test(t))
+                            takeInput.add(t);
+                        else
+                            break;
+                    }
+
+                    LambdaTestHelpers.assertContents(act, takeInput);
+                }
+                else if (op == WhileOp.Drop) {
+                    List<T> dropInput = new ArrayList<>();
+                    for (T t : inputData) {
+                        if (dropInput.size() > 0 || !p.test(t))
+                            dropInput.add(t);
+                    }
+
+                    LambdaTestHelpers.assertContents(act, dropInput);
+                }
+
+                LambdaTestHelpers.assertContents(act, exp);
+            }
+        };
+    }
+
+    private Collection<Integer> sizes(int s) {
+        Set<Integer> sizes = new LinkedHashSet<>();
+
+        sizes.add(0);
+        sizes.add(1);
+        sizes.add(s / 4);
+        sizes.add(s / 2);
+        sizes.add(3 * s / 4);
+        sizes.add(Math.max(0, s - 1));
+        sizes.add(s);
+        sizes.add(Integer.MAX_VALUE);
+
+        return sizes;
+    }
+
+    private void testWhileMulti(TestData.OfRef<Integer> data,
+                                ResultAsserter<Iterable<Integer>> ra,
+                                Function<Stream<Integer>, Stream<Integer>> mRef,
+                                Function<IntStream, IntStream> mInt,
+                                Function<LongStream, LongStream> mLong,
+                                Function<DoubleStream, DoubleStream> mDouble) {
+        Map<String, Function<Stream<Integer>, Stream<Integer>>> ms = new HashMap<>();
+        ms.put("Ref", mRef);
+        ms.put("Int", s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e));
+        ms.put("Long", s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e));
+        ms.put("Double", s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e));
+        ms.put("Ref using defaults", s -> mRef.apply(DefaultMethodStreams.delegateTo(s)));
+        ms.put("Int using defaults", s -> mInt.apply(DefaultMethodStreams.delegateTo(s.mapToInt(e -> e))).mapToObj(e -> e));
+        ms.put("Long using defaults", s -> mLong.apply(DefaultMethodStreams.delegateTo(s.mapToLong(e -> e))).mapToObj(e -> (int) e));
+        ms.put("Double using defaults", s -> mDouble.apply(DefaultMethodStreams.delegateTo(s.mapToDouble(e -> e))).mapToObj(e -> (int) e));
+
+        testWhileMulti(data, ra, ms);
+    }
+
+    private final void testWhileMulti(TestData.OfRef<Integer> data,
+                                      ResultAsserter<Iterable<Integer>> ra,
+                                      Map<String, Function<Stream<Integer>, Stream<Integer>>> ms) {
+        for (Map.Entry<String, Function<Stream<Integer>, Stream<Integer>>> e : ms.entrySet()) {
+            setContext("shape", e.getKey());
+
+            withData(data)
+                    .stream(e.getValue())
+                    .resultAsserter(ra)
+                    .exercise();
+        }
+    }
+
+    @Test
+    public void testRefDefaultClose() {
+        AtomicBoolean isClosed = new AtomicBoolean();
+        Stream<Integer> s = Stream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+        try (Stream<Integer> ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+            ds.count();
+        }
+        assertTrue(isClosed.get());
+    }
+
+    @Test
+    public void testIntDefaultClose() {
+        AtomicBoolean isClosed = new AtomicBoolean();
+        IntStream s = IntStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+        try (IntStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+            ds.count();
+        }
+        assertTrue(isClosed.get());
+    }
+
+    @Test
+    public void testLongDefaultClose() {
+        AtomicBoolean isClosed = new AtomicBoolean();
+        LongStream s = LongStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+        try (LongStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+            ds.count();
+        }
+        assertTrue(isClosed.get());
+    }
+
+    @Test
+    public void testDoubleDefaultClose() {
+        AtomicBoolean isClosed = new AtomicBoolean();
+        DoubleStream s = DoubleStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+        try (DoubleStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+            ds.count();
+        }
+        assertTrue(isClosed.get());
+    }
+}