summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåkon Hallingstad <hakon@yahooinc.com>2022-04-19 14:19:51 +0200
committerHåkon Hallingstad <hakon@yahooinc.com>2022-04-19 14:19:51 +0200
commitdd28e17023492c9dd67084c0a5ea0467101a60e9 (patch)
tree66c4704d063194860caba37c72a94fceda58110f
parent1e4a7bd6c22f52d7c6cd57b6f48dbfa5a89388b5 (diff)
ZooKeeperPaths
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java16
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java73
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperPaths.java24
3 files changed, 66 insertions, 47 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java
index 907f2e0c5e9..7852ab2e031 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java
@@ -24,7 +24,7 @@ public class MasterDataGatherer {
return Integer.parseInt(nodeName.substring(lastSlash + 1));
}
- private final String zooKeeperRoot; // The root path in zookeeper, typically /vespa/fleetcontroller/<clustername>/
+ private final ZooKeeperPaths paths;
private Map<Integer, Integer> masterData = new TreeMap<>(); // The master state last reported to the fleetcontroller
private final Map<Integer, Integer> nextMasterData = new TreeMap<>(); // Temporary master state while gathering new info from zookeeper
private final AsyncCallback.ChildrenCallback childListener = new DirCallback(); // Dir change listener
@@ -46,7 +46,7 @@ public class MasterDataGatherer {
switch (watchedEvent.getType()) {
case NodeChildrenChanged: // Fleetcontrollers have either connected or disconnected to ZooKeeper
log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": A change occurred in the list of registered fleetcontrollers. Requesting new information");
- session.getChildren(zooKeeperRoot + "indexes", this, childListener, null);
+ session.getChildren(paths.indexesRoot(), this, childListener, null);
break;
case NodeDataChanged: // A fleetcontroller has changed what node it is voting for
log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Altered data in node " + watchedEvent.getPath() + ". Requesting new vote");
@@ -54,7 +54,7 @@ public class MasterDataGatherer {
synchronized (nextMasterData) {
nextMasterData.put(index, null);
}
- session.getData(zooKeeperRoot + "indexes/" + index, this, nodeListener, null);
+ session.getData(paths.indexesOf(index), this, nodeListener, null);
break;
case NodeCreated: // How can this happen? Can one leave watches on non-existing nodes?
log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Got unexpected ZooKeeper event NodeCreated");
@@ -85,8 +85,8 @@ public class MasterDataGatherer {
int index = Integer.parseInt(node);
nextMasterData.put(index, null);
log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Attempting to fetch data in node '"
- + zooKeeperRoot + index + "' to see vote");
- session.getData(zooKeeperRoot + "indexes/" + index, changeWatcher, nodeListener, null);
+ + paths.indexesOf(index) + "' to see vote");
+ session.getData(paths.indexesOf(index), changeWatcher, nodeListener, null);
// Invocation of cycleCompleted() for fully accumulated election state will happen
// as soon as all getData calls have been processed.
}
@@ -146,8 +146,8 @@ public class MasterDataGatherer {
}
/** Constructor setting up the various needed members, and initializing the first data fetch to start things up */
- public MasterDataGatherer(ZooKeeper session, String zooKeeperRoot, Database.DatabaseListener listener, int nodeIndex) {
- this.zooKeeperRoot = zooKeeperRoot;
+ public MasterDataGatherer(ZooKeeper session, ZooKeeperPaths paths, Database.DatabaseListener listener, int nodeIndex) {
+ this.paths = paths;
this.session = session;
this.listener = listener;
this.nodeIndex = nodeIndex;
@@ -161,7 +161,7 @@ public class MasterDataGatherer {
synchronized (nextMasterData) {
masterData = new TreeMap<>();
nextMasterData.clear();
- session.getChildren(zooKeeperRoot + "indexes", changeWatcher, childListener, null);
+ session.getChildren(paths.indexesRoot(), changeWatcher, childListener, null);
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
index 72c81489351..b7b4ef14117 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
@@ -38,7 +38,7 @@ public class ZooKeeperDatabase extends Database {
private static final Charset utf8 = StandardCharsets.UTF_8;
private static final List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- private final String zooKeeperRoot;
+ private final ZooKeeperPaths paths;
private final Database.DatabaseListener listener;
private final ZooKeeperWatcher watcher = new ZooKeeperWatcher();
private final ZooKeeper session;
@@ -105,28 +105,28 @@ public class ZooKeeperDatabase extends Database {
public ZooKeeperDatabase(FleetControllerContext context, ContentCluster cluster, int nodeIndex, String address, int timeout, DatabaseListener zksl) throws IOException, KeeperException, InterruptedException {
this.context = context;
this.nodeIndex = nodeIndex;
- zooKeeperRoot = "/vespa/fleetcontroller/" + cluster.getName() + "/";
+ this.paths = new ZooKeeperPaths(cluster.getName(), nodeIndex);
session = new ZooKeeper(address, timeout, watcher, new ZkClientConfigBuilder().toConfig());
boolean completedOk = false;
try{
this.listener = zksl;
setupRoot();
context.log(log, Level.FINEST, "Asking for initial data on master election");
- masterDataGatherer = new MasterDataGatherer(session, zooKeeperRoot, listener, nodeIndex);
+ masterDataGatherer = new MasterDataGatherer(session, paths, listener, nodeIndex);
completedOk = true;
} finally {
if (!completedOk) session.close();
}
}
- private void createNode(String prefix, String nodename, byte[] value) throws KeeperException, InterruptedException {
+ private void createNode(String path, byte[] value) throws KeeperException, InterruptedException {
try{
- if (session.exists(prefix + nodename, false) != null) {
- context.log(log, Level.FINE, () -> "Zookeeper node '" + prefix + nodename + "' already exists. Not creating it");
+ if (session.exists(path, false) != null) {
+ context.log(log, Level.FINE, () -> "Zookeeper node '" + path + "' already exists. Not creating it");
return;
}
- session.create(prefix + nodename, value, acl, CreateMode.PERSISTENT);
- context.log(log, Level.FINE, () -> "Created zookeeper node '" + prefix + nodename + "'");
+ session.create(path, value, acl, CreateMode.PERSISTENT);
+ context.log(log, Level.FINE, () -> "Created zookeeper node '" + path + "'");
} catch (KeeperException.NodeExistsException e) {
context.log(log, Level.FINE, "Node to create existed, but this is normal as other nodes " +
"may create them at the same time.");
@@ -134,21 +134,21 @@ public class ZooKeeperDatabase extends Database {
}
private void setupRoot() throws KeeperException, InterruptedException {
- String[] pathElements = zooKeeperRoot.substring(1).split("/");
+ String[] pathElements = paths.root().substring(1).split("/");
String path = "";
for (String elem : pathElements) {
path += "/" + elem;
- createNode("", path, new byte[0]);
+ createNode(path, new byte[0]);
}
- createNode(zooKeeperRoot, "indexes", new byte[0]);
- createNode(zooKeeperRoot, "wantedstates", new byte[0]);
- createNode(zooKeeperRoot, "starttimestamps", new byte[0]);
- createNode(zooKeeperRoot, "latestversion", Integer.valueOf(0).toString().getBytes(utf8));
- createNode(zooKeeperRoot, "published_state_bundle", new byte[0]); // TODO dedupe string constants
+ createNode(paths.indexesRoot(), new byte[0]);
+ createNode(paths.wantedStates(), new byte[0]);
+ createNode(paths.startTimestamps(), new byte[0]);
+ createNode(paths.latestVersion(), Integer.valueOf(0).toString().getBytes(utf8));
+ createNode(paths.publishedStateBundle(), new byte[0]); // TODO dedupe string constants
byte[] val = String.valueOf(nodeIndex).getBytes(utf8);
- deleteNodeIfExists(getMyIndexPath());
+ deleteNodeIfExists(paths.indexesOfMe());
context.log(log, Level.INFO, "Creating ephemeral master vote node with vote to self.");
- session.create(getMyIndexPath(), val, acl, CreateMode.EPHEMERAL);
+ session.create(paths.indexesOfMe(), val, acl, CreateMode.EPHEMERAL);
}
private void deleteNodeIfExists(String path) throws KeeperException, InterruptedException {
@@ -158,10 +158,6 @@ public class ZooKeeperDatabase extends Database {
}
}
- private String getMyIndexPath() {
- return zooKeeperRoot + "indexes/" + nodeIndex;
- }
-
/**
* If this is called, we assume we're in shutdown situation, or we are doing it because we need a new session.
* Thus we only need to free up resources, no need to notify anyone.
@@ -192,7 +188,7 @@ public class ZooKeeperDatabase extends Database {
public boolean storeMasterVote(int wantedMasterIndex) {
byte[] val = String.valueOf(wantedMasterIndex).getBytes(utf8);
try{
- session.setData(getMyIndexPath(), val, -1);
+ session.setData(paths.indexesOfMe(), val, -1);
context.log(log, Level.INFO, "Stored new vote in ephemeral node. " + nodeIndex + " -> " + wantedMasterIndex);
return true;
} catch (InterruptedException e) {
@@ -206,7 +202,7 @@ public class ZooKeeperDatabase extends Database {
byte[] data = Integer.toString(version).getBytes(utf8);
try{
context.log(log, Level.INFO, "Storing new cluster state version in ZooKeeper: " + version);
- var stat = session.setData(zooKeeperRoot + "latestversion", data, lastKnownStateVersionZNodeVersion);
+ var stat = session.setData(paths.latestVersion(), data, lastKnownStateVersionZNodeVersion);
lastKnownStateVersionZNodeVersion = stat.getVersion();
return true;
} catch (InterruptedException e) {
@@ -222,17 +218,17 @@ public class ZooKeeperDatabase extends Database {
public Integer retrieveLatestSystemStateVersion() {
Stat stat = new Stat();
- context.log(log, Level.FINE, "Fetching latest cluster state at '%slatestversion'", zooKeeperRoot);
+ context.log(log, Level.FINE, "Fetching latest cluster state at '%s'", paths.latestVersion());
final byte[] data;
try {
- data = session.getData(zooKeeperRoot + "latestversion", false, stat);
+ data = session.getData(paths.latestVersion(), false, stat);
} catch (KeeperException.NoNodeException e) {
// Initial condition: No latest version has ever been written (or ZK state completely wiped!)
lastKnownStateVersionZNodeVersion = 0;
maybeLogExceptionWarning(e, "No latest system state found");
return null;
} catch (InterruptedException | KeeperException e) {
- throw new RuntimeException("Failed to get " + zooKeeperRoot + "latestversion", e);
+ throw new RuntimeException("Failed to get " + paths.latestVersion(), e);
}
lastKnownStateVersionZNodeVersion = stat.getVersion();
@@ -257,8 +253,8 @@ public class ZooKeeperDatabase extends Database {
}
byte[] val = sb.toString().getBytes(utf8);
try{
- context.log(log, Level.FINE, () -> "Storing wanted states at '" + zooKeeperRoot + "wantedstates'");
- session.setData(zooKeeperRoot + "wantedstates", val, -1);
+ context.log(log, Level.FINE, () -> "Storing wanted states at '" + paths.wantedStates() + "'");
+ session.setData(paths.wantedStates(), val, -1);
return true;
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -270,9 +266,9 @@ public class ZooKeeperDatabase extends Database {
public Map<Node, NodeState> retrieveWantedStates() {
try{
- context.log(log, Level.FINE, () -> "Fetching wanted states at '" + zooKeeperRoot + "wantedstates'");
+ context.log(log, Level.FINE, () -> "Fetching wanted states at '" + paths.wantedStates() + "'");
Stat stat = new Stat();
- byte[] data = session.getData(zooKeeperRoot + "wantedstates", false, stat);
+ byte[] data = session.getData(paths.wantedStates(), false, stat);
Map<Node, NodeState> wanted = new TreeMap<>();
if (data != null && data.length > 0) {
StringTokenizer st = new StringTokenizer(new String(data, utf8), "\n", false);
@@ -308,8 +304,8 @@ public class ZooKeeperDatabase extends Database {
}
byte val[] = sb.toString().getBytes(utf8);
try{
- context.log(log, Level.FINE, () -> "Storing start timestamps at '" + zooKeeperRoot + "starttimestamps");
- session.setData(zooKeeperRoot + "starttimestamps", val, -1);
+ context.log(log, Level.FINE, () -> "Storing start timestamps at '" + paths.startTimestamps() + "'");
+ session.setData(paths.startTimestamps(), val, -1);
return true;
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -322,9 +318,9 @@ public class ZooKeeperDatabase extends Database {
@Override
public Map<Node, Long> retrieveStartTimestamps() {
try{
- context.log(log, Level.FINE, () -> "Fetching start timestamps at '" + zooKeeperRoot + "starttimestamps'");
+ context.log(log, Level.FINE, () -> "Fetching start timestamps at '" + paths.startTimestamps() + "'");
Stat stat = new Stat();
- byte[] data = session.getData(zooKeeperRoot + "starttimestamps", false, stat);
+ byte[] data = session.getData(paths.startTimestamps(), false, stat);
Map<Node, Long> wanted = new TreeMap<Node, Long>();
if (data != null && data.length > 0) {
StringTokenizer st = new StringTokenizer(new String(data, utf8), "\n", false);
@@ -357,10 +353,9 @@ public class ZooKeeperDatabase extends Database {
try{
context.log(log,
Level.FINE,
- () -> String.format("Storing published state bundle %s at " +
- "'%spublished_state_bundle' with expected znode version %d",
- stateBundle, zooKeeperRoot, lastKnownStateBundleZNodeVersion));
- var stat = session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, lastKnownStateBundleZNodeVersion);
+ () -> String.format("Storing published state bundle %s at '%s' with expected znode version %d",
+ stateBundle, paths.publishedStateBundle(), lastKnownStateBundleZNodeVersion));
+ var stat = session.setData(paths.publishedStateBundle(), encodedBundle, lastKnownStateBundleZNodeVersion);
lastKnownStateBundleZNodeVersion = stat.getVersion();
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -378,7 +373,7 @@ public class ZooKeeperDatabase extends Database {
public ClusterStateBundle retrieveLastPublishedStateBundle() {
Stat stat = new Stat();
try {
- byte[] data = session.getData(zooKeeperRoot + "published_state_bundle", false, stat);
+ byte[] data = session.getData(paths.publishedStateBundle(), false, stat);
lastKnownStateBundleZNodeVersion = stat.getVersion();
if (data != null && data.length != 0) {
EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec();
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperPaths.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperPaths.java
new file mode 100644
index 00000000000..b3b6718a7bc
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperPaths.java
@@ -0,0 +1,24 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.database;
+
+/**
+ * @author hakonhall
+ */
+public class ZooKeeperPaths {
+ private final String root;
+ private final int nodeIndex;
+
+ public ZooKeeperPaths(String clusterName, int nodeIndex) {
+ this.root = "/vespa/fleetcontroller/" + clusterName;
+ this.nodeIndex = nodeIndex;
+ }
+
+ public String root() { return root; }
+ public String indexesRoot() { return root + "/indexes"; }
+ public String indexesOf(int index) { return indexesRoot() + "/" + index; }
+ public String indexesOfMe() { return indexesOf(nodeIndex); }
+ public String wantedStates() { return root + "/wantedstates"; }
+ public String publishedStateBundle() { return root + "/published_state_bundle"; }
+ public String latestVersion() { return root + "/latestversion"; }
+ public String startTimestamps() { return root + "/starttimestamps"; }
+}