diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2020-03-20 19:39:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-20 19:39:46 +0100 |
commit | 6cd559b2773b9cff88b7db646d9e6b8f38130b5a (patch) | |
tree | 582e82d51d74c17ee96fc1fa86240056c6289bd5 /zkfacade | |
parent | fc0a6eb4a073bca84e0b0da3997bc986646084e9 (diff) | |
parent | 1f02dd7f9b53d32e2dbc1242de56dc1b6135b097 (diff) |
Merge pull request #11815 from vespa-engine/jvenstad/wrap-curator-mutex-with-reentarnt-lock
Hold a JVM-wide reentrant lock to grab mutex — helps ZK stale reads?
Diffstat (limited to 'zkfacade')
-rw-r--r-- | zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java index 30af89d0ea8..2e554d39e44 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java @@ -7,6 +7,7 @@ import org.apache.curator.framework.recipes.locks.InterProcessLock; import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * A cluster-wide re-entrant mutex which is released on (the last symmetric) close @@ -15,32 +16,47 @@ import java.util.concurrent.TimeUnit; */ public class Lock implements Mutex { + private final ReentrantLock lock; private final InterProcessLock mutex; private final String lockPath; public Lock(String lockPath, Curator curator) { this.lockPath = lockPath; + this.lock = new ReentrantLock(true); mutex = curator.createMutex(lockPath); } /** Take the lock with the given timeout. This may be called multiple times from the same thread - each matched by a close */ public void acquire(Duration timeout) throws UncheckedTimeoutException { - boolean acquired; try { - acquired = mutex.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS); + if ( ! mutex.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS)) + throw new UncheckedTimeoutException("Timed out after waiting " + timeout + + " to acquire lock '" + lockPath + "'"); + if ( ! lock.tryLock()) { // Should be available to only this thread, while holding the above mutex. + release(); + throw new IllegalStateException("InterProcessMutex acquired, but guarded lock held by someone else, for lock '" + lockPath + "'"); + } + } + catch (UncheckedTimeoutException | IllegalStateException e) { + throw e; } catch (Exception e) { throw new RuntimeException("Exception acquiring lock '" + lockPath + "'", e); } - - if (! acquired) - throw new UncheckedTimeoutException("Timed out after waiting " + timeout + - " to acquire lock '" + lockPath + "'"); } @Override public void close() { try { + lock.unlock(); + } + finally { + release(); + } + } + + private void release() { + try { mutex.release(); } catch (Exception e) { |