1244 lines
48 KiB
Java
Raw Normal View History

/*
* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/**
* @test
* @bug 8245194
* @run testng/othervm IOExchanges
*/
import java.io.IOException;
import java.net.*;
import java.nio.channels.*;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static java.lang.System.out;
import static java.net.StandardProtocolFamily.*;
import static java.nio.channels.SelectionKey.OP_ACCEPT;
import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
public class IOExchanges {
static boolean unixDomainSupported = true;
@BeforeTest()
public void setup() {
try {
SocketChannel.open(UNIX);
} catch (IOException | UnsupportedOperationException e) {
unixDomainSupported = false;
out.println("Unix domain channels not supported");
}
}
static SocketChannel openSocketChannel(ProtocolFamily family)
throws IOException {
return family == UNIX ? SocketChannel.open(family)
: SocketChannel.open();
}
static ServerSocketChannel openServerSocketChannel(ProtocolFamily family)
throws IOException {
return family == UNIX ? ServerSocketChannel.open(family)
: ServerSocketChannel.open();
}
public static void deleteFile(SocketAddress addr) throws Exception {
if (addr instanceof UnixDomainSocketAddress) {
Files.deleteIfExists(((UnixDomainSocketAddress) addr).getPath());
}
}
/*
The following, non-exhaustive set, of tests exercise different combinations
of blocking and non-blocking accept/connect calls along with I/O
operations, that exchange a single byte. The intent it to test a reasonable
set of blocking and non-blocking scenarios.
The individual test method names follow their test scenario.
[BAccep|SELNBAccep|SPINNBAccep] - Accept either:
blocking, select-non-blocking, spinning-non-blocking
[BConn|NBConn] - blocking connect / non-blocking connect
[BIO|NBIO] - blocking / non-blocking I/O operations (read/write)
[WR|RW] - connecting thread write/accepting thread reads first, and vice-versa
[Id] - unique test Id
BAccep_BConn_BIO_WR_1
BAccep_BConn_BIO_RW_2
SELNBAccep_BConn_BIO_WR_3
SELNBAccep_BConn_BIO_RW_4
SPINNBAccep_BConn_BIO_WR_5
SPINNBAccep_BConn_BIO_RW_6
BAccep_NBConn_BIO_WR_7
BAccep_NBConn_BIO_RW_8
SELNBAccep_NBConn_BIO_WR_9
SELNBAccep_NBConn_BIO_RW_10
SPINNBAccep_NBConn_BIO_WR_11
SPINNBAccep_NBConn_BIO_RW_12
BAccep_BConn_NBIO_WR_1a // Non-Blocking I/O
BAccep_BConn_NBIO_RW_2a
SELNBAccep_BConn_NBIO_WR_3a
SELNBAccep_BConn_NBIO_RW_4a
SPINNBAccep_BConn_NBIO_WR_5a
SPINNBAccep_BConn_NBIO_RW_6a
BAccep_NBConn_NBIO_WR_7a
BAccep_NBConn_NBIO_RW_8a
SELNBAccep_NBConn_NBIO_WR_9a
SELNBAccep_NBConn_NBIO_RW_10a
SPINBAccep_NBConn_NBIO_WR_11a
SPINBAccep_NBConn_NBIO_RW_12a
*/
@DataProvider(name = "family")
public Object[][] family() {
return unixDomainSupported ?
new Object[][] {
{ UNIX },
{ INET }}
: new Object[][] {
{ INET }
};
}
@Test(dataProvider = "family")
public void BAccep_BConn_BIO_WR_1(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t1", () -> {
try (SocketChannel sc = openSocketChannel(family)) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x01).flip();
assertEquals(sc.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc.read(bb.clear()), -1);
}
});
t.start();
try (SocketChannel sc = ssc.accept()) {
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x01);
}
t.awaitCompletion();
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void BAccep_BConn_BIO_RW_2(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t2", () -> {
try (SocketChannel sc = openSocketChannel(family)) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x02);
}
});
t.start();
try (SocketChannel sc = ssc.accept()) {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x02).flip();
assertEquals(sc.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc.read(bb.clear()), -1);
}
t.awaitCompletion();
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SELNBAccep_BConn_BIO_WR_3(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family);
Selector selector = Selector.open()) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t3", () -> {
try (SocketChannel sc = openSocketChannel(family)) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x03).flip();
assertEquals(sc.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc.read(bb.clear()), -1);
}
});
t.start();
ssc.configureBlocking(false).register(selector, OP_ACCEPT);
assertEquals(selector.select(), 1);
try (SocketChannel sc = ssc.accept()) {
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x03);
}
t.awaitCompletion();
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SELNBAccep_BConn_BIO_RW_4(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family);
Selector selector = Selector.open()) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t4", () -> {
try (SocketChannel sc = openSocketChannel(family)) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x04);
}
});
t.start();
ssc.configureBlocking(false).register(selector, OP_ACCEPT);
assertEquals(selector.select(), 1);
try (SocketChannel sc = ssc.accept()) {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x04).flip();
assertEquals(sc.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc.read(bb.clear()), -1);
}
t.awaitCompletion();
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SPINNBAccep_BConn_BIO_WR_5(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t5", () -> {
try (SocketChannel sc = openSocketChannel(family)) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x05).flip();
assertEquals(sc.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc.read(bb.clear()), -1);
}
});
t.start();
SocketChannel accepted;
for (; ; ) {
accepted = ssc.accept();
if (accepted != null) {
out.println("accepted new connection");
break;
}
Thread.onSpinWait();
}
try (SocketChannel sc = accepted) {
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x05);
}
t.awaitCompletion();
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SPINNBAccep_BConn_BIO_RW_6(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t6", () -> {
try (SocketChannel sc = openSocketChannel(family)) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x06);
}
});
t.start();
SocketChannel accepted;
for (; ; ) {
accepted = ssc.accept();
if (accepted != null) {
out.println("accepted new connection");
break;
}
Thread.onSpinWait();
}
try (SocketChannel sc = accepted) {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x06).flip();
assertEquals(sc.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc.read(bb.clear()), -1);
}
t.awaitCompletion();
deleteFile(addr);
}
}
// Similar to the previous six scenarios, but with same-thread
// non-blocking connect.
@Test(dataProvider = "family")
public void BAccep_NBConn_BIO_WR_7(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family)) {
sc.configureBlocking(false);
sc.connect(addr);
try (SocketChannel sc2 = ssc.accept()) {
assertTrue(sc.finishConnect());
sc.configureBlocking(true);
TestThread t = TestThread.of("t7", () -> {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x07).flip();
assertEquals(sc.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc.read(bb.clear()), -1);
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc2.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x07);
sc2.shutdownOutput();
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void BAccep_NBConn_BIO_RW_8(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family)) {
sc.configureBlocking(false);
sc.connect(addr);
try (SocketChannel sc2 = ssc.accept()) {
assertTrue(sc.finishConnect());
sc.configureBlocking(true);
TestThread t = TestThread.of("t8", () -> {
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x08);
sc.shutdownOutput();
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x08).flip();
assertEquals(sc2.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc2.read(bb.clear()), -1);
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SELNBAccep_NBConn_BIO_WR_9(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family);
Selector selector = Selector.open()) {
sc.configureBlocking(false);
sc.connect(addr);
ssc.configureBlocking(false).register(selector, OP_ACCEPT);
assertEquals(selector.select(), 1);
try (SocketChannel sc2 = ssc.accept()) {
assertTrue(sc.finishConnect());
sc.configureBlocking(true);
TestThread t = TestThread.of("t9", () -> {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x09).flip();
assertEquals(sc.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc.read(bb.clear()), -1);
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc2.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x09);
sc2.shutdownOutput();
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SELNBAccep_NBConn_BIO_RW_10(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family);
Selector selector = Selector.open()) {
sc.configureBlocking(false);
sc.connect(addr);
ssc.configureBlocking(false).register(selector, OP_ACCEPT);
assertEquals(selector.select(), 1);
try (SocketChannel sc2 = ssc.accept()) {
assertTrue(sc.finishConnect());
sc.configureBlocking(true);
TestThread t = TestThread.of("t10", () -> {
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x10);
sc.shutdownOutput();
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x10).flip();
assertEquals(sc2.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc2.read(bb.clear()), -1);
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SPINNBAccep_NBConn_BIO_WR_11(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family)) {
sc.configureBlocking(false);
sc.connect(addr);
SocketChannel accepted;
for (; ; ) {
accepted = ssc.accept();
if (accepted != null) {
out.println("accepted new connection");
break;
}
Thread.onSpinWait();
}
try (SocketChannel sc2 = accepted) {
assertTrue(sc.finishConnect());
sc.configureBlocking(true);
TestThread t = TestThread.of("t11", () -> {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x11).flip();
assertEquals(sc.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc.read(bb.clear()), -1);
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc2.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x11);
sc2.shutdownOutput();
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SPINNBAccep_NBConn_BIO_RW_12(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family)) {
sc.configureBlocking(false);
sc.connect(addr);
SocketChannel accepted;
for (; ; ) {
accepted = ssc.accept();
if (accepted != null) {
out.println("accepted new connection");
break;
}
Thread.onSpinWait();
}
try (SocketChannel sc2 = accepted) {
assertTrue(sc.finishConnect());
sc.configureBlocking(true);
TestThread t = TestThread.of("t12", () -> {
ByteBuffer bb = ByteBuffer.allocate(10);
assertEquals(sc.read(bb), 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x12);
sc.shutdownOutput();
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x12).flip();
assertEquals(sc2.write(bb), 1);
out.printf("wrote: 0x%x%n", bb.get(0));
assertEquals(sc2.read(bb.clear()), -1);
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
// ---
// Similar to the previous twelve scenarios but with non-blocking IO
// ---
@Test(dataProvider = "family")
public void BAccep_BConn_NBIO_WR_1a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t1a", () -> {
try (SocketChannel sc = openSocketChannel(family);
Selector selector = Selector.open()) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x1A).flip();
sc.configureBlocking(false);
SelectionKey k = sc.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, -1);
}
});
t.start();
try (SocketChannel sc = ssc.accept();
Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10);
sc.configureBlocking(false);
sc.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x1A);
}
t.awaitCompletion();
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void BAccep_BConn_NBIO_RW_2a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t2a", () -> {
try (SocketChannel sc = openSocketChannel(family);
Selector selector = Selector.open()) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10);
sc.configureBlocking(false);
sc.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x2A);
}
});
t.start();
try (SocketChannel sc = ssc.accept();
Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x2A).flip();
sc.configureBlocking(false);
SelectionKey k = sc.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, -1);
}
t.awaitCompletion();
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SELNBAccep_BConn_NBIO_WR_3a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family);
Selector aselector = Selector.open()) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t3a", () -> {
try (SocketChannel sc = openSocketChannel(family);
Selector selector = Selector.open()) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x3A).flip();
sc.configureBlocking(false);
SelectionKey k = sc.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, -1);
}
});
t.start();
ssc.configureBlocking(false).register(aselector, OP_ACCEPT);
assertEquals(aselector.select(), 1);
try (SocketChannel sc = ssc.accept();
Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10);
sc.configureBlocking(false);
sc.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x3A);
}
t.awaitCompletion();
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SELNBAccep_BConn_NBIO_RW_4a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family);
Selector aselector = Selector.open()) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t4a", () -> {
try (SocketChannel sc = openSocketChannel(family);
Selector selector = Selector.open()) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10);
sc.configureBlocking(false);
sc.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x4A);
}
});
t.start();
ssc.configureBlocking(false).register(aselector, OP_ACCEPT);
assertEquals(aselector.select(), 1);
try (SocketChannel sc = ssc.accept();
Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x4A).flip();
sc.configureBlocking(false);
SelectionKey k = sc.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, -1);
}
t.awaitCompletion();
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SPINNBAccep_BConn_NBIO_WR_5a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t5a", () -> {
try (SocketChannel sc = openSocketChannel(family);
Selector selector = Selector.open()) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x5A).flip();
sc.configureBlocking(false);
SelectionKey k = sc.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, -1);
}
});
t.start();
SocketChannel accepted;
for (; ; ) {
accepted = ssc.accept();
if (accepted != null) {
out.println("accepted new connection");
break;
}
Thread.onSpinWait();
}
try (SocketChannel sc = accepted;
Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10);
sc.configureBlocking(false);
sc.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x5A);
}
t.awaitCompletion();
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SPINNBAccep_BConn_NBIO_RW_6a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
TestThread t = TestThread.of("t6a", () -> {
try (SocketChannel sc = openSocketChannel(family);
Selector selector = Selector.open()) {
assertTrue(sc.connect(addr));
ByteBuffer bb = ByteBuffer.allocate(10);
sc.configureBlocking(false);
sc.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x6A);
}
});
t.start();
SocketChannel accepted;
for (; ; ) {
accepted = ssc.accept();
if (accepted != null) {
out.println("accepted new connection");
break;
}
Thread.onSpinWait();
}
try (SocketChannel sc = accepted;
Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x6A).flip();
sc.configureBlocking(false);
SelectionKey k = sc.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, -1);
}
t.awaitCompletion();
deleteFile(addr);
}
}
// Similar to the previous six scenarios but with same-thread
// non-blocking connect.
@Test(dataProvider = "family")
public void BAccep_NBConn_NBIO_WR_7a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family)) {
sc.configureBlocking(false);
sc.connect(addr);
try (SocketChannel sc2 = ssc.accept()) {
assertTrue(sc.finishConnect());
TestThread t = TestThread.of("t7a", () -> {
try (Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x7A).flip();
sc.configureBlocking(false);
SelectionKey k = sc.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, -1);
}
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10);
sc2.configureBlocking(false);
try (Selector selector = Selector.open()) {
sc2.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc2.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), 0x7A);
sc2.shutdownOutput();
}
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void BAccep_NBConn_NBIO_RW_8a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family)) {
sc.configureBlocking(false);
sc.connect(addr);
try (SocketChannel sc2 = ssc.accept()) {
assertTrue(sc.finishConnect());
TestThread t = TestThread.of("t8a", () -> {
try (Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10);
sc.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), (byte) 0x8A);
sc.shutdownOutput();
}
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x8A).flip();
sc2.configureBlocking(false);
try (Selector selector = Selector.open()) {
SelectionKey k = sc2.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc2.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc2.read(bb)) == 0) ;
assertEquals(c, -1);
}
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SELNBAccep_NBConn_NBIO_WR_9a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family)) {
sc.configureBlocking(false);
sc.connect(addr);
Selector aselector = Selector.open();
ssc.configureBlocking(false).register(aselector, OP_ACCEPT);
assertEquals(aselector.select(), 1);
try (SocketChannel sc2 = ssc.accept()) {
assertTrue(sc.finishConnect());
TestThread t = TestThread.of("t9a", () -> {
try (Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0x9A).flip();
sc.configureBlocking(false);
SelectionKey k = sc.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, -1);
}
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10);
sc2.configureBlocking(false);
try (Selector selector = Selector.open()) {
sc2.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc2.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), (byte) 0x9A);
sc2.shutdownOutput();
}
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SELNBAccep_NBConn_NBIO_RW_10a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family)) {
sc.configureBlocking(false);
sc.connect(addr);
Selector aselector = Selector.open();
ssc.configureBlocking(false).register(aselector, OP_ACCEPT);
assertEquals(aselector.select(), 1);
try (SocketChannel sc2 = ssc.accept()) {
assertTrue(sc.finishConnect());
TestThread t = TestThread.of("t10a", () -> {
try (Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10);
sc.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), (byte) 0xAA);
sc.shutdownOutput();
}
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0xAA).flip();
sc2.configureBlocking(false);
try (Selector selector = Selector.open()) {
SelectionKey k = sc2.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc2.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc2.read(bb)) == 0) ;
assertEquals(c, -1);
}
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SPINBAccep_NBConn_NBIO_WR_11a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family)) {
sc.configureBlocking(false);
sc.connect(addr);
SocketChannel accepted;
for (; ; ) {
accepted = ssc.accept();
if (accepted != null) {
out.println("accepted new connection");
break;
}
Thread.onSpinWait();
}
try (SocketChannel sc2 = accepted) {
assertTrue(sc.finishConnect());
TestThread t = TestThread.of("t11a", () -> {
try (Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0xBA).flip();
sc.configureBlocking(false);
SelectionKey k = sc.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, -1);
}
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10);
sc2.configureBlocking(false);
try (Selector selector = Selector.open()) {
sc2.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc2.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), (byte) 0xBA);
sc2.shutdownOutput();
}
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
@Test(dataProvider = "family")
public void SPINBAccep_NBConn_NBIO_RW_12a(ProtocolFamily family)
throws Throwable {
try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
ssc.bind(null);
SocketAddress addr = ssc.getLocalAddress();
try (SocketChannel sc = openSocketChannel(family)) {
sc.configureBlocking(false);
sc.connect(addr);
SocketChannel accepted;
for (; ; ) {
accepted = ssc.accept();
if (accepted != null) {
out.println("accepted new connection");
break;
}
Thread.onSpinWait();
}
try (SocketChannel sc2 = accepted) {
assertTrue(sc.finishConnect());
TestThread t = TestThread.of("t10a", () -> {
try (Selector selector = Selector.open()) {
ByteBuffer bb = ByteBuffer.allocate(10);
sc.register(selector, OP_READ);
selector.select();
int c;
while ((c = sc.read(bb)) == 0) ;
assertEquals(c, 1);
out.printf("read: 0x%x%n", bb.get(0));
assertEquals(bb.get(0), (byte) 0xCA);
sc.shutdownOutput();
}
});
t.start();
ByteBuffer bb = ByteBuffer.allocate(10).put((byte) 0xCA).flip();
sc2.configureBlocking(false);
try (Selector selector = Selector.open()) {
SelectionKey k = sc2.register(selector, OP_WRITE);
selector.select();
int c;
while ((c = sc2.write(bb)) < 1) ;
assertEquals(c, 1);
out.printf("wrote: 0x%x%n", bb.get(0));
k.interestOps(OP_READ);
selector.select();
bb.clear();
while ((c = sc2.read(bb)) == 0) ;
assertEquals(c, -1);
}
t.awaitCompletion();
}
}
deleteFile(addr);
}
}
// --
static class TestThread extends Thread {
private final UncheckedRunnable runnable;
private volatile Throwable throwable;
TestThread(UncheckedRunnable runnable, String name) {
super(name);
this.runnable = runnable;
}
@Override
public void run() {
try {
runnable.run();
} catch (Throwable t) {
out.printf("[%s] caught unexpected: %s%n", getName(), t);
throwable = t;
}
}
interface UncheckedRunnable {
void run() throws Throwable;
}
static TestThread of(String name, UncheckedRunnable runnable) {
return new TestThread(runnable, name);
}
void awaitCompletion() throws Throwable {
this.join();
if (throwable != null)
throw throwable;
}
}
}