aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java82
1 files changed, 37 insertions, 45 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
index 72c81489351..ea745a56066 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
@@ -6,7 +6,6 @@ import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
-import com.yahoo.vespa.clustercontroller.core.ContentCluster;
import com.yahoo.vespa.clustercontroller.core.FleetControllerContext;
import com.yahoo.vespa.clustercontroller.core.rpc.EnvelopedClusterStateBundleCodec;
import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec;
@@ -38,13 +37,12 @@ public class ZooKeeperDatabase extends Database {
private static final Charset utf8 = StandardCharsets.UTF_8;
private static final List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- private final String zooKeeperRoot;
+ private final ZooKeeperPaths paths;
private final Database.DatabaseListener listener;
private final ZooKeeperWatcher watcher = new ZooKeeperWatcher();
private final ZooKeeper session;
private boolean sessionOpen = true;
private final FleetControllerContext context;
- private final int nodeIndex;
private final MasterDataGatherer masterDataGatherer;
// Expected ZK znode versions. Note: these are _not_ -1 as that would match anything.
// We expect the caller to invoke the load methods prior to calling any store methods.
@@ -102,31 +100,30 @@ public class ZooKeeperDatabase extends Database {
}
}
- public ZooKeeperDatabase(FleetControllerContext context, ContentCluster cluster, int nodeIndex, String address, int timeout, DatabaseListener zksl) throws IOException, KeeperException, InterruptedException {
+ public ZooKeeperDatabase(FleetControllerContext context, String address, int timeout, DatabaseListener zksl) throws IOException, KeeperException, InterruptedException {
this.context = context;
- this.nodeIndex = nodeIndex;
- zooKeeperRoot = "/vespa/fleetcontroller/" + cluster.getName() + "/";
+ this.paths = new ZooKeeperPaths(context.id());
session = new ZooKeeper(address, timeout, watcher, new ZkClientConfigBuilder().toConfig());
boolean completedOk = false;
try{
this.listener = zksl;
setupRoot();
context.log(log, Level.FINEST, "Asking for initial data on master election");
- masterDataGatherer = new MasterDataGatherer(session, zooKeeperRoot, listener, nodeIndex);
+ masterDataGatherer = new MasterDataGatherer(session, paths, listener, context.id().index());
completedOk = true;
} finally {
if (!completedOk) session.close();
}
}
- private void createNode(String prefix, String nodename, byte[] value) throws KeeperException, InterruptedException {
+ private void createNode(String path, byte[] value) throws KeeperException, InterruptedException {
try{
- if (session.exists(prefix + nodename, false) != null) {
- context.log(log, Level.FINE, () -> "Zookeeper node '" + prefix + nodename + "' already exists. Not creating it");
+ if (session.exists(path, false) != null) {
+ context.log(log, Level.FINE, () -> "Zookeeper node '" + path + "' already exists. Not creating it");
return;
}
- session.create(prefix + nodename, value, acl, CreateMode.PERSISTENT);
- context.log(log, Level.FINE, () -> "Created zookeeper node '" + prefix + nodename + "'");
+ session.create(path, value, acl, CreateMode.PERSISTENT);
+ context.log(log, Level.FINE, () -> "Created zookeeper node '" + path + "'");
} catch (KeeperException.NodeExistsException e) {
context.log(log, Level.FINE, "Node to create existed, but this is normal as other nodes " +
"may create them at the same time.");
@@ -134,21 +131,21 @@ public class ZooKeeperDatabase extends Database {
}
private void setupRoot() throws KeeperException, InterruptedException {
- String[] pathElements = zooKeeperRoot.substring(1).split("/");
+ String[] pathElements = paths.root().substring(1).split("/");
String path = "";
for (String elem : pathElements) {
path += "/" + elem;
- createNode("", path, new byte[0]);
+ createNode(path, new byte[0]);
}
- createNode(zooKeeperRoot, "indexes", new byte[0]);
- createNode(zooKeeperRoot, "wantedstates", new byte[0]);
- createNode(zooKeeperRoot, "starttimestamps", new byte[0]);
- createNode(zooKeeperRoot, "latestversion", Integer.valueOf(0).toString().getBytes(utf8));
- createNode(zooKeeperRoot, "published_state_bundle", new byte[0]); // TODO dedupe string constants
- byte[] val = String.valueOf(nodeIndex).getBytes(utf8);
- deleteNodeIfExists(getMyIndexPath());
+ createNode(paths.indexesRoot(), new byte[0]);
+ createNode(paths.wantedStates(), new byte[0]);
+ createNode(paths.startTimestamps(), new byte[0]);
+ createNode(paths.latestVersion(), Integer.valueOf(0).toString().getBytes(utf8));
+ createNode(paths.publishedStateBundle(), new byte[0]);
+ byte[] val = String.valueOf(context.id().index()).getBytes(utf8);
+ deleteNodeIfExists(paths.indexOfMe());
context.log(log, Level.INFO, "Creating ephemeral master vote node with vote to self.");
- session.create(getMyIndexPath(), val, acl, CreateMode.EPHEMERAL);
+ session.create(paths.indexOfMe(), val, acl, CreateMode.EPHEMERAL);
}
private void deleteNodeIfExists(String path) throws KeeperException, InterruptedException {
@@ -158,10 +155,6 @@ public class ZooKeeperDatabase extends Database {
}
}
- private String getMyIndexPath() {
- return zooKeeperRoot + "indexes/" + nodeIndex;
- }
-
/**
* If this is called, we assume we're in shutdown situation, or we are doing it because we need a new session.
* Thus we only need to free up resources, no need to notify anyone.
@@ -192,8 +185,8 @@ public class ZooKeeperDatabase extends Database {
public boolean storeMasterVote(int wantedMasterIndex) {
byte[] val = String.valueOf(wantedMasterIndex).getBytes(utf8);
try{
- session.setData(getMyIndexPath(), val, -1);
- context.log(log, Level.INFO, "Stored new vote in ephemeral node. " + nodeIndex + " -> " + wantedMasterIndex);
+ session.setData(paths.indexOfMe(), val, -1);
+ context.log(log, Level.INFO, "Stored new vote in ephemeral node. " + context.id().index() + " -> " + wantedMasterIndex);
return true;
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -206,7 +199,7 @@ public class ZooKeeperDatabase extends Database {
byte[] data = Integer.toString(version).getBytes(utf8);
try{
context.log(log, Level.INFO, "Storing new cluster state version in ZooKeeper: " + version);
- var stat = session.setData(zooKeeperRoot + "latestversion", data, lastKnownStateVersionZNodeVersion);
+ var stat = session.setData(paths.latestVersion(), data, lastKnownStateVersionZNodeVersion);
lastKnownStateVersionZNodeVersion = stat.getVersion();
return true;
} catch (InterruptedException e) {
@@ -222,17 +215,17 @@ public class ZooKeeperDatabase extends Database {
public Integer retrieveLatestSystemStateVersion() {
Stat stat = new Stat();
- context.log(log, Level.FINE, "Fetching latest cluster state at '%slatestversion'", zooKeeperRoot);
+ context.log(log, Level.FINE, "Fetching latest cluster state at '%s'", paths.latestVersion());
final byte[] data;
try {
- data = session.getData(zooKeeperRoot + "latestversion", false, stat);
+ data = session.getData(paths.latestVersion(), false, stat);
} catch (KeeperException.NoNodeException e) {
// Initial condition: No latest version has ever been written (or ZK state completely wiped!)
lastKnownStateVersionZNodeVersion = 0;
maybeLogExceptionWarning(e, "No latest system state found");
return null;
} catch (InterruptedException | KeeperException e) {
- throw new RuntimeException("Failed to get " + zooKeeperRoot + "latestversion", e);
+ throw new RuntimeException("Failed to get " + paths.latestVersion(), e);
}
lastKnownStateVersionZNodeVersion = stat.getVersion();
@@ -257,8 +250,8 @@ public class ZooKeeperDatabase extends Database {
}
byte[] val = sb.toString().getBytes(utf8);
try{
- context.log(log, Level.FINE, () -> "Storing wanted states at '" + zooKeeperRoot + "wantedstates'");
- session.setData(zooKeeperRoot + "wantedstates", val, -1);
+ context.log(log, Level.FINE, () -> "Storing wanted states at '" + paths.wantedStates() + "'");
+ session.setData(paths.wantedStates(), val, -1);
return true;
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -270,9 +263,9 @@ public class ZooKeeperDatabase extends Database {
public Map<Node, NodeState> retrieveWantedStates() {
try{
- context.log(log, Level.FINE, () -> "Fetching wanted states at '" + zooKeeperRoot + "wantedstates'");
+ context.log(log, Level.FINE, () -> "Fetching wanted states at '" + paths.wantedStates() + "'");
Stat stat = new Stat();
- byte[] data = session.getData(zooKeeperRoot + "wantedstates", false, stat);
+ byte[] data = session.getData(paths.wantedStates(), false, stat);
Map<Node, NodeState> wanted = new TreeMap<>();
if (data != null && data.length > 0) {
StringTokenizer st = new StringTokenizer(new String(data, utf8), "\n", false);
@@ -308,8 +301,8 @@ public class ZooKeeperDatabase extends Database {
}
byte val[] = sb.toString().getBytes(utf8);
try{
- context.log(log, Level.FINE, () -> "Storing start timestamps at '" + zooKeeperRoot + "starttimestamps");
- session.setData(zooKeeperRoot + "starttimestamps", val, -1);
+ context.log(log, Level.FINE, () -> "Storing start timestamps at '" + paths.startTimestamps() + "'");
+ session.setData(paths.startTimestamps(), val, -1);
return true;
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -322,9 +315,9 @@ public class ZooKeeperDatabase extends Database {
@Override
public Map<Node, Long> retrieveStartTimestamps() {
try{
- context.log(log, Level.FINE, () -> "Fetching start timestamps at '" + zooKeeperRoot + "starttimestamps'");
+ context.log(log, Level.FINE, () -> "Fetching start timestamps at '" + paths.startTimestamps() + "'");
Stat stat = new Stat();
- byte[] data = session.getData(zooKeeperRoot + "starttimestamps", false, stat);
+ byte[] data = session.getData(paths.startTimestamps(), false, stat);
Map<Node, Long> wanted = new TreeMap<Node, Long>();
if (data != null && data.length > 0) {
StringTokenizer st = new StringTokenizer(new String(data, utf8), "\n", false);
@@ -357,10 +350,9 @@ public class ZooKeeperDatabase extends Database {
try{
context.log(log,
Level.FINE,
- () -> String.format("Storing published state bundle %s at " +
- "'%spublished_state_bundle' with expected znode version %d",
- stateBundle, zooKeeperRoot, lastKnownStateBundleZNodeVersion));
- var stat = session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, lastKnownStateBundleZNodeVersion);
+ () -> String.format("Storing published state bundle %s at '%s' with expected znode version %d",
+ stateBundle, paths.publishedStateBundle(), lastKnownStateBundleZNodeVersion));
+ var stat = session.setData(paths.publishedStateBundle(), encodedBundle, lastKnownStateBundleZNodeVersion);
lastKnownStateBundleZNodeVersion = stat.getVersion();
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -378,7 +370,7 @@ public class ZooKeeperDatabase extends Database {
public ClusterStateBundle retrieveLastPublishedStateBundle() {
Stat stat = new Stat();
try {
- byte[] data = session.getData(zooKeeperRoot + "published_state_bundle", false, stat);
+ byte[] data = session.getData(paths.publishedStateBundle(), false, stat);
lastKnownStateBundleZNodeVersion = stat.getVersion();
if (data != null && data.length != 0) {
EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec();