diff options
author | hakonhall <hakon@oath.com> | 2017-09-06 16:44:12 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-06 16:44:12 +0200 |
commit | 5bd5b92210f84c44dbc46478d607bed9c2f1866d (patch) | |
tree | 1423d5349b14fe5cf33dfcb96ca55637ff52eb84 | |
parent | 9a1b184090da69224b2fce165bba6f37b427cffb (diff) | |
parent | f22011553430b3dc86dd08d917bd3a8d3b226288 (diff) |
Merge pull request #3340 from vespa-engine/hakon/add-preferred-orchestrator-lock
Add preferred Orchestrator lock
4 files changed, 89 insertions, 55 deletions
diff --git a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusService.java b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusService.java index 6e973bcfb0b..a9392711d7d 100644 --- a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusService.java +++ b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusService.java @@ -6,11 +6,11 @@ import com.yahoo.log.LogLevel; import com.yahoo.vespa.applicationmodel.ApplicationInstanceReference; import com.yahoo.vespa.applicationmodel.HostName; import com.yahoo.vespa.curator.Curator; +import com.yahoo.vespa.curator.Lock; import com.yahoo.vespa.orchestrator.OrchestratorUtil; import org.apache.curator.SessionFailRetryLoop; import org.apache.curator.SessionFailRetryLoop.Mode; import org.apache.curator.SessionFailRetryLoop.SessionFailedException; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -18,6 +18,7 @@ import org.apache.zookeeper.data.Stat; import javax.annotation.concurrent.GuardedBy; import javax.inject.Inject; +import java.time.Duration; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -45,18 +46,11 @@ public class ZookeeperStatusService implements StatusService { final static String HOST_STATUS_BASE_PATH = "/vespa/host-status-service"; final static String APPLICATION_STATUS_BASE_PATH = "/vespa/application-status-service"; - private final CuratorFramework curatorFramework; + private final Curator curator; @Inject public ZookeeperStatusService(@Component Curator curator) { - this(curator.framework()); - } - - /** - * Called via public constructor on directly on testing. - */ - ZookeeperStatusService(CuratorFramework curatorFramework) { - this.curatorFramework = curatorFramework; + this.curator = curator; } @Override @@ -86,7 +80,7 @@ public class ZookeeperStatusService implements StatusService { */ @Override public MutableStatusRegistry lockApplicationInstance_forCurrentThreadOnly(ApplicationInstanceReference applicationInstanceReference) { - return lockApplicationInstance_forCurrentThreadOnly(applicationInstanceReference, 10, TimeUnit.SECONDS); + return lockApplicationInstance_forCurrentThreadOnly(applicationInstanceReference, 10); } @Override @@ -95,11 +89,11 @@ public class ZookeeperStatusService implements StatusService { Set<ApplicationInstanceReference> resultSet = new HashSet<>(); // Return empty set if the base path does not exist - Stat stat = curatorFramework.checkExists().forPath(APPLICATION_STATUS_BASE_PATH); + Stat stat = curator.framework().checkExists().forPath(APPLICATION_STATUS_BASE_PATH); if (stat == null) return resultSet; // The path exist and we may have children - for (String appRefStr : curatorFramework.getChildren().forPath(APPLICATION_STATUS_BASE_PATH)) { + for (String appRefStr : curator.framework().getChildren().forPath(APPLICATION_STATUS_BASE_PATH)) { ApplicationInstanceReference appRef = OrchestratorUtil.parseAppInstanceReference(appRefStr); resultSet.add(appRef); } @@ -111,9 +105,9 @@ public class ZookeeperStatusService implements StatusService { } } - MutableStatusRegistry lockApplicationInstance_forCurrentThreadOnly(ApplicationInstanceReference applicationInstanceReference, - long timeout, - TimeUnit timeoutTimeUnit) { + MutableStatusRegistry lockApplicationInstance_forCurrentThreadOnly( + ApplicationInstanceReference applicationInstanceReference, + long timeoutSeconds) { Thread currentThread = Thread.currentThread(); //Due to limitations in SessionFailRetryLoop. @@ -121,17 +115,33 @@ public class ZookeeperStatusService implements StatusService { try { SessionFailRetryLoop sessionFailRetryLoop = - curatorFramework.getZookeeperClient().newSessionFailRetryLoop(Mode.FAIL); + curator.framework().getZookeeperClient().newSessionFailRetryLoop(Mode.FAIL); sessionFailRetryLoop.start(); try { String lockPath = applicationInstanceLockPath(applicationInstanceReference); - InterProcessSemaphoreMutex mutex = acquireMutexOrThrow(timeout, timeoutTimeUnit, lockPath); + InterProcessSemaphoreMutex mutex = acquireMutexOrThrow(timeoutSeconds, TimeUnit.SECONDS, lockPath); + + // TODO: Once rolled out, make this the only lock mechanism + Lock lock2; + try { + String lock2Path = applicationInstanceLock2Path(applicationInstanceReference); + lock2 = new Lock(lock2Path, curator); + lock2.acquire(Duration.ofSeconds(timeoutSeconds)); + } catch (Throwable t) { + mutex.release(); + throw t; + } synchronized (threadsHoldingLock) { threadsHoldingLock.put(currentThread, applicationInstanceReference); } - return new ZkMutableStatusRegistry(mutex, sessionFailRetryLoop, applicationInstanceReference, currentThread); + return new ZkMutableStatusRegistry( + lock2, + mutex, + sessionFailRetryLoop, + applicationInstanceReference, + currentThread); } catch (Throwable t) { sessionFailRetryLoop.close(); throw t; @@ -151,7 +161,7 @@ public class ZookeeperStatusService implements StatusService { } private InterProcessSemaphoreMutex acquireMutexOrThrow(long timeout, TimeUnit timeoutTimeUnit, String lockPath) throws Exception { - InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curatorFramework, lockPath); + InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curator.framework(), lockPath); log.log(LogLevel.DEBUG, "Waiting for lock on " + lockPath); boolean acquired = mutex.acquire(timeout, timeoutTimeUnit); @@ -205,7 +215,7 @@ public class ZookeeperStatusService implements StatusService { private void deleteNode_ignoreNoNodeException(String path, String debugLogMessageIfNotExists) throws Exception { try { - curatorFramework.delete().forPath(path); + curator.framework().delete().forPath(path); } catch (NoNodeException e) { log.log(LogLevel.DEBUG, debugLogMessageIfNotExists, e); } @@ -213,7 +223,7 @@ public class ZookeeperStatusService implements StatusService { private void createNode_ignoreNodeExistsException(String path, String debugLogMessageIfExists) throws Exception { try { - curatorFramework.create() + curator.framework().create() .creatingParentsIfNeeded() .forPath(path); } catch (NodeExistsException e) { @@ -224,7 +234,7 @@ public class ZookeeperStatusService implements StatusService { //TODO: Eliminate repeated calls to getHostStatus, replace with bulk operation. private HostStatus getInternalHostStatus(ApplicationInstanceReference applicationInstanceReference, HostName hostName) { try { - Stat statOrNull = curatorFramework.checkExists().forPath( + Stat statOrNull = curator.framework().checkExists().forPath( hostAllowedDownPath(applicationInstanceReference, hostName)); return (statOrNull == null) ? HostStatus.NO_REMARKS : HostStatus.ALLOWED_TO_BE_DOWN; @@ -237,7 +247,7 @@ public class ZookeeperStatusService implements StatusService { /** Common implementation for the two internal classes that sets ApplicationInstanceStatus. */ private ApplicationInstanceStatus getInternalApplicationInstanceStatus(ApplicationInstanceReference applicationInstanceReference) { try { - Stat statOrNull = curatorFramework.checkExists().forPath( + Stat statOrNull = curator.framework().checkExists().forPath( applicationInstanceSuspendedPath(applicationInstanceReference)); return (statOrNull == null) ? ApplicationInstanceStatus.NO_REMARKS : ApplicationInstanceStatus.ALLOWED_TO_BE_DOWN; @@ -266,6 +276,10 @@ public class ZookeeperStatusService implements StatusService { return applicationInstancePath(applicationInstanceReference) + "/lock"; } + private static String applicationInstanceLock2Path(ApplicationInstanceReference applicationInstanceReference) { + return applicationInstancePath(applicationInstanceReference) + "/lock2"; + } + private String applicationInstanceSuspendedPath(ApplicationInstanceReference applicationInstanceReference) { return APPLICATION_STATUS_BASE_PATH + "/" + OrchestratorUtil.toRestApiFormat(applicationInstanceReference); } @@ -275,18 +289,21 @@ public class ZookeeperStatusService implements StatusService { } private class ZkMutableStatusRegistry implements MutableStatusRegistry { + private final Lock lock; private final InterProcessSemaphoreMutex mutex; private final SessionFailRetryLoop sessionFailRetryLoop; private final ApplicationInstanceReference applicationInstanceReference; private final Thread lockingThread; public ZkMutableStatusRegistry( + Lock lock, InterProcessSemaphoreMutex mutex, SessionFailRetryLoop sessionFailRetryLoop, ApplicationInstanceReference applicationInstanceReference, Thread lockingThread) { this.mutex = mutex; + this.lock = lock; this.sessionFailRetryLoop = sessionFailRetryLoop; this.applicationInstanceReference = applicationInstanceReference; this.lockingThread = lockingThread; @@ -337,6 +354,14 @@ public class ZookeeperStatusService implements StatusService { } try { + lock.close(); + } catch (RuntimeException e) { + // We may want to avoid logging some exceptions that may be expected, like when session expires. + log.log(LogLevel.WARNING, "Failed to close application lock for " + + ZookeeperStatusService.class.getSimpleName() + ", will ignore and continue", e); + } + + try { mutex.release(); } catch (Exception e) { if (e.getCause() instanceof SessionFailedException) { diff --git a/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusServiceTest.java b/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusServiceTest.java index e564edb8c59..e997743fcf0 100644 --- a/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusServiceTest.java +++ b/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusServiceTest.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.orchestrator.status; import com.yahoo.log.LogLevel; import com.yahoo.vespa.applicationmodel.ApplicationInstanceReference; +import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.orchestrator.TestIds; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.curator.SessionFailRetryLoop.SessionFailedException; @@ -39,32 +40,32 @@ import static org.junit.Assert.fail; public class ZookeeperStatusServiceTest { private TestingServer testingServer; private ZookeeperStatusService zookeeperStatusService; - private CuratorFramework curatorFramework; + private Curator curator; @Before public void setUp() throws Exception { Logger.getLogger("").setLevel(LogLevel.WARNING); testingServer = new TestingServer(); - curatorFramework = createConnectedCuratorFramework(testingServer); - zookeeperStatusService = new ZookeeperStatusService(curatorFramework); + curator = createConnectedCurator(testingServer); + zookeeperStatusService = new ZookeeperStatusService(curator); } - private static CuratorFramework createConnectedCuratorFramework(TestingServer server) throws InterruptedException { - CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() - .connectString(server.getConnectString()) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)) - .build(); + private static Curator createConnectedCuratorFramework(TestingServer server) throws InterruptedException { + return createConnectedCurator(server); + } - curatorFramework.start(); - curatorFramework.blockUntilConnected(1, TimeUnit.MINUTES); - return curatorFramework; + private static Curator createConnectedCurator(TestingServer server) throws InterruptedException { + Curator curator = new Curator(server.getConnectString()); + curator.framework().blockUntilConnected(1, TimeUnit.MINUTES); + return curator; } @After public void tearDown() throws Exception { - if (curatorFramework != null) { //teardown is called even if setUp fails. - curatorFramework.close(); + if (curator != null) { //teardown is called even if setUp fails. + curator.close(); + curator = null; } if (testingServer != null) { testingServer.close(); @@ -101,8 +102,8 @@ public class ZookeeperStatusServiceTest { @Test public void locks_are_exclusive() throws Exception { - try (CuratorFramework curatorFramework2 = createConnectedCuratorFramework(testingServer)) { - ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curatorFramework2); + try (Curator curator = createConnectedCuratorFramework(testingServer)) { + ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curator); final CompletableFuture<Void> lockedSuccessfullyFuture; try (MutableStatusRegistry statusRegistry = zookeeperStatusService.lockApplicationInstance_forCurrentThreadOnly( @@ -131,7 +132,7 @@ public class ZookeeperStatusServiceTest { try (MutableStatusRegistry statusRegistry = zookeeperStatusService.lockApplicationInstance_forCurrentThreadOnly( TestIds.APPLICATION_INSTANCE_REFERENCE)) { - KillSession.kill(curatorFramework.getZookeeperClient().getZooKeeper(), testingServer.getConnectString()); + KillSession.kill(curator.framework().getZookeeperClient().getZooKeeper(), testingServer.getConnectString()); assertSessionFailed(() -> statusRegistry.setHostState( @@ -148,8 +149,8 @@ public class ZookeeperStatusServiceTest { @Test public void failing_to_get_lock_closes_SessionFailRetryLoop() throws Exception { - try (CuratorFramework curatorFramework2 = createConnectedCuratorFramework(testingServer)) { - ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curatorFramework2); + try (Curator curator = createConnectedCuratorFramework(testingServer)) { + ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curator); try (MutableStatusRegistry statusRegistry = zookeeperStatusService.lockApplicationInstance_forCurrentThreadOnly( TestIds.APPLICATION_INSTANCE_REFERENCE)) { @@ -158,13 +159,12 @@ public class ZookeeperStatusServiceTest { CompletableFuture<Void> resultOfZkOperationAfterLockFailure = CompletableFuture.runAsync(() -> { try { zookeeperStatusService2.lockApplicationInstance_forCurrentThreadOnly( - TestIds.APPLICATION_INSTANCE_REFERENCE, - 1, TimeUnit.SECONDS); + TestIds.APPLICATION_INSTANCE_REFERENCE,1); fail("Both zookeeper host status services locked simultaneously for the same application instance"); } catch (RuntimeException e) { } - killSession(curatorFramework2, testingServer); + killSession(curator.framework(), testingServer); //Throws SessionFailedException if the SessionFailRetryLoop has not been closed. zookeeperStatusService2.forApplicationInstance(TestIds.APPLICATION_INSTANCE_REFERENCE) @@ -226,8 +226,8 @@ public class ZookeeperStatusServiceTest { */ @Test(expected = AssertionError.class) public void multiple_locks_in_a_single_thread_gives_error() throws InterruptedException { - try (CuratorFramework curatorFramework2 = createConnectedCuratorFramework(testingServer)) { - ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curatorFramework2); + try (Curator curator = createConnectedCuratorFramework(testingServer)) { + ZookeeperStatusService zookeeperStatusService2 = new ZookeeperStatusService(curator); try (MutableStatusRegistry statusRegistry1 = zookeeperStatusService .lockApplicationInstance_forCurrentThreadOnly(TestIds.APPLICATION_INSTANCE_REFERENCE); diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 41c774f13c3..15257e11cbe 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -21,6 +21,7 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; +import java.io.Closeable; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -40,7 +41,7 @@ import java.util.concurrent.TimeUnit; * @author vegardh * @author bratseth */ -public class Curator { +public class Curator implements AutoCloseable { private static final long UNKNOWN_HOST_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30); private static final int ZK_SESSION_TIMEOUT = 30000; @@ -269,6 +270,11 @@ public class Curator { return curatorFramework; } + @Override + public void close() { + curatorFramework.close(); + } + /** * Interface for waiting for completion of an operation */ diff --git a/zkfacade/src/test/java/com/yahoo/vespa/zookeeper/CuratorTest.java b/zkfacade/src/test/java/com/yahoo/vespa/zookeeper/CuratorTest.java index 53d03fa9450..36205bdaca3 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/zookeeper/CuratorTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/zookeeper/CuratorTest.java @@ -52,23 +52,26 @@ public class CuratorTest { @Test public void require_curator_is_created_from_config() { - Curator curator = createCurator(createTestConfig()); - assertThat(curator.connectionSpec(), is(spec1 + "," + spec2)); + try (Curator curator = createCurator(createTestConfig())) { + assertThat(curator.connectionSpec(), is(spec1 + "," + spec2)); + } } @Test public void require_that_curator_can_produce_spec() { - Curator curator = createCurator(createTestConfig()); - assertThat(curator.connectionSpec(), is(spec1 + "," + spec2)); - assertThat(curator.serverCount(), is(2)); + try (Curator curator = createCurator(createTestConfig())) { + assertThat(curator.connectionSpec(), is(spec1 + "," + spec2)); + assertThat(curator.serverCount(), is(2)); + } } @Test public void require_that_server_count_is_correct() { ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); builder.zookeeperserver(createZKBuilder("localhost", port1)); - Curator curator = createCurator(new ConfigserverConfig(builder)); - assertThat(curator.serverCount(), is(1)); + try (Curator curator = createCurator(new ConfigserverConfig(builder))) { + assertThat(curator.serverCount(), is(1)); + } } private ConfigserverConfig createTestConfig() { |