3789983e89
Reviewed-by: darcy, ihse
414 lines
13 KiB
Java
414 lines
13 KiB
Java
/*
 * Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
|
|
|
|
/* @test
 * @summary Test selectors and socketchannels
 * @library ..
 * @key randomness
 */
|
|
|
|
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.nio.charset.StandardCharsets;
import java.util.*;
|
|
|
|
|
|
public class SelectorTest {
|
|
private static List clientList = new LinkedList();
|
|
private static Random rnd = new Random();
|
|
public static int NUM_CLIENTS = 30;
|
|
public static int TEST_PORT = 31452;
|
|
static PrintStream log = System.err;
|
|
private static int FINISH_TIME = 30000;
|
|
|
|
/*
|
|
* Usage note
|
|
*
|
|
* java SelectorTest [server] [client <host>] [<port>]
|
|
*
|
|
* No arguments runs both client and server in separate threads
|
|
* using the default port of 31452.
|
|
*
|
|
* client runs the client on this machine and connects to server
|
|
* at the given IP address.
|
|
*
|
|
* server runs the server on localhost.
|
|
*/
|
|
public static void main(String[] args) throws Exception {
|
|
if (args.length == 0) {
|
|
Server server = new Server(0);
|
|
server.start();
|
|
try {
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException e) { }
|
|
InetSocketAddress isa
|
|
= new InetSocketAddress(InetAddress.getLocalHost(), server.port());
|
|
Client client = new Client(isa);
|
|
client.start();
|
|
if ((server.finish(FINISH_TIME) & client.finish(FINISH_TIME)) == 0)
|
|
throw new Exception("Failure");
|
|
log.println();
|
|
|
|
} else if (args[0].equals("server")) {
|
|
|
|
if (args.length > 1)
|
|
TEST_PORT = Integer.parseInt(args[1]);
|
|
Server server = new Server(TEST_PORT);
|
|
server.start();
|
|
if (server.finish(FINISH_TIME) == 0)
|
|
throw new Exception("Failure");
|
|
log.println();
|
|
|
|
} else if (args[0].equals("client")) {
|
|
|
|
if (args.length < 2) {
|
|
log.println("No host specified: terminating.");
|
|
return;
|
|
}
|
|
String ip = args[1];
|
|
if (args.length > 2)
|
|
TEST_PORT = Integer.parseInt(args[2]);
|
|
InetAddress ia = InetAddress.getByName(ip);
|
|
InetSocketAddress isa = new InetSocketAddress(ia, TEST_PORT);
|
|
Client client = new Client(isa);
|
|
client.start();
|
|
if (client.finish(FINISH_TIME) == 0)
|
|
throw new Exception("Failure");
|
|
log.println();
|
|
|
|
} else {
|
|
System.out.println("Usage note:");
|
|
System.out.println("java SelectorTest [server] [client <host>] [<port>]");
|
|
System.out.println("No arguments runs both client and server in separate threads using the default port of 31452.");
|
|
System.out.println("client runs the client on this machine and connects to the server specified.");
|
|
System.out.println("server runs the server on localhost.");
|
|
}
|
|
}
|
|
|
|
static class Client extends TestThread {
|
|
InetSocketAddress isa;
|
|
Client(InetSocketAddress isa) {
|
|
super("Client", SelectorTest.log);
|
|
this.isa = isa;
|
|
}
|
|
|
|
public void go() throws Exception {
|
|
log.println("starting client...");
|
|
for (int i=0; i<NUM_CLIENTS; i++)
|
|
clientList.add(new RemoteEntity(i, isa, log));
|
|
|
|
Collections.shuffle(clientList);
|
|
|
|
log.println("created "+NUM_CLIENTS+" clients");
|
|
do {
|
|
for (Iterator i = clientList.iterator(); i.hasNext(); ) {
|
|
RemoteEntity re = (RemoteEntity) i.next();
|
|
if (re.cycle()) {
|
|
i.remove();
|
|
}
|
|
}
|
|
Collections.shuffle(clientList);
|
|
} while (clientList.size() > 0);
|
|
}
|
|
}
|
|
|
|
static class Server extends TestThread {
|
|
private final ServerSocketChannel ssc;
|
|
private List socketList = new ArrayList();
|
|
private ServerSocket ss;
|
|
private int connectionsAccepted = 0;
|
|
private Selector pollSelector;
|
|
private Selector acceptSelector;
|
|
private Set pkeys;
|
|
private Set pskeys;
|
|
|
|
Server(int port) throws IOException {
|
|
super("Server", SelectorTest.log);
|
|
this.ssc = ServerSocketChannel.open().bind(new InetSocketAddress(port));
|
|
}
|
|
|
|
int port() {
|
|
return ssc.socket().getLocalPort();
|
|
}
|
|
|
|
public void go() throws Exception {
|
|
log.println("starting server...");
|
|
acceptSelector = SelectorProvider.provider().openSelector();
|
|
pollSelector = SelectorProvider.provider().openSelector();
|
|
pkeys = pollSelector.keys();
|
|
pskeys = pollSelector.selectedKeys();
|
|
Set readyKeys = acceptSelector.selectedKeys();
|
|
RequestHandler rh = new RequestHandler(pollSelector, log);
|
|
Thread requestThread = new Thread(rh);
|
|
|
|
requestThread.start();
|
|
|
|
ssc.configureBlocking(false);
|
|
SelectionKey acceptKey = ssc.register(acceptSelector,
|
|
SelectionKey.OP_ACCEPT);
|
|
while(connectionsAccepted < SelectorTest.NUM_CLIENTS) {
|
|
int keysAdded = acceptSelector.select(100);
|
|
if (keysAdded > 0) {
|
|
Iterator i = readyKeys.iterator();
|
|
while(i.hasNext()) {
|
|
SelectionKey sk = (SelectionKey)i.next();
|
|
i.remove();
|
|
ServerSocketChannel nextReady =
|
|
(ServerSocketChannel)sk.channel();
|
|
SocketChannel sc = nextReady.accept();
|
|
connectionsAccepted++;
|
|
if (sc != null) {
|
|
sc.configureBlocking(false);
|
|
synchronized (pkeys) {
|
|
sc.register(pollSelector, SelectionKey.OP_READ);
|
|
}
|
|
} else {
|
|
throw new RuntimeException(
|
|
"Socket does not support Channels");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
acceptKey.cancel();
|
|
requestThread.join();
|
|
acceptSelector.close();
|
|
pollSelector.close();
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
 * One client-side connection, advanced a single step per {@link #cycle()}
 * call: initiate connect, finish connect, write the message in small random
 * slices, wait for an acknowledgement, then close.
 */
class RemoteEntity {
    private static Random rnd = new Random();
    int id;
    /** Outgoing message, flipped so that position..limit is the unsent part. */
    ByteBuffer data;
    /** Number of message bytes written to the channel so far. */
    int dataWrittenIndex;
    /** Total number of message bytes to send. */
    int totalDataLength;
    boolean initiated = false;
    boolean connected = false;
    boolean written = false;
    boolean acked = false;
    boolean closed = false;
    private SocketChannel sc;
    ByteBuffer ackBuffer;
    PrintStream log;
    InetSocketAddress server;

    /**
     * Opens a non-blocking channel (not yet connected) and prepares the
     * message "Testing from NN", where NN is the id padded to at least two
     * digits.
     *
     * @param id     identifier embedded in the outgoing message
     * @param server address that connect() will later target
     * @param log    stream for progress output
     * @throws Exception if the channel cannot be opened or configured
     */
    RemoteEntity(int id, InetSocketAddress server, PrintStream log)
        throws Exception
    {
        this.id = id;
        this.log = log;
        this.server = server;

        sc = openNonBlocking();

        // Prepare the data buffer to write out from this entity
        // Let's use both slow and fast buffers
        if (rnd.nextBoolean())
            data = ByteBuffer.allocateDirect(100);
        else
            data = ByteBuffer.allocate(100);
        String number = Integer.toString(id);
        if (number.length() == 1)
            number = "0"+number;
        String source = "Testing from " + number;
        byte[] payload = source.getBytes(StandardCharsets.ISO_8859_1);
        data.put(payload);
        data.flip();
        // Count encoded bytes, not chars, so the total always matches what is
        // actually sitting in the buffer.
        totalDataLength = payload.length;

        // Allocate an ack buffer
        ackBuffer = ByteBuffer.allocateDirect(10);
    }

    /** Opens a channel in non-blocking mode, closing it again if configuration fails. */
    private static SocketChannel openNonBlocking() throws IOException {
        SocketChannel ch = SocketChannel.open();
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            ch.close();
            throw e;
        }
        return ch;
    }

    /** Discards the current channel and replaces it with a fresh one. */
    private void reset() throws Exception {
        sc.close();
        sc = openNonBlocking();
    }

    /** Starts a non-blocking connect; a ConnectException resets the channel so a later cycle retries. */
    private void connect() throws Exception {
        try {
            connected = sc.connect(server);
            initiated = true;
        } catch (ConnectException e) {
            initiated = false;
            reset();
        }
    }

    /** Polls a pending connect; any IOException resets the channel and returns to the initial state. */
    private void finishConnect() throws Exception {
        try {
            connected = sc.finishConnect();
        } catch (IOException e) {
            initiated = false;
            reset();
        }
    }

    int id() {
        return id;
    }

    /**
     * Performs the next outstanding step for this entity.
     *
     * @return true once the channel has been closed
     * @throws Exception if a step fails, including the peer closing the
     *         connection before sending an acknowledgement
     */
    boolean cycle() throws Exception {
        if (!initiated)
            connect();
        else if (!connected)
            finishConnect();
        else if (!written)
            writeCycle();
        else if (!acked)
            ackCycle();
        else if (!closed)
            close();
        return closed;
    }

    /** Polls for the acknowledgement; any received byte counts as the ack. */
    private void ackCycle() throws Exception {
        //log.println("acking from "+id);
        int bytesRead = sc.read(ackBuffer);
        if (bytesRead > 0) {
            acked = true;
        } else if (bytesRead < 0) {
            // End-of-stream with no ack received: the ack can never arrive,
            // so fail now instead of polling a dead connection forever.
            throw new IOException("Entity " + id
                + ": connection closed before an ack was received");
        }
    }

    private void close() throws Exception {
        sc.close();
        closed = true;
    }

    /** Writes the next 1-10 bytes of the message; shuts down output once everything is sent. */
    private void writeCycle() throws Exception {
        log.println("writing from "+id);
        int numBytesToWrite = rnd.nextInt(10)+1;
        int newWriteTarget = dataWrittenIndex + numBytesToWrite;
        if (newWriteTarget > totalDataLength)
            newWriteTarget = totalDataLength;
        // Moving the limit caps how much this single write may send.
        data.limit(newWriteTarget);
        int bytesWritten = sc.write(data);
        if (bytesWritten > 0)
            dataWrittenIndex += bytesWritten;
        if (dataWrittenIndex == totalDataLength) {
            written = true;
            // Half-close so the peer's read reaches end-of-stream.
            sc.socket().shutdownOutput();
        }
    }

}
|
|
|
|
|
|
class RequestHandler implements Runnable {
|
|
private static Random rnd = new Random();
|
|
private Selector selector;
|
|
private int connectionsHandled = 0;
|
|
private HashMap dataBin = new HashMap();
|
|
PrintStream log;
|
|
|
|
public RequestHandler(Selector selector, PrintStream log) {
|
|
this.selector = selector;
|
|
this.log = log;
|
|
}
|
|
|
|
public void run() {
|
|
log.println("starting request handler...");
|
|
int connectionsAccepted = 0;
|
|
|
|
Set nKeys = selector.keys();
|
|
Set readyKeys = selector.selectedKeys();
|
|
|
|
try {
|
|
while(connectionsHandled < SelectorTest.NUM_CLIENTS) {
|
|
int numKeys = selector.select(100);
|
|
|
|
// Process channels with data
|
|
synchronized (nKeys) {
|
|
if (readyKeys.size() > 0) {
|
|
Iterator i = readyKeys.iterator();
|
|
while(i.hasNext()) {
|
|
SelectionKey sk = (SelectionKey)i.next();
|
|
i.remove();
|
|
SocketChannel sc = (SocketChannel)sk.channel();
|
|
if (sc.isOpen())
|
|
read(sk, sc);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Give other threads a chance to run
|
|
if (numKeys == 0) {
|
|
try {
|
|
Thread.sleep(1);
|
|
} catch (Exception x) {}
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
log.println("Unexpected error 1: "+e);
|
|
e.printStackTrace();
|
|
}
|
|
}
|
|
|
|
private void read(SelectionKey sk, SocketChannel sc) throws Exception {
|
|
ByteBuffer bin = (ByteBuffer)dataBin.get(sc);
|
|
if (bin == null) {
|
|
if (rnd.nextBoolean())
|
|
bin = ByteBuffer.allocateDirect(100);
|
|
else
|
|
bin = ByteBuffer.allocate(100);
|
|
dataBin.put(sc, bin);
|
|
}
|
|
|
|
int bytesRead = 0;
|
|
do {
|
|
bytesRead = sc.read(bin);
|
|
} while(bytesRead > 0);
|
|
|
|
if (bytesRead == -1) {
|
|
sk.interestOps(0);
|
|
bin.flip();
|
|
int size = bin.limit();
|
|
byte[] data = new byte[size];
|
|
for(int j=0; j<size; j++)
|
|
data[j] = bin.get();
|
|
String message = new String(data, "8859_1");
|
|
connectionsHandled++;
|
|
acknowledge(sc);
|
|
log.println("Received >>>"+message + "<<<");
|
|
log.println("Handled: "+connectionsHandled);
|
|
}
|
|
}
|
|
|
|
private void acknowledge(SocketChannel sc) throws Exception {
|
|
ByteBuffer ackBuffer = ByteBuffer.allocateDirect(10);
|
|
String s = "ack";
|
|
ackBuffer.put(s.getBytes("8859_1"));
|
|
ackBuffer.flip();
|
|
int bytesWritten = 0;
|
|
while(bytesWritten == 0) {
|
|
bytesWritten += sc.write(ackBuffer);
|
|
}
|
|
sc.close();
|
|
}
|
|
}
|