aboutsummaryrefslogtreecommitdiffstats
path: root/zkfacade
diff options
context:
space:
mode:
author: Håkon Hallingstad <hakon@verizonmedia.com> 2020-09-26 18:33:19 +0200
committer: Håkon Hallingstad <hakon@verizonmedia.com> 2020-09-26 18:33:19 +0200
commit: d7118a218d4704b52ed9883e4ff381a519adb7c1 (patch)
tree: 160c7b057390e18c5f18af9d40e17f64d9e3e7f0 /zkfacade
parent: e38ba64f8253746739674720035d11bdcda5e155 (diff)
Move lock path from thread to per-lock (bug)
Diffstat (limited to 'zkfacade')
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java | 16
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfo.java | 30
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfoSamples.java | 106
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockInfo.java | 91
-rw-r--r-- zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockInfoSamplesTest.java | 58
-rw-r--r-- zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java | 2
6 files changed, 223 insertions, 80 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java
index 9af83223ae6..4239d325ba5 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java
@@ -35,23 +35,23 @@ public class Lock implements Mutex {
/** Take the lock with the given timeout. This may be called multiple times from the same thread - each matched by a close */
public void acquire(Duration timeout) throws UncheckedTimeoutException {
- ThreadLockInfo threadLockInfo = getThreadLockInfo();
- threadLockInfo.invokingAcquire(timeout);
+ ThreadLockInfo threadLockInfo = ThreadLockInfo.getCurrentThreadLockInfo();
+ threadLockInfo.invokingAcquire(lockPath, timeout);
final boolean acquired;
try {
acquired = mutex.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
- threadLockInfo.acquireFailed();
+ threadLockInfo.acquireFailed(lockPath);
throw new RuntimeException("Exception acquiring lock '" + lockPath + "'", e);
}
if (!acquired) {
- threadLockInfo.acquireTimedOut();
+ threadLockInfo.acquireTimedOut(lockPath);
throw new UncheckedTimeoutException("Timed out after waiting " + timeout +
" to acquire lock '" + lockPath + "'");
}
- threadLockInfo.lockAcquired();
+ threadLockInfo.lockAcquired(lockPath);
}
@Override
@@ -60,7 +60,7 @@ public class Lock implements Mutex {
}
private void release() {
- getThreadLockInfo().lockReleased();
+ ThreadLockInfo.getCurrentThreadLockInfo().lockReleased(lockPath);
try {
mutex.release();
}
@@ -68,10 +68,6 @@ public class Lock implements Mutex {
throw new RuntimeException("Exception releasing lock '" + lockPath + "'");
}
}
-
- private ThreadLockInfo getThreadLockInfo() {
- return ThreadLockInfo.getCurrentThreadLockInfo(lockPath);
- }
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfo.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfo.java
index e9c238a40b9..f40c6f0498f 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfo.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfo.java
@@ -16,15 +16,16 @@ import java.util.Optional;
public class LockInfo {
private final ThreadLockInfo threadLockInfo;
- private final Instant acquireInstant;
+ private final String lockPath;
+ private final Instant callAcquireInstant;
private final Duration timeout;
private volatile Optional<Instant> lockAcquiredInstant = Optional.empty();
private volatile Optional<Instant> terminalStateInstant = Optional.empty();
private volatile Optional<String> stackTrace = Optional.empty();
- public static LockInfo invokingAcquire(ThreadLockInfo threadLockInfo, Duration timeout) {
- return new LockInfo(threadLockInfo, timeout);
+ public static LockInfo invokingAcquire(ThreadLockInfo threadLockInfo, String lockPath, Duration timeout) {
+ return new LockInfo(threadLockInfo, lockPath, timeout, Instant.now());
}
public enum LockState {
@@ -39,15 +40,16 @@ public class LockInfo {
private volatile LockState lockState = LockState.ACQUIRING;
- private LockInfo(ThreadLockInfo threadLockInfo, Duration timeout) {
+ private LockInfo(ThreadLockInfo threadLockInfo, String lockPath, Duration timeout, Instant callAcquireInstant) {
this.threadLockInfo = threadLockInfo;
- this.acquireInstant = Instant.now();
+ this.lockPath = lockPath;
+ this.callAcquireInstant = callAcquireInstant;
this.timeout = timeout;
}
public String getThreadName() { return threadLockInfo.getThreadName(); }
- public String getLockPath() { return threadLockInfo.getLockPath(); }
- public Instant getTimeAcquiredWasInvoked() { return acquireInstant; }
+ public String getLockPath() { return lockPath; }
+ public Instant getTimeAcquiredWasInvoked() { return callAcquireInstant; }
public Duration getAcquireTimeout() { return timeout; }
public LockState getLockState() { return lockState; }
public Optional<Instant> getTimeLockWasAcquired() { return lockAcquiredInstant; }
@@ -55,7 +57,7 @@ public class LockInfo {
public Optional<String> getStackTrace() { return stackTrace; }
public Duration getDurationOfAcquire() {
- return Duration.between(acquireInstant, lockAcquiredInstant.orElseGet(Instant::now));
+ return Duration.between(callAcquireInstant, lockAcquiredInstant.orElseGet(Instant::now));
}
public Duration getDurationWithLock() {
@@ -64,11 +66,11 @@ public class LockInfo {
.orElse(Duration.ZERO);
}
- public Duration getDuration() { return Duration.between(acquireInstant, terminalStateInstant.orElseGet(Instant::now)); }
+ public Duration getDuration() { return Duration.between(callAcquireInstant, terminalStateInstant.orElseGet(Instant::now)); }
/** Get time from just before trying to acquire lock to the time the terminal state was reached, or ZERO. */
- public Duration getDurationInTerminalStateAndForPriorityQueue() {
- return terminalStateInstant.map(instant -> Duration.between(acquireInstant, instant)).orElse(Duration.ZERO);
+ public Duration getStableTotalDuration() {
+ return terminalStateInstant.map(instant -> Duration.between(callAcquireInstant, instant)).orElse(Duration.ZERO);
}
/** Fill in the stack trace starting at the caller's stack frame. */
@@ -88,8 +90,10 @@ public class LockInfo {
lockAcquiredInstant = Optional.of(Instant.now());
}
- void setTerminalState(LockState terminalState) {
+ void setTerminalState(LockState terminalState) { setTerminalState(terminalState, Instant.now()); }
+
+ void setTerminalState(LockState terminalState, Instant instant) {
lockState = terminalState;
- terminalStateInstant = Optional.of(Instant.now());
+ terminalStateInstant = Optional.of(instant);
}
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfoSamples.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfoSamples.java
new file mode 100644
index 00000000000..ae84c5c984a
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfoSamples.java
@@ -0,0 +1,106 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.curator.stats;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+/**
+ * Collection containing "interesting" {@code LockInfo}s.
+ *
+ * @author hakon
+ */
+// @ThreadSafe
+public class LockInfoSamples {
+ private final int maxSamples;
+
+ /** Ensure atomic operations on this collection. */
+ private final Object monitor = new Object();
+
+ /** Keep at most one sample for each lock path. */
+ private final Map<String, LockInfo> byLockPath;
+
+ /**
+ * Priority queue containing all samples. The head of this queue (peek()/poll())
+ * returns the LockInfo with the smallest duration.
+ */
+ private final PriorityQueue<LockInfo> priorityQueue =
+ new PriorityQueue<>(Comparator.comparing(LockInfo::getStableTotalDuration));
+
+ LockInfoSamples() { this(10); }
+
+ LockInfoSamples(int maxSamples) {
+ this.maxSamples = maxSamples;
+ this.byLockPath = new HashMap<>(maxSamples);
+ }
+
+ int size() { return byLockPath.size(); }
+
+ boolean maybeSample(LockInfo lockInfo) {
+ final boolean added;
+ synchronized (monitor) {
+ if (shouldAdd(lockInfo)) {
+ byLockPath.put(lockInfo.getLockPath(), lockInfo);
+ priorityQueue.add(lockInfo);
+ added = true;
+ } else {
+ added = false;
+ }
+ }
+
+ if (added) {
+ // Unnecessary to invoke under synchronized, although it means that some samples
+ // may be without stack trace (just retry if that happens).
+ lockInfo.fillStackTrace();
+ }
+
+ return added;
+ }
+
+ private boolean shouldAdd(LockInfo lockInfo) {
+ LockInfo existingLockInfo = byLockPath.get(lockInfo.getLockPath());
+ if (existingLockInfo != null) {
+ if (hasLongerDurationThan(lockInfo, existingLockInfo)) {
+ byLockPath.remove(existingLockInfo.getLockPath());
+ priorityQueue.remove(existingLockInfo);
+ return true;
+ }
+
+ return false;
+ }
+
+ if (size() < maxSamples) {
+ return true;
+ }
+
+ // peek() and poll() retrieve the smallest element.
+ existingLockInfo = priorityQueue.peek(); // cannot be null
+ if (hasLongerDurationThan(lockInfo, existingLockInfo)) {
+ priorityQueue.poll();
+ byLockPath.remove(existingLockInfo.getLockPath());
+ return true;
+ }
+
+ return false;
+ }
+
+ List<LockInfo> asList() {
+ synchronized (monitor) {
+ return List.copyOf(byLockPath.values());
+ }
+ }
+
+ void clear() {
+ synchronized (monitor) {
+ byLockPath.clear();
+ priorityQueue.clear();
+ }
+ }
+
+ private static boolean hasLongerDurationThan(LockInfo lockInfo, LockInfo otherLockInfo) {
+ // Use stable total duration to avoid messing up priority queue.
+ return lockInfo.getStableTotalDuration().compareTo(otherLockInfo.getStableTotalDuration()) > 0;
+ }
+}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockInfo.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockInfo.java
index b796cb9af43..5280a049ceb 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockInfo.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockInfo.java
@@ -4,14 +4,11 @@ package com.yahoo.vespa.curator.stats;
import com.yahoo.vespa.curator.Lock;
import java.time.Duration;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
@@ -26,17 +23,11 @@ public class ThreadLockInfo {
private static final ConcurrentHashMap<Thread, ThreadLockInfo> locks = new ConcurrentHashMap<>();
- private static final int MAX_COMPLETED_LOCK_INFOS_SIZE = 5;
- /** Would have used a thread-safe priority queue. */
- private static final Object completedLockInfosMonitor = new Object();
- private static final PriorityQueue<LockInfo> completedLockInfos =
- new PriorityQueue<>(Comparator.comparing(LockInfo::getDurationInTerminalStateAndForPriorityQueue));
+ private static final LockInfoSamples completedLockInfoSamples = new LockInfoSamples();
private static final ConcurrentHashMap<String, LockCounters> countersByLockPath = new ConcurrentHashMap<>();
private final Thread thread;
- private final String lockPath;
- private final LockCounters lockCountersForPath;
/** The locks are reentrant so there may be more than 1 lock for this thread. */
private final ConcurrentLinkedQueue<LockInfo> lockInfos = new ConcurrentLinkedQueue<>();
@@ -45,36 +36,26 @@ public class ThreadLockInfo {
public static List<ThreadLockInfo> getThreadLockInfos() { return List.copyOf(locks.values()); }
- public static List<LockInfo> getSlowLockInfos() {
- synchronized (completedLockInfosMonitor) {
- return List.copyOf(completedLockInfos);
- }
+ public static List<LockInfo> getLockInfoSamples() {
+ return completedLockInfoSamples.asList();
}
/** Returns the per-thread singleton ThreadLockInfo. */
- public static ThreadLockInfo getCurrentThreadLockInfo(String lockPath) {
- return locks.computeIfAbsent(
- Thread.currentThread(),
- currentThread -> {
- LockCounters lockCounters = countersByLockPath.computeIfAbsent(lockPath, ignored -> new LockCounters());
- return new ThreadLockInfo(currentThread, lockPath, lockCounters);
- });
+ public static ThreadLockInfo getCurrentThreadLockInfo() {
+ return locks.computeIfAbsent(Thread.currentThread(), ThreadLockInfo::new);
}
static void clearStaticDataForTesting() {
locks.clear();
- completedLockInfos.clear();
+ completedLockInfoSamples.clear();
countersByLockPath.clear();
}
- ThreadLockInfo(Thread currentThread, String lockPath, LockCounters lockCountersForPath) {
+ ThreadLockInfo(Thread currentThread) {
this.thread = currentThread;
- this.lockPath = lockPath;
- this.lockCountersForPath = lockCountersForPath;
}
public String getThreadName() { return thread.getName(); }
- public String getLockPath() { return lockPath; }
public String getStackTrace() {
var stackTrace = new StringBuilder();
@@ -98,64 +79,62 @@ public class ThreadLockInfo {
public List<LockInfo> getLockInfos() { return List.copyOf(lockInfos); }
/** Mutable method (see class doc) */
- public void invokingAcquire(Duration timeout) {
- lockCountersForPath.invokeAcquireCount.incrementAndGet();
- lockCountersForPath.inCriticalRegionCount.incrementAndGet();
- lockInfos.add(LockInfo.invokingAcquire(this, timeout));
+ public void invokingAcquire(String lockPath, Duration timeout) {
+ LockCounters lockCounters = getLockCounters(lockPath);
+ lockCounters.invokeAcquireCount.incrementAndGet();
+ lockCounters.inCriticalRegionCount.incrementAndGet();
+ lockInfos.add(LockInfo.invokingAcquire(this, lockPath, timeout));
}
/** Mutable method (see class doc) */
- public void acquireFailed() {
- removeLastLockInfo(lockCountersForPath.acquireFailedCount, LockInfo::acquireFailed);
+ public void acquireFailed(String lockPath) {
+ LockCounters lockCounters = getLockCounters(lockPath);
+ lockCounters.acquireFailedCount.incrementAndGet();
+ removeLastLockInfo(lockCounters, LockInfo::acquireFailed);
}
/** Mutable method (see class doc) */
- public void acquireTimedOut() {
+ public void acquireTimedOut(String lockPath) {
+ LockCounters lockCounters = getLockCounters(lockPath);
if (lockInfos.size() > 1) {
- lockCountersForPath.timeoutOnReentrancyErrorCount.incrementAndGet();
+ lockCounters.timeoutOnReentrancyErrorCount.incrementAndGet();
}
- removeLastLockInfo(lockCountersForPath.acquireTimedOutCount, LockInfo::timedOut);
+ lockCounters.acquireTimedOutCount.incrementAndGet();
+ removeLastLockInfo(lockCounters, LockInfo::timedOut);
}
/** Mutable method (see class doc) */
- public void lockAcquired() {
- lockCountersForPath.lockAcquiredCount.incrementAndGet();
-
+ public void lockAcquired(String lockPath) {
+ getLockCounters(lockPath).lockAcquiredCount.incrementAndGet();
getLastLockInfo().ifPresent(LockInfo::lockAcquired);
}
/** Mutable method (see class doc) */
- public void lockReleased() {
- removeLastLockInfo(lockCountersForPath.locksReleasedCount, LockInfo::released);
+ public void lockReleased(String lockPath) {
+ LockCounters lockCounters = getLockCounters(lockPath);
+ lockCounters.locksReleasedCount.incrementAndGet();
+ removeLastLockInfo(lockCounters, LockInfo::released);
+ }
+
+ private LockCounters getLockCounters(String lockPath) {
+ return countersByLockPath.computeIfAbsent(lockPath, __ -> new LockCounters());
}
private Optional<LockInfo> getLastLockInfo() {
return lockInfos.isEmpty() ? Optional.empty() : Optional.of(lockInfos.peek());
}
- private void removeLastLockInfo(AtomicInteger metricToIncrement, Consumer<LockInfo> completeLockInfo) {
- metricToIncrement.incrementAndGet();
- lockCountersForPath.inCriticalRegionCount.decrementAndGet();
+ private void removeLastLockInfo(LockCounters lockCounters, Consumer<LockInfo> completeLockInfo) {
+ lockCounters.inCriticalRegionCount.decrementAndGet();
if (lockInfos.isEmpty()) {
- lockCountersForPath.noLocksErrorCount.incrementAndGet();
+ lockCounters.noLocksErrorCount.incrementAndGet();
return;
}
LockInfo lockInfo = lockInfos.poll();
completeLockInfo.accept(lockInfo);
-
- synchronized (completedLockInfosMonitor) {
- if (completedLockInfos.size() < MAX_COMPLETED_LOCK_INFOS_SIZE) {
- lockInfo.fillStackTrace();
- completedLockInfos.add(lockInfo);
- } else if (lockInfo.getDurationInTerminalStateAndForPriorityQueue()
- .compareTo(completedLockInfos.peek().getDurationInTerminalStateAndForPriorityQueue()) > 0) {
- completedLockInfos.poll();
- lockInfo.fillStackTrace();
- completedLockInfos.add(lockInfo);
- }
- }
+ completedLockInfoSamples.maybeSample(lockInfo);
}
}
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockInfoSamplesTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockInfoSamplesTest.java
new file mode 100644
index 00000000000..4a14b0cc1b2
--- /dev/null
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockInfoSamplesTest.java
@@ -0,0 +1,58 @@
+package com.yahoo.vespa.curator.stats;// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LockInfoSamplesTest {
+ private final LockInfoSamples samples = new LockInfoSamples(2);
+ private ThreadLockInfo threadLockInfo;
+
+ @Test
+ public void test() {
+ threadLockInfo = new ThreadLockInfo(Thread.currentThread());
+
+ assertTrue(maybeSample("1", 10));
+
+ // new sample has longer duration
+ assertTrue(maybeSample("1", 11));
+
+ // new sample has shorter duration
+ assertFalse(maybeSample("1", 10));
+
+ // new path, will be added
+ assertTrue(maybeSample("2", 5));
+
+ // new path, but duration is too low to be added
+ assertFalse(maybeSample("3", 4));
+
+ // new path, expels "2"
+ assertTrue(maybeSample("4", 6));
+
+ Map<String, LockInfo> lockInfos = samples.asList().stream().collect(Collectors.toMap(
+ lockInfo -> lockInfo.getLockPath(),
+ lockInfo -> lockInfo));
+ assertEquals(2, lockInfos.size());
+
+ assertTrue(lockInfos.containsKey("1"));
+ assertEquals(Duration.ofSeconds(11), lockInfos.get("1").getStableTotalDuration());
+
+ assertTrue(lockInfos.containsKey("4"));
+ assertEquals(Duration.ofSeconds(6), lockInfos.get("4").getStableTotalDuration());
+ }
+
+ private boolean maybeSample(String lockPath, int secondsDuration) {
+ LockInfo lockInfo = LockInfo.invokingAcquire(threadLockInfo, lockPath, Duration.ofSeconds(1));
+ Instant instant = lockInfo.getTimeAcquiredWasInvoked().plus(Duration.ofSeconds(secondsDuration));
+ lockInfo.setTerminalState(LockInfo.LockState.RELEASED, instant);
+ return samples.maybeSample(lockInfo);
+ }
+
+} \ No newline at end of file
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java
index 23b603eca5c..4b9b6a4429b 100644
--- a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java
@@ -47,7 +47,7 @@ public class LockTest {
expectedCounters.acquireFailedCount.set(1);
assertEquals(Map.of(lockPath, expectedCounters), ThreadLockInfo.getLockCountersByPath());
- List<LockInfo> slowLockInfos = ThreadLockInfo.getSlowLockInfos();
+ List<LockInfo> slowLockInfos = ThreadLockInfo.getLockInfoSamples();
assertEquals(1, slowLockInfos.size());
LockInfo slowLockInfo = slowLockInfos.get(0);
assertEquals(acquireTimeout, slowLockInfo.getAcquireTimeout());