From dfa18fe6b395171c821cde02f081e12dd1565ba5 Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Thu, 21 Nov 2024 16:25:43 +0000 Subject: [PATCH] 8344328: (dc) DatagramChannelImpl.blockingReceive can now synchronize on packet Reviewed-by: dfuchs, jpai --- .../sun/nio/ch/DatagramChannelImpl.java | 73 ++++++++----------- .../sun/nio/ch/DatagramSocketAdaptor.java | 8 +- 2 files changed, 37 insertions(+), 44 deletions(-) diff --git a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java index 74c54105201..33948afbcb0 100644 --- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java +++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java @@ -599,8 +599,7 @@ class DatagramChannelImpl * @throws SocketTimeoutException if the timeout elapses */ void blockingReceive(DatagramPacket p, long nanos) throws IOException { - Objects.requireNonNull(p); - assert nanos >= 0; + assert Thread.holdsLock(p) && nanos >= 0; readLock.lock(); try { @@ -615,37 +614,31 @@ class DatagramChannelImpl configureSocketNonBlockingIfVirtualThread(); } - // p.bufLength is the maximum size of the datagram that can be received - int bufLength; - synchronized (p) { - bufLength = DatagramPackets.getBufLength(p); - } - boolean completed = false; try { SocketAddress remote = beginRead(true, false); boolean connected = (remote != null); + + // p.bufLength is the maximum size of the datagram that can be received + int bufLength = DatagramPackets.getBufLength(p); ByteBuffer dst = tryBlockingReceive(connected, bufLength, nanos); if (dst != null) { - // if datagram received then get sender and copy to DatagramPacket + // copy to DatagramPacket, set length and sender try { - SocketAddress sender = sourceSocketAddress(); - synchronized (p) { - // copy bytes to the DatagramPacket, and set length and sender. - // Need to re-read p.bufLength in case DatagramPacket changed - int len = Math.min(dst.limit(), DatagramPackets.getBufLength(p)); - dst.get(p.getData(), p.getOffset(), len); - DatagramPackets.setLength(p, len); - p.setSocketAddress(sender); - } + int len = dst.limit(); + dst.get(p.getData(), p.getOffset(), len); + DatagramPackets.setLength(p, len); + p.setSocketAddress(sourceSocketAddress()); } finally { Util.offerFirstTemporaryDirectBuffer(dst); } completed = true; } + } finally { endRead(true, completed); } + } finally { readLock.unlock(); } @@ -835,7 +828,7 @@ class DatagramChannelImpl * @throws IllegalBlockingModeException if the channel is non-blocking */ void blockingSend(DatagramPacket p) throws IOException { - Objects.requireNonNull(p); + assert Thread.holdsLock(p); writeLock.lock(); try { @@ -843,38 +836,34 @@ class DatagramChannelImpl if (!isBlocking()) throw new IllegalBlockingModeException(); - ByteBuffer src = null; + int len = p.getLength(); + ByteBuffer src = Util.getTemporaryDirectBuffer(len); try { + // copy bytes to temporary direct buffer + src.put(p.getData(), p.getOffset(), len); + src.flip(); + + // target address InetSocketAddress target; - synchronized (p) { - int len = p.getLength(); - src = Util.getTemporaryDirectBuffer(len); - - // copy bytes to temporary direct buffer - src.put(p.getData(), p.getOffset(), len); - src.flip(); - - // target address - if (p.getAddress() == null) { - InetSocketAddress remote = remoteAddress(); - if (remote == null) { - throw new IllegalArgumentException("Address not set"); - } - // set address/port to be compatible with long standing behavior - p.setAddress(remote.getAddress()); - p.setPort(remote.getPort()); - target = remote; - } else { - target = (InetSocketAddress) p.getSocketAddress(); + if (p.getAddress() == null) { + InetSocketAddress remote = remoteAddress(); + if (remote == null) { + throw new IllegalArgumentException("Address not set"); } + // set address/port to be compatible with long-standing behavior + p.setAddress(remote.getAddress()); + p.setPort(remote.getPort()); + target = remote; + } else { + target = (InetSocketAddress) p.getSocketAddress(); } // send the datagram (does not block) send(src, target); - } finally { - if (src != null) Util.offerFirstTemporaryDirectBuffer(src); + Util.offerFirstTemporaryDirectBuffer(src); } + } finally { writeLock.unlock(); } diff --git a/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java b/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java index 6ea20ec6270..4930e1447a3 100644 --- a/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java +++ b/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java @@ -174,7 +174,9 @@ public class DatagramSocketAdaptor @Override public void send(DatagramPacket p) throws IOException { try { - dc.blockingSend(p); + synchronized (p) { + dc.blockingSend(p); + } } catch (AlreadyConnectedException e) { throw new IllegalArgumentException("Connected and packet address differ"); } catch (ClosedChannelException e) { @@ -185,7 +187,9 @@ public class DatagramSocketAdaptor @Override public void receive(DatagramPacket p) throws IOException { try { - dc.blockingReceive(p, MILLISECONDS.toNanos(timeout)); + synchronized (p) { + dc.blockingReceive(p, MILLISECONDS.toNanos(timeout)); + } } catch (SocketTimeoutException | ClosedByInterruptException e) { throw e; } catch (InterruptedIOException e) {