8294375: test/jdk/java/nio/channels/vthread/BlockingChannelOps.java is slow

Reviewed-by: jpai
This commit is contained in:
Alan Bateman 2022-09-28 13:10:43 +00:00
parent 60616f243f
commit 37f83b9b8e
2 changed files with 293 additions and 486 deletions
test/jdk/java
net/vthread
nio/channels/vthread

@ -24,18 +24,19 @@
/**
* @test id=default
* @bug 8284161
* @summary Basic tests of virtual threads doing blocking I/O with java.net sockets
* @summary Test virtual threads doing blocking I/O on java.net sockets
* @enablePreview
* @library /test/lib
* @run testng/othervm/timeout=300 BlockingSocketOps
* @run testng/othervm BlockingSocketOps
*/
/**
* @test id=indirect-register
* @summary Basic tests of virtual threads doing blocking I/O with java.net sockets
* @test id=direct-register
* @summary Test virtual threads doing blocking I/O on java.net sockets and with
* the I/O poller configured to use direct registration
* @enablePreview
* @library /test/lib
* @run testng/othervm/timeout=300 -Djdk.useDirectRegister BlockingSocketOps
* @run testng/othervm -Djdk.useDirectRegister BlockingSocketOps
*/
/**
@ -43,7 +44,7 @@
* @requires vm.continuations
* @enablePreview
* @library /test/lib
* @run testng/othervm/timeout=300 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
* @run testng/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
*/
import java.io.Closeable;
@ -66,8 +67,6 @@ import static org.testng.Assert.*;
public class BlockingSocketOps {
private static final long DELAY = 2000;
/**
* Socket read/write, no blocking.
*/
@ -113,19 +112,18 @@ public class BlockingSocketOps {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// schedule write
byte[] ba = "XXX".getBytes("UTF-8");
ScheduledWriter.schedule(s1, ba, DELAY);
// delayed write from sc1
byte[] ba1 = "XXX".getBytes("UTF-8");
runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
// read should block
// read from sc2 should block
if (timeout > 0) {
assert timeout > DELAY;
s2.setSoTimeout(timeout);
}
ba = new byte[10];
int n = s2.getInputStream().read(ba);
byte[] ba2 = new byte[10];
int n = s2.getInputStream().read(ba2);
assertTrue(n > 0);
assertTrue(ba[0] == 'X');
assertTrue(ba2[0] == 'X');
}
});
}
@ -140,21 +138,27 @@ public class BlockingSocketOps {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// schedule thread to read to EOF
ScheduledReader.schedule(s2, true, DELAY);
// delayed read from s2 to EOF
InputStream in = s2.getInputStream();
Thread reader = runAfterParkedAsync(() ->
in.transferTo(OutputStream.nullOutputStream()));
// write should block
byte[] ba = new byte[100*1024];
OutputStream out = s1.getOutputStream();
for (int i=0; i<1000; i++) {
out.write(ba);
try (OutputStream out = s1.getOutputStream()) {
for (int i = 0; i < 1000; i++) {
out.write(ba);
}
}
// wait for reader to finish
reader.join();
}
});
}
/**
* Virtual thread blocks in read, peer closes connection.
* Virtual thread blocks in read, peer closes connection gracefully.
*/
@Test
public void testSocketReadPeerClose1() throws Exception {
@ -163,8 +167,10 @@ public class BlockingSocketOps {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
ScheduledCloser.schedule(s2, DELAY);
// delayed close of s2
runAfterParkedAsync(s2::close);
// read from s1 should block, then read -1
int n = s1.getInputStream().read();
assertTrue(n == -1);
}
@ -181,12 +187,14 @@ public class BlockingSocketOps {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// delayed abrupt close of s2
s2.setSoLinger(true, 0);
ScheduledCloser.schedule(s2, DELAY);
runAfterParkedAsync(s2::close);
// read from s1 should block, then throw
try {
s1.getInputStream().read();
fail();
int n = s1.getInputStream().read();
fail("read " + n);
} catch (IOException ioe) {
// expected
}
@ -214,14 +222,17 @@ public class BlockingSocketOps {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
ScheduledCloser.schedule(s, DELAY);
// delayed close of s
runAfterParkedAsync(s::close);
// read from s should block, then throw
if (timeout > 0) {
s.setSoTimeout(timeout);
}
try {
if (timeout > 0) {
assert timeout > DELAY;
s.setSoTimeout(timeout);
}
int n = s.getInputStream().read();
throw new RuntimeException("read returned " + n);
fail("read " + n);
} catch (SocketException expected) { }
}
});
@ -247,14 +258,19 @@ public class BlockingSocketOps {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
// read from s should block, then throw
if (timeout > 0) {
s.setSoTimeout(timeout);
}
try {
if (timeout > 0) {
assert timeout > DELAY;
s.setSoTimeout(timeout);
}
int n = s.getInputStream().read();
throw new RuntimeException("read returned " + n);
fail("read " + n);
} catch (SocketException expected) {
assertTrue(Thread.interrupted());
assertTrue(s.isClosed());
@ -271,7 +287,11 @@ public class BlockingSocketOps {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
ScheduledCloser.schedule(s, DELAY);
// delayedclose of s
runAfterParkedAsync(s::close);
// write to s should block, then throw
try {
byte[] ba = new byte[100*1024];
OutputStream out = s.getOutputStream();
@ -284,14 +304,19 @@ public class BlockingSocketOps {
}
/**
* Virtual thread interrupted while blocked in Socket write
* Virtual thread interrupted while blocked in Socket write.
*/
@Test
public void testSocketWriteInterrupt() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
// write to s should block, then throw
try {
byte[] ba = new byte[100*1024];
OutputStream out = s.getOutputStream();
@ -317,7 +342,9 @@ public class BlockingSocketOps {
Socket s2 = connection.socket2();
// urgent data should be received
ScheduledUrgentData.scheduleUrgentData(s2, 'X', DELAY);
runAfterParkedAsync(() -> s2.sendUrgentData('X'));
// read should block, then read the OOB byte
s1.setOOBInline(true);
byte[] ba = new byte[10];
int n = s1.getInputStream().read(ba);
@ -342,8 +369,13 @@ public class BlockingSocketOps {
@Test
public void testServerSocketAccept1() throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket(0)) {
var socket1 = new Socket(listener.getInetAddress(), listener.getLocalPort());
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0));
// establish connection
var socket1 = new Socket(loopback, listener.getLocalPort());
// accept should not block
var socket2 = listener.accept();
socket1.close();
@ -370,12 +402,17 @@ public class BlockingSocketOps {
void testServerSocketAccept(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket(0)) {
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0));
// schedule connect
var socket1 = new Socket();
ScheduledConnector.schedule(socket1, listener.getLocalSocketAddress(), DELAY);
// accept will block
SocketAddress remote = listener.getLocalSocketAddress();
runAfterParkedAsync(() -> socket1.connect(remote));
// accept should block
if (timeout > 0) {
assert timeout > DELAY;
listener.setSoTimeout(timeout);
}
var socket2 = listener.accept();
@ -403,22 +440,27 @@ public class BlockingSocketOps {
void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket(0)) {
ScheduledCloser.schedule(listener, DELAY);
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0));
// delayed close of listener
runAfterParkedAsync(listener::close);
// accept should block, then throw
if (timeout > 0) {
assert timeout > DELAY;
listener.setSoTimeout(timeout);
}
try {
listener.accept().close();
throw new RuntimeException("connection accepted???");
fail("connection accepted???");
} catch (SocketException expected) { }
}
});
}
/**
* Virtual thread interrupted while blocked in ServerSocket accept
* Virtual thread interrupted while blocked in ServerSocket accept.
*/
@Test
public void testServerSocketAcceptInterrupt1() throws Exception {
@ -426,7 +468,7 @@ public class BlockingSocketOps {
}
/**
* Virtual thread interrupted while blocked in ServerSocket accept with timeout
* Virtual thread interrupted while blocked in ServerSocket accept with timeout.
*/
@Test
public void testServerSocketAcceptInterrupt2() throws Exception {
@ -435,15 +477,21 @@ public class BlockingSocketOps {
void testServerSocketAcceptInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket(0)) {
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0));
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
// accept should block, then throw
if (timeout > 0) {
assert timeout > DELAY;
listener.setSoTimeout(timeout);
}
try {
listener.accept().close();
throw new RuntimeException("connection accepted???");
fail("connection accepted???");
} catch (SocketException expected) {
assertTrue(Thread.interrupted());
assertTrue(listener.isClosed());
@ -482,7 +530,7 @@ public class BlockingSocketOps {
}
/**
* Virtual thread blocks in DatagramSocket receive
* Virtual thread blocks in DatagramSocket receive.
*/
@Test
public void testDatagramSocketSendReceive2() throws Exception {
@ -490,7 +538,7 @@ public class BlockingSocketOps {
}
/**
* Virtual thread blocks in DatagramSocket receive with timeout
* Virtual thread blocks in DatagramSocket receive with timeout.
*/
@Test
public void testDatagramSocketSendReceive3() throws Exception {
@ -506,15 +554,14 @@ public class BlockingSocketOps {
s1.bind(new InetSocketAddress(lh, 0));
s2.bind(new InetSocketAddress(lh, 0));
// schedule send
// delayed send
byte[] bytes = "XXX".getBytes("UTF-8");
DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
p1.setSocketAddress(s2.getLocalSocketAddress());
ScheduledSender.schedule(s1, p1, DELAY);
runAfterParkedAsync(() -> s1.send(p1));
// receive should block
if (timeout > 0) {
assert timeout > DELAY;
s2.setSoTimeout(timeout);
}
byte[] ba = new byte[100];
@ -527,7 +574,7 @@ public class BlockingSocketOps {
}
/**
* Virtual thread blocks in DatagramSocket receive that times out
* Virtual thread blocks in DatagramSocket receive that times out.
*/
@Test
public void testDatagramSocketReceiveTimeout() throws Exception {
@ -535,7 +582,7 @@ public class BlockingSocketOps {
try (DatagramSocket s = new DatagramSocket(null)) {
InetAddress lh = InetAddress.getLoopbackAddress();
s.bind(new InetSocketAddress(lh, 0));
s.setSoTimeout(2000);
s.setSoTimeout(500);
byte[] ba = new byte[100];
DatagramPacket p = new DatagramPacket(ba, ba.length);
try {
@ -568,12 +615,11 @@ public class BlockingSocketOps {
InetAddress lh = InetAddress.getLoopbackAddress();
s.bind(new InetSocketAddress(lh, 0));
// schedule close
ScheduledCloser.schedule(s, DELAY);
// delayed close of s
runAfterParkedAsync(s::close);
// receive
// receive should block, then throw
if (timeout > 0) {
assert timeout > DELAY;
s.setSoTimeout(timeout);
}
try {
@ -595,7 +641,7 @@ public class BlockingSocketOps {
}
/**
* Virtual thread interrupted while blocked in DatagramSocket receive with timeout
* Virtual thread interrupted while blocked in DatagramSocket receive with timeout.
*/
@Test
public void testDatagramSocketReceiveInterrupt2() throws Exception {
@ -607,15 +653,15 @@ public class BlockingSocketOps {
try (DatagramSocket s = new DatagramSocket(null)) {
InetAddress lh = InetAddress.getLoopbackAddress();
s.bind(new InetSocketAddress(lh, 0));
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
// receive should block, then throw
if (timeout > 0) {
assert timeout > DELAY;
s.setSoTimeout(timeout);
}
// schedule interrupt
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
// receive
try {
byte[] ba = new byte[100];
DatagramPacket p = new DatagramPacket(ba, ba.length);
@ -629,8 +675,6 @@ public class BlockingSocketOps {
});
}
// -- supporting classes --
/**
* Creates a loopback connection
*/
@ -668,191 +712,32 @@ public class BlockingSocketOps {
}
}
/**
* Closes a socket after a delay
*/
static class ScheduledCloser implements Runnable {
private final Closeable c;
private final long delay;
ScheduledCloser(Closeable c, long delay) {
this.c = c;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
c.close();
} catch (Exception e) { }
}
static void schedule(Closeable c, long delay) {
new Thread(new ScheduledCloser(c, delay)).start();
}
@FunctionalInterface
interface ThrowingRunnable {
void run() throws Exception;
}
/**
* Interrupts a thread after a delay
* Runs the given task asynchronously after the current virtual thread has parked.
* @return the thread started to run the task
*/
static class ScheduledInterrupter implements Runnable {
private final Thread thread;
private final long delay;
ScheduledInterrupter(Thread thread, long delay) {
this.thread = thread;
this.delay = delay;
}
@Override
public void run() {
static Thread runAfterParkedAsync(ThrowingRunnable task) {
Thread target = Thread.currentThread();
if (!target.isVirtual())
throw new WrongThreadException();
return Thread.ofPlatform().daemon().start(() -> {
try {
Thread.sleep(delay);
thread.interrupt();
} catch (Exception e) { }
}
static void schedule(Thread thread, long delay) {
new Thread(new ScheduledInterrupter(thread, delay)).start();
}
}
/**
* Reads from a socket, and to EOF, after a delay
*/
static class ScheduledReader implements Runnable {
private final Socket s;
private final boolean readAll;
private final long delay;
ScheduledReader(Socket s, boolean readAll, long delay) {
this.s = s;
this.readAll = readAll;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
byte[] ba = new byte[8192];
InputStream in = s.getInputStream();
for (;;) {
int n = in.read(ba);
if (n == -1 || !readAll)
break;
Thread.State state = target.getState();
while (state != Thread.State.WAITING
&& state != Thread.State.TIMED_WAITING) {
Thread.sleep(20);
state = target.getState();
}
} catch (Exception e) { }
}
static void schedule(Socket s, boolean readAll, long delay) {
new Thread(new ScheduledReader(s, readAll, delay)).start();
}
}
/**
* Writes to a socket after a delay
*/
static class ScheduledWriter implements Runnable {
private final Socket s;
private final byte[] ba;
private final long delay;
ScheduledWriter(Socket s, byte[] ba, long delay) {
this.s = s;
this.ba = ba.clone();
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
s.getOutputStream().write(ba);
} catch (Exception e) { }
}
static void schedule(Socket s, byte[] ba, long delay) {
new Thread(new ScheduledWriter(s, ba, delay)).start();
}
}
/**
* Establish a connection to a socket address after a delay
*/
static class ScheduledConnector implements Runnable {
private final Socket socket;
private final SocketAddress address;
private final long delay;
ScheduledConnector(Socket socket, SocketAddress address, long delay) {
this.socket = socket;
this.address = address;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
socket.connect(address);
} catch (Exception e) { }
}
static void schedule(Socket socket, SocketAddress address, long delay) {
new Thread(new ScheduledConnector(socket, address, delay)).start();
}
}
/**
* Sends a datagram to a target address after a delay
*/
static class ScheduledSender implements Runnable {
private final DatagramSocket socket;
private final DatagramPacket packet;
private final long delay;
ScheduledSender(DatagramSocket socket, DatagramPacket packet, long delay) {
this.socket = socket;
this.packet = packet;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
socket.send(packet);
} catch (Exception e) { }
}
static void schedule(DatagramSocket socket, DatagramPacket packet, long delay) {
new Thread(new ScheduledSender(socket, packet, delay)).start();
}
}
/**
* Sends urgent data after a delay
*/
static class ScheduledUrgentData implements Runnable {
private final Socket s;
private final int data;
private final long delay;
ScheduledUrgentData(Socket s, int data, long delay) {
this.s = s;
this.data = data;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
s.sendUrgentData(data);
} catch (Exception e) { }
}
static void scheduleUrgentData(Socket s, int data, long delay) {
new Thread(new ScheduledUrgentData(s, data, delay)).start();
}
Thread.sleep(20); // give a bit more time to release carrier
task.run();
} catch (Exception e) {
e.printStackTrace();
}
});
}
}

@ -24,18 +24,19 @@
/**
* @test id=default
* @bug 8284161
* @summary Basic tests of virtual threads doing blocking I/O with NIO channels
* @summary Test virtual threads doing blocking I/O on NIO channels
* @enablePreview
* @library /test/lib
* @run testng/othervm/timeout=300 BlockingChannelOps
* @run testng/othervm BlockingChannelOps
*/
/**
* @test id=indirect-register
* @summary Basic tests of virtual threads doing blocking I/O with NIO channels
* @test id=direct-register
* @summary Test virtual threads doing blocking I/O on NIO channels and with
* the I/O poller configured to use direct registration
* @enablePreview
* @library /test/lib
* @run testng/othervm/timeout=300 -Djdk.useDirectRegister BlockingChannelOps
* @run testng/othervm -Djdk.useDirectRegister BlockingChannelOps
*/
/**
@ -43,7 +44,7 @@
* @requires vm.continuations
* @enablePreview
* @library /test/lib
* @run testng/othervm/timeout=300 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
* @run testng/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
*/
import java.io.Closeable;
@ -57,6 +58,7 @@ import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
@ -64,12 +66,12 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import jdk.test.lib.Platform;
import jdk.test.lib.thread.VThreadRunner;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
public class BlockingChannelOps {
private static final long DELAY = 4000;
/**
* SocketChannel read/write, no blocking.
@ -81,12 +83,12 @@ public class BlockingChannelOps {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();
// write should not block
// write to sc1
ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
int n = sc1.write(bb);
assertTrue(n > 0);
// read should not block
// read from sc2 should not block
bb = ByteBuffer.allocate(10);
n = sc2.read(bb);
assertTrue(n > 0);
@ -105,15 +107,15 @@ public class BlockingChannelOps {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();
// schedule write
ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
ScheduledWriter.schedule(sc1, bb, DELAY);
// delayed write to sc1
ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
runAfterParkedAsync(() -> sc1.write(bb1));
// read should block
bb = ByteBuffer.allocate(10);
int n = sc2.read(bb);
// read from sc2 should block
ByteBuffer bb2 = ByteBuffer.allocate(10);
int n = sc2.read(bb2);
assertTrue(n > 0);
assertTrue(bb.get(0) == 'X');
assertTrue(bb2.get(0) == 'X');
}
});
}
@ -128,16 +130,20 @@ public class BlockingChannelOps {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();
// schedule thread to read to EOF
ScheduledReader.schedule(sc2, true, DELAY);
// delayed read from sc2 to EOF
Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
// write should block
// write to sc1 should block
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (int i=0; i<1000; i++) {
int n = sc1.write(bb);
assertTrue(n > 0);
bb.clear();
}
sc1.close();
// wait for reader to finish
reader.join();
}
});
}
@ -150,10 +156,10 @@ public class BlockingChannelOps {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
ScheduledCloser.schedule(sc, DELAY);
runAfterParkedAsync(sc::close);
try {
int n = sc.read(ByteBuffer.allocate(100));
throw new RuntimeException("read returned " + n);
fail("read returned " + n);
} catch (AsynchronousCloseException expected) { }
}
});
@ -167,10 +173,14 @@ public class BlockingChannelOps {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
try {
int n = sc.read(ByteBuffer.allocate(100));
throw new RuntimeException("read returned " + n);
fail("read returned " + n);
} catch (ClosedByInterruptException expected) {
assertTrue(Thread.interrupted());
}
@ -186,7 +196,7 @@ public class BlockingChannelOps {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
ScheduledCloser.schedule(sc, DELAY);
runAfterParkedAsync(sc::close);
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
@ -194,7 +204,12 @@ public class BlockingChannelOps {
assertTrue(n > 0);
bb.clear();
}
} catch (AsynchronousCloseException expected) { }
} catch (AsynchronousCloseException e) {
// expected
} catch (ClosedChannelException e) {
// on macOS the write loop may block more than once
if (!Platform.isOSX()) throw e;
}
}
});
}
@ -207,7 +222,11 @@ public class BlockingChannelOps {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
@ -215,8 +234,11 @@ public class BlockingChannelOps {
assertTrue(n > 0);
bb.clear();
}
} catch (ClosedByInterruptException expected) {
} catch (ClosedByInterruptException e) {
assertTrue(Thread.interrupted());
} catch (ClosedChannelException e) {
// on macOS the write loop may block more than once
if (!Platform.isOSX()) throw e;
}
}
});
@ -244,15 +266,14 @@ public class BlockingChannelOps {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();
// schedule write
// delayed write to sc1
ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
ScheduledWriter.schedule(sc1, bb, DELAY);
runAfterParkedAsync(() -> sc1.write(bb));
// read should block
// read from sc2 should block
byte[] array = new byte[100];
if (timeout > 0)
sc2.socket().setSoTimeout(timeout);
byte[] array = new byte[100];
int n = sc2.socket().getInputStream().read(array);
assertTrue(n > 0);
assertTrue(array[0] == 'X');
@ -286,8 +307,11 @@ public class BlockingChannelOps {
try (var ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
var sc1 = SocketChannel.open();
ScheduledConnector.schedule(sc1, ssc.getLocalAddress(), DELAY);
// accept will block
// delayed connect
runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
// accept should block
var sc2 = ssc.accept();
sc1.close();
sc2.close();
@ -304,11 +328,11 @@ public class BlockingChannelOps {
try (var ssc = ServerSocketChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
ssc.bind(new InetSocketAddress(lh, 0));
ScheduledCloser.schedule(ssc, DELAY);
runAfterParkedAsync(ssc::close);
try {
SocketChannel sc = ssc.accept();
sc.close();
throw new RuntimeException("connection accepted???");
fail("connection accepted???");
} catch (AsynchronousCloseException expected) { }
}
});
@ -323,11 +347,15 @@ public class BlockingChannelOps {
try (var ssc = ServerSocketChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
ssc.bind(new InetSocketAddress(lh, 0));
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
try {
SocketChannel sc = ssc.accept();
sc.close();
throw new RuntimeException("connection accepted???");
fail("connection accepted???");
} catch (ClosedByInterruptException expected) {
assertTrue(Thread.interrupted());
}
@ -355,15 +383,16 @@ public class BlockingChannelOps {
VThreadRunner.run(() -> {
try (var ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
var sc1 = SocketChannel.open();
ScheduledConnector.schedule(sc1, ssc.getLocalAddress(), DELAY);
var sc = SocketChannel.open();
// delayed connect
runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
// accept should block
if (timeout > 0)
ssc.socket().setSoTimeout(timeout);
// accept will block
Socket s = ssc.socket().accept();
sc1.close();
sc.close();
s.close();
}
});
@ -406,14 +435,14 @@ public class BlockingChannelOps {
InetAddress lh = InetAddress.getLoopbackAddress();
dc2.bind(new InetSocketAddress(lh, 0));
// schedule send
ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
ScheduledSender.schedule(dc1, bb, dc2.getLocalAddress(), DELAY);
// delayed send from sc1
ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
// read should block
bb = ByteBuffer.allocate(10);
dc2.receive(bb);
assertTrue(bb.get(0) == 'X');
// read from dc2 should block
ByteBuffer bb2 = ByteBuffer.allocate(10);
dc2.receive(bb2);
assertTrue(bb2.get(0) == 'X');
}
});
}
@ -427,10 +456,10 @@ public class BlockingChannelOps {
try (DatagramChannel dc = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
dc.bind(new InetSocketAddress(lh, 0));
ScheduledCloser.schedule(dc, DELAY);
runAfterParkedAsync(dc::close);
try {
dc.receive(ByteBuffer.allocate(100));
throw new RuntimeException("receive returned");
fail("receive returned");
} catch (AsynchronousCloseException expected) { }
}
});
@ -445,10 +474,14 @@ public class BlockingChannelOps {
try (DatagramChannel dc = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
dc.bind(new InetSocketAddress(lh, 0));
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
try {
dc.receive(ByteBuffer.allocate(100));
throw new RuntimeException("receive returned");
fail("receive returned");
} catch (ClosedByInterruptException expected) {
assertTrue(Thread.interrupted());
}
@ -480,9 +513,9 @@ public class BlockingChannelOps {
InetAddress lh = InetAddress.getLoopbackAddress();
dc2.bind(new InetSocketAddress(lh, 0));
// schedule send
// delayed send from dc2
ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
ScheduledSender.schedule(dc1, bb, dc2.getLocalAddress(), DELAY);
runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
// receive should block
byte[] array = new byte[100];
@ -523,8 +556,9 @@ public class BlockingChannelOps {
if (timeout > 0)
dc.socket().setSoTimeout(timeout);
// schedule channel/socket to be asynchronously closed
ScheduledCloser.schedule(dc, DELAY);
// delayed close of channel/socket
runAfterParkedAsync(dc::close);
assertThrows(SocketException.class, () -> dc.socket().receive(p));
}
});
@ -558,8 +592,10 @@ public class BlockingChannelOps {
if (timeout > 0)
dc.socket().setSoTimeout(timeout);
// receive should block
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
try {
dc.socket().receive(p);
fail();
@ -604,15 +640,15 @@ public class BlockingChannelOps {
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
// schedule write
ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
ScheduledWriter.schedule(sink, bb, DELAY);
// delayed write from sink
ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
runAfterParkedAsync(() -> sink.write(bb1));
// read should block
bb = ByteBuffer.allocate(10);
int n = source.read(bb);
ByteBuffer bb2 = ByteBuffer.allocate(10);
int n = source.read(bb2);
assertTrue(n > 0);
assertTrue(bb.get(0) == 'X');
assertTrue(bb2.get(0) == 'X');
}
});
}
@ -627,16 +663,20 @@ public class BlockingChannelOps {
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
// schedule thread to read to EOF
ScheduledReader.schedule(source, true, DELAY);
// delayed read from source to EOF
Thread reader = runAfterParkedAsync(() -> readToEOF(source));
// write should block
// write to sink should block
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (int i=0; i<1000; i++) {
int n = sink.write(bb);
assertTrue(n > 0);
bb.clear();
}
sink.close();
// wait for reader to finish
reader.join();
}
});
}
@ -649,10 +689,10 @@ public class BlockingChannelOps {
VThreadRunner.run(() -> {
Pipe p = Pipe.open();
try (Pipe.SourceChannel source = p.source()) {
ScheduledCloser.schedule(source, DELAY);
runAfterParkedAsync(source::close);
try {
int n = source.read(ByteBuffer.allocate(100));
throw new RuntimeException("read returned " + n);
fail("read returned " + n);
} catch (AsynchronousCloseException expected) { }
}
});
@ -666,10 +706,14 @@ public class BlockingChannelOps {
VThreadRunner.run(() -> {
Pipe p = Pipe.open();
try (Pipe.SourceChannel source = p.source()) {
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
try {
int n = source.read(ByteBuffer.allocate(100));
throw new RuntimeException("read returned " + n);
fail("read returned " + n);
} catch (ClosedByInterruptException expected) {
assertTrue(Thread.interrupted());
}
@ -685,7 +729,7 @@ public class BlockingChannelOps {
VThreadRunner.run(() -> {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink()) {
ScheduledCloser.schedule(sink, DELAY);
runAfterParkedAsync(sink::close);
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
@ -706,7 +750,11 @@ public class BlockingChannelOps {
VThreadRunner.run(() -> {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink()) {
ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
@ -721,8 +769,6 @@ public class BlockingChannelOps {
});
}
// -- supporting classes --
/**
* Creates a loopback connection
*/
@ -760,166 +806,42 @@ public class BlockingChannelOps {
}
/**
* Closes a channel after a delay
* Read from a channel until all bytes have been read or an I/O error occurs.
*/
static class ScheduledCloser implements Runnable {
private final Closeable c;
private final long delay;
ScheduledCloser(Closeable c, long delay) {
this.c = c;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
c.close();
} catch (Exception e) { }
}
static void schedule(Closeable c, long delay) {
new Thread(new ScheduledCloser(c, delay)).start();
static void readToEOF(ReadableByteChannel rbc) throws IOException {
ByteBuffer bb = ByteBuffer.allocate(16*1024);
int n;
while ((n = rbc.read(bb)) > 0) {
bb.clear();
}
}
/**
* Interrupts a thread after a delay
*/
static class ScheduledInterrupter implements Runnable {
private final Thread thread;
private final long delay;
ScheduledInterrupter(Thread thread, long delay) {
this.thread = thread;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
thread.interrupt();
} catch (Exception e) { }
}
static void schedule(Thread thread, long delay) {
new Thread(new ScheduledInterrupter(thread, delay)).start();
}
@FunctionalInterface
interface ThrowingRunnable {
void run() throws Exception;
}
/**
* Establish a connection to a socket address after a delay
* Runs the given task asynchronously after the current virtual thread has parked.
* @return the thread started to run the task
*/
static class ScheduledConnector implements Runnable {
private final SocketChannel sc;
private final SocketAddress address;
private final long delay;
ScheduledConnector(SocketChannel sc, SocketAddress address, long delay) {
this.sc = sc;
this.address = address;
this.delay = delay;
}
@Override
public void run() {
static Thread runAfterParkedAsync(ThrowingRunnable task) {
Thread target = Thread.currentThread();
if (!target.isVirtual())
throw new WrongThreadException();
return Thread.ofPlatform().daemon().start(() -> {
try {
Thread.sleep(delay);
sc.connect(address);
} catch (Exception e) { }
}
static void schedule(SocketChannel sc, SocketAddress address, long delay) {
new Thread(new ScheduledConnector(sc, address, delay)).start();
}
}
/**
* Reads from a connection, and to EOF, after a delay
*/
static class ScheduledReader implements Runnable {
private final ReadableByteChannel rbc;
private final boolean readAll;
private final long delay;
ScheduledReader(ReadableByteChannel rbc, boolean readAll, long delay) {
this.rbc = rbc;
this.readAll = readAll;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
int n = rbc.read(bb);
if (n == -1 || !readAll)
break;
bb.clear();
Thread.State state = target.getState();
while (state != Thread.State.WAITING
&& state != Thread.State.TIMED_WAITING) {
Thread.sleep(20);
state = target.getState();
}
} catch (Exception e) { }
}
static void schedule(ReadableByteChannel rbc, boolean readAll, long delay) {
new Thread(new ScheduledReader(rbc, readAll, delay)).start();
}
}
/**
* Writes to a connection after a delay
*/
static class ScheduledWriter implements Runnable {
private final WritableByteChannel wbc;
private final ByteBuffer buf;
private final long delay;
ScheduledWriter(WritableByteChannel wbc, ByteBuffer buf, long delay) {
this.wbc = wbc;
this.buf = buf;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
wbc.write(buf);
} catch (Exception e) { }
}
static void schedule(WritableByteChannel wbc, ByteBuffer buf, long delay) {
new Thread(new ScheduledWriter(wbc, buf, delay)).start();
}
}
/**
* Sends a datagram to a target address after a delay
*/
static class ScheduledSender implements Runnable {
private final DatagramChannel dc;
private final ByteBuffer buf;
private final SocketAddress address;
private final long delay;
ScheduledSender(DatagramChannel dc, ByteBuffer buf, SocketAddress address, long delay) {
this.dc = dc;
this.buf = buf;
this.address = address;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
dc.send(buf, address);
} catch (Exception e) { }
}
static void schedule(DatagramChannel dc, ByteBuffer buf,
SocketAddress address, long delay) {
new Thread(new ScheduledSender(dc, buf, address, delay)).start();
}
Thread.sleep(20); // give a bit more time to release carrier
task.run();
} catch (Exception e) {
e.printStackTrace();
}
});
}
}