aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-02-24 17:07:17 +0100
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-02-26 12:32:04 +0100
commit20627141c1f57c0c199193b07abce885508152f3 (patch)
tree87717c7794b751fcd6d786556248dad77e076aaa /clustercontroller-core
parent38beecb43c8dfc15af8f3b14a62ee29430689713 (diff)
Enforce that no cluster state can be published unless confirmed written to ZooKeeper
This avoids a subtle edge case where the underlying ZK integration code may fail silently a write, leaving the core controller logic to think that it had actually durably persisted a particular state version. In case of reelections racing with broadcasts, it would be possible for leader-edge readbacks from ZK to retrieve a _lower_ version than one that had already been published. This would cause the cluster controller to get very confused about which cluster states nodes had already observed. If a newly produced state version overlapped with a previously broadcast state, the controller would not push the updated state to the nodes, as it would (with good reason) assume the node had already observed it, seeing that it had already ACKed the particular version number.
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java7
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java14
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java23
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java4
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java29
5 files changed, 73 insertions, 4 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 58f40d821a9..52a17fe88ab 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -407,6 +407,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
try {
database.saveLatestSystemStateVersion(databaseContext, stateBundle.getVersion());
database.saveLatestClusterStateBundle(databaseContext, stateBundle);
+ // It's possible that due to transient ZK quorum errors, we were not able to actually
+ // fully store the state. Gate state broadcasting on the actually synchronously ACKed
+ // stored state instead of what's _hopefully_ stored.
+ systemStateBroadcaster.setLastClusterStateVersionWrittenToZooKeeper(database.getLastKnownStateBundleVersionWrittenBySelf());
} catch (InterruptedException e) {
// Rethrow as RuntimeException to propagate exception up to main thread method.
// Don't want to hide failures to write cluster state version.
@@ -579,6 +583,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
synchronized (monitor) {
boolean didWork;
didWork = database.doNextZooKeeperTask(databaseContext);
+ // In case of delayed ZK writes caused by intermittent failures, make sure to tag any newly written versions.
+ systemStateBroadcaster.setLastClusterStateVersionWrittenToZooKeeper(
+ database.getLastKnownStateBundleVersionWrittenBySelf());
didWork |= updateMasterElectionState();
didWork |= handleLeadershipEdgeTransitions();
stateChangeHandler.setMaster(isMaster);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 502a0823b13..9fefd0e6a12 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -28,6 +28,7 @@ public class SystemStateBroadcaster {
private int lastStateVersionBundleAcked = 0;
private int lastClusterStateVersionConverged = 0;
+ private int lastClusterStateVersionWrittenToZooKeeper = 0;
private ClusterStateBundle lastClusterStateBundleConverged;
private final SetClusterStateWaiter setClusterStateWaiter = new SetClusterStateWaiter();
@@ -255,10 +256,21 @@ public class SystemStateBroadcaster {
return lastClusterStateVersionConverged == clusterStateBundle.getVersion();
}
- public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.Context dbContext, Communicator communicator) {
+ public void setLastClusterStateVersionWrittenToZooKeeper(int version) {
+ lastClusterStateVersionWrittenToZooKeeper = version;
+ }
+
+ private boolean currentStateVersionHasBeenWrittenToZooKeeper() {
if (clusterStateBundle == null) {
return false;
}
+ return clusterStateBundle.getVersion() == lastClusterStateVersionWrittenToZooKeeper;
+ }
+
+ public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.Context dbContext, Communicator communicator) {
+ if (!currentStateVersionHasBeenWrittenToZooKeeper()) {
+ return false;
+ }
ClusterState baselineState = clusterStateBundle.getBaselineClusterState();
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
index 5bad5716eb9..5d5bb674d4f 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
@@ -83,6 +83,7 @@ public class DatabaseHandler {
private final DatabaseListener dbListener = new DatabaseListener();
private final Data currentlyStored = new Data();
private final Data pendingStore = new Data();
+ private int lastKnownStateBundleVersionWrittenBySelf = 0;
private long lastZooKeeperConnectionAttempt = 0;
private static final int minimumWaitBetweenFailedConnectionAttempts = 10000;
private boolean lostZooKeeperConnectionEvent = false;
@@ -111,6 +112,10 @@ public class DatabaseHandler {
public boolean isClosed() { return database == null || database.isClosed(); }
+ public int getLastKnownStateBundleVersionWrittenBySelf() {
+ return lastKnownStateBundleVersionWrittenBySelf;
+ }
+
public void reset() {
final boolean wasRunning;
synchronized (databaseMonitor) {
@@ -222,7 +227,7 @@ public class DatabaseHandler {
didWork = true;
}
}
- if (isDatabaseClosedSafe()) {
+ if (isDatabaseClosedSafe() && zooKeeperIsConfigured()) {
long currentTime = timer.getCurrentTimeInMillis();
if (currentTime - lastZooKeeperConnectionAttempt < minimumWaitBetweenFailedConnectionAttempts) {
return false; // Not time to attempt connection yet.
@@ -247,6 +252,11 @@ public class DatabaseHandler {
return didWork;
}
+ private boolean zooKeeperIsConfigured() {
+ // This should only ever be null during unit testing.
+ return zooKeeperAddress != null;
+ }
+
private void relinquishDatabaseConnectivity(FleetController fleetController) {
reset();
fleetController.lostDatabaseConnection();
@@ -304,7 +314,10 @@ public class DatabaseHandler {
}
if (pendingStore.clusterStateBundle != null) {
didWork = true;
+ log.fine(() -> String.format("Fleetcontroller %d: Attempting to store last cluster state bundle with version %d into zookeeper.",
+ nodeIndex, pendingStore.clusterStateBundle.getVersion()));
if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) {
+ lastKnownStateBundleVersionWrittenBySelf = pendingStore.clusterStateBundle.getVersion();
currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle;
pendingStore.clusterStateBundle = null;
} else {
@@ -365,6 +378,14 @@ public class DatabaseHandler {
log.log(Level.FINE, () -> String.format("Fleetcontroller %d: Scheduling bundle %s to be saved to ZooKeeper", nodeIndex, clusterStateBundle));
pendingStore.clusterStateBundle = clusterStateBundle;
doNextZooKeeperTask(context);
+ // FIXME this is a nasty hack to get around the fact that a massive amount of unit tests
+ // set up the system with a null ZooKeeper server address. If we don't fake that we have
+ // written the state version, the tests will never progress past waiting for state broadcasts.
+ if (zooKeeperAddress == null) {
+ log.warning(() -> String.format("Fleetcontroller %d: Simulating ZK write of version %d. This should not happen in production!",
+ nodeIndex, clusterStateBundle.getVersion()));
+ lastKnownStateBundleVersionWrittenBySelf = clusterStateBundle.getVersion();
+ }
}
// TODO should we expand this to cover _any_ pending ZK write?
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index 15778846ca6..cf2b151e55a 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -954,7 +954,7 @@ public class StateChangeTest extends FleetControllerTest {
options.minTimeBeforeFirstSystemStateBroadcast = 3 * 60 * 1000;
setUpSystem(true, options);
setUpVdsNodes(true, new DummyVdsNodeOptions(), true);
- // Leave one node down to avoid sending cluster state due to having seen all node states.
+ // Leave one node down to avoid sending cluster state due to having seen all node states.
for (int i=0; i<nodes.size(); ++i) {
if (i != 3) {
nodes.get(i).connect();
@@ -973,7 +973,7 @@ public class StateChangeTest extends FleetControllerTest {
@Override int expectedMessageCount(final DummyVdsNode node) { return 0; }
};
- // Pass time and see that the nodes get state
+ // Pass time and see that the nodes get state
timer.advanceTime(3 * 60 * 1000);
waiter.waitForState("version:\\d+ distributor:10 storage:10 .1.s:d", timeoutMS);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index 7cfedb937bc..c49cc77aabb 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -144,6 +144,34 @@ public class SystemStateBroadcasterTest {
verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
}
+ @Test
+ public void state_not_broadcast_if_version_not_tagged_as_written_to_zookeeper() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("version:100 distributor:2 storage:2");
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.setLastClusterStateVersionWrittenToZooKeeper(99);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+
+ cf.cluster().getNodeInfo().forEach(nodeInfo -> {
+ verify(f.mockCommunicator, times(0)).setSystemState(any(), eq(nodeInfo), any());
+ });
+ }
+
+ @Test
+ public void state_is_broadcast_if_version_is_tagged_as_written_to_zookeeper() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("version:100 distributor:2 storage:2");
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.setLastClusterStateVersionWrittenToZooKeeper(100);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+
+ cf.cluster().getNodeInfo().forEach(nodeInfo -> {
+ verify(f.mockCommunicator, times(1)).setSystemState(any(), eq(nodeInfo), any());
+ });
+ }
+
private static class MockSetClusterStateRequest extends SetClusterStateRequest {
MockSetClusterStateRequest(NodeInfo nodeInfo, int clusterStateVersion) {
super(nodeInfo, clusterStateVersion);
@@ -202,6 +230,7 @@ public class SystemStateBroadcasterTest {
.deriveAndBuild();
cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
broadcaster.handleNewClusterStates(stateBundle);
+ broadcaster.setLastClusterStateVersionWrittenToZooKeeper(stateBundle.getVersion());
broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator);
d0Waiter = ArgumentCaptor.forClass(Communicator.Waiter.class);