Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@
class Buffer {

private static final long bufferActiveBit = 1L << 63;
private final AtomicLong observationCount = new AtomicLong(0);
// Tracking observation counts requires an AtomicLong for coordination between recording and
// collecting. AtomicLong does much worse under contention than the LongAdder instances used
// elsewhere to hold aggregated state. To improve, we stripe the AtomicLong into N instances,
// where N is the number of available processors. Each record operation chooses the appropriate
// instance to use based on the modulo of its thread id and N. This is a more naive / simple
// implementation compared to the striping used under the hood in java.util.concurrent classes
// like LongAdder - contention and hot spots can still occur if recording thread ids happen to
// resolve to the same index. Further improvement is possible.
private final AtomicLong[] stripedObservationCounts;
private double[] observationBuffer = new double[0];
private int bufferPos = 0;
private boolean reset = false;
Expand All @@ -27,8 +35,17 @@ class Buffer {
ReentrantLock runLock = new ReentrantLock();
Condition bufferFilled = appendLock.newCondition();

Buffer() {
stripedObservationCounts = new AtomicLong[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < stripedObservationCounts.length; i++) {
stripedObservationCounts[i] = new AtomicLong(0);
}
}

boolean append(double value) {
long count = observationCount.incrementAndGet();
int index = Math.abs((int) Thread.currentThread().getId()) % stripedObservationCounts.length;
AtomicLong observationCountForThread = stripedObservationCounts[index];
long count = observationCountForThread.incrementAndGet();
if ((count & bufferActiveBit) == 0) {
return false; // sign bit not set -> buffer not active.
} else {
Expand Down Expand Up @@ -69,7 +86,10 @@ <T extends DataPointSnapshot> T run(
runLock.lock();
try {
// Signal that the buffer is active.
Long expectedCount = observationCount.getAndAdd(bufferActiveBit);
long expectedCount = 0L;
for (AtomicLong observationCount : stripedObservationCounts) {
expectedCount += observationCount.getAndAdd(bufferActiveBit);
}

while (!complete.apply(expectedCount)) {
// Wait until all in-flight threads have added their observations to the histogram /
Expand All @@ -81,14 +101,18 @@ <T extends DataPointSnapshot> T run(
result = createResult.get();

// Signal that the buffer is inactive.
int expectedBufferSize;
long expectedBufferSize = 0;
if (reset) {
expectedBufferSize =
(int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount);
for (AtomicLong observationCount : stripedObservationCounts) {
expectedBufferSize += observationCount.getAndSet(0) & ~bufferActiveBit;
}
reset = false;
} else {
expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount);
for (AtomicLong observationCount : stripedObservationCounts) {
expectedBufferSize += observationCount.addAndGet(bufferActiveBit);
}
}
expectedBufferSize -= expectedCount;

appendLock.lock();
try {
Expand Down