summaryrefslogtreecommitdiffstats
path: root/controller-server
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2017-09-13 15:10:35 +0200
committerGitHub <noreply@github.com>2017-09-13 15:10:35 +0200
commitf2bd97d4658bb95821aabe7b304a5bb8d401946f (patch)
treebdb92ca3aae44cfbe8ee34e9c56bca2567894978 /controller-server
parent6c1989052d591ada9d7fbb7efbc58ce518435288 (diff)
parent5150763830b262e9a5d66b8ab9a7a236a60887c0 (diff)
Merge pull request #3398 from vespa-engine/mpolden/zookeeper-backed-controller
Enable ZooKeeper server for controller
Diffstat (limited to 'controller-server')
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java80
1 files changed, 52 insertions, 28 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java
index 5777636fa24..a83a24764ce 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java
@@ -2,17 +2,19 @@
package com.yahoo.vespa.hosted.controller.persistence;
import com.google.inject.Inject;
+import com.yahoo.cloud.config.ClusterInfoConfig;
import com.yahoo.cloud.config.ZookeeperServerConfig;
import com.yahoo.component.Version;
import com.yahoo.component.Vtag;
import com.yahoo.config.provision.ApplicationId;
+import com.yahoo.net.HostName;
import com.yahoo.path.Path;
import com.yahoo.transaction.NestedTransaction;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.Lock;
-import com.yahoo.vespa.curator.mock.MockCurator;
import com.yahoo.vespa.hosted.controller.api.identifiers.TenantId;
import com.yahoo.vespa.hosted.controller.application.DeploymentJobs;
+import com.yahoo.vespa.zookeeper.ZooKeeperServer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -25,15 +27,19 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
/**
* Curator backed database for storing working state shared between controller servers.
* This maps controller specific operations to general curator operations.
- *
+ *
* @author bratseth
*/
public class CuratorDb {
+ /** Use a nonstandard zk port to avoid interfering with connection to the config server zk cluster */
+ private static final int zooKeeperPort = 2281;
+
private static final Logger log = Logger.getLogger(CuratorDb.class.getName());
private static final Path root = Path.fromString("/controller/v1");
@@ -44,7 +50,8 @@ public class CuratorDb {
private final JobQueueSerializer jobQueueSerializer = new JobQueueSerializer();
@SuppressWarnings("unused") // This server is used (only) from the curator instance of this over the network */
- //private final ZooKeeperServer zooKeeperServer;
+ private final ZooKeeperServer zooKeeperServer;
+
private final Curator curator;
/**
@@ -55,35 +62,52 @@ public class CuratorDb {
/** Create a curator db which also set up a ZooKeeper server (such that this instance is both client and server) */
@Inject
- public CuratorDb() {
- // this.zooKeeperServer = new ZooKeeperServer(createZookeeperServerConfig());
- // this.curator = new Curator("localhost:2281");
- //this.zooKeeperServer = null;
- this.curator = new MockCurator();
- }
-
- private static ZookeeperServerConfig createZookeeperServerConfig() {
+ public CuratorDb(ClusterInfoConfig clusterInfo) {
+ this.zooKeeperServer = new ZooKeeperServer(toZookeeperServerConfig(clusterInfo));
+ this.curator = new Curator(toConnectionSpec(clusterInfo));
+ }
+
+ /** Create a curator db which does not set up a server, using the given Curator instance */
+ protected CuratorDb(Curator curator) {
+ this.zooKeeperServer = null;
+ this.curator = curator;
+ }
+
+ private static ZookeeperServerConfig toZookeeperServerConfig(ClusterInfoConfig clusterInfo) {
ZookeeperServerConfig.Builder b = new ZookeeperServerConfig.Builder();
b.zooKeeperConfigFile("conf/zookeeper/controller-zookeeper.cfg");
b.dataDir("var/controller-zookeeper");
- b.clientPort(2281);
+ b.clientPort(zooKeeperPort);
b.myidFile("var/controller-zookeeper/myid");
- b.myid(0);
- ZookeeperServerConfig.Server.Builder server = new ZookeeperServerConfig.Server.Builder();
- server.id(0);
- server.hostname("localhost");
- server.quorumPort(2282);
- server.electionPort(2283);
- b.server(server);
+ b.myid(myIndex(clusterInfo));
+
+ for (ClusterInfoConfig.Services clusterMember : clusterInfo.services()) {
+ ZookeeperServerConfig.Server.Builder server = new ZookeeperServerConfig.Server.Builder();
+ server.id(clusterMember.index());
+ server.hostname(clusterMember.hostname());
+ server.quorumPort(zooKeeperPort + 1);
+ server.electionPort(zooKeeperPort + 2);
+ b.server(server);
+ }
return new ZookeeperServerConfig(b);
}
- /** Create a curator db which does not set uop a server, using the given Curator instance */
- protected CuratorDb(Curator curator) {
- //this.zooKeeperServer = null;
- this.curator = curator;
+ private static Integer myIndex(ClusterInfoConfig clusterInfo) {
+ String hostname = HostName.getLocalhost();
+ return clusterInfo.services().stream()
+ .filter(service -> service.hostname().equals(hostname))
+ .map(ClusterInfoConfig.Services::index)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("Unable to find index for this node by hostname '" +
+ hostname + "'"));
}
-
+
+ private static String toConnectionSpec(ClusterInfoConfig clusterInfo) {
+ return clusterInfo.services().stream()
+ .map(member -> member.hostname() + ":" + zooKeeperPort)
+ .collect(Collectors.joining(","));
+ }
+
// -------------- Locks --------------------------------------------------
public Lock lock(TenantId id, Duration timeout) {
@@ -123,13 +147,13 @@ public class CuratorDb {
if (! data.isPresent() || data.get().length == 0) return Vtag.currentVersion;
return Version.fromString(new String(data.get(), StandardCharsets.UTF_8));
}
-
+
public void writeSystemVersion(Version version) {
NestedTransaction transaction = new NestedTransaction();
curator.set(systemVersionPath(), version.toString().getBytes(StandardCharsets.UTF_8));
transaction.commit();
}
-
+
public Set<String> readInactiveJobs() {
try {
Optional<byte[]> data = curator.getData(inactiveJobsPath());
@@ -167,9 +191,9 @@ public class CuratorDb {
curator.set(jobQueuePath(jobType), jobQueueSerializer.toJson(queue));
transaction.commit();
}
-
+
// -------------- Paths --------------------------------------------------
-
+
private Path systemVersionPath() {
return root.append("systemVersion");
}