diff options
Diffstat (limited to 'zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java')
-rw-r--r-- | zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java | 43 |
1 files changed, 41 insertions, 2 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java index 6f914a8e9a3..05294a5435b 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java @@ -9,6 +9,8 @@ import com.yahoo.vespa.curator.stats.ThreadLockStats; import org.apache.curator.framework.recipes.locks.InterProcessLock; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -21,6 +23,10 @@ import java.util.concurrent.TimeUnit; */ public class Lock implements Mutex { + private final Object monitor = new Object(); + private long nextSequenceNumber = 0; + private final Map<Long, Long> reentriesByThreadId = new HashMap<>(); + private final InterProcessLock mutex; private final String lockPath; @@ -52,14 +58,43 @@ public class Lock implements Mutex { throw new UncheckedTimeoutException("Timed out after waiting " + timeout + " to acquire lock '" + lockPath + "'"); } - threadLockStats.lockAcquired(); + + invoke(+1L, threadLockStats::lockAcquired); + } + + @FunctionalInterface + private interface TriConsumer { + void accept(String lockId, long reentryCountDiff, Map<Long, Long> reentriesByThreadId); + } + + // TODO(hakon): Remove once debugging is unnecessary + private void invoke(long reentryCountDiff, TriConsumer consumer) { + long threadId = Thread.currentThread().getId(); + final long sequenceNumber; + final Map<Long, Long> reentriesByThreadIdCopy; + synchronized (monitor) { + sequenceNumber = nextSequenceNumber++; + reentriesByThreadId.merge(threadId, reentryCountDiff, (oldValue, argumentValue) -> { + long sum = oldValue + argumentValue /* == reentryCountDiff */; + if (sum == 0) { + // Remove from map + return null; + } else { + return sum; + } + }); + reentriesByThreadIdCopy = Map.copyOf(reentriesByThreadId); + } + + String lockId = Integer.toHexString(System.identityHashCode(this)); + consumer.accept(lockId, sequenceNumber, reentriesByThreadIdCopy); } @Override public void close() { ThreadLockStats threadLockStats = LockStats.getForCurrentThread(); // Update metrics now before release() to avoid double-counting time in locked state. - threadLockStats.preRelease(); + invoke(-1L, threadLockStats::preRelease); try { mutex.release(); threadLockStats.postRelease(); @@ -72,6 +107,10 @@ public class Lock implements Mutex { protected String lockPath() { return lockPath; } + @Override + public String toString() { + return "Lock{" + lockPath + "}"; + } } |