diff options
Diffstat (limited to 'zkfacade')
4 files changed, 64 insertions, 19 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java index 3f9c52594a3..3e9c586f43c 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java @@ -9,7 +9,11 @@ import com.yahoo.vespa.curator.stats.ThreadLockStats; import org.apache.curator.framework.recipes.locks.InterProcessLock; import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * A cluster-wide re-entrant mutex which is released on (the last symmetric) close. @@ -21,17 +25,26 @@ import java.util.concurrent.TimeUnit; */ public class Lock implements Mutex { + // TODO(hakon): Remove once debugging is done + private final Object monitor = new Object(); + private long nextSequenceNumber = 0; + private final Map<Long, Long> reentriesByThreadId = new HashMap<>(); + private final Instant created = Instant.now(); + private Curator curator; + private final InterProcessLock mutex; private final String lockPath; public Lock(String lockPath, Curator curator) { this(lockPath, curator.createMutex(lockPath)); + this.curator = curator; } /** Public for testing only */ public Lock(String lockPath, InterProcessLock mutex) { this.lockPath = lockPath; this.mutex = mutex; + this.curator = null; } /** Take the lock with the given timeout. This may be called multiple times from the same thread - each matched by a close */ @@ -52,26 +65,57 @@ public class Lock implements Mutex { throw new UncheckedTimeoutException("Timed out after waiting " + timeout + " to acquire lock '" + lockPath + "'"); } - threadLockStats.lockAcquired(); + + invoke(+1L, threadLockStats::lockAcquired); + } + + // TODO(hakon): Remove once debugging is unnecessary + private void invoke(long reentryCountDiff, Consumer<String> consumer) { + long threadId = Thread.currentThread().getId(); + final long sequenceNumber; + final Map<Long, Long> reentriesByThreadIdCopy; + synchronized (monitor) { + sequenceNumber = nextSequenceNumber++; + reentriesByThreadId.merge(threadId, reentryCountDiff, (oldValue, argumentValue) -> { + long sum = oldValue + argumentValue /* == reentryCountDiff */; + if (sum == 0) { + // Remove from map + return null; + } else { + return sum; + } + }); + reentriesByThreadIdCopy = Map.copyOf(reentriesByThreadId); + } + + String debug = "thread " + threadId + " Lock 0x" + Integer.toHexString(System.identityHashCode(this)) + + "@" + created + " Curator 0x" + Integer.toHexString(System.identityHashCode(curator)) + + " lock " + lockPath + " #" + sequenceNumber + + ", reentries by thread ID = " + reentriesByThreadIdCopy; + consumer.accept(debug); } @Override public void close() { ThreadLockStats threadLockStats = LockStats.getForCurrentThread(); // Update metrics now before release() to avoid double-counting time in locked state. - threadLockStats.preRelease(); + invoke(-1L, threadLockStats::preRelease); try { mutex.release(); threadLockStats.postRelease(); } catch (Exception e) { threadLockStats.releaseFailed(); - throw new RuntimeException("Exception releasing lock '" + lockPath + "'"); + throw new RuntimeException("Exception releasing lock '" + lockPath + "'", e); } } protected String lockPath() { return lockPath; } + @Override + public String toString() { + return "Lock{" + lockPath + "}"; + } } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java index 887e2cd2700..1bbd3c7c734 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java @@ -66,7 +66,7 @@ public class LockAttempt { public String getLockPath() { return lockPath; } public Instant getTimeAcquiredWasInvoked() { return callAcquireInstant; } public Duration getAcquireTimeout() { return timeout; } - public boolean getReentry() { return reentry; } + public boolean isReentry() { return reentry; } public LockState getLockState() { return lockState; } public Optional<Instant> getTimeLockWasAcquired() { return lockAcquiredInstant; } public boolean isAcquiring() { return lockAcquiredInstant.isEmpty(); } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java index e4f78a4f9e9..a37034b7547 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockStats.java @@ -64,28 +64,28 @@ public class LockStats { } /** Must be invoked only after the first and non-reentry acquisition of the lock. */ - void notifyOfThreadHoldingLock(Thread currentThread, String lockPath) { + void notifyOfThreadHoldingLock(Thread currentThread, String lockPath, String debug) { Thread oldThread = lockPathsHeld.put(lockPath, currentThread); if (oldThread != null) { getLockMetrics(lockPath).incrementAcquireWithoutReleaseCount(); - logger.warning("Thread " + currentThread.getName() + - " reports it has the lock on " + lockPath + ", but thread " + oldThread.getName() + - " has not reported it released the lock"); + logger.warning("Thread " + currentThread.getName() + " reports it has the lock on " + + lockPath + ", but thread " + oldThread.getName() + + " has not reported it released the lock. " + debug); } } /** Must be invoked only before the last and non-reentry release of the lock. */ - void notifyOfThreadReleasingLock(Thread currentThread, String lockPath) { + void notifyOfThreadReleasingLock(Thread currentThread, String lockPath, String debug) { Thread oldThread = lockPathsHeld.remove(lockPath); if (oldThread == null) { getLockMetrics(lockPath).incrementNakedReleaseCount(); - logger.warning("Thread " + currentThread.getName() + - " is releasing the lock " + lockPath + ", but nobody owns that lock"); + logger.warning("Thread " + currentThread.getName() + " is releasing the lock " + lockPath + + ", but nobody owns that lock. " + debug); } else if (oldThread != currentThread) { getLockMetrics(lockPath).incrementForeignReleaseCount(); logger.warning("Thread " + currentThread.getName() + - " is releasing the lock " + lockPath + ", but it was owned by thread " - + oldThread.getName()); + " is releasing the lock " + lockPath + ", but it was owned by thread " + + oldThread.getName() + ". " + debug); } } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java index d4511bd04fb..beda7eaa142 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java @@ -6,6 +6,7 @@ import com.yahoo.vespa.curator.Lock; import java.time.Duration; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentLinkedDeque; @@ -97,7 +98,7 @@ public class ThreadLockStats { } /** Mutable method (see class doc) */ - public void lockAcquired() { + public void lockAcquired(String debug) { withLastLockAttempt(lockAttempt -> { // Note on the order of lockAcquired() vs notifyOfThreadHoldingLock(): When the latter is // invoked, other threads may query e.g. isAcquired() on the lockAttempt, which would @@ -105,19 +106,19 @@ public class ThreadLockStats { // but seems better to ensure LockAttempt is updated first. lockAttempt.lockAcquired(); - if (!lockAttempt.getReentry()) { - LockStats.getGlobal().notifyOfThreadHoldingLock(thread, lockAttempt.getLockPath()); + if (!lockAttempt.isReentry()) { + LockStats.getGlobal().notifyOfThreadHoldingLock(thread, lockAttempt.getLockPath(), debug); } }); } /** Mutable method (see class doc) */ - public void preRelease() { + public void preRelease(String debug) { withLastLockAttempt(lockAttempt -> { // Note on the order of these two statement: Same concerns apply here as in lockAcquired(). - if (!lockAttempt.getReentry()) { - LockStats.getGlobal().notifyOfThreadReleasingLock(thread, lockAttempt.getLockPath()); + if (!lockAttempt.isReentry()) { + LockStats.getGlobal().notifyOfThreadReleasingLock(thread, lockAttempt.getLockPath(), debug); } lockAttempt.preRelease(); |