/* * Copyright 2009 Sun Microsystems, Inc. All Rights Reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, * CA 95054 USA or visit www.sun.com if you need additional information or * have any questions. */ /* @test * @bug 6863110 * @summary Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector * @author chegar */ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.io.IOException; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.nio.ByteBuffer; import java.nio.channels.Selector; import java.nio.channels.SelectionKey; import com.sun.nio.sctp.AbstractNotificationHandler; import com.sun.nio.sctp.AssociationChangeNotification; import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent; import com.sun.nio.sctp.HandlerResult; import com.sun.nio.sctp.Notification; import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpServerChannel; import com.sun.nio.sctp.ShutdownNotification; import static java.lang.System.out; import static java.lang.System.err; import static java.nio.channels.SelectionKey.OP_CONNECT; import static java.nio.channels.SelectionKey.OP_READ; public class CommUp { static CountDownLatch acceptLatch = new CountDownLatch(1); static final int TIMEOUT = 10000; CommUpNotificationHandler clientHandler = new CommUpNotificationHandler(); CommUpNotificationHandler serverHandler = new CommUpNotificationHandler(); CommUpServer server; Thread clientThread; void test(String[] args) { SocketAddress address = null; if (!Util.isSCTPSupported()) { out.println("SCTP protocol is not supported"); out.println("Test cannot be run"); return; } if (args.length == 2) { /* requested to connecct to a specific address */ try { int port = Integer.valueOf(args[1]); address = new InetSocketAddress(args[0], port); } catch (NumberFormatException nfe) { err.println(nfe); } } else { /* start server on local machine, default */ try { server = new CommUpServer(); server.start(); address = server.address(); debug("Server started and listening on " + address); } catch (IOException ioe) { ioe.printStackTrace(); return; } } /* store the main thread so that the server can interrupt it, if necessary */ clientThread = Thread.currentThread(); doClient(address); } void doClient(SocketAddress peerAddress) { SctpChannel sc = null; try { debug("connecting to " + peerAddress); sc = SctpChannel.open(); sc.configureBlocking(false); check(sc.isBlocking() == false, "Should be in non-blocking mode"); sc.connect(peerAddress); Selector selector = Selector.open(); SelectionKey selectiontKey = sc.register(selector, OP_CONNECT); /* Expect two interest Ops */ boolean opConnectReceived = false; boolean opReadReceived = false; for (int z=0; z<2; z++) { debug("select " + z); int keysAdded = selector.select(TIMEOUT); debug("returned " + keysAdded + " keys"); if (keysAdded > 0) { Set keys = selector.selectedKeys(); Iterator i = keys.iterator(); while(i.hasNext()) { SelectionKey sk = i.next(); i.remove(); SctpChannel readyChannel = (SctpChannel)sk.channel(); /* OP_CONNECT */ if (sk.isConnectable()) { /* some trivial checks */ check(opConnectReceived == false, "should only received one OP_CONNECT"); check(opReadReceived == false, "should not receive OP_READ before OP_CONNECT"); check(readyChannel.equals(sc), "channels should be equal"); check(!sk.isAcceptable(), "key should not be acceptable"); check(!sk.isReadable(), "key should not be readable"); check(!sk.isWritable(), "key should not be writable"); /* now process the OP_CONNECT */ opConnectReceived = true; check((sk.interestOps() & OP_CONNECT) == OP_CONNECT, "selection key interest ops should contain OP_CONNECT"); sk.interestOps(OP_READ); check((sk.interestOps() & OP_CONNECT) != OP_CONNECT, "selection key interest ops should not contain OP_CONNECT"); check(sc.finishConnect(), "finishConnect should return true"); } /* OP_READ */ else if (sk.isReadable()) { /* some trivial checks */ check(opConnectReceived == true, "should receive one OP_CONNECT before OP_READ"); check(opReadReceived == false, "should not receive OP_READ before OP_CONNECT"); check(readyChannel.equals(sc), "channels should be equal"); check(!sk.isAcceptable(), "key should not be acceptable"); check(sk.isReadable(), "key should be readable"); check(!sk.isWritable(), "key should not be writable"); check(!sk.isConnectable(), "key should not be connectable"); /* now process the OP_READ */ opReadReceived = true; selectiontKey.cancel(); /* try with small buffer to see if native * implementation can handle this */ ByteBuffer buffer = ByteBuffer.allocateDirect(1); readyChannel.receive(buffer, null, clientHandler); check(clientHandler.receivedCommUp(), "Client should have received COMM_UP"); /* dont close (or put anything on) the channel until * we check that the server's accepted channel also * received COMM_UP */ serverHandler.waitForCommUp(); } else { fail("Unexpected selection key"); } } } else { fail("Client selector returned 0 ready keys"); /* stop the server */ server.thread().interrupt(); } } //for } catch (IOException ioe) { unexpected(ioe); } catch (InterruptedException ie) { unexpected(ie); } } class CommUpServer implements Runnable { final InetSocketAddress serverAddr; private SctpServerChannel ssc; private Thread serverThread; public CommUpServer() throws IOException { ssc = SctpServerChannel.open().bind(null); java.util.Set addrs = ssc.getAllLocalAddresses(); if (addrs.isEmpty()) debug("addrs should not be empty"); serverAddr = (InetSocketAddress) addrs.iterator().next(); } void start() { serverThread = new Thread(this, "CommUpServer-" + serverAddr.getPort()); serverThread.start(); } InetSocketAddress address () { return serverAddr; } Thread thread() { return serverThread; } @Override public void run() { Selector selector = null; SctpChannel sc = null; SelectionKey readKey = null; try { sc = ssc.accept(); debug("accepted " + sc); selector = Selector.open(); sc.configureBlocking(false); check(sc.isBlocking() == false, "Should be in non-blocking mode"); readKey = sc.register(selector, SelectionKey.OP_READ); debug("select"); int keysAdded = selector.select(TIMEOUT); debug("returned " + keysAdded + " keys"); if (keysAdded > 0) { Set keys = selector.selectedKeys(); Iterator i = keys.iterator(); while(i.hasNext()) { SelectionKey sk = i.next(); i.remove(); SctpChannel readyChannel = (SctpChannel)sk.channel(); check(readyChannel.equals(sc), "channels should be equal"); check(!sk.isAcceptable(), "key should not be acceptable"); check(sk.isReadable(), "key should be readable"); check(!sk.isWritable(), "key should not be writable"); check(!sk.isConnectable(), "key should not be connectable"); /* block until we check if the client has received its COMM_UP*/ clientHandler.waitForCommUp(); ByteBuffer buffer = ByteBuffer.allocateDirect(1); sc.receive(buffer, null, serverHandler); check(serverHandler.receivedCommUp(), "Accepted channel should have received COMM_UP"); } } else { fail("Server selector returned 0 ready keys"); /* stop the client */ clientThread.interrupt(); } } catch (IOException ioe) { ioe.printStackTrace(); } catch (InterruptedException unused) { } finally { if (readKey != null) readKey.cancel(); try { if (selector != null) selector.close(); } catch (IOException ioe) { unexpected(ioe); } try { if (ssc != null) ssc.close(); } catch (IOException ioe) { unexpected(ioe); } try { if (sc != null) sc.close(); } catch (IOException ioe) { unexpected(ioe); } } } } class CommUpNotificationHandler extends AbstractNotificationHandler { private boolean receivedCommUp; // false public synchronized boolean receivedCommUp() { return receivedCommUp; } public synchronized boolean waitForCommUp() throws InterruptedException { while (receivedCommUp == false) { wait(); } return false; } @Override public HandlerResult handleNotification( Notification notification, Object attachment) { fail("Unknown notification type"); return HandlerResult.CONTINUE; } @Override public synchronized HandlerResult handleNotification( AssociationChangeNotification notification, Object attachment) { AssocChangeEvent event = notification.event(); debug("AssociationChangeNotification"); debug(" Association: " + notification.association()); debug(" Event: " + event); if (event.equals(AssocChangeEvent.COMM_UP)) { receivedCommUp = true; notifyAll(); } return HandlerResult.RETURN; } @Override public HandlerResult handleNotification( ShutdownNotification notification, Object attachment) { debug("ShutdownNotification"); debug(" Association: " + notification.association()); return HandlerResult.RETURN; } } //--------------------- Infrastructure --------------------------- boolean debug = true; volatile int passed = 0, failed = 0; void pass() {passed++;} void fail() {failed++; Thread.dumpStack();} void fail(String msg) {err.println(msg); fail();} void unexpected(Throwable t) {failed++; t.printStackTrace();} void check(boolean cond) {if (cond) pass(); else fail();} void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);} void debug(String message) {if(debug) { out.println(Thread.currentThread().getName() + ": " + message); } } void sleep(long millis) { try { Thread.currentThread().sleep(millis); } catch(InterruptedException ie) { unexpected(ie); }} public static void main(String[] args) throws Throwable { Class k = new Object(){}.getClass().getEnclosingClass(); try {k.getMethod("instanceMain",String[].class) .invoke( k.newInstance(), (Object) args);} catch (Throwable e) {throw e.getCause();}} public void instanceMain(String[] args) throws Throwable { try {test(args);} catch (Throwable t) {unexpected(t);} out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); if (failed > 0) throw new AssertionError("Some tests failed");} }