8319757: java/nio/channels/DatagramChannel/InterruptibleOrNot.java failed: wrong exception thrown
Reviewed-by: jpai, bpb
This commit is contained in:
parent
be4614eb5e
commit
ace010b38a
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2019, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -25,7 +25,7 @@
|
||||
* @test
|
||||
* @bug 8236246
|
||||
* @modules java.base/sun.nio.ch
|
||||
* @run testng InterruptibleOrNot
|
||||
* @run junit InterruptibleOrNot
|
||||
* @summary Test SelectorProviderImpl.openDatagramChannel(boolean) to create
|
||||
* DatagramChannel objects that optionally support interrupt
|
||||
*/
|
||||
@ -40,152 +40,178 @@ import java.nio.channels.AsynchronousCloseException;
|
||||
import java.nio.channels.ClosedByInterruptException;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.Arrays;
|
||||
import sun.nio.ch.DefaultSelectorProvider;
|
||||
|
||||
import org.testng.annotations.Test;
|
||||
import static org.testng.Assert.*;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.function.Executable;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@Test
|
||||
public class InterruptibleOrNot {
|
||||
// DatagramChannel implementation class
|
||||
private static String dcImplClassName;
|
||||
|
||||
@BeforeAll
|
||||
static void setup() throws Exception {
|
||||
try (DatagramChannel dc = boundDatagramChannel(true)) {
|
||||
dcImplClassName = dc.getClass().getName();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Call DatagramChannel.receive with the interrupt status set, the DatagramChannel
|
||||
* is interruptible.
|
||||
*/
|
||||
@Test
|
||||
public void testInterruptBeforeInterruptibleReceive() throws Exception {
|
||||
testInterruptBeforeReceive(true);
|
||||
try (DatagramChannel dc = boundDatagramChannel(true)) {
|
||||
ByteBuffer buf = ByteBuffer.allocate(100);
|
||||
Thread.currentThread().interrupt();
|
||||
assertThrows(ClosedByInterruptException.class, () -> dc.receive(buf));
|
||||
assertFalse(dc.isOpen());
|
||||
} finally {
|
||||
Thread.interrupted(); // clear interrupt status
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test interrupting a thread blocked in DatagramChannel.receive, the DatagramChannel
|
||||
* is interruptible.
|
||||
*/
|
||||
@Test
|
||||
public void testInterruptDuringInterruptibleReceive() throws Exception {
|
||||
testInterruptDuringReceive(true);
|
||||
try (DatagramChannel dc = boundDatagramChannel(true)) {
|
||||
ByteBuffer buf = ByteBuffer.allocate(100);
|
||||
Thread thread = Thread.currentThread();
|
||||
onReceive(thread::interrupt);
|
||||
assertThrows(ClosedByInterruptException.class, () -> dc.receive(buf));
|
||||
assertFalse(dc.isOpen());
|
||||
} finally {
|
||||
Thread.interrupted(); // clear interrupt status
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Call DatagramChannel.receive with the interrupt status set, the DatagramChannel
|
||||
* is not interruptible.
|
||||
*/
|
||||
@Test
|
||||
public void testInterruptBeforeUninterruptibleReceive() throws Exception {
|
||||
testInterruptBeforeReceive(false);
|
||||
try (DatagramChannel dc = boundDatagramChannel(false)) {
|
||||
ByteBuffer buf = ByteBuffer.allocate(100);
|
||||
onReceive(() -> {
|
||||
// close the channel after a delay to ensure receive wakes up
|
||||
Thread.sleep(1000);
|
||||
dc.close();
|
||||
});
|
||||
Thread.currentThread().interrupt();
|
||||
assertThrows(AsynchronousCloseException.class, () -> dc.receive(buf));
|
||||
assertFalse(dc.isOpen());
|
||||
} finally {
|
||||
Thread.interrupted(); // clear interrupt status
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test interrupting a thread blocked in DatagramChannel.receive, the DatagramChannel
|
||||
* is not interruptible.
|
||||
*/
|
||||
@Test
|
||||
public void testInterruptDuringUninterruptibleReceive() throws Exception {
|
||||
testInterruptDuringReceive(false);
|
||||
try (DatagramChannel dc = boundDatagramChannel(true)) {
|
||||
ByteBuffer buf = ByteBuffer.allocate(100);
|
||||
|
||||
Thread thread = Thread.currentThread();
|
||||
onReceive(() -> {
|
||||
// interrupt should not cause the receive to wakeup
|
||||
thread.interrupt();
|
||||
|
||||
// close the channel after a delay to ensure receive wakes up
|
||||
Thread.sleep(1000);
|
||||
dc.close();
|
||||
});
|
||||
assertThrows(AsynchronousCloseException.class, () -> dc.receive(buf));
|
||||
assertFalse(dc.isOpen());
|
||||
} finally {
|
||||
Thread.interrupted(); // clear interrupt status
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Call DatagramChannel.send with the interrupt status set, the DatagramChannel
|
||||
* is interruptible.
|
||||
*/
|
||||
@Test
|
||||
public void testInterruptBeforeInterruptibleSend() throws Exception {
|
||||
testInterruptBeforeSend(true);
|
||||
try (DatagramChannel dc = boundDatagramChannel(true)) {
|
||||
ByteBuffer buf = ByteBuffer.allocate(100);
|
||||
SocketAddress target = dc.getLocalAddress();
|
||||
Thread.currentThread().interrupt();
|
||||
assertThrows(ClosedByInterruptException.class, () -> dc.send(buf, target));
|
||||
assertFalse(dc.isOpen());
|
||||
} finally {
|
||||
Thread.interrupted(); // clear interrupt
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Call DatagramChannel.send with the interrupt status set, the DatagramChannel
|
||||
* is not interruptible.
|
||||
*/
|
||||
@Test
|
||||
public void testInterruptBeforeUninterruptibleSend() throws Exception {
|
||||
testInterruptBeforeSend(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test invoking DatagramChannel receive with interrupt status set
|
||||
*/
|
||||
static void testInterruptBeforeReceive(boolean interruptible)
|
||||
throws Exception
|
||||
{
|
||||
try (DatagramChannel dc = openDatagramChannel(interruptible)) {
|
||||
dc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
|
||||
Future<?> timeout = scheduleClose(dc, Duration.ofSeconds(2));
|
||||
try {
|
||||
ByteBuffer buf = ByteBuffer.allocate(100);
|
||||
Thread.currentThread().interrupt();
|
||||
assertThrows(expectedException(interruptible), () -> dc.receive(buf));
|
||||
} finally {
|
||||
timeout.cancel(false);
|
||||
}
|
||||
try (DatagramChannel dc = boundDatagramChannel(false)) {
|
||||
ByteBuffer buf = ByteBuffer.allocate(100);
|
||||
SocketAddress target = dc.getLocalAddress();
|
||||
Thread.currentThread().interrupt();
|
||||
int n = dc.send(buf, target);
|
||||
assertEquals(100, n);
|
||||
assertTrue(dc.isOpen());
|
||||
} finally {
|
||||
Thread.interrupted(); // clear interrupt
|
||||
Thread.interrupted(); // clear interrupt status
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Thread.interrupt when target thread is blocked in DatagramChannel receive
|
||||
* Creates a DatagramChannel that is interruptible or not, and bound to the loopback
|
||||
* address.
|
||||
*/
|
||||
static void testInterruptDuringReceive(boolean interruptible)
|
||||
throws Exception
|
||||
{
|
||||
try (DatagramChannel dc = openDatagramChannel(interruptible)) {
|
||||
dc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
|
||||
Future<?> timerTask = scheduleClose(dc, Duration.ofSeconds(5));
|
||||
Future<?> interruptTask = scheduleInterrupt(Thread.currentThread(), Duration.ofSeconds(1));
|
||||
try {
|
||||
ByteBuffer buf = ByteBuffer.allocate(100);
|
||||
assertThrows(expectedException(interruptible), () -> dc.receive(buf));
|
||||
} finally {
|
||||
timerTask.cancel(false);
|
||||
interruptTask.cancel(false);
|
||||
}
|
||||
} finally {
|
||||
Thread.interrupted(); // clear interrupt
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test invoking DatagramChannel send with interrupt status set
|
||||
*/
|
||||
static void testInterruptBeforeSend(boolean interruptible)
|
||||
throws Exception
|
||||
{
|
||||
try (DatagramChannel dc = openDatagramChannel(interruptible)) {
|
||||
dc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
|
||||
Future<?> timeout = scheduleClose(dc, Duration.ofSeconds(2));
|
||||
try {
|
||||
ByteBuffer buf = ByteBuffer.allocate(100);
|
||||
SocketAddress target = dc.getLocalAddress();
|
||||
Thread.currentThread().interrupt();
|
||||
if (interruptible) {
|
||||
assertThrows(ClosedByInterruptException.class, () -> dc.send(buf, target));
|
||||
} else {
|
||||
int n = dc.send(buf, target);
|
||||
assertTrue(n == 100);
|
||||
}
|
||||
} finally {
|
||||
timeout.cancel(false);
|
||||
}
|
||||
} finally {
|
||||
Thread.interrupted(); // clear interrupt
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a DatagramChannel that is interruptible or not.
|
||||
*/
|
||||
static DatagramChannel openDatagramChannel(boolean interruptible) throws IOException {
|
||||
static DatagramChannel boundDatagramChannel(boolean interruptible) throws IOException {
|
||||
DatagramChannel dc;
|
||||
if (interruptible) {
|
||||
return DatagramChannel.open();
|
||||
dc = DatagramChannel.open();
|
||||
} else {
|
||||
return DefaultSelectorProvider.get().openUninterruptibleDatagramChannel();
|
||||
dc = DefaultSelectorProvider.get().openUninterruptibleDatagramChannel();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Expect ClosedByInterruptException if interruptible.
|
||||
*/
|
||||
static Class<? extends Exception> expectedException(boolean expectInterrupt) {
|
||||
if (expectInterrupt) {
|
||||
return ClosedByInterruptException.class;
|
||||
} else {
|
||||
return AsynchronousCloseException.class;
|
||||
try {
|
||||
dc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
|
||||
} catch (IOException ioe) {
|
||||
dc.close();
|
||||
throw ioe;
|
||||
}
|
||||
return dc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule the given object to be closed.
|
||||
* Runs the given action when the current thread is sampled in DatagramChannel.receive.
|
||||
*/
|
||||
static Future<?> scheduleClose(Closeable c, Duration timeout) {
|
||||
long nanos = TimeUnit.NANOSECONDS.convert(timeout);
|
||||
return STPE.schedule(() -> {
|
||||
c.close();
|
||||
return null;
|
||||
}, nanos, TimeUnit.NANOSECONDS);
|
||||
static void onReceive(Executable action) {
|
||||
Thread target = Thread.currentThread();
|
||||
Thread.ofPlatform().daemon().start(() -> {
|
||||
try {
|
||||
boolean found = false;
|
||||
while (!found) {
|
||||
Thread.sleep(20);
|
||||
StackTraceElement[] stack = target.getStackTrace();
|
||||
found = Arrays.stream(stack)
|
||||
.anyMatch(e -> dcImplClassName.equals(e.getClassName())
|
||||
&& "receive".equals(e.getMethodName()));
|
||||
}
|
||||
action.execute();
|
||||
} catch (Throwable ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule the given thread to be interrupted.
|
||||
*/
|
||||
static Future<?> scheduleInterrupt(Thread t, Duration timeout) {
|
||||
long nanos = TimeUnit.NANOSECONDS.convert(timeout);
|
||||
return STPE.schedule(t::interrupt, nanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
static final ScheduledExecutorService STPE = Executors.newScheduledThreadPool(0);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user