8196106: Support nested infinite or recursive flat mapped streams
Reviewed-by: psandoz
This commit is contained in:
parent
58911ccc2c
commit
8a5b86c529
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2012, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -433,6 +433,20 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether any of the stages in the (entire) pipeline is short-circuiting
|
||||
* or not.
|
||||
* @return {@code true} if any stage in this pipeline is short-circuiting,
|
||||
* {@code false} if not.
|
||||
*/
|
||||
protected final boolean isShortCircuitingPipeline() {
|
||||
for (var u = sourceStage.nextStage; u != null; u = u.nextStage) {
|
||||
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(u.combinedFlags))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the source spliterator for this pipeline stage. For a sequential or
|
||||
* stateless parallel pipeline, this is the source spliterator. For a
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2013, 2023, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2013, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -39,6 +39,7 @@ import java.util.function.DoublePredicate;
|
||||
import java.util.function.DoubleToIntFunction;
|
||||
import java.util.function.DoubleToLongFunction;
|
||||
import java.util.function.DoubleUnaryOperator;
|
||||
import java.util.function.IntConsumer;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.function.ObjDoubleConsumer;
|
||||
import java.util.function.Supplier;
|
||||
@ -263,43 +264,46 @@ abstract class DoublePipeline<E_IN>
|
||||
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
|
||||
@Override
|
||||
Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
|
||||
return new Sink.ChainedDouble<>(sink) {
|
||||
// true if cancellationRequested() has been called
|
||||
boolean cancellationRequestedCalled;
|
||||
final DoubleConsumer fastPath =
|
||||
isShortCircuitingPipeline()
|
||||
? null
|
||||
: (sink instanceof DoubleConsumer dc)
|
||||
? dc
|
||||
: sink::accept;
|
||||
final class FlatMap implements Sink.OfDouble, DoublePredicate {
|
||||
boolean cancel;
|
||||
|
||||
// cache the consumer to avoid creation on every accepted element
|
||||
DoubleConsumer downstreamAsDouble = downstream::accept;
|
||||
@Override public void begin(long size) { sink.begin(-1); }
|
||||
@Override public void end() { sink.end(); }
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(-1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(double t) {
|
||||
try (DoubleStream result = mapper.apply(t)) {
|
||||
public void accept(double e) {
|
||||
try (DoubleStream result = mapper.apply(e)) {
|
||||
if (result != null) {
|
||||
if (!cancellationRequestedCalled) {
|
||||
result.sequential().forEach(downstreamAsDouble);
|
||||
} else {
|
||||
var s = result.sequential().spliterator();
|
||||
do {
|
||||
} while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
|
||||
}
|
||||
if (fastPath == null)
|
||||
result.sequential().allMatch(this);
|
||||
else
|
||||
result.sequential().forEach(fastPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancellationRequested() {
|
||||
// If this method is called then an operation within the stream
|
||||
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
|
||||
// Note that we cannot differentiate between an upstream or
|
||||
// downstream operation
|
||||
cancellationRequestedCalled = true;
|
||||
return downstream.cancellationRequested();
|
||||
return cancel || (cancel |= sink.cancellationRequested());
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public boolean test(double output) {
|
||||
if (!cancel) {
|
||||
sink.accept(output);
|
||||
return !(cancel |= sink.cancellationRequested());
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return new FlatMap();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -28,23 +28,15 @@ import jdk.internal.vm.annotation.ForceInline;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.invoke.VarHandle;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.Spliterator;
|
||||
import java.util.concurrent.CountedCompleter;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.BinaryOperator;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.function.ToDoubleFunction;
|
||||
import java.util.function.ToIntFunction;
|
||||
import java.util.function.ToLongFunction;
|
||||
import java.util.stream.Gatherer.Integrator;
|
||||
|
||||
/**
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2012, 2023, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2012, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -297,43 +297,46 @@ abstract class IntPipeline<E_IN>
|
||||
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
|
||||
@Override
|
||||
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
|
||||
return new Sink.ChainedInt<>(sink) {
|
||||
// true if cancellationRequested() has been called
|
||||
boolean cancellationRequestedCalled;
|
||||
final IntConsumer fastPath =
|
||||
isShortCircuitingPipeline()
|
||||
? null
|
||||
: (sink instanceof IntConsumer ic)
|
||||
? ic
|
||||
: sink::accept;
|
||||
final class FlatMap implements Sink.OfInt, IntPredicate {
|
||||
boolean cancel;
|
||||
|
||||
// cache the consumer to avoid creation on every accepted element
|
||||
IntConsumer downstreamAsInt = downstream::accept;
|
||||
@Override public void begin(long size) { sink.begin(-1); }
|
||||
@Override public void end() { sink.end(); }
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(-1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(int t) {
|
||||
try (IntStream result = mapper.apply(t)) {
|
||||
public void accept(int e) {
|
||||
try (IntStream result = mapper.apply(e)) {
|
||||
if (result != null) {
|
||||
if (!cancellationRequestedCalled) {
|
||||
result.sequential().forEach(downstreamAsInt);
|
||||
} else {
|
||||
var s = result.sequential().spliterator();
|
||||
do {
|
||||
} while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
|
||||
}
|
||||
if (fastPath == null)
|
||||
result.sequential().allMatch(this);
|
||||
else
|
||||
result.sequential().forEach(fastPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancellationRequested() {
|
||||
// If this method is called then an operation within the stream
|
||||
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
|
||||
// Note that we cannot differentiate between an upstream or
|
||||
// downstream operation
|
||||
cancellationRequestedCalled = true;
|
||||
return downstream.cancellationRequested();
|
||||
return cancel || (cancel |= sink.cancellationRequested());
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public boolean test(int output) {
|
||||
if (!cancel) {
|
||||
sink.accept(output);
|
||||
return !(cancel |= sink.cancellationRequested());
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return new FlatMap();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2013, 2023, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2013, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -33,6 +33,7 @@ import java.util.Spliterator;
|
||||
import java.util.Spliterators;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BinaryOperator;
|
||||
import java.util.function.IntConsumer;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.function.LongBinaryOperator;
|
||||
import java.util.function.LongConsumer;
|
||||
@ -279,43 +280,46 @@ abstract class LongPipeline<E_IN>
|
||||
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
|
||||
@Override
|
||||
Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
|
||||
return new Sink.ChainedLong<>(sink) {
|
||||
// true if cancellationRequested() has been called
|
||||
boolean cancellationRequestedCalled;
|
||||
final LongConsumer fastPath =
|
||||
isShortCircuitingPipeline()
|
||||
? null
|
||||
: (sink instanceof LongConsumer lc)
|
||||
? lc
|
||||
: sink::accept;
|
||||
final class FlatMap implements Sink.OfLong, LongPredicate {
|
||||
boolean cancel;
|
||||
|
||||
// cache the consumer to avoid creation on every accepted element
|
||||
LongConsumer downstreamAsLong = downstream::accept;
|
||||
@Override public void begin(long size) { sink.begin(-1); }
|
||||
@Override public void end() { sink.end(); }
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(-1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(long t) {
|
||||
try (LongStream result = mapper.apply(t)) {
|
||||
public void accept(long e) {
|
||||
try (LongStream result = mapper.apply(e)) {
|
||||
if (result != null) {
|
||||
if (!cancellationRequestedCalled) {
|
||||
result.sequential().forEach(downstreamAsLong);
|
||||
} else {
|
||||
var s = result.sequential().spliterator();
|
||||
do {
|
||||
} while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
|
||||
}
|
||||
if (fastPath == null)
|
||||
result.sequential().allMatch(this);
|
||||
else
|
||||
result.sequential().forEach(fastPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancellationRequested() {
|
||||
// If this method is called then an operation within the stream
|
||||
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
|
||||
// Note that we cannot differentiate between an upstream or
|
||||
// downstream operation
|
||||
cancellationRequestedCalled = true;
|
||||
return downstream.cancellationRequested();
|
||||
return cancel || (cancel |= sink.cancellationRequested());
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public boolean test(long output) {
|
||||
if (!cancel) {
|
||||
sink.accept(output);
|
||||
return !(cancel |= sink.cancellationRequested());
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return new FlatMap();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2012, 2023, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2012, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -36,10 +36,13 @@ import java.util.function.BiFunction;
|
||||
import java.util.function.BinaryOperator;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.DoubleConsumer;
|
||||
import java.util.function.DoublePredicate;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.IntConsumer;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.function.IntPredicate;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.LongPredicate;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.function.ToDoubleFunction;
|
||||
@ -274,40 +277,41 @@ abstract class ReferencePipeline<P_IN, P_OUT>
|
||||
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
|
||||
@Override
|
||||
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
|
||||
return new Sink.ChainedReference<>(sink) {
|
||||
// true if cancellationRequested() has been called
|
||||
boolean cancellationRequestedCalled;
|
||||
boolean shorts = isShortCircuitingPipeline();
|
||||
final class FlatMap implements Sink<P_OUT>, Predicate<R> {
|
||||
boolean cancel;
|
||||
|
||||
@Override public void begin(long size) { sink.begin(-1); }
|
||||
@Override public void end() { sink.end(); }
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(-1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(P_OUT u) {
|
||||
try (Stream<? extends R> result = mapper.apply(u)) {
|
||||
public void accept(P_OUT e) {
|
||||
try (Stream<? extends R> result = mapper.apply(e)) {
|
||||
if (result != null) {
|
||||
if (!cancellationRequestedCalled) {
|
||||
result.sequential().forEach(downstream);
|
||||
} else {
|
||||
var s = result.sequential().spliterator();
|
||||
do {
|
||||
} while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
|
||||
}
|
||||
if (shorts)
|
||||
result.sequential().allMatch(this);
|
||||
else
|
||||
result.sequential().forEach(sink);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancellationRequested() {
|
||||
// If this method is called then an operation within the stream
|
||||
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
|
||||
// Note that we cannot differentiate between an upstream or
|
||||
// downstream operation
|
||||
cancellationRequestedCalled = true;
|
||||
return downstream.cancellationRequested();
|
||||
return cancel || (cancel |= sink.cancellationRequested());
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public boolean test(R output) {
|
||||
if (!cancel) {
|
||||
sink.accept(output);
|
||||
return !(cancel |= sink.cancellationRequested());
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return new FlatMap();
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -319,39 +323,46 @@ abstract class ReferencePipeline<P_IN, P_OUT>
|
||||
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
|
||||
@Override
|
||||
Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
|
||||
return new Sink.ChainedReference<>(sink) {
|
||||
// true if cancellationRequested() has been called
|
||||
boolean cancellationRequestedCalled;
|
||||
IntConsumer fastPath =
|
||||
isShortCircuitingPipeline()
|
||||
? null
|
||||
: (sink instanceof IntConsumer ic)
|
||||
? ic
|
||||
: sink::accept;
|
||||
final class FlatMap implements Sink<P_OUT>, IntPredicate {
|
||||
boolean cancel;
|
||||
|
||||
// cache the consumer to avoid creation on every accepted element
|
||||
IntConsumer downstreamAsInt = downstream::accept;
|
||||
@Override public void begin(long size) { sink.begin(-1); }
|
||||
@Override public void end() { sink.end(); }
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(-1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(P_OUT u) {
|
||||
try (IntStream result = mapper.apply(u)) {
|
||||
public void accept(P_OUT e) {
|
||||
try (IntStream result = mapper.apply(e)) {
|
||||
if (result != null) {
|
||||
if (!cancellationRequestedCalled) {
|
||||
result.sequential().forEach(downstreamAsInt);
|
||||
} else {
|
||||
var s = result.sequential().spliterator();
|
||||
do {
|
||||
} while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
|
||||
}
|
||||
if (fastPath == null)
|
||||
result.sequential().allMatch(this);
|
||||
else
|
||||
result.sequential().forEach(fastPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancellationRequested() {
|
||||
cancellationRequestedCalled = true;
|
||||
return downstream.cancellationRequested();
|
||||
return cancel || (cancel |= sink.cancellationRequested());
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public boolean test(int output) {
|
||||
if (!cancel) {
|
||||
sink.accept(output);
|
||||
return !(cancel |= sink.cancellationRequested());
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return new FlatMap();
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -363,39 +374,46 @@ abstract class ReferencePipeline<P_IN, P_OUT>
|
||||
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
|
||||
@Override
|
||||
Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
|
||||
return new Sink.ChainedReference<>(sink) {
|
||||
// true if cancellationRequested() has been called
|
||||
boolean cancellationRequestedCalled;
|
||||
DoubleConsumer fastPath =
|
||||
isShortCircuitingPipeline()
|
||||
? null
|
||||
: (sink instanceof DoubleConsumer dc)
|
||||
? dc
|
||||
: sink::accept;
|
||||
final class FlatMap implements Sink<P_OUT>, DoublePredicate {
|
||||
boolean cancel;
|
||||
|
||||
// cache the consumer to avoid creation on every accepted element
|
||||
DoubleConsumer downstreamAsDouble = downstream::accept;
|
||||
@Override public void begin(long size) { sink.begin(-1); }
|
||||
@Override public void end() { sink.end(); }
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(-1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(P_OUT u) {
|
||||
try (DoubleStream result = mapper.apply(u)) {
|
||||
public void accept(P_OUT e) {
|
||||
try (DoubleStream result = mapper.apply(e)) {
|
||||
if (result != null) {
|
||||
if (!cancellationRequestedCalled) {
|
||||
result.sequential().forEach(downstreamAsDouble);
|
||||
} else {
|
||||
var s = result.sequential().spliterator();
|
||||
do {
|
||||
} while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
|
||||
}
|
||||
if (fastPath == null)
|
||||
result.sequential().allMatch(this);
|
||||
else
|
||||
result.sequential().forEach(fastPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancellationRequested() {
|
||||
cancellationRequestedCalled = true;
|
||||
return downstream.cancellationRequested();
|
||||
return cancel || (cancel |= sink.cancellationRequested());
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public boolean test(double output) {
|
||||
if (!cancel) {
|
||||
sink.accept(output);
|
||||
return !(cancel |= sink.cancellationRequested());
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return new FlatMap();
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -408,39 +426,46 @@ abstract class ReferencePipeline<P_IN, P_OUT>
|
||||
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
|
||||
@Override
|
||||
Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
|
||||
return new Sink.ChainedReference<>(sink) {
|
||||
// true if cancellationRequested() has been called
|
||||
boolean cancellationRequestedCalled;
|
||||
LongConsumer fastPath =
|
||||
isShortCircuitingPipeline()
|
||||
? null
|
||||
: (sink instanceof LongConsumer lc)
|
||||
? lc
|
||||
: sink::accept;
|
||||
final class FlatMap implements Sink<P_OUT>, LongPredicate {
|
||||
boolean cancel;
|
||||
|
||||
// cache the consumer to avoid creation on every accepted element
|
||||
LongConsumer downstreamAsLong = downstream::accept;
|
||||
@Override public void begin(long size) { sink.begin(-1); }
|
||||
@Override public void end() { sink.end(); }
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(-1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(P_OUT u) {
|
||||
try (LongStream result = mapper.apply(u)) {
|
||||
public void accept(P_OUT e) {
|
||||
try (LongStream result = mapper.apply(e)) {
|
||||
if (result != null) {
|
||||
if (!cancellationRequestedCalled) {
|
||||
result.sequential().forEach(downstreamAsLong);
|
||||
} else {
|
||||
var s = result.sequential().spliterator();
|
||||
do {
|
||||
} while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
|
||||
}
|
||||
if (fastPath == null)
|
||||
result.sequential().allMatch(this);
|
||||
else
|
||||
result.sequential().forEach(fastPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancellationRequested() {
|
||||
cancellationRequestedCalled = true;
|
||||
return downstream.cancellationRequested();
|
||||
return cancel || (cancel |= sink.cancellationRequested());
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public boolean test(long output) {
|
||||
if (!cancel) {
|
||||
sink.accept(output);
|
||||
return !(cancel |= sink.cancellationRequested());
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return new FlatMap();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2012, 2020, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2012, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -24,7 +24,7 @@
|
||||
/*
|
||||
* @test
|
||||
* @summary flat-map operations
|
||||
* @bug 8044047 8076458 8075939
|
||||
* @bug 8044047 8076458 8075939 8196106
|
||||
*/
|
||||
|
||||
package org.openjdk.tests.java.util.stream;
|
||||
@ -273,4 +273,31 @@ public class FlatMapOpTest extends OpTestCase {
|
||||
limit(10).toArray();
|
||||
assertEquals(count.get(), 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTerminationOfNestedInfiniteStreams() {
|
||||
var refExpected = Stream.generate(() -> "").limit(5).toList();
|
||||
var refResult = Stream.generate(() -> "")
|
||||
.flatMap(c -> Stream.generate(() -> c).flatMap(x -> Stream.generate(() -> x)))
|
||||
.limit(5).toList();
|
||||
assertEquals(refResult, refExpected);
|
||||
|
||||
var intExpected = IntStream.generate(() -> 1).limit(5).sum();
|
||||
var intResult = IntStream.generate(() -> 1)
|
||||
.flatMap(c -> IntStream.generate(() -> c).flatMap(x -> IntStream.generate(() -> x)))
|
||||
.limit(5).sum();
|
||||
assertEquals(intResult, intExpected);
|
||||
|
||||
var longExpected = LongStream.generate(() -> 1L).limit(5).sum();
|
||||
var longResult = LongStream.generate(() -> 1L)
|
||||
.flatMap(c -> LongStream.generate(() -> c).flatMap(x -> LongStream.generate(() -> x)))
|
||||
.limit(5).sum();
|
||||
assertEquals(longResult, longExpected);
|
||||
|
||||
var doubleExpected = DoubleStream.generate(() -> 0d).limit(5).sum();
|
||||
var doubleResult = DoubleStream.generate(() -> 0d)
|
||||
.flatMap(c -> DoubleStream.generate(() -> c).flatMap(x -> DoubleStream.generate(() -> x)))
|
||||
.limit(5).sum();
|
||||
assertEquals(doubleResult, doubleExpected);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
package org.openjdk.bench.java.util.stream.ops;
|
||||
|
||||
public class DoubleAccumulator {
|
||||
|
||||
double acc;
|
||||
|
||||
public DoubleAccumulator() {
|
||||
acc = 0;
|
||||
}
|
||||
|
||||
public void add(double v) {
|
||||
acc += v;
|
||||
}
|
||||
|
||||
public void merge(DoubleAccumulator other) {
|
||||
acc += other.acc;
|
||||
}
|
||||
|
||||
public double get() {
|
||||
return acc;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,239 @@
|
||||
/*
|
||||
* Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
package org.openjdk.bench.java.util.stream.ops.ref;
|
||||
|
||||
import org.openjdk.bench.java.util.stream.ops.DoubleAccumulator;
|
||||
import org.openjdk.bench.java.util.stream.ops.LongAccumulator;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
import org.openjdk.jmh.annotations.Measurement;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.Warmup;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.function.DoubleFunction;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.DoubleStream;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* Benchmark for flatMap() operation.
|
||||
*/
|
||||
@BenchmarkMode(Mode.Throughput)
|
||||
@Warmup(iterations = 4, time = 2, timeUnit = TimeUnit.SECONDS)
|
||||
@Measurement(iterations = 4, time = 2, timeUnit = TimeUnit.SECONDS)
|
||||
@Fork(value = 3)
|
||||
@OutputTimeUnit(TimeUnit.SECONDS)
|
||||
@State(Scope.Thread)
|
||||
public class FlatMap {
|
||||
|
||||
/**
|
||||
* Implementation notes:
|
||||
* - parallel version requires thread-safe sink, we use the same for sequential version for better comparison
|
||||
* - operations are explicit inner classes to untangle unwanted lambda effects
|
||||
* - the result of applying consecutive operations is the same, in order to have the same number of elements in sink
|
||||
*/
|
||||
|
||||
@Param({"10", "100", "1000"})
|
||||
private int size;
|
||||
|
||||
private Function<Long, Stream<Long>> funArrayStream;
|
||||
private Function<Long, Stream<Long>> funIterateStream;
|
||||
private LongFunction<LongStream> funLongStream;
|
||||
private LongFunction<LongStream> funIterateLongStream;
|
||||
private IntFunction<IntStream> funIntStream;
|
||||
private IntFunction<IntStream> funIterateIntStream;
|
||||
private DoubleFunction<DoubleStream> funDoubleStream;
|
||||
private DoubleFunction<DoubleStream> funIterateDoubleStream;
|
||||
|
||||
private Long[] cachedRefArray;
|
||||
private int[] cachedIntArray;
|
||||
private long[] cachedLongArray;
|
||||
private double[] cachedDoubleArray;
|
||||
|
||||
@Setup
|
||||
public void setup() {
|
||||
final int cachedSize = size;
|
||||
cachedRefArray = new Long[cachedSize];
|
||||
cachedIntArray = new int[cachedSize];
|
||||
cachedLongArray = new long[cachedSize];
|
||||
cachedDoubleArray = new double[cachedSize];
|
||||
for(int i = 0;i < cachedRefArray.length;++i) {
|
||||
cachedRefArray[i] = Long.valueOf(i);
|
||||
cachedIntArray[i] = i;
|
||||
cachedLongArray[i] = i;
|
||||
cachedDoubleArray[i] = i;
|
||||
}
|
||||
|
||||
funArrayStream = new Function<Long, Stream<Long>>() { @Override public Stream<Long> apply(Long l) {
|
||||
return Arrays.stream(cachedRefArray);
|
||||
} };
|
||||
funIterateStream = new Function<Long, Stream<Long>>() { @Override public Stream<Long> apply(Long l) {
|
||||
return Stream.iterate(0L, i -> i + 1).limit(cachedSize); } };
|
||||
funLongStream = new LongFunction<LongStream>() { @Override public LongStream apply(long l) {
|
||||
return Arrays.stream(cachedLongArray); } };
|
||||
funIterateLongStream = new LongFunction<LongStream>() { @Override public LongStream apply(long l) {
|
||||
return LongStream.iterate(0L, i -> i + 1).limit(cachedSize); } };
|
||||
funIntStream = new IntFunction<IntStream>() { @Override public IntStream apply(int i) {
|
||||
return Arrays.stream(cachedIntArray); } };
|
||||
funIterateIntStream = new IntFunction<IntStream>() { @Override public IntStream apply(int i) {
|
||||
return IntStream.iterate(0, ii -> ii + 1).limit(cachedSize); } };
|
||||
funDoubleStream = new DoubleFunction<DoubleStream>() { @Override public DoubleStream apply(double d) {
|
||||
return Arrays.stream(cachedDoubleArray); } };
|
||||
funIterateDoubleStream = new DoubleFunction<DoubleStream>() { @Override public DoubleStream apply(double d) {
|
||||
return DoubleStream.iterate(0d, i -> i + 1d).limit(cachedSize); } };
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long seq_array_ref() {
|
||||
return funArrayStream.apply(0L)
|
||||
.flatMap(funArrayStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long par_array_ref() {
|
||||
return funArrayStream.apply(0L)
|
||||
.parallel()
|
||||
.flatMap(funArrayStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long seq_array_long() {
|
||||
return funLongStream.apply(0L)
|
||||
.flatMap(funLongStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long par_array_long() {
|
||||
return funLongStream.apply(0L)
|
||||
.parallel()
|
||||
.flatMap(funLongStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long seq_array_int() {
|
||||
return funIntStream.apply(0)
|
||||
.flatMap(funIntStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long par_array_int() {
|
||||
return funIntStream.apply(0)
|
||||
.parallel()
|
||||
.flatMap(funIntStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public double seq_array_double() {
|
||||
return funDoubleStream.apply(0d)
|
||||
.flatMap(funDoubleStream)
|
||||
.collect(DoubleAccumulator::new, DoubleAccumulator::add, DoubleAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public double par_array_double() {
|
||||
return funDoubleStream.apply(0d)
|
||||
.parallel()
|
||||
.flatMap(funDoubleStream)
|
||||
.collect(DoubleAccumulator::new, DoubleAccumulator::add, DoubleAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long seq_iterate_ref() {
|
||||
return funIterateStream.apply(0L)
|
||||
.flatMap(funIterateStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long par_iterate_ref() {
|
||||
return funIterateStream.apply(0L)
|
||||
.parallel()
|
||||
.flatMap(funIterateStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
|
||||
@Benchmark
|
||||
public long seq_iterate_long() {
|
||||
return funIterateLongStream.apply(0L)
|
||||
.flatMap(funIterateLongStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long par_iterate_long() {
|
||||
return funIterateLongStream.apply(0L)
|
||||
.parallel()
|
||||
.flatMap(funIterateLongStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long seq_iterate_int() {
|
||||
return funIterateIntStream.apply(0)
|
||||
.flatMap(funIterateIntStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public long par_iterate_int() {
|
||||
return funIterateIntStream.apply(0)
|
||||
.parallel()
|
||||
.flatMap(funIterateIntStream)
|
||||
.collect(LongAccumulator::new, LongAccumulator::add, LongAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public double seq_iterate_double() {
|
||||
return funIterateDoubleStream.apply(0d)
|
||||
.flatMap(funIterateDoubleStream)
|
||||
.collect(DoubleAccumulator::new, DoubleAccumulator::add, DoubleAccumulator::merge).get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public double par_iterate_double() {
|
||||
return funIterateDoubleStream.apply(0d)
|
||||
.parallel()
|
||||
.flatMap(funIterateDoubleStream)
|
||||
.collect(DoubleAccumulator::new, DoubleAccumulator::add, DoubleAccumulator::merge).get();
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user