diff options
Diffstat (limited to 'clustercontroller-core')
23 files changed, 716 insertions, 125 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java index 9291c8277da..76177e8f1c1 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java @@ -78,6 +78,10 @@ public class ClusterStateBundle { return new ClusterStateBundle(baselineState, Collections.emptyMap()); } + public static ClusterStateBundle empty() { + return ofBaselineOnly(AnnotatedClusterState.emptyState()); + } + public AnnotatedClusterState getBaselineAnnotatedState() { return baselineState; } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 54c6c11f81e..e781bf3b145 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -10,6 +10,7 @@ import com.yahoo.vdslib.state.Node; import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; +import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory; import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo; import com.yahoo.vespa.clustercontroller.core.listeners.*; import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; @@ -25,7 +26,18 @@ import org.apache.commons.lang.exception.ExceptionUtils; import java.io.FileNotFoundException; -import java.util.*; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.TimeZone; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -54,7 +66,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private AtomicBoolean running = new AtomicBoolean(true); private FleetControllerOptions options; private FleetControllerOptions nextOptions; - private final List<SystemStateListener> systemStateListeners = new LinkedList<>(); + private final List<SystemStateListener> systemStateListeners = new CopyOnWriteArrayList<>(); private boolean processingCycle = false; private boolean wantedStateChanged = false; private long cycleCount = 0; @@ -185,7 +197,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd options.nodeStateRequestTimeoutEarliestPercentage, options.nodeStateRequestTimeoutLatestPercentage, options.nodeStateRequestRoundTripTimeMaxSeconds); - DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); + DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); NodeLookup lookUp = new SlobrokClient(timer); StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater); SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); @@ -219,6 +231,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } } + public ClusterStateBundle getClusterStateBundle() { + synchronized (monitor) { + return systemStateBroadcaster.getClusterStateBundle(); + } + } + public void schedule(RemoteClusterControllerTask task) { synchronized (monitor) { log.fine("Scheduled remote task " + task.getClass().getName() + " for execution"); @@ -228,13 +246,13 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd /** Used for unit testing. */ public void addSystemStateListener(SystemStateListener listener) { - synchronized (systemStateListeners) { - systemStateListeners.add(listener); - // Always give cluster state listeners the current state, in case acceptable state has come before listener is registered. - com.yahoo.vdslib.state.ClusterState state = getSystemState(); - if (state == null) throw new NullPointerException("Cluster state should never be null at this point"); - listener.handleNewSystemState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state))); + systemStateListeners.add(listener); + // Always give cluster state listeners the current state, in case acceptable state has come before listener is registered. + com.yahoo.vdslib.state.ClusterState state = getSystemState(); + if (state == null) { + throw new NullPointerException("Cluster state should never be null at this point"); } + listener.handleNewPublishedState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state))); } public FleetControllerOptions getOptions() { @@ -351,7 +369,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } @Override - public void handleNewSystemState(ClusterStateBundle stateBundle) { + public void handleNewPublishedState(ClusterStateBundle stateBundle) { verifyInControllerThread(); ClusterState baselineState = stateBundle.getBaselineClusterState(); newStates.add(stateBundle); @@ -361,13 +379,14 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // nodes so that a cluster controller crash after publishing but before a successful // ZK store will not risk reusing the same version number. if (masterElectionHandler.isMaster()) { - storeClusterStateVersionToZooKeeper(baselineState); + storeClusterStateMetaDataToZooKeeper(stateBundle); } } - private void storeClusterStateVersionToZooKeeper(ClusterState state) { + private void storeClusterStateMetaDataToZooKeeper(ClusterStateBundle stateBundle) { try { - database.saveLatestSystemStateVersion(databaseContext, state.getVersion()); + database.saveLatestSystemStateVersion(databaseContext, stateBundle.getVersion()); + database.saveLatestClusterStateBundle(databaseContext, stateBundle); } catch (InterruptedException e) { // Rethrow as RuntimeException to propagate exception up to main thread method. // Don't want to hide failures to write cluster state version. @@ -654,6 +673,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } sentAny = systemStateBroadcaster.broadcastNewState(databaseContext, communicator); if (sentAny) { + // FIXME won't this inhibit resending to unresponsive nodes? nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates; } } @@ -665,7 +685,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd synchronized (systemStateListeners) { for (ClusterStateBundle stateBundle : newStates) { for(SystemStateListener listener : systemStateListeners) { - listener.handleNewSystemState(stateBundle); + listener.handleNewPublishedState(stateBundle); } } newStates.clear(); @@ -782,12 +802,21 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became node state gatherer as we are fleetcontroller master candidate.", timer.getCurrentTimeInMillis())); // Update versions to use so what is shown is closer to what is reality on the master stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion()); + stateChangeHandler.setStateChangedFlag(); } } isStateGatherer = true; return didWork; } + private void invokeCandidateStateListeners(ClusterStateBundle candidateBundle) { + systemStateListeners.forEach(listener -> listener.handleNewCandidateState(candidateBundle)); + } + + private boolean hasPassedFirstStateBroadcastTimePoint(long timeNowMs) { + return timeNowMs >= firstAllowedStateBroadcast || cluster.allStatesReported(); + } + private boolean recomputeClusterStateIfRequired() { boolean stateWasChanged = false; if (mustRecomputeCandidateClusterState()) { @@ -800,16 +829,18 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd .stateDeriver(createBucketSpaceStateDeriver()) .deriveAndBuild(); stateVersionTracker.updateLatestCandidateStateBundle(candidateBundle); + invokeCandidateStateListeners(candidateBundle); - if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish() - || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()) + final long timeNowMs = timer.getCurrentTimeInMillis(); + if (hasPassedFirstStateBroadcastTimePoint(timeNowMs) + && (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish() + || stateVersionTracker.hasReceivedNewVersionFromZooKeeper())) { - final long timeNowMs = timer.getCurrentTimeInMillis(); final ClusterStateBundle before = stateVersionTracker.getVersionedClusterStateBundle(); stateVersionTracker.promoteCandidateToVersionedState(timeNowMs); emitEventsForAlteredStateEdges(before, stateVersionTracker.getVersionedClusterStateBundle(), timeNowMs); - handleNewSystemState(stateVersionTracker.getVersionedClusterStateBundle()); + handleNewPublishedState(stateVersionTracker.getVersionedClusterStateBundle()); stateWasChanged = true; } } @@ -893,10 +924,19 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } } + private boolean atFirstClusterStateSendTimeEdge() { + // We only care about triggering a state recomputation for the master, which is the only + // one allowed to actually broadcast any states. + if (!isMaster || systemStateBroadcaster.hasBroadcastedClusterStateBundle()) { + return false; + } + return hasPassedFirstStateBroadcastTimePoint(timer.getCurrentTimeInMillis()); + } + private boolean mustRecomputeCandidateClusterState() { return stateChangeHandler.stateMayHaveChanged() - || stateVersionTracker.hasReceivedNewVersionFromZooKeeper() - || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged(); + || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged() + || atFirstClusterStateSendTimeEdge(); } private boolean handleLeadershipEdgeTransitions() throws InterruptedException { @@ -904,16 +944,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if (masterElectionHandler.isMaster()) { if ( ! isMaster) { metricUpdater.becameMaster(); - // If we just became master, restore wanted states from database + // If we just became master, restore state from ZooKeeper + stateChangeHandler.setStateChangedFlag(); + systemStateBroadcaster.resetBroadcastedClusterStateBundle(); + stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion()); - didWork = database.loadStartTimestamps(cluster); - didWork |= database.loadWantedStates(databaseContext); + ClusterStateBundle previousBundle = database.getLatestClusterStateBundle(); + database.loadStartTimestamps(cluster); + database.loadWantedStates(databaseContext); + + log.info(() -> String.format("Loaded previous cluster state bundle from ZooKeeper: %s", previousBundle)); + stateVersionTracker.setClusterStateBundleRetrievedFromZooKeeper(previousBundle); + eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became fleetcontroller master. Bumped version to " + stateVersionTracker.getCurrentVersion() + " to be in line.", timer.getCurrentTimeInMillis())); long currentTime = timer.getCurrentTimeInMillis(); firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast; log.log(LogLevel.DEBUG, "At time " + currentTime + " we set first system state broadcast time to be " + options.minTimeBeforeFirstSystemStateBroadcast + " ms after at time " + firstAllowedStateBroadcast + "."); + didWork = true; } isMaster = true; if (wantedStateChanged) { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java index 177efe31766..e2f98cf5492 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java @@ -50,6 +50,21 @@ public class StateVersionTracker { this.lastZooKeeperVersion = this.currentVersion; } + void setClusterStateBundleRetrievedFromZooKeeper(ClusterStateBundle bundle) { + // There is an edge where the bundle version will mismatch with the version set + // via setVersionRetrievedFromZooKeeper() if the controller (or ZK) crashes before + // it can write both sequentially. But since we use the ZK-written version explicitly + // when choosing a new version for our own published states, it should not matter in + // practice. Worst case is that the current state reflects the same version that a + // previous controller had, but we will never publish this state ourselves; publishing + // only happens after we've generated our own, new candidate state and overwritten + // the empty states set below. Publishing also, as mentioned, sets a version based on + // the ZK version, not the version stored in the bundle itself. + currentClusterState = bundle; + currentUnversionedState = ClusterStateBundle.empty(); + latestCandidateState = ClusterStateBundle.empty(); + } + /** * Sets limit on how many cluster states can be kept in the in-memory queue. Once * the list exceeds this limit, the oldest state is repeatedly removed until the limit @@ -140,8 +155,7 @@ public class StateVersionTracker { } private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) { - clusterStateHistory.addFirst(new ClusterStateHistoryEntry( - currentClusterState, currentTimeMs)); + clusterStateHistory.addFirst(new ClusterStateHistoryEntry(currentClusterState, currentTimeMs)); while (clusterStateHistory.size() > maxHistoryEntryCount) { clusterStateHistory.removeLast(); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index b0869b560f6..629800fb13c 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -40,6 +40,18 @@ public class SystemStateBroadcaster { return clusterStateBundle.getBaselineClusterState(); } + public boolean hasBroadcastedClusterStateBundle() { + return clusterStateBundle != null; + } + + public void resetBroadcastedClusterStateBundle() { + clusterStateBundle = null; + } + + public ClusterStateBundle getClusterStateBundle() { + return clusterStateBundle; + } + private void reportNodeError(boolean nodeOk, NodeInfo info, String message) { long time = timer.getCurrentTimeInMillis(); Long lastReported = lastErrorReported.get(info.getNode()); @@ -113,10 +125,10 @@ public class SystemStateBroadcaster { void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database, DatabaseHandler.Context dbContext, FleetController fleetController) throws InterruptedException { - final int currentStateVersion = clusterStateBundle.getVersion(); - if ((clusterStateBundle == null) || (lastClusterStateInSync == currentStateVersion)) { + if ((clusterStateBundle == null) || (lastClusterStateInSync == clusterStateBundle.getVersion())) { return; // Nothing to do for the current state } + final int currentStateVersion = clusterStateBundle.getVersion(); boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream() .filter(NodeInfo::isDistributor) .anyMatch(this::nodeNeedsClusterState); @@ -137,7 +149,7 @@ public class SystemStateBroadcaster { if (!baselineState.isOfficial()) { log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", baselineState.getVersion())); - baselineState.setOfficial(true); + baselineState.setOfficial(true); // FIXME this violates state bundle immutability } List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java index ac9f1473b57..c98e362c2d2 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.clustercontroller.core.database; import com.yahoo.vdslib.state.Node; import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; import java.util.Map; @@ -82,4 +83,9 @@ public abstract class Database { * Fetch the start times of distributor and service layer nodes. */ public abstract Map<Node, Long> retrieveStartTimestamps() throws InterruptedException; + + public abstract boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) throws InterruptedException; + + public abstract ClusterStateBundle retrieveLastPublishedStateBundle() throws InterruptedException; + } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java new file mode 100644 index 00000000000..3e7b98fe3fb --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java @@ -0,0 +1,34 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.database; + +import com.yahoo.vespa.clustercontroller.core.ContentCluster; + +/** + * Database factory to enable test mocking of DB features. In practice, this + * will always be {@link ZooKeeperDatabase} due to rather heavy ZK feature + * dependencies and leaky abstractions built on top of them. + */ +public interface DatabaseFactory { + + class Params { + ContentCluster cluster; + int nodeIndex; + String dbAddress; + int dbSessionTimeout; + Database.DatabaseListener listener; + + Params cluster(ContentCluster c) { this.cluster = c; return this; } + ContentCluster cluster() { return cluster; } + Params nodeIndex(int i) { this.nodeIndex = i; return this; } + int nodeIndex() { return nodeIndex; } + Params databaseAddress(String address) { this.dbAddress = address; return this; } + String databaseAddress() { return dbAddress; } + Params databaseSessionTimeout(int timeout) { this.dbSessionTimeout = timeout; return this; } + int databaseSessionTimeout() { return dbSessionTimeout; } + Params databaseListener(Database.DatabaseListener listener) { this.listener = listener; return this; } + Database.DatabaseListener databaseListener() { return listener; } + } + + Database create(Params params) throws Exception; + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java index c89bb6a136a..fcf5b58e62d 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java @@ -5,6 +5,8 @@ import com.yahoo.log.LogLevel; import com.yahoo.vdslib.state.Node; import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.State; +import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.FleetController; import com.yahoo.vespa.clustercontroller.core.NodeInfo; import com.yahoo.vespa.clustercontroller.core.Timer; @@ -39,12 +41,14 @@ public class DatabaseHandler { Integer lastSystemStateVersion; Map<Node, NodeState> wantedStates; Map<Node, Long> startTimestamps; + ClusterStateBundle clusterStateBundle; void clear() { masterVote = null; lastSystemStateVersion = null; wantedStates = null; startTimestamps = null; + clusterStateBundle = null; } } private class DatabaseListener implements Database.DatabaseListener { @@ -68,15 +72,13 @@ public class DatabaseHandler { } } + private final DatabaseFactory databaseFactory; private final Timer timer; private final int nodeIndex; private final Object monitor; private String zooKeeperAddress; private int zooKeeperSessionTimeout = 5000; private final Object databaseMonitor = new Object(); - - /** This is always ZooKeeperDatabase */ - // TODO: Get rid of the interface as it is both unnecessary and gives a false impression of independence private Database database; private DatabaseListener dbListener = new DatabaseListener(); @@ -87,8 +89,9 @@ public class DatabaseHandler { private boolean lostZooKeeperConnectionEvent = false; private Map<Integer, Integer> masterDataEvent = null; - public DatabaseHandler(Timer timer, String zooKeeperAddress, int ourIndex, Object monitor) throws InterruptedException + public DatabaseHandler(DatabaseFactory databaseFactory, Timer timer, String zooKeeperAddress, int ourIndex, Object monitor) throws InterruptedException { + this.databaseFactory = databaseFactory; this.timer = timer; this.nodeIndex = ourIndex; pendingStore.masterVote = ourIndex; // To begin with we'll vote for ourselves. @@ -166,8 +169,13 @@ public class DatabaseHandler { clearSessionMetaData(); log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex + ": Setting up new ZooKeeper session at " + zooKeeperAddress); - database = new ZooKeeperDatabase(cluster, - nodeIndex, zooKeeperAddress, zooKeeperSessionTimeout, dbListener); + DatabaseFactory.Params params = new DatabaseFactory.Params() + .cluster(cluster) + .nodeIndex(nodeIndex) + .databaseAddress(zooKeeperAddress) + .databaseSessionTimeout(zooKeeperSessionTimeout) + .databaseListener(dbListener); + database = databaseFactory.create(params); } } catch (KeeperException.NodeExistsException e) { log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Cannot create ephemeral fleetcontroller node. ZooKeeper server " @@ -279,6 +287,15 @@ public class DatabaseHandler { return didWork; } } + if (pendingStore.clusterStateBundle != null) { + didWork = true; + if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) { + currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle; + pendingStore.clusterStateBundle = null; + } else { + return true; + } + } } return didWork; } @@ -325,11 +342,28 @@ public class DatabaseHandler { if (usingZooKeeper()) { log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve latest system state version from ZooKeeper. Returning version 0."); } - return 0; + return 0; // FIXME "fail-oblivious" is not a good error handling mode for such a critical component! } return version; } + public void saveLatestClusterStateBundle(Context context, ClusterStateBundle clusterStateBundle) throws InterruptedException { + log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Scheduling bundle %s to be saved to ZooKeeper", nodeIndex, clusterStateBundle)); + pendingStore.clusterStateBundle = clusterStateBundle; + doNextZooKeeperTask(context); + } + + public ClusterStateBundle getLatestClusterStateBundle() throws InterruptedException { + log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Retrieving latest cluster state bundle from ZooKeeper", nodeIndex)); + synchronized (databaseMonitor) { + if (database != null && !database.isClosed()) { + return database.retrieveLastPublishedStateBundle(); + } else { + return ClusterStateBundle.empty(); + } + } + } + public void saveWantedStates(Context context) throws InterruptedException { log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Checking whether wanted states have changed compared to zookeeper version."); Map<Node, NodeState> wantedStates = new TreeMap<>(); @@ -399,7 +433,9 @@ public class DatabaseHandler { public boolean loadStartTimestamps(ContentCluster cluster) throws InterruptedException { log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Retrieving start timestamps"); synchronized (databaseMonitor) { - if (database == null || database.isClosed()) return false; + if (database == null || database.isClosed()) { + return false; + } currentlyStored.startTimestamps = database.retrieveStartTimestamps(); } Map<Node, Long> startTimestamps = currentlyStored.startTimestamps; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java index 5399430fd59..fe1d9a2a5cd 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java @@ -1,7 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core.database; +import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.ContentCluster; +import com.yahoo.vespa.clustercontroller.core.rpc.EnvelopedClusterStateBundleCodec; +import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.ACL; @@ -128,6 +132,7 @@ public class ZooKeeperDatabase extends Database { createNode(zooKeeperRoot, "wantedstates", new byte[0]); createNode(zooKeeperRoot, "starttimestamps", new byte[0]); createNode(zooKeeperRoot, "latestversion", new Integer(0).toString().getBytes(utf8)); + createNode(zooKeeperRoot, "published_state_bundle", new byte[0]); // TODO dedupe string constants byte val[] = String.valueOf(nodeIndex).getBytes(utf8); deleteNodeIfExists(getMyIndexPath()); log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex + @@ -165,6 +170,15 @@ public class ZooKeeperDatabase extends Database { return (!sessionOpen || watcher.getState().equals(Watcher.Event.KeeperState.Expired)); } + private void maybeLogExceptionWarning(Exception e, String message) { + if (sessionOpen && reportErrors) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + log.log(LogLevel.WARNING, String.format("Fleetcontroller %s: %s. Exception: %s\n%s", + nodeIndex, message, e.getMessage(), sw.toString())); + } + } + public boolean storeMasterVote(int wantedMasterIndex) throws InterruptedException { byte val[] = String.valueOf(wantedMasterIndex).getBytes(utf8); try{ @@ -174,11 +188,7 @@ public class ZooKeeperDatabase extends Database { } catch (InterruptedException e) { throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); } catch (Exception e) { - if (sessionOpen && reportErrors) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to create our ephemeral node and store master vote:\n" + sw); - } + maybeLogExceptionWarning(e, "Failed to create our ephemeral node and store master vote"); } return false; } @@ -191,11 +201,7 @@ public class ZooKeeperDatabase extends Database { } catch (InterruptedException e) { throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); } catch (Exception e) { - if (sessionOpen && reportErrors) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store latest system state version used " + version + "\n" + sw); - } + maybeLogExceptionWarning(e, "Failed to store latest system state version used " + version); return false; } } @@ -211,11 +217,7 @@ public class ZooKeeperDatabase extends Database { } catch (InterruptedException e) { throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); } catch (Exception e) { - if (sessionOpen && reportErrors) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve latest system state version used. Returning null.\n" + sw); - } + maybeLogExceptionWarning(e, "Failed to retrieve latest system state version used. Returning null"); return null; } } @@ -242,11 +244,7 @@ public class ZooKeeperDatabase extends Database { } catch (InterruptedException e) { throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); } catch (Exception e) { - if (sessionOpen && reportErrors) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store wanted states in zookeeper: " + e.getMessage() + "\n" + sw); - } + maybeLogExceptionWarning(e, "Failed to store wanted states in ZooKeeper"); return false; } } @@ -276,11 +274,7 @@ public class ZooKeeperDatabase extends Database { } catch (InterruptedException e) { throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); } catch (Exception e) { - if (sessionOpen && reportErrors) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve wanted states from zookeeper: " + e.getMessage() + "\n" + sw); - } + maybeLogExceptionWarning(e, "Failed to retrieve wanted states from ZooKeeper"); return null; } } @@ -301,11 +295,7 @@ public class ZooKeeperDatabase extends Database { } catch (InterruptedException e) { throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); } catch (Exception e) { - if (sessionOpen && reportErrors) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store start timestamps in zookeeper: " + e.getMessage() + "\n" + sw); - } + maybeLogExceptionWarning(e, "Failed to store start timestamps in ZooKeeper"); return false; } } @@ -336,12 +326,45 @@ public class ZooKeeperDatabase extends Database { } catch (InterruptedException e) { throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); } catch (Exception e) { - if (sessionOpen && reportErrors) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve start timestamps from zookeeper: " + e.getMessage() + "\n" + sw); - } + maybeLogExceptionWarning(e, "Failed to retrieve start timestamps from ZooKeeper"); return null; } } + + @Override + public boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) throws InterruptedException { + EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec(); + byte[] encodedBundle = envelopedBundleCodec.encodeWithEnvelope(stateBundle); + try{ + log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Storing published state bundle %s at '%spublished_state_bundle'", + nodeIndex, stateBundle, zooKeeperRoot)); + // TODO CAS on expected zknode version + session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, -1); + } catch (InterruptedException e) { + throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); + } catch (Exception e) { + maybeLogExceptionWarning(e, "Failed to store start timestamps in ZooKeeper"); + return false; + } + return true; + } + + @Override + public ClusterStateBundle retrieveLastPublishedStateBundle() throws InterruptedException { + Stat stat = new Stat(); + try { + byte[] data = session.getData(zooKeeperRoot + "published_state_bundle", false, stat); + if (data != null && data.length != 0) { + EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec(); + return envelopedBundleCodec.decodeWithEnvelope(data); + } + } catch (InterruptedException e) { + throw (InterruptedException) new InterruptedException("Interrupted").initCause(e); + } catch (Exception e) { + maybeLogExceptionWarning(e, "Failed to retrieve last published cluster state bundle from " + + "ZooKeeper, will use an empty state as baseline"); + } + return ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState()); + } + } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java new file mode 100644 index 00000000000..64dfcccebc9 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java @@ -0,0 +1,14 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.database; + +import com.yahoo.vespa.clustercontroller.core.ContentCluster; + +public class ZooKeeperDatabaseFactory implements DatabaseFactory { + + @Override + public Database create(Params params) throws Exception { + return new ZooKeeperDatabase(params.cluster, params.nodeIndex, params.dbAddress, + params.dbSessionTimeout, params.listener); + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java index 40fe91f0c1e..764bb3a0d92 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java @@ -5,6 +5,9 @@ import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; public interface SystemStateListener { - void handleNewSystemState(ClusterStateBundle states); + // TODO consider rename to bundle + void handleNewPublishedState(ClusterStateBundle states); + + default void handleNewCandidateState(ClusterStateBundle states) {} } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java new file mode 100644 index 00000000000..ab98952cddb --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java @@ -0,0 +1,19 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; + +/** + * Cluster state bundle codec which opaquely encodes/decodes both the bundle + * as well as the metadata required to correctly perform compression/decompression. + * + * Useful for embedding an opaque bundle blob somewhere without needing to care aboout + * any of the associated metadata. + */ +public interface EnvelopedClusterStateBundleCodec { + + byte[] encodeWithEnvelope(ClusterStateBundle stateBundle); + + ClusterStateBundle decodeWithEnvelope(byte[] encodedClusterStateBundle); + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java index f6034cb34ff..1c391f9aacf 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java @@ -18,8 +18,11 @@ import java.util.Map; * * LZ4 compression is transparently applied during encoding and decompression is * subsequently applied during decoding. + * + * Implements optional Slime-based enveloping for *WithEnvelope methods, which removes + * need to explicitly track compression metadata by the caller. */ -public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec { +public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec, EnvelopedClusterStateBundleCodec { private static final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024); @@ -55,4 +58,30 @@ public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec { return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(baseline), derivedStates); } + + // Technically the Slime enveloping could be its own class that is bundle codec independent, but + // realistically there won't be any other implementations. Can be trivially factored out if required. + @Override + public byte[] encodeWithEnvelope(ClusterStateBundle stateBundle) { + EncodedClusterStateBundle toEnvelope = encode(stateBundle); + + Slime slime = new Slime(); + Cursor root = slime.setObject(); + root.setLong("compression-type", toEnvelope.getCompression().type().getCode()); + root.setLong("uncompressed-size", toEnvelope.getCompression().uncompressedSize()); + root.setData("data", toEnvelope.getCompression().data()); + return BinaryFormat.encode(slime); + } + + @Override + public ClusterStateBundle decodeWithEnvelope(byte[] encodedClusterStateBundle) { + // We expect ZK writes to be atomic, letting us not worry about partially written or corrupted blobs. + Slime slime = BinaryFormat.decode(encodedClusterStateBundle); + Inspector root = slime.get(); + CompressionType compressionType = CompressionType.valueOf((byte)root.field("compression-type").asLong()); + int uncompressedSize = (int)root.field("uncompressed-size").asLong(); + byte[] data = root.field("data").asData(); + return decode(EncodedClusterStateBundle.fromCompressionBuffer( + new Compressor.Compression(compressionType, uncompressedSize, data))); + } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java new file mode 100644 index 00000000000..f929cd0a605 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java @@ -0,0 +1,107 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vespa.clustercontroller.core.database.Database; +import com.yahoo.vespa.clustercontroller.core.database.DatabaseFactory; +import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; +import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener; +import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DatabaseHandlerTest { + + static class Fixture { + final ClusterFixture clusterFixture = ClusterFixture.forFlatCluster(10); + final FleetController mockController = mock(FleetController.class); + final Database mockDatabase = mock(Database.class); + final Timer mockTimer = mock(Timer.class); + final DatabaseFactory mockDbFactory = (params) -> mockDatabase; + final String databaseAddress = "localhost:0"; + final Object monitor = new Object(); + final ClusterStateBundle dummyBundle; + + Fixture() throws Exception { + dummyBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); + + when(mockDatabase.isClosed()).thenReturn(false); + when(mockDatabase.storeMasterVote(anyInt())).thenReturn(true); + when(mockDatabase.storeLastPublishedStateBundle(any())).thenReturn(true); + when(mockTimer.getCurrentTimeInMillis()).thenReturn(1000000L); + } + + DatabaseHandler.Context createMockContext() { + return new DatabaseHandler.Context() { + @Override + public ContentCluster getCluster() { + return clusterFixture.cluster(); + } + + @Override + public FleetController getFleetController() { + return mockController; + } + + @Override + public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener() { + return null; + } + + @Override + public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener() { + return null; + } + }; + } + + DatabaseHandler createHandler() throws Exception { + return new DatabaseHandler(mockDbFactory, mockTimer, databaseAddress, 0, monitor); + } + } + + @Test + public void can_store_latest_cluster_state_bundle() throws Exception { + Fixture f = new Fixture(); + DatabaseHandler handler = f.createHandler(); + handler.doNextZooKeeperTask(f.createMockContext()); // Database setup step + handler.saveLatestClusterStateBundle(f.createMockContext(), f.dummyBundle); + handler.doNextZooKeeperTask(f.createMockContext()); // Bundle store step + + verify(f.mockDatabase).storeLastPublishedStateBundle(eq(f.dummyBundle)); + } + + @Test + public void can_load_latest_cluster_state_bundle() throws Exception { + Fixture f = new Fixture(); + DatabaseHandler handler = f.createHandler(); + handler.doNextZooKeeperTask(f.createMockContext()); // Database setup step + + when(f.mockDatabase.retrieveLastPublishedStateBundle()).thenReturn(f.dummyBundle); + + ClusterStateBundle retrievedBundle = handler.getLatestClusterStateBundle(); + assertThat(retrievedBundle, equalTo(f.dummyBundle)); + } + + // FIXME I don't like the semantics of this, but it mirrors the legacy behavior for the + // rest of the DB load operations exposed by the DatabaseHandler. + @Test + public void empty_bundle_is_returned_if_no_db_connection() throws Exception { + Fixture f = new Fixture(); + DatabaseHandler handler = f.createHandler(); + // Note: no DB setup step + + ClusterStateBundle retrievedBundle = handler.getLatestClusterStateBundle(); + assertThat(retrievedBundle, equalTo(ClusterStateBundle.empty())); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java index a2da6bd1c6c..6d59a672e86 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; import com.yahoo.jrt.*; import com.yahoo.jrt.StringValue; import com.yahoo.jrt.slobrok.api.BackOffPolicy; @@ -10,15 +8,13 @@ import com.yahoo.jrt.slobrok.api.Register; import com.yahoo.jrt.slobrok.api.SlobrokList; import com.yahoo.log.LogLevel; import com.yahoo.vdslib.state.*; -import com.yahoo.vespa.clustercontroller.core.rpc.EncodedClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; import com.yahoo.vespa.clustercontroller.core.rpc.RPCUtil; -import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * @@ -34,6 +30,7 @@ public class DummyVdsNode { private NodeType type; private int index; private NodeState nodeState; + private String hostInfo = "{}"; private Supervisor supervisor; private Acceptor acceptor; private Register register; @@ -67,10 +64,10 @@ public class DummyVdsNode { private final List<Req> waitingRequests = new LinkedList<>(); /** - * History of received system states. + * History of received cluster states. * Any access to this list or to its members must be synchronized on the timer variable. */ - private List<ClusterState> systemState = new LinkedList<>(); + private List<ClusterStateBundle> clusterStateBundles = new LinkedList<>(); private Thread messageResponder = new Thread() { public void run() { @@ -216,8 +213,10 @@ public class DummyVdsNode { /** Returns the latest system state version received, or empty if none are received yet. */ private Optional<Integer> getLatestSystemStateVersion() { synchronized(timer) { - if (systemState.isEmpty()) return Optional.empty(); - return Optional.of(systemState.get(0).getVersion()); + if (clusterStateBundles.isEmpty()) { + return Optional.empty(); + } + return Optional.of(clusterStateBundles.get(0).getVersion()); } } @@ -260,7 +259,7 @@ public class DummyVdsNode { log.log(LogLevel.DEBUG, "Dummy node " + this + " answering pending node state request."); req.request.returnValues().add(new StringValue(nodeState.serialize())); if (req.request.methodName().equals("getnodestate3")) { - req.request.returnValues().add(new StringValue("Dummy node host info")); + req.request.returnValues().add(new StringValue(hostInfo)); } req.request.returnRequest(); ++setNodeStateReplies; @@ -268,14 +267,19 @@ public class DummyVdsNode { waitingRequests.clear(); } - public void setNodeState(NodeState state) { + public void setNodeState(NodeState state, String hostInfo) { log.log(LogLevel.DEBUG, "Dummy node " + this + " got new state: " + state); synchronized(timer) { this.nodeState = state; + this.hostInfo = hostInfo; replyToPendingNodeStateRequests(); } } + public void setNodeState(NodeState state) { + setNodeState(state, "{}"); + } + public void setNodeState(State state) { setNodeState(new NodeState(type, state)); } @@ -287,16 +291,22 @@ public class DummyVdsNode { } public List<ClusterState> getSystemStatesReceived() { - List<ClusterState> states = new ArrayList<>(); synchronized(timer) { - states.addAll(systemState); + return clusterStateBundles.stream() + .map(ClusterStateBundle::getBaselineClusterState) + .collect(Collectors.toList()); + } + } + + public ClusterStateBundle getClusterStateBundle() { + synchronized(timer) { + return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0)); } - return states; } public ClusterState getClusterState() { synchronized(timer) { - return (systemState.isEmpty() ? null : systemState.get(0)); + return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0).getBaselineClusterState()); } } @@ -482,7 +492,7 @@ public class DummyVdsNode { ClusterState newState = new ClusterState(req.parameters().get(0).asString()); synchronized(timer) { updateStartTimestamps(newState); - systemState.add(0, newState); + clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState))); timer.notifyAll(); } req.returnValues().add(new Int32Value(1)); @@ -505,7 +515,7 @@ public class DummyVdsNode { ClusterState newState = new ClusterState(req.parameters().get(0).asString()); synchronized(timer) { updateStartTimestamps(newState); - systemState.add(0, newState); + clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState))); timer.notifyAll(); } log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + newState); @@ -523,13 +533,12 @@ public class DummyVdsNode { return; } ClusterStateBundle stateBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req); - ClusterState newState = stateBundle.getBaselineClusterState(); synchronized(timer) { - updateStartTimestamps(newState); - systemState.add(0, newState); + updateStartTimestamps(stateBundle.getBaselineClusterState()); + clusterStateBundles.add(0, stateBundle); timer.notifyAll(); } - log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + newState); + log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + stateBundle); } catch (Exception e) { log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage()); e.printStackTrace(System.err); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java index 19d87b878d4..21d9b0a7a1f 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java @@ -18,6 +18,7 @@ import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.NodeType; import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; +import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory; import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; import com.yahoo.vespa.clustercontroller.core.rpc.RpcServer; import com.yahoo.vespa.clustercontroller.core.rpc.SlobrokClient; @@ -157,7 +158,7 @@ public abstract class FleetControllerTest implements Waiter { status = new StatusPageServer(timer, timer, options.httpPort); } RpcServer rpcServer = new RpcServer(timer, timer, options.clusterName, options.fleetControllerIndex, options.slobrokBackOffPolicy); - DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); + DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater); SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); @@ -311,6 +312,8 @@ public abstract class FleetControllerTest implements Waiter { public ClusterState waitForStableSystem() throws Exception { return waiter.waitForStableSystem(); } public ClusterState waitForStableSystem(int nodeCount) throws Exception { return waiter.waitForStableSystem(nodeCount); } public ClusterState waitForState(String state) throws Exception { return waiter.waitForState(state); } + public ClusterState waitForStateInAllSpaces(String state) throws Exception { return waiter.waitForStateInAllSpaces(state); } + public ClusterState waitForStateInSpace(String space, String state) throws Exception { return waiter.waitForStateInSpace(space, state); } public ClusterState waitForState(String state, int timeoutMS) throws Exception { return waiter.waitForState(state, timeoutMS); } public ClusterState waitForInitProgressPassed(Node n, double progress) { return waiter.waitForInitProgressPassed(n, progress); } public ClusterState waitForClusterStateIncludingNodesWithMinUsedBits(int bitcount, int nodecount) { return waiter.waitForClusterStateIncludingNodesWithMinUsedBits(bitcount, nodecount); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java index dc79a92e68b..b2464e83f95 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java @@ -9,6 +9,8 @@ import com.yahoo.jrt.Transport; import com.yahoo.jrt.slobrok.server.Slobrok; import com.yahoo.log.LogLevel; import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.NodeType; import com.yahoo.vdslib.state.State; import org.junit.Ignore; import org.junit.Rule; @@ -483,7 +485,7 @@ public class MasterElectionTest extends FleetControllerTest { // no guarantees that it has been published to any nodes yet. final long preElectionVersionNumber = fleetControllers.get(0).getSystemState().getVersion(); - // Nuke controller 1, leaving controller 1 in charge. + // Nuke controller 0, leaving controller 1 in charge. // It should have observed the most recently written version number and increase this // number before publishing its own new state. fleetControllers.get(0).shutdown(); @@ -495,4 +497,47 @@ public class MasterElectionTest extends FleetControllerTest { assertThat(postElectionVersionNumber, greaterThan(preElectionVersionNumber)); } + @Test + public void previously_published_state_is_taken_into_account_for_default_space_when_controller_bootstraps() throws Exception { + startingTest("MasterElectionTest::previously_published_state_is_taken_into_account_for_default_space_when_controller_bootstraps"); + FleetControllerOptions options = new FleetControllerOptions("mycluster"); + options.enableMultipleBucketSpaces = true; + options.masterZooKeeperCooldownPeriod = 1; + options.minTimeBeforeFirstSystemStateBroadcast = 10000; + setUpFleetController(3, true, options); + setUpVdsNodes(true, new DummyVdsNodeOptions()); + fleetController = fleetControllers.get(0); // Required to prevent waitForStableSystem from NPE'ing + waitForMaster(0); + waitForStableSystem(); + log.info("Waiting for full maintenance mode in default space"); + waitForStateInSpace("default", "version:\\d+ distributor:10 storage:10 .0.s:m .1.s:m .2.s:m .3.s:m .4.s:m .5.s:m .6.s:m .7.s:m .8.s:m .9.s:m"); + + log.info("Responding with zero global merges pending from all distributors"); + final int ackVersion = fleetControllers.get(0).getClusterStateBundle().getVersion(); + // ACKing with no merge state in host info (implied: no pending merges) should not cause + // a new state to be published before the last node has ACKed. Consequently there should + // not be any race potential where a new version is published concurrently with our attempts + // at ACKing a previous one. + this.nodes.stream().filter(DummyVdsNode::isDistributor).forEach(node -> { + node.setNodeState(new NodeState(NodeType.DISTRIBUTOR, State.UP), + String.format("{\"cluster-state-version\":%d}", ackVersion)); + }); + waitForStateInAllSpaces("version:\\d+ distributor:10 storage:10"); + + log.info("Bundle before restart cycle: " + fleetControllers.get(0).getClusterStateBundle()); + log.info("Doing restart cycle of controller 0"); + fleetControllers.get(0).shutdown(); + waitForMaster(1); + waitForCompleteCycle(1); + + fleetControllers.set(0, createFleetController(usingFakeTimer, fleetControllers.get(0).getOptions(), true, null)); + waitForMaster(0); + waitForCompleteCycle(0); + + // We should NOT publish a state where all storage nodes are in Maintenance, since they were + // marked as Up in the last published cluster state. + log.info("Bundle after restart cycle: " + fleetControllers.get(0).getClusterStateBundle()); + waitForStateInAllSpaces("version:\\d+ distributor:10 storage:10"); + } + } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index b10a8101c37..32de3591f2d 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -5,6 +5,7 @@ import com.yahoo.jrt.*; import com.yahoo.vdslib.distribution.ConfiguredNode; import com.yahoo.vdslib.state.*; import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; +import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory; import com.yahoo.vespa.clustercontroller.core.testutils.StateWaiter; import com.yahoo.vespa.clustercontroller.utils.util.NoMetricReporter; import org.junit.Before; @@ -46,7 +47,7 @@ public class StateChangeTest extends FleetControllerTest { ContentCluster cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution, options.minStorageNodesUp, options.minRatioOfStorageNodesUp); NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, eventLog); - DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); + DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); StateChangeHandler stateGenerator = new StateChangeHandler(timer, eventLog, metricUpdater); SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); @@ -955,8 +956,9 @@ public class StateChangeTest extends FleetControllerTest { StateWaiter waiter = new StateWaiter(timer); fleetController.addSystemStateListener(waiter); - // Ensure all nodes have been seen by fleetcontroller and that it has had enough time to possibly have sent a cluster state - waiter.waitForState("version:\\d+ distributor:10 (\\.\\d+\\.t:\\d+ )*storage:10 (\\.\\d+\\.t:\\d+ )*.1.s:d( \\.\\d+\\.t:\\d+)*", timeoutMS); + // Ensure all nodes have been seen by fleetcontroller and that it has had enough time to possibly have sent a cluster state + // Note: this is a candidate state and therefore NOT versioned yet + waiter.waitForState("^distributor:10 (\\.\\d+\\.t:\\d+ )*storage:10 (\\.\\d+\\.t:\\d+ )*.1.s:d( \\.\\d+\\.t:\\d+)*", timeoutMS); waitForCompleteCycle(); new StateMessageChecker(nodes) { @Override int expectedMessageCount(final DummyVdsNode node) { return 0; } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java index 2a3be1e7194..95b699d4de4 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java @@ -303,6 +303,23 @@ public class StateVersionTrackerTest { assertFalse(tracker.bucketSpaceMergeCompletionStateHasChanged()); } + @Test + public void setting_zookeeper_retrieved_bundle_sets_current_versioned_state_and_resets_candidate_state() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + versionTracker.setVersionRetrievedFromZooKeeper(100); + versionTracker.updateLatestCandidateStateBundle( + ClusterStateBundle.ofBaselineOnly(stateWithoutAnnotations("distributor:1 storage:1"))); + versionTracker.promoteCandidateToVersionedState(12345); + + ClusterStateBundle zkBundle = ClusterStateBundle.ofBaselineOnly(stateWithoutAnnotations("version:199 distributor:2 storage:2")); + versionTracker.setVersionRetrievedFromZooKeeper(200); + versionTracker.setClusterStateBundleRetrievedFromZooKeeper(zkBundle); + + assertThat(versionTracker.getLatestCandidateState(), equalTo(AnnotatedClusterState.emptyState())); + assertThat(versionTracker.getVersionedClusterStateBundle(), equalTo(zkBundle)); + assertThat(versionTracker.getCurrentVersion(), equalTo(200)); // Not from bundle, but from explicitly stored version + } + private HostInfo createHostInfo(long bucketsPending) { return HostInfo.createHostInfo( "{\n" + diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java new file mode 100644 index 00000000000..e5200b63a7e --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java @@ -0,0 +1,76 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vespa.clustercontroller.core.database.Database; +import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabase; +import org.junit.Test; + +import java.io.IOException; +import java.time.Duration; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; + +public class ZooKeeperDatabaseTest { + + private static class Fixture implements AutoCloseable { + final ZooKeeperTestServer zkServer; + ClusterFixture clusterFixture; + ZooKeeperDatabase zkDatabase; + int nodeIndex = 1; + Duration sessionTimeout = Duration.ofSeconds(60); + Database.DatabaseListener mockListener = mock(Database.DatabaseListener.class); + + Fixture() throws IOException { + zkServer = new ZooKeeperTestServer(); + clusterFixture = ClusterFixture.forFlatCluster(10); + } + + void createDatabase() throws Exception { + closeDatabaseIfOpen(); + zkDatabase = new ZooKeeperDatabase(clusterFixture.cluster(), nodeIndex, zkServer.getAddress(), + (int)sessionTimeout.toMillis(), mockListener); + } + + ZooKeeperDatabase db() { return zkDatabase; } + + void closeDatabaseIfOpen() { + if (zkDatabase != null) { + zkDatabase.close(); + zkDatabase = null; + } + } + + @Override + public void close() { + closeDatabaseIfOpen(); + zkServer.shutdown(true); + } + } + + @Test + public void can_store_and_load_cluster_state_bundle_from_database() throws Exception { + try (Fixture f = new Fixture()) { + f.createDatabase(); + ClusterStateBundle bundleToStore = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); + f.db().storeLastPublishedStateBundle(bundleToStore); + + ClusterStateBundle bundleReceived = f.db().retrieveLastPublishedStateBundle(); + assertThat(bundleReceived, equalTo(bundleToStore)); + } + } + + @Test + public void empty_state_bundle_is_returned_if_no_bundle_already_stored_in_database() throws Exception { + try (Fixture f = new Fixture()) { + f.createDatabase(); + ClusterStateBundle bundleReceived = f.db().retrieveLastPublishedStateBundle(); + + assertThat(bundleReceived, equalTo(ClusterStateBundle.empty())); + } + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java index f2fe494ce61..b19b1d780bf 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java @@ -18,6 +18,12 @@ public class SlimeClusterStateBundleCodecTest { return codec.decode(encoded); } + private static ClusterStateBundle roundtripEncodeWithEnvelope(ClusterStateBundle stateBundle) { + SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec(); + byte[] encoded = codec.encodeWithEnvelope(stateBundle); + return codec.decodeWithEnvelope(encoded); + } + @Test public void baseline_only_bundle_can_be_round_trip_encoded() { ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); @@ -53,4 +59,17 @@ public class SlimeClusterStateBundleCodecTest { assertThat(decodedBundle, equalTo(stateBundle)); } + @Test + public void uncompressed_enveloped_bundle_can_be_roundtrip_encoded() { + // Insufficient length and too much entropy to be compressed + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:3"); + assertThat(roundtripEncodeWithEnvelope(stateBundle), equalTo(stateBundle)); + } + + @Test + public void compressable_enveloped_bundle_can_be_roundtrip_encoded() { + ClusterStateBundle stateBundle = makeCompressableBundle(); + assertThat(roundtripEncodeWithEnvelope(stateBundle), equalTo(stateBundle)); + } + } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java index 730b03ff7c9..7811e68c381 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java @@ -23,7 +23,8 @@ public class StateWaiter implements SystemStateListener { this.timer = timer; } - public void handleNewSystemState(ClusterStateBundle state) { + @Override + public void handleNewPublishedState(ClusterStateBundle state) { synchronized(timer) { current = state.getBaselineClusterState(); @@ -32,6 +33,13 @@ public class StateWaiter implements SystemStateListener { } } + @Override + public void handleNewCandidateState(ClusterStateBundle states) { + // Treat candidate states as if they were published for the tests that use + // this (deprecated) waiter class. + handleNewPublishedState(states); + } + public int getStateUpdates() { return Math.max(0, stateUpdates); } public ClusterState getCurrentSystemState() { diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java index 6396b27aa79..d140ef998b6 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java @@ -4,12 +4,17 @@ package com.yahoo.vespa.clustercontroller.core.testutils; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState; import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.DummyVdsNode; import com.yahoo.vespa.clustercontroller.core.FleetController; import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -19,14 +24,14 @@ import java.util.regex.Pattern; public interface WaitCondition { /** Return null if met, why not if it is not met. */ - public String isConditionMet(); + String isConditionMet(); - public abstract class StateWait implements WaitCondition { + abstract class StateWait implements WaitCondition { private final Object monitor; protected ClusterState currentState; private final SystemStateListener listener = new SystemStateListener() { @Override - public void handleNewSystemState(ClusterStateBundle state) { + public void handleNewPublishedState(ClusterStateBundle state) { synchronized (monitor) { currentState = state.getBaselineClusterState(); monitor.notifyAll(); @@ -36,7 +41,9 @@ public interface WaitCondition { public StateWait(FleetController fc, Object monitor) { this.monitor = monitor; - fc.addSystemStateListener(listener); + synchronized (this.monitor) { + fc.addSystemStateListener(listener); + } } public ClusterState getCurrentState() { @@ -46,11 +53,13 @@ public interface WaitCondition { } } - public class RegexStateMatcher extends StateWait { + class RegexStateMatcher extends StateWait { private final Pattern pattern; private Collection<DummyVdsNode> nodesToCheck; private ClusterState lastCheckedState; + private boolean checkAllSpaces = false; + private Set<String> checkSpaceSubset = Collections.emptySet(); public RegexStateMatcher(String regex, FleetController fc, Object monitor) { super(fc, monitor); @@ -62,19 +71,51 @@ public interface WaitCondition { return this; } + public RegexStateMatcher checkAllSpaces(boolean checkAllSpaces) { + this.checkAllSpaces = checkAllSpaces; + return this; + } + + public RegexStateMatcher checkSpaceSubset(Set<String> spaces) { + this.checkSpaceSubset = spaces; + return this; + } + + private static List<ClusterState> statesInBundle(ClusterStateBundle bundle) { + List<ClusterState> states = new ArrayList<>(3); + states.add(bundle.getBaselineClusterState()); + bundle.getDerivedBucketSpaceStates().forEach((space, state) -> states.add(state.getClusterState())); + return states; + } + @Override public String isConditionMet() { if (currentState != null) { lastCheckedState = currentState; Matcher m = pattern.matcher(lastCheckedState.toString()); - if (m.matches()) { + if (m.matches() || !checkSpaceSubset.isEmpty()) { if (nodesToCheck != null) { for (DummyVdsNode node : nodesToCheck) { if (node.getClusterState() == null) { return "Node " + node + " has not received a cluster state yet"; } - if (! pattern.matcher(withoutTimestamps(node.getClusterState().toString())).matches()) { - return "Node " + node + " state mismatch.\n wanted: " + pattern + "\n is: " + node.getClusterState().toString(); + // TODO refactor, simplify + boolean match; + if (checkAllSpaces) { + match = statesInBundle(node.getClusterStateBundle()).stream() + .allMatch(state -> pattern.matcher(withoutTimestamps(state.toString())).matches()); + } else if (!checkSpaceSubset.isEmpty()) { + match = checkSpaceSubset.stream().allMatch(space -> { + String state = node.getClusterStateBundle().getDerivedBucketSpaceStates() + .getOrDefault(space, AnnotatedClusterState.emptyState()).getClusterState().toString(); + return pattern.matcher(withoutTimestamps(state)).matches(); + }); + } else { + match = pattern.matcher(withoutTimestamps(node.getClusterState().toString())).matches(); + } + + if (!match) { + return "Node " + node + " state mismatch.\n wanted: " + pattern + "\n is: " + node.getClusterStateBundle().toString(); } if (node.getStateCommunicationVersion() > 0) { if (!node.hasPendingGetNodeStateRequest()) { @@ -141,7 +182,7 @@ public interface WaitCondition { } } - public static class MinUsedBitsMatcher extends StateWait { + class MinUsedBitsMatcher extends StateWait { private final int bitCount; private final int nodeCount; diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java index a0af1ad2d0e..fb49b1cbf47 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java @@ -7,28 +7,32 @@ import com.yahoo.vdslib.state.Node; import com.yahoo.vespa.clustercontroller.core.DummyVdsNode; import com.yahoo.vespa.clustercontroller.core.FleetController; +import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.logging.Logger; public interface Waiter { - public interface DataRetriever { - public Object getMonitor(); - public FleetController getFleetController(); - public List<DummyVdsNode> getDummyNodes(); - public int getTimeoutMS(); + interface DataRetriever { + Object getMonitor(); + FleetController getFleetController(); + List<DummyVdsNode> getDummyNodes(); + int getTimeoutMS(); } - public ClusterState waitForState(String state) throws Exception; - public ClusterState waitForState(String state, int timeoutMS) throws Exception; - public ClusterState waitForStableSystem() throws Exception; - public ClusterState waitForStableSystem(int nodeCount) throws Exception; - public ClusterState waitForInitProgressPassed(Node n, double progress); - public ClusterState waitForClusterStateIncludingNodesWithMinUsedBits(int bitcount, int nodecount); - public void wait(WaitCondition c, WaitTask wt, int timeoutMS); + ClusterState waitForState(String state) throws Exception; + ClusterState waitForStateInSpace(String space, String state) throws Exception; + ClusterState waitForStateInAllSpaces(String state) throws Exception; + ClusterState waitForState(String state, int timeoutMS) throws Exception; + ClusterState waitForStableSystem() throws Exception; + ClusterState waitForStableSystem(int nodeCount) throws Exception; + ClusterState waitForInitProgressPassed(Node n, double progress); + ClusterState waitForClusterStateIncludingNodesWithMinUsedBits(int bitcount, int nodecount); + void wait(WaitCondition c, WaitTask wt, int timeoutMS); - public static class Impl implements Waiter { + class Impl implements Waiter { private static final Logger log = Logger.getLogger(Impl.class.getName()); private final DataRetriever data; @@ -37,16 +41,33 @@ public interface Waiter { this.data = data; } - public ClusterState waitForState(String state) throws Exception { return waitForState(state, data.getTimeoutMS()); } - public ClusterState waitForState(String state, int timeoutMS) throws Exception { + // TODO refactor + private ClusterState waitForState(String state, int timeoutMS, boolean checkAllSpaces, Set<String> checkSpaces) { LinkedList<DummyVdsNode> nodesToCheck = new LinkedList<>(); for(DummyVdsNode node : data.getDummyNodes()) { if (node.isConnected()) nodesToCheck.add(node); } - WaitCondition.StateWait swc = new WaitCondition.RegexStateMatcher(state, data.getFleetController(), data.getMonitor()).includeNotifyingNodes(nodesToCheck); + WaitCondition.StateWait swc = new WaitCondition.RegexStateMatcher( + state, data.getFleetController(), data.getMonitor()) + .includeNotifyingNodes(nodesToCheck) + .checkAllSpaces(checkAllSpaces) + .checkSpaceSubset(checkSpaces); wait(swc, new WaitTask.StateResender(data.getFleetController()), timeoutMS); return swc.getCurrentState(); } + + public ClusterState waitForState(String state) throws Exception { + return waitForState(state, data.getTimeoutMS()); + } + public ClusterState waitForStateInAllSpaces(String state) { + return waitForState(state, data.getTimeoutMS(), true, Collections.emptySet()); + } + public ClusterState waitForStateInSpace(String space, String state) { + return waitForState(state, data.getTimeoutMS(), false, Collections.singleton(space)); + } + public ClusterState waitForState(String state, int timeoutMS) { + return waitForState(state, timeoutMS, false, Collections.emptySet()); + } public ClusterState waitForStableSystem() throws Exception { return waitForStableSystem(data.getDummyNodes().size() / 2); } |