/* * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ /** * @test * @bug 8284161 * @summary Basic tests of virtual threads doing blocking I/O with NIO channels * @enablePreview * @library /test/lib * @run testng/othervm/timeout=300 BlockingChannelOps * @run testng/othervm/timeout=300 -Djdk.useDirectRegister BlockingChannelOps */ /** * @test * @requires vm.continuations * @enablePreview * @library /test/lib * @run testng/othervm/timeout=300 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps */ import java.io.Closeable; import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.DatagramChannel; import java.nio.channels.Pipe; import java.nio.channels.ReadableByteChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; import jdk.test.lib.thread.VThreadRunner; import org.testng.annotations.Test; import static org.testng.Assert.*; public class BlockingChannelOps { private static final long DELAY = 4000; /** * SocketChannel read/write, no blocking. */ @Test public void testSocketChannelReadWrite1() throws Exception { VThreadRunner.run(() -> { try (var connection = new Connection()) { SocketChannel sc1 = connection.channel1(); SocketChannel sc2 = connection.channel2(); // write should not block ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); int n = sc1.write(bb); assertTrue(n > 0); // read should not block bb = ByteBuffer.allocate(10); n = sc2.read(bb); assertTrue(n > 0); assertTrue(bb.get(0) == 'X'); } }); } /** * Virtual thread blocks in SocketChannel read. */ @Test public void testSocketChannelRead() throws Exception { VThreadRunner.run(() -> { try (var connection = new Connection()) { SocketChannel sc1 = connection.channel1(); SocketChannel sc2 = connection.channel2(); // schedule write ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); ScheduledWriter.schedule(sc1, bb, DELAY); // read should block bb = ByteBuffer.allocate(10); int n = sc2.read(bb); assertTrue(n > 0); assertTrue(bb.get(0) == 'X'); } }); } /** * Virtual thread blocks in SocketChannel write. */ @Test public void testSocketChannelWrite() throws Exception { VThreadRunner.run(() -> { try (var connection = new Connection()) { SocketChannel sc1 = connection.channel1(); SocketChannel sc2 = connection.channel2(); // schedule thread to read to EOF ScheduledReader.schedule(sc2, true, DELAY); // write should block ByteBuffer bb = ByteBuffer.allocate(100*1024); for (int i=0; i<1000; i++) { int n = sc1.write(bb); assertTrue(n > 0); bb.clear(); } } }); } /** * SocketChannel close while virtual thread blocked in read. */ @Test public void testSocketChannelReadAsyncClose() throws Exception { VThreadRunner.run(() -> { try (var connection = new Connection()) { SocketChannel sc = connection.channel1(); ScheduledCloser.schedule(sc, DELAY); try { int n = sc.read(ByteBuffer.allocate(100)); throw new RuntimeException("read returned " + n); } catch (AsynchronousCloseException expected) { } } }); } /** * Virtual thread interrupted while blocked in SocketChannel read. */ @Test public void testSocketChannelReadInterrupt() throws Exception { VThreadRunner.run(() -> { try (var connection = new Connection()) { SocketChannel sc = connection.channel1(); ScheduledInterrupter.schedule(Thread.currentThread(), DELAY); try { int n = sc.read(ByteBuffer.allocate(100)); throw new RuntimeException("read returned " + n); } catch (ClosedByInterruptException expected) { assertTrue(Thread.interrupted()); } } }); } /** * SocketChannel close while virtual thread blocked in write. */ @Test public void testSocketChannelWriteAsyncClose() throws Exception { VThreadRunner.run(() -> { try (var connection = new Connection()) { SocketChannel sc = connection.channel1(); ScheduledCloser.schedule(sc, DELAY); try { ByteBuffer bb = ByteBuffer.allocate(100*1024); for (;;) { int n = sc.write(bb); assertTrue(n > 0); bb.clear(); } } catch (AsynchronousCloseException expected) { } } }); } /** * Virtual thread interrupted while blocked in SocketChannel write. */ @Test public void testSocketChannelWriteInterrupt() throws Exception { VThreadRunner.run(() -> { try (var connection = new Connection()) { SocketChannel sc = connection.channel1(); ScheduledInterrupter.schedule(Thread.currentThread(), DELAY); try { ByteBuffer bb = ByteBuffer.allocate(100*1024); for (;;) { int n = sc.write(bb); assertTrue(n > 0); bb.clear(); } } catch (ClosedByInterruptException expected) { assertTrue(Thread.interrupted()); } } }); } /** * Virtual thread blocks in SocketChannel adaptor read. */ @Test public void testSocketAdaptorRead1() throws Exception { testSocketAdaptorRead(0); } /** * Virtual thread blocks in SocketChannel adaptor read with timeout. */ @Test public void testSocketAdaptorRead2() throws Exception { testSocketAdaptorRead(60_000); } private void testSocketAdaptorRead(int timeout) throws Exception { VThreadRunner.run(() -> { try (var connection = new Connection()) { SocketChannel sc1 = connection.channel1(); SocketChannel sc2 = connection.channel2(); // schedule write ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); ScheduledWriter.schedule(sc1, bb, DELAY); // read should block if (timeout > 0) sc2.socket().setSoTimeout(timeout); byte[] array = new byte[100]; int n = sc2.socket().getInputStream().read(array); assertTrue(n > 0); assertTrue(array[0] == 'X'); } }); } /** * ServerSocketChannel accept, no blocking. */ @Test public void testServerSocketChannelAccept1() throws Exception { VThreadRunner.run(() -> { try (var ssc = ServerSocketChannel.open()) { ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); var sc1 = SocketChannel.open(ssc.getLocalAddress()); // accept should not block var sc2 = ssc.accept(); sc1.close(); sc2.close(); } }); } /** * Virtual thread blocks in ServerSocketChannel accept. */ @Test public void testServerSocketChannelAccept2() throws Exception { VThreadRunner.run(() -> { try (var ssc = ServerSocketChannel.open()) { ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); var sc1 = SocketChannel.open(); ScheduledConnector.schedule(sc1, ssc.getLocalAddress(), DELAY); // accept will block var sc2 = ssc.accept(); sc1.close(); sc2.close(); } }); } /** * SeverSocketChannel close while virtual thread blocked in accept. */ @Test public void testServerSocketChannelAcceptAsyncClose() throws Exception { VThreadRunner.run(() -> { try (var ssc = ServerSocketChannel.open()) { InetAddress lh = InetAddress.getLoopbackAddress(); ssc.bind(new InetSocketAddress(lh, 0)); ScheduledCloser.schedule(ssc, DELAY); try { SocketChannel sc = ssc.accept(); sc.close(); throw new RuntimeException("connection accepted???"); } catch (AsynchronousCloseException expected) { } } }); } /** * Virtual thread interrupted while blocked in ServerSocketChannel accept. */ @Test public void testServerSocketChannelAcceptInterrupt() throws Exception { VThreadRunner.run(() -> { try (var ssc = ServerSocketChannel.open()) { InetAddress lh = InetAddress.getLoopbackAddress(); ssc.bind(new InetSocketAddress(lh, 0)); ScheduledInterrupter.schedule(Thread.currentThread(), DELAY); try { SocketChannel sc = ssc.accept(); sc.close(); throw new RuntimeException("connection accepted???"); } catch (ClosedByInterruptException expected) { assertTrue(Thread.interrupted()); } } }); } /** * Virtual thread blocks in ServerSocketChannel adaptor accept. */ @Test public void testSocketChannelAdaptorAccept1() throws Exception { testSocketChannelAdaptorAccept(0); } /** * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout. */ @Test public void testSocketChannelAdaptorAccept2() throws Exception { testSocketChannelAdaptorAccept(60_000); } private void testSocketChannelAdaptorAccept(int timeout) throws Exception { VThreadRunner.run(() -> { try (var ssc = ServerSocketChannel.open()) { ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); var sc1 = SocketChannel.open(); ScheduledConnector.schedule(sc1, ssc.getLocalAddress(), DELAY); if (timeout > 0) ssc.socket().setSoTimeout(timeout); // accept will block Socket s = ssc.socket().accept(); sc1.close(); s.close(); } }); } /** * DatagramChannel receive/send, no blocking. */ @Test public void testDatagramChannelSendReceive1() throws Exception { VThreadRunner.run(() -> { try (DatagramChannel dc1 = DatagramChannel.open(); DatagramChannel dc2 = DatagramChannel.open()) { InetAddress lh = InetAddress.getLoopbackAddress(); dc2.bind(new InetSocketAddress(lh, 0)); // send should not block ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); int n = dc1.send(bb, dc2.getLocalAddress()); assertTrue(n > 0); // receive should not block bb = ByteBuffer.allocate(10); dc2.receive(bb); assertTrue(bb.get(0) == 'X'); } }); } /** * Virtual thread blocks in DatagramChannel receive. */ @Test public void testDatagramChannelSendReceive2() throws Exception { VThreadRunner.run(() -> { try (DatagramChannel dc1 = DatagramChannel.open(); DatagramChannel dc2 = DatagramChannel.open()) { InetAddress lh = InetAddress.getLoopbackAddress(); dc2.bind(new InetSocketAddress(lh, 0)); // schedule send ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); ScheduledSender.schedule(dc1, bb, dc2.getLocalAddress(), DELAY); // read should block bb = ByteBuffer.allocate(10); dc2.receive(bb); assertTrue(bb.get(0) == 'X'); } }); } /** * DatagramChannel close while virtual thread blocked in receive. */ @Test public void testDatagramChannelReceiveAsyncClose() throws Exception { VThreadRunner.run(() -> { try (DatagramChannel dc = DatagramChannel.open()) { InetAddress lh = InetAddress.getLoopbackAddress(); dc.bind(new InetSocketAddress(lh, 0)); ScheduledCloser.schedule(dc, DELAY); try { dc.receive(ByteBuffer.allocate(100)); throw new RuntimeException("receive returned"); } catch (AsynchronousCloseException expected) { } } }); } /** * Virtual thread interrupted while blocked in DatagramChannel receive. */ @Test public void testDatagramChannelReceiveInterrupt() throws Exception { VThreadRunner.run(() -> { try (DatagramChannel dc = DatagramChannel.open()) { InetAddress lh = InetAddress.getLoopbackAddress(); dc.bind(new InetSocketAddress(lh, 0)); ScheduledInterrupter.schedule(Thread.currentThread(), DELAY); try { dc.receive(ByteBuffer.allocate(100)); throw new RuntimeException("receive returned"); } catch (ClosedByInterruptException expected) { assertTrue(Thread.interrupted()); } } }); } /** * Virtual thread blocks in DatagramSocket adaptor receive. */ @Test public void testDatagramSocketAdaptorReceive1() throws Exception { testDatagramSocketAdaptorReceive(0); } /** * Virtual thread blocks in DatagramSocket adaptor receive with timeout. */ @Test public void testDatagramSocketAdaptorReceive2() throws Exception { testDatagramSocketAdaptorReceive(60_1000); } private void testDatagramSocketAdaptorReceive(int timeout) throws Exception { VThreadRunner.run(() -> { try (DatagramChannel dc1 = DatagramChannel.open(); DatagramChannel dc2 = DatagramChannel.open()) { InetAddress lh = InetAddress.getLoopbackAddress(); dc2.bind(new InetSocketAddress(lh, 0)); // schedule send ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); ScheduledSender.schedule(dc1, bb, dc2.getLocalAddress(), DELAY); // receive should block byte[] array = new byte[100]; DatagramPacket p = new DatagramPacket(array, 0, array.length); if (timeout > 0) dc2.socket().setSoTimeout(timeout); dc2.socket().receive(p); assertTrue(p.getLength() == 3 && array[0] == 'X'); } }); } /** * DatagramChannel close while virtual thread blocked in adaptor receive. */ @Test public void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception { testDatagramSocketAdaptorReceiveAsyncClose(0); } /** * DatagramChannel close while virtual thread blocked in adaptor receive * with timeout. */ @Test public void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception { testDatagramSocketAdaptorReceiveAsyncClose(60_1000); } private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception { VThreadRunner.run(() -> { try (DatagramChannel dc = DatagramChannel.open()) { InetAddress lh = InetAddress.getLoopbackAddress(); dc.bind(new InetSocketAddress(lh, 0)); byte[] array = new byte[100]; DatagramPacket p = new DatagramPacket(array, 0, array.length); if (timeout > 0) dc.socket().setSoTimeout(timeout); // schedule channel/socket to be asynchronously closed ScheduledCloser.schedule(dc, DELAY); assertThrows(SocketException.class, () -> dc.socket().receive(p)); } }); } /** * Virtual thread interrupted while blocked in DatagramSocket adaptor receive. */ @Test public void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception { testDatagramSocketAdaptorReceiveInterrupt(0); } /** * Virtual thread interrupted while blocked in DatagramSocket adaptor receive * with timeout. */ @Test public void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception { testDatagramSocketAdaptorReceiveInterrupt(60_1000); } private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception { VThreadRunner.run(() -> { try (DatagramChannel dc = DatagramChannel.open()) { InetAddress lh = InetAddress.getLoopbackAddress(); dc.bind(new InetSocketAddress(lh, 0)); byte[] array = new byte[100]; DatagramPacket p = new DatagramPacket(array, 0, array.length); if (timeout > 0) dc.socket().setSoTimeout(timeout); // receive should block ScheduledInterrupter.schedule(Thread.currentThread(), DELAY); try { dc.socket().receive(p); fail(); } catch (ClosedByInterruptException expected) { assertTrue(Thread.interrupted()); } } }); } /** * Pipe read/write, no blocking. */ @Test public void testPipeReadWrite1() throws Exception { VThreadRunner.run(() -> { Pipe p = Pipe.open(); try (Pipe.SinkChannel sink = p.sink(); Pipe.SourceChannel source = p.source()) { // write should not block ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); int n = sink.write(bb); assertTrue(n > 0); // read should not block bb = ByteBuffer.allocate(10); n = source.read(bb); assertTrue(n > 0); assertTrue(bb.get(0) == 'X'); } }); } /** * Virtual thread blocks in Pipe.SourceChannel read. */ @Test public void testPipeReadWrite2() throws Exception { VThreadRunner.run(() -> { Pipe p = Pipe.open(); try (Pipe.SinkChannel sink = p.sink(); Pipe.SourceChannel source = p.source()) { // schedule write ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8")); ScheduledWriter.schedule(sink, bb, DELAY); // read should block bb = ByteBuffer.allocate(10); int n = source.read(bb); assertTrue(n > 0); assertTrue(bb.get(0) == 'X'); } }); } /** * Virtual thread blocks in Pipe.SinkChannel write. */ @Test public void testPipeReadWrite3() throws Exception { VThreadRunner.run(() -> { Pipe p = Pipe.open(); try (Pipe.SinkChannel sink = p.sink(); Pipe.SourceChannel source = p.source()) { // schedule thread to read to EOF ScheduledReader.schedule(source, true, DELAY); // write should block ByteBuffer bb = ByteBuffer.allocate(100*1024); for (int i=0; i<1000; i++) { int n = sink.write(bb); assertTrue(n > 0); bb.clear(); } } }); } /** * Pipe.SourceChannel close while virtual thread blocked in read. */ @Test public void testPipeReadAsyncClose() throws Exception { VThreadRunner.run(() -> { Pipe p = Pipe.open(); try (Pipe.SourceChannel source = p.source()) { ScheduledCloser.schedule(source, DELAY); try { int n = source.read(ByteBuffer.allocate(100)); throw new RuntimeException("read returned " + n); } catch (AsynchronousCloseException expected) { } } }); } /** * Virtual thread interrupted while blocked in Pipe.SourceChannel read. */ @Test public void testPipeReadInterrupt() throws Exception { VThreadRunner.run(() -> { Pipe p = Pipe.open(); try (Pipe.SourceChannel source = p.source()) { ScheduledInterrupter.schedule(Thread.currentThread(), DELAY); try { int n = source.read(ByteBuffer.allocate(100)); throw new RuntimeException("read returned " + n); } catch (ClosedByInterruptException expected) { assertTrue(Thread.interrupted()); } } }); } /** * Pipe.SinkChannel close while virtual thread blocked in write. */ @Test public void testPipeWriteAsyncClose() throws Exception { VThreadRunner.run(() -> { Pipe p = Pipe.open(); try (Pipe.SinkChannel sink = p.sink()) { ScheduledCloser.schedule(sink, DELAY); try { ByteBuffer bb = ByteBuffer.allocate(100*1024); for (;;) { int n = sink.write(bb); assertTrue(n > 0); bb.clear(); } } catch (AsynchronousCloseException expected) { } } }); } /** * Virtual thread interrupted while blocked in Pipe.SinkChannel write. */ @Test public void testPipeWriteInterrupt() throws Exception { VThreadRunner.run(() -> { Pipe p = Pipe.open(); try (Pipe.SinkChannel sink = p.sink()) { ScheduledInterrupter.schedule(Thread.currentThread(), DELAY); try { ByteBuffer bb = ByteBuffer.allocate(100*1024); for (;;) { int n = sink.write(bb); assertTrue(n > 0); bb.clear(); } } catch (ClosedByInterruptException expected) { assertTrue(Thread.interrupted()); } } }); } // -- supporting classes -- /** * Creates a loopback connection */ static class Connection implements Closeable { private final SocketChannel sc1; private final SocketChannel sc2; Connection() throws IOException { var lh = InetAddress.getLoopbackAddress(); try (var listener = ServerSocketChannel.open()) { listener.bind(new InetSocketAddress(lh, 0)); SocketChannel sc1 = SocketChannel.open(); SocketChannel sc2 = null; try { sc1.socket().connect(listener.getLocalAddress(), 10_000); sc2 = listener.accept(); } catch (IOException ioe) { sc1.close(); throw ioe; } this.sc1 = sc1; this.sc2 = sc2; } } SocketChannel channel1() { return sc1; } SocketChannel channel2() { return sc2; } @Override public void close() throws IOException { sc1.close(); sc2.close(); } } /** * Closes a channel after a delay */ static class ScheduledCloser implements Runnable { private final Closeable c; private final long delay; ScheduledCloser(Closeable c, long delay) { this.c = c; this.delay = delay; } @Override public void run() { try { Thread.sleep(delay); c.close(); } catch (Exception e) { } } static void schedule(Closeable c, long delay) { new Thread(new ScheduledCloser(c, delay)).start(); } } /** * Interrupts a thread after a delay */ static class ScheduledInterrupter implements Runnable { private final Thread thread; private final long delay; ScheduledInterrupter(Thread thread, long delay) { this.thread = thread; this.delay = delay; } @Override public void run() { try { Thread.sleep(delay); thread.interrupt(); } catch (Exception e) { } } static void schedule(Thread thread, long delay) { new Thread(new ScheduledInterrupter(thread, delay)).start(); } } /** * Establish a connection to a socket address after a delay */ static class ScheduledConnector implements Runnable { private final SocketChannel sc; private final SocketAddress address; private final long delay; ScheduledConnector(SocketChannel sc, SocketAddress address, long delay) { this.sc = sc; this.address = address; this.delay = delay; } @Override public void run() { try { Thread.sleep(delay); sc.connect(address); } catch (Exception e) { } } static void schedule(SocketChannel sc, SocketAddress address, long delay) { new Thread(new ScheduledConnector(sc, address, delay)).start(); } } /** * Reads from a connection, and to EOF, after a delay */ static class ScheduledReader implements Runnable { private final ReadableByteChannel rbc; private final boolean readAll; private final long delay; ScheduledReader(ReadableByteChannel rbc, boolean readAll, long delay) { this.rbc = rbc; this.readAll = readAll; this.delay = delay; } @Override public void run() { try { Thread.sleep(delay); ByteBuffer bb = ByteBuffer.allocate(100*1024); for (;;) { int n = rbc.read(bb); if (n == -1 || !readAll) break; bb.clear(); } } catch (Exception e) { } } static void schedule(ReadableByteChannel rbc, boolean readAll, long delay) { new Thread(new ScheduledReader(rbc, readAll, delay)).start(); } } /** * Writes to a connection after a delay */ static class ScheduledWriter implements Runnable { private final WritableByteChannel wbc; private final ByteBuffer buf; private final long delay; ScheduledWriter(WritableByteChannel wbc, ByteBuffer buf, long delay) { this.wbc = wbc; this.buf = buf; this.delay = delay; } @Override public void run() { try { Thread.sleep(delay); wbc.write(buf); } catch (Exception e) { } } static void schedule(WritableByteChannel wbc, ByteBuffer buf, long delay) { new Thread(new ScheduledWriter(wbc, buf, delay)).start(); } } /** * Sends a datagram to a target address after a delay */ static class ScheduledSender implements Runnable { private final DatagramChannel dc; private final ByteBuffer buf; private final SocketAddress address; private final long delay; ScheduledSender(DatagramChannel dc, ByteBuffer buf, SocketAddress address, long delay) { this.dc = dc; this.buf = buf; this.address = address; this.delay = delay; } @Override public void run() { try { Thread.sleep(delay); dc.send(buf, address); } catch (Exception e) { } } static void schedule(DatagramChannel dc, ByteBuffer buf, SocketAddress address, long delay) { new Thread(new ScheduledSender(dc, buf, address, delay)).start(); } } }