8344328: (dc) DatagramChannelImpl.blockingReceive can now synchronize on packet

Reviewed-by: dfuchs, jpai
This commit is contained in:
Alan Bateman 2024-11-21 16:25:43 +00:00
parent 87be63f85d
commit dfa18fe6b3
2 changed files with 37 additions and 44 deletions

View File

@ -599,8 +599,7 @@ class DatagramChannelImpl
* @throws SocketTimeoutException if the timeout elapses * @throws SocketTimeoutException if the timeout elapses
*/ */
void blockingReceive(DatagramPacket p, long nanos) throws IOException { void blockingReceive(DatagramPacket p, long nanos) throws IOException {
Objects.requireNonNull(p); assert Thread.holdsLock(p) && nanos >= 0;
assert nanos >= 0;
readLock.lock(); readLock.lock();
try { try {
@ -615,37 +614,31 @@ class DatagramChannelImpl
configureSocketNonBlockingIfVirtualThread(); configureSocketNonBlockingIfVirtualThread();
} }
// p.bufLength is the maximum size of the datagram that can be received
int bufLength;
synchronized (p) {
bufLength = DatagramPackets.getBufLength(p);
}
boolean completed = false; boolean completed = false;
try { try {
SocketAddress remote = beginRead(true, false); SocketAddress remote = beginRead(true, false);
boolean connected = (remote != null); boolean connected = (remote != null);
// p.bufLength is the maximum size of the datagram that can be received
int bufLength = DatagramPackets.getBufLength(p);
ByteBuffer dst = tryBlockingReceive(connected, bufLength, nanos); ByteBuffer dst = tryBlockingReceive(connected, bufLength, nanos);
if (dst != null) { if (dst != null) {
// if datagram received then get sender and copy to DatagramPacket // copy to DatagramPacket, set length and sender
try { try {
SocketAddress sender = sourceSocketAddress(); int len = dst.limit();
synchronized (p) { dst.get(p.getData(), p.getOffset(), len);
// copy bytes to the DatagramPacket, and set length and sender. DatagramPackets.setLength(p, len);
// Need to re-read p.bufLength in case DatagramPacket changed p.setSocketAddress(sourceSocketAddress());
int len = Math.min(dst.limit(), DatagramPackets.getBufLength(p));
dst.get(p.getData(), p.getOffset(), len);
DatagramPackets.setLength(p, len);
p.setSocketAddress(sender);
}
} finally { } finally {
Util.offerFirstTemporaryDirectBuffer(dst); Util.offerFirstTemporaryDirectBuffer(dst);
} }
completed = true; completed = true;
} }
} finally { } finally {
endRead(true, completed); endRead(true, completed);
} }
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -835,7 +828,7 @@ class DatagramChannelImpl
* @throws IllegalBlockingModeException if the channel is non-blocking * @throws IllegalBlockingModeException if the channel is non-blocking
*/ */
void blockingSend(DatagramPacket p) throws IOException { void blockingSend(DatagramPacket p) throws IOException {
Objects.requireNonNull(p); assert Thread.holdsLock(p);
writeLock.lock(); writeLock.lock();
try { try {
@ -843,38 +836,34 @@ class DatagramChannelImpl
if (!isBlocking()) if (!isBlocking())
throw new IllegalBlockingModeException(); throw new IllegalBlockingModeException();
ByteBuffer src = null; int len = p.getLength();
ByteBuffer src = Util.getTemporaryDirectBuffer(len);
try { try {
// copy bytes to temporary direct buffer
src.put(p.getData(), p.getOffset(), len);
src.flip();
// target address
InetSocketAddress target; InetSocketAddress target;
synchronized (p) { if (p.getAddress() == null) {
int len = p.getLength(); InetSocketAddress remote = remoteAddress();
src = Util.getTemporaryDirectBuffer(len); if (remote == null) {
throw new IllegalArgumentException("Address not set");
// copy bytes to temporary direct buffer
src.put(p.getData(), p.getOffset(), len);
src.flip();
// target address
if (p.getAddress() == null) {
InetSocketAddress remote = remoteAddress();
if (remote == null) {
throw new IllegalArgumentException("Address not set");
}
// set address/port to be compatible with long standing behavior
p.setAddress(remote.getAddress());
p.setPort(remote.getPort());
target = remote;
} else {
target = (InetSocketAddress) p.getSocketAddress();
} }
// set address/port to be compatible with long-standing behavior
p.setAddress(remote.getAddress());
p.setPort(remote.getPort());
target = remote;
} else {
target = (InetSocketAddress) p.getSocketAddress();
} }
// send the datagram (does not block) // send the datagram (does not block)
send(src, target); send(src, target);
} finally { } finally {
if (src != null) Util.offerFirstTemporaryDirectBuffer(src); Util.offerFirstTemporaryDirectBuffer(src);
} }
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }

View File

@ -174,7 +174,9 @@ public class DatagramSocketAdaptor
@Override @Override
public void send(DatagramPacket p) throws IOException { public void send(DatagramPacket p) throws IOException {
try { try {
dc.blockingSend(p); synchronized (p) {
dc.blockingSend(p);
}
} catch (AlreadyConnectedException e) { } catch (AlreadyConnectedException e) {
throw new IllegalArgumentException("Connected and packet address differ"); throw new IllegalArgumentException("Connected and packet address differ");
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
@ -185,7 +187,9 @@ public class DatagramSocketAdaptor
@Override @Override
public void receive(DatagramPacket p) throws IOException { public void receive(DatagramPacket p) throws IOException {
try { try {
dc.blockingReceive(p, MILLISECONDS.toNanos(timeout)); synchronized (p) {
dc.blockingReceive(p, MILLISECONDS.toNanos(timeout));
}
} catch (SocketTimeoutException | ClosedByInterruptException e) { } catch (SocketTimeoutException | ClosedByInterruptException e) {
throw e; throw e;
} catch (InterruptedIOException e) { } catch (InterruptedIOException e) {