8017513: Support for closeable streams

8022237: j.u.s.BaseStream.onClose() has an issue in implementation or requires spec clarification
8022572: Same exception instances thrown from j.u.stream.Stream.onClose() handlers are not listed as suppressed

BaseStream implements AutoCloseable; Remove CloseableStream and DelegatingStream

Reviewed-by: alanb, mduigou, psandoz
This commit is contained in:
Brian Goetz 2013-09-03 12:16:01 -07:00 committed by Henry Jen
parent 45d26c9571
commit 7bc062de1d
20 changed files with 578 additions and 548 deletions

View File

@ -25,34 +25,56 @@
package java.nio.file;
import java.nio.file.attribute.*;
import java.nio.file.spi.FileSystemProvider;
import java.nio.file.spi.FileTypeDetector;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.io.Closeable;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;
import java.util.function.BiPredicate;
import java.util.stream.CloseableStream;
import java.util.stream.DelegatingStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.DosFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.FileAttributeView;
import java.nio.file.attribute.FileOwnerAttributeView;
import java.nio.file.attribute.FileStoreAttributeView;
import java.nio.file.attribute.FileTime;
import java.nio.file.attribute.PosixFileAttributeView;
import java.nio.file.attribute.PosixFileAttributes;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.UserPrincipal;
import java.nio.file.spi.FileSystemProvider;
import java.nio.file.spi.FileTypeDetector;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* This class consists exclusively of static methods that operate on files,
@ -74,6 +96,21 @@ public final class Files {
return path.getFileSystem().provider();
}
/**
* Convert a Closeable to a Runnable by converting checked IOException
* to UncheckedIOException
*/
private static Runnable asUncheckedRunnable(Closeable c) {
return () -> {
try {
c.close();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}
// -- File contents --
/**
@ -3228,29 +3265,7 @@ public final class Files {
// -- Stream APIs --
/**
* Implementation of CloseableStream
*/
private static class DelegatingCloseableStream<T> extends DelegatingStream<T>
implements CloseableStream<T>
{
private final Closeable closeable;
DelegatingCloseableStream(Closeable c, Stream<T> delegate) {
super(delegate);
this.closeable = c;
}
public void close() {
try {
closeable.close();
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
}
/**
* Return a lazily populated {@code CloseableStream}, the elements of
* Return a lazily populated {@code Stream}, the elements of
* which are the entries in the directory. The listing is not recursive.
*
* <p> The elements of the stream are {@link Path} objects that are
@ -3264,10 +3279,13 @@ public final class Files {
* reflect updates to the directory that occur after returning from this
* method.
*
* <p> When not using the try-with-resources construct, then the stream's
* {@link CloseableStream#close close} method should be invoked after the
* operation is completed so as to free any resources held for the open
* directory. Operating on a closed stream behaves as if the end of stream
* <p> The returned stream encapsulates a {@link DirectoryStream}.
* If timely disposal of file system resources is required, the
* {@code try}-with-resources construct should be used to ensure that the
* stream's {@link Stream#close close} method is invoked after the stream
* operations are completed.
*
* <p> Operating on a closed stream behaves as if the end of stream
* has been reached. Due to read-ahead, one or more elements may be
* returned after the stream has been closed.
*
@ -3278,7 +3296,7 @@ public final class Files {
*
* @param dir The path to the directory
*
* @return The {@code CloseableStream} describing the content of the
* @return The {@code Stream} describing the content of the
* directory
*
* @throws NotDirectoryException
@ -3294,43 +3312,54 @@ public final class Files {
* @see #newDirectoryStream(Path)
* @since 1.8
*/
public static CloseableStream<Path> list(Path dir) throws IOException {
public static Stream<Path> list(Path dir) throws IOException {
DirectoryStream<Path> ds = Files.newDirectoryStream(dir);
final Iterator<Path> delegate = ds.iterator();
try {
final Iterator<Path> delegate = ds.iterator();
// Re-wrap DirectoryIteratorException to UncheckedIOException
Iterator<Path> it = new Iterator<Path>() {
public boolean hasNext() {
try {
return delegate.hasNext();
} catch (DirectoryIteratorException e) {
throw new UncheckedIOException(e.getCause());
// Re-wrap DirectoryIteratorException to UncheckedIOException
Iterator<Path> it = new Iterator<Path>() {
@Override
public boolean hasNext() {
try {
return delegate.hasNext();
} catch (DirectoryIteratorException e) {
throw new UncheckedIOException(e.getCause());
}
}
}
public Path next() {
try {
return delegate.next();
} catch (DirectoryIteratorException e) {
throw new UncheckedIOException(e.getCause());
@Override
public Path next() {
try {
return delegate.next();
} catch (DirectoryIteratorException e) {
throw new UncheckedIOException(e.getCause());
}
}
}
};
};
Stream<Path> s = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(it, Spliterator.DISTINCT),
false);
return new DelegatingCloseableStream<>(ds, s);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.DISTINCT), false)
.onClose(asUncheckedRunnable(ds));
} catch (Error|RuntimeException e) {
try {
ds.close();
} catch (IOException ex) {
try {
e.addSuppressed(ex);
} catch (Throwable ignore) {}
}
throw e;
}
}
/**
* Return a {@code CloseableStream} that is lazily populated with {@code
* Return a {@code Stream} that is lazily populated with {@code
* Path} by walking the file tree rooted at a given starting file. The
* file tree is traversed <em>depth-first</em>, the elements in the stream
* are {@link Path} objects that are obtained as if by {@link
* Path#resolve(Path) resolving} the relative path against {@code start}.
*
* <p> The {@code stream} walks the file tree as elements are consumed.
* The {@code CloseableStream} returned is guaranteed to have at least one
* The {@code Stream} returned is guaranteed to have at least one
* element, the starting file itself. For each file visited, the stream
* attempts to read its {@link BasicFileAttributes}. If the file is a
* directory and can be opened successfully, entries in the directory, and
@ -3370,10 +3399,11 @@ public final class Files {
* <p> When a security manager is installed and it denies access to a file
* (or directory), then it is ignored and not included in the stream.
*
* <p> When not using the try-with-resources construct, then the stream's
* {@link CloseableStream#close close} method should be invoked after the
* operation is completed so as to free any resources held for the open
* directory. Operate the stream after it is closed will throw an
* <p> The returned stream encapsulates one or more {@link DirectoryStream}s.
* If timely disposal of file system resources is required, the
* {@code try}-with-resources construct should be used to ensure that the
* stream's {@link Stream#close close} method is invoked after the stream
* operations are completed. Operating on a closed stream will result in an
* {@link java.lang.IllegalStateException}.
*
* <p> If an {@link IOException} is thrown when accessing the directory
@ -3388,7 +3418,7 @@ public final class Files {
* @param options
* options to configure the traversal
*
* @return the {@link CloseableStream} of {@link Path}
* @return the {@link Stream} of {@link Path}
*
* @throws IllegalArgumentException
* if the {@code maxDepth} parameter is negative
@ -3401,21 +3431,22 @@ public final class Files {
* if an I/O error is thrown when accessing the starting file.
* @since 1.8
*/
public static CloseableStream<Path> walk(Path start, int maxDepth,
FileVisitOption... options)
throws IOException
{
public static Stream<Path> walk(Path start, int maxDepth,
FileVisitOption... options)
throws IOException {
FileTreeIterator iterator = new FileTreeIterator(start, maxDepth, options);
Stream<Path> s = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator, Spliterator.DISTINCT),
false).
map(entry -> entry.file());
return new DelegatingCloseableStream<>(iterator, s);
try {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.DISTINCT), false)
.onClose(iterator::close)
.map(entry -> entry.file());
} catch (Error|RuntimeException e) {
iterator.close();
throw e;
}
}
/**
* Return a {@code CloseableStream} that is lazily populated with {@code
* Return a {@code Stream} that is lazily populated with {@code
* Path} by walking the file tree rooted at a given starting file. The
* file tree is traversed <em>depth-first</em>, the elements in the stream
* are {@link Path} objects that are obtained as if by {@link
@ -3428,12 +3459,19 @@ public final class Files {
* </pre></blockquote>
* In other words, it visits all levels of the file tree.
*
* <p> The returned stream encapsulates one or more {@link DirectoryStream}s.
* If timely disposal of file system resources is required, the
* {@code try}-with-resources construct should be used to ensure that the
* stream's {@link Stream#close close} method is invoked after the stream
* operations are completed. Operating on a closed stream will result in an
* {@link java.lang.IllegalStateException}.
*
* @param start
* the starting file
* @param options
* options to configure the traversal
*
* @return the {@link CloseableStream} of {@link Path}
* @return the {@link Stream} of {@link Path}
*
* @throws SecurityException
* If the security manager denies access to the starting file.
@ -3446,15 +3484,14 @@ public final class Files {
* @see #walk(Path, int, FileVisitOption...)
* @since 1.8
*/
public static CloseableStream<Path> walk(Path start,
FileVisitOption... options)
throws IOException
{
public static Stream<Path> walk(Path start,
FileVisitOption... options)
throws IOException {
return walk(start, Integer.MAX_VALUE, options);
}
/**
* Return a {@code CloseableStream} that is lazily populated with {@code
* Return a {@code Stream} that is lazily populated with {@code
* Path} by searching for files in a file tree rooted at a given starting
* file.
*
@ -3463,12 +3500,19 @@ public final class Files {
* {@link BiPredicate} is invoked with its {@link Path} and {@link
* BasicFileAttributes}. The {@code Path} object is obtained as if by
* {@link Path#resolve(Path) resolving} the relative path against {@code
* start} and is only included in the returned {@link CloseableStream} if
* start} and is only included in the returned {@link Stream} if
* the {@code BiPredicate} returns true. Compare to calling {@link
* java.util.stream.Stream#filter filter} on the {@code Stream}
* returned by {@code walk} method, this method may be more efficient by
* avoiding redundant retrieval of the {@code BasicFileAttributes}.
*
* <p> The returned stream encapsulates one or more {@link DirectoryStream}s.
* If timely disposal of file system resources is required, the
* {@code try}-with-resources construct should be used to ensure that the
* stream's {@link Stream#close close} method is invoked after the stream
* operations are completed. Operating on a closed stream will result in an
* {@link java.lang.IllegalStateException}.
*
* <p> If an {@link IOException} is thrown when accessing the directory
* after returned from this method, it is wrapped in an {@link
* UncheckedIOException} which will be thrown from the method that caused
@ -3484,7 +3528,7 @@ public final class Files {
* @param options
* options to configure the traversal
*
* @return the {@link CloseableStream} of {@link Path}
* @return the {@link Stream} of {@link Path}
*
* @throws IllegalArgumentException
* if the {@code maxDepth} parameter is negative
@ -3499,24 +3543,25 @@ public final class Files {
* @see #walk(Path, int, FileVisitOption...)
* @since 1.8
*/
public static CloseableStream<Path> find(Path start,
int maxDepth,
BiPredicate<Path, BasicFileAttributes> matcher,
FileVisitOption... options)
throws IOException
{
public static Stream<Path> find(Path start,
int maxDepth,
BiPredicate<Path, BasicFileAttributes> matcher,
FileVisitOption... options)
throws IOException {
FileTreeIterator iterator = new FileTreeIterator(start, maxDepth, options);
Stream<Path> s = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator, Spliterator.DISTINCT),
false).
filter(entry -> matcher.test(entry.file(), entry.attributes())).
map(entry -> entry.file());
return new DelegatingCloseableStream<>(iterator, s);
try {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.DISTINCT), false)
.onClose(iterator::close)
.filter(entry -> matcher.test(entry.file(), entry.attributes()))
.map(entry -> entry.file());
} catch (Error|RuntimeException e) {
iterator.close();
throw e;
}
}
/**
* Read all lines from a file as a {@code CloseableStream}. Unlike {@link
* Read all lines from a file as a {@code Stream}. Unlike {@link
* #readAllLines(Path, Charset) readAllLines}, this method does not read
* all lines into a {@code List}, but instead populates lazily as the stream
* is consumed.
@ -3528,22 +3573,24 @@ public final class Files {
* <p> After this method returns, then any subsequent I/O exception that
* occurs while reading from the file or when a malformed or unmappable byte
* sequence is read, is wrapped in an {@link UncheckedIOException} that will
* be thrown form the
* be thrown from the
* {@link java.util.stream.Stream} method that caused the read to take
* place. In case an {@code IOException} is thrown when closing the file,
* it is also wrapped as an {@code UncheckedIOException}.
*
* <p> When not using the try-with-resources construct, then stream's
* {@link CloseableStream#close close} method should be invoked after
* operation is completed so as to free any resources held for the open
* file.
* <p> The returned stream encapsulates a {@link Reader}. If timely
* disposal of file system resources is required, the try-with-resources
* construct should be used to ensure that the stream's
* {@link Stream#close close} method is invoked after the stream operations
* are completed.
*
*
* @param path
* the path to the file
* @param cs
* the charset to use for decoding
*
* @return the lines from the file as a {@code CloseableStream}
* @return the lines from the file as a {@code Stream}
*
* @throws IOException
* if an I/O error occurs opening the file
@ -3557,10 +3604,19 @@ public final class Files {
* @see java.io.BufferedReader#lines()
* @since 1.8
*/
public static CloseableStream<String> lines(Path path, Charset cs)
throws IOException
{
public static Stream<String> lines(Path path, Charset cs) throws IOException {
BufferedReader br = Files.newBufferedReader(path, cs);
return new DelegatingCloseableStream<>(br, br.lines());
try {
return br.lines().onClose(asUncheckedRunnable(br));
} catch (Error|RuntimeException e) {
try {
br.close();
} catch (IOException ex) {
try {
e.addSuppressed(ex);
} catch (Throwable ignore) {}
}
throw e;
}
}
}

View File

@ -71,6 +71,9 @@ import java.util.function.Supplier;
*/
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
private static final String MSG_CONSUMED = "source already consumed or closed";
/**
* Backlink to the head of the pipeline chain (self if this is the source
* stage).
@ -137,6 +140,8 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
*/
private boolean sourceAnyStateful;
private Runnable sourceCloseAction;
/**
* True if pipeline is parallel, otherwise the pipeline is sequential; only
* valid for the source stage.
@ -195,7 +200,7 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
*/
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException("stream has already been operated upon");
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
@ -221,7 +226,7 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException("stream has already been operated upon");
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
@ -238,7 +243,7 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
@SuppressWarnings("unchecked")
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
if (linkedOrConsumed)
throw new IllegalStateException("stream has already been operated upon");
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// If the last intermediate operation is stateful then
@ -266,7 +271,7 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
throw new IllegalStateException();
if (linkedOrConsumed)
throw new IllegalStateException("stream has already been operated upon");
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
if (sourceStage.sourceSpliterator != null) {
@ -282,7 +287,7 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
return s;
}
else {
throw new IllegalStateException("source already consumed");
throw new IllegalStateException(MSG_CONSUMED);
}
}
@ -302,12 +307,35 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
return (S) this;
}
@Override
public void close() {
linkedOrConsumed = true;
sourceSupplier = null;
sourceSpliterator = null;
if (sourceStage.sourceCloseAction != null) {
Runnable closeAction = sourceStage.sourceCloseAction;
sourceStage.sourceCloseAction = null;
closeAction.run();
}
}
@Override
@SuppressWarnings("unchecked")
public S onClose(Runnable closeHandler) {
Runnable existingHandler = sourceStage.sourceCloseAction;
sourceStage.sourceCloseAction =
(existingHandler == null)
? closeHandler
: Streams.composeWithExceptions(existingHandler, closeHandler);
return (S) this;
}
// Primitive specialization use co-variant overrides, hence is not final
@Override
@SuppressWarnings("unchecked")
public Spliterator<E_OUT> spliterator() {
if (linkedOrConsumed)
throw new IllegalStateException("stream has already been operated upon");
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
if (this == sourceStage) {
@ -324,7 +352,7 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
return lazySpliterator(s);
}
else {
throw new IllegalStateException("source already consumed");
throw new IllegalStateException(MSG_CONSUMED);
}
}
else {
@ -424,7 +452,7 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException("source already consumed");
throw new IllegalStateException(MSG_CONSUMED);
}
if (isParallel()) {

View File

@ -35,7 +35,8 @@ import java.util.Spliterator;
* @param <S> type of stream implementing {@code BaseStream}
* @since 1.8
*/
public interface BaseStream<T, S extends BaseStream<T, S>> {
public interface BaseStream<T, S extends BaseStream<T, S>>
extends AutoCloseable {
/**
* Returns an iterator for the elements of this stream.
*
@ -103,4 +104,33 @@ public interface BaseStream<T, S extends BaseStream<T, S>> {
* @return an unordered stream
*/
S unordered();
/**
* Returns an equivalent stream with an additional close handler. Close
* handlers are run when the {@link #close()} method
* is called on the stream, and are executed in the order they were
* added. All close handlers are run, even if earlier close handlers throw
* exceptions. If any close handler throws an exception, the first
* exception thrown will be relayed to the caller of {@code close()}, with
* any remaining exceptions added to that exception as suppressed exceptions
* (unless one of the remaining exceptions is the same exception as the
* first exception, since an exception cannot suppress itself.) May
* return itself.
*
* <p>This is an <a href="package-summary.html#StreamOps">intermediate
* operation</a>.
*
* @param closeHandler A task to execute when the stream is closed
* @return a stream with a handler that is run if the stream is closed
*/
S onClose(Runnable closeHandler);
/**
* Closes this stream, causing all close handlers for this stream pipeline
* to be called.
*
* @see AutoCloseable#close()
*/
@Override
void close();
}

View File

@ -1,57 +0,0 @@
/*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
/**
* A {@code CloseableStream} is a {@code Stream} that can be closed.
* The close method is invoked to release resources that the object is
* holding (such as open files).
*
* @param <T> The type of stream elements
* @since 1.8
*/
public interface CloseableStream<T> extends Stream<T>, AutoCloseable {
/**
* Closes this resource, relinquishing any underlying resources.
* This method is invoked automatically on objects managed by the
* {@code try}-with-resources statement. Does nothing if called when
* the resource has already been closed.
*
* This method does not allow throwing checked {@code Exception}s like
* {@link AutoCloseable#close() AutoCloseable.close()}. Cases where the
* close operation may fail require careful attention by implementers. It
* is strongly advised to relinquish the underlying resources and to
* internally <em>mark</em> the resource as closed. The {@code close}
* method is unlikely to be invoked more than once and so this ensures
* that the resources are released in a timely manner. Furthermore it
* reduces problems that could arise when the resource wraps, or is
* wrapped, by another resource.
*
* @see AutoCloseable#close()
*/
void close();
}

View File

@ -1,270 +0,0 @@
/*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
/**
* A {@code Stream} implementation that delegates operations to another {@code
* Stream}.
*
* @param <T> type of stream elements for this stream and underlying delegate
* stream
*
* @since 1.8
*/
public class DelegatingStream<T> implements Stream<T> {
final private Stream<T> delegate;
/**
* Construct a {@code Stream} that delegates operations to another {@code
* Stream}.
*
* @param delegate the underlying {@link Stream} to which we delegate all
* {@code Stream} methods
* @throws NullPointerException if the delegate is null
*/
public DelegatingStream(Stream<T> delegate) {
this.delegate = Objects.requireNonNull(delegate);
}
// -- BaseStream methods --
@Override
public Spliterator<T> spliterator() {
return delegate.spliterator();
}
@Override
public boolean isParallel() {
return delegate.isParallel();
}
@Override
public Iterator<T> iterator() {
return delegate.iterator();
}
// -- Stream methods --
@Override
public Stream<T> filter(Predicate<? super T> predicate) {
return delegate.filter(predicate);
}
@Override
public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
return delegate.map(mapper);
}
@Override
public IntStream mapToInt(ToIntFunction<? super T> mapper) {
return delegate.mapToInt(mapper);
}
@Override
public LongStream mapToLong(ToLongFunction<? super T> mapper) {
return delegate.mapToLong(mapper);
}
@Override
public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
return delegate.mapToDouble(mapper);
}
@Override
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
return delegate.flatMap(mapper);
}
@Override
public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
return delegate.flatMapToInt(mapper);
}
@Override
public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
return delegate.flatMapToLong(mapper);
}
@Override
public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
return delegate.flatMapToDouble(mapper);
}
@Override
public Stream<T> distinct() {
return delegate.distinct();
}
@Override
public Stream<T> sorted() {
return delegate.sorted();
}
@Override
public Stream<T> sorted(Comparator<? super T> comparator) {
return delegate.sorted(comparator);
}
@Override
public void forEach(Consumer<? super T> action) {
delegate.forEach(action);
}
@Override
public void forEachOrdered(Consumer<? super T> action) {
delegate.forEachOrdered(action);
}
@Override
public Stream<T> peek(Consumer<? super T> consumer) {
return delegate.peek(consumer);
}
@Override
public Stream<T> limit(long maxSize) {
return delegate.limit(maxSize);
}
@Override
public Stream<T> substream(long startingOffset) {
return delegate.substream(startingOffset);
}
@Override
public Stream<T> substream(long startingOffset, long endingOffset) {
return delegate.substream(startingOffset, endingOffset);
}
@Override
public <A> A[] toArray(IntFunction<A[]> generator) {
return delegate.toArray(generator);
}
@Override
public Object[] toArray() {
return delegate.toArray();
}
@Override
public T reduce(T identity, BinaryOperator<T> accumulator) {
return delegate.reduce(identity, accumulator);
}
@Override
public Optional<T> reduce(BinaryOperator<T> accumulator) {
return delegate.reduce(accumulator);
}
@Override
public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner) {
return delegate.reduce(identity, accumulator, combiner);
}
@Override
public <R> R collect(Supplier<R> resultFactory,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner) {
return delegate.collect(resultFactory, accumulator, combiner);
}
@Override
public <R, A> R collect(Collector<? super T, A, R> collector) {
return delegate.collect(collector);
}
@Override
public Optional<T> max(Comparator<? super T> comparator) {
return delegate.max(comparator);
}
@Override
public Optional<T> min(Comparator<? super T> comparator) {
return delegate.min(comparator);
}
@Override
public long count() {
return delegate.count();
}
@Override
public boolean anyMatch(Predicate<? super T> predicate) {
return delegate.anyMatch(predicate);
}
@Override
public boolean allMatch(Predicate<? super T> predicate) {
return delegate.allMatch(predicate);
}
@Override
public boolean noneMatch(Predicate<? super T> predicate) {
return delegate.noneMatch(predicate);
}
@Override
public Optional<T> findFirst() {
return delegate.findFirst();
}
@Override
public Optional<T> findAny() {
return delegate.findAny();
}
@Override
public Stream<T> unordered() {
return delegate.unordered();
}
@Override
public Stream<T> sequential() {
return delegate.sequential();
}
@Override
public Stream<T> parallel() {
return delegate.parallel();
}
}

View File

@ -266,10 +266,11 @@ abstract class DoublePipeline<E_IN>
@Override
public void accept(double t) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
DoubleStream result = mapper.apply(t);
if (result != null)
result.sequential().forEach(i -> downstream.accept(i));
try (DoubleStream result = mapper.apply(t)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(i -> downstream.accept(i));
}
}
};
}

View File

@ -752,7 +752,8 @@ public interface DoubleStream extends BaseStream<Double, DoubleStream> {
* elements of a first {@code DoubleStream} succeeded by all the elements of the
* second {@code DoubleStream}. The resulting stream is ordered if both
* of the input streams are ordered, and parallel if either of the input
* streams is parallel.
* streams is parallel. When the resulting stream is closed, the close
* handlers for both input streams are invoked.
*
* @param a the first stream
* @param b the second stream to concatenate on to end of the first stream
@ -764,7 +765,8 @@ public interface DoubleStream extends BaseStream<Double, DoubleStream> {
Spliterator.OfDouble split = new Streams.ConcatSpliterator.OfDouble(
a.spliterator(), b.spliterator());
return StreamSupport.doubleStream(split, a.isParallel() || b.isParallel());
DoubleStream stream = StreamSupport.doubleStream(split, a.isParallel() || b.isParallel());
return stream.onClose(Streams.composedClose(a, b));
}
/**

View File

@ -302,10 +302,11 @@ abstract class IntPipeline<E_IN>
@Override
public void accept(int t) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
IntStream result = mapper.apply(t);
if (result != null)
result.sequential().forEach(i -> downstream.accept(i));
try (IntStream result = mapper.apply(t)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(i -> downstream.accept(i));
}
}
};
}

View File

@ -806,7 +806,8 @@ public interface IntStream extends BaseStream<Integer, IntStream> {
* elements of a first {@code IntStream} succeeded by all the elements of the
* second {@code IntStream}. The resulting stream is ordered if both
* of the input streams are ordered, and parallel if either of the input
* streams is parallel.
* streams is parallel. When the resulting stream is closed, the close
* handlers for both input streams are invoked.
*
* @param a the first stream
* @param b the second stream to concatenate on to end of the first stream
@ -818,7 +819,8 @@ public interface IntStream extends BaseStream<Integer, IntStream> {
Spliterator.OfInt split = new Streams.ConcatSpliterator.OfInt(
a.spliterator(), b.spliterator());
return StreamSupport.intStream(split, a.isParallel() || b.isParallel());
IntStream stream = StreamSupport.intStream(split, a.isParallel() || b.isParallel());
return stream.onClose(Streams.composedClose(a, b));
}
/**

View File

@ -283,10 +283,11 @@ abstract class LongPipeline<E_IN>
@Override
public void accept(long t) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
LongStream result = mapper.apply(t);
if (result != null)
result.sequential().forEach(i -> downstream.accept(i));
try (LongStream result = mapper.apply(t)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(i -> downstream.accept(i));
}
}
};
}

View File

@ -812,7 +812,8 @@ public interface LongStream extends BaseStream<Long, LongStream> {
* elements of a first {@code LongStream} succeeded by all the elements of the
* second {@code LongStream}. The resulting stream is ordered if both
* of the input streams are ordered, and parallel if either of the input
* streams is parallel.
* streams is parallel. When the resulting stream is closed, the close
* handlers for both input streams are invoked.
*
* @param a the first stream
* @param b the second stream to concatenate on to end of the first stream
@ -824,7 +825,8 @@ public interface LongStream extends BaseStream<Long, LongStream> {
Spliterator.OfLong split = new Streams.ConcatSpliterator.OfLong(
a.spliterator(), b.spliterator());
return StreamSupport.longStream(split, a.isParallel() || b.isParallel());
LongStream stream = StreamSupport.longStream(split, a.isParallel() || b.isParallel());
return stream.onClose(Streams.composedClose(a, b));
}
/**

View File

@ -264,10 +264,11 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
Stream<? extends R> result = mapper.apply(u);
if (result != null)
result.sequential().forEach(downstream);
try (Stream<? extends R> result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstream);
}
}
};
}
@ -291,10 +292,11 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
IntStream result = mapper.apply(u);
if (result != null)
result.sequential().forEach(downstreamAsInt);
try (IntStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsInt);
}
}
};
}
@ -318,10 +320,11 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
DoubleStream result = mapper.apply(u);
if (result != null)
result.sequential().forEach(downstreamAsDouble);
try (DoubleStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsDouble);
}
}
};
}
@ -345,10 +348,11 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
LongStream result = mapper.apply(u);
if (result != null)
result.sequential().forEach(downstreamAsLong);
try (LongStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsLong);
}
}
};
}

View File

@ -891,7 +891,8 @@ public interface Stream<T> extends BaseStream<T, Stream<T>> {
* elements of a first {@code Stream} succeeded by all the elements of the
* second {@code Stream}. The resulting stream is ordered if both
* of the input streams are ordered, and parallel if either of the input
* streams is parallel.
* streams is parallel. When the resulting stream is closed, the close
* handlers for both input streams are invoked.
*
* @param <T> The type of stream elements
* @param a the first stream
@ -906,7 +907,8 @@ public interface Stream<T> extends BaseStream<T, Stream<T>> {
@SuppressWarnings("unchecked")
Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>(
(Spliterator<T>) a.spliterator(), (Spliterator<T>) b.spliterator());
return StreamSupport.stream(split, a.isParallel() || b.isParallel());
Stream<T> stream = StreamSupport.stream(split, a.isParallel() || b.isParallel());
return stream.onClose(Streams.composedClose(a, b));
}
/**

View File

@ -833,4 +833,61 @@ final class Streams {
}
}
}
/**
* Given two Runnables, return a Runnable that executes both in sequence,
* even if the first throws an exception, and if both throw exceptions, add
* any exceptions thrown by the second as suppressed exceptions of the first.
*/
static Runnable composeWithExceptions(Runnable a, Runnable b) {
return new Runnable() {
@Override
public void run() {
try {
a.run();
}
catch (Throwable e1) {
try {
b.run();
}
catch (Throwable e2) {
try {
e1.addSuppressed(e2);
} catch (Throwable ignore) {}
}
throw e1;
}
b.run();
}
};
}
/**
* Given two streams, return a Runnable that
* executes both of their {@link BaseStream#close} methods in sequence,
* even if the first throws an exception, and if both throw exceptions, add
* any exceptions thrown by the second as suppressed exceptions of the first.
*/
static Runnable composedClose(BaseStream<?, ?> a, BaseStream<?, ?> b) {
return new Runnable() {
@Override
public void run() {
try {
a.close();
}
catch (Throwable e1) {
try {
b.close();
}
catch (Throwable e2) {
try {
e1.addSuppressed(e2);
} catch (Throwable ignore) {}
}
throw e1;
}
b.close();
}
};
}
}

View File

@ -43,14 +43,13 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiPredicate;
import java.util.stream.CloseableStream;
import java.util.stream.Stream;
import java.util.stream.Collectors;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@ -138,14 +137,14 @@ public class StreamTest {
}
public void testBasic() {
try (CloseableStream<Path> s = Files.list(testFolder)) {
Object[] actual = s.sorted(Comparator.naturalOrder()).toArray();
try (Stream<Path> s = Files.list(testFolder)) {
Object[] actual = s.sorted().toArray();
assertEquals(actual, level1);
} catch (IOException ioe) {
fail("Unexpected IOException");
}
try (CloseableStream<Path> s = Files.list(testFolder.resolve("empty"))) {
try (Stream<Path> s = Files.list(testFolder.resolve("empty"))) {
int count = s.mapToInt(p -> 1).reduce(0, Integer::sum);
assertEquals(count, 0, "Expect empty stream.");
} catch (IOException ioe) {
@ -154,8 +153,8 @@ public class StreamTest {
}
public void testWalk() {
try (CloseableStream<Path> s = Files.walk(testFolder)) {
Object[] actual = s.sorted(Comparator.naturalOrder()).toArray();
try (Stream<Path> s = Files.walk(testFolder)) {
Object[] actual = s.sorted().toArray();
assertEquals(actual, all);
} catch (IOException ioe) {
fail("Unexpected IOException");
@ -163,9 +162,9 @@ public class StreamTest {
}
public void testWalkOneLevel() {
try (CloseableStream<Path> s = Files.walk(testFolder, 1)) {
try (Stream<Path> s = Files.walk(testFolder, 1)) {
Object[] actual = s.filter(path -> ! path.equals(testFolder))
.sorted(Comparator.naturalOrder())
.sorted()
.toArray();
assertEquals(actual, level1);
} catch (IOException ioe) {
@ -176,8 +175,8 @@ public class StreamTest {
public void testWalkFollowLink() {
// If link is not supported, the directory structure won't have link.
// We still want to test the behavior with FOLLOW_LINKS option.
try (CloseableStream<Path> s = Files.walk(testFolder, FileVisitOption.FOLLOW_LINKS)) {
Object[] actual = s.sorted(Comparator.naturalOrder()).toArray();
try (Stream<Path> s = Files.walk(testFolder, FileVisitOption.FOLLOW_LINKS)) {
Object[] actual = s.sorted().toArray();
assertEquals(actual, all_folowLinks);
} catch (IOException ioe) {
fail("Unexpected IOException");
@ -185,7 +184,7 @@ public class StreamTest {
}
private void validateFileSystemLoopException(Path start, Path... causes) {
try (CloseableStream<Path> s = Files.walk(start, FileVisitOption.FOLLOW_LINKS)) {
try (Stream<Path> s = Files.walk(start, FileVisitOption.FOLLOW_LINKS)) {
try {
int count = s.mapToInt(p -> 1).reduce(0, Integer::sum);
fail("Should got FileSystemLoopException, but got " + count + "elements.");
@ -282,28 +281,28 @@ public class StreamTest {
public void testFind() throws IOException {
PathBiPredicate pred = new PathBiPredicate((path, attrs) -> true);
try (CloseableStream<Path> s = Files.find(testFolder, Integer.MAX_VALUE, pred)) {
try (Stream<Path> s = Files.find(testFolder, Integer.MAX_VALUE, pred)) {
Set<Path> result = s.collect(Collectors.toCollection(TreeSet::new));
assertEquals(pred.visited(), all);
assertEquals(result.toArray(new Path[0]), pred.visited());
}
pred = new PathBiPredicate((path, attrs) -> attrs.isSymbolicLink());
try (CloseableStream<Path> s = Files.find(testFolder, Integer.MAX_VALUE, pred)) {
try (Stream<Path> s = Files.find(testFolder, Integer.MAX_VALUE, pred)) {
s.forEach(path -> assertTrue(Files.isSymbolicLink(path)));
assertEquals(pred.visited(), all);
}
pred = new PathBiPredicate((path, attrs) ->
path.getFileName().toString().startsWith("e"));
try (CloseableStream<Path> s = Files.find(testFolder, Integer.MAX_VALUE, pred)) {
try (Stream<Path> s = Files.find(testFolder, Integer.MAX_VALUE, pred)) {
s.forEach(path -> assertEquals(path.getFileName().toString(), "empty"));
assertEquals(pred.visited(), all);
}
pred = new PathBiPredicate((path, attrs) ->
path.getFileName().toString().startsWith("l") && attrs.isRegularFile());
try (CloseableStream<Path> s = Files.find(testFolder, Integer.MAX_VALUE, pred)) {
try (Stream<Path> s = Files.find(testFolder, Integer.MAX_VALUE, pred)) {
s.forEach(path -> fail("Expect empty stream"));
assertEquals(pred.visited(), all);
}
@ -317,14 +316,14 @@ public class StreamTest {
try {
// zero lines
assertTrue(Files.size(tmpfile) == 0, "File should be empty");
try (CloseableStream<String> s = Files.lines(tmpfile, US_ASCII)) {
try (Stream<String> s = Files.lines(tmpfile, US_ASCII)) {
assertEquals(s.mapToInt(l -> 1).reduce(0, Integer::sum), 0, "No line expected");
}
// one line
byte[] hi = { (byte)'h', (byte)'i' };
Files.write(tmpfile, hi);
try (CloseableStream<String> s = Files.lines(tmpfile, US_ASCII)) {
try (Stream<String> s = Files.lines(tmpfile, US_ASCII)) {
List<String> lines = s.collect(Collectors.toList());
assertTrue(lines.size() == 1, "One line expected");
assertTrue(lines.get(0).equals("hi"), "'Hi' expected");
@ -334,7 +333,7 @@ public class StreamTest {
List<String> expected = Arrays.asList("hi", "there");
Files.write(tmpfile, expected, US_ASCII);
assertTrue(Files.size(tmpfile) > 0, "File is empty");
try (CloseableStream<String> s = Files.lines(tmpfile, US_ASCII)) {
try (Stream<String> s = Files.lines(tmpfile, US_ASCII)) {
List<String> lines = s.collect(Collectors.toList());
assertTrue(lines.equals(expected), "Unexpected lines");
}
@ -342,7 +341,7 @@ public class StreamTest {
// MalformedInputException
byte[] bad = { (byte)0xff, (byte)0xff };
Files.write(tmpfile, bad);
try (CloseableStream<String> s = Files.lines(tmpfile, US_ASCII)) {
try (Stream<String> s = Files.lines(tmpfile, US_ASCII)) {
try {
List<String> lines = s.collect(Collectors.toList());
throw new RuntimeException("UncheckedIOException expected");
@ -378,7 +377,7 @@ public class StreamTest {
fsp.setFaultyMode(false);
Path fakeRoot = fs.getRoot();
try {
try (CloseableStream<Path> s = Files.list(fakeRoot)) {
try (Stream<Path> s = Files.list(fakeRoot)) {
s.forEach(path -> assertEquals(path.getFileName().toString(), "DirectoryIteratorException"));
}
} catch (UncheckedIOException uioe) {
@ -398,7 +397,7 @@ public class StreamTest {
}
try {
try (CloseableStream<Path> s = Files.list(fakeRoot)) {
try (Stream<Path> s = Files.list(fakeRoot)) {
s.forEach(path -> fail("should not get here"));
}
} catch (UncheckedIOException uioe) {
@ -427,12 +426,12 @@ public class StreamTest {
try {
fsp.setFaultyMode(false);
Path fakeRoot = fs.getRoot();
try (CloseableStream<Path> s = Files.list(fakeRoot.resolve("dir2"))) {
try (Stream<Path> s = Files.list(fakeRoot.resolve("dir2"))) {
// only one file
s.forEach(path -> assertEquals(path.getFileName().toString(), "IOException"));
}
try (CloseableStream<Path> s = Files.walk(fakeRoot.resolve("empty"))) {
try (Stream<Path> s = Files.walk(fakeRoot.resolve("empty"))) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
// ordered as depth-first
@ -440,13 +439,13 @@ public class StreamTest {
}
fsp.setFaultyMode(true);
try (CloseableStream<Path> s = Files.list(fakeRoot.resolve("dir2"))) {
try (Stream<Path> s = Files.list(fakeRoot.resolve("dir2"))) {
s.forEach(path -> fail("should have caused exception"));
} catch (UncheckedIOException uioe) {
assertTrue(uioe.getCause() instanceof FaultyFileSystem.FaultyException);
}
try (CloseableStream<Path> s = Files.walk(fakeRoot.resolve("empty"))) {
try (Stream<Path> s = Files.walk(fakeRoot.resolve("empty"))) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
fail("should not reach here due to IOException");
@ -454,7 +453,7 @@ public class StreamTest {
assertTrue(uioe.getCause() instanceof FaultyFileSystem.FaultyException);
}
try (CloseableStream<Path> s = Files.walk(
try (Stream<Path> s = Files.walk(
fakeRoot.resolve("empty").resolve("IOException")))
{
String[] result = s.map(path -> path.getFileName().toString())
@ -502,20 +501,20 @@ public class StreamTest {
fsp.setFaultyMode(false);
Path fakeRoot = fs.getRoot();
// validate setting
try (CloseableStream<Path> s = Files.list(fakeRoot.resolve("empty"))) {
try (Stream<Path> s = Files.list(fakeRoot.resolve("empty"))) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
assertEqualsNoOrder(result, new String[] { "SecurityException", "sample" });
}
try (CloseableStream<Path> s = Files.walk(fakeRoot.resolve("dir2"))) {
try (Stream<Path> s = Files.walk(fakeRoot.resolve("dir2"))) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
assertEqualsNoOrder(result, new String[] { "dir2", "SecurityException", "fileInSE", "file" });
}
if (supportsLinks) {
try (CloseableStream<Path> s = Files.list(fakeRoot.resolve("dir"))) {
try (Stream<Path> s = Files.list(fakeRoot.resolve("dir"))) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
assertEqualsNoOrder(result, new String[] { "d1", "f1", "lnDir2", "SecurityException", "lnDirSE", "lnFileSE" });
@ -525,13 +524,13 @@ public class StreamTest {
// execute test
fsp.setFaultyMode(true);
// ignore file cause SecurityException
try (CloseableStream<Path> s = Files.walk(fakeRoot.resolve("empty"))) {
try (Stream<Path> s = Files.walk(fakeRoot.resolve("empty"))) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
assertEqualsNoOrder(result, new String[] { "empty", "sample" });
}
// skip folder cause SecurityException
try (CloseableStream<Path> s = Files.walk(fakeRoot.resolve("dir2"))) {
try (Stream<Path> s = Files.walk(fakeRoot.resolve("dir2"))) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
assertEqualsNoOrder(result, new String[] { "dir2", "file" });
@ -539,14 +538,14 @@ public class StreamTest {
if (supportsLinks) {
// not following links
try (CloseableStream<Path> s = Files.walk(fakeRoot.resolve("dir"))) {
try (Stream<Path> s = Files.walk(fakeRoot.resolve("dir"))) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
assertEqualsNoOrder(result, new String[] { "dir", "d1", "f1", "lnDir2", "lnDirSE", "lnFileSE" });
}
// following links
try (CloseableStream<Path> s = Files.walk(fakeRoot.resolve("dir"), FileVisitOption.FOLLOW_LINKS)) {
try (Stream<Path> s = Files.walk(fakeRoot.resolve("dir"), FileVisitOption.FOLLOW_LINKS)) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
// ?? Should fileInSE show up?
@ -556,19 +555,19 @@ public class StreamTest {
}
// list instead of walk
try (CloseableStream<Path> s = Files.list(fakeRoot.resolve("empty"))) {
try (Stream<Path> s = Files.list(fakeRoot.resolve("empty"))) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
assertEqualsNoOrder(result, new String[] { "sample" });
}
try (CloseableStream<Path> s = Files.list(fakeRoot.resolve("dir2"))) {
try (Stream<Path> s = Files.list(fakeRoot.resolve("dir2"))) {
String[] result = s.map(path -> path.getFileName().toString())
.toArray(String[]::new);
assertEqualsNoOrder(result, new String[] { "file" });
}
// root cause SecurityException should be reported
try (CloseableStream<Path> s = Files.walk(
try (Stream<Path> s = Files.walk(
fakeRoot.resolve("dir2").resolve("SecurityException")))
{
String[] result = s.map(path -> path.getFileName().toString())
@ -579,7 +578,7 @@ public class StreamTest {
}
// Walk a file cause SecurityException, we should get SE
try (CloseableStream<Path> s = Files.walk(
try (Stream<Path> s = Files.walk(
fakeRoot.resolve("dir").resolve("SecurityException")))
{
String[] result = s.map(path -> path.getFileName().toString())
@ -590,7 +589,7 @@ public class StreamTest {
}
// List a file cause SecurityException, we should get SE as cannot read attribute
try (CloseableStream<Path> s = Files.list(
try (Stream<Path> s = Files.list(
fakeRoot.resolve("dir2").resolve("SecurityException")))
{
String[] result = s.map(path -> path.getFileName().toString())
@ -600,7 +599,7 @@ public class StreamTest {
assertTrue(se.getCause() instanceof FaultyFileSystem.FaultyException);
}
try (CloseableStream<Path> s = Files.list(
try (Stream<Path> s = Files.list(
fakeRoot.resolve("dir").resolve("SecurityException")))
{
String[] result = s.map(path -> path.getFileName().toString())
@ -627,7 +626,7 @@ public class StreamTest {
}
public void testConstructException() {
try (CloseableStream<String> s = Files.lines(testFolder.resolve("notExist"), Charset.forName("UTF-8"))) {
try (Stream<String> s = Files.lines(testFolder.resolve("notExist"), Charset.forName("UTF-8"))) {
s.forEach(l -> fail("File is not even exist!"));
} catch (IOException ioe) {
assertTrue(ioe instanceof NoSuchFileException);
@ -635,24 +634,26 @@ public class StreamTest {
}
public void testClosedStream() throws IOException {
try (CloseableStream<Path> s = Files.list(testFolder)) {
try (Stream<Path> s = Files.list(testFolder)) {
s.close();
Object[] actual = s.sorted(Comparator.naturalOrder()).toArray();
assertTrue(actual.length <= level1.length);
}
try (CloseableStream<Path> s = Files.walk(testFolder)) {
s.close();
Object[] actual = s.sorted(Comparator.naturalOrder()).toArray();
Object[] actual = s.sorted().toArray();
fail("Operate on closed stream should throw IllegalStateException");
} catch (IllegalStateException ex) {
// expected
}
try (CloseableStream<Path> s = Files.find(testFolder, Integer.MAX_VALUE,
try (Stream<Path> s = Files.walk(testFolder)) {
s.close();
Object[] actual = s.sorted().toArray();
fail("Operate on closed stream should throw IllegalStateException");
} catch (IllegalStateException ex) {
// expected
}
try (Stream<Path> s = Files.find(testFolder, Integer.MAX_VALUE,
(p, attr) -> true)) {
s.close();
Object[] actual = s.sorted(Comparator.naturalOrder()).toArray();
Object[] actual = s.sorted().toArray();
fail("Operate on closed stream should throw IllegalStateException");
} catch (IllegalStateException ex) {
// expected

View File

@ -40,7 +40,7 @@ import java.util.function.Function;
@SuppressWarnings({"rawtypes", "unchecked"})
public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
STREAM_FOR_EACH(false) {
STREAM_FOR_EACH_WITH_CLOSE(false) {
<T, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
DoubleStream s = m.apply(data.stream());
@ -48,6 +48,7 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
s = s.sequential();
}
s.forEach(b);
s.close();
}
},

View File

@ -40,7 +40,7 @@ import java.util.function.IntConsumer;
@SuppressWarnings({"rawtypes", "unchecked"})
public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
STREAM_FOR_EACH(false) {
STREAM_FOR_EACH_WITH_CLOSE(false) {
<T, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
IntStream s = m.apply(data.stream());
@ -48,6 +48,7 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
s = s.sequential();
}
s.forEach(b);
s.close();
}
},

View File

@ -40,7 +40,7 @@ import java.util.function.LongConsumer;
@SuppressWarnings({"rawtypes", "unchecked"})
public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
STREAM_FOR_EACH(false) {
STREAM_FOR_EACH_WITH_CLOSE(false) {
<T, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
LongStream s = m.apply(data.stream());
@ -48,6 +48,7 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
s = s.sequential();
}
s.forEach(b);
s.close();
}
},

View File

@ -39,7 +39,7 @@ import java.util.function.Function;
@SuppressWarnings({"rawtypes", "unchecked"})
public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
STREAM_FOR_EACH(false) {
STREAM_FOR_EACH_WITH_CLOSE(false) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
Stream<U> s = m.apply(data.stream());
@ -47,6 +47,7 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
s = s.sequential();
}
s.forEach(b);
s.close();
}
},

View File

@ -0,0 +1,166 @@
/*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package org.openjdk.tests.java.util.stream;
import java.util.Arrays;
import java.util.stream.OpTestCase;
import java.util.stream.Stream;
import org.testng.annotations.Test;
import static java.util.stream.LambdaTestHelpers.countTo;
/**
* StreamCloseTest
*
* @author Brian Goetz
*/
@Test(groups = { "serialization-hostile" })
public class StreamCloseTest extends OpTestCase {
public void testEmptyCloseHandler() {
try (Stream<Integer> ints = countTo(100).stream()) {
ints.forEach(i -> {});
}
}
public void testOneCloseHandler() {
final boolean[] holder = new boolean[1];
Runnable closer = () -> { holder[0] = true; };
try (Stream<Integer> ints = countTo(100).stream()) {
ints.onClose(closer);
ints.forEach(i -> {});
}
assertTrue(holder[0]);
Arrays.fill(holder, false);
try (Stream<Integer> ints = countTo(100).stream().onClose(closer)) {
ints.forEach(i -> {});
}
assertTrue(holder[0]);
Arrays.fill(holder, false);
try (Stream<Integer> ints = countTo(100).stream().filter(e -> true).onClose(closer)) {
ints.forEach(i -> {});
}
assertTrue(holder[0]);
Arrays.fill(holder, false);
try (Stream<Integer> ints = countTo(100).stream().filter(e -> true).onClose(closer).filter(e -> true)) {
ints.forEach(i -> {});
}
assertTrue(holder[0]);
}
public void testTwoCloseHandlers() {
final boolean[] holder = new boolean[2];
Runnable close1 = () -> { holder[0] = true; };
Runnable close2 = () -> { holder[1] = true; };
try (Stream<Integer> ints = countTo(100).stream()) {
ints.onClose(close1).onClose(close2);
ints.forEach(i -> {});
}
assertTrue(holder[0] && holder[1]);
Arrays.fill(holder, false);
try (Stream<Integer> ints = countTo(100).stream().onClose(close1).onClose(close2)) {
ints.forEach(i -> {});
}
assertTrue(holder[0] && holder[1]);
Arrays.fill(holder, false);
try (Stream<Integer> ints = countTo(100).stream().filter(e -> true).onClose(close1).onClose(close2)) {
ints.forEach(i -> {});
}
assertTrue(holder[0] && holder[1]);
Arrays.fill(holder, false);
try (Stream<Integer> ints = countTo(100).stream().filter(e -> true).onClose(close1).onClose(close2).filter(e -> true)) {
ints.forEach(i -> {});
}
assertTrue(holder[0] && holder[1]);
}
public void testCascadedExceptions() {
final boolean[] holder = new boolean[3];
boolean caught = false;
Runnable close1 = () -> { holder[0] = true; throw new RuntimeException("1"); };
Runnable close2 = () -> { holder[1] = true; throw new RuntimeException("2"); };
Runnable close3 = () -> { holder[2] = true; throw new RuntimeException("3"); };
try (Stream<Integer> ints = countTo(100).stream()) {
ints.onClose(close1).onClose(close2).onClose(close3);
ints.forEach(i -> {});
}
catch (RuntimeException e) {
assertCascaded(e, 3);
assertTrue(holder[0] && holder[1] && holder[2]);
caught = true;
}
assertTrue(caught);
Arrays.fill(holder, false);
caught = false;
try (Stream<Integer> ints = countTo(100).stream().onClose(close1).onClose(close2).onClose(close3)) {
ints.forEach(i -> {});
}
catch (RuntimeException e) {
assertCascaded(e, 3);
assertTrue(holder[0] && holder[1] && holder[2]);
caught = true;
}
assertTrue(caught);
caught = false;
Arrays.fill(holder, false);
try (Stream<Integer> ints = countTo(100).stream().filter(e -> true).onClose(close1).onClose(close2).onClose(close3)) {
ints.forEach(i -> {});
}
catch (RuntimeException e) {
assertCascaded(e, 3);
assertTrue(holder[0] && holder[1] && holder[2]);
caught = true;
}
assertTrue(caught);
caught = false;
Arrays.fill(holder, false);
try (Stream<Integer> ints = countTo(100).stream().filter(e -> true).onClose(close1).onClose(close2).filter(e -> true).onClose(close3)) {
ints.forEach(i -> {});
}
catch (RuntimeException e) {
assertCascaded(e, 3);
assertTrue(holder[0] && holder[1] && holder[2]);
caught = true;
}
assertTrue(caught);
}
private void assertCascaded(RuntimeException e, int n) {
assertTrue(e.getMessage().equals("1"));
assertTrue(e.getSuppressed().length == n - 1);
for (int i=0; i<n-1; i++)
assertTrue(e.getSuppressed()[i].getMessage().equals(String.valueOf(i + 2)));
}
}