diff options
author | Håkon Hallingstad <hakon@verizonmedia.com> | 2020-09-26 18:33:19 +0200 |
---|---|---|
committer | Håkon Hallingstad <hakon@verizonmedia.com> | 2020-09-26 18:33:19 +0200 |
commit | d7118a218d4704b52ed9883e4ff381a519adb7c1 (patch) | |
tree | 160c7b057390e18c5f18af9d40e17f64d9e3e7f0 /zkfacade | |
parent | e38ba64f8253746739674720035d11bdcda5e155 (diff) |
Move lock path from thread to per-lock (bug)
Diffstat (limited to 'zkfacade')
6 files changed, 223 insertions, 80 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java index 9af83223ae6..4239d325ba5 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java @@ -35,23 +35,23 @@ public class Lock implements Mutex { /** Take the lock with the given timeout. This may be called multiple times from the same thread - each matched by a close */ public void acquire(Duration timeout) throws UncheckedTimeoutException { - ThreadLockInfo threadLockInfo = getThreadLockInfo(); - threadLockInfo.invokingAcquire(timeout); + ThreadLockInfo threadLockInfo = ThreadLockInfo.getCurrentThreadLockInfo(); + threadLockInfo.invokingAcquire(lockPath, timeout); final boolean acquired; try { acquired = mutex.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (Exception e) { - threadLockInfo.acquireFailed(); + threadLockInfo.acquireFailed(lockPath); throw new RuntimeException("Exception acquiring lock '" + lockPath + "'", e); } if (!acquired) { - threadLockInfo.acquireTimedOut(); + threadLockInfo.acquireTimedOut(lockPath); throw new UncheckedTimeoutException("Timed out after waiting " + timeout + " to acquire lock '" + lockPath + "'"); } - threadLockInfo.lockAcquired(); + threadLockInfo.lockAcquired(lockPath); } @Override @@ -60,7 +60,7 @@ public class Lock implements Mutex { } private void release() { - getThreadLockInfo().lockReleased(); + ThreadLockInfo.getCurrentThreadLockInfo().lockReleased(lockPath); try { mutex.release(); } @@ -68,10 +68,6 @@ public class Lock implements Mutex { throw new RuntimeException("Exception releasing lock '" + lockPath + "'"); } } - - private ThreadLockInfo getThreadLockInfo() { - return ThreadLockInfo.getCurrentThreadLockInfo(lockPath); - } } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfo.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfo.java index e9c238a40b9..f40c6f0498f 
100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfo.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfo.java @@ -16,15 +16,16 @@ import java.util.Optional; public class LockInfo { private final ThreadLockInfo threadLockInfo; - private final Instant acquireInstant; + private final String lockPath; + private final Instant callAcquireInstant; private final Duration timeout; private volatile Optional<Instant> lockAcquiredInstant = Optional.empty(); private volatile Optional<Instant> terminalStateInstant = Optional.empty(); private volatile Optional<String> stackTrace = Optional.empty(); - public static LockInfo invokingAcquire(ThreadLockInfo threadLockInfo, Duration timeout) { - return new LockInfo(threadLockInfo, timeout); + public static LockInfo invokingAcquire(ThreadLockInfo threadLockInfo, String lockPath, Duration timeout) { + return new LockInfo(threadLockInfo, lockPath, timeout, Instant.now()); } public enum LockState { @@ -39,15 +40,16 @@ public class LockInfo { private volatile LockState lockState = LockState.ACQUIRING; - private LockInfo(ThreadLockInfo threadLockInfo, Duration timeout) { + private LockInfo(ThreadLockInfo threadLockInfo, String lockPath, Duration timeout, Instant callAcquireInstant) { this.threadLockInfo = threadLockInfo; - this.acquireInstant = Instant.now(); + this.lockPath = lockPath; + this.callAcquireInstant = callAcquireInstant; this.timeout = timeout; } public String getThreadName() { return threadLockInfo.getThreadName(); } - public String getLockPath() { return threadLockInfo.getLockPath(); } - public Instant getTimeAcquiredWasInvoked() { return acquireInstant; } + public String getLockPath() { return lockPath; } + public Instant getTimeAcquiredWasInvoked() { return callAcquireInstant; } public Duration getAcquireTimeout() { return timeout; } public LockState getLockState() { return lockState; } public Optional<Instant> getTimeLockWasAcquired() { return lockAcquiredInstant; } @@ -55,7 +57,7 
@@ public class LockInfo { public Optional<String> getStackTrace() { return stackTrace; } public Duration getDurationOfAcquire() { - return Duration.between(acquireInstant, lockAcquiredInstant.orElseGet(Instant::now)); + return Duration.between(callAcquireInstant, lockAcquiredInstant.orElseGet(Instant::now)); } public Duration getDurationWithLock() { @@ -64,11 +66,11 @@ public class LockInfo { .orElse(Duration.ZERO); } - public Duration getDuration() { return Duration.between(acquireInstant, terminalStateInstant.orElseGet(Instant::now)); } + public Duration getDuration() { return Duration.between(callAcquireInstant, terminalStateInstant.orElseGet(Instant::now)); } /** Get time from just before trying to acquire lock to the time the terminal state was reached, or ZERO. */ - public Duration getDurationInTerminalStateAndForPriorityQueue() { - return terminalStateInstant.map(instant -> Duration.between(acquireInstant, instant)).orElse(Duration.ZERO); + public Duration getStableTotalDuration() { + return terminalStateInstant.map(instant -> Duration.between(callAcquireInstant, instant)).orElse(Duration.ZERO); } /** Fill in the stack trace starting at the caller's stack frame. */ @@ -88,8 +90,10 @@ public class LockInfo { lockAcquiredInstant = Optional.of(Instant.now()); } - void setTerminalState(LockState terminalState) { + void setTerminalState(LockState terminalState) { setTerminalState(terminalState, Instant.now()); } + + void setTerminalState(LockState terminalState, Instant instant) { lockState = terminalState; - terminalStateInstant = Optional.of(Instant.now()); + terminalStateInstant = Optional.of(instant); } } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfoSamples.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfoSamples.java new file mode 100644 index 00000000000..ae84c5c984a --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockInfoSamples.java @@ -0,0 +1,106 @@ +// Copyright Verizon Media. 
Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator.stats; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + +/** + * Collection containing "interesting" {@code LockInfo}s. + * + * @author hakon + */ +// @ThreadSafe +public class LockInfoSamples { + private final int maxSamples; + + /** Ensure atomic operations on this collection. */ + private final Object monitor = new Object(); + + /** Keep at most one sample for each lock path. */ + private final Map<String, LockInfo> byLockPath; + + /** + * Priority queue containing all samples. The head of this queue (peek()/poll()) + * returns the LockInfo with the smallest duration. + */ + private final PriorityQueue<LockInfo> priorityQueue = + new PriorityQueue<>(Comparator.comparing(LockInfo::getStableTotalDuration)); + + LockInfoSamples() { this(10); } + + LockInfoSamples(int maxSamples) { + this.maxSamples = maxSamples; + this.byLockPath = new HashMap<>(maxSamples); + } + + int size() { return byLockPath.size(); } + + boolean maybeSample(LockInfo lockInfo) { + final boolean added; + synchronized (monitor) { + if (shouldAdd(lockInfo)) { + byLockPath.put(lockInfo.getLockPath(), lockInfo); + priorityQueue.add(lockInfo); + added = true; + } else { + added = false; + } + } + + if (added) { + // Unnecessary to invoke under synchronized, although it means that some samples + // may be without stack trace (just retry if that happens). 
+ lockInfo.fillStackTrace(); + } + + return added; + } + + private boolean shouldAdd(LockInfo lockInfo) { + LockInfo existingLockInfo = byLockPath.get(lockInfo.getLockPath()); + if (existingLockInfo != null) { + if (hasLongerDurationThan(lockInfo, existingLockInfo)) { + byLockPath.remove(existingLockInfo.getLockPath()); + priorityQueue.remove(existingLockInfo); + return true; + } + + return false; + } + + if (size() < maxSamples) { + return true; + } + + // peek() and poll() retrieves the smallest element. + existingLockInfo = priorityQueue.peek(); // cannot be null + if (hasLongerDurationThan(lockInfo, existingLockInfo)) { + priorityQueue.poll(); + byLockPath.remove(existingLockInfo.getLockPath()); + return true; + } + + return false; + } + + List<LockInfo> asList() { + synchronized (monitor) { + return List.copyOf(byLockPath.values()); + } + } + + void clear() { + synchronized (monitor) { + byLockPath.clear(); + priorityQueue.clear(); + } + } + + private static boolean hasLongerDurationThan(LockInfo lockInfo, LockInfo otherLockInfo) { + // Use stable total duration to avoid messing up priority queue. 
+ return lockInfo.getStableTotalDuration().compareTo(otherLockInfo.getStableTotalDuration()) > 0; + } +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockInfo.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockInfo.java index b796cb9af43..5280a049ceb 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockInfo.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockInfo.java @@ -4,14 +4,11 @@ package com.yahoo.vespa.curator.stats; import com.yahoo.vespa.curator.Lock; import java.time.Duration; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; /** @@ -26,17 +23,11 @@ public class ThreadLockInfo { private static final ConcurrentHashMap<Thread, ThreadLockInfo> locks = new ConcurrentHashMap<>(); - private static final int MAX_COMPLETED_LOCK_INFOS_SIZE = 5; - /** Would have used a thread-safe priority queue. */ - private static final Object completedLockInfosMonitor = new Object(); - private static final PriorityQueue<LockInfo> completedLockInfos = - new PriorityQueue<>(Comparator.comparing(LockInfo::getDurationInTerminalStateAndForPriorityQueue)); + private static final LockInfoSamples completedLockInfoSamples = new LockInfoSamples(); private static final ConcurrentHashMap<String, LockCounters> countersByLockPath = new ConcurrentHashMap<>(); private final Thread thread; - private final String lockPath; - private final LockCounters lockCountersForPath; /** The locks are reentrant so there may be more than 1 lock for this thread. 
*/ private final ConcurrentLinkedQueue<LockInfo> lockInfos = new ConcurrentLinkedQueue<>(); @@ -45,36 +36,26 @@ public class ThreadLockInfo { public static List<ThreadLockInfo> getThreadLockInfos() { return List.copyOf(locks.values()); } - public static List<LockInfo> getSlowLockInfos() { - synchronized (completedLockInfosMonitor) { - return List.copyOf(completedLockInfos); - } + public static List<LockInfo> getLockInfoSamples() { + return completedLockInfoSamples.asList(); } /** Returns the per-thread singleton ThreadLockInfo. */ - public static ThreadLockInfo getCurrentThreadLockInfo(String lockPath) { - return locks.computeIfAbsent( - Thread.currentThread(), - currentThread -> { - LockCounters lockCounters = countersByLockPath.computeIfAbsent(lockPath, ignored -> new LockCounters()); - return new ThreadLockInfo(currentThread, lockPath, lockCounters); - }); + public static ThreadLockInfo getCurrentThreadLockInfo() { + return locks.computeIfAbsent(Thread.currentThread(), ThreadLockInfo::new); } static void clearStaticDataForTesting() { locks.clear(); - completedLockInfos.clear(); + completedLockInfoSamples.clear(); countersByLockPath.clear(); } - ThreadLockInfo(Thread currentThread, String lockPath, LockCounters lockCountersForPath) { + ThreadLockInfo(Thread currentThread) { this.thread = currentThread; - this.lockPath = lockPath; - this.lockCountersForPath = lockCountersForPath; } public String getThreadName() { return thread.getName(); } - public String getLockPath() { return lockPath; } public String getStackTrace() { var stackTrace = new StringBuilder(); @@ -98,64 +79,62 @@ public class ThreadLockInfo { public List<LockInfo> getLockInfos() { return List.copyOf(lockInfos); } /** Mutable method (see class doc) */ - public void invokingAcquire(Duration timeout) { - lockCountersForPath.invokeAcquireCount.incrementAndGet(); - lockCountersForPath.inCriticalRegionCount.incrementAndGet(); - lockInfos.add(LockInfo.invokingAcquire(this, timeout)); + public void 
invokingAcquire(String lockPath, Duration timeout) { + LockCounters lockCounters = getLockCounters(lockPath); + lockCounters.invokeAcquireCount.incrementAndGet(); + lockCounters.inCriticalRegionCount.incrementAndGet(); + lockInfos.add(LockInfo.invokingAcquire(this, lockPath, timeout)); } /** Mutable method (see class doc) */ - public void acquireFailed() { - removeLastLockInfo(lockCountersForPath.acquireFailedCount, LockInfo::acquireFailed); + public void acquireFailed(String lockPath) { + LockCounters lockCounters = getLockCounters(lockPath); + lockCounters.acquireFailedCount.incrementAndGet(); + removeLastLockInfo(lockCounters, LockInfo::acquireFailed); } /** Mutable method (see class doc) */ - public void acquireTimedOut() { + public void acquireTimedOut(String lockPath) { + LockCounters lockCounters = getLockCounters(lockPath); if (lockInfos.size() > 1) { - lockCountersForPath.timeoutOnReentrancyErrorCount.incrementAndGet(); + lockCounters.timeoutOnReentrancyErrorCount.incrementAndGet(); } - removeLastLockInfo(lockCountersForPath.acquireTimedOutCount, LockInfo::timedOut); + lockCounters.acquireTimedOutCount.incrementAndGet(); + removeLastLockInfo(lockCounters, LockInfo::timedOut); } /** Mutable method (see class doc) */ - public void lockAcquired() { - lockCountersForPath.lockAcquiredCount.incrementAndGet(); - + public void lockAcquired(String lockPath) { + getLockCounters(lockPath).lockAcquiredCount.incrementAndGet(); getLastLockInfo().ifPresent(LockInfo::lockAcquired); } /** Mutable method (see class doc) */ - public void lockReleased() { - removeLastLockInfo(lockCountersForPath.locksReleasedCount, LockInfo::released); + public void lockReleased(String lockPath) { + LockCounters lockCounters = getLockCounters(lockPath); + lockCounters.locksReleasedCount.incrementAndGet(); + removeLastLockInfo(lockCounters, LockInfo::released); + } + + private LockCounters getLockCounters(String lockPath) { + return countersByLockPath.computeIfAbsent(lockPath, __ -> new 
LockCounters()); } private Optional<LockInfo> getLastLockInfo() { return lockInfos.isEmpty() ? Optional.empty() : Optional.of(lockInfos.peek()); } - private void removeLastLockInfo(AtomicInteger metricToIncrement, Consumer<LockInfo> completeLockInfo) { - metricToIncrement.incrementAndGet(); - lockCountersForPath.inCriticalRegionCount.decrementAndGet(); + private void removeLastLockInfo(LockCounters lockCounters, Consumer<LockInfo> completeLockInfo) { + lockCounters.inCriticalRegionCount.decrementAndGet(); if (lockInfos.isEmpty()) { - lockCountersForPath.noLocksErrorCount.incrementAndGet(); + lockCounters.noLocksErrorCount.incrementAndGet(); return; } LockInfo lockInfo = lockInfos.poll(); completeLockInfo.accept(lockInfo); - - synchronized (completedLockInfosMonitor) { - if (completedLockInfos.size() < MAX_COMPLETED_LOCK_INFOS_SIZE) { - lockInfo.fillStackTrace(); - completedLockInfos.add(lockInfo); - } else if (lockInfo.getDurationInTerminalStateAndForPriorityQueue() - .compareTo(completedLockInfos.peek().getDurationInTerminalStateAndForPriorityQueue()) > 0) { - completedLockInfos.poll(); - lockInfo.fillStackTrace(); - completedLockInfos.add(lockInfo); - } - } + completedLockInfoSamples.maybeSample(lockInfo); } } diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockInfoSamplesTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockInfoSamplesTest.java new file mode 100644 index 00000000000..4a14b0cc1b2 --- /dev/null +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockInfoSamplesTest.java @@ -0,0 +1,58 @@ +package com.yahoo.vespa.curator.stats;// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +import org.junit.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class LockInfoSamplesTest { + private final LockInfoSamples samples = new LockInfoSamples(2); + private ThreadLockInfo threadLockInfo; + + @Test + public void test() { + threadLockInfo = new ThreadLockInfo(Thread.currentThread()); + + assertTrue(maybeSample("1", 10)); + + // new sample has longer duration + assertTrue(maybeSample("1", 11)); + + // new sample has shorter duration + assertFalse(maybeSample("1", 10)); + + // new path, will be added + assertTrue(maybeSample("2", 5)); + + // new path, too low duration be added + assertFalse(maybeSample("3", 4)); + + // new path, expels "2" + assertTrue(maybeSample("4", 6)); + + Map<String, LockInfo> lockInfos = samples.asList().stream().collect(Collectors.toMap( + lockInfo -> lockInfo.getLockPath(), + lockInfo -> lockInfo)); + assertEquals(2, lockInfos.size()); + + assertTrue(lockInfos.containsKey("1")); + assertEquals(Duration.ofSeconds(11), lockInfos.get("1").getStableTotalDuration()); + + assertTrue(lockInfos.containsKey("4")); + assertEquals(Duration.ofSeconds(6), lockInfos.get("4").getStableTotalDuration()); + } + + private boolean maybeSample(String lockPath, int secondsDuration) { + LockInfo lockInfo = LockInfo.invokingAcquire(threadLockInfo, lockPath, Duration.ofSeconds(1)); + Instant instant = lockInfo.getTimeAcquiredWasInvoked().plus(Duration.ofSeconds(secondsDuration)); + lockInfo.setTerminalState(LockInfo.LockState.RELEASED, instant); + return samples.maybeSample(lockInfo); + } + +}
\ No newline at end of file diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java index 23b603eca5c..4b9b6a4429b 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java @@ -47,7 +47,7 @@ public class LockTest { expectedCounters.acquireFailedCount.set(1); assertEquals(Map.of(lockPath, expectedCounters), ThreadLockInfo.getLockCountersByPath()); - List<LockInfo> slowLockInfos = ThreadLockInfo.getSlowLockInfos(); + List<LockInfo> slowLockInfos = ThreadLockInfo.getLockInfoSamples(); assertEquals(1, slowLockInfos.size()); LockInfo slowLockInfo = slowLockInfos.get(0); assertEquals(acquireTimeout, slowLockInfo.getAcquireTimeout()); |