summaryrefslogtreecommitdiffstats
path: root/zkfacade
diff options
context:
space:
mode:
authorHåkon Hallingstad <hakon@verizonmedia.com>2020-10-11 15:49:56 +0200
committerHåkon Hallingstad <hakon@verizonmedia.com>2020-10-11 15:49:56 +0200
commitc2ae38b9706a85760c5d3abbe527eaca01294fcc (patch)
tree13d5631ee36b387600f6144370b7d87790fa9dd5 /zkfacade
parent982a1b1804b8773be2c5db13535fa0b0e33928b1 (diff)
Deadlock detection
Just before Lock.acquire() is invoked, the locks within the process are queried to see if a "deadlock" will occur: The current thread is waiting to acquire the lock at path P1, which is held by thread T1 waiting to acquire a lock at path P2, etc., until a thread is waiting for a lock held by the current thread. Even without this PR the deadlock would resolve itself automatically because all locks are acquired with timeouts. However, this PR 1. resolves the deadlock immediately, and 2. leaves a log trace (hopefully from the exception) to allow us to refactor code to avoid such deadlocks.
Diffstat (limited to 'zkfacade')
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java14
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java44
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java83
-rw-r--r--zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java75
4 files changed, 199 insertions, 17 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java
index 9cf490bf8c6..a7a75a283f1 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java
@@ -32,12 +32,6 @@ public class LockAttempt {
private volatile Optional<Instant> terminalStateInstant = Optional.empty();
private volatile Optional<String> stackTrace = Optional.empty();
- public static LockAttempt invokingAcquire(ThreadLockStats threadLockStats, String lockPath,
- Duration timeout, LockMetrics lockMetrics,
- boolean reentry) {
- return new LockAttempt(threadLockStats, lockPath, timeout, Instant.now(), lockMetrics, reentry);
- }
-
public enum LockState {
ACQUIRING(false), ACQUIRE_FAILED(true), TIMED_OUT(true), ACQUIRED(false), RELEASED(true),
RELEASED_WITH_ERROR(true);
@@ -51,6 +45,12 @@ public class LockAttempt {
private volatile LockState lockState = LockState.ACQUIRING;
+ public static LockAttempt invokingAcquire(ThreadLockStats threadLockStats, String lockPath,
+ Duration timeout, LockMetrics lockMetrics,
+ boolean reentry) {
+ return new LockAttempt(threadLockStats, lockPath, timeout, Instant.now(), lockMetrics, reentry);
+ }
+
private LockAttempt(ThreadLockStats threadLockStats, String lockPath, Duration timeout,
Instant callAcquireInstant, LockMetrics lockMetrics, boolean reentry) {
this.threadLockStats = threadLockStats;
@@ -66,8 +66,10 @@ public class LockAttempt {
public String getLockPath() { return lockPath; }
public Instant getTimeAcquiredWasInvoked() { return callAcquireInstant; }
public Duration getAcquireTimeout() { return timeout; }
+ public boolean getReentry() { return reentry; }
public LockState getLockState() { return lockState; }
public Optional<Instant> getTimeLockWasAcquired() { return lockAcquiredInstant; }
+ public boolean isAcquiring() { return lockAcquiredInstant.isEmpty(); }
public Instant getTimeAcquireEndedOrNow() {
return lockAcquiredInstant.orElseGet(() -> getTimeTerminalStateWasReached().orElseGet(Instant::now));
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java
index ee464e0918d..2ed6655d68f 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.curator.stats;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -18,6 +19,9 @@ public class LockStats {
private final ConcurrentHashMap<Thread, ThreadLockStats> statsByThread = new ConcurrentHashMap<>();
+ /** Modified only by Thread actually holding the lock on the path (key). */
+ private final ConcurrentHashMap<String, Thread> lockPathsHeld = new ConcurrentHashMap<>();
+
private final LockAttemptSamples completedLockAttemptSamples = new LockAttemptSamples(3);
// Keep recordings in a priority queue, with the smallest element having the smallest duration.
@@ -33,9 +37,7 @@ public class LockStats {
public static LockStats getGlobal() { return stats; }
/** Returns stats tied to the current thread. */
- public static ThreadLockStats getForCurrentThread() {
- return stats.statsByThread.computeIfAbsent(Thread.currentThread(), ThreadLockStats::new);
- }
+ public static ThreadLockStats getForCurrentThread() { return stats.getForThread(Thread.currentThread()); }
public static void clearForTesting() {
stats = new LockStats();
@@ -53,6 +55,42 @@ public class LockStats {
}
}
+ /** Non-private for testing. */
+ ThreadLockStats getForThread(Thread thread) {
+ return statsByThread.computeIfAbsent(thread, ThreadLockStats::new);
+ }
+
+ /** Must be invoked only after the first and non-reentry acquisition of the lock. */
+ void notifyOfThreadHoldingLock(Thread currentThread, String lockPath) {
+ Thread oldThread = lockPathsHeld.put(lockPath, currentThread);
+ if (oldThread != null) {
+ throw new IllegalStateException("Thread " + currentThread.getName() +
+ " reports it has the lock on " + lockPath + ", but thread " + oldThread.getName() +
+ " has not reported it released the lock");
+ }
+ }
+
+ /** Must be invoked only before the last and non-reentry release of the lock. */
+ void notifyOfThreadReleasingLock(Thread currentThread, String lockPath) {
+ Thread oldThread = lockPathsHeld.remove(lockPath);
+ if (oldThread == null) {
+ throw new IllegalStateException("Thread " + currentThread.getName() +
+ " is releasing the lock " + lockPath + ", but nobody own that lock");
+ } else if (oldThread != currentThread) {
+ throw new IllegalStateException("Thread " + currentThread.getName() +
+ " is releasing the lock " + lockPath + ", but it was owned by thread "
+ + oldThread.getName());
+ }
+ }
+
+ /**
+ * Returns the ThreadLockStats holding the lock on the path, but the info may be outdated already
+ * on return, either no-one holds the lock or another thread may hold the lock.
+ */
+ Optional<ThreadLockStats> getThreadLockStatsHolding(String lockPath) {
+ return Optional.ofNullable(lockPathsHeld.get(lockPath)).map(statsByThread::get);
+ }
+
LockMetrics getLockMetrics(String lockPath) {
return metricsByLockPath.computeIfAbsent(lockPath, __ -> new LockMetrics());
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java
index 393fac5e3db..f38811c4b3c 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.curator.stats;
+import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.yahoo.vespa.curator.Lock;
import java.time.Duration;
@@ -59,13 +60,19 @@ public class ThreadLockStats {
}
public List<LockAttempt> getOngoingLockAttempts() { return List.copyOf(lockAttemptsStack); }
- public Optional<LockAttempt> getTopMostOngoingLockAttempt() { return lockAttemptsStack.stream().findFirst(); }
+ public Optional<LockAttempt> getTopMostOngoingLockAttempt() { return Optional.ofNullable(lockAttemptsStack.peekFirst()); }
+ /** The most recent and deeply nested ongoing lock attempt. */
+ public Optional<LockAttempt> getBottomMostOngoingLockAttempt() { return Optional.ofNullable(lockAttemptsStack.peekLast()); }
public Optional<RecordedLockAttempts> getOngoingRecording() { return ongoingRecording; }
/** Mutable method (see class doc) */
public void invokingAcquire(String lockPath, Duration timeout) {
boolean reentry = lockAttemptsStack.stream().anyMatch(lockAttempt -> lockAttempt.getLockPath().equals(lockPath));
+ if (!reentry) {
+ throwOnDeadlock(lockPath);
+ }
+
LockAttempt lockAttempt = LockAttempt.invokingAcquire(this, lockPath, timeout,
getGlobalLockMetrics(lockPath), reentry);
@@ -90,12 +97,30 @@ public class ThreadLockStats {
/** Mutable method (see class doc) */
public void lockAcquired() {
- withLastLockAttempt(LockAttempt::lockAcquired);
+ withLastLockAttempt(lockAttempt -> {
+ // Note on the order of lockAcquired() vs notifyOfThreadHoldingLock(): When the latter is
+ // invoked, other threads may query e.g. isAcquired() on the lockAttempt, which would
+ // return false in a small window if these two statements were reversed. Not a biggie,
+ // but seems better to ensure LockAttempt is updated first.
+ lockAttempt.lockAcquired();
+
+ if (!lockAttempt.getReentry()) {
+ LockStats.getGlobal().notifyOfThreadHoldingLock(thread, lockAttempt.getLockPath());
+ }
+ });
}
/** Mutable method (see class doc) */
public void preRelease() {
- withLastLockAttempt(LockAttempt::preRelease);
+ withLastLockAttempt(lockAttempt -> {
+ // Note on the order of these two statement: Same concerns apply here as in lockAcquired().
+
+ if (!lockAttempt.getReentry()) {
+ LockStats.getGlobal().notifyOfThreadReleasingLock(thread, lockAttempt.getLockPath());
+ }
+
+ lockAttempt.preRelease();
+ });
}
/** Mutable method (see class doc) */
@@ -125,10 +150,62 @@ public class ThreadLockStats {
}
}
+ /**
+ * Throws a timeout exception if acquiring the path would cause a deadlock.
+ *
+ * <p>Thread T0 will deadlock if it tries to acquire a lock on a path L1 held by T1,
+ * and T1 is waiting on L2 held by T2, and so forth, and TN is waiting on L0 held by T0.</p>
+ *
+ * <p>This method is a best-effort attempt at detecting deadlocks: A deadlock may in fact be
+ * resolved even though this method throws, if e.g. locks are released just after this
+ * method </p>
+ *
+ * @throws com.google.common.util.concurrent.UncheckedTimeoutException
+ */
+ private void throwOnDeadlock(String pathToAcquire) {
+ LockStats globalLockStats = LockStats.getGlobal();
+ var errorMessage = new StringBuilder().append("Deadlock detected: Thread ").append(thread.getName());
+
+ String lockPath = pathToAcquire;
+ while (true) {
+ Optional<ThreadLockStats> threadLockStats = globalLockStats.getThreadLockStatsHolding(lockPath);
+ if (threadLockStats.isEmpty()) {
+ return;
+ }
+
+ errorMessage.append(", trying to acquire lock ")
+ .append(lockPath)
+ .append(" held by thread ")
+ .append(threadLockStats.get().thread.getName());
+
+ if (threadLockStats.get().thread == thread) {
+ if (lockPath.equals(pathToAcquire)) {
+ // reentry, ignore
+ return;
+ } else {
+ throw new UncheckedTimeoutException(errorMessage.toString());
+ }
+ }
+
+ Optional<String> nextLockPath = threadLockStats.get().acquiringLockPath();
+ if (nextLockPath.isEmpty()) {
+ return;
+ }
+
+ lockPath = nextLockPath.get();
+ }
+ }
+
private LockMetrics getGlobalLockMetrics(String lockPath) {
return LockStats.getGlobal().getLockMetrics(lockPath);
}
+ private Optional<String> acquiringLockPath() {
+ return Optional.ofNullable(lockAttemptsStack.peekLast())
+ .filter(LockAttempt::isAcquiring)
+ .map(LockAttempt::getLockPath);
+ }
+
private void withLastLockAttempt(Consumer<LockAttempt> lockAttemptConsumer) {
LockAttempt lockAttempt = lockAttemptsStack.peekLast();
if (lockAttempt == null) {
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java
index f440c2cfad8..c28691fe655 100644
--- a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.curator.stats;
+import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.yahoo.vespa.curator.Lock;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.junit.Before;
@@ -9,6 +10,9 @@ import org.junit.Test;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -27,7 +31,7 @@ public class LockTest {
private final InterProcessLock mutex = mock(InterProcessLock.class);
private final String lockPath = "/lock/path";
private final String lock2Path = "/lock2/path";
- private final Duration acquireTimeout = Duration.ofSeconds(10);
+ private static final Duration acquireTimeout = Duration.ofMinutes(10);
private final Lock lock = new Lock(lockPath, mutex);
private final Lock lock2 = new Lock(lock2Path, mutex);
@@ -176,9 +180,6 @@ public class LockTest {
public void nestedLocks() throws Exception {
when(mutex.acquire(anyLong(), any())).thenReturn(true);
- String lockPath2 = "/lock/path/2";
- Lock lock2 = new Lock(lockPath2, mutex);
-
lock.acquire(acquireTimeout);
lock2.acquire(acquireTimeout);
@@ -188,10 +189,74 @@ public class LockTest {
assertEquals(2, lockAttempts.size());
assertEquals(lockPath, lockAttempts.get(0).getLockPath());
assertEquals(LockAttempt.LockState.ACQUIRED, lockAttempts.get(0).getLockState());
- assertEquals(lockPath2, lockAttempts.get(1).getLockPath());
+ assertEquals(lock2Path, lockAttempts.get(1).getLockPath());
assertEquals(LockAttempt.LockState.ACQUIRED, lockAttempts.get(1).getLockState());
lock.close();
lock.close();
}
+
+ @Test
+ public void deadlock() throws Exception {
+ var lockPath1 = "/lock/path/1";
+ var lockPath2 = "/lock/path/2";
+
+ var lock1 = new Lock(lockPath1, new InterProcessMutexMock());
+ var lock2 = new Lock(lockPath2, new InterProcessMutexMock());
+
+ lock2.acquire(acquireTimeout);
+
+ Thread thread = Executors.defaultThreadFactory().newThread(() -> threadMain(lock1, lock2));
+ thread.setName("LockTest-async-thread");
+ thread.start();
+
+ LockStats globalStats = LockStats.getGlobal();
+ ThreadLockStats asyncThreadStats = globalStats.getForThread(thread);
+ while (true) {
+ Optional<LockAttempt> bottomMostOngoingLockAttempt = asyncThreadStats.getBottomMostOngoingLockAttempt();
+ if (bottomMostOngoingLockAttempt.isPresent() &&
+ bottomMostOngoingLockAttempt.get().getLockPath().equals(lockPath2)) {
+ break;
+ }
+
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) { }
+ }
+
+ try {
+ lock1.acquire(acquireTimeout);
+ fail();
+ } catch (UncheckedTimeoutException e) {
+ assertEquals(
+ "Unexpected timeout exception message: " + e.getMessage(),
+ "Deadlock detected: Thread main, " +
+ "trying to acquire lock /lock/path/1 held by thread LockTest-async-thread, " +
+ "trying to acquire lock /lock/path/2 held by thread main",
+ e.getMessage());
+ }
+
+ // Unlock, which unblocks thread
+ lock2.close();
+ thread.join();
+ }
+
+ private static void threadMain(Lock lock1, Lock lock2) {
+ lock1.acquire(acquireTimeout);
+
+ // This will block
+ lock2.acquire(acquireTimeout);
+
+ lock2.close();
+
+ lock1.close();
+ }
+
+ private static class InterProcessMutexMock implements InterProcessLock {
+ private final ReentrantLock lock = new ReentrantLock();
+ @Override public void acquire() throws Exception { lock.lock(); }
+ @Override public boolean acquire(long time, TimeUnit unit) throws Exception { acquire(); return true; }
+ @Override public void release() throws Exception { lock.unlock(); }
+ @Override public boolean isAcquiredInThisProcess() { return lock.isLocked(); }
+ }
}