/* * Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package org.openjdk.bench.java.net; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.TimeUnit; /** * Micro benchmark for streaming data over a Socket. */ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Thread) @Warmup(iterations = 5, time = 1) @Measurement(iterations = 5, time = 1) @Fork(value = 3) public class SocketStreaming { /** The bytes to write/read. */ public static final int dataLength = 16383; /** setTcpNoDelay(noNagle) */ public static final boolean noNagle = false; private WriterThread writerThread; private Socket readSocket; private byte[] bytes; @Setup public void prepare() throws Exception { bytes = new byte[dataLength]; // Setup the writer thread writerThread = new WriterThread(dataLength, noNagle); writerThread.start(); // Wait for a read socket readSocket = writerThread.waitForReadSocket(); } @TearDown public void cleanup() throws IOException { // Take down the writer thread and the reader socket writerThread.finish(); while (!readSocket.isClosed()) { readSocket.close(); } readSocket = null; } @Benchmark public void testSocketInputStreamRead() throws InterruptedException, IOException { InputStream in = readSocket.getInputStream(); // Notify the writer thread to add elements to stream writerThread.requestSendBytes(); // Read these from the stream int bytesRead = 0; while (bytesRead < dataLength) { int lastRead = in.read(bytes); if (lastRead < 0) { throw new InternalError("Unexpectedly got " + lastRead + " bytes from the socket"); } bytesRead += lastRead; } } /** * Thread used to write bytes to a socket. */ private class WriterThread extends Thread { /** The number of bytes to write. */ private int dataLength; /** setTcpNoDelay(noNagle) */ private boolean noNagle; /** Lock needed to send sendBytes requests. */ private final Object sendBytesLock = new Object(); /** Indicates that a sendBytes has been requested. */ private boolean sendBytesRequested; /** Indicates that no more sendBytes will be requested. Time to shutdown. */ private boolean sendBytesDone; /** Lock needed to protect the connectPort variable. */ private final Object connectLock = new Object(); /** The port the read socket should connect to. */ private int connectPort = -1; /** * Constructor. * * @param dataLength The number of bytes to write * @param noNagle setTcpNoDelay(noNagle) */ public WriterThread(int dataLength, boolean noNagle) { super("Load producer"); this.dataLength = dataLength; this.noNagle = noNagle; } /** Entry point for data sending helper thread. */ @Override public void run() { try { Socket writeSocket; ServerSocket serverSocket = new ServerSocket(0); /* Tell the other thread that we now know the port number. * The other thread will now start to connect until the following accept() call succeeds. */ synchronized (connectLock) { connectPort = serverSocket.getLocalPort(); connectLock.notify(); } // Wait for the other thread to connect writeSocket = serverSocket.accept(); writeSocket.setTcpNoDelay(noNagle); // No more connects so this can be closed serverSocket.close(); serverSocket = null; OutputStream out = writeSocket.getOutputStream(); // Iterate as long as sendBytes are issued while (waitForSendBytesRequest()) { sendBytes(out); } // Time to shutdown while (!writeSocket.isClosed()) { writeSocket.close(); } writeSocket = null; } catch (Exception e) { System.exit(1); } } /** * Sends bytes to the output stream * * @param out The output stream * @throws IOException */ private void sendBytes(OutputStream out) throws IOException { byte outBytes[] = new byte[dataLength]; int bytesToSend = dataLength; int bytesSent = 0; while (bytesSent < bytesToSend) { out.write(outBytes); bytesSent += outBytes.length; } } /** * Waits for the readSocket and returns it when it is ready. * * @return The socket to read from * @throws InterruptedException */ @SuppressWarnings("SleepWhileHoldingLock") public Socket waitForReadSocket() throws InterruptedException { int theConnectPort = waitForConnectPort(); while (true) { try { return new Socket(InetAddress.getByName(null), theConnectPort); } catch (IOException e) { // Wait some more for the server thread to get going Thread.sleep(1000); } } } /** * Waits for next sendBytes request * * @return true if it is time to sendBytes, false if it is time to shutdown * @throws InterruptedException */ public boolean waitForSendBytesRequest() throws InterruptedException { synchronized (sendBytesLock) { while (!sendBytesRequested && !sendBytesDone) { sendBytesLock.wait(); } // Clear the flag sendBytesRequested = false; return !sendBytesDone; } } /** Requests a sendBytes. */ public void requestSendBytes() { synchronized (sendBytesLock) { sendBytesRequested = true; sendBytesLock.notify(); } } /** Tells the writerThread that it is time to shutdown. */ public void finish() { synchronized (sendBytesLock) { sendBytesDone = true; sendBytesLock.notify(); } } private int waitForConnectPort() throws InterruptedException { synchronized (connectLock) { while (connectPort == -1) { connectLock.wait(); } return connectPort; } } } }