diff options
Diffstat (limited to 'clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java')
-rw-r--r-- | clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java | 82 |
1 files changed, 37 insertions, 45 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java index 72c81489351..ea745a56066 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java @@ -6,7 +6,6 @@ import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState; import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; -import com.yahoo.vespa.clustercontroller.core.ContentCluster; import com.yahoo.vespa.clustercontroller.core.FleetControllerContext; import com.yahoo.vespa.clustercontroller.core.rpc.EnvelopedClusterStateBundleCodec; import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec; @@ -38,13 +37,12 @@ public class ZooKeeperDatabase extends Database { private static final Charset utf8 = StandardCharsets.UTF_8; private static final List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; - private final String zooKeeperRoot; + private final ZooKeeperPaths paths; private final Database.DatabaseListener listener; private final ZooKeeperWatcher watcher = new ZooKeeperWatcher(); private final ZooKeeper session; private boolean sessionOpen = true; private final FleetControllerContext context; - private final int nodeIndex; private final MasterDataGatherer masterDataGatherer; // Expected ZK znode versions. Note: these are _not_ -1 as that would match anything. // We expect the caller to invoke the load methods prior to calling any store methods. @@ -102,31 +100,30 @@ public class ZooKeeperDatabase extends Database { } } - public ZooKeeperDatabase(FleetControllerContext context, ContentCluster cluster, int nodeIndex, String address, int timeout, DatabaseListener zksl) throws IOException, KeeperException, InterruptedException { + public ZooKeeperDatabase(FleetControllerContext context, String address, int timeout, DatabaseListener zksl) throws IOException, KeeperException, InterruptedException { this.context = context; - this.nodeIndex = nodeIndex; - zooKeeperRoot = "/vespa/fleetcontroller/" + cluster.getName() + "/"; + this.paths = new ZooKeeperPaths(context.id()); session = new ZooKeeper(address, timeout, watcher, new ZkClientConfigBuilder().toConfig()); boolean completedOk = false; try{ this.listener = zksl; setupRoot(); context.log(log, Level.FINEST, "Asking for initial data on master election"); - masterDataGatherer = new MasterDataGatherer(session, zooKeeperRoot, listener, nodeIndex); + masterDataGatherer = new MasterDataGatherer(session, paths, listener, context.id().index()); completedOk = true; } finally { if (!completedOk) session.close(); } } - private void createNode(String prefix, String nodename, byte[] value) throws KeeperException, InterruptedException { + private void createNode(String path, byte[] value) throws KeeperException, InterruptedException { try{ - if (session.exists(prefix + nodename, false) != null) { - context.log(log, Level.FINE, () -> "Zookeeper node '" + prefix + nodename + "' already exists. Not creating it"); + if (session.exists(path, false) != null) { + context.log(log, Level.FINE, () -> "Zookeeper node '" + path + "' already exists. Not creating it"); return; } - session.create(prefix + nodename, value, acl, CreateMode.PERSISTENT); - context.log(log, Level.FINE, () -> "Created zookeeper node '" + prefix + nodename + "'"); + session.create(path, value, acl, CreateMode.PERSISTENT); + context.log(log, Level.FINE, () -> "Created zookeeper node '" + path + "'"); } catch (KeeperException.NodeExistsException e) { context.log(log, Level.FINE, "Node to create existed, but this is normal as other nodes " + "may create them at the same time."); @@ -134,21 +131,21 @@ public class ZooKeeperDatabase extends Database { } private void setupRoot() throws KeeperException, InterruptedException { - String[] pathElements = zooKeeperRoot.substring(1).split("/"); + String[] pathElements = paths.root().substring(1).split("/"); String path = ""; for (String elem : pathElements) { path += "/" + elem; - createNode("", path, new byte[0]); + createNode(path, new byte[0]); } - createNode(zooKeeperRoot, "indexes", new byte[0]); - createNode(zooKeeperRoot, "wantedstates", new byte[0]); - createNode(zooKeeperRoot, "starttimestamps", new byte[0]); - createNode(zooKeeperRoot, "latestversion", Integer.valueOf(0).toString().getBytes(utf8)); - createNode(zooKeeperRoot, "published_state_bundle", new byte[0]); // TODO dedupe string constants - byte[] val = String.valueOf(nodeIndex).getBytes(utf8); - deleteNodeIfExists(getMyIndexPath()); + createNode(paths.indexesRoot(), new byte[0]); + createNode(paths.wantedStates(), new byte[0]); + createNode(paths.startTimestamps(), new byte[0]); + createNode(paths.latestVersion(), Integer.valueOf(0).toString().getBytes(utf8)); + createNode(paths.publishedStateBundle(), new byte[0]); + byte[] val = String.valueOf(context.id().index()).getBytes(utf8); + deleteNodeIfExists(paths.indexOfMe()); context.log(log, Level.INFO, "Creating ephemeral master vote node with vote to self."); - session.create(getMyIndexPath(), val, acl, CreateMode.EPHEMERAL); + session.create(paths.indexOfMe(), val, acl, CreateMode.EPHEMERAL); } private void deleteNodeIfExists(String path) throws KeeperException, InterruptedException { @@ -158,10 +155,6 @@ public class ZooKeeperDatabase extends Database { } } - private String getMyIndexPath() { - return zooKeeperRoot + "indexes/" + nodeIndex; - } - /** * If this is called, we assume we're in shutdown situation, or we are doing it because we need a new session. * Thus we only need to free up resources, no need to notify anyone. @@ -192,8 +185,8 @@ public class ZooKeeperDatabase extends Database { public boolean storeMasterVote(int wantedMasterIndex) { byte[] val = String.valueOf(wantedMasterIndex).getBytes(utf8); try{ - session.setData(getMyIndexPath(), val, -1); - context.log(log, Level.INFO, "Stored new vote in ephemeral node. " + nodeIndex + " -> " + wantedMasterIndex); + session.setData(paths.indexOfMe(), val, -1); + context.log(log, Level.INFO, "Stored new vote in ephemeral node. " + context.id().index() + " -> " + wantedMasterIndex); return true; } catch (InterruptedException e) { throw new RuntimeException(e); @@ -206,7 +199,7 @@ public class ZooKeeperDatabase extends Database { byte[] data = Integer.toString(version).getBytes(utf8); try{ context.log(log, Level.INFO, "Storing new cluster state version in ZooKeeper: " + version); - var stat = session.setData(zooKeeperRoot + "latestversion", data, lastKnownStateVersionZNodeVersion); + var stat = session.setData(paths.latestVersion(), data, lastKnownStateVersionZNodeVersion); lastKnownStateVersionZNodeVersion = stat.getVersion(); return true; } catch (InterruptedException e) { @@ -222,17 +215,17 @@ public class ZooKeeperDatabase extends Database { public Integer retrieveLatestSystemStateVersion() { Stat stat = new Stat(); - context.log(log, Level.FINE, "Fetching latest cluster state at '%slatestversion'", zooKeeperRoot); + context.log(log, Level.FINE, "Fetching latest cluster state at '%s'", paths.latestVersion()); final byte[] data; try { - data = session.getData(zooKeeperRoot + "latestversion", false, stat); + data = session.getData(paths.latestVersion(), false, stat); } catch (KeeperException.NoNodeException e) { // Initial condition: No latest version has ever been written (or ZK state completely wiped!) lastKnownStateVersionZNodeVersion = 0; maybeLogExceptionWarning(e, "No latest system state found"); return null; } catch (InterruptedException | KeeperException e) { - throw new RuntimeException("Failed to get " + zooKeeperRoot + "latestversion", e); + throw new RuntimeException("Failed to get " + paths.latestVersion(), e); } lastKnownStateVersionZNodeVersion = stat.getVersion(); @@ -257,8 +250,8 @@ public class ZooKeeperDatabase extends Database { } byte[] val = sb.toString().getBytes(utf8); try{ - context.log(log, Level.FINE, () -> "Storing wanted states at '" + zooKeeperRoot + "wantedstates'"); - session.setData(zooKeeperRoot + "wantedstates", val, -1); + context.log(log, Level.FINE, () -> "Storing wanted states at '" + paths.wantedStates() + "'"); + session.setData(paths.wantedStates(), val, -1); return true; } catch (InterruptedException e) { throw new RuntimeException(e); @@ -270,9 +263,9 @@ public class ZooKeeperDatabase extends Database { public Map<Node, NodeState> retrieveWantedStates() { try{ - context.log(log, Level.FINE, () -> "Fetching wanted states at '" + zooKeeperRoot + "wantedstates'"); + context.log(log, Level.FINE, () -> "Fetching wanted states at '" + paths.wantedStates() + "'"); Stat stat = new Stat(); - byte[] data = session.getData(zooKeeperRoot + "wantedstates", false, stat); + byte[] data = session.getData(paths.wantedStates(), false, stat); Map<Node, NodeState> wanted = new TreeMap<>(); if (data != null && data.length > 0) { StringTokenizer st = new StringTokenizer(new String(data, utf8), "\n", false); @@ -308,8 +301,8 @@ public class ZooKeeperDatabase extends Database { } byte val[] = sb.toString().getBytes(utf8); try{ - context.log(log, Level.FINE, () -> "Storing start timestamps at '" + zooKeeperRoot + "starttimestamps"); - session.setData(zooKeeperRoot + "starttimestamps", val, -1); + context.log(log, Level.FINE, () -> "Storing start timestamps at '" + paths.startTimestamps() + "'"); + session.setData(paths.startTimestamps(), val, -1); return true; } catch (InterruptedException e) { throw new RuntimeException(e); @@ -322,9 +315,9 @@ public class ZooKeeperDatabase extends Database { @Override public Map<Node, Long> retrieveStartTimestamps() { try{ - context.log(log, Level.FINE, () -> "Fetching start timestamps at '" + zooKeeperRoot + "starttimestamps'"); + context.log(log, Level.FINE, () -> "Fetching start timestamps at '" + paths.startTimestamps() + "'"); Stat stat = new Stat(); - byte[] data = session.getData(zooKeeperRoot + "starttimestamps", false, stat); + byte[] data = session.getData(paths.startTimestamps(), false, stat); Map<Node, Long> wanted = new TreeMap<Node, Long>(); if (data != null && data.length > 0) { StringTokenizer st = new StringTokenizer(new String(data, utf8), "\n", false); @@ -357,10 +350,9 @@ public class ZooKeeperDatabase extends Database { try{ context.log(log, Level.FINE, - () -> String.format("Storing published state bundle %s at " + - "'%spublished_state_bundle' with expected znode version %d", - stateBundle, zooKeeperRoot, lastKnownStateBundleZNodeVersion)); - var stat = session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, lastKnownStateBundleZNodeVersion); + () -> String.format("Storing published state bundle %s at '%s' with expected znode version %d", + stateBundle, paths.publishedStateBundle(), lastKnownStateBundleZNodeVersion)); + var stat = session.setData(paths.publishedStateBundle(), encodedBundle, lastKnownStateBundleZNodeVersion); lastKnownStateBundleZNodeVersion = stat.getVersion(); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -378,7 +370,7 @@ public class ZooKeeperDatabase extends Database { public ClusterStateBundle retrieveLastPublishedStateBundle() { Stat stat = new Stat(); try { - byte[] data = session.getData(zooKeeperRoot + "published_state_bundle", false, stat); + byte[] data = session.getData(paths.publishedStateBundle(), false, stat); lastKnownStateBundleZNodeVersion = stat.getVersion(); if (data != null && data.length != 0) { EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec(); |