diff options
author | Håkon Hallingstad <hakon@yahooinc.com> | 2022-04-19 14:19:51 +0200 |
---|---|---|
committer | Håkon Hallingstad <hakon@yahooinc.com> | 2022-04-19 14:19:51 +0200 |
commit | dd28e17023492c9dd67084c0a5ea0467101a60e9 (patch) | |
tree | 66c4704d063194860caba37c72a94fceda58110f | |
parent | 1e4a7bd6c22f52d7c6cd57b6f48dbfa5a89388b5 (diff) |
ZooKeeperPaths
3 files changed, 66 insertions, 47 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java index 907f2e0c5e9..7852ab2e031 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java @@ -24,7 +24,7 @@ public class MasterDataGatherer { return Integer.parseInt(nodeName.substring(lastSlash + 1)); } - private final String zooKeeperRoot; // The root path in zookeeper, typically /vespa/fleetcontroller/<clustername>/ + private final ZooKeeperPaths paths; private Map<Integer, Integer> masterData = new TreeMap<>(); // The master state last reported to the fleetcontroller private final Map<Integer, Integer> nextMasterData = new TreeMap<>(); // Temporary master state while gathering new info from zookeeper private final AsyncCallback.ChildrenCallback childListener = new DirCallback(); // Dir change listener @@ -46,7 +46,7 @@ public class MasterDataGatherer { switch (watchedEvent.getType()) { case NodeChildrenChanged: // Fleetcontrollers have either connected or disconnected to ZooKeeper log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": A change occurred in the list of registered fleetcontrollers. Requesting new information"); - session.getChildren(zooKeeperRoot + "indexes", this, childListener, null); + session.getChildren(paths.indexesRoot(), this, childListener, null); break; case NodeDataChanged: // A fleetcontroller has changed what node it is voting for log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Altered data in node " + watchedEvent.getPath() + ". Requesting new vote"); @@ -54,7 +54,7 @@ public class MasterDataGatherer { synchronized (nextMasterData) { nextMasterData.put(index, null); } - session.getData(zooKeeperRoot + "indexes/" + index, this, nodeListener, null); + session.getData(paths.indexesOf(index), this, nodeListener, null); break; case NodeCreated: // How can this happen? Can one leave watches on non-existing nodes? log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Got unexpected ZooKeeper event NodeCreated"); @@ -85,8 +85,8 @@ public class MasterDataGatherer { int index = Integer.parseInt(node); nextMasterData.put(index, null); log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Attempting to fetch data in node '" - + zooKeeperRoot + index + "' to see vote"); - session.getData(zooKeeperRoot + "indexes/" + index, changeWatcher, nodeListener, null); + + paths.indexesOf(index) + "' to see vote"); + session.getData(paths.indexesOf(index), changeWatcher, nodeListener, null); // Invocation of cycleCompleted() for fully accumulated election state will happen // as soon as all getData calls have been processed. } @@ -146,8 +146,8 @@ public class MasterDataGatherer { } /** Constructor setting up the various needed members, and initializing the first data fetch to start things up */ - public MasterDataGatherer(ZooKeeper session, String zooKeeperRoot, Database.DatabaseListener listener, int nodeIndex) { - this.zooKeeperRoot = zooKeeperRoot; + public MasterDataGatherer(ZooKeeper session, ZooKeeperPaths paths, Database.DatabaseListener listener, int nodeIndex) { + this.paths = paths; this.session = session; this.listener = listener; this.nodeIndex = nodeIndex; @@ -161,7 +161,7 @@ public class MasterDataGatherer { synchronized (nextMasterData) { masterData = new TreeMap<>(); nextMasterData.clear(); - session.getChildren(zooKeeperRoot + "indexes", changeWatcher, childListener, null); + session.getChildren(paths.indexesRoot(), changeWatcher, childListener, null); } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java index 72c81489351..b7b4ef14117 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java @@ -38,7 +38,7 @@ public class ZooKeeperDatabase extends Database { private static final Charset utf8 = StandardCharsets.UTF_8; private static final List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; - private final String zooKeeperRoot; + private final ZooKeeperPaths paths; private final Database.DatabaseListener listener; private final ZooKeeperWatcher watcher = new ZooKeeperWatcher(); private final ZooKeeper session; @@ -105,28 +105,28 @@ public class ZooKeeperDatabase extends Database { public ZooKeeperDatabase(FleetControllerContext context, ContentCluster cluster, int nodeIndex, String address, int timeout, DatabaseListener zksl) throws IOException, KeeperException, InterruptedException { this.context = context; this.nodeIndex = nodeIndex; - zooKeeperRoot = "/vespa/fleetcontroller/" + cluster.getName() + "/"; + this.paths = new ZooKeeperPaths(cluster.getName(), nodeIndex); session = new ZooKeeper(address, timeout, watcher, new ZkClientConfigBuilder().toConfig()); boolean completedOk = false; try{ this.listener = zksl; setupRoot(); context.log(log, Level.FINEST, "Asking for initial data on master election"); - masterDataGatherer = new MasterDataGatherer(session, zooKeeperRoot, listener, nodeIndex); + masterDataGatherer = new MasterDataGatherer(session, paths, listener, nodeIndex); completedOk = true; } finally { if (!completedOk) session.close(); } } - private void createNode(String prefix, String nodename, byte[] value) throws KeeperException, InterruptedException { + private void createNode(String path, byte[] value) throws KeeperException, InterruptedException { try{ - if (session.exists(prefix + nodename, false) != null) { - context.log(log, Level.FINE, () -> "Zookeeper node '" + prefix + nodename + "' already exists. Not creating it"); + if (session.exists(path, false) != null) { + context.log(log, Level.FINE, () -> "Zookeeper node '" + path + "' already exists. Not creating it"); return; } - session.create(prefix + nodename, value, acl, CreateMode.PERSISTENT); - context.log(log, Level.FINE, () -> "Created zookeeper node '" + prefix + nodename + "'"); + session.create(path, value, acl, CreateMode.PERSISTENT); + context.log(log, Level.FINE, () -> "Created zookeeper node '" + path + "'"); } catch (KeeperException.NodeExistsException e) { context.log(log, Level.FINE, "Node to create existed, but this is normal as other nodes " + "may create them at the same time."); @@ -134,21 +134,21 @@ public class ZooKeeperDatabase extends Database { } private void setupRoot() throws KeeperException, InterruptedException { - String[] pathElements = zooKeeperRoot.substring(1).split("/"); + String[] pathElements = paths.root().substring(1).split("/"); String path = ""; for (String elem : pathElements) { path += "/" + elem; - createNode("", path, new byte[0]); + createNode(path, new byte[0]); } - createNode(zooKeeperRoot, "indexes", new byte[0]); - createNode(zooKeeperRoot, "wantedstates", new byte[0]); - createNode(zooKeeperRoot, "starttimestamps", new byte[0]); - createNode(zooKeeperRoot, "latestversion", Integer.valueOf(0).toString().getBytes(utf8)); - createNode(zooKeeperRoot, "published_state_bundle", new byte[0]); // TODO dedupe string constants + createNode(paths.indexesRoot(), new byte[0]); + createNode(paths.wantedStates(), new byte[0]); + createNode(paths.startTimestamps(), new byte[0]); + createNode(paths.latestVersion(), Integer.valueOf(0).toString().getBytes(utf8)); + createNode(paths.publishedStateBundle(), new byte[0]); // TODO dedupe string constants byte[] val = String.valueOf(nodeIndex).getBytes(utf8); - deleteNodeIfExists(getMyIndexPath()); + deleteNodeIfExists(paths.indexesOfMe()); context.log(log, Level.INFO, "Creating ephemeral master vote node with vote to self."); - session.create(getMyIndexPath(), val, acl, CreateMode.EPHEMERAL); + session.create(paths.indexesOfMe(), val, acl, CreateMode.EPHEMERAL); } private void deleteNodeIfExists(String path) throws KeeperException, InterruptedException { @@ -158,10 +158,6 @@ public class ZooKeeperDatabase extends Database { } } - private String getMyIndexPath() { - return zooKeeperRoot + "indexes/" + nodeIndex; - } - /** * If this is called, we assume we're in shutdown situation, or we are doing it because we need a new session. * Thus we only need to free up resources, no need to notify anyone. @@ -192,7 +188,7 @@ public class ZooKeeperDatabase extends Database { public boolean storeMasterVote(int wantedMasterIndex) { byte[] val = String.valueOf(wantedMasterIndex).getBytes(utf8); try{ - session.setData(getMyIndexPath(), val, -1); + session.setData(paths.indexesOfMe(), val, -1); context.log(log, Level.INFO, "Stored new vote in ephemeral node. " + nodeIndex + " -> " + wantedMasterIndex); return true; } catch (InterruptedException e) { @@ -206,7 +202,7 @@ public class ZooKeeperDatabase extends Database { byte[] data = Integer.toString(version).getBytes(utf8); try{ context.log(log, Level.INFO, "Storing new cluster state version in ZooKeeper: " + version); - var stat = session.setData(zooKeeperRoot + "latestversion", data, lastKnownStateVersionZNodeVersion); + var stat = session.setData(paths.latestVersion(), data, lastKnownStateVersionZNodeVersion); lastKnownStateVersionZNodeVersion = stat.getVersion(); return true; } catch (InterruptedException e) { @@ -222,17 +218,17 @@ public class ZooKeeperDatabase extends Database { public Integer retrieveLatestSystemStateVersion() { Stat stat = new Stat(); - context.log(log, Level.FINE, "Fetching latest cluster state at '%slatestversion'", zooKeeperRoot); + context.log(log, Level.FINE, "Fetching latest cluster state at '%s'", paths.latestVersion()); final byte[] data; try { - data = session.getData(zooKeeperRoot + "latestversion", false, stat); + data = session.getData(paths.latestVersion(), false, stat); } catch (KeeperException.NoNodeException e) { // Initial condition: No latest version has ever been written (or ZK state completely wiped!) lastKnownStateVersionZNodeVersion = 0; maybeLogExceptionWarning(e, "No latest system state found"); return null; } catch (InterruptedException | KeeperException e) { - throw new RuntimeException("Failed to get " + zooKeeperRoot + "latestversion", e); + throw new RuntimeException("Failed to get " + paths.latestVersion(), e); } lastKnownStateVersionZNodeVersion = stat.getVersion(); @@ -257,8 +253,8 @@ public class ZooKeeperDatabase extends Database { } byte[] val = sb.toString().getBytes(utf8); try{ - context.log(log, Level.FINE, () -> "Storing wanted states at '" + zooKeeperRoot + "wantedstates'"); - session.setData(zooKeeperRoot + "wantedstates", val, -1); + context.log(log, Level.FINE, () -> "Storing wanted states at '" + paths.wantedStates() + "'"); + session.setData(paths.wantedStates(), val, -1); return true; } catch (InterruptedException e) { throw new RuntimeException(e); @@ -270,9 +266,9 @@ public class ZooKeeperDatabase extends Database { public Map<Node, NodeState> retrieveWantedStates() { try{ - context.log(log, Level.FINE, () -> "Fetching wanted states at '" + zooKeeperRoot + "wantedstates'"); + context.log(log, Level.FINE, () -> "Fetching wanted states at '" + paths.wantedStates() + "'"); Stat stat = new Stat(); - byte[] data = session.getData(zooKeeperRoot + "wantedstates", false, stat); + byte[] data = session.getData(paths.wantedStates(), false, stat); Map<Node, NodeState> wanted = new TreeMap<>(); if (data != null && data.length > 0) { StringTokenizer st = new StringTokenizer(new String(data, utf8), "\n", false); @@ -308,8 +304,8 @@ public class ZooKeeperDatabase extends Database { } byte val[] = sb.toString().getBytes(utf8); try{ - context.log(log, Level.FINE, () -> "Storing start timestamps at '" + zooKeeperRoot + "starttimestamps"); - session.setData(zooKeeperRoot + "starttimestamps", val, -1); + context.log(log, Level.FINE, () -> "Storing start timestamps at '" + paths.startTimestamps() + "'"); + session.setData(paths.startTimestamps(), val, -1); return true; } catch (InterruptedException e) { throw new RuntimeException(e); @@ -322,9 +318,9 @@ public class ZooKeeperDatabase extends Database { @Override public Map<Node, Long> retrieveStartTimestamps() { try{ - context.log(log, Level.FINE, () -> "Fetching start timestamps at '" + zooKeeperRoot + "starttimestamps'"); + context.log(log, Level.FINE, () -> "Fetching start timestamps at '" + paths.startTimestamps() + "'"); Stat stat = new Stat(); - byte[] data = session.getData(zooKeeperRoot + "starttimestamps", false, stat); + byte[] data = session.getData(paths.startTimestamps(), false, stat); Map<Node, Long> wanted = new TreeMap<Node, Long>(); if (data != null && data.length > 0) { StringTokenizer st = new StringTokenizer(new String(data, utf8), "\n", false); @@ -357,10 +353,9 @@ public class ZooKeeperDatabase extends Database { try{ context.log(log, Level.FINE, - () -> String.format("Storing published state bundle %s at " + - "'%spublished_state_bundle' with expected znode version %d", - stateBundle, zooKeeperRoot, lastKnownStateBundleZNodeVersion)); - var stat = session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, lastKnownStateBundleZNodeVersion); + () -> String.format("Storing published state bundle %s at '%s' with expected znode version %d", + stateBundle, paths.publishedStateBundle(), lastKnownStateBundleZNodeVersion)); + var stat = session.setData(paths.publishedStateBundle(), encodedBundle, lastKnownStateBundleZNodeVersion); lastKnownStateBundleZNodeVersion = stat.getVersion(); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -378,7 +373,7 @@ public class ZooKeeperDatabase extends Database { public ClusterStateBundle retrieveLastPublishedStateBundle() { Stat stat = new Stat(); try { - byte[] data = session.getData(zooKeeperRoot + "published_state_bundle", false, stat); + byte[] data = session.getData(paths.publishedStateBundle(), false, stat); lastKnownStateBundleZNodeVersion = stat.getVersion(); if (data != null && data.length != 0) { EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec(); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperPaths.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperPaths.java new file mode 100644 index 00000000000..b3b6718a7bc --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperPaths.java @@ -0,0 +1,24 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.database; + +/** + * @author hakonhall + */ +public class ZooKeeperPaths { + private final String root; + private final int nodeIndex; + + public ZooKeeperPaths(String clusterName, int nodeIndex) { + this.root = "/vespa/fleetcontroller/" + clusterName; + this.nodeIndex = nodeIndex; + } + + public String root() { return root; } + public String indexesRoot() { return root + "/indexes"; } + public String indexesOf(int index) { return indexesRoot() + "/" + index; } + public String indexesOfMe() { return indexesOf(nodeIndex); } + public String wantedStates() { return root + "/wantedstates"; } + public String publishedStateBundle() { return root + "/published_state_bundle"; } + public String latestVersion() { return root + "/latestversion"; } + public String startTimestamps() { return root + "/starttimestamps"; } +} |