8312166: (dc) DatagramChannel's socket adaptor does not release carrier thread when blocking in receive

Reviewed-by: jpai, michaelm
This commit is contained in:
Alan Bateman 2023-07-19 13:17:37 +00:00
parent e7adbdb1f1
commit 028068a655
3 changed files with 324 additions and 196 deletions

View File

@ -33,6 +33,7 @@ import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.lang.ref.Cleaner.Cleanable;
import java.lang.reflect.Method;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.Inet4Address;
import java.net.Inet6Address;
@ -650,114 +651,130 @@ class DatagramChannelImpl
}
/**
* Receives a datagram into the given buffer.
* Receives a datagram.
*
* @apiNote This method is for use by the socket adaptor. The buffer is
* assumed to be trusted, meaning it is not accessible to user code.
* @apiNote This method is for use by the socket adaptor.
*
* @throws IllegalBlockingModeException if the channel is non-blocking
* @throws SocketTimeoutException if the timeout elapses
*/
SocketAddress blockingReceive(ByteBuffer dst, long nanos) throws IOException {
void blockingReceive(DatagramPacket p, long nanos) throws IOException {
Objects.requireNonNull(p);
assert nanos >= 0;
readLock.lock();
try {
ensureOpen();
if (!isBlocking())
throw new IllegalBlockingModeException();
@SuppressWarnings("removal")
SecurityManager sm = System.getSecurityManager();
boolean connected = isConnected();
SocketAddress sender;
do {
if (nanos > 0) {
sender = trustedBlockingReceive(dst, nanos);
} else {
sender = trustedBlockingReceive(dst);
}
// check sender when security manager set and not connected
if (sm != null && !connected) {
InetSocketAddress isa = (InetSocketAddress) sender;
try {
sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
} catch (SecurityException e) {
sender = null;
// underlying socket needs to be non-blocking if timed receive or virtual thread
if (nanos > 0) {
configureSocketNonBlocking();
} else {
configureSocketNonBlockingIfVirtualThread();
nanos = Long.MAX_VALUE;
}
// p.bufLength is the maximum size of the datagram that can be received
int bufLength;
synchronized (p) {
bufLength = DatagramPackets.getBufLength(p);
}
long startNanos = System.nanoTime();
SocketAddress sender = null;
try {
SocketAddress remote = beginRead(true, false);
boolean connected = (remote != null);
do {
long remainingNanos = nanos - (System.nanoTime() - startNanos);
ByteBuffer dst = tryBlockingReceive(connected, bufLength, remainingNanos);
// if datagram received then get sender and copy to DatagramPacket
if (dst != null) {
try {
// sender address is in socket address buffer
sender = sourceSocketAddress();
// check sender when security manager set and not connected
@SuppressWarnings("removal")
SecurityManager sm = System.getSecurityManager();
if (sm != null && !connected) {
InetSocketAddress isa = (InetSocketAddress) sender;
try {
sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
} catch (SecurityException e) {
sender = null;
}
}
// copy bytes to the DatagramPacket, and set length and sender
if (sender != null) {
synchronized (p) {
// re-read p.bufLength in case DatagramPacket changed
int len = Math.min(dst.limit(), DatagramPackets.getBufLength(p));
dst.get(p.getData(), p.getOffset(), len);
DatagramPackets.setLength(p, len);
p.setSocketAddress(sender);
}
}
} finally {
Util.offerFirstTemporaryDirectBuffer(dst);
}
}
}
} while (sender == null);
return sender;
} while (sender == null && isOpen());
} finally {
endRead(true, (sender != null));
}
} finally {
readLock.unlock();
}
}
/**
* Receives a datagram into given buffer. This method is used to support
* the socket adaptor. The buffer is assumed to be trusted.
* Attempt to receive a datagram.
*
* @param connected if the channel's socket is connected
* @param len the maximum size of the datagram to receive
* @param nanos the timeout, should be Long.MAX_VALUE for untimed
* @return a direct buffer containing the datagram or null if channel is closed
* @throws SocketTimeoutException if the timeout elapses
*/
private SocketAddress trustedBlockingReceive(ByteBuffer dst)
private ByteBuffer tryBlockingReceive(boolean connected, int len, long nanos)
throws IOException
{
assert readLock.isHeldByCurrentThread() && isBlocking();
SocketAddress sender = null;
long startNanos = System.nanoTime();
ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
int n = -1;
try {
SocketAddress remote = beginRead(true, false);
configureSocketNonBlockingIfVirtualThread();
boolean connected = (remote != null);
int n = receive(dst, connected);
while (IOStatus.okayToRetry(n) && isOpen()) {
park(Net.POLLIN);
n = receive(dst, connected);
while (n == IOStatus.UNAVAILABLE && isOpen()) {
// virtual thread needs to release temporary direct buffer before parking
if (Thread.currentThread().isVirtual()) {
Util.offerFirstTemporaryDirectBuffer(dst);
dst = null;
}
long remainingNanos = nanos - (System.nanoTime() - startNanos);
if (remainingNanos <= 0) {
throw new SocketTimeoutException("Receive timed out");
}
park(Net.POLLIN, remainingNanos);
// virtual thread needs to re-allocate temporary direct buffer after parking
if (Thread.currentThread().isVirtual()) {
dst = Util.getTemporaryDirectBuffer(len);
}
n = receive(dst, connected);
}
if (n > 0 || (n == 0 && isOpen())) {
// sender address is in socket address buffer
sender = sourceSocketAddress();
}
return sender;
dst.flip();
} finally {
endRead(true, (sender != null));
}
}
/**
* Receives a datagram into given buffer with a timeout. This method is
* used to support the socket adaptor. The buffer is assumed to be trusted.
* @throws SocketTimeoutException if the timeout elapses
*/
private SocketAddress trustedBlockingReceive(ByteBuffer dst, long nanos)
throws IOException
{
assert readLock.isHeldByCurrentThread() && isBlocking();
SocketAddress sender = null;
try {
SocketAddress remote = beginRead(true, false);
boolean connected = (remote != null);
// change socket to non-blocking
lockedConfigureBlocking(false);
try {
long startNanos = System.nanoTime();
int n = receive(dst, connected);
while (n == IOStatus.UNAVAILABLE && isOpen()) {
long remainingNanos = nanos - (System.nanoTime() - startNanos);
if (remainingNanos <= 0) {
throw new SocketTimeoutException("Receive timed out");
}
park(Net.POLLIN, remainingNanos);
n = receive(dst, connected);
}
if (n > 0 || (n == 0 && isOpen())) {
// sender address is in socket address buffer
sender = sourceSocketAddress();
}
return sender;
} finally {
// restore socket to blocking mode (if channel is open)
tryLockedConfigureBlocking(true);
// release buffer if no datagram received
if (dst != null && (n < 0 || (n == 0 && !isOpen()))) {
Util.offerFirstTemporaryDirectBuffer(dst);
dst = null;
}
} finally {
endRead(true, (sender != null));
}
return dst;
}
/**
@ -889,19 +906,54 @@ class DatagramChannelImpl
}
/**
* Sends a datagram from the bytes in given buffer.
* Sends a datagram.
*
* @apiNote This method is for use by the socket adaptor.
*
* @throws IllegalArgumentException if not connected and target address not set
* @throws IllegalBlockingModeException if the channel is non-blocking
*/
void blockingSend(ByteBuffer src, SocketAddress target) throws IOException {
void blockingSend(DatagramPacket p) throws IOException {
Objects.requireNonNull(p);
writeLock.lock();
try {
ensureOpen();
if (!isBlocking())
throw new IllegalBlockingModeException();
send(src, target);
ByteBuffer src = null;
try {
InetSocketAddress target;
synchronized (p) {
int len = p.getLength();
src = Util.getTemporaryDirectBuffer(len);
// copy bytes to temporary direct buffer
src.put(p.getData(), p.getOffset(), len);
src.flip();
// target address
if (p.getAddress() == null) {
InetSocketAddress remote = remoteAddress();
if (remote == null) {
throw new IllegalArgumentException("Address not set");
}
// set address/port to be compatible with long standing behavior
p.setAddress(remote.getAddress());
p.setPort(remote.getPort());
target = remote;
} else {
target = (InetSocketAddress) p.getSocketAddress();
}
}
// send the datagram (does not block)
send(src, target);
} finally {
if (src != null) Util.offerFirstTemporaryDirectBuffer(src);
}
} finally {
writeLock.unlock();
}
@ -1198,12 +1250,12 @@ class DatagramChannelImpl
}
/**
* Ensures that the socket is configured non-blocking when on a virtual thread.
* Ensures that the socket is configured non-blocking.
* @throws IOException if there is an I/O error changing the blocking mode
*/
private void configureSocketNonBlockingIfVirtualThread() throws IOException {
private void configureSocketNonBlocking() throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
if (!forcedNonBlocking && Thread.currentThread().isVirtual()) {
if (!forcedNonBlocking) {
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, false);
@ -1212,6 +1264,16 @@ class DatagramChannelImpl
}
}
/**
* Ensures that the socket is configured non-blocking when on a virtual thread.
* @throws IOException if there is an I/O error changing the blocking mode
*/
private void configureSocketNonBlockingIfVirtualThread() throws IOException {
if (Thread.currentThread().isVirtual()) {
configureSocketNonBlocking();
}
}
InetSocketAddress localAddress() {
synchronized (stateLock) {
return localAddress;
@ -1952,6 +2014,44 @@ class DatagramChannelImpl
};
}
/**
* Defines static methods to get/set DatagramPacket fields and workaround
* DatagramPacket deficiencies.
*/
private static class DatagramPackets {
private static final VarHandle LENGTH;
private static final VarHandle BUF_LENGTH;
static {
try {
PrivilegedExceptionAction<MethodHandles.Lookup> pa = () ->
MethodHandles.privateLookupIn(DatagramPacket.class, MethodHandles.lookup());
@SuppressWarnings("removal")
MethodHandles.Lookup l = AccessController.doPrivileged(pa);
LENGTH = l.findVarHandle(DatagramPacket.class, "length", int.class);
BUF_LENGTH = l.findVarHandle(DatagramPacket.class, "bufLength", int.class);
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}
/**
* Sets the DatagramPacket.length field. DatagramPacket.setLength cannot be
* used at this time because it sets both the length and bufLength fields.
*/
static void setLength(DatagramPacket p, int value) {
assert Thread.holdsLock(p);
LENGTH.set(p, value);
}
/**
* Returns the value of the DatagramPacket.bufLength field.
*/
static int getBufLength(DatagramPacket p) {
assert Thread.holdsLock(p);
return (int) BUF_LENGTH.get(p);
}
}
// -- Native methods --
private static native void disconnect0(FileDescriptor fd, boolean isIPv6)

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2001, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2001, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -32,7 +32,6 @@ import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.MethodType;
import java.lang.invoke.VarHandle;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
@ -44,7 +43,6 @@ import java.net.SocketException;
import java.net.SocketOption;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedByInterruptException;
@ -56,7 +54,6 @@ import java.security.PrivilegedExceptionAction;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import jdk.internal.misc.Blocker;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@ -192,79 +189,30 @@ public class DatagramSocketAdaptor
@Override
public void send(DatagramPacket p) throws IOException {
synchronized (p) {
int len = p.getLength();
ByteBuffer bb = Util.getTemporaryDirectBuffer(len);
try {
// copy bytes to temporary direct buffer
bb.put(p.getData(), p.getOffset(), len);
bb.flip();
// target address
InetSocketAddress target;
if (p.getAddress() == null) {
InetSocketAddress remote = dc.remoteAddress();
if (remote == null) {
// not specified by DatagramSocket
throw new IllegalArgumentException("Address not set");
}
// set address/port to maintain compatibility with DatagramSocket
p.setAddress(remote.getAddress());
p.setPort(remote.getPort());
target = remote;
} else {
target = (InetSocketAddress) p.getSocketAddress();
}
// send datagram
dc.blockingSend(bb, target);
} catch (AlreadyConnectedException e) {
throw new IllegalArgumentException("Connected and packet address differ");
} catch (ClosedChannelException e) {
throw new SocketException("Socket closed", e);
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
try {
dc.blockingSend(p);
} catch (AlreadyConnectedException e) {
throw new IllegalArgumentException("Connected and packet address differ");
} catch (ClosedChannelException e) {
throw new SocketException("Socket closed", e);
}
}
@Override
public void receive(DatagramPacket p) throws IOException {
synchronized (p) {
// get temporary direct buffer with a capacity of p.bufLength
int bufLength = DatagramPackets.getBufLength(p);
ByteBuffer bb = Util.getTemporaryDirectBuffer(bufLength);
try {
SocketAddress sender;
long comp = Blocker.begin();
try {
sender = dc.blockingReceive(bb, MILLISECONDS.toNanos(timeout));
} finally {
Blocker.end(comp);
}
bb.flip();
// copy bytes to the DatagramPacket and set length
int len = Math.min(bb.limit(), DatagramPackets.getBufLength(p));
bb.get(p.getData(), p.getOffset(), len);
DatagramPackets.setLength(p, len);
// sender address
p.setSocketAddress(sender);
} catch (SocketTimeoutException | ClosedByInterruptException e) {
throw e;
} catch (InterruptedIOException e) {
Thread thread = Thread.currentThread();
if (thread.isVirtual() && thread.isInterrupted()) {
close();
throw new SocketException("Closed by interrupt");
}
throw e;
} catch (ClosedChannelException e) {
throw new SocketException("Socket closed", e);
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
try {
dc.blockingReceive(p, MILLISECONDS.toNanos(timeout));
} catch (SocketTimeoutException | ClosedByInterruptException e) {
throw e;
} catch (InterruptedIOException e) {
Thread thread = Thread.currentThread();
if (thread.isVirtual() && thread.isInterrupted()) {
close();
throw new SocketException("Closed by interrupt");
}
throw e;
} catch (ClosedChannelException e) {
throw new SocketException("Socket closed", e);
}
}
@ -704,44 +652,6 @@ public class DatagramSocketAdaptor
return new InetSocketAddress(0).getAddress();
}
/**
* Defines static methods to get/set DatagramPacket fields and workaround
* DatagramPacket deficiencies.
*/
private static class DatagramPackets {
private static final VarHandle LENGTH;
private static final VarHandle BUF_LENGTH;
static {
try {
PrivilegedExceptionAction<MethodHandles.Lookup> pa = () ->
MethodHandles.privateLookupIn(DatagramPacket.class, MethodHandles.lookup());
@SuppressWarnings("removal")
MethodHandles.Lookup l = AccessController.doPrivileged(pa);
LENGTH = l.findVarHandle(DatagramPacket.class, "length", int.class);
BUF_LENGTH = l.findVarHandle(DatagramPacket.class, "bufLength", int.class);
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}
/**
* Sets the DatagramPacket.length field. DatagramPacket.setLength cannot be
* used at this time because it sets both the length and bufLength fields.
*/
static void setLength(DatagramPacket p, int value) {
assert Thread.holdsLock(p);
LENGTH.set(p, value);
}
/**
* Returns the value of the DatagramPacket.bufLength field.
*/
static int getBufLength(DatagramPacket p) {
assert Thread.holdsLock(p);
return (int) BUF_LENGTH.get(p);
}
}
/**
* Defines static methods to invoke non-public NetworkInterface methods.
*/

View File

@ -0,0 +1,118 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/**
* @test
* @summary Test a timed DatagramSocket.receive with a SecurityManager set
* @run main/othervm -Djava.security.manager=allow TimeoutWithSM
*/
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.security.Permission;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
public class TimeoutWithSM {
private static final int TIMEOUT = 10_000;
public static void main(String[] args) throws Exception {
try (var socket = new DatagramSocket(null)) {
InetAddress lb = InetAddress.getLoopbackAddress();
socket.bind(new InetSocketAddress(lb, 0));
// start sender to send datagrams to us
var done = new AtomicBoolean();
startSender(socket.getLocalSocketAddress(), done);
// set a SecurityManager that blocks datagrams from sender
System.setSecurityManager(new SecurityManager() {
@Override
public void checkPermission(Permission p) {
}
@Override
public void checkAccept(String host, int port) {
var isa = new InetSocketAddress(host, port);
System.out.println("checkAccept " + isa);
throw new SecurityException();
}
});
// timed receive, should throw SocketTimeoutException
try {
socket.setSoTimeout(TIMEOUT);
try {
byte[] bytes = new byte[1024];
DatagramPacket p = new DatagramPacket(bytes, bytes.length);
socket.receive(p);
throw new RuntimeException("Packet received, unexpected!!! "
+ " sender=" + p.getSocketAddress() + ", len=" + p.getLength());
} catch (SocketTimeoutException expected) {
System.out.println(expected + ", expected!!!");
}
} finally {
done.set(true);
}
}
}
/**
* Start a thread to send datagrams to the given target address at intervals of
* one second. The sender stops when done is set to true.
*/
static void startSender(SocketAddress target, AtomicBoolean done) throws Exception {
assert target instanceof InetSocketAddress isa && isa.getAddress().isLoopbackAddress();
var sender = new DatagramSocket(null);
boolean started = false;
try {
InetAddress lb = InetAddress.getLoopbackAddress();
sender.bind(new InetSocketAddress(lb, 0));
Thread.ofPlatform().start(() -> {
try {
try (sender) {
byte[] bytes = "hello".getBytes("UTF-8");
DatagramPacket p = new DatagramPacket(bytes, bytes.length);
p.setSocketAddress(target);
while (!done.get()) {
System.out.println("Send datagram to " + target + " ...");
sender.send(p);
Thread.sleep(Duration.ofSeconds(1));
}
}
} catch (Exception e) {
e.printStackTrace();
}
});
started = true;
} finally {
if (!started) {
sender.close();
}
}
}
}