summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2020-03-20 19:39:46 +0100
committerGitHub <noreply@github.com>2020-03-20 19:39:46 +0100
commit6cd559b2773b9cff88b7db646d9e6b8f38130b5a (patch)
tree582e82d51d74c17ee96fc1fa86240056c6289bd5
parentfc0a6eb4a073bca84e0b0da3997bc986646084e9 (diff)
parent1f02dd7f9b53d32e2dbc1242de56dc1b6135b097 (diff)
Merge pull request #11815 from vespa-engine/jvenstad/wrap-curator-mutex-with-reentarnt-lock
Hold a JVM-wide reentrant lock to grab mutex — helps ZK stale reads?
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java28
1 files changed, 22 insertions, 6 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java
index 30af89d0ea8..2e554d39e44 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java
@@ -7,6 +7,7 @@ import org.apache.curator.framework.recipes.locks.InterProcessLock;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
/**
* A cluster-wide re-entrant mutex which is released on (the last symmetric) close
@@ -15,32 +16,47 @@ import java.util.concurrent.TimeUnit;
*/
public class Lock implements Mutex {
+ private final ReentrantLock lock;
private final InterProcessLock mutex;
private final String lockPath;
public Lock(String lockPath, Curator curator) {
this.lockPath = lockPath;
+ this.lock = new ReentrantLock(true);
mutex = curator.createMutex(lockPath);
}
/** Take the lock with the given timeout. This may be called multiple times from the same thread - each matched by a close */
public void acquire(Duration timeout) throws UncheckedTimeoutException {
- boolean acquired;
try {
- acquired = mutex.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ if ( ! mutex.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS))
+ throw new UncheckedTimeoutException("Timed out after waiting " + timeout +
+ " to acquire lock '" + lockPath + "'");
+ if ( ! lock.tryLock()) { // Should be available to only this thread, while holding the above mutex.
+ release();
+ throw new IllegalStateException("InterProcessMutex acquired, but guarded lock held by someone else, for lock '" + lockPath + "'");
+ }
+ }
+ catch (UncheckedTimeoutException | IllegalStateException e) {
+ throw e;
}
catch (Exception e) {
throw new RuntimeException("Exception acquiring lock '" + lockPath + "'", e);
}
-
- if (! acquired)
- throw new UncheckedTimeoutException("Timed out after waiting " + timeout +
- " to acquire lock '" + lockPath + "'");
}
@Override
public void close() {
try {
+ lock.unlock();
+ }
+ finally {
+ release();
+ }
+ }
+
+ private void release() {
+ try {
mutex.release();
}
catch (Exception e) {