summary refs log tree commit diff stats
path: root/zkfacade/src
diff options
context:
space:
mode:
Diffstat (limited to 'zkfacade/src')
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java 14
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java 44
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java 83
-rw-r--r-- zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java 75
4 files changed, 199 insertions, 17 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java
index 9cf490bf8c6..a7a75a283f1 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java
@@ -32,12 +32,6 @@ public class LockAttempt {
private volatile Optional<Instant> terminalStateInstant = Optional.empty();
private volatile Optional<String> stackTrace = Optional.empty();
- public static LockAttempt invokingAcquire(ThreadLockStats threadLockStats, String lockPath,
- Duration timeout, LockMetrics lockMetrics,
- boolean reentry) {
- return new LockAttempt(threadLockStats, lockPath, timeout, Instant.now(), lockMetrics, reentry);
- }
-
public enum LockState {
ACQUIRING(false), ACQUIRE_FAILED(true), TIMED_OUT(true), ACQUIRED(false), RELEASED(true),
RELEASED_WITH_ERROR(true);
@@ -51,6 +45,12 @@ public class LockAttempt {
private volatile LockState lockState = LockState.ACQUIRING;
+ public static LockAttempt invokingAcquire(ThreadLockStats threadLockStats, String lockPath,
+ Duration timeout, LockMetrics lockMetrics,
+ boolean reentry) {
+ return new LockAttempt(threadLockStats, lockPath, timeout, Instant.now(), lockMetrics, reentry); // Instant.now() is recorded as the time acquire was invoked
+ }
+
private LockAttempt(ThreadLockStats threadLockStats, String lockPath, Duration timeout,
Instant callAcquireInstant, LockMetrics lockMetrics, boolean reentry) {
this.threadLockStats = threadLockStats;
@@ -66,8 +66,10 @@ public class LockAttempt {
public String getLockPath() { return lockPath; }
public Instant getTimeAcquiredWasInvoked() { return callAcquireInstant; }
public Duration getAcquireTimeout() { return timeout; }
+ public boolean getReentry() { return reentry; }
public LockState getLockState() { return lockState; }
public Optional<Instant> getTimeLockWasAcquired() { return lockAcquiredInstant; }
+ public boolean isAcquiring() { return lockAcquiredInstant.isEmpty(); } // true as long as no lock-acquired instant has been recorded
public Instant getTimeAcquireEndedOrNow() {
return lockAcquiredInstant.orElseGet(() -> getTimeTerminalStateWasReached().orElseGet(Instant::now));
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java
index ee464e0918d..2ed6655d68f 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.curator.stats;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -18,6 +19,9 @@ public class LockStats {
private final ConcurrentHashMap<Thread, ThreadLockStats> statsByThread = new ConcurrentHashMap<>();
+ /** Modified only by Thread actually holding the lock on the path (key). */
+ private final ConcurrentHashMap<String, Thread> lockPathsHeld = new ConcurrentHashMap<>();
+
private final LockAttemptSamples completedLockAttemptSamples = new LockAttemptSamples(3);
// Keep recordings in a priority queue, with the smallest element having the smallest duration.
@@ -33,9 +37,7 @@ public class LockStats {
public static LockStats getGlobal() { return stats; }
/** Returns stats tied to the current thread. */
- public static ThreadLockStats getForCurrentThread() {
- return stats.statsByThread.computeIfAbsent(Thread.currentThread(), ThreadLockStats::new);
- }
+ public static ThreadLockStats getForCurrentThread() { return stats.getForThread(Thread.currentThread()); }
public static void clearForTesting() {
stats = new LockStats();
@@ -53,6 +55,42 @@ public class LockStats {
}
}
+ /** Returns the stats tied to the given thread, creating them on first access. Non-private for testing. */
+ ThreadLockStats getForThread(Thread thread) {
+ return statsByThread.computeIfAbsent(thread, ThreadLockStats::new);
+ }
+
+ /**
+ * Registers currentThread as the holder of the lock on lockPath.
+ * Must be invoked only after the first and non-reentry acquisition of the lock.
+ *
+ * @throws IllegalStateException if a holder was already registered for lockPath; note that the
+ * registration has been overwritten with currentThread by the time the exception is thrown
+ */
+ void notifyOfThreadHoldingLock(Thread currentThread, String lockPath) {
+ Thread oldThread = lockPathsHeld.put(lockPath, currentThread);
+ if (oldThread != null) {
+ throw new IllegalStateException("Thread " + currentThread.getName() +
+ " reports it has the lock on " + lockPath + ", but thread " + oldThread.getName() +
+ " has not reported it released the lock");
+ }
+ }
+
+ /**
+ * Unregisters the holder of the lock on lockPath.
+ * Must be invoked only before the last and non-reentry release of the lock.
+ *
+ * @throws IllegalStateException if no holder was registered for lockPath, or if the registered
+ * holder is not currentThread; the registration is removed in either case
+ */
+ void notifyOfThreadReleasingLock(Thread currentThread, String lockPath) {
+ Thread oldThread = lockPathsHeld.remove(lockPath);
+ if (oldThread == null) {
+ throw new IllegalStateException("Thread " + currentThread.getName() +
+ " is releasing the lock " + lockPath + ", but nobody own that lock");
+ } else if (oldThread != currentThread) {
+ throw new IllegalStateException("Thread " + currentThread.getName() +
+ " is releasing the lock " + lockPath + ", but it was owned by thread "
+ + oldThread.getName());
+ }
+ }
+
+ /**
+ * Returns the ThreadLockStats of the thread registered as holding the lock on the path, if any.
+ * The info may be outdated already on return: by then no-one may hold the lock, or another
+ * thread may hold it.
+ */
+ Optional<ThreadLockStats> getThreadLockStatsHolding(String lockPath) {
+ return Optional.ofNullable(lockPathsHeld.get(lockPath)).map(statsByThread::get);
+ }
+
LockMetrics getLockMetrics(String lockPath) {
return metricsByLockPath.computeIfAbsent(lockPath, __ -> new LockMetrics());
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java
index 393fac5e3db..f38811c4b3c 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.curator.stats;
+import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.yahoo.vespa.curator.Lock;
import java.time.Duration;
@@ -59,13 +60,19 @@ public class ThreadLockStats {
}
public List<LockAttempt> getOngoingLockAttempts() { return List.copyOf(lockAttemptsStack); }
- public Optional<LockAttempt> getTopMostOngoingLockAttempt() { return lockAttemptsStack.stream().findFirst(); }
+ public Optional<LockAttempt> getTopMostOngoingLockAttempt() { return Optional.ofNullable(lockAttemptsStack.peekFirst()); }
+ /** The most recent and deeply nested ongoing lock attempt. */
+ public Optional<LockAttempt> getBottomMostOngoingLockAttempt() { return Optional.ofNullable(lockAttemptsStack.peekLast()); }
public Optional<RecordedLockAttempts> getOngoingRecording() { return ongoingRecording; }
/** Mutable method (see class doc) */
public void invokingAcquire(String lockPath, Duration timeout) {
boolean reentry = lockAttemptsStack.stream().anyMatch(lockAttempt -> lockAttempt.getLockPath().equals(lockPath));
+ if (!reentry) {
+ throwOnDeadlock(lockPath);
+ }
+
LockAttempt lockAttempt = LockAttempt.invokingAcquire(this, lockPath, timeout,
getGlobalLockMetrics(lockPath), reentry);
@@ -90,12 +97,30 @@ public class ThreadLockStats {
/** Mutable method (see class doc) */
public void lockAcquired() {
- withLastLockAttempt(LockAttempt::lockAcquired);
+ withLastLockAttempt(lockAttempt -> {
+ // Note on the order of lockAcquired() vs notifyOfThreadHoldingLock(): When the latter is
+ // invoked, other threads may query e.g. isAcquired() on the lockAttempt, which would
+ // return false in a small window if these two statements were reversed. Not a biggie,
+ // but seems better to ensure LockAttempt is updated first.
+ lockAttempt.lockAcquired();
+
+ if (!lockAttempt.getReentry()) { // only the first, non-reentry acquisition is reported to the global LockStats
+ LockStats.getGlobal().notifyOfThreadHoldingLock(thread, lockAttempt.getLockPath());
+ }
+ });
}
/** Mutable method (see class doc) */
public void preRelease() {
- withLastLockAttempt(LockAttempt::preRelease);
+ withLastLockAttempt(lockAttempt -> {
+ // Note on the order of these two statements: Same concerns apply here as in lockAcquired(), so LockStats is notified of the release before the LockAttempt is updated.
+
+ if (!lockAttempt.getReentry()) {
+ LockStats.getGlobal().notifyOfThreadReleasingLock(thread, lockAttempt.getLockPath());
+ }
+
+ lockAttempt.preRelease();
+ });
}
/** Mutable method (see class doc) */
@@ -125,10 +150,62 @@ public class ThreadLockStats {
}
}
+    /**
+     * Throws a timeout exception if acquiring the path would cause a deadlock.
+     *
+     * <p>Thread T0 will deadlock if it tries to acquire a lock on a path L1 held by T1,
+     * and T1 is waiting on L2 held by T2, and so forth, and TN is waiting on L0 held by T0.</p>
+     *
+     * <p>This method is a best-effort attempt at detecting deadlocks: A deadlock may in fact be
+     * resolved even though this method throws, if e.g. locks are released just after this
+     * method has inspected them.</p>
+     *
+     * <p>Each lock path is inspected at most once. If the chain of holders loops back to a path
+     * that was already visited without reaching this thread, the cycle does not involve this
+     * thread, and the method returns instead of spinning until that cycle is broken.</p>
+     *
+     * @param pathToAcquire the lock path this thread is about to acquire
+     * @throws com.google.common.util.concurrent.UncheckedTimeoutException if the chain of lock
+     *         holders leads back to this thread through a path other than pathToAcquire
+     */
+    private void throwOnDeadlock(String pathToAcquire) {
+        LockStats globalLockStats = LockStats.getGlobal();
+        var errorMessage = new StringBuilder().append("Deadlock detected: Thread ").append(thread.getName());
+
+        // Fully qualified so that this method does not depend on imports outside of it.
+        var visitedLockPaths = new java.util.HashSet<String>();
+
+        String lockPath = pathToAcquire;
+        // add() returns false for a path seen before, which ends the walk on a foreign cycle.
+        while (visitedLockPaths.add(lockPath)) {
+            Optional<ThreadLockStats> holder = globalLockStats.getThreadLockStatsHolding(lockPath);
+            if (holder.isEmpty()) {
+                // No thread is registered as holding the lock: the chain ends here.
+                return;
+            }
+            ThreadLockStats holderStats = holder.get();
+
+            errorMessage.append(", trying to acquire lock ")
+                        .append(lockPath)
+                        .append(" held by thread ")
+                        .append(holderStats.thread.getName());
+
+            if (holderStats.thread == thread) {
+                if (lockPath.equals(pathToAcquire)) {
+                    // reentry, ignore
+                    return;
+                }
+                throw new UncheckedTimeoutException(errorMessage.toString());
+            }
+
+            Optional<String> nextLockPath = holderStats.acquiringLockPath();
+            if (nextLockPath.isEmpty()) {
+                // The holder's most recent attempt is not waiting for a lock: the chain ends here.
+                return;
+            }
+
+            lockPath = nextLockPath.get();
+        }
+    }
+
+
private LockMetrics getGlobalLockMetrics(String lockPath) {
return LockStats.getGlobal().getLockMetrics(lockPath);
}
+ private Optional<String> acquiringLockPath() { // path of the most recent lock attempt, provided that attempt has not acquired its lock; throwOnDeadlock() calls this on the stats of other threads
+ return Optional.ofNullable(lockAttemptsStack.peekLast())
+ .filter(LockAttempt::isAcquiring)
+ .map(LockAttempt::getLockPath);
+ }
+
private void withLastLockAttempt(Consumer<LockAttempt> lockAttemptConsumer) {
LockAttempt lockAttempt = lockAttemptsStack.peekLast();
if (lockAttempt == null) {
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java
index f440c2cfad8..c28691fe655 100644
--- a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.curator.stats;
+import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.yahoo.vespa.curator.Lock;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.junit.Before;
@@ -9,6 +10,9 @@ import org.junit.Test;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -27,7 +31,7 @@ public class LockTest {
private final InterProcessLock mutex = mock(InterProcessLock.class);
private final String lockPath = "/lock/path";
private final String lock2Path = "/lock2/path";
- private final Duration acquireTimeout = Duration.ofSeconds(10);
+ private static final Duration acquireTimeout = Duration.ofMinutes(10);
private final Lock lock = new Lock(lockPath, mutex);
private final Lock lock2 = new Lock(lock2Path, mutex);
@@ -176,9 +180,6 @@ public class LockTest {
public void nestedLocks() throws Exception {
when(mutex.acquire(anyLong(), any())).thenReturn(true);
- String lockPath2 = "/lock/path/2";
- Lock lock2 = new Lock(lockPath2, mutex);
-
lock.acquire(acquireTimeout);
lock2.acquire(acquireTimeout);
@@ -188,10 +189,74 @@ public class LockTest {
assertEquals(2, lockAttempts.size());
assertEquals(lockPath, lockAttempts.get(0).getLockPath());
assertEquals(LockAttempt.LockState.ACQUIRED, lockAttempts.get(0).getLockState());
- assertEquals(lockPath2, lockAttempts.get(1).getLockPath());
+ assertEquals(lock2Path, lockAttempts.get(1).getLockPath());
assertEquals(LockAttempt.LockState.ACQUIRED, lockAttempts.get(1).getLockState());
lock.close();
lock.close();
}
+
+    /**
+     * Verifies that acquiring lock1 on the test thread is rejected as a deadlock when the test
+     * thread holds lock2 while another thread holds lock1 and is acquiring lock2.
+     */
+    @Test
+    public void deadlock() throws Exception {
+        var lockPath1 = "/lock/path/1";
+        var lockPath2 = "/lock/path/2";
+
+        var lock1 = new Lock(lockPath1, new InterProcessMutexMock());
+        var lock2 = new Lock(lockPath2, new InterProcessMutexMock());
+
+        lock2.acquire(acquireTimeout);
+
+        Thread thread = Executors.defaultThreadFactory().newThread(() -> threadMain(lock1, lock2));
+        thread.setName("LockTest-async-thread");
+        thread.start();
+
+        try {
+            // Wait until the most recent lock attempt of the async thread is on lock2.
+            ThreadLockStats asyncThreadStats = LockStats.getGlobal().getForThread(thread);
+            while (asyncThreadStats.getBottomMostOngoingLockAttempt()
+                                   .map(LockAttempt::getLockPath)
+                                   .filter(lockPath2::equals)
+                                   .isEmpty()) {
+                // An interrupt aborts the test instead of being swallowed by this unbounded loop.
+                Thread.sleep(1);
+            }
+
+            try {
+                lock1.acquire(acquireTimeout);
+                fail();
+            } catch (UncheckedTimeoutException e) {
+                assertEquals(
+                        "Unexpected timeout exception message: " + e.getMessage(),
+                        "Deadlock detected: Thread main, " +
+                        "trying to acquire lock /lock/path/1 held by thread LockTest-async-thread, " +
+                        "trying to acquire lock /lock/path/2 held by thread main",
+                        e.getMessage());
+            }
+        } finally {
+            // Unlock, which unblocks thread. Done in finally so a failure above cannot leave
+            // the async thread blocked on lock2.
+            lock2.close();
+            thread.join();
+        }
+    }
+
+ private static void threadMain(Lock lock1, Lock lock2) {
+ lock1.acquire(acquireTimeout);
+
+ // This will block until the test thread closes lock2
+ lock2.acquire(acquireTimeout);
+
+ lock2.close();
+
+ lock1.close();
+ }
+
+ private static class InterProcessMutexMock implements InterProcessLock { // in-JVM InterProcessLock backed by a ReentrantLock
+ private final ReentrantLock lock = new ReentrantLock();
+ @Override public void acquire() throws Exception { lock.lock(); }
+ @Override public boolean acquire(long time, TimeUnit unit) throws Exception { acquire(); return true; } // the timeout is ignored: blocks until the lock is taken
+ @Override public void release() throws Exception { lock.unlock(); }
+ @Override public boolean isAcquiredInThisProcess() { return lock.isLocked(); } // true when any thread holds the lock
+ }
}