8310978: JFR events SocketReadEvent/SocketWriteEvent for Socket adaptor ops

Reviewed-by: dfuchs, alanb
This commit is contained in:
Tim Prinzing 2023-10-30 06:04:17 +00:00 committed by Alan Bateman
parent 988e1dfe6e
commit 1183b221c2
8 changed files with 204 additions and 30 deletions

@ -110,6 +110,23 @@ public class SocketReadEvent extends Event {
* timestamp and the given start time. If the duration is meets
* or exceeds the configured value (determined by calling the generated method
* {@link #shouldCommit(long)}), an event will be emitted by calling
* {@link #emit(long, long, long, SocketAddress, long)}
*
* @param start the start time
* @param nbytes how many bytes were transferred
* @param remote the address of the remote socket
* @param timeout maximum time to wait
*/
public static void offer(long start, long nbytes, SocketAddress remote, long timeout) {
long duration = timestamp() - start;
if (shouldCommit(duration)) {
emit(start, duration, nbytes, remote, timeout);
}
}
/**
* Helper method to perform a common task of getting event data ready and
* then emitting the event by calling
* {@link #commit(long, long, String, String, int, long, long, boolean)}.
*
* @param start the start time

@ -105,6 +105,22 @@ public class SocketWriteEvent extends Event {
* timestamp and the given start time. If the duration is meets
* or exceeds the configured value (determined by calling the generated method
* {@link #shouldCommit(long)}), an event will be emitted by calling
* {@link #emit(long, long, long, SocketAddress)}.
*
* @param start the start time
* @param bytesWritten how many bytes were sent
* @param remote the address of the remote socket being written to
*/
public static void offer(long start, long bytesWritten, SocketAddress remote) {
long duration = timestamp() - start;
if (shouldCommit(duration)) {
emit(start, duration, bytesWritten, remote);
}
}
/**
* Helper method to perform a common task of getting event data ready and
* then emitting the event by calling
* {@link #commit(long, long, String, String, int, long)}.
*
* @param start the start time

@ -494,10 +494,7 @@ class SocketChannelImpl
}
long start = SocketReadEvent.timestamp();
int nbytes = implRead(buf);
long duration = SocketReadEvent.timestamp() - start;
if (SocketReadEvent.shouldCommit(duration)) {
SocketReadEvent.emit(start, duration, nbytes, remoteAddress(), 0);
}
SocketReadEvent.offer(start, nbytes, remoteAddress(), 0);
return nbytes;
}
@ -511,10 +508,7 @@ class SocketChannelImpl
}
long start = SocketReadEvent.timestamp();
long nbytes = implRead(dsts, offset, length);
long duration = SocketReadEvent.timestamp() - start;
if (SocketReadEvent.shouldCommit(duration)) {
SocketReadEvent.emit(start, duration, nbytes, remoteAddress(), 0);
}
SocketReadEvent.offer(start, nbytes, remoteAddress(), 0);
return nbytes;
}
@ -625,10 +619,7 @@ class SocketChannelImpl
}
long start = SocketWriteEvent.timestamp();
int nbytes = implWrite(buf);
long duration = SocketWriteEvent.timestamp() - start;
if (SocketWriteEvent.shouldCommit(duration)) {
SocketWriteEvent.emit(start, duration, nbytes, remoteAddress());
}
SocketWriteEvent.offer(start, nbytes, remoteAddress());
return nbytes;
}
@ -641,10 +632,7 @@ class SocketChannelImpl
}
long start = SocketWriteEvent.timestamp();
long nbytes = implWrite(srcs, offset, length);
long duration = SocketWriteEvent.timestamp() - start;
if (SocketWriteEvent.shouldCommit(duration)) {
SocketWriteEvent.emit(start, duration, nbytes, remoteAddress());
}
SocketWriteEvent.offer(start, nbytes, remoteAddress());
return nbytes;
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -24,6 +24,8 @@
*/
package sun.nio.ch;
import jdk.internal.event.SocketReadEvent;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.IntSupplier;
@ -60,9 +62,7 @@ class SocketInputStream extends InputStream {
return (n > 0) ? (a[0] & 0xff) : -1;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
int timeout = timeoutSupplier.getAsInt();
private int implRead(byte[] b, int off, int len, int timeout) throws IOException {
if (timeout > 0) {
long nanos = MILLISECONDS.toNanos(timeout);
return sc.blockingRead(b, off, len, nanos);
@ -71,6 +71,18 @@ class SocketInputStream extends InputStream {
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
int timeout = timeoutSupplier.getAsInt();
if (!SocketReadEvent.enabled()) {
return implRead(b, off, len, timeout);
}
long start = SocketReadEvent.timestamp();
int n = implRead(b, off, len, timeout);
SocketReadEvent.offer(start, n, sc.remoteAddress(), timeout);
return n;
}
@Override
public int available() throws IOException {
return sc.available();

@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -24,6 +24,8 @@
*/
package sun.nio.ch;
import jdk.internal.event.SocketWriteEvent;
import java.io.IOException;
import java.io.OutputStream;
@ -55,7 +57,13 @@ class SocketOutputStream extends OutputStream {
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (!SocketWriteEvent.enabled()) {
sc.blockingWriteFully(b, off, len);
return;
}
long start = SocketWriteEvent.timestamp();
sc.blockingWriteFully(b, off, len);
SocketWriteEvent.offer(start, len, sc.remoteAddress());
}
@Override

@ -0,0 +1,127 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.event.io;
import static jdk.test.lib.Asserts.assertEquals;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import jdk.jfr.Recording;
import jdk.jfr.consumer.RecordedEvent;
import jdk.test.lib.jfr.Events;
import jdk.test.lib.thread.TestThread;
import jdk.test.lib.thread.XRun;
/**
* @test
* @bug 8310978
* @summary test socket read/write events on socket adaptors
* @key jfr
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.event.io.TestSocketAdapterEvents
*/
public class TestSocketAdapterEvents {
private static final int writeInt = 'A';
private static final byte[] writeBuf = { 'B', 'C', 'D', 'E' };
private List<IOEvent> expectedEvents = new ArrayList<>();
private synchronized void addExpectedEvent(IOEvent event) {
expectedEvents.add(event);
}
public static void main(String[] args) throws Throwable {
new TestSocketAdapterEvents().test();
}
public void test() throws Throwable {
try (Recording recording = new Recording()) {
try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
recording.enable(IOEvent.EVENT_SOCKET_READ).withThreshold(Duration.ofMillis(0));
recording.enable(IOEvent.EVENT_SOCKET_WRITE).withThreshold(Duration.ofMillis(0));
recording.start();
InetAddress lb = InetAddress.getLoopbackAddress();
ssc.bind(new InetSocketAddress(lb, 0));
TestThread readerThread = new TestThread(new XRun() {
@Override
public void xrun() throws IOException {
byte[] bs = new byte[4];
try (SocketChannel sc = ssc.accept(); Socket s = sc.socket();
InputStream is = s.getInputStream()) {
int readInt = is.read();
assertEquals(readInt, writeInt, "Wrong readInt");
addExpectedEvent(IOEvent.createSocketReadEvent(1, s));
int bytesRead = is.read(bs, 0, 3);
assertEquals(bytesRead, 3, "Wrong bytesRead partial buffer");
addExpectedEvent(IOEvent.createSocketReadEvent(bytesRead, s));
bytesRead = is.read(bs);
assertEquals(bytesRead, writeBuf.length, "Wrong bytesRead full buffer");
addExpectedEvent(IOEvent.createSocketReadEvent(bytesRead, s));
// Try to read more, but writer have closed. Should
// get EOF.
readInt = is.read();
assertEquals(readInt, -1, "Wrong readInt at EOF");
addExpectedEvent(IOEvent.createSocketReadEvent(-1, s));
}
}
});
readerThread.start();
try (SocketChannel sc = SocketChannel.open(ssc.getLocalAddress());
Socket s = sc.socket(); OutputStream os = s.getOutputStream()) {
os.write(writeInt);
addExpectedEvent(IOEvent.createSocketWriteEvent(1, s));
os.write(writeBuf, 0, 3);
addExpectedEvent(IOEvent.createSocketWriteEvent(3, s));
os.write(writeBuf);
addExpectedEvent(IOEvent.createSocketWriteEvent(writeBuf.length, s));
}
readerThread.joinAndThrow();
recording.stop();
List<RecordedEvent> events = Events.fromRecording(recording);
IOHelper.verifyEquals(events, expectedEvents);
}
}
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -26,6 +26,8 @@ package jdk.jfr.event.io;
import static jdk.test.lib.Asserts.assertEquals;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@ -41,6 +43,7 @@ import jdk.test.lib.thread.XRun;
/**
* @test
* @summary test socket read/write events on SocketChannel
* @key jfr
* @requires vm.hasJFR
* @library /test/lib /test/jdk
@ -62,20 +65,20 @@ public class TestSocketChannelEvents {
public void test() throws Throwable {
try (Recording recording = new Recording()) {
try (ServerSocketChannel ss = ServerSocketChannel.open()) {
try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
recording.enable(IOEvent.EVENT_SOCKET_READ).withThreshold(Duration.ofMillis(0));
recording.enable(IOEvent.EVENT_SOCKET_WRITE).withThreshold(Duration.ofMillis(0));
recording.start();
ss.socket().setReuseAddress(true);
ss.socket().bind(null);
InetAddress lb = InetAddress.getLoopbackAddress();
ssc.bind(new InetSocketAddress(lb, 0));
TestThread readerThread = new TestThread(new XRun() {
@Override
public void xrun() throws IOException {
ByteBuffer bufA = ByteBuffer.allocate(bufSizeA);
ByteBuffer bufB = ByteBuffer.allocate(bufSizeB);
try (SocketChannel sc = ss.accept()) {
try (SocketChannel sc = ssc.accept()) {
int readSize = sc.read(bufA);
assertEquals(readSize, bufSizeA, "Wrong readSize bufA");
addExpectedEvent(IOEvent.createSocketReadEvent(bufSizeA, sc.socket()));
@ -98,7 +101,7 @@ public class TestSocketChannelEvents {
});
readerThread.start();
try (SocketChannel sc = SocketChannel.open(ss.socket().getLocalSocketAddress())) {
try (SocketChannel sc = SocketChannel.open(ssc.getLocalAddress())) {
ByteBuffer bufA = ByteBuffer.allocateDirect(bufSizeA);
ByteBuffer bufB = ByteBuffer.allocateDirect(bufSizeB);
for (int i = 0; i < bufSizeA; ++i) {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -28,6 +28,8 @@ import static jdk.test.lib.Asserts.assertEquals;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.time.Duration;
@ -42,6 +44,7 @@ import jdk.test.lib.thread.XRun;
/**
* @test
* @summary test socket read/write events on Socket
* @key jfr
* @requires vm.hasJFR
* @library /test/lib /test/jdk
@ -69,8 +72,8 @@ public class TestSocketEvents {
recording.enable(IOEvent.EVENT_SOCKET_WRITE).withThreshold(Duration.ofMillis(0));
recording.start();
ss.setReuseAddress(true);
ss.bind(null);
InetAddress lb = InetAddress.getLoopbackAddress();
ss.bind(new InetSocketAddress(lb, 0));
TestThread readerThread = new TestThread(new XRun() {
@Override