/* * Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ /* @test * @bug 4607272 6842687 6878369 6944810 7023403 * @summary Unit test for AsynchronousSocketChannel(use -Dseed=X to set PRNG seed) * @library /test/lib * @run main Basic -skipSlowConnectTest * @key randomness intermittent */ import java.io.Closeable; import java.io.IOException; import java.net.*; import static java.net.StandardSocketOptions.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Random; import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import jdk.test.lib.RandomFactory; public class Basic { private static final Random RAND = RandomFactory.getRandom(); static boolean skipSlowConnectTest = false; public static void main(String[] args) throws Exception { for (String arg: args) { switch (arg) { case "-skipSlowConnectTest" : skipSlowConnectTest = true; break; default: throw new RuntimeException("Unrecognized argument: " + arg); } } testBind(); testSocketOptions(); testConnect(); testCloseWhenPending(); testCancel(); testRead1(); testRead2(); testRead3(); testWrite1(); testWrite2(); // skip timeout tests until 7052549 is fixed if (!System.getProperty("os.name").startsWith("Windows")) testTimeout(); testShutdown(); } static class Server implements Closeable { private final ServerSocketChannel ssc; private final InetSocketAddress address; Server() throws IOException { ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); InetAddress lh = InetAddress.getLocalHost(); int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort(); address = new InetSocketAddress(lh, port); } InetSocketAddress address() { return address; } SocketChannel accept() throws IOException { return ssc.accept(); } public void close() throws IOException { ssc.close(); } } static void testBind() throws Exception { System.out.println("-- bind --"); try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { if (ch.getLocalAddress() != null) throw new RuntimeException("Local address should be 'null'"); ch.bind(new InetSocketAddress(0)); // check local address after binding InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress(); if (local.getPort() == 0) throw new RuntimeException("Unexpected port"); if (!local.getAddress().isAnyLocalAddress()) throw new RuntimeException("Not bound to a wildcard address"); // try to re-bind try { ch.bind(new InetSocketAddress(0)); throw new RuntimeException("AlreadyBoundException expected"); } catch (AlreadyBoundException x) { } } // check ClosedChannelException AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); ch.close(); try { ch.bind(new InetSocketAddress(0)); throw new RuntimeException("ClosedChannelException expected"); } catch (ClosedChannelException x) { } } static void testSocketOptions() throws Exception { System.out.println("-- socket options --"); try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { ch.setOption(SO_RCVBUF, 128*1024) .setOption(SO_SNDBUF, 128*1024) .setOption(SO_REUSEADDR, true); // check SO_SNDBUF/SO_RCVBUF limits int before, after; before = ch.getOption(SO_SNDBUF); after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF); if (after < before) throw new RuntimeException("setOption caused SO_SNDBUF to decrease"); before = ch.getOption(SO_RCVBUF); after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF); if (after < before) throw new RuntimeException("setOption caused SO_RCVBUF to decrease"); ch.bind(new InetSocketAddress(0)); // default values if (ch.getOption(SO_KEEPALIVE)) throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'"); if (ch.getOption(TCP_NODELAY)) throw new RuntimeException("Default of TCP_NODELAY should be 'false'"); // set and check if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE)) throw new RuntimeException("SO_KEEPALIVE did not change"); if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY)) throw new RuntimeException("SO_KEEPALIVE did not change"); // read others (can't check as actual value is implementation dependent) ch.getOption(SO_RCVBUF); ch.getOption(SO_SNDBUF); Set> options = ch.supportedOptions(); boolean reuseport = options.contains(SO_REUSEPORT); if (reuseport) { if (ch.getOption(SO_REUSEPORT)) throw new RuntimeException("Default of SO_REUSEPORT should be 'false'"); if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT)) throw new RuntimeException("SO_REUSEPORT did not change"); } } } static void testConnect() throws Exception { System.out.println("-- connect --"); SocketAddress address; try (Server server = new Server()) { address = server.address(); // connect to server and check local/remote addresses try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { ch.connect(address).get(); // check local address if (ch.getLocalAddress() == null) throw new RuntimeException("Not bound to local address"); // check remote address InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress(); if (remote.getPort() != server.address().getPort()) throw new RuntimeException("Connected to unexpected port"); if (!remote.getAddress().equals(server.address().getAddress())) throw new RuntimeException("Connected to unexpected address"); // try to connect again try { ch.connect(server.address()).get(); throw new RuntimeException("AlreadyConnectedException expected"); } catch (AlreadyConnectedException x) { } // clean-up server.accept().close(); } // check that connect fails with ClosedChannelException AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); ch.close(); try { ch.connect(server.address()).get(); throw new RuntimeException("ExecutionException expected"); } catch (ExecutionException x) { if (!(x.getCause() instanceof ClosedChannelException)) throw new RuntimeException("Cause of ClosedChannelException expected", x.getCause()); } final AtomicReference connectException = new AtomicReference<>(); ch.connect(server.address(), (Void)null, new CompletionHandler() { public void completed(Void result, Void att) { } public void failed(Throwable exc, Void att) { connectException.set(exc); } }); while (connectException.get() == null) { Thread.sleep(100); } if (!(connectException.get() instanceof ClosedChannelException)) throw new RuntimeException("ClosedChannelException expected", connectException.get()); } // test that failure to connect closes the channel try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { try { ch.connect(address).get(); } catch (ExecutionException x) { // failed to establish connection if (ch.isOpen()) throw new RuntimeException("Channel should be closed"); } } // repeat test by connecting to a (probably) non-existent host. This // improves the chance that the connect will not fail immediately. if (!skipSlowConnectTest) { try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { try { ch.connect(genSocketAddress()).get(); } catch (ExecutionException x) { // failed to establish connection if (ch.isOpen()) throw new RuntimeException("Channel should be closed"); } } } } static void testCloseWhenPending() throws Exception { System.out.println("-- asynchronous close when connecting --"); AsynchronousSocketChannel ch; // asynchronous close while connecting ch = AsynchronousSocketChannel.open(); Future connectResult = ch.connect(genSocketAddress()); // give time to initiate the connect (SYN) Thread.sleep(50); // close ch.close(); // check that exception is thrown in timely manner try { connectResult.get(5, TimeUnit.SECONDS); } catch (TimeoutException x) { throw new RuntimeException("AsynchronousCloseException not thrown"); } catch (ExecutionException x) { // expected } System.out.println("-- asynchronous close when reading --"); try (Server server = new Server()) { ch = AsynchronousSocketChannel.open(); ch.connect(server.address()).get(); ByteBuffer dst = ByteBuffer.allocateDirect(100); Future result = ch.read(dst); // attempt a second read - should fail with ReadPendingException ByteBuffer buf = ByteBuffer.allocateDirect(100); try { ch.read(buf); throw new RuntimeException("ReadPendingException expected"); } catch (ReadPendingException x) { } // close channel (should cause initial read to complete) ch.close(); server.accept().close(); // check that AsynchronousCloseException is thrown try { result.get(); throw new RuntimeException("Should not read"); } catch (ExecutionException x) { if (!(x.getCause() instanceof AsynchronousCloseException)) throw new RuntimeException(x); } System.out.println("-- asynchronous close when writing --"); ch = AsynchronousSocketChannel.open(); ch.connect(server.address()).get(); final AtomicReference writeException = new AtomicReference(); // write bytes to fill socket buffer final AtomicInteger numCompleted = new AtomicInteger(); ch.write(genBuffer(), ch, new CompletionHandler() { public void completed(Integer result, AsynchronousSocketChannel ch) { numCompleted.incrementAndGet(); ch.write(genBuffer(), ch, this); } public void failed(Throwable x, AsynchronousSocketChannel ch) { writeException.set(x); } }); // give time for socket buffer to fill up - // take pauses until the handler is no longer being invoked // because all writes are being pended which guarantees that // the internal channel state indicates it is writing int prevNumCompleted = numCompleted.get(); do { Thread.sleep(1000); if (numCompleted.get() == prevNumCompleted) { break; } prevNumCompleted = numCompleted.get(); } while (true); // attempt a concurrent write - // should fail with WritePendingException try { ch.write(genBuffer()); throw new RuntimeException("WritePendingException expected"); } catch (WritePendingException x) { } // close channel - should cause initial write to complete ch.close(); server.accept().close(); // wait for exception while (writeException.get() == null) { Thread.sleep(100); } if (!(writeException.get() instanceof AsynchronousCloseException)) throw new RuntimeException("AsynchronousCloseException expected", writeException.get()); } } static void testCancel() throws Exception { System.out.println("-- cancel --"); try (Server server = new Server()) { for (int i=0; i<2; i++) { boolean mayInterruptIfRunning = (i == 0) ? false : true; // establish loopback connection AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); ch.connect(server.address()).get(); SocketChannel peer = server.accept(); // start read operation ByteBuffer buf = ByteBuffer.allocate(1); Future res = ch.read(buf); // cancel operation boolean cancelled = res.cancel(mayInterruptIfRunning); // check post-conditions if (!res.isDone()) throw new RuntimeException("isDone should return true"); if (res.isCancelled() != cancelled) throw new RuntimeException("isCancelled not consistent"); try { res.get(); throw new RuntimeException("CancellationException expected"); } catch (CancellationException x) { } try { res.get(1, TimeUnit.SECONDS); throw new RuntimeException("CancellationException expected"); } catch (CancellationException x) { } // check that the cancel doesn't impact writing to the channel if (!mayInterruptIfRunning) { buf = ByteBuffer.wrap("a".getBytes()); ch.write(buf).get(); } ch.close(); peer.close(); } } } static void testRead1() throws Exception { System.out.println("-- read (1) --"); try (Server server = new Server()) { final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); ch.connect(server.address()).get(); // read with 0 bytes remaining should complete immediately ByteBuffer buf = ByteBuffer.allocate(1); buf.put((byte)0); int n = ch.read(buf).get(); if (n != 0) throw new RuntimeException("0 expected"); // write bytes and close connection ByteBuffer src = genBuffer(); try (SocketChannel sc = server.accept()) { sc.setOption(SO_SNDBUF, src.remaining()); while (src.hasRemaining()) sc.write(src); } // reads should complete immediately final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); final CountDownLatch latch = new CountDownLatch(1); ch.read(dst, (Void)null, new CompletionHandler() { public void completed(Integer result, Void att) { int n = result; if (n > 0) { ch.read(dst, (Void)null, this); } else { latch.countDown(); } } public void failed(Throwable exc, Void att) { } }); latch.await(); // check buffers src.flip(); dst.flip(); if (!src.equals(dst)) { throw new RuntimeException("Contents differ"); } // close channel ch.close(); // check read fails with ClosedChannelException try { ch.read(dst).get(); throw new RuntimeException("ExecutionException expected"); } catch (ExecutionException x) { if (!(x.getCause() instanceof ClosedChannelException)) throw new RuntimeException("Cause of ClosedChannelException expected", x.getCause()); } } } static void testRead2() throws Exception { System.out.println("-- read (2) --"); try (Server server = new Server()) { final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); ch.connect(server.address()).get(); SocketChannel sc = server.accept(); ByteBuffer src = genBuffer(); // read until the buffer is full final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity()); final CountDownLatch latch = new CountDownLatch(1); ch.read(dst, (Void)null, new CompletionHandler() { public void completed(Integer result, Void att) { if (dst.hasRemaining()) { ch.read(dst, (Void)null, this); } else { latch.countDown(); } } public void failed(Throwable exc, Void att) { } }); // trickle the writing do { int rem = src.remaining(); int size = (rem <= 100) ? rem : 50 + RAND.nextInt(rem - 100); ByteBuffer buf = ByteBuffer.allocate(size); for (int i=0; i() { public void completed(Long result, Void att) { long n = result; if (n <= 0) throw new RuntimeException("No bytes read"); l1.countDown(); } public void failed(Throwable exc, Void att) { } }); // write some bytes sc.write(genBuffer()); // read should now complete l1.await(); // write more bytes sc.write(genBuffer()); // read should complete immediately for (int i=0; i() { public void completed(Long result, Void att) { long n = result; if (n <= 0) throw new RuntimeException("No bytes read"); l2.countDown(); } public void failed(Throwable exc, Void att) { } }); l2.await(); ch.close(); sc.close(); } } static void testWrite1() throws Exception { System.out.println("-- write (1) --"); try (Server server = new Server()) { final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); ch.connect(server.address()).get(); SocketChannel sc = server.accept(); // write with 0 bytes remaining should complete immediately ByteBuffer buf = ByteBuffer.allocate(1); buf.put((byte)0); int n = ch.write(buf).get(); if (n != 0) throw new RuntimeException("0 expected"); // write all bytes and close connection when done final ByteBuffer src = genBuffer(); ch.write(src, (Void)null, new CompletionHandler() { public void completed(Integer result, Void att) { if (src.hasRemaining()) { ch.write(src, (Void)null, this); } else { try { ch.close(); } catch (IOException ignore) { } } } public void failed(Throwable exc, Void att) { } }); // read to EOF or buffer full ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); do { n = sc.read(dst); } while (n > 0); sc.close(); // check buffers src.flip(); dst.flip(); if (!src.equals(dst)) { throw new RuntimeException("Contents differ"); } // check write fails with ClosedChannelException try { ch.read(dst).get(); throw new RuntimeException("ExecutionException expected"); } catch (ExecutionException x) { if (!(x.getCause() instanceof ClosedChannelException)) throw new RuntimeException("Cause of ClosedChannelException expected", x.getCause()); } } } // exercise gathering write static void testWrite2() throws Exception { System.out.println("-- write (2) --"); try (Server server = new Server()) { final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); ch.connect(server.address()).get(); SocketChannel sc = server.accept(); // number of bytes written final AtomicLong bytesWritten = new AtomicLong(0); // write buffers (should complete immediately) ByteBuffer[] srcs = genBuffers(1); final CountDownLatch l1 = new CountDownLatch(1); ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, new CompletionHandler() { public void completed(Long result, Void att) { long n = result; if (n <= 0) throw new RuntimeException("No bytes read"); bytesWritten.addAndGet(n); l1.countDown(); } public void failed(Throwable exc, Void att) { } }); l1.await(); // set to true to signal that no more buffers should be written final AtomicBoolean continueWriting = new AtomicBoolean(true); // write until socket buffer is full so as to create the conditions // for when a write does not complete immediately srcs = genBuffers(1); ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, new CompletionHandler() { public void completed(Long result, Void att) { long n = result; if (n <= 0) throw new RuntimeException("No bytes written"); bytesWritten.addAndGet(n); if (continueWriting.get()) { ByteBuffer[] srcs = genBuffers(8); ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, this); } } public void failed(Throwable exc, Void att) { } }); // give time for socket buffer to fill up. Thread.sleep(5*1000); // signal handler to stop further writing continueWriting.set(false); // read until done ByteBuffer buf = ByteBuffer.allocateDirect(4096); long total = 0L; do { int n = sc.read(buf); if (n <= 0) throw new RuntimeException("No bytes read"); buf.rewind(); total += n; } while (total < bytesWritten.get()); ch.close(); sc.close(); } } static void testShutdown() throws Exception { System.out.println("-- shutdown --"); try (Server server = new Server(); AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { ch.connect(server.address()).get(); try (SocketChannel peer = server.accept()) { ByteBuffer buf = ByteBuffer.allocateDirect(1000); int n; // check read ch.shutdownInput(); n = ch.read(buf).get(); if (n != -1) throw new RuntimeException("-1 expected"); // check full with full buffer buf.put(new byte[100]); n = ch.read(buf).get(); if (n != -1) throw new RuntimeException("-1 expected"); // check write ch.shutdownOutput(); try { ch.write(buf).get(); throw new RuntimeException("ClosedChannelException expected"); } catch (ExecutionException x) { if (!(x.getCause() instanceof ClosedChannelException)) throw new RuntimeException("ClosedChannelException expected", x.getCause()); } } } } static void testTimeout() throws Exception { System.out.println("-- timeouts --"); testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS); testTimeout(-1L, TimeUnit.SECONDS); testTimeout(0L, TimeUnit.SECONDS); testTimeout(2L, TimeUnit.SECONDS); } static void testTimeout(final long timeout, final TimeUnit unit) throws Exception { System.out.printf("---- timeout: %d ms%n", unit.toMillis(timeout)); try (Server server = new Server()) { AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); ch.connect(server.address()).get(); ByteBuffer dst = ByteBuffer.allocate(512); final AtomicReference readException = new AtomicReference(); // this read should timeout if value is > 0 ch.read(dst, timeout, unit, null, new CompletionHandler() { public void completed(Integer result, Void att) { readException.set(new RuntimeException("Should not complete")); } public void failed(Throwable exc, Void att) { readException.set(exc); } }); if (timeout > 0L) { // wait for exception while (readException.get() == null) { Thread.sleep(100); } if (!(readException.get() instanceof InterruptedByTimeoutException)) throw new RuntimeException("InterruptedByTimeoutException expected", readException.get()); // after a timeout then further reading should throw unspecified runtime exception boolean exceptionThrown = false; try { ch.read(dst); } catch (RuntimeException x) { exceptionThrown = true; } if (!exceptionThrown) throw new RuntimeException("RuntimeException expected after timeout."); } else { Thread.sleep(1000); Throwable exc = readException.get(); if (exc != null) throw new RuntimeException(exc); } final AtomicReference writeException = new AtomicReference(); // write bytes to fill socket buffer ch.write(genBuffer(), timeout, unit, ch, new CompletionHandler() { public void completed(Integer result, AsynchronousSocketChannel ch) { ch.write(genBuffer(), timeout, unit, ch, this); } public void failed(Throwable exc, AsynchronousSocketChannel ch) { writeException.set(exc); } }); if (timeout > 0) { // wait for exception while (writeException.get() == null) { Thread.sleep(100); } if (!(writeException.get() instanceof InterruptedByTimeoutException)) throw new RuntimeException("InterruptedByTimeoutException expected", writeException.get()); // after a timeout then further writing should throw unspecified runtime exception boolean exceptionThrown = false; try { ch.write(genBuffer()); } catch (RuntimeException x) { exceptionThrown = true; } if (!exceptionThrown) throw new RuntimeException("RuntimeException expected after timeout."); } else { Thread.sleep(1000); Throwable exc = writeException.get(); if (exc != null) throw new RuntimeException(exc); } // clean-up server.accept().close(); ch.close(); } } // returns ByteBuffer with random bytes static ByteBuffer genBuffer() { int size = 1024 + RAND.nextInt(16000); byte[] buf = new byte[size]; RAND.nextBytes(buf); boolean useDirect = RAND.nextBoolean(); if (useDirect) { ByteBuffer bb = ByteBuffer.allocateDirect(buf.length); bb.put(buf); bb.flip(); return bb; } else { return ByteBuffer.wrap(buf); } } // return ByteBuffer[] with random bytes static ByteBuffer[] genBuffers(int max) { int len = 1; if (max > 1) len += RAND.nextInt(max); ByteBuffer[] bufs = new ByteBuffer[len]; for (int i=0; i