8087189: RMI server-side multiplex protocol support should be removed
Reviewed-by: alanb
This commit is contained in:
parent
a787caa42d
commit
f16bb08f89
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 1997, 2017, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -158,8 +158,7 @@ public class ActivatableRef implements RemoteRef {
|
||||
exception = e;
|
||||
} catch (ConnectIOException e) {
|
||||
/*
|
||||
* Failure setting up multiplexed connection or reusing
|
||||
* cached connection; retry call
|
||||
* Failure reusing cached connection; retry call
|
||||
*/
|
||||
exception = e;
|
||||
} catch (MarshalException e) {
|
||||
|
@ -1,449 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 1996, 2017, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
package sun.rmi.transport.tcp;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.*;
|
||||
import java.rmi.server.LogStream;
|
||||
import java.security.PrivilegedAction;
|
||||
|
||||
import sun.rmi.runtime.Log;
|
||||
|
||||
/**
|
||||
* ConnectionMultiplexer manages the transparent multiplexing of
|
||||
* multiple virtual connections from one endpoint to another through
|
||||
* one given real connection to that endpoint. The input and output
|
||||
* streams for the underlying real connection must be supplied.
|
||||
* A callback object is also supplied to be informed of new virtual
|
||||
* connections opened by the remote endpoint. After creation, the
|
||||
* run() method must be called in a thread created for demultiplexing
|
||||
* the connections. The openConnection() method is called to
|
||||
* initiate a virtual connection from this endpoint.
|
||||
*
|
||||
* @author Peter Jones
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
final class ConnectionMultiplexer {
|
||||
|
||||
/** "multiplex" log level */
|
||||
static int logLevel = LogStream.parseLevel(getLogLevel());
|
||||
|
||||
private static String getLogLevel() {
|
||||
return java.security.AccessController.doPrivileged(
|
||||
(PrivilegedAction<String>) () -> System.getProperty("sun.rmi.transport.tcp.multiplex.logLevel"));
|
||||
}
|
||||
|
||||
/* multiplex system log */
|
||||
static final Log multiplexLog =
|
||||
Log.getLog("sun.rmi.transport.tcp.multiplex",
|
||||
"multiplex", ConnectionMultiplexer.logLevel);
|
||||
|
||||
/** multiplexing protocol operation codes */
|
||||
private final static int OPEN = 0xE1;
|
||||
private final static int CLOSE = 0xE2;
|
||||
private final static int CLOSEACK = 0xE3;
|
||||
private final static int REQUEST = 0xE4;
|
||||
private final static int TRANSMIT = 0xE5;
|
||||
|
||||
/** object to notify for new connections from remote endpoint */
|
||||
private TCPChannel channel;
|
||||
|
||||
/** input stream for underlying single connection */
|
||||
private InputStream in;
|
||||
|
||||
/** output stream for underlying single connection */
|
||||
private OutputStream out;
|
||||
|
||||
/** true if underlying connection originated from this endpoint
|
||||
(used for generating unique connection IDs) */
|
||||
private boolean orig;
|
||||
|
||||
/** layered stream for reading formatted data from underlying connection */
|
||||
private DataInputStream dataIn;
|
||||
|
||||
/** layered stream for writing formatted data to underlying connection */
|
||||
private DataOutputStream dataOut;
|
||||
|
||||
/** table holding currently open connection IDs and related info */
|
||||
private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
|
||||
|
||||
/** number of currently open connections */
|
||||
private int numConnections = 0;
|
||||
|
||||
/** maximum allowed open connections */
|
||||
private final static int maxConnections = 256;
|
||||
|
||||
/** ID of last connection opened */
|
||||
private int lastID = 0x1001;
|
||||
|
||||
/** true if this mechanism is still alive */
|
||||
private boolean alive = true;
|
||||
|
||||
/**
|
||||
* Create a new ConnectionMultiplexer using the given underlying
|
||||
* input/output stream pair. The run method must be called
|
||||
* (possibly on a new thread) to handle the demultiplexing.
|
||||
* @param channel object to notify when new connection is received
|
||||
* @param in input stream of underlying connection
|
||||
* @param out output stream of underlying connection
|
||||
* @param orig true if this endpoint intiated the underlying
|
||||
* connection (needs to be set differently at both ends)
|
||||
*/
|
||||
public ConnectionMultiplexer(
|
||||
TCPChannel channel,
|
||||
InputStream in,
|
||||
OutputStream out,
|
||||
boolean orig)
|
||||
{
|
||||
this.channel = channel;
|
||||
this.in = in;
|
||||
this.out = out;
|
||||
this.orig = orig;
|
||||
|
||||
dataIn = new DataInputStream(in);
|
||||
dataOut = new DataOutputStream(out);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process multiplexing protocol received from underlying connection.
|
||||
*/
|
||||
public void run() throws IOException
|
||||
{
|
||||
try {
|
||||
int op, id, length;
|
||||
MultiplexConnectionInfo info;
|
||||
|
||||
while (true) {
|
||||
|
||||
// read next op code from remote endpoint
|
||||
op = dataIn.readUnsignedByte();
|
||||
switch (op) {
|
||||
|
||||
// remote endpoint initiating new connection
|
||||
case OPEN:
|
||||
id = dataIn.readUnsignedShort();
|
||||
|
||||
if (multiplexLog.isLoggable(Log.VERBOSE)) {
|
||||
multiplexLog.log(Log.VERBOSE, "operation OPEN " + id);
|
||||
}
|
||||
|
||||
info = connectionTable.get(id);
|
||||
if (info != null)
|
||||
throw new IOException(
|
||||
"OPEN: Connection ID already exists");
|
||||
info = new MultiplexConnectionInfo(id);
|
||||
info.in = new MultiplexInputStream(this, info, 2048);
|
||||
info.out = new MultiplexOutputStream(this, info, 2048);
|
||||
synchronized (connectionTable) {
|
||||
connectionTable.put(id, info);
|
||||
++ numConnections;
|
||||
}
|
||||
sun.rmi.transport.Connection conn;
|
||||
conn = new TCPConnection(channel, info.in, info.out);
|
||||
channel.acceptMultiplexConnection(conn);
|
||||
break;
|
||||
|
||||
// remote endpoint closing connection
|
||||
case CLOSE:
|
||||
id = dataIn.readUnsignedShort();
|
||||
|
||||
if (multiplexLog.isLoggable(Log.VERBOSE)) {
|
||||
multiplexLog.log(Log.VERBOSE, "operation CLOSE " + id);
|
||||
}
|
||||
|
||||
info = connectionTable.get(id);
|
||||
if (info == null)
|
||||
throw new IOException(
|
||||
"CLOSE: Invalid connection ID");
|
||||
info.in.disconnect();
|
||||
info.out.disconnect();
|
||||
if (!info.closed)
|
||||
sendCloseAck(info);
|
||||
synchronized (connectionTable) {
|
||||
connectionTable.remove(id);
|
||||
-- numConnections;
|
||||
}
|
||||
break;
|
||||
|
||||
// remote endpoint acknowledging close of connection
|
||||
case CLOSEACK:
|
||||
id = dataIn.readUnsignedShort();
|
||||
|
||||
if (multiplexLog.isLoggable(Log.VERBOSE)) {
|
||||
multiplexLog.log(Log.VERBOSE,
|
||||
"operation CLOSEACK " + id);
|
||||
}
|
||||
|
||||
info = connectionTable.get(id);
|
||||
if (info == null)
|
||||
throw new IOException(
|
||||
"CLOSEACK: Invalid connection ID");
|
||||
if (!info.closed)
|
||||
throw new IOException(
|
||||
"CLOSEACK: Connection not closed");
|
||||
info.in.disconnect();
|
||||
info.out.disconnect();
|
||||
synchronized (connectionTable) {
|
||||
connectionTable.remove(id);
|
||||
-- numConnections;
|
||||
}
|
||||
break;
|
||||
|
||||
// remote endpoint declaring additional bytes receivable
|
||||
case REQUEST:
|
||||
id = dataIn.readUnsignedShort();
|
||||
info = connectionTable.get(id);
|
||||
if (info == null)
|
||||
throw new IOException(
|
||||
"REQUEST: Invalid connection ID");
|
||||
length = dataIn.readInt();
|
||||
|
||||
if (multiplexLog.isLoggable(Log.VERBOSE)) {
|
||||
multiplexLog.log(Log.VERBOSE,
|
||||
"operation REQUEST " + id + ": " + length);
|
||||
}
|
||||
|
||||
info.out.request(length);
|
||||
break;
|
||||
|
||||
// remote endpoint transmitting data packet
|
||||
case TRANSMIT:
|
||||
id = dataIn.readUnsignedShort();
|
||||
info = connectionTable.get(id);
|
||||
if (info == null)
|
||||
throw new IOException("SEND: Invalid connection ID");
|
||||
length = dataIn.readInt();
|
||||
|
||||
if (multiplexLog.isLoggable(Log.VERBOSE)) {
|
||||
multiplexLog.log(Log.VERBOSE,
|
||||
"operation TRANSMIT " + id + ": " + length);
|
||||
}
|
||||
|
||||
info.in.receive(length, dataIn);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new IOException("Invalid operation: " +
|
||||
Integer.toHexString(op));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
shutDown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiate a new multiplexed connection through the underlying
|
||||
* connection.
|
||||
*/
|
||||
public synchronized TCPConnection openConnection() throws IOException
|
||||
{
|
||||
// generate ID that should not be already used
|
||||
// If all possible 32768 IDs are used,
|
||||
// this method will block searching for a new ID forever.
|
||||
int id;
|
||||
do {
|
||||
lastID = (++ lastID) & 0x7FFF;
|
||||
id = lastID;
|
||||
|
||||
// The orig flag (copied to the high bit of the ID) is used
|
||||
// to have two distinct ranges to choose IDs from for the
|
||||
// two endpoints.
|
||||
if (orig)
|
||||
id |= 0x8000;
|
||||
} while (connectionTable.get(id) != null);
|
||||
|
||||
// create multiplexing streams and bookkeeping information
|
||||
MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
|
||||
info.in = new MultiplexInputStream(this, info, 2048);
|
||||
info.out = new MultiplexOutputStream(this, info, 2048);
|
||||
|
||||
// add to connection table if multiplexer has not died
|
||||
synchronized (connectionTable) {
|
||||
if (!alive)
|
||||
throw new IOException("Multiplexer connection dead");
|
||||
if (numConnections >= maxConnections)
|
||||
throw new IOException("Cannot exceed " + maxConnections +
|
||||
" simultaneous multiplexed connections");
|
||||
connectionTable.put(id, info);
|
||||
++ numConnections;
|
||||
}
|
||||
|
||||
// inform remote endpoint of new connection
|
||||
synchronized (dataOut) {
|
||||
try {
|
||||
dataOut.writeByte(OPEN);
|
||||
dataOut.writeShort(id);
|
||||
dataOut.flush();
|
||||
} catch (IOException e) {
|
||||
multiplexLog.log(Log.BRIEF, "exception: ", e);
|
||||
|
||||
shutDown();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
return new TCPConnection(channel, info.in, info.out);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shut down all connections and clean up.
|
||||
*/
|
||||
public void shutDown()
|
||||
{
|
||||
// inform all associated streams
|
||||
synchronized (connectionTable) {
|
||||
// return if multiplexer already officially dead
|
||||
if (!alive)
|
||||
return;
|
||||
alive = false;
|
||||
|
||||
Enumeration<MultiplexConnectionInfo> enum_ =
|
||||
connectionTable.elements();
|
||||
while (enum_.hasMoreElements()) {
|
||||
MultiplexConnectionInfo info = enum_.nextElement();
|
||||
info.in.disconnect();
|
||||
info.out.disconnect();
|
||||
}
|
||||
connectionTable.clear();
|
||||
numConnections = 0;
|
||||
}
|
||||
|
||||
// close underlying connection, if possible (and not already done)
|
||||
try {
|
||||
in.close();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
try {
|
||||
out.close();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send request for more data on connection to remote endpoint.
|
||||
* @param info connection information structure
|
||||
* @param len number of more bytes that can be received
|
||||
*/
|
||||
void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
|
||||
{
|
||||
synchronized (dataOut) {
|
||||
if (alive && !info.closed)
|
||||
try {
|
||||
dataOut.writeByte(REQUEST);
|
||||
dataOut.writeShort(info.id);
|
||||
dataOut.writeInt(len);
|
||||
dataOut.flush();
|
||||
} catch (IOException e) {
|
||||
multiplexLog.log(Log.BRIEF, "exception: ", e);
|
||||
|
||||
shutDown();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send packet of requested data on connection to remote endpoint.
|
||||
* @param info connection information structure
|
||||
* @param buf array containing bytes to send
|
||||
* @param off offset of first array index of packet
|
||||
* @param len number of bytes in packet to send
|
||||
*/
|
||||
void sendTransmit(MultiplexConnectionInfo info,
|
||||
byte buf[], int off, int len) throws IOException
|
||||
{
|
||||
synchronized (dataOut) {
|
||||
if (alive && !info.closed)
|
||||
try {
|
||||
dataOut.writeByte(TRANSMIT);
|
||||
dataOut.writeShort(info.id);
|
||||
dataOut.writeInt(len);
|
||||
dataOut.write(buf, off, len);
|
||||
dataOut.flush();
|
||||
} catch (IOException e) {
|
||||
multiplexLog.log(Log.BRIEF, "exception: ", e);
|
||||
|
||||
shutDown();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inform remote endpoint that connection has been closed.
|
||||
* @param info connection information structure
|
||||
*/
|
||||
void sendClose(MultiplexConnectionInfo info) throws IOException
|
||||
{
|
||||
info.out.disconnect();
|
||||
synchronized (dataOut) {
|
||||
if (alive && !info.closed)
|
||||
try {
|
||||
dataOut.writeByte(CLOSE);
|
||||
dataOut.writeShort(info.id);
|
||||
dataOut.flush();
|
||||
info.closed = true;
|
||||
} catch (IOException e) {
|
||||
multiplexLog.log(Log.BRIEF, "exception: ", e);
|
||||
|
||||
shutDown();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Acknowledge remote endpoint's closing of connection.
|
||||
* @param info connection information structure
|
||||
*/
|
||||
void sendCloseAck(MultiplexConnectionInfo info) throws IOException
|
||||
{
|
||||
synchronized (dataOut) {
|
||||
if (alive && !info.closed)
|
||||
try {
|
||||
dataOut.writeByte(CLOSEACK);
|
||||
dataOut.writeShort(info.id);
|
||||
dataOut.flush();
|
||||
info.closed = true;
|
||||
} catch (IOException e) {
|
||||
multiplexLog.log(Log.BRIEF, "exception: ", e);
|
||||
|
||||
shutDown();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shut down connection upon finalization.
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
protected void finalize() throws Throwable
|
||||
{
|
||||
super.finalize();
|
||||
shutDown();
|
||||
}
|
||||
}
|
@ -1,55 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 1996, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
package sun.rmi.transport.tcp;
|
||||
|
||||
/**
|
||||
* MultiplexConnectionInfo groups related information about a
|
||||
* virtual connection managed by a ConnectionMultiplexer object.
|
||||
*
|
||||
* @author Peter Jones
|
||||
*/
|
||||
class MultiplexConnectionInfo {
|
||||
|
||||
/** integer that uniquely identifies this connection */
|
||||
int id;
|
||||
|
||||
/** input stream for reading from connection */
|
||||
MultiplexInputStream in = null;
|
||||
|
||||
/** output stream for writing to connection */
|
||||
MultiplexOutputStream out = null;
|
||||
|
||||
/** true if this connection has been closed */
|
||||
boolean closed = false;
|
||||
|
||||
/**
|
||||
* Create information structure for given connection identifier.
|
||||
* @param id connection identifier
|
||||
*/
|
||||
MultiplexConnectionInfo(int id)
|
||||
{
|
||||
this.id = id;
|
||||
}
|
||||
}
|
@ -1,213 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 1996, 1997, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
package sun.rmi.transport.tcp;
|
||||
|
||||
import java.io.*;
|
||||
|
||||
/**
|
||||
* MultiplexInputStream manages receiving data over a connection managed
|
||||
* by a ConnectionMultiplexer object. This object is responsible for
|
||||
* requesting more bytes of data as space in its internal buffer becomes
|
||||
* available.
|
||||
*
|
||||
* @author Peter Jones
|
||||
*/
|
||||
final class MultiplexInputStream extends InputStream {
|
||||
|
||||
/** object managing multiplexed connection */
|
||||
private ConnectionMultiplexer manager;
|
||||
|
||||
/** information about the connection this is the input stream for */
|
||||
private MultiplexConnectionInfo info;
|
||||
|
||||
/** input buffer */
|
||||
private byte buffer[];
|
||||
|
||||
/** number of real data bytes present in buffer */
|
||||
private int present = 0;
|
||||
|
||||
/** current position to read from in input buffer */
|
||||
private int pos = 0;
|
||||
|
||||
/** pending number of bytes this stream has requested */
|
||||
private int requested = 0;
|
||||
|
||||
/** true if this connection has been disconnected */
|
||||
private boolean disconnected = false;
|
||||
|
||||
/**
|
||||
* lock acquired to access shared variables:
|
||||
* buffer, present, pos, requested, & disconnected
|
||||
* WARNING: Any of the methods manager.send*() should not be
|
||||
* invoked while this lock is held, since they could potentially
|
||||
* block if the underlying connection's transport buffers are
|
||||
* full, and the manager may need to acquire this lock to process
|
||||
* and consume data coming over the underlying connection.
|
||||
*/
|
||||
private Object lock = new Object();
|
||||
|
||||
/** level at which more data is requested when read past */
|
||||
private int waterMark;
|
||||
|
||||
/** data structure for holding reads of one byte */
|
||||
private byte temp[] = new byte[1];
|
||||
|
||||
/**
|
||||
* Create a new MultiplexInputStream for the given manager.
|
||||
* @param manager object that manages this connection
|
||||
* @param info structure for connection this stream reads from
|
||||
* @param bufferLength length of input buffer
|
||||
*/
|
||||
MultiplexInputStream(
|
||||
ConnectionMultiplexer manager,
|
||||
MultiplexConnectionInfo info,
|
||||
int bufferLength)
|
||||
{
|
||||
this.manager = manager;
|
||||
this.info = info;
|
||||
|
||||
buffer = new byte[bufferLength];
|
||||
waterMark = bufferLength / 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a byte from the connection.
|
||||
*/
|
||||
public synchronized int read() throws IOException
|
||||
{
|
||||
int n = read(temp, 0, 1);
|
||||
if (n != 1)
|
||||
return -1;
|
||||
return temp[0] & 0xFF;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a subarray of bytes from connection. This method blocks for
|
||||
* at least one byte, and it returns the number of bytes actually read,
|
||||
* or -1 if the end of the stream was detected.
|
||||
* @param b array to read bytes into
|
||||
* @param off offset of beginning of bytes to read into
|
||||
* @param len number of bytes to read
|
||||
*/
|
||||
public synchronized int read(byte b[], int off, int len) throws IOException
|
||||
{
|
||||
if (len <= 0)
|
||||
return 0;
|
||||
|
||||
int moreSpace;
|
||||
synchronized (lock) {
|
||||
if (pos >= present)
|
||||
pos = present = 0;
|
||||
else if (pos >= waterMark) {
|
||||
System.arraycopy(buffer, pos, buffer, 0, present - pos);
|
||||
present -= pos;
|
||||
pos = 0;
|
||||
}
|
||||
int freeSpace = buffer.length - present;
|
||||
moreSpace = Math.max(freeSpace - requested, 0);
|
||||
}
|
||||
if (moreSpace > 0)
|
||||
manager.sendRequest(info, moreSpace);
|
||||
synchronized (lock) {
|
||||
requested += moreSpace;
|
||||
while ((pos >= present) && !disconnected) {
|
||||
try {
|
||||
lock.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
if (disconnected && pos >= present)
|
||||
return -1;
|
||||
|
||||
int available = present - pos;
|
||||
if (len < available) {
|
||||
System.arraycopy(buffer, pos, b, off, len);
|
||||
pos += len;
|
||||
return len;
|
||||
}
|
||||
else {
|
||||
System.arraycopy(buffer, pos, b, off, available);
|
||||
pos = present = 0;
|
||||
// could send another request here, if len > available??
|
||||
return available;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of bytes immediately available for reading.
|
||||
*/
|
||||
public int available() throws IOException
|
||||
{
|
||||
synchronized (lock) {
|
||||
return present - pos;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close this connection.
|
||||
*/
|
||||
public void close() throws IOException
|
||||
{
|
||||
manager.sendClose(info);
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive bytes transmitted from connection at remote endpoint.
|
||||
* @param length number of bytes transmitted
|
||||
* @param in input stream with those bytes ready to be read
|
||||
*/
|
||||
void receive(int length, DataInputStream in)
|
||||
throws IOException
|
||||
{
|
||||
/* TO DO: Optimize so that data received from stream can be loaded
|
||||
* directly into user's buffer if there is a pending read().
|
||||
*/
|
||||
synchronized (lock) {
|
||||
if ((pos > 0) && ((buffer.length - present) < length)) {
|
||||
System.arraycopy(buffer, pos, buffer, 0, present - pos);
|
||||
present -= pos;
|
||||
pos = 0;
|
||||
}
|
||||
if ((buffer.length - present) < length)
|
||||
throw new IOException("Receive buffer overflow");
|
||||
in.readFully(buffer, present, length);
|
||||
present += length;
|
||||
requested -= length;
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect this stream from all connection activity.
|
||||
*/
|
||||
void disconnect()
|
||||
{
|
||||
synchronized (lock) {
|
||||
disconnected = true;
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,231 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
package sun.rmi.transport.tcp;
|
||||
|
||||
import java.io.*;
|
||||
|
||||
/**
|
||||
* MultiplexOutputStream manages sending data over a connection managed
|
||||
* by a ConnectionMultiplexer object. Data written is buffered until the
|
||||
* internal buffer is full or the flush() method is called, at which
|
||||
* point it attempts to push a packet of bytes through to the remote
|
||||
* endpoint. This will never push more bytes than the amount already
|
||||
* requested by the remote endpoint (to prevent receive buffer from
|
||||
* overflowing), so if the write() and flush() methods will block
|
||||
* until their operation can complete if enough bytes cannot be
|
||||
* pushed immediately.
|
||||
*
|
||||
* @author Peter Jones
|
||||
*/
|
||||
final class MultiplexOutputStream extends OutputStream {
|
||||
|
||||
/** object managing multiplexed connection */
|
||||
private ConnectionMultiplexer manager;
|
||||
|
||||
/** information about the connection this is the output stream for */
|
||||
private MultiplexConnectionInfo info;
|
||||
|
||||
/** output buffer */
|
||||
private byte buffer[];
|
||||
|
||||
/** current position to write to in output buffer */
|
||||
private int pos = 0;
|
||||
|
||||
/** pending number of bytes requested by remote endpoint */
|
||||
private int requested = 0;
|
||||
|
||||
/** true if this connection has been disconnected */
|
||||
private boolean disconnected = false;
|
||||
|
||||
/**
|
||||
* lock acquired to access shared variables:
|
||||
* requested & disconnected
|
||||
* WARNING: Any of the methods manager.send*() should not be
|
||||
* invoked while this lock is held, since they could potentially
|
||||
* block if the underlying connection's transport buffers are
|
||||
* full, and the manager may need to acquire this lock to process
|
||||
* and consume data coming over the underlying connection.
|
||||
*/
|
||||
private Object lock = new Object();
|
||||
|
||||
/**
|
||||
* Create a new MultiplexOutputStream for the given manager.
|
||||
* @param manager object that manages this connection
|
||||
* @param info structure for connection this stream writes to
|
||||
* @param bufferLength length of output buffer
|
||||
*/
|
||||
MultiplexOutputStream(
|
||||
ConnectionMultiplexer manager,
|
||||
MultiplexConnectionInfo info,
|
||||
int bufferLength)
|
||||
{
|
||||
this.manager = manager;
|
||||
this.info = info;
|
||||
|
||||
buffer = new byte[bufferLength];
|
||||
pos = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a byte over connection.
|
||||
* @param b byte of data to write
|
||||
*/
|
||||
public synchronized void write(int b) throws IOException
|
||||
{
|
||||
while (pos >= buffer.length)
|
||||
push();
|
||||
buffer[pos ++] = (byte) b;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a subarray of bytes over connection.
|
||||
* @param b array containing bytes to write
|
||||
* @param off offset of beginning of bytes to write
|
||||
* @param len number of bytes to write
|
||||
*/
|
||||
public synchronized void write(byte b[], int off, int len)
|
||||
throws IOException
|
||||
{
|
||||
if (len <= 0)
|
||||
return;
|
||||
|
||||
// if enough free space in output buffer, just copy into there
|
||||
int freeSpace = buffer.length - pos;
|
||||
if (len <= freeSpace) {
|
||||
System.arraycopy(b, off, buffer, pos, len);
|
||||
pos += len;
|
||||
return;
|
||||
}
|
||||
|
||||
// else, flush buffer and send rest directly to avoid array copy
|
||||
flush();
|
||||
int local_requested;
|
||||
while (true) {
|
||||
synchronized (lock) {
|
||||
while ((local_requested = requested) < 1 && !disconnected) {
|
||||
try {
|
||||
lock.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
if (disconnected)
|
||||
throw new IOException("Connection closed");
|
||||
}
|
||||
|
||||
if (local_requested < len) {
|
||||
manager.sendTransmit(info, b, off, local_requested);
|
||||
off += local_requested;
|
||||
len -= local_requested;
|
||||
synchronized (lock) {
|
||||
requested -= local_requested;
|
||||
}
|
||||
}
|
||||
else {
|
||||
manager.sendTransmit(info, b, off, len);
|
||||
synchronized (lock) {
|
||||
requested -= len;
|
||||
}
|
||||
// len = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Guarantee that all data written to this stream has been pushed
|
||||
* over and made available to the remote endpoint.
|
||||
*/
|
||||
public synchronized void flush() throws IOException {
|
||||
while (pos > 0)
|
||||
push();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close this connection.
|
||||
*/
|
||||
public void close() throws IOException
|
||||
{
|
||||
manager.sendClose(info);
|
||||
}
|
||||
|
||||
/**
|
||||
* Take note of more bytes requested by connection at remote endpoint.
|
||||
* @param num number of additional bytes requested
|
||||
*/
|
||||
void request(int num)
|
||||
{
|
||||
synchronized (lock) {
|
||||
requested += num;
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect this stream from all connection activity.
|
||||
*/
|
||||
void disconnect()
|
||||
{
|
||||
synchronized (lock) {
|
||||
disconnected = true;
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Push bytes in output buffer to connection at remote endpoint.
|
||||
* This method blocks until at least one byte has been pushed across.
|
||||
*/
|
||||
private void push() throws IOException
|
||||
{
|
||||
int local_requested;
|
||||
synchronized (lock) {
|
||||
while ((local_requested = requested) < 1 && !disconnected) {
|
||||
try {
|
||||
lock.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
if (disconnected)
|
||||
throw new IOException("Connection closed");
|
||||
}
|
||||
|
||||
if (local_requested < pos) {
|
||||
manager.sendTransmit(info, buffer, 0, local_requested);
|
||||
System.arraycopy(buffer, local_requested,
|
||||
buffer, 0, pos - local_requested);
|
||||
pos -= local_requested;
|
||||
synchronized (lock) {
|
||||
requested -= local_requested;
|
||||
}
|
||||
}
|
||||
else {
|
||||
manager.sendTransmit(info, buffer, 0, pos);
|
||||
synchronized (lock) {
|
||||
requested -= pos;
|
||||
}
|
||||
pos = 0;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 1996, 2017, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -67,10 +67,6 @@ public class TCPChannel implements Channel {
|
||||
/** frees cached connections that have expired (guarded by freeList) */
|
||||
private Future<?> reaper = null;
|
||||
|
||||
/** using multiplexer (for bi-directional applet communication */
|
||||
private boolean usingMultiplexer = false;
|
||||
/** connection multiplexer, if used */
|
||||
private ConnectionMultiplexer multiplexer = null;
|
||||
/** connection acceptor (should be in TCPTransport) */
|
||||
private ConnectionAcceptor acceptor;
|
||||
|
||||
@ -210,113 +206,99 @@ public class TCPChannel implements Channel {
|
||||
|
||||
TCPTransport.tcpLog.log(Log.BRIEF, "create connection");
|
||||
|
||||
if (!usingMultiplexer) {
|
||||
Socket sock = ep.newSocket();
|
||||
conn = new TCPConnection(this, sock);
|
||||
Socket sock = ep.newSocket();
|
||||
conn = new TCPConnection(this, sock);
|
||||
|
||||
try {
|
||||
DataOutputStream out =
|
||||
new DataOutputStream(conn.getOutputStream());
|
||||
writeTransportHeader(out);
|
||||
try {
|
||||
DataOutputStream out =
|
||||
new DataOutputStream(conn.getOutputStream());
|
||||
writeTransportHeader(out);
|
||||
|
||||
// choose protocol (single op if not reusable socket)
|
||||
if (!conn.isReusable()) {
|
||||
out.writeByte(TransportConstants.SingleOpProtocol);
|
||||
} else {
|
||||
out.writeByte(TransportConstants.StreamProtocol);
|
||||
out.flush();
|
||||
// choose protocol (single op if not reusable socket)
|
||||
if (!conn.isReusable()) {
|
||||
out.writeByte(TransportConstants.SingleOpProtocol);
|
||||
} else {
|
||||
out.writeByte(TransportConstants.StreamProtocol);
|
||||
out.flush();
|
||||
|
||||
/*
|
||||
* Set socket read timeout to configured value for JRMP
|
||||
* connection handshake; this also serves to guard against
|
||||
* non-JRMP servers that do not respond (see 4322806).
|
||||
*/
|
||||
int originalSoTimeout = 0;
|
||||
try {
|
||||
originalSoTimeout = sock.getSoTimeout();
|
||||
sock.setSoTimeout(handshakeTimeout);
|
||||
} catch (Exception e) {
|
||||
// if we fail to set this, ignore and proceed anyway
|
||||
}
|
||||
|
||||
DataInputStream in =
|
||||
new DataInputStream(conn.getInputStream());
|
||||
byte ack = in.readByte();
|
||||
if (ack != TransportConstants.ProtocolAck) {
|
||||
throw new ConnectIOException(
|
||||
ack == TransportConstants.ProtocolNack ?
|
||||
"JRMP StreamProtocol not supported by server" :
|
||||
"non-JRMP server at remote endpoint");
|
||||
}
|
||||
|
||||
String suggestedHost = in.readUTF();
|
||||
int suggestedPort = in.readInt();
|
||||
if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
|
||||
TCPTransport.tcpLog.log(Log.VERBOSE,
|
||||
"server suggested " + suggestedHost + ":" +
|
||||
suggestedPort);
|
||||
}
|
||||
|
||||
// set local host name, if unknown
|
||||
TCPEndpoint.setLocalHost(suggestedHost);
|
||||
// do NOT set the default port, because we don't
|
||||
// know if we can't listen YET...
|
||||
|
||||
// write out default endpoint to match protocol
|
||||
// (but it serves no purpose)
|
||||
TCPEndpoint localEp =
|
||||
TCPEndpoint.getLocalEndpoint(0, null, null);
|
||||
out.writeUTF(localEp.getHost());
|
||||
out.writeInt(localEp.getPort());
|
||||
if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
|
||||
TCPTransport.tcpLog.log(Log.VERBOSE, "using " +
|
||||
localEp.getHost() + ":" + localEp.getPort());
|
||||
}
|
||||
|
||||
/*
|
||||
* After JRMP handshake, set socket read timeout to value
|
||||
* configured for the rest of the lifetime of the
|
||||
* connection. NOTE: this timeout, if configured to a
|
||||
* finite duration, places an upper bound on the time
|
||||
* that a remote method call is permitted to execute.
|
||||
*/
|
||||
try {
|
||||
/*
|
||||
* If socket factory had set a non-zero timeout on its
|
||||
* own, then restore it instead of using the property-
|
||||
* configured value.
|
||||
*/
|
||||
sock.setSoTimeout((originalSoTimeout != 0 ?
|
||||
originalSoTimeout :
|
||||
responseTimeout));
|
||||
} catch (Exception e) {
|
||||
// if we fail to set this, ignore and proceed anyway
|
||||
}
|
||||
|
||||
out.flush();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
/*
|
||||
* Set socket read timeout to configured value for JRMP
|
||||
* connection handshake; this also serves to guard against
|
||||
* non-JRMP servers that do not respond (see 4322806).
|
||||
*/
|
||||
int originalSoTimeout = 0;
|
||||
try {
|
||||
conn.close();
|
||||
} catch (Exception ex) {}
|
||||
if (e instanceof RemoteException) {
|
||||
throw (RemoteException) e;
|
||||
} else {
|
||||
originalSoTimeout = sock.getSoTimeout();
|
||||
sock.setSoTimeout(handshakeTimeout);
|
||||
} catch (Exception e) {
|
||||
// if we fail to set this, ignore and proceed anyway
|
||||
}
|
||||
|
||||
DataInputStream in =
|
||||
new DataInputStream(conn.getInputStream());
|
||||
byte ack = in.readByte();
|
||||
if (ack != TransportConstants.ProtocolAck) {
|
||||
throw new ConnectIOException(
|
||||
"error during JRMP connection establishment", e);
|
||||
ack == TransportConstants.ProtocolNack ?
|
||||
"JRMP StreamProtocol not supported by server" :
|
||||
"non-JRMP server at remote endpoint");
|
||||
}
|
||||
|
||||
String suggestedHost = in.readUTF();
|
||||
int suggestedPort = in.readInt();
|
||||
if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
|
||||
TCPTransport.tcpLog.log(Log.VERBOSE,
|
||||
"server suggested " + suggestedHost + ":" +
|
||||
suggestedPort);
|
||||
}
|
||||
|
||||
// set local host name, if unknown
|
||||
TCPEndpoint.setLocalHost(suggestedHost);
|
||||
// do NOT set the default port, because we don't
|
||||
// know if we can't listen YET...
|
||||
|
||||
// write out default endpoint to match protocol
|
||||
// (but it serves no purpose)
|
||||
TCPEndpoint localEp =
|
||||
TCPEndpoint.getLocalEndpoint(0, null, null);
|
||||
out.writeUTF(localEp.getHost());
|
||||
out.writeInt(localEp.getPort());
|
||||
if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
|
||||
TCPTransport.tcpLog.log(Log.VERBOSE, "using " +
|
||||
localEp.getHost() + ":" + localEp.getPort());
|
||||
}
|
||||
|
||||
/*
|
||||
* After JRMP handshake, set socket read timeout to value
|
||||
* configured for the rest of the lifetime of the
|
||||
* connection. NOTE: this timeout, if configured to a
|
||||
* finite duration, places an upper bound on the time
|
||||
* that a remote method call is permitted to execute.
|
||||
*/
|
||||
try {
|
||||
/*
|
||||
* If socket factory had set a non-zero timeout on its
|
||||
* own, then restore it instead of using the property-
|
||||
* configured value.
|
||||
*/
|
||||
sock.setSoTimeout((originalSoTimeout != 0 ?
|
||||
originalSoTimeout :
|
||||
responseTimeout));
|
||||
} catch (Exception e) {
|
||||
// if we fail to set this, ignore and proceed anyway
|
||||
}
|
||||
|
||||
out.flush();
|
||||
}
|
||||
} else {
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
conn = multiplexer.openConnection();
|
||||
} catch (IOException e) {
|
||||
synchronized (this) {
|
||||
usingMultiplexer = false;
|
||||
multiplexer = null;
|
||||
}
|
||||
conn.close();
|
||||
} catch (Exception ex) {}
|
||||
if (e instanceof RemoteException) {
|
||||
throw (RemoteException) e;
|
||||
} else {
|
||||
throw new ConnectIOException(
|
||||
"error opening virtual connection " +
|
||||
"over multiplexed connection", e);
|
||||
"error during JRMP connection establishment", e);
|
||||
}
|
||||
}
|
||||
return conn;
|
||||
@ -387,28 +369,6 @@ public class TCPChannel implements Channel {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Use given connection multiplexer object to obtain new connections
|
||||
* through this channel.
|
||||
*/
|
||||
synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) {
|
||||
// for now, always just use the last one given
|
||||
multiplexer = newMultiplexer;
|
||||
|
||||
usingMultiplexer = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Accept a connection provided over a multiplexed channel.
|
||||
*/
|
||||
void acceptMultiplexConnection(Connection conn) {
|
||||
if (acceptor == null) {
|
||||
acceptor = new ConnectionAcceptor(tr);
|
||||
acceptor.startNewAcceptor();
|
||||
}
|
||||
acceptor.accept(conn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes all the connections in the cache, whether timed out or not.
|
||||
*/
|
||||
@ -501,7 +461,7 @@ class ConnectionAcceptor implements Runnable {
|
||||
public void startNewAcceptor() {
|
||||
Thread t = AccessController.doPrivileged(
|
||||
new NewThreadAction(ConnectionAcceptor.this,
|
||||
"Multiplex Accept-" + ++ threadNum,
|
||||
"TCPChannel Accept-" + ++ threadNum,
|
||||
true));
|
||||
t.start();
|
||||
}
|
||||
|
@ -102,11 +102,6 @@ public class TCPTransport extends Transport {
|
||||
AccessController.doPrivileged((PrivilegedAction<Long>) () ->
|
||||
Long.getLong("sun.rmi.transport.tcp.threadKeepAliveTime", 60000));
|
||||
|
||||
/** enable multiplexing protocol */
|
||||
private static final boolean enableMultiplexProtocol = // default false
|
||||
AccessController.doPrivileged((PrivilegedAction<Boolean>) () ->
|
||||
Boolean.getBoolean("sun.rmi.transport.tcp.enableMultiplexProtocol"));
|
||||
|
||||
/** thread pool for connection handlers */
|
||||
private static final ExecutorService connectionThreadPool =
|
||||
new ThreadPoolExecutor(0, maxConnectionThreads,
|
||||
@ -687,6 +682,7 @@ public class TCPTransport extends Transport {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("fallthrough")
|
||||
private void run0() {
|
||||
TCPEndpoint endpoint = getEndpoint();
|
||||
int port = endpoint.getPort();
|
||||
@ -801,59 +797,11 @@ public class TCPTransport extends Transport {
|
||||
break;
|
||||
|
||||
case TransportConstants.MultiplexProtocol:
|
||||
|
||||
if (!enableMultiplexProtocol) {
|
||||
if (tcpLog.isLoggable(Log.VERBOSE)) {
|
||||
tcpLog.log(Log.VERBOSE, "(port " + port +
|
||||
") rejecting multiplex protocol");
|
||||
}
|
||||
|
||||
// If MultiplexProtocol is disabled, send NACK immediately.
|
||||
out.writeByte(TransportConstants.ProtocolNack);
|
||||
out.flush();
|
||||
break;
|
||||
}
|
||||
|
||||
if (tcpLog.isLoggable(Log.VERBOSE)) {
|
||||
tcpLog.log(Log.VERBOSE, "(port " + port +
|
||||
") accepting multiplex protocol");
|
||||
") rejecting multiplex protocol");
|
||||
}
|
||||
|
||||
// send ack
|
||||
out.writeByte(TransportConstants.ProtocolAck);
|
||||
|
||||
// suggest endpoint (in case client doesn't already have one)
|
||||
if (tcpLog.isLoggable(Log.VERBOSE)) {
|
||||
tcpLog.log(Log.VERBOSE, "(port " + port +
|
||||
") suggesting " + remoteHost + ":" + remotePort);
|
||||
}
|
||||
|
||||
out.writeUTF(remoteHost);
|
||||
out.writeInt(remotePort);
|
||||
out.flush();
|
||||
|
||||
// read endpoint client has decided to use
|
||||
ep = new TCPEndpoint(in.readUTF(), in.readInt(),
|
||||
endpoint.getClientSocketFactory(),
|
||||
endpoint.getServerSocketFactory());
|
||||
if (tcpLog.isLoggable(Log.VERBOSE)) {
|
||||
tcpLog.log(Log.VERBOSE, "(port " +
|
||||
port + ") client using " +
|
||||
ep.getHost() + ":" + ep.getPort());
|
||||
}
|
||||
|
||||
ConnectionMultiplexer multiplexer;
|
||||
synchronized (channelTable) {
|
||||
// create or find channel for this endpoint
|
||||
ch = getChannel(ep);
|
||||
multiplexer =
|
||||
new ConnectionMultiplexer(ch, bufIn, sockOut,
|
||||
false);
|
||||
ch.useMultiplexer(multiplexer);
|
||||
}
|
||||
multiplexer.run();
|
||||
break;
|
||||
|
||||
// Fall-through to reject use of MultiplexProtocol
|
||||
default:
|
||||
// protocol not understood, send nack and close socket
|
||||
out.writeByte(TransportConstants.ProtocolNack);
|
||||
|
Loading…
Reference in New Issue
Block a user