8005311: Add Scalable Updatable Variables, DoubleAccumulator, DoubleAdder, LongAccumulator, LongAdder
Reviewed-by: chegar, darcy, goetz
This commit is contained in:
parent
3d92cb12a6
commit
3f14786363
jdk
make/java/java
src/share/classes/java/util/concurrent/atomic
test/java/util/concurrent/atomic
@ -373,6 +373,11 @@ JAVA_JAVA_java = \
|
||||
java/util/concurrent/atomic/AtomicReferenceArray.java \
|
||||
java/util/concurrent/atomic/AtomicReferenceFieldUpdater.java \
|
||||
java/util/concurrent/atomic/AtomicStampedReference.java \
|
||||
java/util/concurrent/atomic/DoubleAccumulator.java \
|
||||
java/util/concurrent/atomic/DoubleAdder.java \
|
||||
java/util/concurrent/atomic/LongAccumulator.java \
|
||||
java/util/concurrent/atomic/LongAdder.java \
|
||||
java/util/concurrent/atomic/Striped64.java \
|
||||
java/util/concurrent/locks/AbstractOwnableSynchronizer.java \
|
||||
java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java \
|
||||
java/util/concurrent/locks/AbstractQueuedSynchronizer.java \
|
||||
|
@ -0,0 +1,239 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package java.util.concurrent.atomic;
|
||||
import java.io.Serializable;
|
||||
import java.util.function.DoubleBinaryOperator;
|
||||
|
||||
/**
|
||||
* One or more variables that together maintain a running {@code double}
|
||||
* value updated using a supplied function. When updates (method
|
||||
* {@link #accumulate}) are contended across threads, the set of variables
|
||||
* may grow dynamically to reduce contention. Method {@link #get}
|
||||
* (or, equivalently, {@link #doubleValue}) returns the current value
|
||||
* across the variables maintaining updates.
|
||||
*
|
||||
* <p>This class is usually preferable to alternatives when multiple
|
||||
* threads update a common value that is used for purposes such as
|
||||
* summary statistics that are frequently updated but less frequently
|
||||
* read.
|
||||
*
|
||||
* <p>The supplied accumulator function should be side-effect-free,
|
||||
* since it may be re-applied when attempted updates fail due to
|
||||
* contention among threads. The function is applied with the current
|
||||
* value as its first argument, and the given update as the second
|
||||
* argument. For example, to maintain a running maximum value, you
|
||||
* could supply {@code Double::max} along with {@code
|
||||
* Double.NEGATIVE_INFINITY} as the identity. The order of
|
||||
* accumulation within or across threads is not guaranteed. Thus, this
|
||||
* class may not be applicable if numerical stability is required,
|
||||
* especially when combining values of substantially different orders
|
||||
* of magnitude.
|
||||
*
|
||||
* <p>Class {@link DoubleAdder} provides analogs of the functionality
|
||||
* of this class for the common special case of maintaining sums. The
|
||||
* call {@code new DoubleAdder()} is equivalent to {@code new
|
||||
* DoubleAccumulator((x, y) -> x + y, 0.0}.
|
||||
*
|
||||
* <p>This class extends {@link Number}, but does <em>not</em> define
|
||||
* methods such as {@code equals}, {@code hashCode} and {@code
|
||||
* compareTo} because instances are expected to be mutated, and so are
|
||||
* not useful as collection keys.
|
||||
*
|
||||
* @since 1.8
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public class DoubleAccumulator extends Striped64 implements Serializable {
|
||||
private static final long serialVersionUID = 7249069246863182397L;
|
||||
|
||||
private final DoubleBinaryOperator function;
|
||||
private final long identity; // use long representation
|
||||
|
||||
/**
|
||||
* Creates a new instance using the given accumulator function
|
||||
* and identity element.
|
||||
*/
|
||||
public DoubleAccumulator(DoubleBinaryOperator accumulatorFunction,
|
||||
double identity) {
|
||||
this.function = accumulatorFunction;
|
||||
base = this.identity = Double.doubleToRawLongBits(identity);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates with the given value.
|
||||
*
|
||||
* @param x the value
|
||||
*/
|
||||
public void accumulate(double x) {
|
||||
Cell[] as; long b, v, r; int m; Cell a;
|
||||
if ((as = cells) != null ||
|
||||
(r = Double.doubleToRawLongBits
|
||||
(function.operateAsDouble
|
||||
(Double.longBitsToDouble(b = base), x))) != b && !casBase(b, r)) {
|
||||
boolean uncontended = true;
|
||||
if (as == null || (m = as.length - 1) < 0 ||
|
||||
(a = as[getProbe() & m]) == null ||
|
||||
!(uncontended =
|
||||
(r = Double.doubleToRawLongBits
|
||||
(function.operateAsDouble
|
||||
(Double.longBitsToDouble(v = a.value), x))) == v ||
|
||||
a.cas(v, r)))
|
||||
doubleAccumulate(x, function, uncontended);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current value. The returned value is <em>NOT</em>
|
||||
* an atomic snapshot; invocation in the absence of concurrent
|
||||
* updates returns an accurate result, but concurrent updates that
|
||||
* occur while the value is being calculated might not be
|
||||
* incorporated.
|
||||
*
|
||||
* @return the current value
|
||||
*/
|
||||
public double get() {
|
||||
Cell[] as = cells; Cell a;
|
||||
double result = Double.longBitsToDouble(base);
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null)
|
||||
result = function.operateAsDouble
|
||||
(result, Double.longBitsToDouble(a.value));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets variables maintaining updates to the identity value.
|
||||
* This method may be a useful alternative to creating a new
|
||||
* updater, but is only effective if there are no concurrent
|
||||
* updates. Because this method is intrinsically racy, it should
|
||||
* only be used when it is known that no threads are concurrently
|
||||
* updating.
|
||||
*/
|
||||
public void reset() {
|
||||
Cell[] as = cells; Cell a;
|
||||
base = identity;
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null)
|
||||
a.value = identity;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent in effect to {@link #get} followed by {@link
|
||||
* #reset}. This method may apply for example during quiescent
|
||||
* points between multithreaded computations. If there are
|
||||
* updates concurrent with this method, the returned value is
|
||||
* <em>not</em> guaranteed to be the final value occurring before
|
||||
* the reset.
|
||||
*
|
||||
* @return the value before reset
|
||||
*/
|
||||
public double getThenReset() {
|
||||
Cell[] as = cells; Cell a;
|
||||
double result = Double.longBitsToDouble(base);
|
||||
base = identity;
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null) {
|
||||
double v = Double.longBitsToDouble(a.value);
|
||||
a.value = identity;
|
||||
result = function.operateAsDouble(result, v);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the String representation of the current value.
|
||||
* @return the String representation of the current value
|
||||
*/
|
||||
public String toString() {
|
||||
return Double.toString(get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent to {@link #get}.
|
||||
*
|
||||
* @return the current value
|
||||
*/
|
||||
public double doubleValue() {
|
||||
return get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@linkplain #get current value} as a {@code long}
|
||||
* after a narrowing primitive conversion.
|
||||
*/
|
||||
public long longValue() {
|
||||
return (long)get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@linkplain #get current value} as an {@code int}
|
||||
* after a narrowing primitive conversion.
|
||||
*/
|
||||
public int intValue() {
|
||||
return (int)get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@linkplain #get current value} as a {@code float}
|
||||
* after a narrowing primitive conversion.
|
||||
*/
|
||||
public float floatValue() {
|
||||
return (float)get();
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream s)
|
||||
throws java.io.IOException {
|
||||
s.defaultWriteObject();
|
||||
s.writeDouble(get());
|
||||
}
|
||||
|
||||
private void readObject(java.io.ObjectInputStream s)
|
||||
throws java.io.IOException, ClassNotFoundException {
|
||||
s.defaultReadObject();
|
||||
cellsBusy = 0;
|
||||
cells = null;
|
||||
base = Double.doubleToRawLongBits(s.readDouble());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,227 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package java.util.concurrent.atomic;
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* One or more variables that together maintain an initially zero
|
||||
* {@code double} sum. When updates (method {@link #add}) are
|
||||
* contended across threads, the set of variables may grow dynamically
|
||||
* to reduce contention. Method {@link #sum} (or, equivalently {@link
|
||||
* #doubleValue}) returns the current total combined across the
|
||||
* variables maintaining the sum. The order of accumulation within or
|
||||
* across threads is not guaranteed. Thus, this class may not be
|
||||
* applicable if numerical stability is required, especially when
|
||||
* combining values of substantially different orders of magnitude.
|
||||
*
|
||||
* <p>This class is usually preferable to alternatives when multiple
|
||||
* threads update a common value that is used for purposes such as
|
||||
* summary statistics that are frequently updated but less frequently
|
||||
* read.
|
||||
*
|
||||
* <p>This class extends {@link Number}, but does <em>not</em> define
|
||||
* methods such as {@code equals}, {@code hashCode} and {@code
|
||||
* compareTo} because instances are expected to be mutated, and so are
|
||||
* not useful as collection keys.
|
||||
*
|
||||
* @since 1.8
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public class DoubleAdder extends Striped64 implements Serializable {
|
||||
private static final long serialVersionUID = 7249069246863182397L;
|
||||
|
||||
/**
|
||||
* Note that we must use "long" for underlying representations,
|
||||
* because there is no compareAndSet for double, due to the fact
|
||||
* that the bitwise equals used in any CAS implementation is not
|
||||
* the same as double-precision equals. However, we use CAS only
|
||||
* to detect and alleviate contention, for which bitwise equals
|
||||
* works best anyway. In principle, the long/double conversions
|
||||
* used here should be essentially free on most platforms since
|
||||
* they just re-interpret bits.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Creates a new adder with initial sum of zero.
|
||||
*/
|
||||
public DoubleAdder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the given value.
|
||||
*
|
||||
* @param x the value to add
|
||||
*/
|
||||
public void add(double x) {
|
||||
Cell[] as; long b, v; int m; Cell a;
|
||||
if ((as = cells) != null ||
|
||||
!casBase(b = base,
|
||||
Double.doubleToRawLongBits
|
||||
(Double.longBitsToDouble(b) + x))) {
|
||||
boolean uncontended = true;
|
||||
if (as == null || (m = as.length - 1) < 0 ||
|
||||
(a = as[getProbe() & m]) == null ||
|
||||
!(uncontended = a.cas(v = a.value,
|
||||
Double.doubleToRawLongBits
|
||||
(Double.longBitsToDouble(v) + x))))
|
||||
doubleAccumulate(x, null, uncontended);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current sum. The returned value is <em>NOT</em> an
|
||||
* atomic snapshot; invocation in the absence of concurrent
|
||||
* updates returns an accurate result, but concurrent updates that
|
||||
* occur while the sum is being calculated might not be
|
||||
* incorporated. Also, because floating-point arithmetic is not
|
||||
* strictly associative, the returned result need not be identical
|
||||
* to the value that would be obtained in a sequential series of
|
||||
* updates to a single variable.
|
||||
*
|
||||
* @return the sum
|
||||
*/
|
||||
public double sum() {
|
||||
Cell[] as = cells; Cell a;
|
||||
double sum = Double.longBitsToDouble(base);
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null)
|
||||
sum += Double.longBitsToDouble(a.value);
|
||||
}
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets variables maintaining the sum to zero. This method may
|
||||
* be a useful alternative to creating a new adder, but is only
|
||||
* effective if there are no concurrent updates. Because this
|
||||
* method is intrinsically racy, it should only be used when it is
|
||||
* known that no threads are concurrently updating.
|
||||
*/
|
||||
public void reset() {
|
||||
Cell[] as = cells; Cell a;
|
||||
base = 0L; // relies on fact that double 0 must have same rep as long
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null)
|
||||
a.value = 0L;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent in effect to {@link #sum} followed by {@link
|
||||
* #reset}. This method may apply for example during quiescent
|
||||
* points between multithreaded computations. If there are
|
||||
* updates concurrent with this method, the returned value is
|
||||
* <em>not</em> guaranteed to be the final value occurring before
|
||||
* the reset.
|
||||
*
|
||||
* @return the sum
|
||||
*/
|
||||
public double sumThenReset() {
|
||||
Cell[] as = cells; Cell a;
|
||||
double sum = Double.longBitsToDouble(base);
|
||||
base = 0L;
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null) {
|
||||
long v = a.value;
|
||||
a.value = 0L;
|
||||
sum += Double.longBitsToDouble(v);
|
||||
}
|
||||
}
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the String representation of the {@link #sum}.
|
||||
* @return the String representation of the {@link #sum}
|
||||
*/
|
||||
public String toString() {
|
||||
return Double.toString(sum());
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent to {@link #sum}.
|
||||
*
|
||||
* @return the sum
|
||||
*/
|
||||
public double doubleValue() {
|
||||
return sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link #sum} as a {@code long} after a
|
||||
* narrowing primitive conversion.
|
||||
*/
|
||||
public long longValue() {
|
||||
return (long)sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link #sum} as an {@code int} after a
|
||||
* narrowing primitive conversion.
|
||||
*/
|
||||
public int intValue() {
|
||||
return (int)sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link #sum} as a {@code float}
|
||||
* after a narrowing primitive conversion.
|
||||
*/
|
||||
public float floatValue() {
|
||||
return (float)sum();
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream s)
|
||||
throws java.io.IOException {
|
||||
s.defaultWriteObject();
|
||||
s.writeDouble(sum());
|
||||
}
|
||||
|
||||
private void readObject(java.io.ObjectInputStream s)
|
||||
throws java.io.IOException, ClassNotFoundException {
|
||||
s.defaultReadObject();
|
||||
cellsBusy = 0;
|
||||
cells = null;
|
||||
base = Double.doubleToRawLongBits(s.readDouble());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,236 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package java.util.concurrent.atomic;
|
||||
import java.io.Serializable;
|
||||
import java.util.function.LongBinaryOperator;
|
||||
|
||||
/**
|
||||
* One or more variables that together maintain a running {@code long}
|
||||
* value updated using a supplied function. When updates (method
|
||||
* {@link #accumulate}) are contended across threads, the set of variables
|
||||
* may grow dynamically to reduce contention. Method {@link #get}
|
||||
* (or, equivalently, {@link #longValue}) returns the current value
|
||||
* across the variables maintaining updates.
|
||||
*
|
||||
* <p>This class is usually preferable to {@link AtomicLong} when
|
||||
* multiple threads update a common value that is used for purposes such
|
||||
* as collecting statistics, not for fine-grained synchronization
|
||||
* control. Under low update contention, the two classes have similar
|
||||
* characteristics. But under high contention, expected throughput of
|
||||
* this class is significantly higher, at the expense of higher space
|
||||
* consumption.
|
||||
*
|
||||
* <p>The order of accumulation within or across threads is not
|
||||
* guaranteed and cannot be depended upon, so this class is only
|
||||
* applicable to functions for which the order of accumulation does
|
||||
* not matter. The supplied accumulator function should be
|
||||
* side-effect-free, since it may be re-applied when attempted updates
|
||||
* fail due to contention among threads. The function is applied with
|
||||
* the current value as its first argument, and the given update as
|
||||
* the second argument. For example, to maintain a running maximum
|
||||
* value, you could supply {@code Long::max} along with {@code
|
||||
* Long.MIN_VALUE} as the identity.
|
||||
*
|
||||
* <p>Class {@link LongAdder} provides analogs of the functionality of
|
||||
* this class for the common special case of maintaining counts and
|
||||
* sums. The call {@code new LongAdder()} is equivalent to {@code new
|
||||
* LongAccumulator((x, y) -> x + y, 0L}.
|
||||
*
|
||||
* <p>This class extends {@link Number}, but does <em>not</em> define
|
||||
* methods such as {@code equals}, {@code hashCode} and {@code
|
||||
* compareTo} because instances are expected to be mutated, and so are
|
||||
* not useful as collection keys.
|
||||
*
|
||||
* @since 1.8
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public class LongAccumulator extends Striped64 implements Serializable {
|
||||
private static final long serialVersionUID = 7249069246863182397L;
|
||||
|
||||
private final LongBinaryOperator function;
|
||||
private final long identity;
|
||||
|
||||
/**
|
||||
* Creates a new instance using the given accumulator function
|
||||
* and identity element.
|
||||
*/
|
||||
public LongAccumulator(LongBinaryOperator accumulatorFunction,
|
||||
long identity) {
|
||||
this.function = accumulatorFunction;
|
||||
base = this.identity = identity;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates with the given value.
|
||||
*
|
||||
* @param x the value
|
||||
*/
|
||||
public void accumulate(long x) {
|
||||
Cell[] as; long b, v, r; int m; Cell a;
|
||||
if ((as = cells) != null ||
|
||||
(r = function.operateAsLong(b = base, x)) != b && !casBase(b, r)) {
|
||||
boolean uncontended = true;
|
||||
if (as == null || (m = as.length - 1) < 0 ||
|
||||
(a = as[getProbe() & m]) == null ||
|
||||
!(uncontended =
|
||||
(r = function.operateAsLong(v = a.value, x)) == v ||
|
||||
a.cas(v, r)))
|
||||
longAccumulate(x, function, uncontended);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current value. The returned value is <em>NOT</em>
|
||||
* an atomic snapshot; invocation in the absence of concurrent
|
||||
* updates returns an accurate result, but concurrent updates that
|
||||
* occur while the value is being calculated might not be
|
||||
* incorporated.
|
||||
*
|
||||
* @return the current value
|
||||
*/
|
||||
public long get() {
|
||||
Cell[] as = cells; Cell a;
|
||||
long result = base;
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null)
|
||||
result = function.operateAsLong(result, a.value);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets variables maintaining updates to the identity value.
|
||||
* This method may be a useful alternative to creating a new
|
||||
* updater, but is only effective if there are no concurrent
|
||||
* updates. Because this method is intrinsically racy, it should
|
||||
* only be used when it is known that no threads are concurrently
|
||||
* updating.
|
||||
*/
|
||||
public void reset() {
|
||||
Cell[] as = cells; Cell a;
|
||||
base = identity;
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null)
|
||||
a.value = identity;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent in effect to {@link #get} followed by {@link
|
||||
* #reset}. This method may apply for example during quiescent
|
||||
* points between multithreaded computations. If there are
|
||||
* updates concurrent with this method, the returned value is
|
||||
* <em>not</em> guaranteed to be the final value occurring before
|
||||
* the reset.
|
||||
*
|
||||
* @return the value before reset
|
||||
*/
|
||||
public long getThenReset() {
|
||||
Cell[] as = cells; Cell a;
|
||||
long result = base;
|
||||
base = identity;
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null) {
|
||||
long v = a.value;
|
||||
a.value = identity;
|
||||
result = function.operateAsLong(result, v);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the String representation of the current value.
|
||||
* @return the String representation of the current value
|
||||
*/
|
||||
public String toString() {
|
||||
return Long.toString(get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent to {@link #get}.
|
||||
*
|
||||
* @return the current value
|
||||
*/
|
||||
public long longValue() {
|
||||
return get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@linkplain #get current value} as an {@code int}
|
||||
* after a narrowing primitive conversion.
|
||||
*/
|
||||
public int intValue() {
|
||||
return (int)get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@linkplain #get current value} as a {@code float}
|
||||
* after a widening primitive conversion.
|
||||
*/
|
||||
public float floatValue() {
|
||||
return (float)get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@linkplain #get current value} as a {@code double}
|
||||
* after a widening primitive conversion.
|
||||
*/
|
||||
public double doubleValue() {
|
||||
return (double)get();
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream s)
|
||||
throws java.io.IOException {
|
||||
s.defaultWriteObject();
|
||||
s.writeLong(get());
|
||||
}
|
||||
|
||||
private void readObject(java.io.ObjectInputStream s)
|
||||
throws java.io.IOException, ClassNotFoundException {
|
||||
s.defaultReadObject();
|
||||
cellsBusy = 0;
|
||||
cells = null;
|
||||
base = s.readLong();
|
||||
}
|
||||
|
||||
}
|
228
jdk/src/share/classes/java/util/concurrent/atomic/LongAdder.java
Normal file
228
jdk/src/share/classes/java/util/concurrent/atomic/LongAdder.java
Normal file
@ -0,0 +1,228 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package java.util.concurrent.atomic;
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* One or more variables that together maintain an initially zero
|
||||
* {@code long} sum. When updates (method {@link #add}) are contended
|
||||
* across threads, the set of variables may grow dynamically to reduce
|
||||
* contention. Method {@link #sum} (or, equivalently, {@link
|
||||
* #longValue}) returns the current total combined across the
|
||||
* variables maintaining the sum.
|
||||
*
|
||||
* <p>This class is usually preferable to {@link AtomicLong} when
|
||||
* multiple threads update a common sum that is used for purposes such
|
||||
* as collecting statistics, not for fine-grained synchronization
|
||||
* control. Under low update contention, the two classes have similar
|
||||
* characteristics. But under high contention, expected throughput of
|
||||
* this class is significantly higher, at the expense of higher space
|
||||
* consumption.
|
||||
*
|
||||
* <p>LongAdders can be used with a {@link
|
||||
* java.util.concurrent.ConcurrentHashMap} to maintain a scalable
|
||||
* frequency map (a form of histogram or multiset). For example, to
|
||||
* add a count to a {@code ConcurrentHashMap<String,LongAdder> freqs},
|
||||
* initializing if not already present, you can use {@code
|
||||
* freqs.computeIfAbsent(k -> new LongAdder()).increment();}
|
||||
*
|
||||
* <p>This class extends {@link Number}, but does <em>not</em> define
|
||||
* methods such as {@code equals}, {@code hashCode} and {@code
|
||||
* compareTo} because instances are expected to be mutated, and so are
|
||||
* not useful as collection keys.
|
||||
*
|
||||
* @since 1.8
|
||||
* @author Doug Lea
|
||||
*/
|
||||
public class LongAdder extends Striped64 implements Serializable {
|
||||
private static final long serialVersionUID = 7249069246863182397L;
|
||||
|
||||
/**
|
||||
* Creates a new adder with initial sum of zero.
|
||||
*/
|
||||
public LongAdder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the given value.
|
||||
*
|
||||
* @param x the value to add
|
||||
*/
|
||||
public void add(long x) {
|
||||
Cell[] as; long b, v; int m; Cell a;
|
||||
if ((as = cells) != null || !casBase(b = base, b + x)) {
|
||||
boolean uncontended = true;
|
||||
if (as == null || (m = as.length - 1) < 0 ||
|
||||
(a = as[getProbe() & m]) == null ||
|
||||
!(uncontended = a.cas(v = a.value, v + x)))
|
||||
longAccumulate(x, null, uncontended);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent to {@code add(1)}.
|
||||
*/
|
||||
public void increment() {
|
||||
add(1L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent to {@code add(-1)}.
|
||||
*/
|
||||
public void decrement() {
|
||||
add(-1L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current sum. The returned value is <em>NOT</em> an
|
||||
* atomic snapshot; invocation in the absence of concurrent
|
||||
* updates returns an accurate result, but concurrent updates that
|
||||
* occur while the sum is being calculated might not be
|
||||
* incorporated.
|
||||
*
|
||||
* @return the sum
|
||||
*/
|
||||
public long sum() {
|
||||
Cell[] as = cells; Cell a;
|
||||
long sum = base;
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null)
|
||||
sum += a.value;
|
||||
}
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets variables maintaining the sum to zero. This method may
|
||||
* be a useful alternative to creating a new adder, but is only
|
||||
* effective if there are no concurrent updates. Because this
|
||||
* method is intrinsically racy, it should only be used when it is
|
||||
* known that no threads are concurrently updating.
|
||||
*/
|
||||
public void reset() {
|
||||
Cell[] as = cells; Cell a;
|
||||
base = 0L;
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null)
|
||||
a.value = 0L;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent in effect to {@link #sum} followed by {@link
|
||||
* #reset}. This method may apply for example during quiescent
|
||||
* points between multithreaded computations. If there are
|
||||
* updates concurrent with this method, the returned value is
|
||||
* <em>not</em> guaranteed to be the final value occurring before
|
||||
* the reset.
|
||||
*
|
||||
* @return the sum
|
||||
*/
|
||||
public long sumThenReset() {
|
||||
Cell[] as = cells; Cell a;
|
||||
long sum = base;
|
||||
base = 0L;
|
||||
if (as != null) {
|
||||
for (int i = 0; i < as.length; ++i) {
|
||||
if ((a = as[i]) != null) {
|
||||
sum += a.value;
|
||||
a.value = 0L;
|
||||
}
|
||||
}
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the String representation of the {@link #sum}.
|
||||
* @return the String representation of the {@link #sum}
|
||||
*/
|
||||
public String toString() {
|
||||
return Long.toString(sum());
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent to {@link #sum}.
|
||||
*
|
||||
* @return the sum
|
||||
*/
|
||||
public long longValue() {
|
||||
return sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link #sum} as an {@code int} after a narrowing
|
||||
* primitive conversion.
|
||||
*/
|
||||
public int intValue() {
|
||||
return (int)sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link #sum} as a {@code float}
|
||||
* after a widening primitive conversion.
|
||||
*/
|
||||
public float floatValue() {
|
||||
return (float)sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link #sum} as a {@code double} after a widening
|
||||
* primitive conversion.
|
||||
*/
|
||||
public double doubleValue() {
|
||||
return (double)sum();
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream s)
|
||||
throws java.io.IOException {
|
||||
s.defaultWriteObject();
|
||||
s.writeLong(sum());
|
||||
}
|
||||
|
||||
private void readObject(java.io.ObjectInputStream s)
|
||||
throws java.io.IOException, ClassNotFoundException {
|
||||
s.defaultReadObject();
|
||||
cellsBusy = 0;
|
||||
cells = null;
|
||||
base = s.readLong();
|
||||
}
|
||||
|
||||
}
|
417
jdk/src/share/classes/java/util/concurrent/atomic/Striped64.java
Normal file
417
jdk/src/share/classes/java/util/concurrent/atomic/Striped64.java
Normal file
@ -0,0 +1,417 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
package java.util.concurrent.atomic;
|
||||
import java.util.function.LongBinaryOperator;
|
||||
import java.util.function.DoubleBinaryOperator;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
/**
|
||||
* A package-local class holding common representation and mechanics
|
||||
* for classes supporting dynamic striping on 64bit values. The class
|
||||
* extends Number so that concrete subclasses must publicly do so.
|
||||
*/
|
||||
abstract class Striped64 extends Number {
|
||||
/*
|
||||
* This class maintains a lazily-initialized table of atomically
|
||||
* updated variables, plus an extra "base" field. The table size
|
||||
* is a power of two. Indexing uses masked per-thread hash codes.
|
||||
* Nearly all declarations in this class are package-private,
|
||||
* accessed directly by subclasses.
|
||||
*
|
||||
* Table entries are of class Cell; a variant of AtomicLong padded
|
||||
* to reduce cache contention on most processors. Padding is
|
||||
* overkill for most Atomics because they are usually irregularly
|
||||
* scattered in memory and thus don't interfere much with each
|
||||
* other. But Atomic objects residing in arrays will tend to be
|
||||
* placed adjacent to each other, and so will most often share
|
||||
* cache lines (with a huge negative performance impact) without
|
||||
* this precaution.
|
||||
*
|
||||
* In part because Cells are relatively large, we avoid creating
|
||||
* them until they are needed. When there is no contention, all
|
||||
* updates are made to the base field. Upon first contention (a
|
||||
* failed CAS on base update), the table is initialized to size 2.
|
||||
* The table size is doubled upon further contention until
|
||||
* reaching the nearest power of two greater than or equal to the
|
||||
* number of CPUS. Table slots remain empty (null) until they are
|
||||
* needed.
|
||||
*
|
||||
* A single spinlock ("cellsBusy") is used for initializing and
|
||||
* resizing the table, as well as populating slots with new Cells.
|
||||
* There is no need for a blocking lock; when the lock is not
|
||||
* available, threads try other slots (or the base). During these
|
||||
* retries, there is increased contention and reduced locality,
|
||||
* which is still better than alternatives.
|
||||
*
|
||||
* The Thread probe fields maintained via ThreadLocalRandom serve
|
||||
* as per-thread hash codes. We let them remain uninitialized as
|
||||
* zero (if they come in this way) until they contend at slot
|
||||
* 0. They are then initialized to values that typically do not
|
||||
* often conflict with others. Contention and/or table collisions
|
||||
* are indicated by failed CASes when performing an update
|
||||
* operation. Upon a collision, if the table size is less than
|
||||
* the capacity, it is doubled in size unless some other thread
|
||||
* holds the lock. If a hashed slot is empty, and lock is
|
||||
* available, a new Cell is created. Otherwise, if the slot
|
||||
* exists, a CAS is tried. Retries proceed by "double hashing",
|
||||
* using a secondary hash (Marsaglia XorShift) to try to find a
|
||||
* free slot.
|
||||
*
|
||||
* The table size is capped because, when there are more threads
|
||||
* than CPUs, supposing that each thread were bound to a CPU,
|
||||
* there would exist a perfect hash function mapping threads to
|
||||
* slots that eliminates collisions. When we reach capacity, we
|
||||
* search for this mapping by randomly varying the hash codes of
|
||||
* colliding threads. Because search is random, and collisions
|
||||
* only become known via CAS failures, convergence can be slow,
|
||||
* and because threads are typically not bound to CPUS forever,
|
||||
* may not occur at all. However, despite these limitations,
|
||||
* observed contention rates are typically low in these cases.
|
||||
*
|
||||
* It is possible for a Cell to become unused when threads that
|
||||
* once hashed to it terminate, as well as in the case where
|
||||
* doubling the table causes no thread to hash to it under
|
||||
* expanded mask. We do not try to detect or remove such cells,
|
||||
* under the assumption that for long-running instances, observed
|
||||
* contention levels will recur, so the cells will eventually be
|
||||
* needed again; and for short-lived ones, it does not matter.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Padded variant of AtomicLong supporting only raw accesses plus CAS.
|
||||
* The value field is placed between pads, hoping that the JVM doesn't
|
||||
* reorder them.
|
||||
*
|
||||
* JVM intrinsics note: It would be possible to use a release-only
|
||||
* form of CAS here, if it were provided.
|
||||
*/
|
||||
static final class Cell {
|
||||
volatile long p0, p1, p2, p3, p4, p5, p6;
|
||||
volatile long value;
|
||||
volatile long q0, q1, q2, q3, q4, q5, q6;
|
||||
Cell(long x) { value = x; }
|
||||
|
||||
final boolean cas(long cmp, long val) {
|
||||
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
|
||||
}
|
||||
|
||||
// Unsafe mechanics
|
||||
private static final sun.misc.Unsafe UNSAFE;
|
||||
private static final long valueOffset;
|
||||
static {
|
||||
try {
|
||||
UNSAFE = sun.misc.Unsafe.getUnsafe();
|
||||
Class<?> ak = Cell.class;
|
||||
valueOffset = UNSAFE.objectFieldOffset
|
||||
(ak.getDeclaredField("value"));
|
||||
} catch (Exception e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Number of CPUS, to place bound on table size */
|
||||
static final int NCPU = Runtime.getRuntime().availableProcessors();
|
||||
|
||||
/**
|
||||
* Table of cells. When non-null, size is a power of 2.
|
||||
*/
|
||||
transient volatile Cell[] cells;
|
||||
|
||||
/**
|
||||
* Base value, used mainly when there is no contention, but also as
|
||||
* a fallback during table initialization races. Updated via CAS.
|
||||
*/
|
||||
transient volatile long base;
|
||||
|
||||
/**
|
||||
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
|
||||
*/
|
||||
transient volatile int cellsBusy;
|
||||
|
||||
/**
|
||||
* Package-private default constructor
|
||||
*/
|
||||
Striped64() {
|
||||
}
|
||||
|
||||
/**
|
||||
* CASes the base field.
|
||||
*/
|
||||
final boolean casBase(long cmp, long val) {
|
||||
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
|
||||
}
|
||||
|
||||
/**
|
||||
* CASes the cellsBusy field from 0 to 1 to acquire lock.
|
||||
*/
|
||||
final boolean casCellsBusy() {
|
||||
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the probe value for the current thread.
|
||||
* Duplicated from ThreadLocalRandom because of packaging restrictions.
|
||||
*/
|
||||
static final int getProbe() {
|
||||
return UNSAFE.getInt(Thread.currentThread(), PROBE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Pseudo-randomly advances and records the given probe value for the
|
||||
* given thread.
|
||||
* Duplicated from ThreadLocalRandom because of packaging restrictions.
|
||||
*/
|
||||
static final int advanceProbe(int probe) {
|
||||
probe ^= probe << 13; // xorshift
|
||||
probe ^= probe >>> 17;
|
||||
probe ^= probe << 5;
|
||||
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
|
||||
return probe;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles cases of updates involving initialization, resizing,
|
||||
* creating new Cells, and/or contention. See above for
|
||||
* explanation. This method suffers the usual non-modularity
|
||||
* problems of optimistic retry code, relying on rechecked sets of
|
||||
* reads.
|
||||
*
|
||||
* @param x the value
|
||||
* @param fn the update function, or null for add (this convention
|
||||
* avoids the need for an extra field or function in LongAdder).
|
||||
* @param wasUncontended false if CAS failed before call
|
||||
*/
|
||||
final void longAccumulate(long x, LongBinaryOperator fn,
|
||||
boolean wasUncontended) {
|
||||
int h;
|
||||
if ((h = getProbe()) == 0) {
|
||||
ThreadLocalRandom.current(); // force initialization
|
||||
h = getProbe();
|
||||
wasUncontended = true;
|
||||
}
|
||||
boolean collide = false; // True if last slot nonempty
|
||||
for (;;) {
|
||||
Cell[] as; Cell a; int n; long v;
|
||||
if ((as = cells) != null && (n = as.length) > 0) {
|
||||
if ((a = as[(n - 1) & h]) == null) {
|
||||
if (cellsBusy == 0) { // Try to attach new Cell
|
||||
Cell r = new Cell(x); // Optimistically create
|
||||
if (cellsBusy == 0 && casCellsBusy()) {
|
||||
boolean created = false;
|
||||
try { // Recheck under lock
|
||||
Cell[] rs; int m, j;
|
||||
if ((rs = cells) != null &&
|
||||
(m = rs.length) > 0 &&
|
||||
rs[j = (m - 1) & h] == null) {
|
||||
rs[j] = r;
|
||||
created = true;
|
||||
}
|
||||
} finally {
|
||||
cellsBusy = 0;
|
||||
}
|
||||
if (created)
|
||||
break;
|
||||
continue; // Slot is now non-empty
|
||||
}
|
||||
}
|
||||
collide = false;
|
||||
}
|
||||
else if (!wasUncontended) // CAS already known to fail
|
||||
wasUncontended = true; // Continue after rehash
|
||||
else if (a.cas(v = a.value, ((fn == null) ? v + x :
|
||||
fn.operateAsLong(v, x))))
|
||||
break;
|
||||
else if (n >= NCPU || cells != as)
|
||||
collide = false; // At max size or stale
|
||||
else if (!collide)
|
||||
collide = true;
|
||||
else if (cellsBusy == 0 && casCellsBusy()) {
|
||||
try {
|
||||
if (cells == as) { // Expand table unless stale
|
||||
Cell[] rs = new Cell[n << 1];
|
||||
for (int i = 0; i < n; ++i)
|
||||
rs[i] = as[i];
|
||||
cells = rs;
|
||||
}
|
||||
} finally {
|
||||
cellsBusy = 0;
|
||||
}
|
||||
collide = false;
|
||||
continue; // Retry with expanded table
|
||||
}
|
||||
h = advanceProbe(h);
|
||||
}
|
||||
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
|
||||
boolean init = false;
|
||||
try { // Initialize table
|
||||
if (cells == as) {
|
||||
Cell[] rs = new Cell[2];
|
||||
rs[h & 1] = new Cell(x);
|
||||
cells = rs;
|
||||
init = true;
|
||||
}
|
||||
} finally {
|
||||
cellsBusy = 0;
|
||||
}
|
||||
if (init)
|
||||
break;
|
||||
}
|
||||
else if (casBase(v = base, ((fn == null) ? v + x :
|
||||
fn.operateAsLong(v, x))))
|
||||
break; // Fall back on using base
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as longAccumulate, but injecting long/double conversions
|
||||
* in too many places to sensibly merge with long version, given
|
||||
* the low-overhead requirements of this class. So must instead be
|
||||
* maintained by copy/paste/adapt.
|
||||
*/
|
||||
final void doubleAccumulate(double x, DoubleBinaryOperator fn,
|
||||
boolean wasUncontended) {
|
||||
int h;
|
||||
if ((h = getProbe()) == 0) {
|
||||
ThreadLocalRandom.current(); // force initialization
|
||||
h = getProbe();
|
||||
wasUncontended = true;
|
||||
}
|
||||
boolean collide = false; // True if last slot nonempty
|
||||
for (;;) {
|
||||
Cell[] as; Cell a; int n; long v;
|
||||
if ((as = cells) != null && (n = as.length) > 0) {
|
||||
if ((a = as[(n - 1) & h]) == null) {
|
||||
if (cellsBusy == 0) { // Try to attach new Cell
|
||||
Cell r = new Cell(Double.doubleToRawLongBits(x));
|
||||
if (cellsBusy == 0 && casCellsBusy()) {
|
||||
boolean created = false;
|
||||
try { // Recheck under lock
|
||||
Cell[] rs; int m, j;
|
||||
if ((rs = cells) != null &&
|
||||
(m = rs.length) > 0 &&
|
||||
rs[j = (m - 1) & h] == null) {
|
||||
rs[j] = r;
|
||||
created = true;
|
||||
}
|
||||
} finally {
|
||||
cellsBusy = 0;
|
||||
}
|
||||
if (created)
|
||||
break;
|
||||
continue; // Slot is now non-empty
|
||||
}
|
||||
}
|
||||
collide = false;
|
||||
}
|
||||
else if (!wasUncontended) // CAS already known to fail
|
||||
wasUncontended = true; // Continue after rehash
|
||||
else if (a.cas(v = a.value,
|
||||
((fn == null) ?
|
||||
Double.doubleToRawLongBits
|
||||
(Double.longBitsToDouble(v) + x) :
|
||||
Double.doubleToRawLongBits
|
||||
(fn.operateAsDouble
|
||||
(Double.longBitsToDouble(v), x)))))
|
||||
break;
|
||||
else if (n >= NCPU || cells != as)
|
||||
collide = false; // At max size or stale
|
||||
else if (!collide)
|
||||
collide = true;
|
||||
else if (cellsBusy == 0 && casCellsBusy()) {
|
||||
try {
|
||||
if (cells == as) { // Expand table unless stale
|
||||
Cell[] rs = new Cell[n << 1];
|
||||
for (int i = 0; i < n; ++i)
|
||||
rs[i] = as[i];
|
||||
cells = rs;
|
||||
}
|
||||
} finally {
|
||||
cellsBusy = 0;
|
||||
}
|
||||
collide = false;
|
||||
continue; // Retry with expanded table
|
||||
}
|
||||
h = advanceProbe(h);
|
||||
}
|
||||
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
|
||||
boolean init = false;
|
||||
try { // Initialize table
|
||||
if (cells == as) {
|
||||
Cell[] rs = new Cell[2];
|
||||
rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
|
||||
cells = rs;
|
||||
init = true;
|
||||
}
|
||||
} finally {
|
||||
cellsBusy = 0;
|
||||
}
|
||||
if (init)
|
||||
break;
|
||||
}
|
||||
else if (casBase(v = base,
|
||||
((fn == null) ?
|
||||
Double.doubleToRawLongBits
|
||||
(Double.longBitsToDouble(v) + x) :
|
||||
Double.doubleToRawLongBits
|
||||
(fn.operateAsDouble
|
||||
(Double.longBitsToDouble(v), x)))))
|
||||
break; // Fall back on using base
|
||||
}
|
||||
}
|
||||
|
||||
// Unsafe mechanics
|
||||
private static final sun.misc.Unsafe UNSAFE;
|
||||
private static final long BASE;
|
||||
private static final long CELLSBUSY;
|
||||
private static final long PROBE;
|
||||
static {
|
||||
try {
|
||||
UNSAFE = sun.misc.Unsafe.getUnsafe();
|
||||
Class<?> sk = Striped64.class;
|
||||
BASE = UNSAFE.objectFieldOffset
|
||||
(sk.getDeclaredField("base"));
|
||||
CELLSBUSY = UNSAFE.objectFieldOffset
|
||||
(sk.getDeclaredField("cellsBusy"));
|
||||
Class<?> tk = Thread.class;
|
||||
PROBE = UNSAFE.objectFieldOffset
|
||||
(tk.getDeclaredField("threadLocalRandomProbe"));
|
||||
} catch (Exception e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
164
jdk/test/java/util/concurrent/atomic/DoubleAdderDemo.java
Normal file
164
jdk/test/java/util/concurrent/atomic/DoubleAdderDemo.java
Normal file
@ -0,0 +1,164 @@
|
||||
/*
|
||||
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
/* Adapted from Dougs CVS test/jsr166e/DoubleAdderDemo.java
|
||||
*
|
||||
* The demo is a micro-benchmark to compare synchronized access to a primitive
|
||||
* double and DoubleAdder (run without any args), this restricted version simply
|
||||
* exercises the basic functionality of DoubleAdder, suitable for automated
|
||||
* testing (-shortrun).
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 8005311
|
||||
* @run main DoubleAdderDemo -shortrun
|
||||
* @summary Basic test for Doubledder
|
||||
*/
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Phaser;
|
||||
import java.util.concurrent.atomic.DoubleAdder;
|
||||
|
||||
public class DoubleAdderDemo {
|
||||
static final int INCS_PER_THREAD = 10000000;
|
||||
static final int NCPU = Runtime.getRuntime().availableProcessors();
|
||||
static final int SHORT_RUN_MAX_THREADS = NCPU > 1 ? NCPU / 2 : 1;
|
||||
static final int LONG_RUN_MAX_THREADS = NCPU * 2;
|
||||
static final ExecutorService pool = Executors.newCachedThreadPool();
|
||||
|
||||
static final class SynchronizedDoubleAdder {
|
||||
double value;
|
||||
synchronized double sum() { return value; }
|
||||
synchronized void add(double x) { value += x; }
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
boolean shortRun = args.length > 0 && args[0].equals("-shortrun");
|
||||
int maxNumThreads = shortRun ? SHORT_RUN_MAX_THREADS : LONG_RUN_MAX_THREADS;
|
||||
|
||||
System.out.println("Warmup...");
|
||||
int half = NCPU > 1 ? NCPU / 2 : 1;
|
||||
if (!shortRun)
|
||||
syncTest(half, 1000);
|
||||
adderTest(half, 1000);
|
||||
|
||||
for (int reps = 0; reps < 2; ++reps) {
|
||||
System.out.println("Running...");
|
||||
for (int i = 1; i <= maxNumThreads; i <<= 1) {
|
||||
if (!shortRun)
|
||||
syncTest(i, INCS_PER_THREAD);
|
||||
adderTest(i, INCS_PER_THREAD);
|
||||
}
|
||||
}
|
||||
pool.shutdown();
|
||||
}
|
||||
|
||||
static void syncTest(int nthreads, int incs) {
|
||||
System.out.print("Synchronized ");
|
||||
Phaser phaser = new Phaser(nthreads + 1);
|
||||
SynchronizedDoubleAdder a = new SynchronizedDoubleAdder();
|
||||
for (int i = 0; i < nthreads; ++i)
|
||||
pool.execute(new SyncTask(a, phaser, incs));
|
||||
report(nthreads, incs, timeTasks(phaser), a.sum());
|
||||
}
|
||||
|
||||
static void adderTest(int nthreads, int incs) {
|
||||
System.out.print("DoubleAdder ");
|
||||
Phaser phaser = new Phaser(nthreads + 1);
|
||||
DoubleAdder a = new DoubleAdder();
|
||||
for (int i = 0; i < nthreads; ++i)
|
||||
pool.execute(new AdderTask(a, phaser, incs));
|
||||
report(nthreads, incs, timeTasks(phaser), a.sum());
|
||||
}
|
||||
|
||||
static void report(int nthreads, int incs, long time, double sum) {
|
||||
long total = (long)nthreads * incs;
|
||||
if (sum != (double)total)
|
||||
throw new Error(sum + " != " + total);
|
||||
double secs = (double)time / (1000L * 1000 * 1000);
|
||||
long rate = total * (1000L) / time;
|
||||
System.out.printf("threads:%3d Time: %7.3fsec Incs per microsec: %4d\n",
|
||||
nthreads, secs, rate);
|
||||
}
|
||||
|
||||
static long timeTasks(Phaser phaser) {
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
long start = System.nanoTime();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
return System.nanoTime() - start;
|
||||
}
|
||||
|
||||
static final class AdderTask implements Runnable {
|
||||
final DoubleAdder adder;
|
||||
final Phaser phaser;
|
||||
final int incs;
|
||||
volatile double result;
|
||||
AdderTask(DoubleAdder adder, Phaser phaser, int incs) {
|
||||
this.adder = adder;
|
||||
this.phaser = phaser;
|
||||
this.incs = incs;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
DoubleAdder a = adder;
|
||||
for (int i = 0; i < incs; ++i)
|
||||
a.add(1.0);
|
||||
result = a.sum();
|
||||
phaser.arrive();
|
||||
}
|
||||
}
|
||||
|
||||
static final class SyncTask implements Runnable {
|
||||
final SynchronizedDoubleAdder adder;
|
||||
final Phaser phaser;
|
||||
final int incs;
|
||||
volatile double result;
|
||||
SyncTask(SynchronizedDoubleAdder adder, Phaser phaser, int incs) {
|
||||
this.adder = adder;
|
||||
this.phaser = phaser;
|
||||
this.incs = incs;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
SynchronizedDoubleAdder a = adder;
|
||||
for (int i = 0; i < incs; ++i)
|
||||
a.add(1.0);
|
||||
result = a.sum();
|
||||
phaser.arrive();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
158
jdk/test/java/util/concurrent/atomic/LongAdderDemo.java
Normal file
158
jdk/test/java/util/concurrent/atomic/LongAdderDemo.java
Normal file
@ -0,0 +1,158 @@
|
||||
/*
|
||||
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
* Expert Group and released to the public domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
/* Adapted from Dougs CVS test/jsr166e/LongAdderDemo.java
|
||||
*
|
||||
* The demo is a micro-benchmark to compare AtomicLong and LongAdder (run
|
||||
* without any args), this restricted version simply exercises the basic
|
||||
* functionality of LongAdder, suitable for automated testing (-shortrun).
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 8005311
|
||||
* @run main LongAdderDemo -shortrun
|
||||
* @summary Basic test for LongAdder
|
||||
*/
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Phaser;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
||||
public class LongAdderDemo {
|
||||
static final int INCS_PER_THREAD = 10000000;
|
||||
static final int NCPU = Runtime.getRuntime().availableProcessors();
|
||||
static final int SHORT_RUN_MAX_THREADS = NCPU > 1 ? NCPU / 2 : 1;
|
||||
static final int LONG_RUN_MAX_THREADS = NCPU * 2;
|
||||
static final ExecutorService pool = Executors.newCachedThreadPool();
|
||||
|
||||
public static void main(String[] args) {
|
||||
boolean shortRun = args.length > 0 && args[0].equals("-shortrun");
|
||||
int maxNumThreads = shortRun ? SHORT_RUN_MAX_THREADS : LONG_RUN_MAX_THREADS;
|
||||
|
||||
System.out.println("Warmup...");
|
||||
int half = NCPU > 1 ? NCPU / 2 : 1;
|
||||
if (!shortRun)
|
||||
casTest(half, 1000);
|
||||
adderTest(half, 1000);
|
||||
|
||||
for (int reps = 0; reps < 2; ++reps) {
|
||||
System.out.println("Running...");
|
||||
for (int i = 1; i <= maxNumThreads; i <<= 1) {
|
||||
if (!shortRun)
|
||||
casTest(i, INCS_PER_THREAD);
|
||||
adderTest(i, INCS_PER_THREAD);
|
||||
}
|
||||
}
|
||||
pool.shutdown();
|
||||
}
|
||||
|
||||
static void casTest(int nthreads, int incs) {
|
||||
System.out.print("AtomicLong ");
|
||||
Phaser phaser = new Phaser(nthreads + 1);
|
||||
AtomicLong a = new AtomicLong();
|
||||
for (int i = 0; i < nthreads; ++i)
|
||||
pool.execute(new CasTask(a, phaser, incs));
|
||||
report(nthreads, incs, timeTasks(phaser), a.get());
|
||||
}
|
||||
|
||||
static void adderTest(int nthreads, int incs) {
|
||||
System.out.print("LongAdder ");
|
||||
Phaser phaser = new Phaser(nthreads + 1);
|
||||
LongAdder a = new LongAdder();
|
||||
for (int i = 0; i < nthreads; ++i)
|
||||
pool.execute(new AdderTask(a, phaser, incs));
|
||||
report(nthreads, incs, timeTasks(phaser), a.sum());
|
||||
}
|
||||
|
||||
static void report(int nthreads, int incs, long time, long sum) {
|
||||
long total = (long)nthreads * incs;
|
||||
if (sum != total)
|
||||
throw new Error(sum + " != " + total);
|
||||
double secs = (double)time / (1000L * 1000 * 1000);
|
||||
long rate = total * (1000L) / time;
|
||||
System.out.printf("threads:%3d Time: %7.3fsec Incs per microsec: %4d\n",
|
||||
nthreads, secs, rate);
|
||||
}
|
||||
|
||||
static long timeTasks(Phaser phaser) {
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
long start = System.nanoTime();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
return System.nanoTime() - start;
|
||||
}
|
||||
|
||||
static final class AdderTask implements Runnable {
|
||||
final LongAdder adder;
|
||||
final Phaser phaser;
|
||||
final int incs;
|
||||
volatile long result;
|
||||
AdderTask(LongAdder adder, Phaser phaser, int incs) {
|
||||
this.adder = adder;
|
||||
this.phaser = phaser;
|
||||
this.incs = incs;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
LongAdder a = adder;
|
||||
for (int i = 0; i < incs; ++i)
|
||||
a.increment();
|
||||
result = a.sum();
|
||||
phaser.arrive();
|
||||
}
|
||||
}
|
||||
|
||||
static final class CasTask implements Runnable {
|
||||
final AtomicLong adder;
|
||||
final Phaser phaser;
|
||||
final int incs;
|
||||
volatile long result;
|
||||
CasTask(AtomicLong adder, Phaser phaser, int incs) {
|
||||
this.adder = adder;
|
||||
this.phaser = phaser;
|
||||
this.incs = incs;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
AtomicLong a = adder;
|
||||
for (int i = 0; i < incs; ++i)
|
||||
a.getAndIncrement();
|
||||
result = a.get();
|
||||
phaser.arrive();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user