From f16bb08f8941d0c0eb1f6e8956e201f737f18616 Mon Sep 17 00:00:00 2001 From: Roger Riggs Date: Thu, 31 Aug 2017 17:08:35 -0400 Subject: [PATCH] 8087189: RMI server-side multiplex protocol support should be removed Reviewed-by: alanb --- .../sun/rmi/server/ActivatableRef.java | 5 +- .../transport/tcp/ConnectionMultiplexer.java | 449 ------------------ .../tcp/MultiplexConnectionInfo.java | 55 --- .../transport/tcp/MultiplexInputStream.java | 213 --------- .../transport/tcp/MultiplexOutputStream.java | 231 --------- .../sun/rmi/transport/tcp/TCPChannel.java | 212 ++++----- .../sun/rmi/transport/tcp/TCPTransport.java | 58 +-- 7 files changed, 91 insertions(+), 1132 deletions(-) delete mode 100644 jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/ConnectionMultiplexer.java delete mode 100644 jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexConnectionInfo.java delete mode 100644 jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java delete mode 100644 jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexOutputStream.java diff --git a/jdk/src/java.rmi/share/classes/sun/rmi/server/ActivatableRef.java b/jdk/src/java.rmi/share/classes/sun/rmi/server/ActivatableRef.java index 7e6668c389e..22e6976a1ef 100644 --- a/jdk/src/java.rmi/share/classes/sun/rmi/server/ActivatableRef.java +++ b/jdk/src/java.rmi/share/classes/sun/rmi/server/ActivatableRef.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 1997, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -158,8 +158,7 @@ public class ActivatableRef implements RemoteRef { exception = e; } catch (ConnectIOException e) { /* - * Failure setting up multiplexed connection or reusing - * cached connection; retry call + * Failure reusing cached connection; retry call */ exception = e; } catch (MarshalException e) { diff --git a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/ConnectionMultiplexer.java b/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/ConnectionMultiplexer.java deleted file mode 100644 index 42857ec967c..00000000000 --- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/ConnectionMultiplexer.java +++ /dev/null @@ -1,449 +0,0 @@ -/* - * Copyright (c) 1996, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package sun.rmi.transport.tcp; - -import java.io.*; -import java.util.*; -import java.rmi.server.LogStream; -import java.security.PrivilegedAction; - -import sun.rmi.runtime.Log; - -/** - * ConnectionMultiplexer manages the transparent multiplexing of - * multiple virtual connections from one endpoint to another through - * one given real connection to that endpoint. The input and output - * streams for the underlying real connection must be supplied. - * A callback object is also supplied to be informed of new virtual - * connections opened by the remote endpoint. After creation, the - * run() method must be called in a thread created for demultiplexing - * the connections. The openConnection() method is called to - * initiate a virtual connection from this endpoint. - * - * @author Peter Jones - */ -@SuppressWarnings("deprecation") -final class ConnectionMultiplexer { - - /** "multiplex" log level */ - static int logLevel = LogStream.parseLevel(getLogLevel()); - - private static String getLogLevel() { - return java.security.AccessController.doPrivileged( - (PrivilegedAction) () -> System.getProperty("sun.rmi.transport.tcp.multiplex.logLevel")); - } - - /* multiplex system log */ - static final Log multiplexLog = - Log.getLog("sun.rmi.transport.tcp.multiplex", - "multiplex", ConnectionMultiplexer.logLevel); - - /** multiplexing protocol operation codes */ - private final static int OPEN = 0xE1; - private final static int CLOSE = 0xE2; - private final static int CLOSEACK = 0xE3; - private final static int REQUEST = 0xE4; - private final static int TRANSMIT = 0xE5; - - /** object to notify for new connections from remote endpoint */ - private TCPChannel channel; - - /** input stream for underlying single connection */ - private InputStream in; - - /** output stream for underlying single connection */ - private OutputStream out; - - /** true if underlying connection originated from this endpoint - (used for generating unique connection IDs) */ - private boolean orig; - - /** layered stream for reading formatted data from underlying connection */ - private DataInputStream dataIn; - - /** layered stream for writing formatted data to underlying connection */ - private DataOutputStream dataOut; - - /** table holding currently open connection IDs and related info */ - private Hashtable connectionTable = new Hashtable<>(7); - - /** number of currently open connections */ - private int numConnections = 0; - - /** maximum allowed open connections */ - private final static int maxConnections = 256; - - /** ID of last connection opened */ - private int lastID = 0x1001; - - /** true if this mechanism is still alive */ - private boolean alive = true; - - /** - * Create a new ConnectionMultiplexer using the given underlying - * input/output stream pair. The run method must be called - * (possibly on a new thread) to handle the demultiplexing. - * @param channel object to notify when new connection is received - * @param in input stream of underlying connection - * @param out output stream of underlying connection - * @param orig true if this endpoint intiated the underlying - * connection (needs to be set differently at both ends) - */ - public ConnectionMultiplexer( - TCPChannel channel, - InputStream in, - OutputStream out, - boolean orig) - { - this.channel = channel; - this.in = in; - this.out = out; - this.orig = orig; - - dataIn = new DataInputStream(in); - dataOut = new DataOutputStream(out); - } - - /** - * Process multiplexing protocol received from underlying connection. - */ - public void run() throws IOException - { - try { - int op, id, length; - MultiplexConnectionInfo info; - - while (true) { - - // read next op code from remote endpoint - op = dataIn.readUnsignedByte(); - switch (op) { - - // remote endpoint initiating new connection - case OPEN: - id = dataIn.readUnsignedShort(); - - if (multiplexLog.isLoggable(Log.VERBOSE)) { - multiplexLog.log(Log.VERBOSE, "operation OPEN " + id); - } - - info = connectionTable.get(id); - if (info != null) - throw new IOException( - "OPEN: Connection ID already exists"); - info = new MultiplexConnectionInfo(id); - info.in = new MultiplexInputStream(this, info, 2048); - info.out = new MultiplexOutputStream(this, info, 2048); - synchronized (connectionTable) { - connectionTable.put(id, info); - ++ numConnections; - } - sun.rmi.transport.Connection conn; - conn = new TCPConnection(channel, info.in, info.out); - channel.acceptMultiplexConnection(conn); - break; - - // remote endpoint closing connection - case CLOSE: - id = dataIn.readUnsignedShort(); - - if (multiplexLog.isLoggable(Log.VERBOSE)) { - multiplexLog.log(Log.VERBOSE, "operation CLOSE " + id); - } - - info = connectionTable.get(id); - if (info == null) - throw new IOException( - "CLOSE: Invalid connection ID"); - info.in.disconnect(); - info.out.disconnect(); - if (!info.closed) - sendCloseAck(info); - synchronized (connectionTable) { - connectionTable.remove(id); - -- numConnections; - } - break; - - // remote endpoint acknowledging close of connection - case CLOSEACK: - id = dataIn.readUnsignedShort(); - - if (multiplexLog.isLoggable(Log.VERBOSE)) { - multiplexLog.log(Log.VERBOSE, - "operation CLOSEACK " + id); - } - - info = connectionTable.get(id); - if (info == null) - throw new IOException( - "CLOSEACK: Invalid connection ID"); - if (!info.closed) - throw new IOException( - "CLOSEACK: Connection not closed"); - info.in.disconnect(); - info.out.disconnect(); - synchronized (connectionTable) { - connectionTable.remove(id); - -- numConnections; - } - break; - - // remote endpoint declaring additional bytes receivable - case REQUEST: - id = dataIn.readUnsignedShort(); - info = connectionTable.get(id); - if (info == null) - throw new IOException( - "REQUEST: Invalid connection ID"); - length = dataIn.readInt(); - - if (multiplexLog.isLoggable(Log.VERBOSE)) { - multiplexLog.log(Log.VERBOSE, - "operation REQUEST " + id + ": " + length); - } - - info.out.request(length); - break; - - // remote endpoint transmitting data packet - case TRANSMIT: - id = dataIn.readUnsignedShort(); - info = connectionTable.get(id); - if (info == null) - throw new IOException("SEND: Invalid connection ID"); - length = dataIn.readInt(); - - if (multiplexLog.isLoggable(Log.VERBOSE)) { - multiplexLog.log(Log.VERBOSE, - "operation TRANSMIT " + id + ": " + length); - } - - info.in.receive(length, dataIn); - break; - - default: - throw new IOException("Invalid operation: " + - Integer.toHexString(op)); - } - } - } finally { - shutDown(); - } - } - - /** - * Initiate a new multiplexed connection through the underlying - * connection. - */ - public synchronized TCPConnection openConnection() throws IOException - { - // generate ID that should not be already used - // If all possible 32768 IDs are used, - // this method will block searching for a new ID forever. - int id; - do { - lastID = (++ lastID) & 0x7FFF; - id = lastID; - - // The orig flag (copied to the high bit of the ID) is used - // to have two distinct ranges to choose IDs from for the - // two endpoints. - if (orig) - id |= 0x8000; - } while (connectionTable.get(id) != null); - - // create multiplexing streams and bookkeeping information - MultiplexConnectionInfo info = new MultiplexConnectionInfo(id); - info.in = new MultiplexInputStream(this, info, 2048); - info.out = new MultiplexOutputStream(this, info, 2048); - - // add to connection table if multiplexer has not died - synchronized (connectionTable) { - if (!alive) - throw new IOException("Multiplexer connection dead"); - if (numConnections >= maxConnections) - throw new IOException("Cannot exceed " + maxConnections + - " simultaneous multiplexed connections"); - connectionTable.put(id, info); - ++ numConnections; - } - - // inform remote endpoint of new connection - synchronized (dataOut) { - try { - dataOut.writeByte(OPEN); - dataOut.writeShort(id); - dataOut.flush(); - } catch (IOException e) { - multiplexLog.log(Log.BRIEF, "exception: ", e); - - shutDown(); - throw e; - } - } - - return new TCPConnection(channel, info.in, info.out); - } - - /** - * Shut down all connections and clean up. - */ - public void shutDown() - { - // inform all associated streams - synchronized (connectionTable) { - // return if multiplexer already officially dead - if (!alive) - return; - alive = false; - - Enumeration enum_ = - connectionTable.elements(); - while (enum_.hasMoreElements()) { - MultiplexConnectionInfo info = enum_.nextElement(); - info.in.disconnect(); - info.out.disconnect(); - } - connectionTable.clear(); - numConnections = 0; - } - - // close underlying connection, if possible (and not already done) - try { - in.close(); - } catch (IOException e) { - } - try { - out.close(); - } catch (IOException e) { - } - } - - /** - * Send request for more data on connection to remote endpoint. - * @param info connection information structure - * @param len number of more bytes that can be received - */ - void sendRequest(MultiplexConnectionInfo info, int len) throws IOException - { - synchronized (dataOut) { - if (alive && !info.closed) - try { - dataOut.writeByte(REQUEST); - dataOut.writeShort(info.id); - dataOut.writeInt(len); - dataOut.flush(); - } catch (IOException e) { - multiplexLog.log(Log.BRIEF, "exception: ", e); - - shutDown(); - throw e; - } - } - } - - /** - * Send packet of requested data on connection to remote endpoint. - * @param info connection information structure - * @param buf array containing bytes to send - * @param off offset of first array index of packet - * @param len number of bytes in packet to send - */ - void sendTransmit(MultiplexConnectionInfo info, - byte buf[], int off, int len) throws IOException - { - synchronized (dataOut) { - if (alive && !info.closed) - try { - dataOut.writeByte(TRANSMIT); - dataOut.writeShort(info.id); - dataOut.writeInt(len); - dataOut.write(buf, off, len); - dataOut.flush(); - } catch (IOException e) { - multiplexLog.log(Log.BRIEF, "exception: ", e); - - shutDown(); - throw e; - } - } - } - - /** - * Inform remote endpoint that connection has been closed. - * @param info connection information structure - */ - void sendClose(MultiplexConnectionInfo info) throws IOException - { - info.out.disconnect(); - synchronized (dataOut) { - if (alive && !info.closed) - try { - dataOut.writeByte(CLOSE); - dataOut.writeShort(info.id); - dataOut.flush(); - info.closed = true; - } catch (IOException e) { - multiplexLog.log(Log.BRIEF, "exception: ", e); - - shutDown(); - throw e; - } - } - } - - /** - * Acknowledge remote endpoint's closing of connection. - * @param info connection information structure - */ - void sendCloseAck(MultiplexConnectionInfo info) throws IOException - { - synchronized (dataOut) { - if (alive && !info.closed) - try { - dataOut.writeByte(CLOSEACK); - dataOut.writeShort(info.id); - dataOut.flush(); - info.closed = true; - } catch (IOException e) { - multiplexLog.log(Log.BRIEF, "exception: ", e); - - shutDown(); - throw e; - } - } - } - - /** - * Shut down connection upon finalization. - */ - @SuppressWarnings("deprecation") - protected void finalize() throws Throwable - { - super.finalize(); - shutDown(); - } -} diff --git a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexConnectionInfo.java b/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexConnectionInfo.java deleted file mode 100644 index 9059971c047..00000000000 --- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexConnectionInfo.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 1996, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package sun.rmi.transport.tcp; - -/** - * MultiplexConnectionInfo groups related information about a - * virtual connection managed by a ConnectionMultiplexer object. - * - * @author Peter Jones - */ -class MultiplexConnectionInfo { - - /** integer that uniquely identifies this connection */ - int id; - - /** input stream for reading from connection */ - MultiplexInputStream in = null; - - /** output stream for writing to connection */ - MultiplexOutputStream out = null; - - /** true if this connection has been closed */ - boolean closed = false; - - /** - * Create information structure for given connection identifier. - * @param id connection identifier - */ - MultiplexConnectionInfo(int id) - { - this.id = id; - } -} diff --git a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java b/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java deleted file mode 100644 index 4e094e272df..00000000000 --- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Copyright (c) 1996, 1997, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package sun.rmi.transport.tcp; - -import java.io.*; - -/** - * MultiplexInputStream manages receiving data over a connection managed - * by a ConnectionMultiplexer object. This object is responsible for - * requesting more bytes of data as space in its internal buffer becomes - * available. - * - * @author Peter Jones - */ -final class MultiplexInputStream extends InputStream { - - /** object managing multiplexed connection */ - private ConnectionMultiplexer manager; - - /** information about the connection this is the input stream for */ - private MultiplexConnectionInfo info; - - /** input buffer */ - private byte buffer[]; - - /** number of real data bytes present in buffer */ - private int present = 0; - - /** current position to read from in input buffer */ - private int pos = 0; - - /** pending number of bytes this stream has requested */ - private int requested = 0; - - /** true if this connection has been disconnected */ - private boolean disconnected = false; - - /** - * lock acquired to access shared variables: - * buffer, present, pos, requested, & disconnected - * WARNING: Any of the methods manager.send*() should not be - * invoked while this lock is held, since they could potentially - * block if the underlying connection's transport buffers are - * full, and the manager may need to acquire this lock to process - * and consume data coming over the underlying connection. - */ - private Object lock = new Object(); - - /** level at which more data is requested when read past */ - private int waterMark; - - /** data structure for holding reads of one byte */ - private byte temp[] = new byte[1]; - - /** - * Create a new MultiplexInputStream for the given manager. - * @param manager object that manages this connection - * @param info structure for connection this stream reads from - * @param bufferLength length of input buffer - */ - MultiplexInputStream( - ConnectionMultiplexer manager, - MultiplexConnectionInfo info, - int bufferLength) - { - this.manager = manager; - this.info = info; - - buffer = new byte[bufferLength]; - waterMark = bufferLength / 2; - } - - /** - * Read a byte from the connection. - */ - public synchronized int read() throws IOException - { - int n = read(temp, 0, 1); - if (n != 1) - return -1; - return temp[0] & 0xFF; - } - - /** - * Read a subarray of bytes from connection. This method blocks for - * at least one byte, and it returns the number of bytes actually read, - * or -1 if the end of the stream was detected. - * @param b array to read bytes into - * @param off offset of beginning of bytes to read into - * @param len number of bytes to read - */ - public synchronized int read(byte b[], int off, int len) throws IOException - { - if (len <= 0) - return 0; - - int moreSpace; - synchronized (lock) { - if (pos >= present) - pos = present = 0; - else if (pos >= waterMark) { - System.arraycopy(buffer, pos, buffer, 0, present - pos); - present -= pos; - pos = 0; - } - int freeSpace = buffer.length - present; - moreSpace = Math.max(freeSpace - requested, 0); - } - if (moreSpace > 0) - manager.sendRequest(info, moreSpace); - synchronized (lock) { - requested += moreSpace; - while ((pos >= present) && !disconnected) { - try { - lock.wait(); - } catch (InterruptedException e) { - } - } - if (disconnected && pos >= present) - return -1; - - int available = present - pos; - if (len < available) { - System.arraycopy(buffer, pos, b, off, len); - pos += len; - return len; - } - else { - System.arraycopy(buffer, pos, b, off, available); - pos = present = 0; - // could send another request here, if len > available?? - return available; - } - } - } - - /** - * Return the number of bytes immediately available for reading. - */ - public int available() throws IOException - { - synchronized (lock) { - return present - pos; - } - } - - /** - * Close this connection. - */ - public void close() throws IOException - { - manager.sendClose(info); - } - - /** - * Receive bytes transmitted from connection at remote endpoint. - * @param length number of bytes transmitted - * @param in input stream with those bytes ready to be read - */ - void receive(int length, DataInputStream in) - throws IOException - { - /* TO DO: Optimize so that data received from stream can be loaded - * directly into user's buffer if there is a pending read(). - */ - synchronized (lock) { - if ((pos > 0) && ((buffer.length - present) < length)) { - System.arraycopy(buffer, pos, buffer, 0, present - pos); - present -= pos; - pos = 0; - } - if ((buffer.length - present) < length) - throw new IOException("Receive buffer overflow"); - in.readFully(buffer, present, length); - present += length; - requested -= length; - lock.notifyAll(); - } - } - - /** - * Disconnect this stream from all connection activity. - */ - void disconnect() - { - synchronized (lock) { - disconnected = true; - lock.notifyAll(); - } - } -} diff --git a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexOutputStream.java b/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexOutputStream.java deleted file mode 100644 index d5a77dd3b25..00000000000 --- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexOutputStream.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package sun.rmi.transport.tcp; - -import java.io.*; - -/** - * MultiplexOutputStream manages sending data over a connection managed - * by a ConnectionMultiplexer object. Data written is buffered until the - * internal buffer is full or the flush() method is called, at which - * point it attempts to push a packet of bytes through to the remote - * endpoint. This will never push more bytes than the amount already - * requested by the remote endpoint (to prevent receive buffer from - * overflowing), so if the write() and flush() methods will block - * until their operation can complete if enough bytes cannot be - * pushed immediately. - * - * @author Peter Jones - */ -final class MultiplexOutputStream extends OutputStream { - - /** object managing multiplexed connection */ - private ConnectionMultiplexer manager; - - /** information about the connection this is the output stream for */ - private MultiplexConnectionInfo info; - - /** output buffer */ - private byte buffer[]; - - /** current position to write to in output buffer */ - private int pos = 0; - - /** pending number of bytes requested by remote endpoint */ - private int requested = 0; - - /** true if this connection has been disconnected */ - private boolean disconnected = false; - - /** - * lock acquired to access shared variables: - * requested & disconnected - * WARNING: Any of the methods manager.send*() should not be - * invoked while this lock is held, since they could potentially - * block if the underlying connection's transport buffers are - * full, and the manager may need to acquire this lock to process - * and consume data coming over the underlying connection. - */ - private Object lock = new Object(); - - /** - * Create a new MultiplexOutputStream for the given manager. - * @param manager object that manages this connection - * @param info structure for connection this stream writes to - * @param bufferLength length of output buffer - */ - MultiplexOutputStream( - ConnectionMultiplexer manager, - MultiplexConnectionInfo info, - int bufferLength) - { - this.manager = manager; - this.info = info; - - buffer = new byte[bufferLength]; - pos = 0; - } - - /** - * Write a byte over connection. - * @param b byte of data to write - */ - public synchronized void write(int b) throws IOException - { - while (pos >= buffer.length) - push(); - buffer[pos ++] = (byte) b; - } - - /** - * Write a subarray of bytes over connection. - * @param b array containing bytes to write - * @param off offset of beginning of bytes to write - * @param len number of bytes to write - */ - public synchronized void write(byte b[], int off, int len) - throws IOException - { - if (len <= 0) - return; - - // if enough free space in output buffer, just copy into there - int freeSpace = buffer.length - pos; - if (len <= freeSpace) { - System.arraycopy(b, off, buffer, pos, len); - pos += len; - return; - } - - // else, flush buffer and send rest directly to avoid array copy - flush(); - int local_requested; - while (true) { - synchronized (lock) { - while ((local_requested = requested) < 1 && !disconnected) { - try { - lock.wait(); - } catch (InterruptedException e) { - } - } - if (disconnected) - throw new IOException("Connection closed"); - } - - if (local_requested < len) { - manager.sendTransmit(info, b, off, local_requested); - off += local_requested; - len -= local_requested; - synchronized (lock) { - requested -= local_requested; - } - } - else { - manager.sendTransmit(info, b, off, len); - synchronized (lock) { - requested -= len; - } - // len = 0; - break; - } - } - } - - /** - * Guarantee that all data written to this stream has been pushed - * over and made available to the remote endpoint. - */ - public synchronized void flush() throws IOException { - while (pos > 0) - push(); - } - - /** - * Close this connection. - */ - public void close() throws IOException - { - manager.sendClose(info); - } - - /** - * Take note of more bytes requested by connection at remote endpoint. - * @param num number of additional bytes requested - */ - void request(int num) - { - synchronized (lock) { - requested += num; - lock.notifyAll(); - } - } - - /** - * Disconnect this stream from all connection activity. - */ - void disconnect() - { - synchronized (lock) { - disconnected = true; - lock.notifyAll(); - } - } - - /** - * Push bytes in output buffer to connection at remote endpoint. - * This method blocks until at least one byte has been pushed across. - */ - private void push() throws IOException - { - int local_requested; - synchronized (lock) { - while ((local_requested = requested) < 1 && !disconnected) { - try { - lock.wait(); - } catch (InterruptedException e) { - } - } - if (disconnected) - throw new IOException("Connection closed"); - } - - if (local_requested < pos) { - manager.sendTransmit(info, buffer, 0, local_requested); - System.arraycopy(buffer, local_requested, - buffer, 0, pos - local_requested); - pos -= local_requested; - synchronized (lock) { - requested -= local_requested; - } - } - else { - manager.sendTransmit(info, buffer, 0, pos); - synchronized (lock) { - requested -= pos; - } - pos = 0; - } - } -} diff --git a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPChannel.java b/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPChannel.java index d8099df6e18..eab115a2f9a 100644 --- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPChannel.java +++ b/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPChannel.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 1996, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -67,10 +67,6 @@ public class TCPChannel implements Channel { /** frees cached connections that have expired (guarded by freeList) */ private Future reaper = null; - /** using multiplexer (for bi-directional applet communication */ - private boolean usingMultiplexer = false; - /** connection multiplexer, if used */ - private ConnectionMultiplexer multiplexer = null; /** connection acceptor (should be in TCPTransport) */ private ConnectionAcceptor acceptor; @@ -210,113 +206,99 @@ public class TCPChannel implements Channel { TCPTransport.tcpLog.log(Log.BRIEF, "create connection"); - if (!usingMultiplexer) { - Socket sock = ep.newSocket(); - conn = new TCPConnection(this, sock); + Socket sock = ep.newSocket(); + conn = new TCPConnection(this, sock); - try { - DataOutputStream out = - new DataOutputStream(conn.getOutputStream()); - writeTransportHeader(out); + try { + DataOutputStream out = + new DataOutputStream(conn.getOutputStream()); + writeTransportHeader(out); - // choose protocol (single op if not reusable socket) - if (!conn.isReusable()) { - out.writeByte(TransportConstants.SingleOpProtocol); - } else { - out.writeByte(TransportConstants.StreamProtocol); - out.flush(); + // choose protocol (single op if not reusable socket) + if (!conn.isReusable()) { + out.writeByte(TransportConstants.SingleOpProtocol); + } else { + out.writeByte(TransportConstants.StreamProtocol); + out.flush(); - /* - * Set socket read timeout to configured value for JRMP - * connection handshake; this also serves to guard against - * non-JRMP servers that do not respond (see 4322806). - */ - int originalSoTimeout = 0; - try { - originalSoTimeout = sock.getSoTimeout(); - sock.setSoTimeout(handshakeTimeout); - } catch (Exception e) { - // if we fail to set this, ignore and proceed anyway - } - - DataInputStream in = - new DataInputStream(conn.getInputStream()); - byte ack = in.readByte(); - if (ack != TransportConstants.ProtocolAck) { - throw new ConnectIOException( - ack == TransportConstants.ProtocolNack ? - "JRMP StreamProtocol not supported by server" : - "non-JRMP server at remote endpoint"); - } - - String suggestedHost = in.readUTF(); - int suggestedPort = in.readInt(); - if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { - TCPTransport.tcpLog.log(Log.VERBOSE, - "server suggested " + suggestedHost + ":" + - suggestedPort); - } - - // set local host name, if unknown - TCPEndpoint.setLocalHost(suggestedHost); - // do NOT set the default port, because we don't - // know if we can't listen YET... - - // write out default endpoint to match protocol - // (but it serves no purpose) - TCPEndpoint localEp = - TCPEndpoint.getLocalEndpoint(0, null, null); - out.writeUTF(localEp.getHost()); - out.writeInt(localEp.getPort()); - if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { - TCPTransport.tcpLog.log(Log.VERBOSE, "using " + - localEp.getHost() + ":" + localEp.getPort()); - } - - /* - * After JRMP handshake, set socket read timeout to value - * configured for the rest of the lifetime of the - * connection. NOTE: this timeout, if configured to a - * finite duration, places an upper bound on the time - * that a remote method call is permitted to execute. - */ - try { - /* - * If socket factory had set a non-zero timeout on its - * own, then restore it instead of using the property- - * configured value. - */ - sock.setSoTimeout((originalSoTimeout != 0 ? - originalSoTimeout : - responseTimeout)); - } catch (Exception e) { - // if we fail to set this, ignore and proceed anyway - } - - out.flush(); - } - } catch (IOException e) { + /* + * Set socket read timeout to configured value for JRMP + * connection handshake; this also serves to guard against + * non-JRMP servers that do not respond (see 4322806). + */ + int originalSoTimeout = 0; try { - conn.close(); - } catch (Exception ex) {} - if (e instanceof RemoteException) { - throw (RemoteException) e; - } else { + originalSoTimeout = sock.getSoTimeout(); + sock.setSoTimeout(handshakeTimeout); + } catch (Exception e) { + // if we fail to set this, ignore and proceed anyway + } + + DataInputStream in = + new DataInputStream(conn.getInputStream()); + byte ack = in.readByte(); + if (ack != TransportConstants.ProtocolAck) { throw new ConnectIOException( - "error during JRMP connection establishment", e); + ack == TransportConstants.ProtocolNack ? + "JRMP StreamProtocol not supported by server" : + "non-JRMP server at remote endpoint"); } + + String suggestedHost = in.readUTF(); + int suggestedPort = in.readInt(); + if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { + TCPTransport.tcpLog.log(Log.VERBOSE, + "server suggested " + suggestedHost + ":" + + suggestedPort); + } + + // set local host name, if unknown + TCPEndpoint.setLocalHost(suggestedHost); + // do NOT set the default port, because we don't + // know if we can't listen YET... + + // write out default endpoint to match protocol + // (but it serves no purpose) + TCPEndpoint localEp = + TCPEndpoint.getLocalEndpoint(0, null, null); + out.writeUTF(localEp.getHost()); + out.writeInt(localEp.getPort()); + if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { + TCPTransport.tcpLog.log(Log.VERBOSE, "using " + + localEp.getHost() + ":" + localEp.getPort()); + } + + /* + * After JRMP handshake, set socket read timeout to value + * configured for the rest of the lifetime of the + * connection. NOTE: this timeout, if configured to a + * finite duration, places an upper bound on the time + * that a remote method call is permitted to execute. + */ + try { + /* + * If socket factory had set a non-zero timeout on its + * own, then restore it instead of using the property- + * configured value. + */ + sock.setSoTimeout((originalSoTimeout != 0 ? + originalSoTimeout : + responseTimeout)); + } catch (Exception e) { + // if we fail to set this, ignore and proceed anyway + } + + out.flush(); } - } else { + } catch (IOException e) { try { - conn = multiplexer.openConnection(); - } catch (IOException e) { - synchronized (this) { - usingMultiplexer = false; - multiplexer = null; - } + conn.close(); + } catch (Exception ex) {} + if (e instanceof RemoteException) { + throw (RemoteException) e; + } else { throw new ConnectIOException( - "error opening virtual connection " + - "over multiplexed connection", e); + "error during JRMP connection establishment", e); } } return conn; @@ -387,28 +369,6 @@ public class TCPChannel implements Channel { } } - /** - * Use given connection multiplexer object to obtain new connections - * through this channel. - */ - synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) { - // for now, always just use the last one given - multiplexer = newMultiplexer; - - usingMultiplexer = true; - } - - /** - * Accept a connection provided over a multiplexed channel. - */ - void acceptMultiplexConnection(Connection conn) { - if (acceptor == null) { - acceptor = new ConnectionAcceptor(tr); - acceptor.startNewAcceptor(); - } - acceptor.accept(conn); - } - /** * Closes all the connections in the cache, whether timed out or not. */ @@ -501,7 +461,7 @@ class ConnectionAcceptor implements Runnable { public void startNewAcceptor() { Thread t = AccessController.doPrivileged( new NewThreadAction(ConnectionAcceptor.this, - "Multiplex Accept-" + ++ threadNum, + "TCPChannel Accept-" + ++ threadNum, true)); t.start(); } diff --git a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPTransport.java b/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPTransport.java index 0fef27e34c6..251b7f6cef8 100644 --- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPTransport.java +++ b/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPTransport.java @@ -102,11 +102,6 @@ public class TCPTransport extends Transport { AccessController.doPrivileged((PrivilegedAction) () -> Long.getLong("sun.rmi.transport.tcp.threadKeepAliveTime", 60000)); - /** enable multiplexing protocol */ - private static final boolean enableMultiplexProtocol = // default false - AccessController.doPrivileged((PrivilegedAction) () -> - Boolean.getBoolean("sun.rmi.transport.tcp.enableMultiplexProtocol")); - /** thread pool for connection handlers */ private static final ExecutorService connectionThreadPool = new ThreadPoolExecutor(0, maxConnectionThreads, @@ -687,6 +682,7 @@ public class TCPTransport extends Transport { } } + @SuppressWarnings("fallthrough") private void run0() { TCPEndpoint endpoint = getEndpoint(); int port = endpoint.getPort(); @@ -801,59 +797,11 @@ public class TCPTransport extends Transport { break; case TransportConstants.MultiplexProtocol: - - if (!enableMultiplexProtocol) { - if (tcpLog.isLoggable(Log.VERBOSE)) { - tcpLog.log(Log.VERBOSE, "(port " + port + - ") rejecting multiplex protocol"); - } - - // If MultiplexProtocol is disabled, send NACK immediately. - out.writeByte(TransportConstants.ProtocolNack); - out.flush(); - break; - } - if (tcpLog.isLoggable(Log.VERBOSE)) { tcpLog.log(Log.VERBOSE, "(port " + port + - ") accepting multiplex protocol"); + ") rejecting multiplex protocol"); } - - // send ack - out.writeByte(TransportConstants.ProtocolAck); - - // suggest endpoint (in case client doesn't already have one) - if (tcpLog.isLoggable(Log.VERBOSE)) { - tcpLog.log(Log.VERBOSE, "(port " + port + - ") suggesting " + remoteHost + ":" + remotePort); - } - - out.writeUTF(remoteHost); - out.writeInt(remotePort); - out.flush(); - - // read endpoint client has decided to use - ep = new TCPEndpoint(in.readUTF(), in.readInt(), - endpoint.getClientSocketFactory(), - endpoint.getServerSocketFactory()); - if (tcpLog.isLoggable(Log.VERBOSE)) { - tcpLog.log(Log.VERBOSE, "(port " + - port + ") client using " + - ep.getHost() + ":" + ep.getPort()); - } - - ConnectionMultiplexer multiplexer; - synchronized (channelTable) { - // create or find channel for this endpoint - ch = getChannel(ep); - multiplexer = - new ConnectionMultiplexer(ch, bufIn, sockOut, - false); - ch.useMultiplexer(multiplexer); - } - multiplexer.run(); - break; - + // Fall-through to reject use of MultiplexProtocol default: // protocol not understood, send nack and close socket out.writeByte(TransportConstants.ProtocolNack);