jdk-24/test/jdk/java/net/vthread/BlockingSocketOps.java
2023-11-21 14:05:32 +00:00

741 lines
23 KiB
Java

/*
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/**
* @test id=default
* @bug 8284161
* @summary Test virtual threads doing blocking I/O on java.net Sockets
* @library /test/lib
* @run junit BlockingSocketOps
*/
/**
* @test id=poller-modes
* @requires (os.family == "linux") | (os.family == "mac")
* @library /test/lib
* @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
* @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
*/
/**
* @test id=no-vmcontinuations
* @requires vm.continuations
* @library /test/lib
* @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
*/
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import jdk.test.lib.thread.VThreadRunner;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class BlockingSocketOps {
/**
* Socket read/write, no blocking.
*/
@Test
void testSocketReadWrite1() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// write should not block
byte[] ba = "XXX".getBytes("UTF-8");
s1.getOutputStream().write(ba);
// read should not block
ba = new byte[10];
int n = s2.getInputStream().read(ba);
assertTrue(n > 0);
assertTrue(ba[0] == 'X');
}
});
}
/**
* Virtual thread blocks in read.
*/
@Test
void testSocketRead1() throws Exception {
testSocketRead(0);
}
/**
* Virtual thread blocks in timed read.
*/
@Test
void testSocketRead2() throws Exception {
testSocketRead(60_000);
}
void testSocketRead(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// delayed write from sc1
byte[] ba1 = "XXX".getBytes("UTF-8");
runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
// read from sc2 should block
if (timeout > 0) {
s2.setSoTimeout(timeout);
}
byte[] ba2 = new byte[10];
int n = s2.getInputStream().read(ba2);
assertTrue(n > 0);
assertTrue(ba2[0] == 'X');
}
});
}
/**
* Virtual thread blocks in write.
*/
@Test
void testSocketWrite1() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// delayed read from s2 to EOF
InputStream in = s2.getInputStream();
Thread reader = runAfterParkedAsync(() ->
in.transferTo(OutputStream.nullOutputStream()));
// write should block
byte[] ba = new byte[100*1024];
try (OutputStream out = s1.getOutputStream()) {
for (int i = 0; i < 1000; i++) {
out.write(ba);
}
}
// wait for reader to finish
reader.join();
}
});
}
/**
* Virtual thread blocks in read, peer closes connection gracefully.
*/
@Test
void testSocketReadPeerClose1() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// delayed close of s2
runAfterParkedAsync(s2::close);
// read from s1 should block, then read -1
int n = s1.getInputStream().read();
assertTrue(n == -1);
}
});
}
/**
* Virtual thread blocks in read, peer closes connection abruptly.
*/
@Test
void testSocketReadPeerClose2() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// delayed abrupt close of s2
s2.setSoLinger(true, 0);
runAfterParkedAsync(s2::close);
// read from s1 should block, then throw
try {
int n = s1.getInputStream().read();
fail("read " + n);
} catch (IOException ioe) {
// expected
}
}
});
}
/**
* Socket close while virtual thread blocked in read.
*/
@Test
void testSocketReadAsyncClose1() throws Exception {
testSocketReadAsyncClose(0);
}
/**
* Socket close while virtual thread blocked in timed read.
*/
@Test
void testSocketReadAsyncClose2() throws Exception {
testSocketReadAsyncClose(0);
}
void testSocketReadAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayed close of s
runAfterParkedAsync(s::close);
// read from s should block, then throw
if (timeout > 0) {
s.setSoTimeout(timeout);
}
try {
int n = s.getInputStream().read();
fail("read " + n);
} catch (SocketException expected) { }
}
});
}
/**
* Virtual thread interrupted while blocked in Socket read.
*/
@Test
void testSocketReadInterrupt1() throws Exception {
testSocketReadInterrupt(0);
}
/**
* Virtual thread interrupted while blocked in Socket read with timeout
*/
@Test
void testSocketReadInterrupt2() throws Exception {
testSocketReadInterrupt(60_000);
}
void testSocketReadInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
// read from s should block, then throw
if (timeout > 0) {
s.setSoTimeout(timeout);
}
try {
int n = s.getInputStream().read();
fail("read " + n);
} catch (SocketException expected) {
assertTrue(Thread.interrupted());
assertTrue(s.isClosed());
}
}
});
}
/**
* Socket close while virtual thread blocked in write.
*/
@Test
void testSocketWriteAsyncClose() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayedclose of s
runAfterParkedAsync(s::close);
// write to s should block, then throw
try {
byte[] ba = new byte[100*1024];
OutputStream out = s.getOutputStream();
for (;;) {
out.write(ba);
}
} catch (SocketException expected) { }
}
});
}
/**
* Virtual thread interrupted while blocked in Socket write.
*/
@Test
void testSocketWriteInterrupt() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
// write to s should block, then throw
try {
byte[] ba = new byte[100*1024];
OutputStream out = s.getOutputStream();
for (;;) {
out.write(ba);
}
} catch (SocketException expected) {
assertTrue(Thread.interrupted());
assertTrue(s.isClosed());
}
}
});
}
/**
* Virtual thread reading urgent data when SO_OOBINLINE is enabled.
*/
@Test
void testSocketReadUrgentData() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// urgent data should be received
runAfterParkedAsync(() -> s2.sendUrgentData('X'));
// read should block, then read the OOB byte
s1.setOOBInline(true);
byte[] ba = new byte[10];
int n = s1.getInputStream().read(ba);
assertTrue(n == 1);
assertTrue(ba[0] == 'X');
// urgent data should not be received
s1.setOOBInline(false);
s1.setSoTimeout(500);
s2.sendUrgentData('X');
try {
s1.getInputStream().read(ba);
fail();
} catch (SocketTimeoutException expected) { }
}
});
}
/**
* ServerSocket accept, no blocking.
*/
@Test
void testServerSocketAccept1() throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0));
// establish connection
var socket1 = new Socket(loopback, listener.getLocalPort());
// accept should not block
var socket2 = listener.accept();
socket1.close();
socket2.close();
}
});
}
/**
* Virtual thread blocks in accept.
*/
@Test
void testServerSocketAccept2() throws Exception {
testServerSocketAccept(0);
}
/**
* Virtual thread blocks in timed accept.
*/
@Test
void testServerSocketAccept3() throws Exception {
testServerSocketAccept(60_000);
}
void testServerSocketAccept(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0));
// schedule connect
var socket1 = new Socket();
SocketAddress remote = listener.getLocalSocketAddress();
runAfterParkedAsync(() -> socket1.connect(remote));
// accept should block
if (timeout > 0) {
listener.setSoTimeout(timeout);
}
var socket2 = listener.accept();
socket1.close();
socket2.close();
}
});
}
/**
* ServerSocket close while virtual thread blocked in accept.
*/
@Test
void testServerSocketAcceptAsyncClose1() throws Exception {
testServerSocketAcceptAsyncClose(0);
}
/**
* ServerSocket close while virtual thread blocked in timed accept.
*/
@Test
void testServerSocketAcceptAsyncClose2() throws Exception {
testServerSocketAcceptAsyncClose(60_000);
}
void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0));
// delayed close of listener
runAfterParkedAsync(listener::close);
// accept should block, then throw
if (timeout > 0) {
listener.setSoTimeout(timeout);
}
try {
listener.accept().close();
fail("connection accepted???");
} catch (SocketException expected) { }
}
});
}
/**
* Virtual thread interrupted while blocked in ServerSocket accept.
*/
@Test
void testServerSocketAcceptInterrupt1() throws Exception {
testServerSocketAcceptInterrupt(0);
}
/**
* Virtual thread interrupted while blocked in ServerSocket accept with timeout.
*/
@Test
void testServerSocketAcceptInterrupt2() throws Exception {
testServerSocketAcceptInterrupt(60_000);
}
void testServerSocketAcceptInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0));
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
// accept should block, then throw
if (timeout > 0) {
listener.setSoTimeout(timeout);
}
try {
listener.accept().close();
fail("connection accepted???");
} catch (SocketException expected) {
assertTrue(Thread.interrupted());
assertTrue(listener.isClosed());
}
}
});
}
/**
* DatagramSocket receive/send, no blocking.
*/
@Test
void testDatagramSocketSendReceive1() throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s1 = new DatagramSocket(null);
DatagramSocket s2 = new DatagramSocket(null)) {
InetAddress lh = InetAddress.getLoopbackAddress();
s1.bind(new InetSocketAddress(lh, 0));
s2.bind(new InetSocketAddress(lh, 0));
// send should not block
byte[] bytes = "XXX".getBytes("UTF-8");
DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
p1.setSocketAddress(s2.getLocalSocketAddress());
s1.send(p1);
// receive should not block
byte[] ba = new byte[100];
DatagramPacket p2 = new DatagramPacket(ba, ba.length);
s2.receive(p2);
assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
assertTrue(ba[0] == 'X');
}
});
}
/**
* Virtual thread blocks in DatagramSocket receive.
*/
@Test
void testDatagramSocketSendReceive2() throws Exception {
testDatagramSocketSendReceive(0);
}
/**
* Virtual thread blocks in DatagramSocket receive with timeout.
*/
@Test
void testDatagramSocketSendReceive3() throws Exception {
testDatagramSocketSendReceive(60_000);
}
private void testDatagramSocketSendReceive(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s1 = new DatagramSocket(null);
DatagramSocket s2 = new DatagramSocket(null)) {
InetAddress lh = InetAddress.getLoopbackAddress();
s1.bind(new InetSocketAddress(lh, 0));
s2.bind(new InetSocketAddress(lh, 0));
// delayed send
byte[] bytes = "XXX".getBytes("UTF-8");
DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
p1.setSocketAddress(s2.getLocalSocketAddress());
runAfterParkedAsync(() -> s1.send(p1));
// receive should block
if (timeout > 0) {
s2.setSoTimeout(timeout);
}
byte[] ba = new byte[100];
DatagramPacket p2 = new DatagramPacket(ba, ba.length);
s2.receive(p2);
assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
assertTrue(ba[0] == 'X');
}
});
}
/**
* Virtual thread blocks in DatagramSocket receive that times out.
*/
@Test
void testDatagramSocketReceiveTimeout() throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s = new DatagramSocket(null)) {
InetAddress lh = InetAddress.getLoopbackAddress();
s.bind(new InetSocketAddress(lh, 0));
s.setSoTimeout(500);
byte[] ba = new byte[100];
DatagramPacket p = new DatagramPacket(ba, ba.length);
try {
s.receive(p);
fail();
} catch (SocketTimeoutException expected) { }
}
});
}
/**
* DatagramSocket close while virtual thread blocked in receive.
*/
@Test
void testDatagramSocketReceiveAsyncClose1() throws Exception {
testDatagramSocketReceiveAsyncClose(0);
}
/**
* DatagramSocket close while virtual thread blocked with timeout.
*/
@Test
void testDatagramSocketReceiveAsyncClose2() throws Exception {
testDatagramSocketReceiveAsyncClose(60_000);
}
private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s = new DatagramSocket(null)) {
InetAddress lh = InetAddress.getLoopbackAddress();
s.bind(new InetSocketAddress(lh, 0));
// delayed close of s
runAfterParkedAsync(s::close);
// receive should block, then throw
if (timeout > 0) {
s.setSoTimeout(timeout);
}
try {
byte[] ba = new byte[100];
DatagramPacket p = new DatagramPacket(ba, ba.length);
s.receive(p);
fail();
} catch (SocketException expected) { }
}
});
}
/**
* Virtual thread interrupted while blocked in DatagramSocket receive.
*/
@Test
void testDatagramSocketReceiveInterrupt1() throws Exception {
testDatagramSocketReceiveInterrupt(0);
}
/**
* Virtual thread interrupted while blocked in DatagramSocket receive with timeout.
*/
@Test
void testDatagramSocketReceiveInterrupt2() throws Exception {
testDatagramSocketReceiveInterrupt(60_000);
}
private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s = new DatagramSocket(null)) {
InetAddress lh = InetAddress.getLoopbackAddress();
s.bind(new InetSocketAddress(lh, 0));
// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
// receive should block, then throw
if (timeout > 0) {
s.setSoTimeout(timeout);
}
try {
byte[] ba = new byte[100];
DatagramPacket p = new DatagramPacket(ba, ba.length);
s.receive(p);
fail();
} catch (SocketException expected) {
assertTrue(Thread.interrupted());
assertTrue(s.isClosed());
}
}
});
}
/**
* Creates a loopback connection
*/
static class Connection implements Closeable {
private final Socket s1;
private final Socket s2;
Connection() throws IOException {
var lh = InetAddress.getLoopbackAddress();
try (var listener = new ServerSocket()) {
listener.bind(new InetSocketAddress(lh, 0));
Socket s1 = new Socket();
Socket s2;
try {
s1.connect(listener.getLocalSocketAddress());
s2 = listener.accept();
} catch (IOException ioe) {
s1.close();
throw ioe;
}
this.s1 = s1;
this.s2 = s2;
}
}
Socket socket1() {
return s1;
}
Socket socket2() {
return s2;
}
@Override
public void close() throws IOException {
s1.close();
s2.close();
}
}
@FunctionalInterface
interface ThrowingRunnable {
void run() throws Exception;
}
/**
* Runs the given task asynchronously after the current virtual thread has parked.
* @return the thread started to run the task
*/
static Thread runAfterParkedAsync(ThrowingRunnable task) {
Thread target = Thread.currentThread();
if (!target.isVirtual())
throw new WrongThreadException();
return Thread.ofPlatform().daemon().start(() -> {
try {
Thread.State state = target.getState();
while (state != Thread.State.WAITING
&& state != Thread.State.TIMED_WAITING) {
Thread.sleep(20);
state = target.getState();
}
Thread.sleep(20); // give a bit more time to release carrier
task.run();
} catch (Exception e) {
e.printStackTrace();
}
});
}
}