diff --git a/src/java.base/share/classes/java/util/concurrent/DelayQueue.java b/src/java.base/share/classes/java/util/concurrent/DelayQueue.java index bf0858df1b7..36bf9c56c6b 100644 --- a/src/java.base/share/classes/java/util/concurrent/DelayQueue.java +++ b/src/java.base/share/classes/java/util/concurrent/DelayQueue.java @@ -47,18 +47,41 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** - * An unbounded {@linkplain BlockingQueue blocking queue} of - * {@code Delayed} elements, in which an element can only be taken - * when its delay has expired. The head of the queue is that - * {@code Delayed} element whose delay expired furthest in the - * past. If no delay has expired there is no head and {@code poll} - * will return {@code null}. Expiration occurs when an element's - * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less - * than or equal to zero. Even though unexpired elements cannot be - * removed using {@code take} or {@code poll}, they are otherwise - * treated as normal elements. For example, the {@code size} method - * returns the count of both expired and unexpired elements. - * This queue does not permit null elements. + * An unbounded {@linkplain BlockingQueue blocking queue} of {@link Delayed} + * elements, in which an element generally becomes eligible for removal when its + * delay has expired. + * + *

An element is considered expired when its + * {@code getDelay(TimeUnit.NANOSECONDS)} method would return a value less than + * or equal to zero. + * + *

An element is considered the head of the queue if it + * is the element with the earliest expiration time, whether in the past or the + * future, if there is such an element. + * + *

An element is considered the expired head of + * the queue if it is the expired element with the earliest expiration + * time in the past, if there is such an element. + * The expired head, when present, is also the head. + * + *

While this class implements the {@code BlockingQueue} interface, it + * intentionally violates the general contract of {@code BlockingQueue}, in that + * the following methods disregard the presence of unexpired elements and only + * ever remove the expired head: + * + *

+ * + *

All other methods operate on both expired and unexpired elements. + * For example, the {@link #size()} method returns the count of all elements. + * Method {@link #peek()} may return the (non-null) head even when + * {@code take()} would block waiting for that element to expire. + * + *

This queue does not permit null elements. * *

This class and its iterator implement all of the optional * methods of the {@link Collection} and {@link Iterator} interfaces. @@ -181,10 +204,11 @@ public class DelayQueue extends AbstractQueue } /** - * Retrieves and removes the head of this queue, or returns {@code null} - * if this queue has no elements with an expired delay. + * Retrieves and removes the expired head of + * this queue, or returns {@code null} if this queue has no + * expired elements. * - * @return the head of this queue, or {@code null} if this + * @return the expired head of this queue, or {@code null} if this * queue has no elements with an expired delay */ public E poll() { @@ -201,10 +225,11 @@ public class DelayQueue extends AbstractQueue } /** - * Retrieves and removes the head of this queue, waiting if necessary - * until an element with an expired delay is available on this queue. + * Retrieves and removes the expired head of + * this queue, waiting if necessary until an + * expired element is available on this queue. * - * @return the head of this queue + * @return the expired head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { @@ -242,11 +267,12 @@ public class DelayQueue extends AbstractQueue } /** - * Retrieves and removes the head of this queue, waiting if necessary - * until an element with an expired delay is available on this queue, + * Retrieves and removes the expired head of + * this queue, waiting if necessary until an + * expired element is available on this queue, * or the specified wait time expires. * - * @return the head of this queue, or {@code null} if the + * @return the expired head of this queue, or {@code null} if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {@inheritDoc} @@ -293,13 +319,25 @@ public class DelayQueue extends AbstractQueue } /** - * Retrieves, but does not remove, the head of this queue, or - * returns {@code null} if this queue is empty. Unlike - * {@code poll}, if no expired elements are available in the queue, - * this method returns the element that will expire next, - * if one exists. + * Retrieves and removes the expired head of + * this queue, or throws an exception if this queue has no + * expired elements. * - * @return the head of this queue, or {@code null} if this + * @return the expired head of this queue + * @throws NoSuchElementException if this queue has no elements with an + * expired delay + */ + public E remove() { + return super.remove(); + } + + /** + * Retrieves, but does not remove, the head of this + * queue, or returns {@code null} if this queue is empty. + * Unlike {@code poll}, if no expired elements are available in the queue, + * this method returns the element that will expire next, if one exists. + * + * @return the head of this queue, or {@code null} if this * queue is empty */ public E peek() { diff --git a/test/jdk/java/util/concurrent/tck/DelayQueueTest.java b/test/jdk/java/util/concurrent/tck/DelayQueueTest.java index 814dca744e9..5de5ecf815a 100644 --- a/test/jdk/java/util/concurrent/tck/DelayQueueTest.java +++ b/test/jdk/java/util/concurrent/tck/DelayQueueTest.java @@ -33,7 +33,10 @@ * Pat Fisher, Mike Judd. */ +import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import java.util.ArrayList; import java.util.Arrays; @@ -41,11 +44,13 @@ import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Delayed; import java.util.concurrent.DelayQueue; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import junit.framework.Test; @@ -104,34 +109,26 @@ public class DelayQueueTest extends JSR166TestCase { } /** - * Delayed implementation that actually delays + * Delayed implementation that actually delays. + * Only for use in DelayQueue. */ - static class NanoDelay implements Delayed { + static class SimpleDelay implements Delayed { final long trigger; - NanoDelay(long i) { - trigger = System.nanoTime() + i; + + SimpleDelay(long delay, TimeUnit unit) { + trigger = System.nanoTime() + unit.toNanos(delay); } public int compareTo(Delayed y) { - return Long.compare(trigger, ((NanoDelay)y).trigger); + long now = System.nanoTime(); + return Long.compare(trigger - now, ((SimpleDelay)y).trigger - now); } - public boolean equals(Object other) { - return (other instanceof NanoDelay) && - this.trigger == ((NanoDelay)other).trigger; - } - - // suppress [overrides] javac warning - public int hashCode() { return (int) trigger; } - public long getDelay(TimeUnit unit) { - long n = trigger - System.nanoTime(); - return unit.convert(n, TimeUnit.NANOSECONDS); + return unit.convert(trigger - System.nanoTime(), NANOSECONDS); } - public long getTriggerTime() { - return trigger; - } + public long getTriggerTime() { return trigger; } public String toString() { return String.valueOf(trigger); @@ -700,13 +697,16 @@ public class DelayQueueTest extends JSR166TestCase { * Delayed actions do not occur until their delay elapses */ public void testDelay() throws InterruptedException { - DelayQueue q = new DelayQueue<>(); - for (int i = 0; i < SIZE; ++i) - q.add(new NanoDelay(1000000L * (SIZE - i))); + DelayQueue q = new DelayQueue<>(); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + for (int i = 0; i < SIZE; ++i) { + long micros = rnd.nextLong(SIZE); + q.add(new SimpleDelay(micros, MICROSECONDS)); + } long last = 0; for (int i = 0; i < SIZE; ++i) { - NanoDelay e = q.take(); + SimpleDelay e = q.take(); long tt = e.getTriggerTime(); assertTrue(System.nanoTime() - tt >= 0); if (i != 0) @@ -720,29 +720,96 @@ public class DelayQueueTest extends JSR166TestCase { * peek of a non-empty queue returns non-null even if not expired */ public void testPeekDelayed() { - DelayQueue q = new DelayQueue<>(); - q.add(new NanoDelay(Long.MAX_VALUE)); - assertNotNull(q.peek()); + DelayQueue q = new DelayQueue<>(); + SimpleDelay unexpired = new SimpleDelay(1L, DAYS); + SimpleDelay expired = new SimpleDelay(0L, DAYS); + q.add(unexpired); + assertSame(unexpired, q.peek()); + q.add(expired); + assertSame(expired, q.peek()); + } + + /** + * remove(Object) disregards the expiration state + */ + public void testRemoveObject() { + DelayQueue q = new DelayQueue<>(); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + var xs = new ArrayList(); + int size = 8; + for (int i = 0; i < size; i++) { + long days = rnd.nextLong(-size, size); + var x = new SimpleDelay(days, DAYS); + xs.add(x); + q.add(x); + } + for (SimpleDelay x : xs) { + assertTrue(q.remove(x)); + assertFalse(q.remove(x)); + } + assertTrue(q.isEmpty()); } /** * poll of a non-empty queue returns null if no expired elements. */ public void testPollDelayed() { - DelayQueue q = new DelayQueue<>(); - q.add(new NanoDelay(Long.MAX_VALUE)); + DelayQueue q = new DelayQueue<>(); + SimpleDelay unexpired = new SimpleDelay(1L, DAYS); + SimpleDelay expired = new SimpleDelay(0L, DAYS); + q.add(unexpired); assertNull(q.poll()); + q.add(expired); + assertSame(expired, q.poll()); + assertNull(q.poll()); + assertSame(unexpired, q.peek()); } /** * timed poll of a non-empty queue returns null if no expired elements. */ public void testTimedPollDelayed() throws InterruptedException { - DelayQueue q = new DelayQueue<>(); - q.add(new NanoDelay(LONG_DELAY_MS * 1000000L)); + DelayQueue q = new DelayQueue<>(); + SimpleDelay unexpired = new SimpleDelay(1L, DAYS); + SimpleDelay expired = new SimpleDelay(0L, DAYS); + q.add(unexpired); long startTime = System.nanoTime(); assertNull(q.poll(timeoutMillis(), MILLISECONDS)); assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + q.add(expired); + assertSame(expired, q.poll(1L, DAYS)); + assertNull(q.poll(0L, DAYS)); + assertSame(unexpired, q.peek()); + } + + /** + * q.take() waits for an expired element. + */ + public void testTakeDelayed() throws InterruptedException { + DelayQueue q = new DelayQueue<>(); + SimpleDelay unexpired = new SimpleDelay(1L, DAYS); + SimpleDelay expired = new SimpleDelay(0L, DAYS); + q.add(unexpired); + CompletableFuture.runAsync(() -> q.add(expired)); + assertSame(expired, q.take()); + assertSame(unexpired, q.peek()); + } + + /** + * q.remove() throws NoSuchElementException if no expired elements. + */ + public void testRemoveDelayed() throws InterruptedException { + DelayQueue q = new DelayQueue<>(); + SimpleDelay unexpired = new SimpleDelay(1L, DAYS); + SimpleDelay expired = new SimpleDelay(0L, DAYS); + q.add(unexpired); + try { + q.remove(); + shouldThrow(); + } catch (NoSuchElementException success) {} + q.add(expired); + assertSame(expired, q.remove()); + assertSame(unexpired, q.peek()); } /**