This commit is contained in:
Lana Steuck 2016-02-18 13:42:39 -08:00
commit 2c11c44055
6 changed files with 135 additions and 25 deletions

View File

@ -54,11 +54,17 @@ public class CorbaInboundConnectionCacheImpl
{
protected Collection connectionCache;
private Acceptor acceptor;
public CorbaInboundConnectionCacheImpl(ORB orb, Acceptor acceptor)
{
super(orb, acceptor.getConnectionCacheType(),
((CorbaAcceptor)acceptor).getMonitoringName());
this.connectionCache = new ArrayList();
this.acceptor = acceptor;
if (orb.transportDebugFlag) {
dprint(": " + acceptor );
}
}
////////////////////////////////////////////////////
@ -66,11 +72,25 @@ public class CorbaInboundConnectionCacheImpl
// pept.transport.InboundConnectionCache
//
public void close () {
super.close();
if (orb.transportDebugFlag) {
dprint(".close: " + acceptor );
}
this.acceptor.close();
}
public Connection get(Acceptor acceptor)
{
throw wrapper.methodShouldNotBeCalled();
}
public Acceptor getAcceptor () {
return acceptor;
}
public void put(Acceptor acceptor, Connection connection)
{
if (orb.transportDebugFlag) {

View File

@ -188,8 +188,9 @@ public class CorbaTransportManagerImpl
for (Object cc : outboundConnectionCaches.values()) {
((ConnectionCache)cc).close() ;
}
for (Object cc : inboundConnectionCaches.values()) {
((ConnectionCache)cc).close() ;
for (Object icc : inboundConnectionCaches.values()) {
((ConnectionCache)icc).close() ;
unregisterAcceptor(((InboundConnectionCache)icc).getAcceptor());
}
getSelector(0).close();
} finally {

View File

@ -26,16 +26,20 @@
package com.sun.corba.se.impl.transport;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ClosedSelectorException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;
import java.util.List;
import com.sun.corba.se.pept.broker.Broker;
import com.sun.corba.se.pept.transport.Acceptor;
import com.sun.corba.se.pept.transport.Connection;
@ -111,8 +115,17 @@ class SelectorImpl
interestOpsList.add(keyAndOp);
}
// tell Selector Thread there's an update to a SelectorKey's Ops
try {
if (selector != null) {
// wakeup Selector thread to process close request
selector.wakeup();
}
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".registerInterestOps: selector.wakeup: ", t);
}
}
}
else {
wrapper.selectionKeyInvalid(eventHandler.toString());
if (orb.transportDebugFlag) {
@ -186,7 +199,9 @@ class SelectorImpl
if (selectionKey != null) {
selectionKey.cancel();
}
if (selector != null) {
selector.wakeup();
}
return;
}
@ -239,6 +254,8 @@ class SelectorImpl
readerThread.close();
}
clearDeferredRegistrations();
// Selector
try {
@ -248,7 +265,7 @@ class SelectorImpl
}
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".close: selector.close: " + t);
dprint(".close: selector.wakeup: ", t);
}
}
}
@ -273,15 +290,16 @@ class SelectorImpl
n = selector.select(timeout);
} catch (IOException e) {
if (orb.transportDebugFlag) {
dprint(".run: selector.select: " + e);
dprint(".run: selector.select: ", e);
}
} catch (ClosedSelectorException csEx) {
if (orb.transportDebugFlag) {
dprint(".run: selector.select: ", csEx);
}
break;
}
if (closed) {
selector.close();
if (orb.transportDebugFlag) {
dprint(".run: closed - .run return");
}
return;
break;
}
/*
if (timeout == 0 && orb.transportDebugFlag) {
@ -321,6 +339,18 @@ class SelectorImpl
}
}
}
try {
if (selector != null) {
if (orb.transportDebugFlag) {
dprint(".run: selector.close ");
}
selector.close();
}
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".run: selector.close: ", t);
}
}
}
/////////////////////////////////////////////////////
@ -328,6 +358,44 @@ class SelectorImpl
// Implementation.
//
private void clearDeferredRegistrations() {
synchronized (deferredRegistrations) {
int deferredListSize = deferredRegistrations.size();
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations:deferred list size == " + deferredListSize);
}
for (int i = 0; i < deferredListSize; i++) {
EventHandler eventHandler =
(EventHandler)deferredRegistrations.get(i);
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations: " + eventHandler);
}
SelectableChannel channel = eventHandler.getChannel();
SelectionKey selectionKey = null;
try {
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations:close channel == "
+ channel);
dprint(".clearDeferredRegistrations:close channel class == "
+ channel.getClass().getName());
}
channel.close();
selectionKey = eventHandler.getSelectionKey();
if (selectionKey != null) {
selectionKey.cancel();
selectionKey.attach(null);
}
} catch (IOException ioEx) {
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations: ", ioEx);
}
}
}
deferredRegistrations.clear();
}
}
private synchronized boolean isClosed ()
{
return closed;
@ -344,7 +412,7 @@ class SelectorImpl
selector = Selector.open();
} catch (IOException e) {
if (orb.transportDebugFlag) {
dprint(".startSelector: Selector.open: IOException: " + e);
dprint(".startSelector: Selector.open: IOException: ", e);
}
// REVISIT - better handling/reporting
RuntimeException rte =
@ -379,7 +447,7 @@ class SelectorImpl
(Object)eventHandler);
} catch (ClosedChannelException e) {
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: " + e);
dprint(".handleDeferredRegistrations: ", e);
}
}
eventHandler.setSelectionKey(selectionKey);

View File

@ -264,8 +264,13 @@ public class SocketOrChannelAcceptorImpl
if (connection.shouldRegisterServerReadEvent()) {
Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
if (orb.transportDebugFlag) {
dprint(".accept: registerForEvent: " + connection);
}
selector.registerForEvent(connection.getEventHandler());
}
}
getConnectionCache().reclaim();
@ -273,14 +278,17 @@ public class SocketOrChannelAcceptorImpl
if (orb.transportDebugFlag) {
dprint(".accept:", e);
}
orb.getTransportManager().getSelector(0).unregisterForEvent(this);
Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this);
// REVISIT - need to close - recreate - then register new one.
orb.getTransportManager().getSelector(0).registerForEvent(this);
selector.registerForEvent(this);
// NOTE: if register cycling we do not want to shut down ORB
// since local beans will still work. Instead one will see
// a growing log file to alert admin of problem.
}
}
}
public void close ()
{
@ -289,7 +297,9 @@ public class SocketOrChannelAcceptorImpl
dprint(".close->:");
}
Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this);
}
if (serverSocketChannel != null) {
serverSocketChannel.close();
}
@ -480,7 +490,9 @@ public class SocketOrChannelAcceptorImpl
// of calling SelectionKey.interestOps(<interest op>).
Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.registerInterestOps(this);
}
if (orb.transportDebugFlag) {
dprint(".doWork<-:" + this);

View File

@ -367,7 +367,10 @@ public class SocketOrChannelConnectionImpl
}
}
// REVISIT - make sure reader thread is killed.
orb.getTransportManager().getSelector(0).unregisterForEvent(this);
Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this);
}
// Notify anyone waiting.
purgeCalls(wrapper.connectionAbort(ex), true, false);
// REVISIT
@ -801,7 +804,9 @@ public class SocketOrChannelConnectionImpl
}
try {
Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this);
}
if (socketChannel != null) {
socketChannel.close();
}
@ -824,7 +829,9 @@ public class SocketOrChannelConnectionImpl
dprint(".closeConnectionResources->: " + this);
}
Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this);
}
try {
if (socketChannel != null)
socketChannel.close() ;

View File

@ -36,6 +36,8 @@ public interface InboundConnectionCache
public void put(Acceptor acceptor, Connection connection);
public void remove(Connection connection);
public Acceptor getAcceptor();
}
// End of file.