8139965: Hang seen when using com.sun.jndi.ldap.search.replyQueueSize
Reviewed-by: dfuchs
This commit is contained in:
parent
37281369f0
commit
88303d1c60
@ -186,12 +186,16 @@ public final class BerDecoder extends Ber {
|
|||||||
*</pre></blockquote>
|
*</pre></blockquote>
|
||||||
*/
|
*/
|
||||||
private int parseIntWithTag(int tag) throws DecodeException {
|
private int parseIntWithTag(int tag) throws DecodeException {
|
||||||
|
|
||||||
|
|
||||||
if (parseByte() != tag) {
|
if (parseByte() != tag) {
|
||||||
|
// Ber could have been reset;
|
||||||
|
String s;
|
||||||
|
if (offset > 0) {
|
||||||
|
s = Integer.toString(buf[offset - 1] & 0xff);
|
||||||
|
} else {
|
||||||
|
s = "Empty tag";
|
||||||
|
}
|
||||||
throw new DecodeException("Encountered ASN.1 tag " +
|
throw new DecodeException("Encountered ASN.1 tag " +
|
||||||
Integer.toString(buf[offset - 1] & 0xff) +
|
s + " (expected tag " + Integer.toString(tag) + ")");
|
||||||
" (expected tag " + Integer.toString(tag) + ")");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int len = parseLength();
|
int len = parseLength();
|
||||||
|
@ -408,65 +408,29 @@ public final class Connection implements Runnable {
|
|||||||
/**
|
/**
|
||||||
* Reads a reply; waits until one is ready.
|
* Reads a reply; waits until one is ready.
|
||||||
*/
|
*/
|
||||||
BerDecoder readReply(LdapRequest ldr)
|
BerDecoder readReply(LdapRequest ldr) throws IOException, NamingException {
|
||||||
throws IOException, NamingException {
|
|
||||||
BerDecoder rber;
|
BerDecoder rber;
|
||||||
|
|
||||||
// Track down elapsed time to workaround spurious wakeups
|
try {
|
||||||
long elapsedMilli = 0;
|
// if no timeout is set so we wait infinitely until
|
||||||
long elapsedNano = 0;
|
// a response is received
|
||||||
|
// http://docs.oracle.com/javase/8/docs/technotes/guides/jndi/jndi-ldap.html#PROP
|
||||||
while (((rber = ldr.getReplyBer()) == null) &&
|
rber = ldr.getReplyBer(readTimeout);
|
||||||
(readTimeout <= 0 || elapsedMilli < readTimeout))
|
} catch (InterruptedException ex) {
|
||||||
{
|
throw new InterruptedNamingException(
|
||||||
try {
|
"Interrupted during LDAP operation");
|
||||||
// If socket closed, don't even try
|
|
||||||
synchronized (this) {
|
|
||||||
if (sock == null) {
|
|
||||||
throw new ServiceUnavailableException(host + ":" + port +
|
|
||||||
"; socket closed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
synchronized (ldr) {
|
|
||||||
// check if condition has changed since our last check
|
|
||||||
rber = ldr.getReplyBer();
|
|
||||||
if (rber == null) {
|
|
||||||
if (readTimeout > 0) { // Socket read timeout is specified
|
|
||||||
long beginNano = System.nanoTime();
|
|
||||||
|
|
||||||
// will be woken up before readTimeout if reply is
|
|
||||||
// available
|
|
||||||
ldr.wait(readTimeout - elapsedMilli);
|
|
||||||
elapsedNano += (System.nanoTime() - beginNano);
|
|
||||||
elapsedMilli += elapsedNano / 1000_000;
|
|
||||||
elapsedNano %= 1000_000;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// no timeout is set so we wait infinitely until
|
|
||||||
// a response is received
|
|
||||||
// http://docs.oracle.com/javase/8/docs/technotes/guides/jndi/jndi-ldap.html#PROP
|
|
||||||
ldr.wait();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
throw new InterruptedNamingException(
|
|
||||||
"Interrupted during LDAP operation");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((rber == null) && (elapsedMilli >= readTimeout)) {
|
if (rber == null) {
|
||||||
abandonRequest(ldr, null);
|
abandonRequest(ldr, null);
|
||||||
throw new NamingException("LDAP response read timed out, timeout used:"
|
throw new NamingException(
|
||||||
|
"LDAP response read timed out, timeout used:"
|
||||||
+ readTimeout + "ms." );
|
+ readTimeout + "ms." );
|
||||||
|
|
||||||
}
|
}
|
||||||
return rber;
|
return rber;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////
|
||||||
//
|
//
|
||||||
// Methods to add, find, delete, and abandon requests made to server
|
// Methods to add, find, delete, and abandon requests made to server
|
||||||
@ -660,14 +624,11 @@ public final class Connection implements Runnable {
|
|||||||
if (nparent) {
|
if (nparent) {
|
||||||
LdapRequest ldr = pendingRequests;
|
LdapRequest ldr = pendingRequests;
|
||||||
while (ldr != null) {
|
while (ldr != null) {
|
||||||
|
ldr.close();
|
||||||
synchronized (ldr) {
|
|
||||||
ldr.notify();
|
|
||||||
ldr = ldr.next;
|
ldr = ldr.next;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (nparent) {
|
if (nparent) {
|
||||||
parent.processConnectionClosure();
|
parent.processConnectionClosure();
|
||||||
}
|
}
|
||||||
@ -755,7 +716,7 @@ public final class Connection implements Runnable {
|
|||||||
* the safest thing to do is to shut it down.
|
* the safest thing to do is to shut it down.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
private Object pauseLock = new Object(); // lock for reader to wait on while paused
|
private final Object pauseLock = new Object(); // lock for reader to wait on while paused
|
||||||
private boolean paused = false; // paused state of reader
|
private boolean paused = false; // paused state of reader
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -29,55 +29,52 @@ import java.io.IOException;
|
|||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import javax.naming.CommunicationException;
|
import javax.naming.CommunicationException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
final class LdapRequest {
|
final class LdapRequest {
|
||||||
|
|
||||||
|
private final static BerDecoder EOF = new BerDecoder(new byte[]{}, -1, 0);
|
||||||
|
|
||||||
LdapRequest next; // Set/read in synchronized Connection methods
|
LdapRequest next; // Set/read in synchronized Connection methods
|
||||||
int msgId; // read-only
|
final int msgId; // read-only
|
||||||
|
|
||||||
private int gotten = 0;
|
private final BlockingQueue<BerDecoder> replies;
|
||||||
private BlockingQueue<BerDecoder> replies;
|
private volatile boolean cancelled;
|
||||||
private int highWatermark = -1;
|
private volatile boolean closed;
|
||||||
private boolean cancelled = false;
|
private volatile boolean completed;
|
||||||
private boolean pauseAfterReceipt = false;
|
private final boolean pauseAfterReceipt;
|
||||||
private boolean completed = false;
|
|
||||||
|
|
||||||
LdapRequest(int msgId, boolean pause) {
|
|
||||||
this(msgId, pause, -1);
|
|
||||||
}
|
|
||||||
|
|
||||||
LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
|
LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
|
||||||
this.msgId = msgId;
|
this.msgId = msgId;
|
||||||
this.pauseAfterReceipt = pause;
|
this.pauseAfterReceipt = pause;
|
||||||
if (replyQueueCapacity == -1) {
|
if (replyQueueCapacity == -1) {
|
||||||
this.replies = new LinkedBlockingQueue<BerDecoder>();
|
this.replies = new LinkedBlockingQueue<>();
|
||||||
} else {
|
} else {
|
||||||
this.replies =
|
this.replies = new LinkedBlockingQueue<>(8 * replyQueueCapacity / 10);
|
||||||
new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity);
|
|
||||||
highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void cancel() {
|
void cancel() {
|
||||||
cancelled = true;
|
cancelled = true;
|
||||||
|
replies.offer(EOF);
|
||||||
|
}
|
||||||
|
|
||||||
// Unblock reader of pending request
|
synchronized void close() {
|
||||||
// Should only ever have at most one waiter
|
closed = true;
|
||||||
notify();
|
replies.offer(EOF);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isClosed() {
|
||||||
|
return closed && (replies.size() == 0 || replies.peek() == EOF);
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized boolean addReplyBer(BerDecoder ber) {
|
synchronized boolean addReplyBer(BerDecoder ber) {
|
||||||
if (cancelled) {
|
// check the closed boolean value here as we don't want anything
|
||||||
|
// to be added to the queue after close() has been called.
|
||||||
|
if (cancelled || closed) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add a new reply to the queue of unprocessed replies.
|
|
||||||
try {
|
|
||||||
replies.put(ber);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
|
|
||||||
// peek at the BER buffer to check if it is a SearchResultDone PDU
|
// peek at the BER buffer to check if it is a SearchResultDone PDU
|
||||||
try {
|
try {
|
||||||
ber.parseSeq(null);
|
ber.parseSeq(null);
|
||||||
@ -88,33 +85,38 @@ final class LdapRequest {
|
|||||||
}
|
}
|
||||||
ber.reset();
|
ber.reset();
|
||||||
|
|
||||||
notify(); // notify anyone waiting for reply
|
// Add a new reply to the queue of unprocessed replies.
|
||||||
/*
|
try {
|
||||||
* If a queue capacity has been set then trigger a pause when the
|
replies.put(ber);
|
||||||
* queue has filled to 80% capacity. Later, when the queue has drained
|
} catch (InterruptedException e) {
|
||||||
* then the reader gets unpaused.
|
// ignore
|
||||||
*/
|
|
||||||
if (highWatermark != -1 && replies.size() >= highWatermark) {
|
|
||||||
return true; // trigger the pause
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pauseAfterReceipt;
|
return pauseAfterReceipt;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized BerDecoder getReplyBer() throws CommunicationException {
|
BerDecoder getReplyBer(long millis) throws CommunicationException,
|
||||||
|
InterruptedException {
|
||||||
|
if (cancelled) {
|
||||||
|
throw new CommunicationException("Request: " + msgId +
|
||||||
|
" cancelled");
|
||||||
|
}
|
||||||
|
if (isClosed()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
BerDecoder result = millis > 0 ?
|
||||||
|
replies.poll(millis, TimeUnit.MILLISECONDS) : replies.take();
|
||||||
|
|
||||||
if (cancelled) {
|
if (cancelled) {
|
||||||
throw new CommunicationException("Request: " + msgId +
|
throw new CommunicationException("Request: " + msgId +
|
||||||
" cancelled");
|
" cancelled");
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
return result == EOF ? null : result;
|
||||||
* Remove a reply if the queue is not empty.
|
|
||||||
* poll returns null if queue is empty.
|
|
||||||
*/
|
|
||||||
BerDecoder reply = replies.poll();
|
|
||||||
return reply;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized boolean hasSearchCompleted() {
|
boolean hasSearchCompleted() {
|
||||||
return completed;
|
return completed;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -194,8 +194,8 @@ public class LdapDnsProviderTest {
|
|||||||
// no SecurityManager
|
// no SecurityManager
|
||||||
runTest("ldap:///dc=example,dc=com", "localhost:389");
|
runTest("ldap:///dc=example,dc=com", "localhost:389");
|
||||||
runTest("ldap://localhost/dc=example,dc=com", "localhost:389");
|
runTest("ldap://localhost/dc=example,dc=com", "localhost:389");
|
||||||
runTest("ldap://localhost:111/dc=example,dc=com", "localhost:111");
|
runTest("ldap://localhost:1111/dc=example,dc=com", "localhost:1111");
|
||||||
runTest("ldaps://localhost:111/dc=example,dc=com", "localhost:111");
|
runTest("ldaps://localhost:1111/dc=example,dc=com", "localhost:1111");
|
||||||
runTest("ldaps://localhost/dc=example,dc=com", "localhost:636");
|
runTest("ldaps://localhost/dc=example,dc=com", "localhost:636");
|
||||||
runTest(null, "localhost:389");
|
runTest(null, "localhost:389");
|
||||||
runTest("", "ConfigurationException");
|
runTest("", "ConfigurationException");
|
||||||
|
Loading…
Reference in New Issue
Block a user