8215249: Miscellaneous changes imported from jsr166 CVS 2019-02

Reviewed-by: martin, chegar, dholmes
This commit is contained in:
Doug Lea 2019-02-15 11:18:01 -08:00
parent bb6d423ab8
commit ff97b60fde
4 changed files with 155 additions and 70 deletions
src/java.base/share/classes/java/util/concurrent
test/jdk/java/util

@ -98,12 +98,11 @@ import java.util.concurrent.locks.ReentrantLock;
* }
* }}</pre>
*
* Here, each worker thread processes a row of the matrix then waits at the
* barrier until all rows have been processed. When all rows are processed
* the supplied {@link Runnable} barrier action is executed and merges the
* rows. If the merger
* determines that a solution has been found then {@code done()} will return
* {@code true} and each worker will terminate.
* Here, each worker thread processes a row of the matrix, then waits at the
* barrier until all rows have been processed. When all rows are processed the
* supplied {@link Runnable} barrier action is executed and merges the rows.
* If the merger determines that a solution has been found then {@code done()}
* will return {@code true} and each worker will terminate.
*
* <p>If the barrier action does not rely on the parties being suspended when
* it is executed, then any of the threads in the party could execute that
@ -132,6 +131,7 @@ import java.util.concurrent.locks.ReentrantLock;
* corresponding {@code await()} in other threads.
*
* @see CountDownLatch
* @see Phaser
*
* @author Doug Lea
* @since 1.5
@ -214,18 +214,17 @@ public class CyclicBarrier {
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
Runnable command = barrierCommand;
if (command != null) {
try {
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
} catch (Throwable ex) {
breakBarrier();
throw ex;
}
}
nextGeneration();
return 0;
}
// loop until tripped, broken, interrupted, or timed out

@ -69,8 +69,6 @@ import java.util.stream.Stream;
* Be patient; this program runs for a very long time.
* For faster runs, restrict execution using command line args.
*
* This is an interface based version of ArrayList/IteratorMicroBenchmark
*
* @author Martin Buchholz
*/
public class IteratorMicroBenchmark {
@ -115,7 +113,9 @@ public class IteratorMicroBenchmark {
CountDownLatch finalized = new CountDownLatch(1);
ReferenceQueue<Object> queue = new ReferenceQueue<>();
WeakReference<Object> ref = new WeakReference<>(
new Object() { protected void finalize() { finalized.countDown(); }},
new Object() {
@SuppressWarnings("deprecation")
protected void finalize() { finalized.countDown(); }},
queue);
try {
for (int tries = 3; tries--> 0; ) {
@ -267,16 +267,22 @@ public class IteratorMicroBenchmark {
});
}
static List<Integer> makeSubList(List<Integer> list) {
String goodClassName(Object x) {
return goodClassName(x.getClass());
}
static List<Integer> makeSubList(
List<Integer> elements,
UnaryOperator<List<Integer>> copyConstructor) {
final ArrayList<Integer> padded = new ArrayList<>();
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
int size = list.size();
if (size <= 2) return list.subList(0, size);
List<Integer> subList = list.subList(rnd.nextInt(0, 2),
size - rnd.nextInt(0, 2));
List<Integer> copy = new ArrayList<>(list);
subList.clear();
subList.addAll(copy);
return subList;
final int frontPorch = rnd.nextInt(3);
final int backPorch = rnd.nextInt(3);
for (int n = frontPorch; n--> 0; ) padded.add(rnd.nextInt());
padded.addAll(elements);
for (int n = backPorch; n--> 0; ) padded.add(rnd.nextInt());
return copyConstructor.apply(padded)
.subList(frontPorch, frontPorch + elements.size());
}
void run() throws Throwable {
@ -297,22 +303,42 @@ public class IteratorMicroBenchmark {
abq.add(abq.remove());
}
ArrayList<Job> jobs = Stream.<Collection<Integer>>of(
al, ad, abq,
makeSubList(new ArrayList<>(al)),
new LinkedList<>(al),
makeSubList(new LinkedList<>(al)),
new PriorityQueue<>(al),
final Integer[] array = al.toArray(new Integer[0]);
final List<Integer> immutableSubList
= makeSubList(al, x -> List.of(x.toArray(new Integer[0])));
Stream<Collection<Integer>> collections = concatStreams(
Stream.of(
// Lists and their subLists
al,
makeSubList(al, ArrayList::new),
new Vector<>(al),
makeSubList(new Vector<>(al)),
makeSubList(al, Vector::new),
new LinkedList<>(al),
makeSubList(al, LinkedList::new),
new CopyOnWriteArrayList<>(al),
makeSubList(new CopyOnWriteArrayList<>(al)),
makeSubList(al, CopyOnWriteArrayList::new),
ad,
new PriorityQueue<>(al),
new ConcurrentLinkedQueue<>(al),
new ConcurrentLinkedDeque<>(al),
// Blocking Queues
abq,
new LinkedBlockingQueue<>(al),
new LinkedBlockingDeque<>(al),
new LinkedTransferQueue<>(al),
new PriorityBlockingQueue<>(al))
new PriorityBlockingQueue<>(al),
List.of(al.toArray(new Integer[0]))),
// avoid UnsupportedOperationException in jdk9 and jdk10
(goodClassName(immutableSubList).equals("RandomAccessSubList"))
? Stream.empty()
: Stream.of(immutableSubList));
ArrayList<Job> jobs = collections
.flatMap(x -> jobs(x))
.filter(job ->
nameFilter == null || nameFilter.matcher(job.name()).find())
@ -329,16 +355,29 @@ public class IteratorMicroBenchmark {
return Stream.of(streams).flatMap(s -> s);
}
boolean isMutable(Collection<Integer> x) {
return !(x.getClass().getName().contains("ImmutableCollections$"));
}
Stream<Job> jobs(Collection<Integer> x) {
final String klazz = goodClassName(x);
return concatStreams(
collectionJobs(x),
(isMutable(x))
? mutableCollectionJobs(x)
: Stream.empty(),
(x instanceof Deque)
? dequeJobs((Deque<Integer>)x)
: Stream.empty(),
(x instanceof List)
? listJobs((List<Integer>)x)
: Stream.empty(),
(x instanceof List && isMutable(x))
? mutableListJobs((List<Integer>)x)
: Stream.empty());
}
@ -350,7 +389,7 @@ public class IteratorMicroBenchmark {
}
Stream<Job> collectionJobs(Collection<Integer> x) {
final String klazz = goodClassName(x.getClass());
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " iterate for loop") {
public void work() throws Throwable {
@ -381,14 +420,6 @@ public class IteratorMicroBenchmark {
sum[0] = 0;
x.spliterator().forEachRemaining(n -> sum[0] += n);
check.sum(sum[0]);}}},
new Job(klazz + " removeIf") {
public void work() throws Throwable {
int[] sum = new int[1];
for (int i = 0; i < iterations; i++) {
sum[0] = 0;
if (x.removeIf(n -> { sum[0] += n; return false; }))
throw new AssertionError();
check.sum(sum[0]);}}},
new Job(klazz + " contains") {
public void work() throws Throwable {
int[] sum = new int[1];
@ -407,14 +438,6 @@ public class IteratorMicroBenchmark {
if (x.containsAll(sneakyAdderCollection))
throw new AssertionError();
check.sum(sum[0]);}}},
new Job(klazz + " remove(Object)") {
public void work() throws Throwable {
int[] sum = new int[1];
Object sneakyAdder = sneakyAdder(sum);
for (int i = 0; i < iterations; i++) {
sum[0] = 0;
if (x.remove(sneakyAdder)) throw new AssertionError();
check.sum(sum[0]);}}},
new Job(klazz + " forEach") {
public void work() throws Throwable {
int[] sum = new int[1];
@ -498,8 +521,29 @@ public class IteratorMicroBenchmark {
check.sum(sum[0]);}}});
}
Stream<Job> mutableCollectionJobs(Collection<Integer> x) {
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " removeIf") {
public void work() throws Throwable {
int[] sum = new int[1];
for (int i = 0; i < iterations; i++) {
sum[0] = 0;
if (x.removeIf(n -> { sum[0] += n; return false; }))
throw new AssertionError();
check.sum(sum[0]);}}},
new Job(klazz + " remove(Object)") {
public void work() throws Throwable {
int[] sum = new int[1];
Object sneakyAdder = sneakyAdder(sum);
for (int i = 0; i < iterations; i++) {
sum[0] = 0;
if (x.remove(sneakyAdder)) throw new AssertionError();
check.sum(sum[0]);}}});
}
Stream<Job> dequeJobs(Deque<Integer> x) {
String klazz = goodClassName(x.getClass());
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " descendingIterator() loop") {
public void work() throws Throwable {
@ -519,7 +563,7 @@ public class IteratorMicroBenchmark {
}
Stream<Job> listJobs(List<Integer> x) {
final String klazz = goodClassName(x.getClass());
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " listIterator forward loop") {
public void work() throws Throwable {
@ -555,15 +599,6 @@ public class IteratorMicroBenchmark {
if (x.lastIndexOf(sneakyAdder) != -1)
throw new AssertionError();
check.sum(sum[0]);}}},
new Job(klazz + " replaceAll") {
public void work() throws Throwable {
int[] sum = new int[1];
UnaryOperator<Integer> sneakyAdder =
x -> { sum[0] += x; return x; };
for (int i = 0; i < iterations; i++) {
sum[0] = 0;
x.replaceAll(sneakyAdder);
check.sum(sum[0]);}}},
new Job(klazz + " equals") {
public void work() throws Throwable {
ArrayList<Integer> copy = new ArrayList<>(x);
@ -577,4 +612,18 @@ public class IteratorMicroBenchmark {
if (x.hashCode() != hashCode)
throw new AssertionError();}}});
}
Stream<Job> mutableListJobs(List<Integer> x) {
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " replaceAll") {
public void work() throws Throwable {
int[] sum = new int[1];
UnaryOperator<Integer> sneakyAdder =
x -> { sum[0] += x; return x; };
for (int i = 0; i < iterations; i++) {
sum[0] = 0;
x.replaceAll(sneakyAdder);
check.sum(sum[0]);}}});
}
}

@ -270,6 +270,10 @@ public class RemoveMicroBenchmark {
});
}
String goodClassName(Object x) {
return goodClassName(x.getClass());
}
static List<Integer> makeSubList(List<Integer> list) {
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
int size = rnd.nextInt(4);
@ -369,7 +373,7 @@ public class RemoveMicroBenchmark {
}
Stream<Job> collectionJobs(Collection<Integer> x) {
final String klazz = goodClassName(x.getClass());
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " removeIf") {
public void work() throws Throwable {
@ -422,7 +426,7 @@ public class RemoveMicroBenchmark {
}
Stream<Job> iteratorRemoveJobs(Collection<Integer> x) {
final String klazz = goodClassName(x.getClass());
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " Iterator.remove") {
public void work() throws Throwable {
@ -460,7 +464,7 @@ public class RemoveMicroBenchmark {
}
Stream<Job> queueJobs(Queue<Integer> x) {
final String klazz = goodClassName(x.getClass());
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " poll()") {
public void work() throws Throwable {
@ -474,7 +478,7 @@ public class RemoveMicroBenchmark {
}
Stream<Job> dequeJobs(Deque<Integer> x) {
final String klazz = goodClassName(x.getClass());
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " descendingIterator().remove") {
public void work() throws Throwable {
@ -509,7 +513,7 @@ public class RemoveMicroBenchmark {
}
Stream<Job> blockingQueueJobs(BlockingQueue<Integer> x) {
final String klazz = goodClassName(x.getClass());
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " timed poll()") {
public void work() throws Throwable {
@ -545,7 +549,7 @@ public class RemoveMicroBenchmark {
}
Stream<Job> blockingDequeJobs(BlockingDeque<Integer> x) {
final String klazz = goodClassName(x.getClass());
final String klazz = goodClassName(x);
return Stream.of(
new Job(klazz + " timed pollFirst()") {
public void work() throws Throwable {

@ -38,6 +38,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -486,4 +489,34 @@ public class CyclicBarrierTest extends JSR166TestCase {
assertEquals(0, barrier.getNumberWaiting());
}
}
/**
* There can be more threads calling await() than parties, as long as each
* task only calls await once and the task count is a multiple of parties.
*/
public void testMoreTasksThanParties() throws Exception {
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
final int parties = rnd.nextInt(1, 5);
final int nTasks = rnd.nextInt(1, 5) * parties;
final AtomicInteger tripCount = new AtomicInteger(0);
final AtomicInteger awaitCount = new AtomicInteger(0);
final CyclicBarrier barrier =
new CyclicBarrier(parties, () -> tripCount.getAndIncrement());
final ExecutorService e = Executors.newFixedThreadPool(nTasks);
final Runnable awaiter = () -> {
try {
if (ThreadLocalRandom.current().nextBoolean())
barrier.await();
else
barrier.await(LONG_DELAY_MS, MILLISECONDS);
awaitCount.getAndIncrement();
} catch (Throwable fail) { threadUnexpectedException(fail); }};
try (PoolCleaner cleaner = cleaner(e)) {
for (int i = nTasks; i--> 0; )
e.execute(awaiter);
}
assertEquals(nTasks / parties, tripCount.get());
assertEquals(nTasks, awaitCount.get());
assertEquals(0, barrier.getNumberWaiting());
}
}