diff options
5 files changed, 170 insertions, 68 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/CasWriteFailed.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/CasWriteFailed.java new file mode 100644 index 00000000000..c62d8d4bcd5 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/CasWriteFailed.java @@ -0,0 +1,23 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.database; + +/** + * Exception used to signal that a write intended to overwrite a value previously + * read encountered an underlying version conflict during an atomic compare-and-swap + * operation. This generally means that another node has written to the value since + * we last read it, and that the information we hold may be stale. + * + * Upon receiving such an exception, the caller should no longer assume it holds + * up-to-date information and should drop any roles that build on top of such an + * assumption (such as leadership sessions). 
+ */ +public class CasWriteFailed extends RuntimeException { + + public CasWriteFailed(String message) { + super(message); + } + + public CasWriteFailed(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java index c98e362c2d2..4b0461200a4 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java @@ -46,7 +46,12 @@ public abstract class Database { * store the version in the database, such that if another fleetcontroller takes over as master it will use a * higher version system state. * + * Precondition: retrieveLatestSystemStateVersion() MUST have been called at least once prior to calling + * this method. + * * @return True if request succeeded. False if not. + * @throws CasWriteFailed if the expected version of the znode did not match what was actually stored in the DB. + * In this case, the write has NOT been applied. */ public abstract boolean storeLatestSystemStateVersion(int version) throws InterruptedException; @@ -84,6 +89,17 @@ public abstract class Database { */ public abstract Map<Node, Long> retrieveStartTimestamps() throws InterruptedException; + /** + * Stores the last published cluster state bundle synchronously into ZooKeeper. + * + * Precondition: retrieveLastPublishedStateBundle() MUST have been called at least once prior to calling + * this method. + * + * @return true if the write is known to have been successful, false otherwise. If false is returned, the + * write may or may not have taken place. + * @throws CasWriteFailed if the expected version of the znode did not match what was actually stored in the DB. + * In this case, the write has NOT been applied. 
+ */ public abstract boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) throws InterruptedException; public abstract ClusterStateBundle retrieveLastPublishedStateBundle() throws InterruptedException; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java index f30b86130c2..b9d02f16090 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java @@ -105,8 +105,7 @@ public class DatabaseHandler { } public void shutdown(FleetController fleetController) { - reset(); - fleetController.lostDatabaseConnection(); + relinquishDatabaseConnectivity(fleetController); } public boolean isClosed() { return database == null || database.isClosed(); } @@ -232,68 +231,85 @@ public class DatabaseHandler { didWork = true; connect(context.getCluster(), currentTime); } - synchronized (databaseMonitor) { - if (database == null || database.isClosed()) { - return didWork; - } - if (pendingStore.masterVote != null) { - didWork = true; - log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store master vote " - + pendingStore.masterVote + " into zookeeper."); - if (database.storeMasterVote(pendingStore.masterVote)) { - log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Managed to store master vote " - + pendingStore.masterVote + " into zookeeper."); - currentlyStored.masterVote = pendingStore.masterVote; - pendingStore.masterVote = null; - } else { - log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store master vote"); + try { + synchronized (databaseMonitor) { + if (database == null || database.isClosed()) { return didWork; } + didWork |= performZooKeeperWrites(); } - if 
(pendingStore.lastSystemStateVersion != null) { - didWork = true; - log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex - + ": Attempting to store last system state version " + pendingStore.lastSystemStateVersion - + " into zookeeper."); - // TODO guard version write with a CaS predicated on the version we last read/wrote. - // TODO Drop leadership status if there is a mismatch, as it implies we're racing with another leader. - if (database.storeLatestSystemStateVersion(pendingStore.lastSystemStateVersion)) { - currentlyStored.lastSystemStateVersion = pendingStore.lastSystemStateVersion; - pendingStore.lastSystemStateVersion = null; - } else { - return didWork; - } + } catch (CasWriteFailed e) { + log.log(LogLevel.WARNING, String.format("CaS write to ZooKeeper failed, another controller " + + "has likely taken over ownership: %s", e.getMessage())); + // Clear DB and master election state. This shall trigger a full re-fetch of all + // version and election-related metadata. + relinquishDatabaseConnectivity(context.getFleetController()); + } + return didWork; + } + + private void relinquishDatabaseConnectivity(FleetController fleetController) { + reset(); + fleetController.lostDatabaseConnection(); + } + + private boolean performZooKeeperWrites() throws InterruptedException { + boolean didWork = false; + if (pendingStore.masterVote != null) { + didWork = true; + log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store master vote " + + pendingStore.masterVote + " into zookeeper."); + if (database.storeMasterVote(pendingStore.masterVote)) { + log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Managed to store master vote " + + pendingStore.masterVote + " into zookeeper."); + currentlyStored.masterVote = pendingStore.masterVote; + pendingStore.masterVote = null; + } else { + log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store master vote"); + return true; } - if (pendingStore.startTimestamps != null) { - 
didWork = true; - log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store " - + pendingStore.startTimestamps.size() + " start timestamps into zookeeper."); - if (database.storeStartTimestamps(pendingStore.startTimestamps)) { - currentlyStored.startTimestamps = pendingStore.startTimestamps; - pendingStore.startTimestamps = null; - } else { - return didWork; - } + } + if (pendingStore.lastSystemStateVersion != null) { + didWork = true; + log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + + ": Attempting to store last system state version " + pendingStore.lastSystemStateVersion + + " into zookeeper."); + if (database.storeLatestSystemStateVersion(pendingStore.lastSystemStateVersion)) { + currentlyStored.lastSystemStateVersion = pendingStore.lastSystemStateVersion; + pendingStore.lastSystemStateVersion = null; + } else { + return true; } - if (pendingStore.wantedStates != null) { - didWork = true; - log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store " - + pendingStore.wantedStates.size() + " wanted states into zookeeper."); - if (database.storeWantedStates(pendingStore.wantedStates)) { - currentlyStored.wantedStates = pendingStore.wantedStates; - pendingStore.wantedStates = null; - } else { - return didWork; - } + } + if (pendingStore.startTimestamps != null) { + didWork = true; + log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store " + + pendingStore.startTimestamps.size() + " start timestamps into zookeeper."); + if (database.storeStartTimestamps(pendingStore.startTimestamps)) { + currentlyStored.startTimestamps = pendingStore.startTimestamps; + pendingStore.startTimestamps = null; + } else { + return true; } - if (pendingStore.clusterStateBundle != null) { - didWork = true; - if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) { - currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle; - pendingStore.clusterStateBundle = null; - } else { - 
return true; - } + } + if (pendingStore.wantedStates != null) { + didWork = true; + log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store " + + pendingStore.wantedStates.size() + " wanted states into zookeeper."); + if (database.storeWantedStates(pendingStore.wantedStates)) { + currentlyStored.wantedStates = pendingStore.wantedStates; + pendingStore.wantedStates = null; + } else { + return true; + } + } + if (pendingStore.clusterStateBundle != null) { + didWork = true; + if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) { + currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle; + pendingStore.clusterStateBundle = null; + } else { + return true; } } return didWork; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java index 77ca7e12711..880f2cdaf19 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java @@ -35,6 +35,10 @@ public class ZooKeeperDatabase extends Database { private final int nodeIndex; private final MasterDataGatherer masterDataGatherer; private boolean reportErrors = true; + // Expected ZK znode versions. Note: these are _not_ -1 as that would match anything. + // We expect the caller to invoke the load methods prior to calling any store methods. 
+ private int lastKnownStateBundleZNodeVersion = -2; + private int lastKnownStateVersionZNodeVersion = -2; public void stopErrorReporting() { reportErrors = false; @@ -196,10 +200,14 @@ public class ZooKeeperDatabase extends Database { byte data[] = Integer.toString(version).getBytes(utf8); try{ log.log(LogLevel.INFO, String.format("Fleetcontroller %d: Storing new cluster state version in ZooKeeper: %d", nodeIndex, version)); - session.setData(zooKeeperRoot + "latestversion", data, -1); + var stat = session.setData(zooKeeperRoot + "latestversion", data, lastKnownStateVersionZNodeVersion); + lastKnownStateVersionZNodeVersion = stat.getVersion(); return true; } catch (InterruptedException e) { throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); + } catch (KeeperException.BadVersionException e) { + throw new CasWriteFailed(String.format("version mismatch in cluster state version znode (expected %d): %s", + lastKnownStateVersionZNodeVersion, e.getMessage()), e); } catch (Exception e) { maybeLogExceptionWarning(e, "Failed to store latest system state version used " + version); return false; @@ -209,10 +217,13 @@ public class ZooKeeperDatabase extends Database { public Integer retrieveLatestSystemStateVersion() throws InterruptedException { Stat stat = new Stat(); try{ - log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Fetching latest cluster state at '" + zooKeeperRoot + "latestversion'"); + log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Fetching latest cluster state at '%slatestversion'", + nodeIndex, zooKeeperRoot)); byte[] data = session.getData(zooKeeperRoot + "latestversion", false, stat); + lastKnownStateVersionZNodeVersion = stat.getVersion(); final Integer versionNumber = Integer.valueOf(new String(data, utf8)); - log.log(LogLevel.INFO, String.format("Fleetcontroller %d: Read cluster state version %d from ZooKeeper", nodeIndex, versionNumber)); + log.log(LogLevel.INFO, String.format("Fleetcontroller %d: 
Read cluster state version %d from ZooKeeper " + + "(znode version %d)", nodeIndex, versionNumber, stat.getVersion())); return versionNumber; } catch (InterruptedException e) { throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); @@ -336,12 +347,16 @@ public class ZooKeeperDatabase extends Database { EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec(); byte[] encodedBundle = envelopedBundleCodec.encodeWithEnvelope(stateBundle); try{ - log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Storing published state bundle %s at '%spublished_state_bundle'", - nodeIndex, stateBundle, zooKeeperRoot)); - // TODO CAS on expected zknode version - session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, -1); + log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Storing published state bundle %s at " + + "'%spublished_state_bundle' with expected znode version %d", + nodeIndex, stateBundle, zooKeeperRoot, lastKnownStateBundleZNodeVersion)); + var stat = session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, lastKnownStateBundleZNodeVersion); + lastKnownStateBundleZNodeVersion = stat.getVersion(); } catch (InterruptedException e) { throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); + } catch (KeeperException.BadVersionException e) { + throw new CasWriteFailed(String.format("version mismatch in cluster state bundle znode (expected %d): %s", + lastKnownStateBundleZNodeVersion, e.getMessage()), e); } catch (Exception e) { maybeLogExceptionWarning(e, "Failed to store last published cluster state bundle in ZooKeeper"); return false; @@ -354,6 +369,7 @@ public class ZooKeeperDatabase extends Database { Stat stat = new Stat(); try { byte[] data = session.getData(zooKeeperRoot + "published_state_bundle", false, stat); + lastKnownStateBundleZNodeVersion = stat.getVersion(); if (data != null && data.length != 0) { 
EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec(); return envelopedBundleCodec.decodeWithEnvelope(data); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java index e5200b63a7e..6062f45c4de 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java @@ -1,9 +1,12 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +import com.yahoo.vespa.clustercontroller.core.database.CasWriteFailed; import com.yahoo.vespa.clustercontroller.core.database.Database; import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabase; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; import java.time.Duration; @@ -14,6 +17,9 @@ import static org.mockito.Mockito.mock; public class ZooKeeperDatabaseTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private static class Fixture implements AutoCloseable { final ZooKeeperTestServer zkServer; ClusterFixture clusterFixture; @@ -53,9 +59,8 @@ public class ZooKeeperDatabaseTest { public void can_store_and_load_cluster_state_bundle_from_database() throws Exception { try (Fixture f = new Fixture()) { f.createDatabase(); - ClusterStateBundle bundleToStore = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2", - StateMapping.of("default", "distributor:2 storage:2 .0.s:d"), - StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); + f.db().retrieveLastPublishedStateBundle(); // Must be called once beforehand to prime the last known znode version + 
ClusterStateBundle bundleToStore = dummyBundle(); f.db().storeLastPublishedStateBundle(bundleToStore); ClusterStateBundle bundleReceived = f.db().retrieveLastPublishedStateBundle(); @@ -63,6 +68,32 @@ public class ZooKeeperDatabaseTest { } } + private static ClusterStateBundle dummyBundle() { + return ClusterStateBundleUtil.makeBundle("distributor:2 storage:2", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); + } + + @Test + public void storing_cluster_state_bundle_with_mismatching_expected_znode_version_throws_exception() throws Exception { + expectedException.expect(CasWriteFailed.class); + expectedException.expectMessage("version mismatch in cluster state bundle znode (expected -2)"); + try (Fixture f = new Fixture()) { + f.createDatabase(); + f.db().storeLastPublishedStateBundle(dummyBundle()); + } + } + + @Test + public void storing_cluster_state_version_with_mismatching_expected_znode_version_throws_exception() throws Exception { + expectedException.expect(CasWriteFailed.class); + expectedException.expectMessage("version mismatch in cluster state version znode (expected -2)"); + try (Fixture f = new Fixture()) { + f.createDatabase(); + f.db().storeLatestSystemStateVersion(12345); + } + } + @Test public void empty_state_bundle_is_returned_if_no_bundle_already_stored_in_database() throws Exception { try (Fixture f = new Fixture()) { |