8328316: Finisher cannot emit if stream is sequential and integrator returned false
Reviewed-by: psandoz
This commit is contained in:
parent
ba05c6d0b6
commit
ab28045d77
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
|
* Copyright (c) 2023, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||||
*
|
*
|
||||||
* This code is free software; you can redistribute it and/or modify it
|
* This code is free software; you can redistribute it and/or modify it
|
||||||
@ -142,6 +142,7 @@ final class GathererOp<T, A, R> extends ReferencePipeline<T, R> {
|
|||||||
private final Integrator<A, T, R> integrator; // Optimization: reuse
|
private final Integrator<A, T, R> integrator; // Optimization: reuse
|
||||||
private A state;
|
private A state;
|
||||||
private boolean proceed = true;
|
private boolean proceed = true;
|
||||||
|
private boolean downstreamProceed = true;
|
||||||
|
|
||||||
GatherSink(Gatherer<T, A, R> gatherer, Sink<R> sink) {
|
GatherSink(Gatherer<T, A, R> gatherer, Sink<R> sink) {
|
||||||
this.gatherer = gatherer;
|
this.gatherer = gatherer;
|
||||||
@ -173,12 +174,12 @@ final class GathererOp<T, A, R> extends ReferencePipeline<T, R> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean cancellationRequested() {
|
public boolean cancellationRequested() {
|
||||||
return cancellationRequested(proceed);
|
return cancellationRequested(proceed && downstreamProceed);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean cancellationRequested(boolean knownProceed) {
|
private boolean cancellationRequested(boolean knownProceed) {
|
||||||
// Highly performance sensitive
|
// Highly performance sensitive
|
||||||
return !(knownProceed && (!sink.cancellationRequested() || (proceed = false)));
|
return !(knownProceed && (!sink.cancellationRequested() || (downstreamProceed = false)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -194,12 +195,12 @@ final class GathererOp<T, A, R> extends ReferencePipeline<T, R> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isRejecting() {
|
public boolean isRejecting() {
|
||||||
return !proceed;
|
return !downstreamProceed;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean push(R r) {
|
public boolean push(R r) {
|
||||||
var p = proceed;
|
var p = downstreamProceed;
|
||||||
if (p)
|
if (p)
|
||||||
sink.accept(r);
|
sink.accept(r);
|
||||||
return !cancellationRequested(p);
|
return !cancellationRequested(p);
|
||||||
|
65
test/jdk/java/util/stream/GathererShortCircuitTest.java
Normal file
65
test/jdk/java/util/stream/GathererShortCircuitTest.java
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
|
||||||
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||||
|
*
|
||||||
|
* This code is free software; you can redistribute it and/or modify it
|
||||||
|
* under the terms of the GNU General Public License version 2 only, as
|
||||||
|
* published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
* version 2 for more details (a copy is included in the LICENSE file that
|
||||||
|
* accompanied this code).
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License version
|
||||||
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*
|
||||||
|
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||||
|
* or visit www.oracle.com if you need additional information or have any
|
||||||
|
* questions.
|
||||||
|
*/
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Gatherer;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
|
import static org.junit.jupiter.api.Assumptions.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @test
|
||||||
|
* @bug 8328316
|
||||||
|
* @summary Testing Gatherer behavior under short circuiting
|
||||||
|
* @enablePreview
|
||||||
|
* @run junit GathererShortCircuitTest
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class GathererShortCircuitTest {
|
||||||
|
@Test
|
||||||
|
public void mustBeAbleToPushFromFinisher() {
|
||||||
|
Integer expected = 8328316;
|
||||||
|
List<Integer> source = List.of(1,2,3,4,5);
|
||||||
|
|
||||||
|
Gatherer<Integer, ?, Integer> pushOneInFinisher =
|
||||||
|
Gatherer.of(
|
||||||
|
(_, element, downstream) -> false,
|
||||||
|
(_, downstream) -> downstream.push(expected)
|
||||||
|
);
|
||||||
|
|
||||||
|
var usingCollect =
|
||||||
|
source.stream().gather(pushOneInFinisher).collect(Collectors.toList());
|
||||||
|
var usingBuiltin =
|
||||||
|
source.stream().gather(pushOneInFinisher).toList();
|
||||||
|
var usingCollectPar =
|
||||||
|
source.stream().parallel().gather(pushOneInFinisher).collect(Collectors.toList());
|
||||||
|
var usingBuiltinPar =
|
||||||
|
source.stream().parallel().gather(pushOneInFinisher).toList();
|
||||||
|
|
||||||
|
assertEquals(List.of(expected), usingCollect);
|
||||||
|
assertEquals(List.of(expected), usingBuiltin);
|
||||||
|
assertEquals(List.of(expected), usingCollectPar);
|
||||||
|
assertEquals(List.of(expected), usingBuiltinPar);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user