diff options
Diffstat (limited to 'zkfacade/src')
4 files changed, 199 insertions, 17 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java index 9cf490bf8c6..a7a75a283f1 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java @@ -32,12 +32,6 @@ public class LockAttempt { private volatile Optional<Instant> terminalStateInstant = Optional.empty(); private volatile Optional<String> stackTrace = Optional.empty(); - public static LockAttempt invokingAcquire(ThreadLockStats threadLockStats, String lockPath, - Duration timeout, LockMetrics lockMetrics, - boolean reentry) { - return new LockAttempt(threadLockStats, lockPath, timeout, Instant.now(), lockMetrics, reentry); - } - public enum LockState { ACQUIRING(false), ACQUIRE_FAILED(true), TIMED_OUT(true), ACQUIRED(false), RELEASED(true), RELEASED_WITH_ERROR(true); @@ -51,6 +45,12 @@ public class LockAttempt { private volatile LockState lockState = LockState.ACQUIRING; + public static LockAttempt invokingAcquire(ThreadLockStats threadLockStats, String lockPath, + Duration timeout, LockMetrics lockMetrics, + boolean reentry) { + return new LockAttempt(threadLockStats, lockPath, timeout, Instant.now(), lockMetrics, reentry); + } + private LockAttempt(ThreadLockStats threadLockStats, String lockPath, Duration timeout, Instant callAcquireInstant, LockMetrics lockMetrics, boolean reentry) { this.threadLockStats = threadLockStats; @@ -66,8 +66,10 @@ public class LockAttempt { public String getLockPath() { return lockPath; } public Instant getTimeAcquiredWasInvoked() { return callAcquireInstant; } public Duration getAcquireTimeout() { return timeout; } + public boolean getReentry() { return reentry; } public LockState getLockState() { return lockState; } public Optional<Instant> getTimeLockWasAcquired() { return lockAcquiredInstant; } + public boolean isAcquiring() { return lockAcquiredInstant.isEmpty(); } public Instant getTimeAcquireEndedOrNow() { return lockAcquiredInstant.orElseGet(() -> getTimeTerminalStateWasReached().orElseGet(Instant::now)); } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java index ee464e0918d..2ed6655d68f 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.curator.stats; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; @@ -18,6 +19,9 @@ public class LockStats { private final ConcurrentHashMap<Thread, ThreadLockStats> statsByThread = new ConcurrentHashMap<>(); + /** Modified only by Thread actually holding the lock on the path (key). */ + private final ConcurrentHashMap<String, Thread> lockPathsHeld = new ConcurrentHashMap<>(); + private final LockAttemptSamples completedLockAttemptSamples = new LockAttemptSamples(3); // Keep recordings in a priority queue, with the smallest element having the smallest duration. @@ -33,9 +37,7 @@ public class LockStats { public static LockStats getGlobal() { return stats; } /** Returns stats tied to the current thread. */ - public static ThreadLockStats getForCurrentThread() { - return stats.statsByThread.computeIfAbsent(Thread.currentThread(), ThreadLockStats::new); - } + public static ThreadLockStats getForCurrentThread() { return stats.getForThread(Thread.currentThread()); } public static void clearForTesting() { stats = new LockStats(); @@ -53,6 +55,42 @@ public class LockStats { } } + /** Non-private for testing. */ + ThreadLockStats getForThread(Thread thread) { + return statsByThread.computeIfAbsent(thread, ThreadLockStats::new); + } + + /** Must be invoked only after the first and non-reentry acquisition of the lock. */ + void notifyOfThreadHoldingLock(Thread currentThread, String lockPath) { + Thread oldThread = lockPathsHeld.put(lockPath, currentThread); + if (oldThread != null) { + throw new IllegalStateException("Thread " + currentThread.getName() + + " reports it has the lock on " + lockPath + ", but thread " + oldThread.getName() + + " has not reported it released the lock"); + } + } + + /** Must be invoked only before the last and non-reentry release of the lock. */ + void notifyOfThreadReleasingLock(Thread currentThread, String lockPath) { + Thread oldThread = lockPathsHeld.remove(lockPath); + if (oldThread == null) { + throw new IllegalStateException("Thread " + currentThread.getName() + + " is releasing the lock " + lockPath + ", but nobody own that lock"); + } else if (oldThread != currentThread) { + throw new IllegalStateException("Thread " + currentThread.getName() + + " is releasing the lock " + lockPath + ", but it was owned by thread " + + oldThread.getName()); + } + } + + /** + * Returns the ThreadLockStats holding the lock on the path, but the info may be outdated already + * on return, either no-one holds the lock or another thread may hold the lock. + */ + Optional<ThreadLockStats> getThreadLockStatsHolding(String lockPath) { + return Optional.ofNullable(lockPathsHeld.get(lockPath)).map(statsByThread::get); + } + LockMetrics getLockMetrics(String lockPath) { return metricsByLockPath.computeIfAbsent(lockPath, __ -> new LockMetrics()); } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java index 393fac5e3db..f38811c4b3c 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.curator.stats; +import com.google.common.util.concurrent.UncheckedTimeoutException; import com.yahoo.vespa.curator.Lock; import java.time.Duration; @@ -59,13 +60,19 @@ public class ThreadLockStats { } public List<LockAttempt> getOngoingLockAttempts() { return List.copyOf(lockAttemptsStack); } - public Optional<LockAttempt> getTopMostOngoingLockAttempt() { return lockAttemptsStack.stream().findFirst(); } + public Optional<LockAttempt> getTopMostOngoingLockAttempt() { return Optional.ofNullable(lockAttemptsStack.peekFirst()); } + /** The most recent and deeply nested ongoing lock attempt. */ + public Optional<LockAttempt> getBottomMostOngoingLockAttempt() { return Optional.ofNullable(lockAttemptsStack.peekLast()); } public Optional<RecordedLockAttempts> getOngoingRecording() { return ongoingRecording; } /** Mutable method (see class doc) */ public void invokingAcquire(String lockPath, Duration timeout) { boolean reentry = lockAttemptsStack.stream().anyMatch(lockAttempt -> lockAttempt.getLockPath().equals(lockPath)); + if (!reentry) { + throwOnDeadlock(lockPath); + } + LockAttempt lockAttempt = LockAttempt.invokingAcquire(this, lockPath, timeout, getGlobalLockMetrics(lockPath), reentry); @@ -90,12 +97,30 @@ public class ThreadLockStats { /** Mutable method (see class doc) */ public void lockAcquired() { - withLastLockAttempt(LockAttempt::lockAcquired); + withLastLockAttempt(lockAttempt -> { + // Note on the order of lockAcquired() vs notifyOfThreadHoldingLock(): When the latter is + // invoked, other threads may query e.g. isAcquired() on the lockAttempt, which would + // return false in a small window if these two statements were reversed. Not a biggie, + // but seems better to ensure LockAttempt is updated first. + lockAttempt.lockAcquired(); + + if (!lockAttempt.getReentry()) { + LockStats.getGlobal().notifyOfThreadHoldingLock(thread, lockAttempt.getLockPath()); + } + }); } /** Mutable method (see class doc) */ public void preRelease() { - withLastLockAttempt(LockAttempt::preRelease); + withLastLockAttempt(lockAttempt -> { + // Note on the order of these two statement: Same concerns apply here as in lockAcquired(). + + if (!lockAttempt.getReentry()) { + LockStats.getGlobal().notifyOfThreadReleasingLock(thread, lockAttempt.getLockPath()); + } + + lockAttempt.preRelease(); + }); } /** Mutable method (see class doc) */ @@ -125,10 +150,62 @@ public class ThreadLockStats { } } + /** + * Throws a timeout exception if acquiring the path would cause a deadlock. + * + * <p>Thread T0 will deadlock if it tries to acquire a lock on a path L1 held by T1, + * and T1 is waiting on L2 held by T2, and so forth, and TN is waiting on L0 held by T0.</p> + * + * <p>This method is a best-effort attempt at detecting deadlocks: A deadlock may in fact be + * resolved even though this method throws, if e.g. locks are released just after this + * method </p> + * + * @throws com.google.common.util.concurrent.UncheckedTimeoutException + */ + private void throwOnDeadlock(String pathToAcquire) { + LockStats globalLockStats = LockStats.getGlobal(); + var errorMessage = new StringBuilder().append("Deadlock detected: Thread ").append(thread.getName()); + + String lockPath = pathToAcquire; + while (true) { + Optional<ThreadLockStats> threadLockStats = globalLockStats.getThreadLockStatsHolding(lockPath); + if (threadLockStats.isEmpty()) { + return; + } + + errorMessage.append(", trying to acquire lock ") + .append(lockPath) + .append(" held by thread ") + .append(threadLockStats.get().thread.getName()); + + if (threadLockStats.get().thread == thread) { + if (lockPath.equals(pathToAcquire)) { + // reentry, ignore + return; + } else { + throw new UncheckedTimeoutException(errorMessage.toString()); + } + } + + Optional<String> nextLockPath = threadLockStats.get().acquiringLockPath(); + if (nextLockPath.isEmpty()) { + return; + } + + lockPath = nextLockPath.get(); + } + } + private LockMetrics getGlobalLockMetrics(String lockPath) { return LockStats.getGlobal().getLockMetrics(lockPath); } + private Optional<String> acquiringLockPath() { + return Optional.ofNullable(lockAttemptsStack.peekLast()) + .filter(LockAttempt::isAcquiring) + .map(LockAttempt::getLockPath); + } + private void withLastLockAttempt(Consumer<LockAttempt> lockAttemptConsumer) { LockAttempt lockAttempt = lockAttemptsStack.peekLast(); if (lockAttempt == null) { diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java index f440c2cfad8..c28691fe655 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.curator.stats; +import com.google.common.util.concurrent.UncheckedTimeoutException; import com.yahoo.vespa.curator.Lock; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.junit.Before; @@ -9,6 +10,9 @@ import org.junit.Test; import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -27,7 +31,7 @@ public class LockTest { private final InterProcessLock mutex = mock(InterProcessLock.class); private final String lockPath = "/lock/path"; private final String lock2Path = "/lock2/path"; - private final Duration acquireTimeout = Duration.ofSeconds(10); + private static final Duration acquireTimeout = Duration.ofMinutes(10); private final Lock lock = new Lock(lockPath, mutex); private final Lock lock2 = new Lock(lock2Path, mutex); @@ -176,9 +180,6 @@ public class LockTest { public void nestedLocks() throws Exception { when(mutex.acquire(anyLong(), any())).thenReturn(true); - String lockPath2 = "/lock/path/2"; - Lock lock2 = new Lock(lockPath2, mutex); - lock.acquire(acquireTimeout); lock2.acquire(acquireTimeout); @@ -188,10 +189,74 @@ public class LockTest { assertEquals(2, lockAttempts.size()); assertEquals(lockPath, lockAttempts.get(0).getLockPath()); assertEquals(LockAttempt.LockState.ACQUIRED, lockAttempts.get(0).getLockState()); - assertEquals(lockPath2, lockAttempts.get(1).getLockPath()); + assertEquals(lock2Path, lockAttempts.get(1).getLockPath()); assertEquals(LockAttempt.LockState.ACQUIRED, lockAttempts.get(1).getLockState()); lock.close(); lock.close(); } + + @Test + public void deadlock() throws Exception { + var lockPath1 = "/lock/path/1"; + var lockPath2 = "/lock/path/2"; + + var lock1 = new Lock(lockPath1, new InterProcessMutexMock()); + var lock2 = new Lock(lockPath2, new InterProcessMutexMock()); + + lock2.acquire(acquireTimeout); + + Thread thread = Executors.defaultThreadFactory().newThread(() -> threadMain(lock1, lock2)); + thread.setName("LockTest-async-thread"); + thread.start(); + + LockStats globalStats = LockStats.getGlobal(); + ThreadLockStats asyncThreadStats = globalStats.getForThread(thread); + while (true) { + Optional<LockAttempt> bottomMostOngoingLockAttempt = asyncThreadStats.getBottomMostOngoingLockAttempt(); + if (bottomMostOngoingLockAttempt.isPresent() && + bottomMostOngoingLockAttempt.get().getLockPath().equals(lockPath2)) { + break; + } + + try { + Thread.sleep(1); + } catch (InterruptedException e) { } + } + + try { + lock1.acquire(acquireTimeout); + fail(); + } catch (UncheckedTimeoutException e) { + assertEquals( + "Unexpected timeout exception message: " + e.getMessage(), + "Deadlock detected: Thread main, " + + "trying to acquire lock /lock/path/1 held by thread LockTest-async-thread, " + + "trying to acquire lock /lock/path/2 held by thread main", + e.getMessage()); + } + + // Unlock, which unblocks thread + lock2.close(); + thread.join(); + } + + private static void threadMain(Lock lock1, Lock lock2) { + lock1.acquire(acquireTimeout); + + // This will block + lock2.acquire(acquireTimeout); + + lock2.close(); + + lock1.close(); + } + + private static class InterProcessMutexMock implements InterProcessLock { + private final ReentrantLock lock = new ReentrantLock(); + @Override public void acquire() throws Exception { lock.lock(); } + @Override public boolean acquire(long time, TimeUnit unit) throws Exception { acquire(); return true; } + @Override public void release() throws Exception { lock.unlock(); } + @Override public boolean isAcquiredInThisProcess() { return lock.isLocked(); } + } } |