diff options
author | Martin Polden <mpolden@mpolden.no> | 2017-09-13 15:10:35 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-13 15:10:35 +0200 |
commit | f2bd97d4658bb95821aabe7b304a5bb8d401946f (patch) | |
tree | bdb92ca3aae44cfbe8ee34e9c56bca2567894978 /controller-server | |
parent | 6c1989052d591ada9d7fbb7efbc58ce518435288 (diff) | |
parent | 5150763830b262e9a5d66b8ab9a7a236a60887c0 (diff) |
Merge pull request #3398 from vespa-engine/mpolden/zookeeper-backed-controller
Enable ZooKeeper server for controller
Diffstat (limited to 'controller-server')
-rw-r--r-- | controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java | 80 |
1 files changed, 52 insertions, 28 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java index 5777636fa24..a83a24764ce 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java @@ -2,17 +2,19 @@ package com.yahoo.vespa.hosted.controller.persistence; import com.google.inject.Inject; +import com.yahoo.cloud.config.ClusterInfoConfig; import com.yahoo.cloud.config.ZookeeperServerConfig; import com.yahoo.component.Version; import com.yahoo.component.Vtag; import com.yahoo.config.provision.ApplicationId; +import com.yahoo.net.HostName; import com.yahoo.path.Path; import com.yahoo.transaction.NestedTransaction; import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.curator.Lock; -import com.yahoo.vespa.curator.mock.MockCurator; import com.yahoo.vespa.hosted.controller.api.identifiers.TenantId; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs; +import com.yahoo.vespa.zookeeper.ZooKeeperServer; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -25,15 +27,19 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * Curator backed database for storing working state shared between controller servers. * This maps controller specific operations to general curator operations. - * + * * @author bratseth */ public class CuratorDb { + /** Use a nonstandard zk port to avoid interfering with connection to the config server zk cluster */ + private static final int zooKeeperPort = 2281; + private static final Logger log = Logger.getLogger(CuratorDb.class.getName()); private static final Path root = Path.fromString("/controller/v1"); @@ -44,7 +50,8 @@ public class CuratorDb { private final JobQueueSerializer jobQueueSerializer = new JobQueueSerializer(); @SuppressWarnings("unused") // This server is used (only) from the curator instance of this over the network */ - //private final ZooKeeperServer zooKeeperServer; + private final ZooKeeperServer zooKeeperServer; + private final Curator curator; /** @@ -55,35 +62,52 @@ public class CuratorDb { /** Create a curator db which also set up a ZooKeeper server (such that this instance is both client and server) */ @Inject - public CuratorDb() { - // this.zooKeeperServer = new ZooKeeperServer(createZookeeperServerConfig()); - // this.curator = new Curator("localhost:2281"); - //this.zooKeeperServer = null; - this.curator = new MockCurator(); - } - - private static ZookeeperServerConfig createZookeeperServerConfig() { + public CuratorDb(ClusterInfoConfig clusterInfo) { + this.zooKeeperServer = new ZooKeeperServer(toZookeeperServerConfig(clusterInfo)); + this.curator = new Curator(toConnectionSpec(clusterInfo)); + } + + /** Create a curator db which does not set up a server, using the given Curator instance */ + protected CuratorDb(Curator curator) { + this.zooKeeperServer = null; + this.curator = curator; + } + + private static ZookeeperServerConfig toZookeeperServerConfig(ClusterInfoConfig clusterInfo) { ZookeeperServerConfig.Builder b = new ZookeeperServerConfig.Builder(); b.zooKeeperConfigFile("conf/zookeeper/controller-zookeeper.cfg"); b.dataDir("var/controller-zookeeper"); - b.clientPort(2281); + b.clientPort(zooKeeperPort); b.myidFile("var/controller-zookeeper/myid"); - b.myid(0); - ZookeeperServerConfig.Server.Builder server = new ZookeeperServerConfig.Server.Builder(); - server.id(0); - server.hostname("localhost"); - server.quorumPort(2282); - server.electionPort(2283); - b.server(server); + b.myid(myIndex(clusterInfo)); + + for (ClusterInfoConfig.Services clusterMember : clusterInfo.services()) { + ZookeeperServerConfig.Server.Builder server = new ZookeeperServerConfig.Server.Builder(); + server.id(clusterMember.index()); + server.hostname(clusterMember.hostname()); + server.quorumPort(zooKeeperPort + 1); + server.electionPort(zooKeeperPort + 2); + b.server(server); + } return new ZookeeperServerConfig(b); } - /** Create a curator db which does not set uop a server, using the given Curator instance */ - protected CuratorDb(Curator curator) { - //this.zooKeeperServer = null; - this.curator = curator; + private static Integer myIndex(ClusterInfoConfig clusterInfo) { + String hostname = HostName.getLocalhost(); + return clusterInfo.services().stream() + .filter(service -> service.hostname().equals(hostname)) + .map(ClusterInfoConfig.Services::index) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Unable to find index for this node by hostname '" + + hostname + "'")); } - + + private static String toConnectionSpec(ClusterInfoConfig clusterInfo) { + return clusterInfo.services().stream() + .map(member -> member.hostname() + ":" + zooKeeperPort) + .collect(Collectors.joining(",")); + } + // -------------- Locks -------------------------------------------------- public Lock lock(TenantId id, Duration timeout) { @@ -123,13 +147,13 @@ public class CuratorDb { if (! data.isPresent() || data.get().length == 0) return Vtag.currentVersion; return Version.fromString(new String(data.get(), StandardCharsets.UTF_8)); } - + public void writeSystemVersion(Version version) { NestedTransaction transaction = new NestedTransaction(); curator.set(systemVersionPath(), version.toString().getBytes(StandardCharsets.UTF_8)); transaction.commit(); } - + public Set<String> readInactiveJobs() { try { Optional<byte[]> data = curator.getData(inactiveJobsPath()); @@ -167,9 +191,9 @@ public class CuratorDb { curator.set(jobQueuePath(jobType), jobQueueSerializer.toJson(queue)); transaction.commit(); } - + // -------------- Paths -------------------------------------------------- - + private Path systemVersionPath() { return root.append("systemVersion"); } |