summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java95
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java18
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java18
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java6
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java34
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java52
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java93
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java14
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java5
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java19
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java31
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java107
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java51
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java5
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java47
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java8
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java17
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java76
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java19
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java10
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java59
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java53
23 files changed, 716 insertions, 125 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
index 9291c8277da..76177e8f1c1 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
@@ -78,6 +78,10 @@ public class ClusterStateBundle {
return new ClusterStateBundle(baselineState, Collections.emptyMap());
}
+ public static ClusterStateBundle empty() {
+ return ofBaselineOnly(AnnotatedClusterState.emptyState());
+ }
+
public AnnotatedClusterState getBaselineAnnotatedState() {
return baselineState;
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 54c6c11f81e..e781bf3b145 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -10,6 +10,7 @@ import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
import com.yahoo.vespa.clustercontroller.core.listeners.*;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
@@ -25,7 +26,18 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import java.io.FileNotFoundException;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -54,7 +66,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private AtomicBoolean running = new AtomicBoolean(true);
private FleetControllerOptions options;
private FleetControllerOptions nextOptions;
- private final List<SystemStateListener> systemStateListeners = new LinkedList<>();
+ private final List<SystemStateListener> systemStateListeners = new CopyOnWriteArrayList<>();
private boolean processingCycle = false;
private boolean wantedStateChanged = false;
private long cycleCount = 0;
@@ -185,7 +197,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
options.nodeStateRequestTimeoutEarliestPercentage,
options.nodeStateRequestTimeoutLatestPercentage,
options.nodeStateRequestRoundTripTimeMaxSeconds);
- DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
+ DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
NodeLookup lookUp = new SlobrokClient(timer);
StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater);
SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer);
@@ -219,6 +231,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
}
+ public ClusterStateBundle getClusterStateBundle() {
+ synchronized (monitor) {
+ return systemStateBroadcaster.getClusterStateBundle();
+ }
+ }
+
public void schedule(RemoteClusterControllerTask task) {
synchronized (monitor) {
log.fine("Scheduled remote task " + task.getClass().getName() + " for execution");
@@ -228,13 +246,13 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
/** Used for unit testing. */
public void addSystemStateListener(SystemStateListener listener) {
- synchronized (systemStateListeners) {
- systemStateListeners.add(listener);
- // Always give cluster state listeners the current state, in case acceptable state has come before listener is registered.
- com.yahoo.vdslib.state.ClusterState state = getSystemState();
- if (state == null) throw new NullPointerException("Cluster state should never be null at this point");
- listener.handleNewSystemState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state)));
+ systemStateListeners.add(listener);
+ // Always give cluster state listeners the current state, in case acceptable state has come before listener is registered.
+ com.yahoo.vdslib.state.ClusterState state = getSystemState();
+ if (state == null) {
+ throw new NullPointerException("Cluster state should never be null at this point");
}
+ listener.handleNewPublishedState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state)));
}
public FleetControllerOptions getOptions() {
@@ -351,7 +369,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
@Override
- public void handleNewSystemState(ClusterStateBundle stateBundle) {
+ public void handleNewPublishedState(ClusterStateBundle stateBundle) {
verifyInControllerThread();
ClusterState baselineState = stateBundle.getBaselineClusterState();
newStates.add(stateBundle);
@@ -361,13 +379,14 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// nodes so that a cluster controller crash after publishing but before a successful
// ZK store will not risk reusing the same version number.
if (masterElectionHandler.isMaster()) {
- storeClusterStateVersionToZooKeeper(baselineState);
+ storeClusterStateMetaDataToZooKeeper(stateBundle);
}
}
- private void storeClusterStateVersionToZooKeeper(ClusterState state) {
+ private void storeClusterStateMetaDataToZooKeeper(ClusterStateBundle stateBundle) {
try {
- database.saveLatestSystemStateVersion(databaseContext, state.getVersion());
+ database.saveLatestSystemStateVersion(databaseContext, stateBundle.getVersion());
+ database.saveLatestClusterStateBundle(databaseContext, stateBundle);
} catch (InterruptedException e) {
// Rethrow as RuntimeException to propagate exception up to main thread method.
// Don't want to hide failures to write cluster state version.
@@ -654,6 +673,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
sentAny = systemStateBroadcaster.broadcastNewState(databaseContext, communicator);
if (sentAny) {
+ // FIXME won't this inhibit resending to unresponsive nodes?
nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates;
}
}
@@ -665,7 +685,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
synchronized (systemStateListeners) {
for (ClusterStateBundle stateBundle : newStates) {
for(SystemStateListener listener : systemStateListeners) {
- listener.handleNewSystemState(stateBundle);
+ listener.handleNewPublishedState(stateBundle);
}
}
newStates.clear();
@@ -782,12 +802,21 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became node state gatherer as we are fleetcontroller master candidate.", timer.getCurrentTimeInMillis()));
// Update versions to use so what is shown is closer to what is reality on the master
stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion());
+ stateChangeHandler.setStateChangedFlag();
}
}
isStateGatherer = true;
return didWork;
}
+ private void invokeCandidateStateListeners(ClusterStateBundle candidateBundle) {
+ systemStateListeners.forEach(listener -> listener.handleNewCandidateState(candidateBundle));
+ }
+
+ private boolean hasPassedFirstStateBroadcastTimePoint(long timeNowMs) {
+ return timeNowMs >= firstAllowedStateBroadcast || cluster.allStatesReported();
+ }
+
private boolean recomputeClusterStateIfRequired() {
boolean stateWasChanged = false;
if (mustRecomputeCandidateClusterState()) {
@@ -800,16 +829,18 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
.stateDeriver(createBucketSpaceStateDeriver())
.deriveAndBuild();
stateVersionTracker.updateLatestCandidateStateBundle(candidateBundle);
+ invokeCandidateStateListeners(candidateBundle);
- if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()
- || stateVersionTracker.hasReceivedNewVersionFromZooKeeper())
+ final long timeNowMs = timer.getCurrentTimeInMillis();
+ if (hasPassedFirstStateBroadcastTimePoint(timeNowMs)
+ && (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()
+ || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()))
{
- final long timeNowMs = timer.getCurrentTimeInMillis();
final ClusterStateBundle before = stateVersionTracker.getVersionedClusterStateBundle();
stateVersionTracker.promoteCandidateToVersionedState(timeNowMs);
emitEventsForAlteredStateEdges(before, stateVersionTracker.getVersionedClusterStateBundle(), timeNowMs);
- handleNewSystemState(stateVersionTracker.getVersionedClusterStateBundle());
+ handleNewPublishedState(stateVersionTracker.getVersionedClusterStateBundle());
stateWasChanged = true;
}
}
@@ -893,10 +924,19 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
}
+ private boolean atFirstClusterStateSendTimeEdge() {
+ // We only care about triggering a state recomputation for the master, which is the only
+ // one allowed to actually broadcast any states.
+ if (!isMaster || systemStateBroadcaster.hasBroadcastedClusterStateBundle()) {
+ return false;
+ }
+ return hasPassedFirstStateBroadcastTimePoint(timer.getCurrentTimeInMillis());
+ }
+
private boolean mustRecomputeCandidateClusterState() {
return stateChangeHandler.stateMayHaveChanged()
- || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()
- || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged();
+ || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged()
+ || atFirstClusterStateSendTimeEdge();
}
private boolean handleLeadershipEdgeTransitions() throws InterruptedException {
@@ -904,16 +944,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if (masterElectionHandler.isMaster()) {
if ( ! isMaster) {
metricUpdater.becameMaster();
- // If we just became master, restore wanted states from database
+ // If we just became master, restore state from ZooKeeper
+ stateChangeHandler.setStateChangedFlag();
+ systemStateBroadcaster.resetBroadcastedClusterStateBundle();
+
stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion());
- didWork = database.loadStartTimestamps(cluster);
- didWork |= database.loadWantedStates(databaseContext);
+ ClusterStateBundle previousBundle = database.getLatestClusterStateBundle();
+ database.loadStartTimestamps(cluster);
+ database.loadWantedStates(databaseContext);
+
+ log.info(() -> String.format("Loaded previous cluster state bundle from ZooKeeper: %s", previousBundle));
+ stateVersionTracker.setClusterStateBundleRetrievedFromZooKeeper(previousBundle);
+
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became fleetcontroller master. Bumped version to "
+ stateVersionTracker.getCurrentVersion() + " to be in line.", timer.getCurrentTimeInMillis()));
long currentTime = timer.getCurrentTimeInMillis();
firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast;
log.log(LogLevel.DEBUG, "At time " + currentTime + " we set first system state broadcast time to be "
+ options.minTimeBeforeFirstSystemStateBroadcast + " ms after at time " + firstAllowedStateBroadcast + ".");
+ didWork = true;
}
isMaster = true;
if (wantedStateChanged) {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
index 177efe31766..e2f98cf5492 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
@@ -50,6 +50,21 @@ public class StateVersionTracker {
this.lastZooKeeperVersion = this.currentVersion;
}
+ void setClusterStateBundleRetrievedFromZooKeeper(ClusterStateBundle bundle) {
+ // There is an edge where the bundle version will mismatch with the version set
+ // via setVersionRetrievedFromZooKeeper() if the controller (or ZK) crashes before
+ // it can write both sequentially. But since we use the ZK-written version explicitly
+ // when choosing a new version for our own published states, it should not matter in
+ // practice. Worst case is that the current state reflects the same version that a
+ // previous controller had, but we will never publish this state ourselves; publishing
+ // only happens after we've generated our own, new candidate state and overwritten
+ // the empty states set below. Publishing also, as mentioned, sets a version based on
+ // the ZK version, not the version stored in the bundle itself.
+ currentClusterState = bundle;
+ currentUnversionedState = ClusterStateBundle.empty();
+ latestCandidateState = ClusterStateBundle.empty();
+ }
+
/**
* Sets limit on how many cluster states can be kept in the in-memory queue. Once
* the list exceeds this limit, the oldest state is repeatedly removed until the limit
@@ -140,8 +155,7 @@ public class StateVersionTracker {
}
private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) {
- clusterStateHistory.addFirst(new ClusterStateHistoryEntry(
- currentClusterState, currentTimeMs));
+ clusterStateHistory.addFirst(new ClusterStateHistoryEntry(currentClusterState, currentTimeMs));
while (clusterStateHistory.size() > maxHistoryEntryCount) {
clusterStateHistory.removeLast();
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index b0869b560f6..629800fb13c 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -40,6 +40,18 @@ public class SystemStateBroadcaster {
return clusterStateBundle.getBaselineClusterState();
}
+ public boolean hasBroadcastedClusterStateBundle() {
+ return clusterStateBundle != null;
+ }
+
+ public void resetBroadcastedClusterStateBundle() {
+ clusterStateBundle = null;
+ }
+
+ public ClusterStateBundle getClusterStateBundle() {
+ return clusterStateBundle;
+ }
+
private void reportNodeError(boolean nodeOk, NodeInfo info, String message) {
long time = timer.getCurrentTimeInMillis();
Long lastReported = lastErrorReported.get(info.getNode());
@@ -113,10 +125,10 @@ public class SystemStateBroadcaster {
void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database,
DatabaseHandler.Context dbContext,
FleetController fleetController) throws InterruptedException {
- final int currentStateVersion = clusterStateBundle.getVersion();
- if ((clusterStateBundle == null) || (lastClusterStateInSync == currentStateVersion)) {
+ if ((clusterStateBundle == null) || (lastClusterStateInSync == clusterStateBundle.getVersion())) {
return; // Nothing to do for the current state
}
+ final int currentStateVersion = clusterStateBundle.getVersion();
boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream()
.filter(NodeInfo::isDistributor)
.anyMatch(this::nodeNeedsClusterState);
@@ -137,7 +149,7 @@ public class SystemStateBroadcaster {
if (!baselineState.isOfficial()) {
log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", baselineState.getVersion()));
- baselineState.setOfficial(true);
+ baselineState.setOfficial(true); // FIXME this violates state bundle immutability
}
List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
index ac9f1473b57..c98e362c2d2 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.clustercontroller.core.database;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import java.util.Map;
@@ -82,4 +83,9 @@ public abstract class Database {
* Fetch the start times of distributor and service layer nodes.
*/
public abstract Map<Node, Long> retrieveStartTimestamps() throws InterruptedException;
+
+ public abstract boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) throws InterruptedException;
+
+ public abstract ClusterStateBundle retrieveLastPublishedStateBundle() throws InterruptedException;
+
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java
new file mode 100644
index 00000000000..3e7b98fe3fb
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java
@@ -0,0 +1,34 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.database;
+
+import com.yahoo.vespa.clustercontroller.core.ContentCluster;
+
+/**
+ * Database factory to enable test mocking of DB features. In practice, this
+ * will always be {@link ZooKeeperDatabase} due to rather heavy ZK feature
+ * dependencies and leaky abstractions built on top of them.
+ */
+public interface DatabaseFactory {
+
+ class Params {
+ ContentCluster cluster;
+ int nodeIndex;
+ String dbAddress;
+ int dbSessionTimeout;
+ Database.DatabaseListener listener;
+
+ Params cluster(ContentCluster c) { this.cluster = c; return this; }
+ ContentCluster cluster() { return cluster; }
+ Params nodeIndex(int i) { this.nodeIndex = i; return this; }
+ int nodeIndex() { return nodeIndex; }
+ Params databaseAddress(String address) { this.dbAddress = address; return this; }
+ String databaseAddress() { return dbAddress; }
+ Params databaseSessionTimeout(int timeout) { this.dbSessionTimeout = timeout; return this; }
+ int databaseSessionTimeout() { return dbSessionTimeout; }
+ Params databaseListener(Database.DatabaseListener listener) { this.listener = listener; return this; }
+ Database.DatabaseListener databaseListener() { return listener; }
+ }
+
+ Database create(Params params) throws Exception;
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
index c89bb6a136a..fcf5b58e62d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
@@ -5,6 +5,8 @@ import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.FleetController;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.Timer;
@@ -39,12 +41,14 @@ public class DatabaseHandler {
Integer lastSystemStateVersion;
Map<Node, NodeState> wantedStates;
Map<Node, Long> startTimestamps;
+ ClusterStateBundle clusterStateBundle;
void clear() {
masterVote = null;
lastSystemStateVersion = null;
wantedStates = null;
startTimestamps = null;
+ clusterStateBundle = null;
}
}
private class DatabaseListener implements Database.DatabaseListener {
@@ -68,15 +72,13 @@ public class DatabaseHandler {
}
}
+ private final DatabaseFactory databaseFactory;
private final Timer timer;
private final int nodeIndex;
private final Object monitor;
private String zooKeeperAddress;
private int zooKeeperSessionTimeout = 5000;
private final Object databaseMonitor = new Object();
-
- /** This is always ZooKeeperDatabase */
- // TODO: Get rid of the interface as it is both unnecessary and gives a false impression of independence
private Database database;
private DatabaseListener dbListener = new DatabaseListener();
@@ -87,8 +89,9 @@ public class DatabaseHandler {
private boolean lostZooKeeperConnectionEvent = false;
private Map<Integer, Integer> masterDataEvent = null;
- public DatabaseHandler(Timer timer, String zooKeeperAddress, int ourIndex, Object monitor) throws InterruptedException
+ public DatabaseHandler(DatabaseFactory databaseFactory, Timer timer, String zooKeeperAddress, int ourIndex, Object monitor) throws InterruptedException
{
+ this.databaseFactory = databaseFactory;
this.timer = timer;
this.nodeIndex = ourIndex;
pendingStore.masterVote = ourIndex; // To begin with we'll vote for ourselves.
@@ -166,8 +169,13 @@ public class DatabaseHandler {
clearSessionMetaData();
log.log(LogLevel.INFO,
"Fleetcontroller " + nodeIndex + ": Setting up new ZooKeeper session at " + zooKeeperAddress);
- database = new ZooKeeperDatabase(cluster,
- nodeIndex, zooKeeperAddress, zooKeeperSessionTimeout, dbListener);
+ DatabaseFactory.Params params = new DatabaseFactory.Params()
+ .cluster(cluster)
+ .nodeIndex(nodeIndex)
+ .databaseAddress(zooKeeperAddress)
+ .databaseSessionTimeout(zooKeeperSessionTimeout)
+ .databaseListener(dbListener);
+ database = databaseFactory.create(params);
}
} catch (KeeperException.NodeExistsException e) {
log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Cannot create ephemeral fleetcontroller node. ZooKeeper server "
@@ -279,6 +287,15 @@ public class DatabaseHandler {
return didWork;
}
}
+ if (pendingStore.clusterStateBundle != null) {
+ didWork = true;
+ if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) {
+ currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle;
+ pendingStore.clusterStateBundle = null;
+ } else {
+ return true;
+ }
+ }
}
return didWork;
}
@@ -325,11 +342,28 @@ public class DatabaseHandler {
if (usingZooKeeper()) {
log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve latest system state version from ZooKeeper. Returning version 0.");
}
- return 0;
+ return 0; // FIXME "fail-oblivious" is not a good error handling mode for such a critical component!
}
return version;
}
+ public void saveLatestClusterStateBundle(Context context, ClusterStateBundle clusterStateBundle) throws InterruptedException {
+ log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Scheduling bundle %s to be saved to ZooKeeper", nodeIndex, clusterStateBundle));
+ pendingStore.clusterStateBundle = clusterStateBundle;
+ doNextZooKeeperTask(context);
+ }
+
+ public ClusterStateBundle getLatestClusterStateBundle() throws InterruptedException {
+ log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Retrieving latest cluster state bundle from ZooKeeper", nodeIndex));
+ synchronized (databaseMonitor) {
+ if (database != null && !database.isClosed()) {
+ return database.retrieveLastPublishedStateBundle();
+ } else {
+ return ClusterStateBundle.empty();
+ }
+ }
+ }
+
public void saveWantedStates(Context context) throws InterruptedException {
log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Checking whether wanted states have changed compared to zookeeper version.");
Map<Node, NodeState> wantedStates = new TreeMap<>();
@@ -399,7 +433,9 @@ public class DatabaseHandler {
public boolean loadStartTimestamps(ContentCluster cluster) throws InterruptedException {
log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Retrieving start timestamps");
synchronized (databaseMonitor) {
- if (database == null || database.isClosed()) return false;
+ if (database == null || database.isClosed()) {
+ return false;
+ }
currentlyStored.startTimestamps = database.retrieveStartTimestamps();
}
Map<Node, Long> startTimestamps = currentlyStored.startTimestamps;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
index 5399430fd59..fe1d9a2a5cd 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
@@ -1,7 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core.database;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.ContentCluster;
+import com.yahoo.vespa.clustercontroller.core.rpc.EnvelopedClusterStateBundleCodec;
+import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.ACL;
@@ -128,6 +132,7 @@ public class ZooKeeperDatabase extends Database {
createNode(zooKeeperRoot, "wantedstates", new byte[0]);
createNode(zooKeeperRoot, "starttimestamps", new byte[0]);
createNode(zooKeeperRoot, "latestversion", new Integer(0).toString().getBytes(utf8));
+ createNode(zooKeeperRoot, "published_state_bundle", new byte[0]); // TODO dedupe string constants
byte val[] = String.valueOf(nodeIndex).getBytes(utf8);
deleteNodeIfExists(getMyIndexPath());
log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex +
@@ -165,6 +170,15 @@ public class ZooKeeperDatabase extends Database {
return (!sessionOpen || watcher.getState().equals(Watcher.Event.KeeperState.Expired));
}
+ private void maybeLogExceptionWarning(Exception e, String message) {
+ if (sessionOpen && reportErrors) {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ log.log(LogLevel.WARNING, String.format("Fleetcontroller %s: %s. Exception: %s\n%s",
+ nodeIndex, message, e.getMessage(), sw.toString()));
+ }
+ }
+
public boolean storeMasterVote(int wantedMasterIndex) throws InterruptedException {
byte val[] = String.valueOf(wantedMasterIndex).getBytes(utf8);
try{
@@ -174,11 +188,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to create our ephemeral node and store master vote:\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to create our ephemeral node and store master vote");
}
return false;
}
@@ -191,11 +201,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store latest system state version used " + version + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to store latest system state version used " + version);
return false;
}
}
@@ -211,11 +217,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve latest system state version used. Returning null.\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to retrieve latest system state version used. Returning null");
return null;
}
}
@@ -242,11 +244,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store wanted states in zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to store wanted states in ZooKeeper");
return false;
}
}
@@ -276,11 +274,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve wanted states from zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to retrieve wanted states from ZooKeeper");
return null;
}
}
@@ -301,11 +295,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store start timestamps in zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to store start timestamps in ZooKeeper");
return false;
}
}
@@ -336,12 +326,45 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve start timestamps from zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to retrieve start timestamps from ZooKeeper");
return null;
}
}
+
+ @Override
+ public boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) throws InterruptedException {
+ EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec();
+ byte[] encodedBundle = envelopedBundleCodec.encodeWithEnvelope(stateBundle);
+ try{
+ log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Storing published state bundle %s at '%spublished_state_bundle'",
+ nodeIndex, stateBundle, zooKeeperRoot));
+ // TODO CAS on expected zknode version
+ session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, -1);
+ } catch (InterruptedException e) {
+ throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
+ } catch (Exception e) {
+ maybeLogExceptionWarning(e, "Failed to store start timestamps in ZooKeeper");
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public ClusterStateBundle retrieveLastPublishedStateBundle() throws InterruptedException {
+ Stat stat = new Stat();
+ try {
+ byte[] data = session.getData(zooKeeperRoot + "published_state_bundle", false, stat);
+ if (data != null && data.length != 0) {
+ EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec();
+ return envelopedBundleCodec.decodeWithEnvelope(data);
+ }
+ } catch (InterruptedException e) {
+ throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
+ } catch (Exception e) {
+ maybeLogExceptionWarning(e, "Failed to retrieve last published cluster state bundle from " +
+ "ZooKeeper, will use an empty state as baseline");
+ }
+ return ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState());
+ }
+
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java
new file mode 100644
index 00000000000..64dfcccebc9
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java
@@ -0,0 +1,14 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.database;
+
+import com.yahoo.vespa.clustercontroller.core.ContentCluster;
+
+public class ZooKeeperDatabaseFactory implements DatabaseFactory {
+
+ @Override
+ public Database create(Params params) throws Exception {
+ return new ZooKeeperDatabase(params.cluster, params.nodeIndex, params.dbAddress,
+ params.dbSessionTimeout, params.listener);
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
index 40fe91f0c1e..764bb3a0d92 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
@@ -5,6 +5,9 @@ import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
public interface SystemStateListener {
- void handleNewSystemState(ClusterStateBundle states);
+ // TODO consider rename to bundle
+ void handleNewPublishedState(ClusterStateBundle states);
+
+ default void handleNewCandidateState(ClusterStateBundle states) {}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java
new file mode 100644
index 00000000000..ab98952cddb
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java
@@ -0,0 +1,19 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+/**
+ * Cluster state bundle codec which opaquely encodes/decodes both the bundle
+ * as well as the metadata required to correctly perform compression/decompression.
+ *
+ * Useful for embedding an opaque bundle blob somewhere without needing to care aboout
+ * any of the associated metadata.
+ */
+public interface EnvelopedClusterStateBundleCodec {
+
+ byte[] encodeWithEnvelope(ClusterStateBundle stateBundle);
+
+ ClusterStateBundle decodeWithEnvelope(byte[] encodedClusterStateBundle);
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
index f6034cb34ff..1c391f9aacf 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
@@ -18,8 +18,11 @@ import java.util.Map;
*
* LZ4 compression is transparently applied during encoding and decompression is
* subsequently applied during decoding.
+ *
+ * Implements optional Slime-based enveloping for *WithEnvelope methods, which removes
+ * need to explicitly track compression metadata by the caller.
*/
-public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec {
+public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec, EnvelopedClusterStateBundleCodec {
private static final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024);
@@ -55,4 +58,30 @@ public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec {
return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(baseline), derivedStates);
}
+
+ // Technically the Slime enveloping could be its own class that is bundle codec independent, but
+ // realistically there won't be any other implementations. Can be trivially factored out if required.
+ @Override
+ public byte[] encodeWithEnvelope(ClusterStateBundle stateBundle) {
+ EncodedClusterStateBundle toEnvelope = encode(stateBundle);
+
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+ root.setLong("compression-type", toEnvelope.getCompression().type().getCode());
+ root.setLong("uncompressed-size", toEnvelope.getCompression().uncompressedSize());
+ root.setData("data", toEnvelope.getCompression().data());
+ return BinaryFormat.encode(slime);
+ }
+
+ @Override
+ public ClusterStateBundle decodeWithEnvelope(byte[] encodedClusterStateBundle) {
+ // We expect ZK writes to be atomic, letting us not worry about partially written or corrupted blobs.
+ Slime slime = BinaryFormat.decode(encodedClusterStateBundle);
+ Inspector root = slime.get();
+ CompressionType compressionType = CompressionType.valueOf((byte)root.field("compression-type").asLong());
+ int uncompressedSize = (int)root.field("uncompressed-size").asLong();
+ byte[] data = root.field("data").asData();
+ return decode(EncodedClusterStateBundle.fromCompressionBuffer(
+ new Compressor.Compression(compressionType, uncompressedSize, data)));
+ }
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java
new file mode 100644
index 00000000000..f929cd0a605
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java
@@ -0,0 +1,107 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vespa.clustercontroller.core.database.Database;
+import com.yahoo.vespa.clustercontroller.core.database.DatabaseFactory;
+import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
+import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class DatabaseHandlerTest {
+
+ static class Fixture {
+ final ClusterFixture clusterFixture = ClusterFixture.forFlatCluster(10);
+ final FleetController mockController = mock(FleetController.class);
+ final Database mockDatabase = mock(Database.class);
+ final Timer mockTimer = mock(Timer.class);
+ final DatabaseFactory mockDbFactory = (params) -> mockDatabase;
+ final String databaseAddress = "localhost:0";
+ final Object monitor = new Object();
+ final ClusterStateBundle dummyBundle;
+
+ Fixture() throws Exception {
+ dummyBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
+
+ when(mockDatabase.isClosed()).thenReturn(false);
+ when(mockDatabase.storeMasterVote(anyInt())).thenReturn(true);
+ when(mockDatabase.storeLastPublishedStateBundle(any())).thenReturn(true);
+ when(mockTimer.getCurrentTimeInMillis()).thenReturn(1000000L);
+ }
+
+ DatabaseHandler.Context createMockContext() {
+ return new DatabaseHandler.Context() {
+ @Override
+ public ContentCluster getCluster() {
+ return clusterFixture.cluster();
+ }
+
+ @Override
+ public FleetController getFleetController() {
+ return mockController;
+ }
+
+ @Override
+ public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener() {
+ return null;
+ }
+
+ @Override
+ public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener() {
+ return null;
+ }
+ };
+ }
+
+ DatabaseHandler createHandler() throws Exception {
+ return new DatabaseHandler(mockDbFactory, mockTimer, databaseAddress, 0, monitor);
+ }
+ }
+
+ @Test
+ public void can_store_latest_cluster_state_bundle() throws Exception {
+ Fixture f = new Fixture();
+ DatabaseHandler handler = f.createHandler();
+ handler.doNextZooKeeperTask(f.createMockContext()); // Database setup step
+ handler.saveLatestClusterStateBundle(f.createMockContext(), f.dummyBundle);
+ handler.doNextZooKeeperTask(f.createMockContext()); // Bundle store step
+
+ verify(f.mockDatabase).storeLastPublishedStateBundle(eq(f.dummyBundle));
+ }
+
+ @Test
+ public void can_load_latest_cluster_state_bundle() throws Exception {
+ Fixture f = new Fixture();
+ DatabaseHandler handler = f.createHandler();
+ handler.doNextZooKeeperTask(f.createMockContext()); // Database setup step
+
+ when(f.mockDatabase.retrieveLastPublishedStateBundle()).thenReturn(f.dummyBundle);
+
+ ClusterStateBundle retrievedBundle = handler.getLatestClusterStateBundle();
+ assertThat(retrievedBundle, equalTo(f.dummyBundle));
+ }
+
+ // FIXME I don't like the semantics of this, but it mirrors the legacy behavior for the
+ // rest of the DB load operations exposed by the DatabaseHandler.
+ @Test
+ public void empty_bundle_is_returned_if_no_db_connection() throws Exception {
+ Fixture f = new Fixture();
+ DatabaseHandler handler = f.createHandler();
+ // Note: no DB setup step
+
+ ClusterStateBundle retrievedBundle = handler.getLatestClusterStateBundle();
+ assertThat(retrievedBundle, equalTo(ClusterStateBundle.empty()));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index a2da6bd1c6c..6d59a672e86 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -1,8 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
-import com.yahoo.compress.CompressionType;
-import com.yahoo.compress.Compressor;
import com.yahoo.jrt.*;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.slobrok.api.BackOffPolicy;
@@ -10,15 +8,13 @@ import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.jrt.slobrok.api.SlobrokList;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.*;
-import com.yahoo.vespa.clustercontroller.core.rpc.EncodedClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCUtil;
-import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
/**
*
@@ -34,6 +30,7 @@ public class DummyVdsNode {
private NodeType type;
private int index;
private NodeState nodeState;
+ private String hostInfo = "{}";
private Supervisor supervisor;
private Acceptor acceptor;
private Register register;
@@ -67,10 +64,10 @@ public class DummyVdsNode {
private final List<Req> waitingRequests = new LinkedList<>();
/**
- * History of received system states.
+ * History of received cluster states.
* Any access to this list or to its members must be synchronized on the timer variable.
*/
- private List<ClusterState> systemState = new LinkedList<>();
+ private List<ClusterStateBundle> clusterStateBundles = new LinkedList<>();
private Thread messageResponder = new Thread() {
public void run() {
@@ -216,8 +213,10 @@ public class DummyVdsNode {
/** Returns the latest system state version received, or empty if none are received yet. */
private Optional<Integer> getLatestSystemStateVersion() {
synchronized(timer) {
- if (systemState.isEmpty()) return Optional.empty();
- return Optional.of(systemState.get(0).getVersion());
+ if (clusterStateBundles.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(clusterStateBundles.get(0).getVersion());
}
}
@@ -260,7 +259,7 @@ public class DummyVdsNode {
log.log(LogLevel.DEBUG, "Dummy node " + this + " answering pending node state request.");
req.request.returnValues().add(new StringValue(nodeState.serialize()));
if (req.request.methodName().equals("getnodestate3")) {
- req.request.returnValues().add(new StringValue("Dummy node host info"));
+ req.request.returnValues().add(new StringValue(hostInfo));
}
req.request.returnRequest();
++setNodeStateReplies;
@@ -268,14 +267,19 @@ public class DummyVdsNode {
waitingRequests.clear();
}
- public void setNodeState(NodeState state) {
+ public void setNodeState(NodeState state, String hostInfo) {
log.log(LogLevel.DEBUG, "Dummy node " + this + " got new state: " + state);
synchronized(timer) {
this.nodeState = state;
+ this.hostInfo = hostInfo;
replyToPendingNodeStateRequests();
}
}
+ public void setNodeState(NodeState state) {
+ setNodeState(state, "{}");
+ }
+
public void setNodeState(State state) {
setNodeState(new NodeState(type, state));
}
@@ -287,16 +291,22 @@ public class DummyVdsNode {
}
public List<ClusterState> getSystemStatesReceived() {
- List<ClusterState> states = new ArrayList<>();
synchronized(timer) {
- states.addAll(systemState);
+ return clusterStateBundles.stream()
+ .map(ClusterStateBundle::getBaselineClusterState)
+ .collect(Collectors.toList());
+ }
+ }
+
+ public ClusterStateBundle getClusterStateBundle() {
+ synchronized(timer) {
+ return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0));
}
- return states;
}
public ClusterState getClusterState() {
synchronized(timer) {
- return (systemState.isEmpty() ? null : systemState.get(0));
+ return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0).getBaselineClusterState());
}
}
@@ -482,7 +492,7 @@ public class DummyVdsNode {
ClusterState newState = new ClusterState(req.parameters().get(0).asString());
synchronized(timer) {
updateStartTimestamps(newState);
- systemState.add(0, newState);
+ clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState)));
timer.notifyAll();
}
req.returnValues().add(new Int32Value(1));
@@ -505,7 +515,7 @@ public class DummyVdsNode {
ClusterState newState = new ClusterState(req.parameters().get(0).asString());
synchronized(timer) {
updateStartTimestamps(newState);
- systemState.add(0, newState);
+ clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState)));
timer.notifyAll();
}
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + newState);
@@ -523,13 +533,12 @@ public class DummyVdsNode {
return;
}
ClusterStateBundle stateBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req);
- ClusterState newState = stateBundle.getBaselineClusterState();
synchronized(timer) {
- updateStartTimestamps(newState);
- systemState.add(0, newState);
+ updateStartTimestamps(stateBundle.getBaselineClusterState());
+ clusterStateBundles.add(0, stateBundle);
timer.notifyAll();
}
- log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + newState);
+ log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + stateBundle);
} catch (Exception e) {
log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage());
e.printStackTrace(System.err);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
index 19d87b878d4..21d9b0a7a1f 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
@@ -18,6 +18,7 @@ import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
import com.yahoo.vespa.clustercontroller.core.rpc.RpcServer;
import com.yahoo.vespa.clustercontroller.core.rpc.SlobrokClient;
@@ -157,7 +158,7 @@ public abstract class FleetControllerTest implements Waiter {
status = new StatusPageServer(timer, timer, options.httpPort);
}
RpcServer rpcServer = new RpcServer(timer, timer, options.clusterName, options.fleetControllerIndex, options.slobrokBackOffPolicy);
- DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
+ DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater);
SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer);
MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer);
@@ -311,6 +312,8 @@ public abstract class FleetControllerTest implements Waiter {
public ClusterState waitForStableSystem() throws Exception { return waiter.waitForStableSystem(); }
public ClusterState waitForStableSystem(int nodeCount) throws Exception { return waiter.waitForStableSystem(nodeCount); }
public ClusterState waitForState(String state) throws Exception { return waiter.waitForState(state); }
+ public ClusterState waitForStateInAllSpaces(String state) throws Exception { return waiter.waitForStateInAllSpaces(state); }
+ public ClusterState waitForStateInSpace(String space, String state) throws Exception { return waiter.waitForStateInSpace(space, state); }
public ClusterState waitForState(String state, int timeoutMS) throws Exception { return waiter.waitForState(state, timeoutMS); }
public ClusterState waitForInitProgressPassed(Node n, double progress) { return waiter.waitForInitProgressPassed(n, progress); }
public ClusterState waitForClusterStateIncludingNodesWithMinUsedBits(int bitcount, int nodecount) { return waiter.waitForClusterStateIncludingNodesWithMinUsedBits(bitcount, nodecount); }
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
index dc79a92e68b..b2464e83f95 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
@@ -9,6 +9,8 @@ import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vdslib.state.NodeState;
+import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import org.junit.Ignore;
import org.junit.Rule;
@@ -483,7 +485,7 @@ public class MasterElectionTest extends FleetControllerTest {
// no guarantees that it has been published to any nodes yet.
final long preElectionVersionNumber = fleetControllers.get(0).getSystemState().getVersion();
- // Nuke controller 1, leaving controller 1 in charge.
+ // Nuke controller 0, leaving controller 1 in charge.
// It should have observed the most recently written version number and increase this
// number before publishing its own new state.
fleetControllers.get(0).shutdown();
@@ -495,4 +497,47 @@ public class MasterElectionTest extends FleetControllerTest {
assertThat(postElectionVersionNumber, greaterThan(preElectionVersionNumber));
}
+ @Test
+ public void previously_published_state_is_taken_into_account_for_default_space_when_controller_bootstraps() throws Exception {
+ startingTest("MasterElectionTest::previously_published_state_is_taken_into_account_for_default_space_when_controller_bootstraps");
+ FleetControllerOptions options = new FleetControllerOptions("mycluster");
+ options.enableMultipleBucketSpaces = true;
+ options.masterZooKeeperCooldownPeriod = 1;
+ options.minTimeBeforeFirstSystemStateBroadcast = 10000;
+ setUpFleetController(3, true, options);
+ setUpVdsNodes(true, new DummyVdsNodeOptions());
+ fleetController = fleetControllers.get(0); // Required to prevent waitForStableSystem from NPE'ing
+ waitForMaster(0);
+ waitForStableSystem();
+ log.info("Waiting for full maintenance mode in default space");
+ waitForStateInSpace("default", "version:\\d+ distributor:10 storage:10 .0.s:m .1.s:m .2.s:m .3.s:m .4.s:m .5.s:m .6.s:m .7.s:m .8.s:m .9.s:m");
+
+ log.info("Responding with zero global merges pending from all distributors");
+ final int ackVersion = fleetControllers.get(0).getClusterStateBundle().getVersion();
+ // ACKing with no merge state in host info (implied: no pending merges) should not cause
+ // a new state to be published before the last node has ACKed. Consequently there should
+ // not be any race potential where a new version is published concurrently with our attempts
+ // at ACKing a previous one.
+ this.nodes.stream().filter(DummyVdsNode::isDistributor).forEach(node -> {
+ node.setNodeState(new NodeState(NodeType.DISTRIBUTOR, State.UP),
+ String.format("{\"cluster-state-version\":%d}", ackVersion));
+ });
+ waitForStateInAllSpaces("version:\\d+ distributor:10 storage:10");
+
+ log.info("Bundle before restart cycle: " + fleetControllers.get(0).getClusterStateBundle());
+ log.info("Doing restart cycle of controller 0");
+ fleetControllers.get(0).shutdown();
+ waitForMaster(1);
+ waitForCompleteCycle(1);
+
+ fleetControllers.set(0, createFleetController(usingFakeTimer, fleetControllers.get(0).getOptions(), true, null));
+ waitForMaster(0);
+ waitForCompleteCycle(0);
+
+ // We should NOT publish a state where all storage nodes are in Maintenance, since they were
+ // marked as Up in the last published cluster state.
+ log.info("Bundle after restart cycle: " + fleetControllers.get(0).getClusterStateBundle());
+ waitForStateInAllSpaces("version:\\d+ distributor:10 storage:10");
+ }
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index b10a8101c37..32de3591f2d 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -5,6 +5,7 @@ import com.yahoo.jrt.*;
import com.yahoo.vdslib.distribution.ConfiguredNode;
import com.yahoo.vdslib.state.*;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory;
import com.yahoo.vespa.clustercontroller.core.testutils.StateWaiter;
import com.yahoo.vespa.clustercontroller.utils.util.NoMetricReporter;
import org.junit.Before;
@@ -46,7 +47,7 @@ public class StateChangeTest extends FleetControllerTest {
ContentCluster cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution,
options.minStorageNodesUp, options.minRatioOfStorageNodesUp);
NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, eventLog);
- DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
+ DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
StateChangeHandler stateGenerator = new StateChangeHandler(timer, eventLog, metricUpdater);
SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer);
MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer);
@@ -955,8 +956,9 @@ public class StateChangeTest extends FleetControllerTest {
StateWaiter waiter = new StateWaiter(timer);
fleetController.addSystemStateListener(waiter);
- // Ensure all nodes have been seen by fleetcontroller and that it has had enough time to possibly have sent a cluster state
- waiter.waitForState("version:\\d+ distributor:10 (\\.\\d+\\.t:\\d+ )*storage:10 (\\.\\d+\\.t:\\d+ )*.1.s:d( \\.\\d+\\.t:\\d+)*", timeoutMS);
+ // Ensure all nodes have been seen by fleetcontroller and that it has had enough time to possibly have sent a cluster state
+ // Note: this is a candidate state and therefore NOT versioned yet
+ waiter.waitForState("^distributor:10 (\\.\\d+\\.t:\\d+ )*storage:10 (\\.\\d+\\.t:\\d+ )*.1.s:d( \\.\\d+\\.t:\\d+)*", timeoutMS);
waitForCompleteCycle();
new StateMessageChecker(nodes) {
@Override int expectedMessageCount(final DummyVdsNode node) { return 0; }
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
index 2a3be1e7194..95b699d4de4 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
@@ -303,6 +303,23 @@ public class StateVersionTrackerTest {
assertFalse(tracker.bucketSpaceMergeCompletionStateHasChanged());
}
+ @Test
+ public void setting_zookeeper_retrieved_bundle_sets_current_versioned_state_and_resets_candidate_state() {
+ final StateVersionTracker versionTracker = createWithMockedMetrics();
+ versionTracker.setVersionRetrievedFromZooKeeper(100);
+ versionTracker.updateLatestCandidateStateBundle(
+ ClusterStateBundle.ofBaselineOnly(stateWithoutAnnotations("distributor:1 storage:1")));
+ versionTracker.promoteCandidateToVersionedState(12345);
+
+ ClusterStateBundle zkBundle = ClusterStateBundle.ofBaselineOnly(stateWithoutAnnotations("version:199 distributor:2 storage:2"));
+ versionTracker.setVersionRetrievedFromZooKeeper(200);
+ versionTracker.setClusterStateBundleRetrievedFromZooKeeper(zkBundle);
+
+ assertThat(versionTracker.getLatestCandidateState(), equalTo(AnnotatedClusterState.emptyState()));
+ assertThat(versionTracker.getVersionedClusterStateBundle(), equalTo(zkBundle));
+ assertThat(versionTracker.getCurrentVersion(), equalTo(200)); // Not from bundle, but from explicitly stored version
+ }
+
private HostInfo createHostInfo(long bucketsPending) {
return HostInfo.createHostInfo(
"{\n" +
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java
new file mode 100644
index 00000000000..e5200b63a7e
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java
@@ -0,0 +1,76 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vespa.clustercontroller.core.database.Database;
+import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabase;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class ZooKeeperDatabaseTest {
+
+ private static class Fixture implements AutoCloseable {
+ final ZooKeeperTestServer zkServer;
+ ClusterFixture clusterFixture;
+ ZooKeeperDatabase zkDatabase;
+ int nodeIndex = 1;
+ Duration sessionTimeout = Duration.ofSeconds(60);
+ Database.DatabaseListener mockListener = mock(Database.DatabaseListener.class);
+
+ Fixture() throws IOException {
+ zkServer = new ZooKeeperTestServer();
+ clusterFixture = ClusterFixture.forFlatCluster(10);
+ }
+
+ void createDatabase() throws Exception {
+ closeDatabaseIfOpen();
+ zkDatabase = new ZooKeeperDatabase(clusterFixture.cluster(), nodeIndex, zkServer.getAddress(),
+ (int)sessionTimeout.toMillis(), mockListener);
+ }
+
+ ZooKeeperDatabase db() { return zkDatabase; }
+
+ void closeDatabaseIfOpen() {
+ if (zkDatabase != null) {
+ zkDatabase.close();
+ zkDatabase = null;
+ }
+ }
+
+ @Override
+ public void close() {
+ closeDatabaseIfOpen();
+ zkServer.shutdown(true);
+ }
+ }
+
+ @Test
+ public void can_store_and_load_cluster_state_bundle_from_database() throws Exception {
+ try (Fixture f = new Fixture()) {
+ f.createDatabase();
+ ClusterStateBundle bundleToStore = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
+ f.db().storeLastPublishedStateBundle(bundleToStore);
+
+ ClusterStateBundle bundleReceived = f.db().retrieveLastPublishedStateBundle();
+ assertThat(bundleReceived, equalTo(bundleToStore));
+ }
+ }
+
+ @Test
+ public void empty_state_bundle_is_returned_if_no_bundle_already_stored_in_database() throws Exception {
+ try (Fixture f = new Fixture()) {
+ f.createDatabase();
+ ClusterStateBundle bundleReceived = f.db().retrieveLastPublishedStateBundle();
+
+ assertThat(bundleReceived, equalTo(ClusterStateBundle.empty()));
+ }
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java
index f2fe494ce61..b19b1d780bf 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java
@@ -18,6 +18,12 @@ public class SlimeClusterStateBundleCodecTest {
return codec.decode(encoded);
}
+ private static ClusterStateBundle roundtripEncodeWithEnvelope(ClusterStateBundle stateBundle) {
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ byte[] encoded = codec.encodeWithEnvelope(stateBundle);
+ return codec.decodeWithEnvelope(encoded);
+ }
+
@Test
public void baseline_only_bundle_can_be_round_trip_encoded() {
ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
@@ -53,4 +59,17 @@ public class SlimeClusterStateBundleCodecTest {
assertThat(decodedBundle, equalTo(stateBundle));
}
+ @Test
+ public void uncompressed_enveloped_bundle_can_be_roundtrip_encoded() {
+ // Insufficient length and too much entropy to be compressed
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:3");
+ assertThat(roundtripEncodeWithEnvelope(stateBundle), equalTo(stateBundle));
+ }
+
+ @Test
+ public void compressable_enveloped_bundle_can_be_roundtrip_encoded() {
+ ClusterStateBundle stateBundle = makeCompressableBundle();
+ assertThat(roundtripEncodeWithEnvelope(stateBundle), equalTo(stateBundle));
+ }
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
index 730b03ff7c9..7811e68c381 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
@@ -23,7 +23,8 @@ public class StateWaiter implements SystemStateListener {
this.timer = timer;
}
- public void handleNewSystemState(ClusterStateBundle state) {
+ @Override
+ public void handleNewPublishedState(ClusterStateBundle state) {
synchronized(timer) {
current = state.getBaselineClusterState();
@@ -32,6 +33,13 @@ public class StateWaiter implements SystemStateListener {
}
}
+ @Override
+ public void handleNewCandidateState(ClusterStateBundle states) {
+ // Treat candidate states as if they were published for the tests that use
+ // this (deprecated) waiter class.
+ handleNewPublishedState(states);
+ }
+
public int getStateUpdates() { return Math.max(0, stateUpdates); }
public ClusterState getCurrentSystemState() {
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
index 6396b27aa79..d140ef998b6 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
@@ -4,12 +4,17 @@ package com.yahoo.vespa.clustercontroller.core.testutils;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.DummyVdsNode;
import com.yahoo.vespa.clustercontroller.core.FleetController;
import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -19,14 +24,14 @@ import java.util.regex.Pattern;
public interface WaitCondition {
/** Return null if met, why not if it is not met. */
- public String isConditionMet();
+ String isConditionMet();
- public abstract class StateWait implements WaitCondition {
+ abstract class StateWait implements WaitCondition {
private final Object monitor;
protected ClusterState currentState;
private final SystemStateListener listener = new SystemStateListener() {
@Override
- public void handleNewSystemState(ClusterStateBundle state) {
+ public void handleNewPublishedState(ClusterStateBundle state) {
synchronized (monitor) {
currentState = state.getBaselineClusterState();
monitor.notifyAll();
@@ -36,7 +41,9 @@ public interface WaitCondition {
public StateWait(FleetController fc, Object monitor) {
this.monitor = monitor;
- fc.addSystemStateListener(listener);
+ synchronized (this.monitor) {
+ fc.addSystemStateListener(listener);
+ }
}
public ClusterState getCurrentState() {
@@ -46,11 +53,13 @@ public interface WaitCondition {
}
}
- public class RegexStateMatcher extends StateWait {
+ class RegexStateMatcher extends StateWait {
private final Pattern pattern;
private Collection<DummyVdsNode> nodesToCheck;
private ClusterState lastCheckedState;
+ private boolean checkAllSpaces = false;
+ private Set<String> checkSpaceSubset = Collections.emptySet();
public RegexStateMatcher(String regex, FleetController fc, Object monitor) {
super(fc, monitor);
@@ -62,19 +71,51 @@ public interface WaitCondition {
return this;
}
+ public RegexStateMatcher checkAllSpaces(boolean checkAllSpaces) {
+ this.checkAllSpaces = checkAllSpaces;
+ return this;
+ }
+
+ public RegexStateMatcher checkSpaceSubset(Set<String> spaces) {
+ this.checkSpaceSubset = spaces;
+ return this;
+ }
+
+ private static List<ClusterState> statesInBundle(ClusterStateBundle bundle) {
+ List<ClusterState> states = new ArrayList<>(3);
+ states.add(bundle.getBaselineClusterState());
+ bundle.getDerivedBucketSpaceStates().forEach((space, state) -> states.add(state.getClusterState()));
+ return states;
+ }
+
@Override
public String isConditionMet() {
if (currentState != null) {
lastCheckedState = currentState;
Matcher m = pattern.matcher(lastCheckedState.toString());
- if (m.matches()) {
+ if (m.matches() || !checkSpaceSubset.isEmpty()) {
if (nodesToCheck != null) {
for (DummyVdsNode node : nodesToCheck) {
if (node.getClusterState() == null) {
return "Node " + node + " has not received a cluster state yet";
}
- if (! pattern.matcher(withoutTimestamps(node.getClusterState().toString())).matches()) {
- return "Node " + node + " state mismatch.\n wanted: " + pattern + "\n is: " + node.getClusterState().toString();
+ // TODO refactor, simplify
+ boolean match;
+ if (checkAllSpaces) {
+ match = statesInBundle(node.getClusterStateBundle()).stream()
+ .allMatch(state -> pattern.matcher(withoutTimestamps(state.toString())).matches());
+ } else if (!checkSpaceSubset.isEmpty()) {
+ match = checkSpaceSubset.stream().allMatch(space -> {
+ String state = node.getClusterStateBundle().getDerivedBucketSpaceStates()
+ .getOrDefault(space, AnnotatedClusterState.emptyState()).getClusterState().toString();
+ return pattern.matcher(withoutTimestamps(state)).matches();
+ });
+ } else {
+ match = pattern.matcher(withoutTimestamps(node.getClusterState().toString())).matches();
+ }
+
+ if (!match) {
+ return "Node " + node + " state mismatch.\n wanted: " + pattern + "\n is: " + node.getClusterStateBundle().toString();
}
if (node.getStateCommunicationVersion() > 0) {
if (!node.hasPendingGetNodeStateRequest()) {
@@ -141,7 +182,7 @@ public interface WaitCondition {
}
}
- public static class MinUsedBitsMatcher extends StateWait {
+ class MinUsedBitsMatcher extends StateWait {
private final int bitCount;
private final int nodeCount;
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java
index a0af1ad2d0e..fb49b1cbf47 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java
@@ -7,28 +7,32 @@ import com.yahoo.vdslib.state.Node;
import com.yahoo.vespa.clustercontroller.core.DummyVdsNode;
import com.yahoo.vespa.clustercontroller.core.FleetController;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.logging.Logger;
public interface Waiter {
- public interface DataRetriever {
- public Object getMonitor();
- public FleetController getFleetController();
- public List<DummyVdsNode> getDummyNodes();
- public int getTimeoutMS();
+ interface DataRetriever {
+ Object getMonitor();
+ FleetController getFleetController();
+ List<DummyVdsNode> getDummyNodes();
+ int getTimeoutMS();
}
- public ClusterState waitForState(String state) throws Exception;
- public ClusterState waitForState(String state, int timeoutMS) throws Exception;
- public ClusterState waitForStableSystem() throws Exception;
- public ClusterState waitForStableSystem(int nodeCount) throws Exception;
- public ClusterState waitForInitProgressPassed(Node n, double progress);
- public ClusterState waitForClusterStateIncludingNodesWithMinUsedBits(int bitcount, int nodecount);
- public void wait(WaitCondition c, WaitTask wt, int timeoutMS);
+ ClusterState waitForState(String state) throws Exception;
+ ClusterState waitForStateInSpace(String space, String state) throws Exception;
+ ClusterState waitForStateInAllSpaces(String state) throws Exception;
+ ClusterState waitForState(String state, int timeoutMS) throws Exception;
+ ClusterState waitForStableSystem() throws Exception;
+ ClusterState waitForStableSystem(int nodeCount) throws Exception;
+ ClusterState waitForInitProgressPassed(Node n, double progress);
+ ClusterState waitForClusterStateIncludingNodesWithMinUsedBits(int bitcount, int nodecount);
+ void wait(WaitCondition c, WaitTask wt, int timeoutMS);
- public static class Impl implements Waiter {
+ class Impl implements Waiter {
private static final Logger log = Logger.getLogger(Impl.class.getName());
private final DataRetriever data;
@@ -37,16 +41,33 @@ public interface Waiter {
this.data = data;
}
- public ClusterState waitForState(String state) throws Exception { return waitForState(state, data.getTimeoutMS()); }
- public ClusterState waitForState(String state, int timeoutMS) throws Exception {
+ // TODO refactor
+ private ClusterState waitForState(String state, int timeoutMS, boolean checkAllSpaces, Set<String> checkSpaces) {
LinkedList<DummyVdsNode> nodesToCheck = new LinkedList<>();
for(DummyVdsNode node : data.getDummyNodes()) {
if (node.isConnected()) nodesToCheck.add(node);
}
- WaitCondition.StateWait swc = new WaitCondition.RegexStateMatcher(state, data.getFleetController(), data.getMonitor()).includeNotifyingNodes(nodesToCheck);
+ WaitCondition.StateWait swc = new WaitCondition.RegexStateMatcher(
+ state, data.getFleetController(), data.getMonitor())
+ .includeNotifyingNodes(nodesToCheck)
+ .checkAllSpaces(checkAllSpaces)
+ .checkSpaceSubset(checkSpaces);
wait(swc, new WaitTask.StateResender(data.getFleetController()), timeoutMS);
return swc.getCurrentState();
}
+
+ public ClusterState waitForState(String state) throws Exception {
+ return waitForState(state, data.getTimeoutMS());
+ }
+ public ClusterState waitForStateInAllSpaces(String state) {
+ return waitForState(state, data.getTimeoutMS(), true, Collections.emptySet());
+ }
+ public ClusterState waitForStateInSpace(String space, String state) {
+ return waitForState(state, data.getTimeoutMS(), false, Collections.singleton(space));
+ }
+ public ClusterState waitForState(String state, int timeoutMS) {
+ return waitForState(state, timeoutMS, false, Collections.emptySet());
+ }
public ClusterState waitForStableSystem() throws Exception {
return waitForStableSystem(data.getDummyNodes().size() / 2);
}