8164166: Make sure java/nio/channels tests shutdown asynchronous channel groups

Reviewed-by: alanb
This commit is contained in:
Artem Smotrakov 2016-08-24 10:58:29 -07:00
parent dde76394d5
commit 8fe1b11fa9
4 changed files with 298 additions and 263 deletions
jdk/test/java/nio/channels/AsynchronousChannelGroup

@ -1,5 +1,5 @@
/*
* Copyright (c) 2008, 2012, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -119,19 +119,31 @@ public class Basic {
ExecutorService pool = Executors.newCachedThreadPool();
AsynchronousChannelGroup group = AsynchronousChannelGroup
.withCachedThreadPool(pool, rand.nextInt(10));
testShutdownWithChannels(pool, group);
try {
testShutdownWithChannels(pool, group);
} finally {
group.shutdown();
}
}
for (int i = 0; i < 100; i++) {
int nThreads = 1 + rand.nextInt(8);
AsynchronousChannelGroup group = AsynchronousChannelGroup
.withFixedThreadPool(nThreads, threadFactory);
testShutdownWithChannels(null, group);
try {
testShutdownWithChannels(null, group);
} finally {
group.shutdown();
}
}
for (int i = 0; i < 100; i++) {
ExecutorService pool = Executors.newCachedThreadPool();
AsynchronousChannelGroup group = AsynchronousChannelGroup
.withThreadPool(pool);
testShutdownWithChannels(pool, group);
try {
testShutdownWithChannels(pool, group);
} finally {
group.shutdown();
}
}
}
@ -164,19 +176,31 @@ public class Basic {
ExecutorService pool = pool = Executors.newCachedThreadPool();
AsynchronousChannelGroup group = AsynchronousChannelGroup
.withCachedThreadPool(pool, rand.nextInt(5));
testShutdownNow(pool, group);
try {
testShutdownNow(pool, group);
} finally {
group.shutdown();
}
}
for (int i = 0; i < 10; i++) {
int nThreads = 1 + rand.nextInt(8);
AsynchronousChannelGroup group = AsynchronousChannelGroup
.withFixedThreadPool(nThreads, threadFactory);
testShutdownNow(null, group);
try {
testShutdownNow(null, group);
} finally {
group.shutdown();
}
}
for (int i = 0; i < 10; i++) {
ExecutorService pool = Executors.newCachedThreadPool();
AsynchronousChannelGroup group = AsynchronousChannelGroup
.withThreadPool(pool);
testShutdownNow(pool, group);
try {
testShutdownNow(pool, group);
} finally {
group.shutdown();
}
}
}
@ -186,78 +210,78 @@ public class Basic {
AsynchronousChannelGroup group =
AsynchronousChannelGroup.withFixedThreadPool(1, threadFactory);
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);
try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
AsynchronousServerSocketChannel listener =
AsynchronousServerSocketChannel.open(group)) {
// initiate accept
listener.bind(new InetSocketAddress(0));
Future<AsynchronousSocketChannel> result = listener.accept();
// initiate accept
listener.bind(new InetSocketAddress(0));
Future<AsynchronousSocketChannel> result = listener.accept();
// shutdown group
group.shutdown();
if (!group.isShutdown())
throw new RuntimeException("Group should be shutdown");
// shutdown group
group.shutdown();
if (!group.isShutdown())
throw new RuntimeException("Group should be shutdown");
// attempt to create another channel
try {
AsynchronousSocketChannel.open(group);
throw new RuntimeException("ShutdownChannelGroupException expected");
} catch (ShutdownChannelGroupException x) {
// attempt to create another channel
try {
AsynchronousSocketChannel.open(group);
throw new RuntimeException("ShutdownChannelGroupException expected");
} catch (ShutdownChannelGroupException x) {
}
try {
AsynchronousServerSocketChannel.open(group);
throw new RuntimeException("ShutdownChannelGroupException expected");
} catch (ShutdownChannelGroupException x) {
}
// attempt to create another channel by connecting. This should cause
// the accept operation to fail.
InetAddress lh = InetAddress.getLocalHost();
int port = ((InetSocketAddress)listener.getLocalAddress()).getPort();
InetSocketAddress isa = new InetSocketAddress(lh, port);
ch.connect(isa).get();
try {
result.get();
throw new RuntimeException("Connection was accepted");
} catch (ExecutionException x) {
Throwable cause = x.getCause();
if (!(cause instanceof IOException))
throw new RuntimeException("Cause should be IOException");
cause = cause.getCause();
if (!(cause instanceof ShutdownChannelGroupException))
throw new RuntimeException("IOException cause should be ShutdownChannelGroupException");
}
// initiate another accept even though channel group is shutdown.
Future<AsynchronousSocketChannel> res = listener.accept();
try {
res.get(3, TimeUnit.SECONDS);
throw new RuntimeException("TimeoutException expected");
} catch (TimeoutException x) {
}
// connect to the listener which should cause the accept to complete
AsynchronousSocketChannel.open().connect(isa);
try {
res.get();
throw new RuntimeException("Connection was accepted");
} catch (ExecutionException x) {
Throwable cause = x.getCause();
if (!(cause instanceof IOException))
throw new RuntimeException("Cause should be IOException");
cause = cause.getCause();
if (!(cause instanceof ShutdownChannelGroupException))
throw new RuntimeException("IOException cause should be ShutdownChannelGroupException");
}
// group should *not* terminate as channels are open
boolean terminated = group.awaitTermination(3, TimeUnit.SECONDS);
if (terminated) {
throw new RuntimeException("Group should not have terminated");
}
} finally {
group.shutdown();
}
try {
AsynchronousServerSocketChannel.open(group);
throw new RuntimeException("ShutdownChannelGroupException expected");
} catch (ShutdownChannelGroupException x) {
}
// attempt to create another channel by connecting. This should cause
// the accept operation to fail.
InetAddress lh = InetAddress.getLocalHost();
int port = ((InetSocketAddress)listener.getLocalAddress()).getPort();
InetSocketAddress isa = new InetSocketAddress(lh, port);
ch.connect(isa).get();
try {
result.get();
throw new RuntimeException("Connection was accepted");
} catch (ExecutionException x) {
Throwable cause = x.getCause();
if (!(cause instanceof IOException))
throw new RuntimeException("Cause should be IOException");
cause = cause.getCause();
if (!(cause instanceof ShutdownChannelGroupException))
throw new RuntimeException("IOException cause should be ShutdownChannelGroupException");
}
// initiate another accept even though channel group is shutdown.
Future<AsynchronousSocketChannel> res = listener.accept();
try {
res.get(3, TimeUnit.SECONDS);
throw new RuntimeException("TimeoutException expected");
} catch (TimeoutException x) {
}
// connect to the listener which should cause the accept to complete
AsynchronousSocketChannel.open().connect(isa);
try {
res.get();
throw new RuntimeException("Connection was accepted");
} catch (ExecutionException x) {
Throwable cause = x.getCause();
if (!(cause instanceof IOException))
throw new RuntimeException("Cause should be IOException");
cause = cause.getCause();
if (!(cause instanceof ShutdownChannelGroupException))
throw new RuntimeException("IOException cause should be ShutdownChannelGroupException");
}
// group should *not* terminate as channels are open
boolean terminated = group.awaitTermination(3, TimeUnit.SECONDS);
if (terminated)
throw new RuntimeException("Group should not have terminated");
// close channel; group should terminate quickly
ch.close();
listener.close();
awaitTermination(group);
}
static void miscTests() throws Exception {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -41,34 +41,36 @@ import java.io.IOException;
public class GroupOfOne {
public static void main(String[] args) throws Exception {
final List<AsynchronousSocketChannel> accepted = new ArrayList<>();
// create listener to accept connections
final AsynchronousServerSocketChannel listener =
AsynchronousServerSocketChannel.open()
.bind(new InetSocketAddress(0));
final List<AsynchronousSocketChannel> accepted = new ArrayList<AsynchronousSocketChannel>();
listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
public void completed(AsynchronousSocketChannel ch, Void att) {
synchronized (accepted) {
accepted.add(ch);
try (final AsynchronousServerSocketChannel listener =
AsynchronousServerSocketChannel.open()) {
listener.bind(new InetSocketAddress(0));
listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
public void completed(AsynchronousSocketChannel ch, Void att) {
synchronized (accepted) {
accepted.add(ch);
}
listener.accept((Void)null, this);
}
listener.accept((Void)null, this);
}
public void failed(Throwable exc, Void att) {
}
});
public void failed(Throwable exc, Void att) {
}
});
int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
test(sa, true, false);
test(sa, false, true);
test(sa, true, true);
// clean-up
listener.close();
synchronized (accepted) {
for (AsynchronousSocketChannel ch: accepted) {
ch.close();
test(sa, true, false);
test(sa, false, true);
test(sa, true, true);
} finally {
// clean-up
synchronized (accepted) {
for (AsynchronousSocketChannel ch: accepted) {
ch.close();
}
}
}
}
@ -86,60 +88,60 @@ public class GroupOfOne {
return new Thread(r);
}});
final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
try {
// the latch counts down when:
// 1. The read operation fails (expected)
// 2. the close/shutdown completes
final CountDownLatch latch = new CountDownLatch(2);
// the latch counts down when:
// 1. The read operation fails (expected)
// 2. the close/shutdown completes
final CountDownLatch latch = new CountDownLatch(2);
ch.connect(sa, (Void)null, new CompletionHandler<Void,Void>() {
public void completed(Void result, Void att) {
System.out.println("Connected");
ch.connect(sa, (Void)null, new CompletionHandler<Void,Void>() {
public void completed(Void result, Void att) {
System.out.println("Connected");
// initiate I/O operation that does not complete (successfully)
ByteBuffer buf = ByteBuffer.allocate(100);
ch.read(buf, (Void)null, new CompletionHandler<Integer,Void>() {
public void completed(Integer bytesRead, Void att) {
throw new RuntimeException();
}
public void failed(Throwable exc, Void att) {
if (!(exc instanceof AsynchronousCloseException))
throw new RuntimeException(exc);
System.out.println("Read failed (expected)");
latch.countDown();
}
});
// initiate I/O operation that does not complete (successfully)
ByteBuffer buf = ByteBuffer.allocate(100);
ch.read(buf, (Void)null, new CompletionHandler<Integer,Void>() {
public void completed(Integer bytesRead, Void att) {
// close channel or shutdown group
try {
if (closeChannel) {
System.out.print("Close channel ...");
ch.close();
System.out.println(" done.");
}
if (shutdownGroup) {
System.out.print("Shutdown group ...");
group.shutdownNow();
System.out.println(" done.");
}
latch.countDown();
} catch (IOException e) {
throw new RuntimeException();
}
public void failed(Throwable exc, Void att) {
if (!(exc instanceof AsynchronousCloseException))
throw new RuntimeException(exc);
System.out.println("Read failed (expected)");
latch.countDown();
}
});
// close channel or shutdown group
try {
if (closeChannel) {
System.out.print("Close channel ...");
ch.close();
System.out.println(" done.");
}
if (shutdownGroup) {
System.out.print("Shutdown group ...");
group.shutdownNow();
System.out.println(" done.");
}
latch.countDown();
} catch (IOException e) {
throw new RuntimeException();
}
}
public void failed(Throwable exc, Void att) {
throw new RuntimeException(exc);
}
});
latch.await();
// clean-up
group.shutdown();
boolean terminated = group.awaitTermination(20, TimeUnit.SECONDS);
if (!terminated)
throw new RuntimeException("Group did not terminate");
public void failed(Throwable exc, Void att) {
throw new RuntimeException(exc);
}
});
latch.await();
} finally {
// clean-up
group.shutdown();
boolean terminated = group.awaitTermination(20, TimeUnit.SECONDS);
if (!terminated)
throw new RuntimeException("Group did not terminate");
}
System.out.println("TEST OKAY");
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -76,89 +76,91 @@ public class Identity {
}
public static void main(String[] args) throws Exception {
// create listener to accept connections
final AsynchronousServerSocketChannel listener =
AsynchronousServerSocketChannel.open()
.bind(new InetSocketAddress(0));
listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
public void completed(final AsynchronousSocketChannel ch, Void att) {
listener.accept((Void)null, this);
final ByteBuffer buf = ByteBuffer.allocate(100);
ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
if (bytesRead < 0) {
try { ch.close(); } catch (IOException ignore) { }
} else {
buf.clear();
ch.read(buf, ch, this);
}
}
public void failed(Throwable exc, AsynchronousSocketChannel ch) {
try { ch.close(); } catch (IOException ignore) { }
}
});
}
public void failed(Throwable exc, Void att) {
}
});
int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
// create 3-10 channels, each in its own group
final int groupCount = 3 + rand.nextInt(8);
AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount];
final AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount];
final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount];
for (int i=0; i<groupCount; i++) {
ThreadFactory factory = createThreadFactory(i);
AsynchronousChannelGroup group;
if (rand.nextBoolean()) {
int nThreads = 1 + rand.nextInt(10);
group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
} else {
ExecutorService pool = Executors.newCachedThreadPool(factory);
group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
}
groups[i] = group;
// create channel in group and connect it to the server
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
ch.connect(sa).get();
channels[i] = ch;
}
// create listener to accept connections
try (final AsynchronousServerSocketChannel listener =
AsynchronousServerSocketChannel.open()) {
// randomly write to each channel, ensuring that the completion handler
// is always invoked by a thread with the right identity.
final AtomicInteger writeCount = new AtomicInteger(100);
channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {
public void completed(Integer bytesWritten, Integer groupId) {
if (bytesWritten != 1)
fail("Expected 1 byte to be written");
if (!myGroup.get().equals(groupId))
fail("Handler invoked by thread with the wrong identity");
if (writeCount.decrementAndGet() > 0) {
int id = rand.nextInt(groupCount);
channels[id].write(getBuffer(), id, this);
} else {
done.countDown();
listener.bind(new InetSocketAddress(0));
listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
public void completed(final AsynchronousSocketChannel ch, Void att) {
listener.accept((Void)null, this);
final ByteBuffer buf = ByteBuffer.allocate(100);
ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
if (bytesRead < 0) {
try { ch.close(); } catch (IOException ignore) { }
} else {
buf.clear();
ch.read(buf, ch, this);
}
}
public void failed(Throwable exc, AsynchronousSocketChannel ch) {
try { ch.close(); } catch (IOException ignore) { }
}
});
}
public void failed(Throwable exc, Void att) {
}
});
int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
for (int i=0; i<groupCount; i++) {
ThreadFactory factory = createThreadFactory(i);
AsynchronousChannelGroup group;
if (rand.nextBoolean()) {
int nThreads = 1 + rand.nextInt(10);
group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
} else {
ExecutorService pool = Executors.newCachedThreadPool(factory);
group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
}
groups[i] = group;
// create channel in group and connect it to the server
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
ch.connect(sa).get();
channels[i] = ch;
}
public void failed(Throwable exc, Integer groupId) {
fail(exc.getMessage());
}
});
// wait until done
done.await();
// randomly write to each channel, ensuring that the completion handler
// is always invoked by a thread with the right identity.
final AtomicInteger writeCount = new AtomicInteger(100);
channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {
public void completed(Integer bytesWritten, Integer groupId) {
if (bytesWritten != 1)
fail("Expected 1 byte to be written");
if (!myGroup.get().equals(groupId))
fail("Handler invoked by thread with the wrong identity");
if (writeCount.decrementAndGet() > 0) {
int id = rand.nextInt(groupCount);
channels[id].write(getBuffer(), id, this);
} else {
done.countDown();
}
}
public void failed(Throwable exc, Integer groupId) {
fail(exc.getMessage());
}
});
// clean-up
for (AsynchronousSocketChannel ch: channels)
ch.close();
for (AsynchronousChannelGroup group: groups)
group.shutdownNow();
listener.close();
// wait until done
done.await();
} finally {
// clean-up
for (AsynchronousSocketChannel ch: channels)
ch.close();
for (AsynchronousChannelGroup group: groups)
group.shutdownNow();
if (failed.get())
throw new RuntimeException("Test failed - see log for details");
if (failed.get())
throw new RuntimeException("Test failed - see log for details");
}
}
static ByteBuffer getBuffer() {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2008, 2012, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -66,21 +66,30 @@ public class Restart {
// group with fixed thread pool
int nThreads = 1 + rand.nextInt(4);
AsynchronousChannelGroup group =
AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
testRestart(group, 100);
group.shutdown();
AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
try {
testRestart(group, 100);
} finally {
group.shutdown();
}
// group with cached thread pool
ExecutorService pool = Executors.newCachedThreadPool(factory);
group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
testRestart(group, 100);
group.shutdown();
try {
testRestart(group, 100);
} finally {
group.shutdown();
}
// group with custom thread pool
group = AsynchronousChannelGroup
.withThreadPool(Executors.newFixedThreadPool(1+rand.nextInt(5), factory));
testRestart(group, 100);
group.shutdown();
group = AsynchronousChannelGroup.withThreadPool(
Executors.newFixedThreadPool(1+rand.nextInt(5), factory));
try {
testRestart(group, 100);
} finally {
group.shutdown();
}
// give time for threads to terminate
Thread.sleep(3000);
@ -92,45 +101,43 @@ public class Restart {
static void testRestart(AsynchronousChannelGroup group, int count)
throws Exception
{
AsynchronousServerSocketChannel listener =
AsynchronousServerSocketChannel.open(group)
.bind(new InetSocketAddress(0));
try (AsynchronousServerSocketChannel listener =
AsynchronousServerSocketChannel.open(group)) {
for (int i=0; i<count; i++) {
final CountDownLatch latch = new CountDownLatch(1);
listener.bind(new InetSocketAddress(0));
for (int i=0; i<count; i++) {
final CountDownLatch latch = new CountDownLatch(1);
listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
public void completed(AsynchronousSocketChannel ch, Void att) {
try {
ch.close();
} catch (IOException ignore) { }
listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
public void completed(AsynchronousSocketChannel ch, Void att) {
try {
ch.close();
} catch (IOException ignore) { }
latch.countDown();
latch.countDown();
// throw error or runtime exception
if (rand.nextBoolean()) {
throw new Error();
} else {
throw new RuntimeException();
// throw error or runtime exception
if (rand.nextBoolean()) {
throw new Error();
} else {
throw new RuntimeException();
}
}
}
public void failed(Throwable exc, Void att) {
}
});
public void failed(Throwable exc, Void att) {
}
});
// establish loopback connection which should cause completion
// handler to be invoked.
int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
InetAddress lh = InetAddress.getLocalHost();
ch.connect(new InetSocketAddress(lh, port)).get();
ch.close();
// establish loopback connection which should cause completion
// handler to be invoked.
int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
InetAddress lh = InetAddress.getLocalHost();
ch.connect(new InetSocketAddress(lh, port)).get();
}
// wait for handler to be invoked
latch.await();
// wait for handler to be invoked
latch.await();
}
}
// clean-up
listener.close();
}
}