summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-07-11 15:14:22 +0200
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-07-11 15:16:15 +0200
commitf76ef475431084530c1e6d77eb5cec4788ed8ff7 (patch)
tree60d8d64b35213b1565899ddbce98d70dc429cf8b /clustercontroller-core
parent1c79079945c56fa91de8427fbc8f2170eec9ed8c (diff)
Guard critical cluster state version ZK writes with an atomic CaS
Lets a controller discover that another controller also believes it is the leader by tracking the expected znode versions for cluster state version and bundle znodes. If a CaS failure is triggered, the controller will drop its database and election state, forcing a state refresh.
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/CasWriteFailed.java23
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java16
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java132
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java30
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java37
5 files changed, 170 insertions, 68 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/CasWriteFailed.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/CasWriteFailed.java
new file mode 100644
index 00000000000..c62d8d4bcd5
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/CasWriteFailed.java
@@ -0,0 +1,23 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.database;
+
+/**
+ * Exception used to signal that a write intended to overwrite a value previously
+ * read encountered an underlying version conflict during an atomic compare-and-swap
+ * operation. This generally means that another node has written to the value since
+ * we last read it, and that the information we hold may be stale.
+ *
+ * Upon receiving such an exception, the caller should no longer assume it holds
+ * up-to-date information and should drop and roles that build on top of such an
+ * assumption (such as leadership sessions).
+ */
+public class CasWriteFailed extends RuntimeException {
+
+ public CasWriteFailed(String message) {
+ super(message);
+ }
+
+ public CasWriteFailed(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
index c98e362c2d2..4b0461200a4 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
@@ -46,7 +46,12 @@ public abstract class Database {
* store the version in the database, such that if another fleetcontroller takes over as master it will use a
* higher version system state.
*
+ * Precondition: retrieveLatestSystemStateVersion() MUST have been called at least once prior to calling
+ * this method.
+ *
* @return True if request succeeded. False if not.
+ * @throws CasWriteFailed if the expected version of the znode did not match what was actually stored in the DB.
+ * In this case, the write has NOT been applied.
*/
public abstract boolean storeLatestSystemStateVersion(int version) throws InterruptedException;
@@ -84,6 +89,17 @@ public abstract class Database {
*/
public abstract Map<Node, Long> retrieveStartTimestamps() throws InterruptedException;
+ /**
+ * Stores the last published cluster state bundle synchronously into ZooKeeper.
+ *
+ * Precondition: retrieveLastPublishedStateBundle() MUST have been called at least once prior to calling
+ * this method.
+ *
+ * @return true if the write is known to have been successful, false otherwise. If false is returned, the
+ * write may or may not have taken place.
+ * @throws CasWriteFailed if the expected version of the znode did not match what was actually stored in the DB.
+ * In this case, the write has NOT been applied.
+ */
public abstract boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) throws InterruptedException;
public abstract ClusterStateBundle retrieveLastPublishedStateBundle() throws InterruptedException;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
index f30b86130c2..b9d02f16090 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
@@ -105,8 +105,7 @@ public class DatabaseHandler {
}
public void shutdown(FleetController fleetController) {
- reset();
- fleetController.lostDatabaseConnection();
+ relinquishDatabaseConnectivity(fleetController);
}
public boolean isClosed() { return database == null || database.isClosed(); }
@@ -232,68 +231,85 @@ public class DatabaseHandler {
didWork = true;
connect(context.getCluster(), currentTime);
}
- synchronized (databaseMonitor) {
- if (database == null || database.isClosed()) {
- return didWork;
- }
- if (pendingStore.masterVote != null) {
- didWork = true;
- log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store master vote "
- + pendingStore.masterVote + " into zookeeper.");
- if (database.storeMasterVote(pendingStore.masterVote)) {
- log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Managed to store master vote "
- + pendingStore.masterVote + " into zookeeper.");
- currentlyStored.masterVote = pendingStore.masterVote;
- pendingStore.masterVote = null;
- } else {
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store master vote");
+ try {
+ synchronized (databaseMonitor) {
+ if (database == null || database.isClosed()) {
return didWork;
}
+ didWork |= performZooKeeperWrites();
}
- if (pendingStore.lastSystemStateVersion != null) {
- didWork = true;
- log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex
- + ": Attempting to store last system state version " + pendingStore.lastSystemStateVersion
- + " into zookeeper.");
- // TODO guard version write with a CaS predicated on the version we last read/wrote.
- // TODO Drop leadership status if there is a mismatch, as it implies we're racing with another leader.
- if (database.storeLatestSystemStateVersion(pendingStore.lastSystemStateVersion)) {
- currentlyStored.lastSystemStateVersion = pendingStore.lastSystemStateVersion;
- pendingStore.lastSystemStateVersion = null;
- } else {
- return didWork;
- }
+ } catch (CasWriteFailed e) {
+ log.log(LogLevel.WARNING, String.format("CaS write to ZooKeeper failed, another controller " +
+ "has likely taken over ownership: %s", e.getMessage()));
+ // Clear DB and master election state. This shall trigger a full re-fetch of all
+ // version and election-related metadata.
+ relinquishDatabaseConnectivity(context.getFleetController());
+ }
+ return didWork;
+ }
+
+ private void relinquishDatabaseConnectivity(FleetController fleetController) {
+ reset();
+ fleetController.lostDatabaseConnection();
+ }
+
+ private boolean performZooKeeperWrites() throws InterruptedException {
+ boolean didWork = false;
+ if (pendingStore.masterVote != null) {
+ didWork = true;
+ log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store master vote "
+ + pendingStore.masterVote + " into zookeeper.");
+ if (database.storeMasterVote(pendingStore.masterVote)) {
+ log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Managed to store master vote "
+ + pendingStore.masterVote + " into zookeeper.");
+ currentlyStored.masterVote = pendingStore.masterVote;
+ pendingStore.masterVote = null;
+ } else {
+ log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store master vote");
+ return true;
}
- if (pendingStore.startTimestamps != null) {
- didWork = true;
- log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store "
- + pendingStore.startTimestamps.size() + " start timestamps into zookeeper.");
- if (database.storeStartTimestamps(pendingStore.startTimestamps)) {
- currentlyStored.startTimestamps = pendingStore.startTimestamps;
- pendingStore.startTimestamps = null;
- } else {
- return didWork;
- }
+ }
+ if (pendingStore.lastSystemStateVersion != null) {
+ didWork = true;
+ log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex
+ + ": Attempting to store last system state version " + pendingStore.lastSystemStateVersion
+ + " into zookeeper.");
+ if (database.storeLatestSystemStateVersion(pendingStore.lastSystemStateVersion)) {
+ currentlyStored.lastSystemStateVersion = pendingStore.lastSystemStateVersion;
+ pendingStore.lastSystemStateVersion = null;
+ } else {
+ return true;
}
- if (pendingStore.wantedStates != null) {
- didWork = true;
- log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store "
- + pendingStore.wantedStates.size() + " wanted states into zookeeper.");
- if (database.storeWantedStates(pendingStore.wantedStates)) {
- currentlyStored.wantedStates = pendingStore.wantedStates;
- pendingStore.wantedStates = null;
- } else {
- return didWork;
- }
+ }
+ if (pendingStore.startTimestamps != null) {
+ didWork = true;
+ log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store "
+ + pendingStore.startTimestamps.size() + " start timestamps into zookeeper.");
+ if (database.storeStartTimestamps(pendingStore.startTimestamps)) {
+ currentlyStored.startTimestamps = pendingStore.startTimestamps;
+ pendingStore.startTimestamps = null;
+ } else {
+ return true;
}
- if (pendingStore.clusterStateBundle != null) {
- didWork = true;
- if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) {
- currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle;
- pendingStore.clusterStateBundle = null;
- } else {
- return true;
- }
+ }
+ if (pendingStore.wantedStates != null) {
+ didWork = true;
+ log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store "
+ + pendingStore.wantedStates.size() + " wanted states into zookeeper.");
+ if (database.storeWantedStates(pendingStore.wantedStates)) {
+ currentlyStored.wantedStates = pendingStore.wantedStates;
+ pendingStore.wantedStates = null;
+ } else {
+ return true;
+ }
+ }
+ if (pendingStore.clusterStateBundle != null) {
+ didWork = true;
+ if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) {
+ currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle;
+ pendingStore.clusterStateBundle = null;
+ } else {
+ return true;
}
}
return didWork;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
index 77ca7e12711..880f2cdaf19 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
@@ -35,6 +35,10 @@ public class ZooKeeperDatabase extends Database {
private final int nodeIndex;
private final MasterDataGatherer masterDataGatherer;
private boolean reportErrors = true;
+ // Expected ZK znode versions. Note: these are _not_ -1 as that would match anything.
+ // We expect the caller to invoke the load methods prior to calling any store methods.
+ private int lastKnownStateBundleZNodeVersion = -2;
+ private int lastKnownStateVersionZNodeVersion = -2;
public void stopErrorReporting() {
reportErrors = false;
@@ -196,10 +200,14 @@ public class ZooKeeperDatabase extends Database {
byte data[] = Integer.toString(version).getBytes(utf8);
try{
log.log(LogLevel.INFO, String.format("Fleetcontroller %d: Storing new cluster state version in ZooKeeper: %d", nodeIndex, version));
- session.setData(zooKeeperRoot + "latestversion", data, -1);
+ var stat = session.setData(zooKeeperRoot + "latestversion", data, lastKnownStateVersionZNodeVersion);
+ lastKnownStateVersionZNodeVersion = stat.getVersion();
return true;
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
+ } catch (KeeperException.BadVersionException e) {
+ throw new CasWriteFailed(String.format("version mismatch in cluster state version znode (expected %d): %s",
+ lastKnownStateVersionZNodeVersion, e.getMessage()), e);
} catch (Exception e) {
maybeLogExceptionWarning(e, "Failed to store latest system state version used " + version);
return false;
@@ -209,10 +217,13 @@ public class ZooKeeperDatabase extends Database {
public Integer retrieveLatestSystemStateVersion() throws InterruptedException {
Stat stat = new Stat();
try{
- log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Fetching latest cluster state at '" + zooKeeperRoot + "latestversion'");
+ log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Fetching latest cluster state at '%slatestversion'",
+ nodeIndex, zooKeeperRoot));
byte[] data = session.getData(zooKeeperRoot + "latestversion", false, stat);
+ lastKnownStateVersionZNodeVersion = stat.getVersion();
final Integer versionNumber = Integer.valueOf(new String(data, utf8));
- log.log(LogLevel.INFO, String.format("Fleetcontroller %d: Read cluster state version %d from ZooKeeper", nodeIndex, versionNumber));
+ log.log(LogLevel.INFO, String.format("Fleetcontroller %d: Read cluster state version %d from ZooKeeper " +
+ "(znode version %d)", nodeIndex, versionNumber, stat.getVersion()));
return versionNumber;
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
@@ -336,12 +347,16 @@ public class ZooKeeperDatabase extends Database {
EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec();
byte[] encodedBundle = envelopedBundleCodec.encodeWithEnvelope(stateBundle);
try{
- log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Storing published state bundle %s at '%spublished_state_bundle'",
- nodeIndex, stateBundle, zooKeeperRoot));
- // TODO CAS on expected zknode version
- session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, -1);
+ log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Storing published state bundle %s at " +
+ "'%spublished_state_bundle' with expected znode version %d",
+ nodeIndex, stateBundle, zooKeeperRoot, lastKnownStateBundleZNodeVersion));
+ var stat = session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, lastKnownStateBundleZNodeVersion);
+ lastKnownStateBundleZNodeVersion = stat.getVersion();
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
+ } catch (KeeperException.BadVersionException e) {
+ throw new CasWriteFailed(String.format("version mismatch in cluster state bundle znode (expected %d): %s",
+ lastKnownStateBundleZNodeVersion, e.getMessage()), e);
} catch (Exception e) {
maybeLogExceptionWarning(e, "Failed to store last published cluster state bundle in ZooKeeper");
return false;
@@ -354,6 +369,7 @@ public class ZooKeeperDatabase extends Database {
Stat stat = new Stat();
try {
byte[] data = session.getData(zooKeeperRoot + "published_state_bundle", false, stat);
+ lastKnownStateBundleZNodeVersion = stat.getVersion();
if (data != null && data.length != 0) {
EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec();
return envelopedBundleCodec.decodeWithEnvelope(data);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java
index e5200b63a7e..6062f45c4de 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java
@@ -1,9 +1,12 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import com.yahoo.vespa.clustercontroller.core.database.CasWriteFailed;
import com.yahoo.vespa.clustercontroller.core.database.Database;
import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabase;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.time.Duration;
@@ -14,6 +17,9 @@ import static org.mockito.Mockito.mock;
public class ZooKeeperDatabaseTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
private static class Fixture implements AutoCloseable {
final ZooKeeperTestServer zkServer;
ClusterFixture clusterFixture;
@@ -53,9 +59,8 @@ public class ZooKeeperDatabaseTest {
public void can_store_and_load_cluster_state_bundle_from_database() throws Exception {
try (Fixture f = new Fixture()) {
f.createDatabase();
- ClusterStateBundle bundleToStore = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
- StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
- StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
+ f.db().retrieveLastPublishedStateBundle(); // Must be called once prior to prime last known znode version
+ ClusterStateBundle bundleToStore = dummyBundle();
f.db().storeLastPublishedStateBundle(bundleToStore);
ClusterStateBundle bundleReceived = f.db().retrieveLastPublishedStateBundle();
@@ -63,6 +68,32 @@ public class ZooKeeperDatabaseTest {
}
}
+ private static ClusterStateBundle dummyBundle() {
+ return ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
+ }
+
+ @Test
+ public void storing_cluster_state_bundle_with_mismatching_expected_znode_version_throws_exception() throws Exception {
+ expectedException.expect(CasWriteFailed.class);
+ expectedException.expectMessage("version mismatch in cluster state bundle znode (expected -2)");
+ try (Fixture f = new Fixture()) {
+ f.createDatabase();
+ f.db().storeLastPublishedStateBundle(dummyBundle());
+ }
+ }
+
+ @Test
+ public void storing_cluster_state_version_with_mismatching_expected_znode_version_throws_exception() throws Exception {
+ expectedException.expect(CasWriteFailed.class);
+ expectedException.expectMessage("version mismatch in cluster state version znode (expected -2)");
+ try (Fixture f = new Fixture()) {
+ f.createDatabase();
+ f.db().storeLatestSystemStateVersion(12345);
+ }
+ }
+
@Test
public void empty_state_bundle_is_returned_if_no_bundle_already_stored_in_database() throws Exception {
try (Fixture f = new Fixture()) {