8071597: Add Stream dropWhile and takeWhile operations

Reviewed-by: briangoetz, smarks, chegar, forax
This commit is contained in:
Paul Sandoz 2015-06-09 07:10:03 +01:00
parent 5d91ae3352
commit 5744f4fc30
18 changed files with 3665 additions and 41 deletions

View File

@ -489,15 +489,17 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
@SuppressWarnings({"rawtypes","unchecked"}) @SuppressWarnings({"rawtypes","unchecked"})
AbstractPipeline p = AbstractPipeline.this; AbstractPipeline p = AbstractPipeline.this;
while (p.depth > 0) { while (p.depth > 0) {
p = p.previousStage; p = p.previousStage;
} }
wrappedSink.begin(spliterator.getExactSizeIfKnown()); wrappedSink.begin(spliterator.getExactSizeIfKnown());
p.forEachWithCancel(spliterator, wrappedSink); boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end(); wrappedSink.end();
return cancelled;
} }
@Override @Override
@ -602,8 +604,9 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
* *
* @param spliterator the spliterator to pull elements from * @param spliterator the spliterator to pull elements from
* @param sink the sink to push elements to * @param sink the sink to push elements to
* @return true if the cancellation was requested
*/ */
abstract void forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink); abstract boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
/** /**
* Make a node builder compatible with this stream shape. * Make a node builder compatible with this stream shape.

View File

@ -40,6 +40,7 @@ import java.util.function.DoubleToIntFunction;
import java.util.function.DoubleToLongFunction; import java.util.function.DoubleToLongFunction;
import java.util.function.DoubleUnaryOperator; import java.util.function.DoubleUnaryOperator;
import java.util.function.IntFunction; import java.util.function.IntFunction;
import java.util.function.LongPredicate;
import java.util.function.ObjDoubleConsumer; import java.util.function.ObjDoubleConsumer;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -153,10 +154,12 @@ abstract class DoublePipeline<E_IN>
} }
@Override @Override
final void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) { final boolean forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
Spliterator.OfDouble spl = adapt(spliterator); Spliterator.OfDouble spl = adapt(spliterator);
DoubleConsumer adaptedSink = adapt(sink); DoubleConsumer adaptedSink = adapt(sink);
do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); boolean cancelled;
do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
return cancelled;
} }
@Override @Override
@ -352,6 +355,16 @@ abstract class DoublePipeline<E_IN>
} }
} }
@Override
public final DoubleStream takeWhile(DoublePredicate predicate) {
return WhileOps.makeTakeWhileDouble(this, predicate);
}
@Override
public final DoubleStream dropWhile(DoublePredicate predicate) {
return WhileOps.makeDropWhileDouble(this, predicate);
}
@Override @Override
public final DoubleStream sorted() { public final DoubleStream sorted() {
return SortedOps.makeDouble(this); return SortedOps.makeDouble(this);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -24,18 +24,13 @@
*/ */
package java.util.stream; package java.util.stream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.DoubleSummaryStatistics; import java.util.DoubleSummaryStatistics;
import java.util.Objects; import java.util.Objects;
import java.util.OptionalDouble; import java.util.OptionalDouble;
import java.util.PrimitiveIterator; import java.util.PrimitiveIterator;
import java.util.Spliterator; import java.util.Spliterator;
import java.util.Spliterators; import java.util.Spliterators;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.DoubleBinaryOperator; import java.util.function.DoubleBinaryOperator;
import java.util.function.DoubleConsumer; import java.util.function.DoubleConsumer;
@ -279,6 +274,137 @@ public interface DoubleStream extends BaseStream<Double, DoubleStream> {
*/ */
DoubleStream skip(long n); DoubleStream skip(long n);
/**
* Returns, if this stream is ordered, a stream consisting of the longest
* prefix of elements taken from this stream that match the given predicate.
* Otherwise returns, if this stream is unordered, a stream consisting of a
* subset of elements taken from this stream that match the given predicate.
*
* <p>If this stream is ordered then the longest prefix is a contiguous
* sequence of elements of this stream that match the given predicate. The
* first element of the sequence is the first element of this stream, and
* the element immediately following the last element of the sequence does
* not match the given predicate.
*
* <p>If this stream is unordered, and some (but not all) elements of this
* stream match the given predicate, then the behavior of this operation is
* nondeterministic; it is free to take any subset of matching elements
* (which includes the empty set).
*
* <p>Independent of whether this stream is ordered or unordered if all
* elements of this stream match the given predicate then this operation
* takes all elements (the result is the same is the input), or if no
* elements of the stream match the given predicate then no elements are
* taken (the result is an empty stream).
*
* <p>This is a <a href="package-summary.html#StreamOps">short-circuiting
* stateful intermediate operation</a>.
*
* @implSpec
* The default implementation obtains the {@link #spliterator() spliterator}
* of this stream, wraps that spliterator so as to support the semantics
* of this operation on traversal, and returns a new stream associated with
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting. When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code takeWhile()} is generally a cheap operation on sequential
* stream pipelines, it can be quite expensive on ordered parallel
* pipelines, since the operation is constrained to return not just any
* valid prefix, but the longest prefix of elements in the encounter order.
* Using an unordered stream source (such as
* {@link #generate(DoubleSupplier)}) or removing the ordering constraint
* with {@link #unordered()} may result in significant speedups of
* {@code takeWhile()} in parallel pipelines, if the semantics of your
* situation permit. If consistency with encounter order is required, and
* you are experiencing poor performance or memory utilization with
* {@code takeWhile()} in parallel pipelines, switching to sequential
* execution with {@link #sequential()} may improve performance.
*
* @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* predicate to apply to elements to determine the longest
* prefix of elements.
* @return the new stream
*/
default DoubleStream takeWhile(DoublePredicate predicate) {
Objects.requireNonNull(predicate);
// Reuses the unordered spliterator, which, when encounter is present,
// is safe to use as long as it configured not to split
return StreamSupport.doubleStream(
new WhileOps.UnorderedWhileSpliterator.OfDouble.Taking(spliterator(), true, predicate),
isParallel()).onClose(this::close);
}
/**
* Returns, if this stream is ordered, a stream consisting of the remaining
* elements of this stream after dropping the longest prefix of elements
* that match the given predicate. Otherwise returns, if this stream is
* unordered, a stream consisting of the remaining elements of this stream
* after dropping a subset of elements that match the given predicate.
*
* <p>If this stream is ordered then the longest prefix is a contiguous
* sequence of elements of this stream that match the given predicate. The
* first element of the sequence is the first element of this stream, and
* the element immediately following the last element of the sequence does
* not match the given predicate.
*
* <p>If this stream is unordered, and some (but not all) elements of this
* stream match the given predicate, then the behavior of this operation is
* nondeterministic; it is free to drop any subset of matching elements
* (which includes the empty set).
*
* <p>Independent of whether this stream is ordered or unordered if all
* elements of this stream match the given predicate then this operation
* drops all elements (the result is an empty stream), or if no elements of
* the stream match the given predicate then no elements are dropped (the
* result is the same is the input).
*
* <p>This is a <a href="package-summary.html#StreamOps">stateful
* intermediate operation</a>.
*
* @implSpec
* The default implementation obtains the {@link #spliterator() spliterator}
* of this stream, wraps that spliterator so as to support the semantics
* of this operation on traversal, and returns a new stream associated with
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting. When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code dropWhile()} is generally a cheap operation on sequential
* stream pipelines, it can be quite expensive on ordered parallel
* pipelines, since the operation is constrained to return not just any
* valid prefix, but the longest prefix of elements in the encounter order.
* Using an unordered stream source (such as
* {@link #generate(DoubleSupplier)}) or removing the ordering constraint
* with {@link #unordered()} may result in significant speedups of
* {@code dropWhile()} in parallel pipelines, if the semantics of your
* situation permit. If consistency with encounter order is required, and
* you are experiencing poor performance or memory utilization with
* {@code dropWhile()} in parallel pipelines, switching to sequential
* execution with {@link #sequential()} may improve performance.
*
* @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* predicate to apply to elements to determine the longest
* prefix of elements.
* @return the new stream
*/
default DoubleStream dropWhile(DoublePredicate predicate) {
Objects.requireNonNull(predicate);
// Reuses the unordered spliterator, which, when encounter is present,
// is safe to use as long as it configured not to split
return StreamSupport.doubleStream(
new WhileOps.UnorderedWhileSpliterator.OfDouble.Dropping(spliterator(), true, predicate),
isParallel()).onClose(this::close);
}
/** /**
* Performs an action for each element of this stream. * Performs an action for each element of this stream.
* *

View File

@ -156,10 +156,12 @@ abstract class IntPipeline<E_IN>
} }
@Override @Override
final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) { final boolean forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
Spliterator.OfInt spl = adapt(spliterator); Spliterator.OfInt spl = adapt(spliterator);
IntConsumer adaptedSink = adapt(sink); IntConsumer adaptedSink = adapt(sink);
do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); boolean cancelled;
do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
return cancelled;
} }
@Override @Override
@ -386,6 +388,16 @@ abstract class IntPipeline<E_IN>
return SliceOps.makeInt(this, n, -1); return SliceOps.makeInt(this, n, -1);
} }
@Override
public final IntStream takeWhile(IntPredicate predicate) {
return WhileOps.makeTakeWhileInt(this, predicate);
}
@Override
public final IntStream dropWhile(IntPredicate predicate) {
return WhileOps.makeDropWhileInt(this, predicate);
}
@Override @Override
public final IntStream sorted() { public final IntStream sorted() {
return SortedOps.makeInt(this); return SortedOps.makeInt(this);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -272,6 +272,135 @@ public interface IntStream extends BaseStream<Integer, IntStream> {
*/ */
IntStream skip(long n); IntStream skip(long n);
/**
* Returns, if this stream is ordered, a stream consisting of the longest
* prefix of elements taken from this stream that match the given predicate.
* Otherwise returns, if this stream is unordered, a stream consisting of a
* subset of elements taken from this stream that match the given predicate.
*
* <p>If this stream is ordered then the longest prefix is a contiguous
* sequence of elements of this stream that match the given predicate. The
* first element of the sequence is the first element of this stream, and
* the element immediately following the last element of the sequence does
* not match the given predicate.
*
* <p>If this stream is unordered, and some (but not all) elements of this
* stream match the given predicate, then the behavior of this operation is
* nondeterministic; it is free to take any subset of matching elements
* (which includes the empty set).
*
* <p>Independent of whether this stream is ordered or unordered if all
* elements of this stream match the given predicate then this operation
* takes all elements (the result is the same is the input), or if no
* elements of the stream match the given predicate then no elements are
* taken (the result is an empty stream).
*
* <p>This is a <a href="package-summary.html#StreamOps">short-circuiting
* stateful intermediate operation</a>.
*
* @implSpec
* The default implementation obtains the {@link #spliterator() spliterator}
* of this stream, wraps that spliterator so as to support the semantics
* of this operation on traversal, and returns a new stream associated with
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting. When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code takeWhile()} is generally a cheap operation on sequential
* stream pipelines, it can be quite expensive on ordered parallel
* pipelines, since the operation is constrained to return not just any
* valid prefix, but the longest prefix of elements in the encounter order.
* Using an unordered stream source (such as {@link #generate(IntSupplier)})
* or removing the ordering constraint with {@link #unordered()} may result
* in significant speedups of {@code takeWhile()} in parallel pipelines, if
* the semantics of your situation permit. If consistency with encounter
* order is required, and you are experiencing poor performance or memory
* utilization with {@code takeWhile()} in parallel pipelines, switching to
* sequential execution with {@link #sequential()} may improve performance.
*
* @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* predicate to apply to elements to determine the longest
* prefix of elements.
* @return the new stream
*/
default IntStream takeWhile(IntPredicate predicate) {
Objects.requireNonNull(predicate);
// Reuses the unordered spliterator, which, when encounter is present,
// is safe to use as long as it configured not to split
return StreamSupport.intStream(
new WhileOps.UnorderedWhileSpliterator.OfInt.Taking(spliterator(), true, predicate),
isParallel()).onClose(this::close);
}
/**
* Returns, if this stream is ordered, a stream consisting of the remaining
* elements of this stream after dropping the longest prefix of elements
* that match the given predicate. Otherwise returns, if this stream is
* unordered, a stream consisting of the remaining elements of this stream
* after dropping a subset of elements that match the given predicate.
*
* <p>If this stream is ordered then the longest prefix is a contiguous
* sequence of elements of this stream that match the given predicate. The
* first element of the sequence is the first element of this stream, and
* the element immediately following the last element of the sequence does
* not match the given predicate.
*
* <p>If this stream is unordered, and some (but not all) elements of this
* stream match the given predicate, then the behavior of this operation is
* nondeterministic; it is free to drop any subset of matching elements
* (which includes the empty set).
*
* <p>Independent of whether this stream is ordered or unordered if all
* elements of this stream match the given predicate then this operation
* drops all elements (the result is an empty stream), or if no elements of
* the stream match the given predicate then no elements are dropped (the
* result is the same is the input).
*
* <p>This is a <a href="package-summary.html#StreamOps">stateful
* intermediate operation</a>.
*
* @implSpec
* The default implementation obtains the {@link #spliterator() spliterator}
* of this stream, wraps that spliterator so as to support the semantics
* of this operation on traversal, and returns a new stream associated with
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting. When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code dropWhile()} is generally a cheap operation on sequential
* stream pipelines, it can be quite expensive on ordered parallel
* pipelines, since the operation is constrained to return not just any
* valid prefix, but the longest prefix of elements in the encounter order.
* Using an unordered stream source (such as {@link #generate(IntSupplier)})
* or removing the ordering constraint with {@link #unordered()} may result
* in significant speedups of {@code dropWhile()} in parallel pipelines, if
* the semantics of your situation permit. If consistency with encounter
* order is required, and you are experiencing poor performance or memory
* utilization with {@code dropWhile()} in parallel pipelines, switching to
* sequential execution with {@link #sequential()} may improve performance.
*
* @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* predicate to apply to elements to determine the longest
* prefix of elements.
* @return the new stream
*/
default IntStream dropWhile(IntPredicate predicate) {
Objects.requireNonNull(predicate);
// Reuses the unordered spliterator, which, when encounter is present,
// is safe to use as long as it configured not to split
return StreamSupport.intStream(
new WhileOps.UnorderedWhileSpliterator.OfInt.Dropping(spliterator(), true, predicate),
isParallel()).onClose(this::close);
}
/** /**
* Performs an action for each element of this stream. * Performs an action for each element of this stream.
* *

View File

@ -154,10 +154,12 @@ abstract class LongPipeline<E_IN>
} }
@Override @Override
final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) { final boolean forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
Spliterator.OfLong spl = adapt(spliterator); Spliterator.OfLong spl = adapt(spliterator);
LongConsumer adaptedSink = adapt(sink); LongConsumer adaptedSink = adapt(sink);
do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); boolean cancelled;
do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
return cancelled;
} }
@Override @Override
@ -367,6 +369,16 @@ abstract class LongPipeline<E_IN>
return SliceOps.makeLong(this, n, -1); return SliceOps.makeLong(this, n, -1);
} }
@Override
public final LongStream takeWhile(LongPredicate predicate) {
return WhileOps.makeTakeWhileLong(this, predicate);
}
@Override
public final LongStream dropWhile(LongPredicate predicate) {
return WhileOps.makeDropWhileLong(this, predicate);
}
@Override @Override
public final LongStream sorted() { public final LongStream sorted() {
return SortedOps.makeLong(this); return SortedOps.makeLong(this);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -24,11 +24,7 @@
*/ */
package java.util.stream; package java.util.stream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.LongSummaryStatistics; import java.util.LongSummaryStatistics;
import java.util.Objects; import java.util.Objects;
import java.util.OptionalDouble; import java.util.OptionalDouble;
@ -36,7 +32,6 @@ import java.util.OptionalLong;
import java.util.PrimitiveIterator; import java.util.PrimitiveIterator;
import java.util.Spliterator; import java.util.Spliterator;
import java.util.Spliterators; import java.util.Spliterators;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.LongBinaryOperator; import java.util.function.LongBinaryOperator;
@ -277,6 +272,137 @@ public interface LongStream extends BaseStream<Long, LongStream> {
*/ */
LongStream skip(long n); LongStream skip(long n);
/**
* Returns, if this stream is ordered, a stream consisting of the longest
* prefix of elements taken from this stream that match the given predicate.
* Otherwise returns, if this stream is unordered, a stream consisting of a
* subset of elements taken from this stream that match the given predicate.
*
* <p>If this stream is ordered then the longest prefix is a contiguous
* sequence of elements of this stream that match the given predicate. The
* first element of the sequence is the first element of this stream, and
* the element immediately following the last element of the sequence does
* not match the given predicate.
*
* <p>If this stream is unordered, and some (but not all) elements of this
* stream match the given predicate, then the behavior of this operation is
* nondeterministic; it is free to take any subset of matching elements
* (which includes the empty set).
*
* <p>Independent of whether this stream is ordered or unordered if all
* elements of this stream match the given predicate then this operation
* takes all elements (the result is the same is the input), or if no
* elements of the stream match the given predicate then no elements are
* taken (the result is an empty stream).
*
* <p>This is a <a href="package-summary.html#StreamOps">short-circuiting
* stateful intermediate operation</a>.
*
* @implSpec
* The default implementation obtains the {@link #spliterator() spliterator}
* of this stream, wraps that spliterator so as to support the semantics
* of this operation on traversal, and returns a new stream associated with
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting. When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code takeWhile()} is generally a cheap operation on sequential
* stream pipelines, it can be quite expensive on ordered parallel
* pipelines, since the operation is constrained to return not just any
* valid prefix, but the longest prefix of elements in the encounter order.
* Using an unordered stream source (such as
* {@link #generate(LongSupplier)}) or removing the ordering constraint with
* {@link #unordered()} may result in significant speedups of
* {@code takeWhile()} in parallel pipelines, if the semantics of your
* situation permit. If consistency with encounter order is required, and
* you are experiencing poor performance or memory utilization with
* {@code takeWhile()} in parallel pipelines, switching to sequential
* execution with {@link #sequential()} may improve performance.
*
* @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* predicate to apply to elements to determine the longest
* prefix of elements.
* @return the new stream
*/
default LongStream takeWhile(LongPredicate predicate) {
Objects.requireNonNull(predicate);
// Reuses the unordered spliterator, which, when encounter is present,
// is safe to use as long as it configured not to split
return StreamSupport.longStream(
new WhileOps.UnorderedWhileSpliterator.OfLong.Taking(spliterator(), true, predicate),
isParallel()).onClose(this::close);
}
/**
* Returns, if this stream is ordered, a stream consisting of the remaining
* elements of this stream after dropping the longest prefix of elements
* that match the given predicate. Otherwise returns, if this stream is
* unordered, a stream consisting of the remaining elements of this stream
* after dropping a subset of elements that match the given predicate.
*
* <p>If this stream is ordered then the longest prefix is a contiguous
* sequence of elements of this stream that match the given predicate. The
* first element of the sequence is the first element of this stream, and
* the element immediately following the last element of the sequence does
* not match the given predicate.
*
* <p>If this stream is unordered, and some (but not all) elements of this
* stream match the given predicate, then the behavior of this operation is
* nondeterministic; it is free to drop any subset of matching elements
* (which includes the empty set).
*
* <p>Independent of whether this stream is ordered or unordered if all
* elements of this stream match the given predicate then this operation
* drops all elements (the result is an empty stream), or if no elements of
* the stream match the given predicate then no elements are dropped (the
* result is the same is the input).
*
* <p>This is a <a href="package-summary.html#StreamOps">stateful
* intermediate operation</a>.
*
* @implSpec
* The default implementation obtains the {@link #spliterator() spliterator}
* of this stream, wraps that spliterator so as to support the semantics
* of this operation on traversal, and returns a new stream associated with
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting. When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code dropWhile()} is generally a cheap operation on sequential
* stream pipelines, it can be quite expensive on ordered parallel
* pipelines, since the operation is constrained to return not just any
* valid prefix, but the longest prefix of elements in the encounter order.
* Using an unordered stream source (such as
* {@link #generate(LongSupplier)}) or removing the ordering constraint with
* {@link #unordered()} may result in significant speedups of
* {@code dropWhile()} in parallel pipelines, if the semantics of your
* situation permit. If consistency with encounter order is required, and
* you are experiencing poor performance or memory utilization with
* {@code dropWhile()} in parallel pipelines, switching to sequential
* execution with {@link #sequential()} may improve performance.
*
* @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* predicate to apply to elements to determine the longest
* prefix of elements.
* @return the new stream
*/
default LongStream dropWhile(LongPredicate predicate) {
Objects.requireNonNull(predicate);
// Reuses the unordered spliterator, which, when encounter is present,
// is safe to use as long as it configured not to split
return StreamSupport.longStream(
new WhileOps.UnorderedWhileSpliterator.OfLong.Dropping(spliterator(), true, predicate),
isParallel()).onClose(this::close);
}
/** /**
* Performs an action for each element of this stream. * Performs an action for each element of this stream.
* *

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -125,7 +125,11 @@ interface Node<T> {
Node.Builder<T> nodeBuilder = Nodes.builder(size, generator); Node.Builder<T> nodeBuilder = Nodes.builder(size, generator);
nodeBuilder.begin(size); nodeBuilder.begin(size);
for (int i = 0; i < from && spliterator.tryAdvance(e -> { }); i++) { } for (int i = 0; i < from && spliterator.tryAdvance(e -> { }); i++) { }
for (int i = 0; (i < size) && spliterator.tryAdvance(nodeBuilder); i++) { } if (to == count()) {
spliterator.forEachRemaining(nodeBuilder);
} else {
for (int i = 0; i < size && spliterator.tryAdvance(nodeBuilder); i++) { }
}
nodeBuilder.end(); nodeBuilder.end();
return nodeBuilder.build(); return nodeBuilder.build();
} }
@ -360,7 +364,11 @@ interface Node<T> {
Node.Builder.OfInt nodeBuilder = Nodes.intBuilder(size); Node.Builder.OfInt nodeBuilder = Nodes.intBuilder(size);
nodeBuilder.begin(size); nodeBuilder.begin(size);
for (int i = 0; i < from && spliterator.tryAdvance((IntConsumer) e -> { }); i++) { } for (int i = 0; i < from && spliterator.tryAdvance((IntConsumer) e -> { }); i++) { }
for (int i = 0; (i < size) && spliterator.tryAdvance((IntConsumer) nodeBuilder); i++) { } if (to == count()) {
spliterator.forEachRemaining((IntConsumer) nodeBuilder);
} else {
for (int i = 0; i < size && spliterator.tryAdvance((IntConsumer) nodeBuilder); i++) { }
}
nodeBuilder.end(); nodeBuilder.end();
return nodeBuilder.build(); return nodeBuilder.build();
} }
@ -433,7 +441,11 @@ interface Node<T> {
Node.Builder.OfLong nodeBuilder = Nodes.longBuilder(size); Node.Builder.OfLong nodeBuilder = Nodes.longBuilder(size);
nodeBuilder.begin(size); nodeBuilder.begin(size);
for (int i = 0; i < from && spliterator.tryAdvance((LongConsumer) e -> { }); i++) { } for (int i = 0; i < from && spliterator.tryAdvance((LongConsumer) e -> { }); i++) { }
for (int i = 0; (i < size) && spliterator.tryAdvance((LongConsumer) nodeBuilder); i++) { } if (to == count()) {
spliterator.forEachRemaining((LongConsumer) nodeBuilder);
} else {
for (int i = 0; i < size && spliterator.tryAdvance((LongConsumer) nodeBuilder); i++) { }
}
nodeBuilder.end(); nodeBuilder.end();
return nodeBuilder.build(); return nodeBuilder.build();
} }
@ -508,7 +520,11 @@ interface Node<T> {
Node.Builder.OfDouble nodeBuilder = Nodes.doubleBuilder(size); Node.Builder.OfDouble nodeBuilder = Nodes.doubleBuilder(size);
nodeBuilder.begin(size); nodeBuilder.begin(size);
for (int i = 0; i < from && spliterator.tryAdvance((DoubleConsumer) e -> { }); i++) { } for (int i = 0; i < from && spliterator.tryAdvance((DoubleConsumer) e -> { }); i++) { }
for (int i = 0; (i < size) && spliterator.tryAdvance((DoubleConsumer) nodeBuilder); i++) { } if (to == count()) {
spliterator.forEachRemaining((DoubleConsumer) nodeBuilder);
} else {
for (int i = 0; i < size && spliterator.tryAdvance((DoubleConsumer) nodeBuilder); i++) { }
}
nodeBuilder.end(); nodeBuilder.end();
return nodeBuilder.build(); return nodeBuilder.build();
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -69,6 +69,14 @@ final class Nodes {
private static final Node.OfLong EMPTY_LONG_NODE = new EmptyNode.OfLong(); private static final Node.OfLong EMPTY_LONG_NODE = new EmptyNode.OfLong();
private static final Node.OfDouble EMPTY_DOUBLE_NODE = new EmptyNode.OfDouble(); private static final Node.OfDouble EMPTY_DOUBLE_NODE = new EmptyNode.OfDouble();
/**
* @return an array generator for an array whose elements are of type T.
*/
@SuppressWarnings("unchecked")
static <T> IntFunction<T[]> castingArray() {
return size -> (T[]) new Object[size];
}
// General shape-based node creation methods // General shape-based node creation methods
/** /**

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -136,8 +136,9 @@ abstract class PipelineHelper<P_OUT> {
* *
* @param wrappedSink the destination {@code Sink} * @param wrappedSink the destination {@code Sink}
* @param spliterator the source {@code Spliterator} * @param spliterator the source {@code Spliterator}
* @return true if the cancellation was requested
*/ */
abstract <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator); abstract <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
/** /**
* Takes a {@code Sink} that accepts elements of the output type of the * Takes a {@code Sink} that accepts elements of the output type of the

View File

@ -122,8 +122,10 @@ abstract class ReferencePipeline<P_IN, P_OUT>
} }
@Override @Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) { final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)); boolean cancelled;
do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink));
return cancelled;
} }
@Override @Override
@ -411,6 +413,16 @@ abstract class ReferencePipeline<P_IN, P_OUT>
return SliceOps.makeRef(this, n, -1); return SliceOps.makeRef(this, n, -1);
} }
@Override
public final Stream<P_OUT> takeWhile(Predicate<? super P_OUT> predicate) {
return WhileOps.makeTakeWhileRef(this, predicate);
}
@Override
public final Stream<P_OUT> dropWhile(Predicate<? super P_OUT> predicate) {
return WhileOps.makeDropWhileRef(this, predicate);
}
// Terminal operations from Stream // Terminal operations from Stream
@Override @Override

View File

@ -96,11 +96,6 @@ final class SliceOps {
} }
} }
@SuppressWarnings("unchecked")
private static <T> IntFunction<T[]> castingArray() {
return size -> (T[]) new Object[size];
}
/** /**
* Appends a "slice" operation to the provided stream. The slice operation * Appends a "slice" operation to the provided stream. The slice operation
* may be may be skip-only, limit-only, or skip-and-limit. * may be may be skip-only, limit-only, or skip-and-limit.
@ -151,7 +146,7 @@ final class SliceOps {
// cancellation will be more aggressive cancelling later tasks // cancellation will be more aggressive cancelling later tasks
// if the target slice size has been reached from a given task, // if the target slice size has been reached from a given task,
// cancellation should also clear local results if any // cancellation should also clear local results if any
return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit). return new SliceTask<>(this, helper, spliterator, Nodes.castingArray(), skip, limit).
invoke().spliterator(); invoke().spliterator();
} }
} }

View File

@ -24,7 +24,6 @@
*/ */
package java.util.stream; package java.util.stream;
import java.nio.charset.Charset;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
@ -480,6 +479,135 @@ public interface Stream<T> extends BaseStream<T, Stream<T>> {
*/ */
Stream<T> skip(long n); Stream<T> skip(long n);
/**
* Returns, if this stream is ordered, a stream consisting of the longest
* prefix of elements taken from this stream that match the given predicate.
* Otherwise returns, if this stream is unordered, a stream consisting of a
* subset of elements taken from this stream that match the given predicate.
*
* <p>If this stream is ordered then the longest prefix is a contiguous
* sequence of elements of this stream that match the given predicate. The
* first element of the sequence is the first element of this stream, and
* the element immediately following the last element of the sequence does
* not match the given predicate.
*
* <p>If this stream is unordered, and some (but not all) elements of this
* stream match the given predicate, then the behavior of this operation is
* nondeterministic; it is free to take any subset of matching elements
* (which includes the empty set).
*
* <p>Independent of whether this stream is ordered or unordered if all
* elements of this stream match the given predicate then this operation
* takes all elements (the result is the same is the input), or if no
* elements of the stream match the given predicate then no elements are
* taken (the result is an empty stream).
*
* <p>This is a <a href="package-summary.html#StreamOps">short-circuiting
* stateful intermediate operation</a>.
*
* @implSpec
* The default implementation obtains the {@link #spliterator() spliterator}
* of this stream, wraps that spliterator so as to support the semantics
* of this operation on traversal, and returns a new stream associated with
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting. When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code takeWhile()} is generally a cheap operation on sequential
* stream pipelines, it can be quite expensive on ordered parallel
* pipelines, since the operation is constrained to return not just any
* valid prefix, but the longest prefix of elements in the encounter order.
* Using an unordered stream source (such as {@link #generate(Supplier)}) or
* removing the ordering constraint with {@link #unordered()} may result in
* significant speedups of {@code takeWhile()} in parallel pipelines, if the
* semantics of your situation permit. If consistency with encounter order
* is required, and you are experiencing poor performance or memory
* utilization with {@code takeWhile()} in parallel pipelines, switching to
* sequential execution with {@link #sequential()} may improve performance.
*
* @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* predicate to apply to elements to determine the longest
* prefix of elements.
* @return the new stream
*/
default Stream<T> takeWhile(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
// Reuses the unordered spliterator, which, when encounter is present,
// is safe to use as long as it configured not to split
return StreamSupport.stream(
new WhileOps.UnorderedWhileSpliterator.OfRef.Taking<>(spliterator(), true, predicate),
isParallel()).onClose(this::close);
}
/**
* Returns, if this stream is ordered, a stream consisting of the remaining
* elements of this stream after dropping the longest prefix of elements
* that match the given predicate. Otherwise returns, if this stream is
* unordered, a stream consisting of the remaining elements of this stream
* after dropping a subset of elements that match the given predicate.
*
* <p>If this stream is ordered then the longest prefix is a contiguous
* sequence of elements of this stream that match the given predicate. The
* first element of the sequence is the first element of this stream, and
* the element immediately following the last element of the sequence does
* not match the given predicate.
*
* <p>If this stream is unordered, and some (but not all) elements of this
* stream match the given predicate, then the behavior of this operation is
* nondeterministic; it is free to drop any subset of matching elements
* (which includes the empty set).
*
* <p>Independent of whether this stream is ordered or unordered if all
* elements of this stream match the given predicate then this operation
* drops all elements (the result is an empty stream), or if no elements of
* the stream match the given predicate then no elements are dropped (the
* result is the same is the input).
*
* <p>This is a <a href="package-summary.html#StreamOps">stateful
* intermediate operation</a>.
*
* @implSpec
* The default implementation obtains the {@link #spliterator() spliterator}
* of this stream, wraps that spliterator so as to support the semantics
* of this operation on traversal, and returns a new stream associated with
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting. When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code dropWhile()} is generally a cheap operation on sequential
* stream pipelines, it can be quite expensive on ordered parallel
* pipelines, since the operation is constrained to return not just any
* valid prefix, but the longest prefix of elements in the encounter order.
* Using an unordered stream source (such as {@link #generate(Supplier)}) or
* removing the ordering constraint with {@link #unordered()} may result in
* significant speedups of {@code dropWhile()} in parallel pipelines, if the
* semantics of your situation permit. If consistency with encounter order
* is required, and you are experiencing poor performance or memory
* utilization with {@code dropWhile()} in parallel pipelines, switching to
* sequential execution with {@link #sequential()} may improve performance.
*
* @param predicate a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* predicate to apply to elements to determine the longest
* prefix of elements.
* @return the new stream
*/
default Stream<T> dropWhile(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
// Reuses the unordered spliterator, which, when encounter is present,
// is safe to use as long as it configured not to split
return StreamSupport.stream(
new WhileOps.UnorderedWhileSpliterator.OfRef.Dropping<>(spliterator(), true, predicate),
isParallel()).onClose(this::close);
}
/** /**
* Performs an action for each element of this stream. * Performs an action for each element of this stream.
* *

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,984 @@
/*
* Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Comparator;
import java.util.DoubleSummaryStatistics;
import java.util.IntSummaryStatistics;
import java.util.Iterator;
import java.util.LongSummaryStatistics;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.DoubleBinaryOperator;
import java.util.function.DoubleConsumer;
import java.util.function.DoubleFunction;
import java.util.function.DoublePredicate;
import java.util.function.DoubleToIntFunction;
import java.util.function.DoubleToLongFunction;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;
import java.util.function.IntBinaryOperator;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.IntToDoubleFunction;
import java.util.function.IntToLongFunction;
import java.util.function.IntUnaryOperator;
import java.util.function.LongBinaryOperator;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;
import java.util.function.LongPredicate;
import java.util.function.LongToDoubleFunction;
import java.util.function.LongToIntFunction;
import java.util.function.LongUnaryOperator;
import java.util.function.ObjDoubleConsumer;
import java.util.function.ObjIntConsumer;
import java.util.function.ObjLongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import static java.util.stream.Collectors.*;
public final class DefaultMethodStreams {
static {
// Verify that default methods are not overridden
verify(DefaultMethodRefStream.class);
verify(DefaultMethodIntStream.class);
verify(DefaultMethodLongStream.class);
verify(DefaultMethodDoubleStream.class);
}
static void verify(Class<?> del) {
// Find the stream interface
Class<?> s = Stream.of(del.getInterfaces())
.filter(c -> BaseStream.class.isAssignableFrom(c))
.findFirst().get();
// Get all default methods on the stream class
Set<String> dms = Stream.of(s.getMethods())
.filter(m -> !Modifier.isStatic(m.getModifiers()))
.filter(m -> !m.isBridge())
.filter(Method::isDefault)
.map(Method::getName)
.collect(toSet());
// Get all methods on the delegating class
Set<String> ims = Stream.of(del.getMethods())
.filter(m -> !Modifier.isStatic(m.getModifiers()))
.filter(m -> m.getDeclaringClass() == del)
.map(Method::getName)
.collect(toSet());
if (ims.stream().anyMatch(dms::contains)) {
throw new AssertionError(String.format("%s overrides default methods of %s\n", del, s));
}
}
/**
* Creates a stream that for the next operation either delegates to
* a default method on {@link Stream}, if present for that operation,
* otherwise delegates to an underlying stream.
*
* @param s the underlying stream to be delegated to for non-default
* methods.
* @param <T> the type of the stream elements
* @return the delegating stream
*/
public static <T> Stream<T> delegateTo(Stream<T> s) {
return new DefaultMethodRefStream<>(s);
}
/**
* Creates a stream that for the next operation either delegates to
* a default method on {@link IntStream}, if present for that operation,
* otherwise delegates to an underlying stream.
*
* @param s the underlying stream to be delegated to for non-default
* methods.
* @return the delegating stream
*/
public static IntStream delegateTo(IntStream s) {
return new DefaultMethodIntStream(s);
}
/**
* Creates a stream that for the next operation either delegates to
* a default method on {@link LongStream}, if present for that operation,
* otherwise delegates to an underlying stream.
*
* @param s the underlying stream to be delegated to for non-default
* methods.
* @return the delegating stream
*/
public static LongStream delegateTo(LongStream s) {
return new DefaultMethodLongStream(s);
}
/**
* Creates a stream that for the next operation either delegates to
* a default method on {@link DoubleStream}, if present for that operation,
* otherwise delegates to an underlying stream.
*
* @param s the underlying stream to be delegated to for non-default
* methods.
* @return the delegating stream
*/
public static DoubleStream delegateTo(DoubleStream s) {
return new DefaultMethodDoubleStream(s);
}
/**
* A stream that delegates the next operation to a default method, if
* present, or to the same operation of an underlying stream.
*
* @param <T> the type of the stream elements
*/
static final class DefaultMethodRefStream<T> implements Stream<T> {
final Stream<T> s;
DefaultMethodRefStream(Stream<T> s) {
this.s = s;
}
// Delegating non-default methods
@Override
public Stream<T> filter(Predicate<? super T> predicate) {
return s.filter(predicate);
}
@Override
public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
return s.map(mapper);
}
@Override
public IntStream mapToInt(ToIntFunction<? super T> mapper) {
return s.mapToInt(mapper);
}
@Override
public LongStream mapToLong(ToLongFunction<? super T> mapper) {
return s.mapToLong(mapper);
}
@Override
public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
return s.mapToDouble(mapper);
}
@Override
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
return s.flatMap(mapper);
}
@Override
public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
return s.flatMapToInt(mapper);
}
@Override
public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
return s.flatMapToLong(mapper);
}
@Override
public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
return s.flatMapToDouble(mapper);
}
@Override
public Stream<T> distinct() {
return s.distinct();
}
@Override
public Stream<T> sorted() {
return s.sorted();
}
@Override
public Stream<T> sorted(Comparator<? super T> comparator) {
return s.sorted(comparator);
}
@Override
public Stream<T> peek(Consumer<? super T> action) {
return s.peek(action);
}
@Override
public Stream<T> limit(long maxSize) {
return s.limit(maxSize);
}
@Override
public Stream<T> skip(long n) {
return s.skip(n);
}
@Override
public void forEach(Consumer<? super T> action) {
s.forEach(action);
}
@Override
public void forEachOrdered(Consumer<? super T> action) {
s.forEachOrdered(action);
}
@Override
public Object[] toArray() {
return s.toArray();
}
@Override
public <A> A[] toArray(IntFunction<A[]> generator) {
return s.toArray(generator);
}
@Override
public T reduce(T identity, BinaryOperator<T> accumulator) {
return s.reduce(identity, accumulator);
}
@Override
public Optional<T> reduce(BinaryOperator<T> accumulator) {
return s.reduce(accumulator);
}
@Override
public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
return s.reduce(identity, accumulator, combiner);
}
@Override
public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
return s.collect(supplier, accumulator, combiner);
}
@Override
public <R, A> R collect(Collector<? super T, A, R> collector) {
return s.collect(collector);
}
@Override
public Optional<T> min(Comparator<? super T> comparator) {
return s.min(comparator);
}
@Override
public Optional<T> max(Comparator<? super T> comparator) {
return s.max(comparator);
}
@Override
public long count() {
return s.count();
}
@Override
public boolean anyMatch(Predicate<? super T> predicate) {
return s.anyMatch(predicate);
}
@Override
public boolean allMatch(Predicate<? super T> predicate) {
return s.allMatch(predicate);
}
@Override
public boolean noneMatch(Predicate<? super T> predicate) {
return s.noneMatch(predicate);
}
@Override
public Optional<T> findFirst() {
return s.findFirst();
}
@Override
public Optional<T> findAny() {
return s.findAny();
}
@Override
public Iterator<T> iterator() {
return s.iterator();
}
@Override
public Spliterator<T> spliterator() {
return s.spliterator();
}
@Override
public boolean isParallel() {
return s.isParallel();
}
@Override
public Stream<T> sequential() {
return s.sequential();
}
@Override
public Stream<T> parallel() {
return s.parallel();
}
@Override
public Stream<T> unordered() {
return s.unordered();
}
@Override
public Stream<T> onClose(Runnable closeHandler) {
return s.onClose(closeHandler);
}
@Override
public void close() {
s.close();
}
}
static final class DefaultMethodIntStream implements IntStream {
final IntStream s;
public DefaultMethodIntStream(IntStream s) {
this.s = s;
}
// Delegating non-default methods
@Override
public IntStream filter(IntPredicate predicate) {
return s.filter(predicate);
}
@Override
public IntStream map(IntUnaryOperator mapper) {
return s.map(mapper);
}
@Override
public <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
return s.mapToObj(mapper);
}
@Override
public LongStream mapToLong(IntToLongFunction mapper) {
return s.mapToLong(mapper);
}
@Override
public DoubleStream mapToDouble(IntToDoubleFunction mapper) {
return s.mapToDouble(mapper);
}
@Override
public IntStream flatMap(IntFunction<? extends IntStream> mapper) {
return s.flatMap(mapper);
}
@Override
public IntStream distinct() {
return s.distinct();
}
@Override
public IntStream sorted() {
return s.sorted();
}
@Override
public IntStream peek(IntConsumer action) {
return s.peek(action);
}
@Override
public IntStream limit(long maxSize) {
return s.limit(maxSize);
}
@Override
public IntStream skip(long n) {
return s.skip(n);
}
@Override
public void forEach(IntConsumer action) {
s.forEach(action);
}
@Override
public void forEachOrdered(IntConsumer action) {
s.forEachOrdered(action);
}
@Override
public int[] toArray() {
return s.toArray();
}
@Override
public int reduce(int identity, IntBinaryOperator op) {
return s.reduce(identity, op);
}
@Override
public OptionalInt reduce(IntBinaryOperator op) {
return s.reduce(op);
}
@Override
public <R> R collect(Supplier<R> supplier, ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner) {
return s.collect(supplier, accumulator, combiner);
}
@Override
public int sum() {
return s.sum();
}
@Override
public OptionalInt min() {
return s.min();
}
@Override
public OptionalInt max() {
return s.max();
}
@Override
public long count() {
return s.count();
}
@Override
public OptionalDouble average() {
return s.average();
}
@Override
public IntSummaryStatistics summaryStatistics() {
return s.summaryStatistics();
}
@Override
public boolean anyMatch(IntPredicate predicate) {
return s.anyMatch(predicate);
}
@Override
public boolean allMatch(IntPredicate predicate) {
return s.allMatch(predicate);
}
@Override
public boolean noneMatch(IntPredicate predicate) {
return s.noneMatch(predicate);
}
@Override
public OptionalInt findFirst() {
return s.findFirst();
}
@Override
public OptionalInt findAny() {
return s.findAny();
}
@Override
public LongStream asLongStream() {
return s.asLongStream();
}
@Override
public DoubleStream asDoubleStream() {
return s.asDoubleStream();
}
@Override
public Stream<Integer> boxed() {
return s.boxed();
}
@Override
public IntStream sequential() {
return s.sequential();
}
@Override
public IntStream parallel() {
return s.parallel();
}
@Override
public PrimitiveIterator.OfInt iterator() {
return s.iterator();
}
@Override
public Spliterator.OfInt spliterator() {
return s.spliterator();
}
@Override
public boolean isParallel() {
return s.isParallel();
}
@Override
public IntStream unordered() {
return s.unordered();
}
@Override
public IntStream onClose(Runnable closeHandler) {
return s.onClose(closeHandler);
}
@Override
public void close() {
s.close();
}
}
static final class DefaultMethodLongStream implements LongStream {
final LongStream s;
public DefaultMethodLongStream(LongStream s) {
this.s = s;
}
// Delegating non-default methods
@Override
public void forEach(LongConsumer action) {
s.forEach(action);
}
@Override
public LongStream filter(LongPredicate predicate) {
return s.filter(predicate);
}
@Override
public LongStream map(LongUnaryOperator mapper) {
return s.map(mapper);
}
@Override
public <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
return s.mapToObj(mapper);
}
@Override
public IntStream mapToInt(LongToIntFunction mapper) {
return s.mapToInt(mapper);
}
@Override
public DoubleStream mapToDouble(LongToDoubleFunction mapper) {
return s.mapToDouble(mapper);
}
@Override
public LongStream flatMap(LongFunction<? extends LongStream> mapper) {
return s.flatMap(mapper);
}
@Override
public LongStream distinct() {
return s.distinct();
}
@Override
public LongStream sorted() {
return s.sorted();
}
@Override
public LongStream peek(LongConsumer action) {
return s.peek(action);
}
@Override
public LongStream limit(long maxSize) {
return s.limit(maxSize);
}
@Override
public LongStream skip(long n) {
return s.skip(n);
}
@Override
public void forEachOrdered(LongConsumer action) {
s.forEachOrdered(action);
}
@Override
public long[] toArray() {
return s.toArray();
}
@Override
public long reduce(long identity, LongBinaryOperator op) {
return s.reduce(identity, op);
}
@Override
public OptionalLong reduce(LongBinaryOperator op) {
return s.reduce(op);
}
@Override
public <R> R collect(Supplier<R> supplier, ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner) {
return s.collect(supplier, accumulator, combiner);
}
@Override
public long sum() {
return s.sum();
}
@Override
public OptionalLong min() {
return s.min();
}
@Override
public OptionalLong max() {
return s.max();
}
@Override
public long count() {
return s.count();
}
@Override
public OptionalDouble average() {
return s.average();
}
@Override
public LongSummaryStatistics summaryStatistics() {
return s.summaryStatistics();
}
@Override
public boolean anyMatch(LongPredicate predicate) {
return s.anyMatch(predicate);
}
@Override
public boolean allMatch(LongPredicate predicate) {
return s.allMatch(predicate);
}
@Override
public boolean noneMatch(LongPredicate predicate) {
return s.noneMatch(predicate);
}
@Override
public OptionalLong findFirst() {
return s.findFirst();
}
@Override
public OptionalLong findAny() {
return s.findAny();
}
@Override
public DoubleStream asDoubleStream() {
return s.asDoubleStream();
}
@Override
public Stream<Long> boxed() {
return s.boxed();
}
@Override
public LongStream sequential() {
return s.sequential();
}
@Override
public LongStream parallel() {
return s.parallel();
}
@Override
public PrimitiveIterator.OfLong iterator() {
return s.iterator();
}
@Override
public Spliterator.OfLong spliterator() {
return s.spliterator();
}
@Override
public boolean isParallel() {
return s.isParallel();
}
@Override
public LongStream unordered() {
return s.unordered();
}
@Override
public LongStream onClose(Runnable closeHandler) {
return s.onClose(closeHandler);
}
@Override
public void close() {
s.close();
}
}
static final class DefaultMethodDoubleStream implements DoubleStream {
final DoubleStream s;
public DefaultMethodDoubleStream(DoubleStream s) {
this.s = s;
}
@Override
public DoubleStream filter(DoublePredicate predicate) {
return s.filter(predicate);
}
@Override
public DoubleStream map(DoubleUnaryOperator mapper) {
return s.map(mapper);
}
@Override
public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
return s.mapToObj(mapper);
}
@Override
public IntStream mapToInt(DoubleToIntFunction mapper) {
return s.mapToInt(mapper);
}
@Override
public LongStream mapToLong(DoubleToLongFunction mapper) {
return s.mapToLong(mapper);
}
@Override
public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
return s.flatMap(mapper);
}
@Override
public DoubleStream distinct() {
return s.distinct();
}
@Override
public DoubleStream sorted() {
return s.sorted();
}
@Override
public DoubleStream peek(DoubleConsumer action) {
return s.peek(action);
}
@Override
public DoubleStream limit(long maxSize) {
return s.limit(maxSize);
}
@Override
public DoubleStream skip(long n) {
return s.skip(n);
}
@Override
public void forEach(DoubleConsumer action) {
s.forEach(action);
}
@Override
public void forEachOrdered(DoubleConsumer action) {
s.forEachOrdered(action);
}
@Override
public double[] toArray() {
return s.toArray();
}
@Override
public double reduce(double identity, DoubleBinaryOperator op) {
return s.reduce(identity, op);
}
@Override
public OptionalDouble reduce(DoubleBinaryOperator op) {
return s.reduce(op);
}
@Override
public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) {
return s.collect(supplier, accumulator, combiner);
}
@Override
public double sum() {
return s.sum();
}
@Override
public OptionalDouble min() {
return s.min();
}
@Override
public OptionalDouble max() {
return s.max();
}
@Override
public long count() {
return s.count();
}
@Override
public OptionalDouble average() {
return s.average();
}
@Override
public DoubleSummaryStatistics summaryStatistics() {
return s.summaryStatistics();
}
@Override
public boolean anyMatch(DoublePredicate predicate) {
return s.anyMatch(predicate);
}
@Override
public boolean allMatch(DoublePredicate predicate) {
return s.allMatch(predicate);
}
@Override
public boolean noneMatch(DoublePredicate predicate) {
return s.noneMatch(predicate);
}
@Override
public OptionalDouble findFirst() {
return s.findFirst();
}
@Override
public OptionalDouble findAny() {
return s.findAny();
}
@Override
public Stream<Double> boxed() {
return s.boxed();
}
@Override
public DoubleStream sequential() {
return s.sequential();
}
@Override
public DoubleStream parallel() {
return s.parallel();
}
@Override
public PrimitiveIterator.OfDouble iterator() {
return s.iterator();
}
@Override
public Spliterator.OfDouble spliterator() {
return s.spliterator();
}
@Override
public boolean isParallel() {
return s.isParallel();
}
@Override
public DoubleStream unordered() {
return s.unordered();
}
@Override
public DoubleStream onClose(Runnable closeHandler) {
return s.onClose(closeHandler);
}
@Override
public void close() {
s.close();
}
}
}

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -119,7 +119,7 @@ public class StreamTestDataProvider {
// Simple combination of numbers and null values, probably excessive but may catch // Simple combination of numbers and null values, probably excessive but may catch
// errors for initialization/termination/sequence // errors for initialization/termination/sequence
// @@@ This is separate from the other data for now until nulls are consitently supported by // @@@ This is separate from the other data for now until nulls are consistently supported by
// all operations // all operations
{ {
List<Object[]> list = new ArrayList<>(); List<Object[]> list = new ArrayList<>();

View File

@ -0,0 +1,304 @@
/*
* Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package org.openjdk.tests.java.util.stream;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.DefaultMethodStreams;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.OpTestCase;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toCollection;
/*
* @test
* @bug 8071597
*/
@Test
public class WhileOpStatefulTest extends OpTestCase {
static final long COUNT_PERIOD = 100;
static final long EXECUTION_TIME_LIMIT = TimeUnit.SECONDS.toMillis(10);
static final long TAKE_WHILE_COUNT_LIMIT = 100_000;
static final int DROP_SOURCE_SIZE = 10_000;
static final long DROP_WHILE_COUNT_LIMIT = 5000;
@Test
public void testTimedTakeWithCount() {
testTakeWhileMulti(
s -> {
BooleanSupplier isWithinTakePeriod =
within(System.currentTimeMillis(), COUNT_PERIOD);
s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
.mapToLong(e -> 1).reduce(0, Long::sum);
},
s -> {
BooleanSupplier isWithinTakePeriod =
within(System.currentTimeMillis(), COUNT_PERIOD);
s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
.mapToLong(e -> 1).reduce(0, Long::sum);
},
s -> {
BooleanSupplier isWithinTakePeriod =
within(System.currentTimeMillis(), COUNT_PERIOD);
s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
.map(e -> 1).reduce(0, Long::sum);
},
s -> {
BooleanSupplier isWithinTakePeriod =
within(System.currentTimeMillis(), COUNT_PERIOD);
s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
.mapToLong(e -> 1).reduce(0, Long::sum);
});
}
@Test
public void testCountTakeWithCount() {
testTakeWhileMulti(
s -> {
AtomicLong c = new AtomicLong();
long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
.mapToLong(e -> 1).reduce(0, Long::sum);
assertTrue(rc <= c.get());
},
s -> {
AtomicLong c = new AtomicLong();
long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
.mapToLong(e -> 1).reduce(0, Long::sum);
assertTrue(rc <= c.get());
},
s -> {
AtomicLong c = new AtomicLong();
long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
.map(e -> 1).reduce(0, Long::sum);
assertTrue(rc <= c.get());
},
s -> {
AtomicLong c = new AtomicLong();
long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
.mapToLong(e -> 1).reduce(0, Long::sum);
assertTrue(rc <= c.get());
});
}
@Test
public void testCountTakeWithToArray() {
testTakeWhileMulti(
s -> {
AtomicLong c = new AtomicLong();
Object[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
.toArray();
assertTrue(ra.length <= c.get());
},
s -> {
AtomicLong c = new AtomicLong();
int[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
.toArray();
assertTrue(ra.length <= c.get());
},
s -> {
AtomicLong c = new AtomicLong();
long[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
.toArray();
assertTrue(ra.length <= c.get());
},
s -> {
AtomicLong c = new AtomicLong();
double[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
.toArray();
assertTrue(ra.length <= c.get());
});
}
@Test
public void testCountDropWithCount() {
testDropWhileMulti(
s -> {
AtomicLong c = new AtomicLong();
long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
.mapToLong(e -> 1).reduce(0, Long::sum);
assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
assertTrue(rc <= DROP_SOURCE_SIZE);
},
s -> {
AtomicLong c = new AtomicLong();
long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
.mapToLong(e -> 1).reduce(0, Long::sum);
assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
assertTrue(rc <= DROP_SOURCE_SIZE);
},
s -> {
AtomicLong c = new AtomicLong();
long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
.map(e -> 1).reduce(0, Long::sum);
assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
assertTrue(rc <= DROP_SOURCE_SIZE);
},
s -> {
AtomicLong c = new AtomicLong();
long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
.mapToLong(e -> 1).reduce(0, Long::sum);
assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
assertTrue(rc <= DROP_SOURCE_SIZE);
});
}
@Test
public void testCountDropWithToArray() {
testDropWhileMulti(
s -> {
AtomicLong c = new AtomicLong();
Object[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
.toArray();
assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
assertTrue(ra.length <= DROP_SOURCE_SIZE);
},
s -> {
AtomicLong c = new AtomicLong();
int[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
.toArray();
assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
assertTrue(ra.length <= DROP_SOURCE_SIZE);
},
s -> {
AtomicLong c = new AtomicLong();
long[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
.toArray();
assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
assertTrue(ra.length <= DROP_SOURCE_SIZE);
},
s -> {
AtomicLong c = new AtomicLong();
double[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
.toArray();
assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
assertTrue(ra.length <= DROP_SOURCE_SIZE);
});
}
private void testTakeWhileMulti(Consumer<Stream<Integer>> mRef,
Consumer<IntStream> mInt,
Consumer<LongStream> mLong,
Consumer<DoubleStream> mDouble) {
Map<String, Supplier<Stream<Integer>>> sources = new HashMap<>();
sources.put("Stream.generate()", () -> Stream.generate(() -> 1));
sources.put("Stream.iterate()", () -> Stream.iterate(1, x -> 1));
sources.put("Stream.iterate().unordered()", () -> Stream.iterate(1, x -> 1));
testWhileMulti(sources, mRef, mInt, mLong, mDouble);
}
private void testDropWhileMulti(Consumer<Stream<Integer>> mRef,
Consumer<IntStream> mInt,
Consumer<LongStream> mLong,
Consumer<DoubleStream> mDouble) {
Map<String, Supplier<Stream<Integer>>> sources = new HashMap<>();
sources.put("IntStream.range().boxed()",
() -> IntStream.range(0, DROP_SOURCE_SIZE).boxed());
sources.put("IntStream.range().boxed().unordered()",
() -> IntStream.range(0, DROP_SOURCE_SIZE).boxed().unordered());
sources.put("LinkedList.stream()",
() -> IntStream.range(0, DROP_SOURCE_SIZE).boxed()
.collect(toCollection(LinkedList::new))
.stream());
sources.put("LinkedList.stream().unordered()",
() -> IntStream.range(0, DROP_SOURCE_SIZE).boxed()
.collect(toCollection(LinkedList::new))
.stream()
.unordered());
testWhileMulti(sources, mRef, mInt, mLong, mDouble);
}
private void testWhileMulti(Map<String, Supplier<Stream<Integer>>> sources,
Consumer<Stream<Integer>> mRef,
Consumer<IntStream> mInt,
Consumer<LongStream> mLong,
Consumer<DoubleStream> mDouble) {
Map<String, Function<Stream<Integer>, Stream<Integer>>> transforms = new HashMap<>();
transforms.put("Stream.sequential()", s -> {
BooleanSupplier isWithinExecutionPeriod = within(System.currentTimeMillis(),
EXECUTION_TIME_LIMIT);
return s.peek(e -> {
if (!isWithinExecutionPeriod.getAsBoolean()) {
throw new RuntimeException();
}
});
});
transforms.put("Stream.parallel()", s -> {
BooleanSupplier isWithinExecutionPeriod = within(System.currentTimeMillis(),
EXECUTION_TIME_LIMIT);
return s.parallel()
.peek(e -> {
if (!isWithinExecutionPeriod.getAsBoolean()) {
throw new RuntimeException();
}
});
});
Map<String, Consumer<Stream<Integer>>> actions = new HashMap<>();
actions.put("Ref", mRef);
actions.put("Int", s -> mInt.accept(s.mapToInt(e -> e)));
actions.put("Long", s -> mLong.accept(s.mapToLong(e -> e)));
actions.put("Double", s -> mDouble.accept(s.mapToDouble(e -> e)));
actions.put("Ref using defaults", s -> mRef.accept(DefaultMethodStreams.delegateTo(s)));
actions.put("Int using defaults", s -> mInt.accept(DefaultMethodStreams.delegateTo(s.mapToInt(e -> e))));
actions.put("Long using defaults", s -> mLong.accept(DefaultMethodStreams.delegateTo(s.mapToLong(e -> e))));
actions.put("Double using defaults", s -> mDouble.accept(DefaultMethodStreams.delegateTo(s.mapToDouble(e -> e))));
for (Map.Entry<String, Supplier<Stream<Integer>>> s : sources.entrySet()) {
setContext("source", s.getKey());
for (Map.Entry<String, Function<Stream<Integer>, Stream<Integer>>> t : transforms.entrySet()) {
setContext("transform", t.getKey());
for (Map.Entry<String, Consumer<Stream<Integer>>> a : actions.entrySet()) {
setContext("shape", a.getKey());
Stream<Integer> stream = s.getValue().get();
stream = t.getValue().apply(stream);
a.getValue().accept(stream);
}
}
}
}
static BooleanSupplier within(long start, long durationInMillis) {
return () -> (System.currentTimeMillis() - start) < durationInMillis;
}
}

View File

@ -0,0 +1,361 @@
/*
* Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package org.openjdk.tests.java.util.stream;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.DefaultMethodStreams;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LambdaTestHelpers;
import java.util.stream.LongStream;
import java.util.stream.OpTestCase;
import java.util.stream.Stream;
import java.util.stream.StreamTestDataProvider;
import java.util.stream.TestData;
/*
* @test
* @bug 8071597
*/
@Test
public class WhileOpTest extends OpTestCase {
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testTakeWhileOps(String name, TestData.OfRef<Integer> data) {
for (int size : sizes(data.size())) {
setContext("takeWhile", size);
testWhileMulti(data,
whileResultAsserter(data, WhileOp.Take, e -> e < size),
s -> s.takeWhile(e -> e < size),
s -> s.takeWhile(e -> e < size),
s -> s.takeWhile(e -> e < size),
s -> s.takeWhile(e -> e < size));
testWhileMulti(data,
whileResultAsserter(data, WhileOp.Take, e -> e < size / 2),
s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2),
s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2),
s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2),
s -> s.takeWhile(e -> e < size).takeWhile(e -> e < size / 2));
}
}
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testDropWhileOps(String name, TestData.OfRef<Integer> data) {
for (int size : sizes(data.size())) {
setContext("dropWhile", size);
testWhileMulti(data,
whileResultAsserter(data, WhileOp.Drop, e -> e < size),
s -> s.dropWhile(e -> e < size),
s -> s.dropWhile(e -> e < size),
s -> s.dropWhile(e -> e < size),
s -> s.dropWhile(e -> e < size));
testWhileMulti(data,
whileResultAsserter(data, WhileOp.Drop, e -> e < size),
s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size),
s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size),
s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size),
s -> s.dropWhile(e -> e < size / 2).dropWhile(e -> e < size));
}
}
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testDropTakeWhileOps(String name, TestData.OfRef<Integer> data) {
for (int size : sizes(data.size())) {
setContext("dropWhile", size);
testWhileMulti(data,
whileResultAsserter(data, WhileOp.Undefined, null),
s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size),
s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size),
s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size),
s -> s.dropWhile(e -> e < size / 2).takeWhile(e -> e < size));
}
}
/**
* While operation type to be asserted on
*/
enum WhileOp {
/**
* The takeWhile operation
*/
Take,
/**
* The dropWhile operation
*/
Drop,
/**
* The operation(s) are undefined
*/
Undefined
}
/**
* Create a result asserter for takeWhile or dropWhile operations.
* <p>
* If the stream pipeline consists of the takeWhile operation
* ({@link WhileOp#Take}) or the dropWhile operation ({@link WhileOp#Drop})
* then specific assertions can be made on the actual result based on the
* input elements, {@code inputData}, and whether those elements match the
* predicate, {@code p}, of the operation.
* <p>
* If the input elements have an encounter order then the actual result
* is asserted against the result of operating sequentially on input
* elements given the predicate and in accordance with the operation
* semantics. (The actual result whether produced sequentially or in
* parallel should the same.)
* <p>
* If the input elements have no encounter order then an actual result
* is, for practical purposes, considered non-deterministic.
* Consider an input list of lists that contains all possible permutations
* of the input elements, and a output list of lists that is the result of
* applying the pipeline with the operation sequentially to each input
* list.
* Any list in the output lists is a valid result. It's not practical to
* test in such a manner.
* For a takeWhile operation the following assertions can be made if
* only some of the input elements match the predicate (i.e. taking will
* short-circuit the pipeline):
* <ol>
* <li>The set of output elements is a subset of the set of matching
* input elements</li>
* <li>The set of output elements and the set of non-matching input
* element are disjoint</li>
* </ol>
* For a dropWhile operation the following assertions can be made:
* <ol>
* <li>The set of non-matching input elements is a subset of the set of
* output elements</li>
* <li>The set of matching output elements is a subset of the set of
* matching input elements</li>
* </ol>
*
* @param inputData the elements input into the stream pipeline
* @param op the operation of the stream pipeline, one of takeWhile,
* dropWhile, or an undefined set of operations (possibly including
* two or more takeWhile and/or dropWhile operations, or because
* the predicate is not stateless).
* @param p the stateless predicate applied to the operation, ignored if
* the
* operation is {@link WhileOp#Undefined}.
* @param <T> the type of elements
* @return a result asserter
*/
private <T> ResultAsserter<Iterable<T>> whileResultAsserter(Iterable<T> inputData,
WhileOp op,
Predicate<? super T> p) {
return (act, exp, ord, par) -> {
if (par & !ord) {
List<T> input = new ArrayList<>();
inputData.forEach(input::add);
List<T> output = new ArrayList<>();
act.forEach(output::add);
if (op == WhileOp.Take) {
List<T> matchingInput = new ArrayList<>();
List<T> nonMatchingInput = new ArrayList<>();
input.forEach(t -> {
if (p.test(t))
matchingInput.add(t);
else
nonMatchingInput.add(t);
});
// If some, not all, elements are taken
if (matchingInput.size() < input.size()) {
assertTrue(output.size() <= matchingInput.size(),
"Output is larger than the matching input");
// The output must be a subset of the matching input
assertTrue(matchingInput.containsAll(output),
"Output is not a subset of the matching input");
// The output must not contain any non matching elements
for (T nonMatching : nonMatchingInput) {
assertFalse(output.contains(nonMatching),
"Output and non-matching input are not disjoint");
}
}
}
else if (op == WhileOp.Drop) {
List<T> matchingInput = new ArrayList<>();
List<T> nonMatchingInput = new ArrayList<>();
input.forEach(t -> {
if (p.test(t))
matchingInput.add(t);
else
nonMatchingInput.add(t);
});
// The non matching input must be a subset of output
assertTrue(output.containsAll(nonMatchingInput),
"Non-matching input is not a subset of the output");
// The matching output must be a subset of the matching input
List<T> matchingOutput = new ArrayList<>();
output.forEach(i -> {
if (p.test(i))
matchingOutput.add(i);
});
assertTrue(matchingInput.containsAll(matchingOutput),
"Matching output is not a subset of matching input");
}
// Note: if there is a combination of takeWhile and dropWhile then specific
// assertions cannot be performed.
// All that can be reliably asserted is the output is a subset of the input
assertTrue(input.containsAll(output));
}
else {
// For specific operations derive expected result from the input
if (op == WhileOp.Take) {
List<T> takeInput = new ArrayList<>();
for (T t : inputData) {
if (p.test(t))
takeInput.add(t);
else
break;
}
LambdaTestHelpers.assertContents(act, takeInput);
}
else if (op == WhileOp.Drop) {
List<T> dropInput = new ArrayList<>();
for (T t : inputData) {
if (dropInput.size() > 0 || !p.test(t))
dropInput.add(t);
}
LambdaTestHelpers.assertContents(act, dropInput);
}
LambdaTestHelpers.assertContents(act, exp);
}
};
}
private Collection<Integer> sizes(int s) {
Set<Integer> sizes = new LinkedHashSet<>();
sizes.add(0);
sizes.add(1);
sizes.add(s / 4);
sizes.add(s / 2);
sizes.add(3 * s / 4);
sizes.add(Math.max(0, s - 1));
sizes.add(s);
sizes.add(Integer.MAX_VALUE);
return sizes;
}
private void testWhileMulti(TestData.OfRef<Integer> data,
ResultAsserter<Iterable<Integer>> ra,
Function<Stream<Integer>, Stream<Integer>> mRef,
Function<IntStream, IntStream> mInt,
Function<LongStream, LongStream> mLong,
Function<DoubleStream, DoubleStream> mDouble) {
Map<String, Function<Stream<Integer>, Stream<Integer>>> ms = new HashMap<>();
ms.put("Ref", mRef);
ms.put("Int", s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e));
ms.put("Long", s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e));
ms.put("Double", s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e));
ms.put("Ref using defaults", s -> mRef.apply(DefaultMethodStreams.delegateTo(s)));
ms.put("Int using defaults", s -> mInt.apply(DefaultMethodStreams.delegateTo(s.mapToInt(e -> e))).mapToObj(e -> e));
ms.put("Long using defaults", s -> mLong.apply(DefaultMethodStreams.delegateTo(s.mapToLong(e -> e))).mapToObj(e -> (int) e));
ms.put("Double using defaults", s -> mDouble.apply(DefaultMethodStreams.delegateTo(s.mapToDouble(e -> e))).mapToObj(e -> (int) e));
testWhileMulti(data, ra, ms);
}
private final void testWhileMulti(TestData.OfRef<Integer> data,
ResultAsserter<Iterable<Integer>> ra,
Map<String, Function<Stream<Integer>, Stream<Integer>>> ms) {
for (Map.Entry<String, Function<Stream<Integer>, Stream<Integer>>> e : ms.entrySet()) {
setContext("shape", e.getKey());
withData(data)
.stream(e.getValue())
.resultAsserter(ra)
.exercise();
}
}
@Test
public void testRefDefaultClose() {
AtomicBoolean isClosed = new AtomicBoolean();
Stream<Integer> s = Stream.of(1, 2, 3).onClose(() -> isClosed.set(true));
try (Stream<Integer> ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
ds.count();
}
assertTrue(isClosed.get());
}
@Test
public void testIntDefaultClose() {
AtomicBoolean isClosed = new AtomicBoolean();
IntStream s = IntStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
try (IntStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
ds.count();
}
assertTrue(isClosed.get());
}
@Test
public void testLongDefaultClose() {
AtomicBoolean isClosed = new AtomicBoolean();
LongStream s = LongStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
try (LongStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
ds.count();
}
assertTrue(isClosed.get());
}
@Test
public void testDoubleDefaultClose() {
AtomicBoolean isClosed = new AtomicBoolean();
DoubleStream s = DoubleStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
try (DoubleStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
ds.count();
}
assertTrue(isClosed.get());
}
}