summaryrefslogtreecommitdiffstats
path: root/orchestrator
diff options
context:
space:
mode:
authorHåkon Hallingstad <hakon@oath.com>2017-09-06 12:46:44 +0200
committerHåkon Hallingstad <hakon@oath.com>2017-09-06 12:46:44 +0200
commitf22011553430b3dc86dd08d917bd3a8d3b226288 (patch)
treeb049d88c767320a20925288ef21ab5993880f8fe /orchestrator
parentfb1818824b94e1ee4ca553065a2f6ba73b219ca7 (diff)
Add preferred Orchestrator lock
Diffstat (limited to 'orchestrator')
-rw-r--r--orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusService.java73
-rw-r--r--orchestrator/src/test/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusServiceTest.java46
2 files changed, 72 insertions, 47 deletions
diff --git a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusService.java b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusService.java
index 6e973bcfb0b..a9392711d7d 100644
--- a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusService.java
+++ b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusService.java
@@ -6,11 +6,11 @@ import com.yahoo.log.LogLevel;
import com.yahoo.vespa.applicationmodel.ApplicationInstanceReference;
import com.yahoo.vespa.applicationmodel.HostName;
import com.yahoo.vespa.curator.Curator;
+import com.yahoo.vespa.curator.Lock;
import com.yahoo.vespa.orchestrator.OrchestratorUtil;
import org.apache.curator.SessionFailRetryLoop;
import org.apache.curator.SessionFailRetryLoop.Mode;
import org.apache.curator.SessionFailRetryLoop.SessionFailedException;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -18,6 +18,7 @@ import org.apache.zookeeper.data.Stat;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
+import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -45,18 +46,11 @@ public class ZookeeperStatusService implements StatusService {
final static String HOST_STATUS_BASE_PATH = "/vespa/host-status-service";
final static String APPLICATION_STATUS_BASE_PATH = "/vespa/application-status-service";
- private final CuratorFramework curatorFramework;
+ private final Curator curator;
@Inject
public ZookeeperStatusService(@Component Curator curator) {
- this(curator.framework());
- }
-
- /**
- * Called via public constructor on directly on testing.
- */
- ZookeeperStatusService(CuratorFramework curatorFramework) {
- this.curatorFramework = curatorFramework;
+ this.curator = curator;
}
@Override
@@ -86,7 +80,7 @@ public class ZookeeperStatusService implements StatusService {
*/
@Override
public MutableStatusRegistry lockApplicationInstance_forCurrentThreadOnly(ApplicationInstanceReference applicationInstanceReference) {
- return lockApplicationInstance_forCurrentThreadOnly(applicationInstanceReference, 10, TimeUnit.SECONDS);
+ return lockApplicationInstance_forCurrentThreadOnly(applicationInstanceReference, 10);
}
@Override
@@ -95,11 +89,11 @@ public class ZookeeperStatusService implements StatusService {
Set<ApplicationInstanceReference> resultSet = new HashSet<>();
// Return empty set if the base path does not exist
- Stat stat = curatorFramework.checkExists().forPath(APPLICATION_STATUS_BASE_PATH);
+ Stat stat = curator.framework().checkExists().forPath(APPLICATION_STATUS_BASE_PATH);
if (stat == null) return resultSet;
// The path exist and we may have children
- for (String appRefStr : curatorFramework.getChildren().forPath(APPLICATION_STATUS_BASE_PATH)) {
+ for (String appRefStr : curator.framework().getChildren().forPath(APPLICATION_STATUS_BASE_PATH)) {
ApplicationInstanceReference appRef = OrchestratorUtil.parseAppInstanceReference(appRefStr);
resultSet.add(appRef);
}
@@ -111,9 +105,9 @@ public class ZookeeperStatusService implements StatusService {
}
}
- MutableStatusRegistry lockApplicationInstance_forCurrentThreadOnly(ApplicationInstanceReference applicationInstanceReference,
- long timeout,
- TimeUnit timeoutTimeUnit) {
+ MutableStatusRegistry lockApplicationInstance_forCurrentThreadOnly(
+ ApplicationInstanceReference applicationInstanceReference,
+ long timeoutSeconds) {
Thread currentThread = Thread.currentThread();
//Due to limitations in SessionFailRetryLoop.
@@ -121,17 +115,33 @@ public class ZookeeperStatusService implements StatusService {
try {
SessionFailRetryLoop sessionFailRetryLoop =
- curatorFramework.getZookeeperClient().newSessionFailRetryLoop(Mode.FAIL);
+ curator.framework().getZookeeperClient().newSessionFailRetryLoop(Mode.FAIL);
sessionFailRetryLoop.start();
try {
String lockPath = applicationInstanceLockPath(applicationInstanceReference);
- InterProcessSemaphoreMutex mutex = acquireMutexOrThrow(timeout, timeoutTimeUnit, lockPath);
+ InterProcessSemaphoreMutex mutex = acquireMutexOrThrow(timeoutSeconds, TimeUnit.SECONDS, lockPath);
+
+ // TODO: Once rolled out, make this the only lock mechanism
+ Lock lock2;
+ try {
+ String lock2Path = applicationInstanceLock2Path(applicationInstanceReference);
+ lock2 = new Lock(lock2Path, curator);
+ lock2.acquire(Duration.ofSeconds(timeoutSeconds));
+ } catch (Throwable t) {
+ mutex.release();
+ throw t;
+ }
synchronized (threadsHoldingLock) {
threadsHoldingLock.put(currentThread, applicationInstanceReference);
}
- return new ZkMutableStatusRegistry(mutex, sessionFailRetryLoop, applicationInstanceReference, currentThread);
+ return new ZkMutableStatusRegistry(
+ lock2,
+ mutex,
+ sessionFailRetryLoop,
+ applicationInstanceReference,
+ currentThread);
} catch (Throwable t) {
sessionFailRetryLoop.close();
throw t;
@@ -151,7 +161,7 @@ public class ZookeeperStatusService implements StatusService {
}
private InterProcessSemaphoreMutex acquireMutexOrThrow(long timeout, TimeUnit timeoutTimeUnit, String lockPath) throws Exception {
- InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curatorFramework, lockPath);
+ InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curator.framework(), lockPath);
log.log(LogLevel.DEBUG, "Waiting for lock on " + lockPath);
boolean acquired = mutex.acquire(timeout, timeoutTimeUnit);
@@ -205,7 +215,7 @@ public class ZookeeperStatusService implements StatusService {
private void deleteNode_ignoreNoNodeException(String path, String debugLogMessageIfNotExists) throws Exception {
try {
- curatorFramework.delete().forPath(path);
+ curator.framework().delete().forPath(path);
} catch (NoNodeException e) {
log.log(LogLevel.DEBUG, debugLogMessageIfNotExists, e);
}
@@ -213,7 +223,7 @@ public class ZookeeperStatusService implements StatusService {
private void createNode_ignoreNodeExistsException(String path, String debugLogMessageIfExists) throws Exception {
try {
- curatorFramework.create()
+ curator.framework().create()
.creatingParentsIfNeeded()
.forPath(path);
} catch (NodeExistsException e) {
@@ -224,7 +234,7 @@ public class ZookeeperStatusService implements StatusService {
//TODO: Eliminate repeated calls to getHostStatus, replace with bulk operation.
private HostStatus getInternalHostStatus(ApplicationInstanceReference applicationInstanceReference, HostName hostName) {
try {
- Stat statOrNull = curatorFramework.checkExists().forPath(
+ Stat statOrNull = curator.framework().checkExists().forPath(
hostAllowedDownPath(applicationInstanceReference, hostName));
return (statOrNull == null) ? HostStatus.NO_REMARKS : HostStatus.ALLOWED_TO_BE_DOWN;
@@ -237,7 +247,7 @@ public class ZookeeperStatusService implements StatusService {
/** Common implementation for the two internal classes that sets ApplicationInstanceStatus. */
private ApplicationInstanceStatus getInternalApplicationInstanceStatus(ApplicationInstanceReference applicationInstanceReference) {
try {
- Stat statOrNull = curatorFramework.checkExists().forPath(
+ Stat statOrNull = curator.framework().checkExists().forPath(
applicationInstanceSuspendedPath(applicationInstanceReference));
return (statOrNull == null) ? ApplicationInstanceStatus.NO_REMARKS : ApplicationInstanceStatus.ALLOWED_TO_BE_DOWN;
@@ -266,6 +276,10 @@ public class ZookeeperStatusService implements StatusService {
return applicationInstancePath(applicationInstanceReference) + "/lock";
}
+ private static String applicationInstanceLock2Path(ApplicationInstanceReference applicationInstanceReference) {
+ return applicationInstancePath(applicationInstanceReference) + "/lock2";
+ }
+
private String applicationInstanceSuspendedPath(ApplicationInstanceReference applicationInstanceReference) {
return APPLICATION_STATUS_BASE_PATH + "/" + OrchestratorUtil.toRestApiFormat(applicationInstanceReference);
}
@@ -275,18 +289,21 @@ public class ZookeeperStatusService implements StatusService {
}
private class ZkMutableStatusRegistry implements MutableStatusRegistry {
+ private final Lock lock;
private final InterProcessSemaphoreMutex mutex;
private final SessionFailRetryLoop sessionFailRetryLoop;
private final ApplicationInstanceReference applicationInstanceReference;
private final Thread lockingThread;
public ZkMutableStatusRegistry(
+ Lock lock,
InterProcessSemaphoreMutex mutex,
SessionFailRetryLoop sessionFailRetryLoop,
ApplicationInstanceReference applicationInstanceReference,
Thread lockingThread) {
this.mutex = mutex;
+ this.lock = lock;
this.sessionFailRetryLoop = sessionFailRetryLoop;
this.applicationInstanceReference = applicationInstanceReference;
this.lockingThread = lockingThread;
@@ -337,6 +354,14 @@ public class ZookeeperStatusService implements StatusService {
}
try {
+ lock.close();
+ } catch (RuntimeException e) {
+ // We may want to avoid logging some exceptions that may be expected, like when session expires.
+ log.log(LogLevel.WARNING, "Failed to close application lock for " +
+ ZookeeperStatusService.class.getSimpleName() + ", will ignore and continue", e);
+ }
+
+ try {
mutex.release();
} catch (Exception e) {
if (e.getCause() instanceof SessionFailedException) {
diff --git a/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusServiceTest.java b/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusServiceTest.java
index e564edb8c59..e997743fcf0 100644
--- a/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusServiceTest.java
+++ b/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusServiceTest.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.orchestrator.status;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.applicationmodel.ApplicationInstanceReference;
+import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.orchestrator.TestIds;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.curator.SessionFailRetryLoop.SessionFailedException;
@@ -39,32 +40,32 @@ import static org.junit.Assert.fail;
public class ZookeeperStatusServiceTest {
private TestingServer testingServer;
private ZookeeperStatusService zookeeperStatusService;
- private CuratorFramework curatorFramework;
+ private Curator curator;
@Before
public void setUp() throws Exception {
Logger.getLogger("").setLevel(LogLevel.WARNING);
testingServer = new TestingServer();
- curatorFramework = createConnectedCuratorFramework(testingServer);
- zookeeperStatusService = new ZookeeperStatusService(curatorFramework);
+ curator = createConnectedCurator(testingServer);
+ zookeeperStatusService = new ZookeeperStatusService(curator);
}
- private static CuratorFramework createConnectedCuratorFramework(TestingServer server) throws InterruptedException {
- CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
- .connectString(server.getConnectString())
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .build();
+ private static Curator createConnectedCuratorFramework(TestingServer server) throws InterruptedException {
+ return createConnectedCurator(server);
+ }
- curatorFramework.start();
- curatorFramework.blockUntilConnected(1, TimeUnit.MINUTES);
- return curatorFramework;
+ private static Curator createConnectedCurator(TestingServer server) throws InterruptedException {
+ Curator curator = new Curator(server.getConnectString());
+ curator.framework().blockUntilConnected(1, TimeUnit.MINUTES);
+ return curator;
}
@After
public void tearDown() throws Exception {
- if (curatorFramework != null) { //teardown is called even if setUp fails.
- curatorFramework.close();
+ if (curator != null) { //teardown is called even if setUp fails.
+ curator.close();
+ curator = null;
}
if (testingServer != null) {
testingServer.close();
@@ -101,8 +102,8 @@ public class ZookeeperStatusServiceTest {
@Test
public void locks_are_exclusive() throws Exception {
- try (CuratorFramework curatorFramework2 = createConnectedCuratorFramework(testingServer)) {
- ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curatorFramework2);
+ try (Curator curator = createConnectedCuratorFramework(testingServer)) {
+ ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curator);
final CompletableFuture<Void> lockedSuccessfullyFuture;
try (MutableStatusRegistry statusRegistry = zookeeperStatusService.lockApplicationInstance_forCurrentThreadOnly(
@@ -131,7 +132,7 @@ public class ZookeeperStatusServiceTest {
try (MutableStatusRegistry statusRegistry = zookeeperStatusService.lockApplicationInstance_forCurrentThreadOnly(
TestIds.APPLICATION_INSTANCE_REFERENCE)) {
- KillSession.kill(curatorFramework.getZookeeperClient().getZooKeeper(), testingServer.getConnectString());
+ KillSession.kill(curator.framework().getZookeeperClient().getZooKeeper(), testingServer.getConnectString());
assertSessionFailed(() ->
statusRegistry.setHostState(
@@ -148,8 +149,8 @@ public class ZookeeperStatusServiceTest {
@Test
public void failing_to_get_lock_closes_SessionFailRetryLoop() throws Exception {
- try (CuratorFramework curatorFramework2 = createConnectedCuratorFramework(testingServer)) {
- ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curatorFramework2);
+ try (Curator curator = createConnectedCuratorFramework(testingServer)) {
+ ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curator);
try (MutableStatusRegistry statusRegistry = zookeeperStatusService.lockApplicationInstance_forCurrentThreadOnly(
TestIds.APPLICATION_INSTANCE_REFERENCE)) {
@@ -158,13 +159,12 @@ public class ZookeeperStatusServiceTest {
CompletableFuture<Void> resultOfZkOperationAfterLockFailure = CompletableFuture.runAsync(() -> {
try {
zookeeperStatusService2.lockApplicationInstance_forCurrentThreadOnly(
- TestIds.APPLICATION_INSTANCE_REFERENCE,
- 1, TimeUnit.SECONDS);
+ TestIds.APPLICATION_INSTANCE_REFERENCE,1);
fail("Both zookeeper host status services locked simultaneously for the same application instance");
} catch (RuntimeException e) {
}
- killSession(curatorFramework2, testingServer);
+ killSession(curator.framework(), testingServer);
//Throws SessionFailedException if the SessionFailRetryLoop has not been closed.
zookeeperStatusService2.forApplicationInstance(TestIds.APPLICATION_INSTANCE_REFERENCE)
@@ -226,8 +226,8 @@ public class ZookeeperStatusServiceTest {
*/
@Test(expected = AssertionError.class)
public void multiple_locks_in_a_single_thread_gives_error() throws InterruptedException {
- try (CuratorFramework curatorFramework2 = createConnectedCuratorFramework(testingServer)) {
- ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curatorFramework2);
+ try (Curator curator = createConnectedCuratorFramework(testingServer)) {
+ ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curator);
try (MutableStatusRegistry statusRegistry1 = zookeeperStatusService
.lockApplicationInstance_forCurrentThreadOnly(TestIds.APPLICATION_INSTANCE_REFERENCE);