summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@oath.com> 2018-04-20 17:04:34 +0200
committer: Tor Brede Vekterli <vekterli@oath.com> 2018-04-24 14:12:54 +0200
commit: 2e736fdd071fc3ad72912afe96c767c7902870d1 (patch)
tree: 452c8e9d9f63c8007d77e536123791ff35e5c4f3 /clustercontroller-core
parent: 6a7351dc77ad191540b61555360d20caea94c156 (diff)
ZooKeeper-persist and load published cluster state bundles
Store synchronously upon each new versioned state, load whenever controller is elected master. Effectively carries over visible node states from one controller's lifetime to the next. This removes the edge case where default bucket space content nodes are marked as in Maintenance until their global merge status is known. To avoid controller tripping over its own feet, state bundles are now _not_ versioned at all until the initial send time period has passed. This prevents overwriting the state persisted from a previous controller with a transient state where all nodes are down due to not having Slobrok contact yet. A new cluster state recompute+send edge has been added when the master passes its initial state send time period.
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java95
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java18
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java18
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java6
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java34
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java52
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java93
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java14
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java5
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java19
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java31
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java107
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java51
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java5
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java47
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java8
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java17
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java76
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java19
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java10
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java59
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java53
23 files changed, 716 insertions, 125 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
index 9291c8277da..76177e8f1c1 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
@@ -78,6 +78,10 @@ public class ClusterStateBundle {
return new ClusterStateBundle(baselineState, Collections.emptyMap());
}
+ public static ClusterStateBundle empty() {
+ return ofBaselineOnly(AnnotatedClusterState.emptyState());
+ }
+
public AnnotatedClusterState getBaselineAnnotatedState() {
return baselineState;
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 54c6c11f81e..e781bf3b145 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -10,6 +10,7 @@ import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
import com.yahoo.vespa.clustercontroller.core.listeners.*;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
@@ -25,7 +26,18 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import java.io.FileNotFoundException;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -54,7 +66,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private AtomicBoolean running = new AtomicBoolean(true);
private FleetControllerOptions options;
private FleetControllerOptions nextOptions;
- private final List<SystemStateListener> systemStateListeners = new LinkedList<>();
+ private final List<SystemStateListener> systemStateListeners = new CopyOnWriteArrayList<>();
private boolean processingCycle = false;
private boolean wantedStateChanged = false;
private long cycleCount = 0;
@@ -185,7 +197,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
options.nodeStateRequestTimeoutEarliestPercentage,
options.nodeStateRequestTimeoutLatestPercentage,
options.nodeStateRequestRoundTripTimeMaxSeconds);
- DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
+ DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
NodeLookup lookUp = new SlobrokClient(timer);
StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater);
SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer);
@@ -219,6 +231,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
}
+ public ClusterStateBundle getClusterStateBundle() {
+ synchronized (monitor) {
+ return systemStateBroadcaster.getClusterStateBundle();
+ }
+ }
+
public void schedule(RemoteClusterControllerTask task) {
synchronized (monitor) {
log.fine("Scheduled remote task " + task.getClass().getName() + " for execution");
@@ -228,13 +246,13 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
/** Used for unit testing. */
public void addSystemStateListener(SystemStateListener listener) {
- synchronized (systemStateListeners) {
- systemStateListeners.add(listener);
- // Always give cluster state listeners the current state, in case acceptable state has come before listener is registered.
- com.yahoo.vdslib.state.ClusterState state = getSystemState();
- if (state == null) throw new NullPointerException("Cluster state should never be null at this point");
- listener.handleNewSystemState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state)));
+ systemStateListeners.add(listener);
+ // Always give cluster state listeners the current state, in case acceptable state has come before listener is registered.
+ com.yahoo.vdslib.state.ClusterState state = getSystemState();
+ if (state == null) {
+ throw new NullPointerException("Cluster state should never be null at this point");
}
+ listener.handleNewPublishedState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state)));
}
public FleetControllerOptions getOptions() {
@@ -351,7 +369,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
@Override
- public void handleNewSystemState(ClusterStateBundle stateBundle) {
+ public void handleNewPublishedState(ClusterStateBundle stateBundle) {
verifyInControllerThread();
ClusterState baselineState = stateBundle.getBaselineClusterState();
newStates.add(stateBundle);
@@ -361,13 +379,14 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// nodes so that a cluster controller crash after publishing but before a successful
// ZK store will not risk reusing the same version number.
if (masterElectionHandler.isMaster()) {
- storeClusterStateVersionToZooKeeper(baselineState);
+ storeClusterStateMetaDataToZooKeeper(stateBundle);
}
}
- private void storeClusterStateVersionToZooKeeper(ClusterState state) {
+ private void storeClusterStateMetaDataToZooKeeper(ClusterStateBundle stateBundle) {
try {
- database.saveLatestSystemStateVersion(databaseContext, state.getVersion());
+ database.saveLatestSystemStateVersion(databaseContext, stateBundle.getVersion());
+ database.saveLatestClusterStateBundle(databaseContext, stateBundle);
} catch (InterruptedException e) {
// Rethrow as RuntimeException to propagate exception up to main thread method.
// Don't want to hide failures to write cluster state version.
@@ -654,6 +673,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
sentAny = systemStateBroadcaster.broadcastNewState(databaseContext, communicator);
if (sentAny) {
+ // FIXME won't this inhibit resending to unresponsive nodes?
nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates;
}
}
@@ -665,7 +685,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
synchronized (systemStateListeners) {
for (ClusterStateBundle stateBundle : newStates) {
for(SystemStateListener listener : systemStateListeners) {
- listener.handleNewSystemState(stateBundle);
+ listener.handleNewPublishedState(stateBundle);
}
}
newStates.clear();
@@ -782,12 +802,21 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became node state gatherer as we are fleetcontroller master candidate.", timer.getCurrentTimeInMillis()));
// Update versions to use so what is shown is closer to what is reality on the master
stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion());
+ stateChangeHandler.setStateChangedFlag();
}
}
isStateGatherer = true;
return didWork;
}
+ private void invokeCandidateStateListeners(ClusterStateBundle candidateBundle) {
+ systemStateListeners.forEach(listener -> listener.handleNewCandidateState(candidateBundle));
+ }
+
+ private boolean hasPassedFirstStateBroadcastTimePoint(long timeNowMs) {
+ return timeNowMs >= firstAllowedStateBroadcast || cluster.allStatesReported();
+ }
+
private boolean recomputeClusterStateIfRequired() {
boolean stateWasChanged = false;
if (mustRecomputeCandidateClusterState()) {
@@ -800,16 +829,18 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
.stateDeriver(createBucketSpaceStateDeriver())
.deriveAndBuild();
stateVersionTracker.updateLatestCandidateStateBundle(candidateBundle);
+ invokeCandidateStateListeners(candidateBundle);
- if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()
- || stateVersionTracker.hasReceivedNewVersionFromZooKeeper())
+ final long timeNowMs = timer.getCurrentTimeInMillis();
+ if (hasPassedFirstStateBroadcastTimePoint(timeNowMs)
+ && (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()
+ || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()))
{
- final long timeNowMs = timer.getCurrentTimeInMillis();
final ClusterStateBundle before = stateVersionTracker.getVersionedClusterStateBundle();
stateVersionTracker.promoteCandidateToVersionedState(timeNowMs);
emitEventsForAlteredStateEdges(before, stateVersionTracker.getVersionedClusterStateBundle(), timeNowMs);
- handleNewSystemState(stateVersionTracker.getVersionedClusterStateBundle());
+ handleNewPublishedState(stateVersionTracker.getVersionedClusterStateBundle());
stateWasChanged = true;
}
}
@@ -893,10 +924,19 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
}
+ private boolean atFirstClusterStateSendTimeEdge() {
+ // We only care about triggering a state recomputation for the master, which is the only
+ // one allowed to actually broadcast any states.
+ if (!isMaster || systemStateBroadcaster.hasBroadcastedClusterStateBundle()) {
+ return false;
+ }
+ return hasPassedFirstStateBroadcastTimePoint(timer.getCurrentTimeInMillis());
+ }
+
private boolean mustRecomputeCandidateClusterState() {
return stateChangeHandler.stateMayHaveChanged()
- || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()
- || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged();
+ || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged()
+ || atFirstClusterStateSendTimeEdge();
}
private boolean handleLeadershipEdgeTransitions() throws InterruptedException {
@@ -904,16 +944,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if (masterElectionHandler.isMaster()) {
if ( ! isMaster) {
metricUpdater.becameMaster();
- // If we just became master, restore wanted states from database
+ // If we just became master, restore state from ZooKeeper
+ stateChangeHandler.setStateChangedFlag();
+ systemStateBroadcaster.resetBroadcastedClusterStateBundle();
+
stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion());
- didWork = database.loadStartTimestamps(cluster);
- didWork |= database.loadWantedStates(databaseContext);
+ ClusterStateBundle previousBundle = database.getLatestClusterStateBundle();
+ database.loadStartTimestamps(cluster);
+ database.loadWantedStates(databaseContext);
+
+ log.info(() -> String.format("Loaded previous cluster state bundle from ZooKeeper: %s", previousBundle));
+ stateVersionTracker.setClusterStateBundleRetrievedFromZooKeeper(previousBundle);
+
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became fleetcontroller master. Bumped version to "
+ stateVersionTracker.getCurrentVersion() + " to be in line.", timer.getCurrentTimeInMillis()));
long currentTime = timer.getCurrentTimeInMillis();
firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast;
log.log(LogLevel.DEBUG, "At time " + currentTime + " we set first system state broadcast time to be "
+ options.minTimeBeforeFirstSystemStateBroadcast + " ms after at time " + firstAllowedStateBroadcast + ".");
+ didWork = true;
}
isMaster = true;
if (wantedStateChanged) {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
index 177efe31766..e2f98cf5492 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
@@ -50,6 +50,21 @@ public class StateVersionTracker {
this.lastZooKeeperVersion = this.currentVersion;
}
+ void setClusterStateBundleRetrievedFromZooKeeper(ClusterStateBundle bundle) {
+ // There is an edge where the bundle version will mismatch with the version set
+ // via setVersionRetrievedFromZooKeeper() if the controller (or ZK) crashes before
+ // it can write both sequentially. But since we use the ZK-written version explicitly
+ // when choosing a new version for our own published states, it should not matter in
+ // practice. Worst case is that the current state reflects the same version that a
+ // previous controller had, but we will never publish this state ourselves; publishing
+ // only happens after we've generated our own, new candidate state and overwritten
+ // the empty states set below. Publishing also, as mentioned, sets a version based on
+ // the ZK version, not the version stored in the bundle itself.
+ currentClusterState = bundle;
+ currentUnversionedState = ClusterStateBundle.empty();
+ latestCandidateState = ClusterStateBundle.empty();
+ }
+
/**
* Sets limit on how many cluster states can be kept in the in-memory queue. Once
* the list exceeds this limit, the oldest state is repeatedly removed until the limit
@@ -140,8 +155,7 @@ public class StateVersionTracker {
}
private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) {
- clusterStateHistory.addFirst(new ClusterStateHistoryEntry(
- currentClusterState, currentTimeMs));
+ clusterStateHistory.addFirst(new ClusterStateHistoryEntry(currentClusterState, currentTimeMs));
while (clusterStateHistory.size() > maxHistoryEntryCount) {
clusterStateHistory.removeLast();
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index b0869b560f6..629800fb13c 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -40,6 +40,18 @@ public class SystemStateBroadcaster {
return clusterStateBundle.getBaselineClusterState();
}
+ public boolean hasBroadcastedClusterStateBundle() {
+ return clusterStateBundle != null;
+ }
+
+ public void resetBroadcastedClusterStateBundle() {
+ clusterStateBundle = null;
+ }
+
+ public ClusterStateBundle getClusterStateBundle() {
+ return clusterStateBundle;
+ }
+
private void reportNodeError(boolean nodeOk, NodeInfo info, String message) {
long time = timer.getCurrentTimeInMillis();
Long lastReported = lastErrorReported.get(info.getNode());
@@ -113,10 +125,10 @@ public class SystemStateBroadcaster {
void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database,
DatabaseHandler.Context dbContext,
FleetController fleetController) throws InterruptedException {
- final int currentStateVersion = clusterStateBundle.getVersion();
- if ((clusterStateBundle == null) || (lastClusterStateInSync == currentStateVersion)) {
+ if ((clusterStateBundle == null) || (lastClusterStateInSync == clusterStateBundle.getVersion())) {
return; // Nothing to do for the current state
}
+ final int currentStateVersion = clusterStateBundle.getVersion();
boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream()
.filter(NodeInfo::isDistributor)
.anyMatch(this::nodeNeedsClusterState);
@@ -137,7 +149,7 @@ public class SystemStateBroadcaster {
if (!baselineState.isOfficial()) {
log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", baselineState.getVersion()));
- baselineState.setOfficial(true);
+ baselineState.setOfficial(true); // FIXME this violates state bundle immutability
}
List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
index ac9f1473b57..c98e362c2d2 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.clustercontroller.core.database;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import java.util.Map;
@@ -82,4 +83,9 @@ public abstract class Database {
* Fetch the start times of distributor and service layer nodes.
*/
public abstract Map<Node, Long> retrieveStartTimestamps() throws InterruptedException;
+
+ public abstract boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) throws InterruptedException;
+
+ public abstract ClusterStateBundle retrieveLastPublishedStateBundle() throws InterruptedException;
+
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java
new file mode 100644
index 00000000000..3e7b98fe3fb
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java
@@ -0,0 +1,34 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.database;
+
+import com.yahoo.vespa.clustercontroller.core.ContentCluster;
+
+/**
+ * Database factory to enable test mocking of DB features. In practice, this
+ * will always be {@link ZooKeeperDatabase} due to rather heavy ZK feature
+ * dependencies and leaky abstractions built on top of them.
+ */
+public interface DatabaseFactory {
+
+ class Params {
+ ContentCluster cluster;
+ int nodeIndex;
+ String dbAddress;
+ int dbSessionTimeout;
+ Database.DatabaseListener listener;
+
+ Params cluster(ContentCluster c) { this.cluster = c; return this; }
+ ContentCluster cluster() { return cluster; }
+ Params nodeIndex(int i) { this.nodeIndex = i; return this; }
+ int nodeIndex() { return nodeIndex; }
+ Params databaseAddress(String address) { this.dbAddress = address; return this; }
+ String databaseAddress() { return dbAddress; }
+ Params databaseSessionTimeout(int timeout) { this.dbSessionTimeout = timeout; return this; }
+ int databaseSessionTimeout() { return dbSessionTimeout; }
+ Params databaseListener(Database.DatabaseListener listener) { this.listener = listener; return this; }
+ Database.DatabaseListener databaseListener() { return listener; }
+ }
+
+ Database create(Params params) throws Exception;
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
index c89bb6a136a..fcf5b58e62d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
@@ -5,6 +5,8 @@ import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.FleetController;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.Timer;
@@ -39,12 +41,14 @@ public class DatabaseHandler {
Integer lastSystemStateVersion;
Map<Node, NodeState> wantedStates;
Map<Node, Long> startTimestamps;
+ ClusterStateBundle clusterStateBundle;
void clear() {
masterVote = null;
lastSystemStateVersion = null;
wantedStates = null;
startTimestamps = null;
+ clusterStateBundle = null;
}
}
private class DatabaseListener implements Database.DatabaseListener {
@@ -68,15 +72,13 @@ public class DatabaseHandler {
}
}
+ private final DatabaseFactory databaseFactory;
private final Timer timer;
private final int nodeIndex;
private final Object monitor;
private String zooKeeperAddress;
private int zooKeeperSessionTimeout = 5000;
private final Object databaseMonitor = new Object();
-
- /** This is always ZooKeeperDatabase */
- // TODO: Get rid of the interface as it is both unnecessary and gives a false impression of independence
private Database database;
private DatabaseListener dbListener = new DatabaseListener();
@@ -87,8 +89,9 @@ public class DatabaseHandler {
private boolean lostZooKeeperConnectionEvent = false;
private Map<Integer, Integer> masterDataEvent = null;
- public DatabaseHandler(Timer timer, String zooKeeperAddress, int ourIndex, Object monitor) throws InterruptedException
+ public DatabaseHandler(DatabaseFactory databaseFactory, Timer timer, String zooKeeperAddress, int ourIndex, Object monitor) throws InterruptedException
{
+ this.databaseFactory = databaseFactory;
this.timer = timer;
this.nodeIndex = ourIndex;
pendingStore.masterVote = ourIndex; // To begin with we'll vote for ourselves.
@@ -166,8 +169,13 @@ public class DatabaseHandler {
clearSessionMetaData();
log.log(LogLevel.INFO,
"Fleetcontroller " + nodeIndex + ": Setting up new ZooKeeper session at " + zooKeeperAddress);
- database = new ZooKeeperDatabase(cluster,
- nodeIndex, zooKeeperAddress, zooKeeperSessionTimeout, dbListener);
+ DatabaseFactory.Params params = new DatabaseFactory.Params()
+ .cluster(cluster)
+ .nodeIndex(nodeIndex)
+ .databaseAddress(zooKeeperAddress)
+ .databaseSessionTimeout(zooKeeperSessionTimeout)
+ .databaseListener(dbListener);
+ database = databaseFactory.create(params);
}
} catch (KeeperException.NodeExistsException e) {
log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Cannot create ephemeral fleetcontroller node. ZooKeeper server "
@@ -279,6 +287,15 @@ public class DatabaseHandler {
return didWork;
}
}
+ if (pendingStore.clusterStateBundle != null) {
+ didWork = true;
+ if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) {
+ currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle;
+ pendingStore.clusterStateBundle = null;
+ } else {
+ return true;
+ }
+ }
}
return didWork;
}
@@ -325,11 +342,28 @@ public class DatabaseHandler {
if (usingZooKeeper()) {
log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve latest system state version from ZooKeeper. Returning version 0.");
}
- return 0;
+ return 0; // FIXME "fail-oblivious" is not a good error handling mode for such a critical component!
}
return version;
}
+ public void saveLatestClusterStateBundle(Context context, ClusterStateBundle clusterStateBundle) throws InterruptedException {
+ log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Scheduling bundle %s to be saved to ZooKeeper", nodeIndex, clusterStateBundle));
+ pendingStore.clusterStateBundle = clusterStateBundle;
+ doNextZooKeeperTask(context);
+ }
+
+ public ClusterStateBundle getLatestClusterStateBundle() throws InterruptedException {
+ log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Retrieving latest cluster state bundle from ZooKeeper", nodeIndex));
+ synchronized (databaseMonitor) {
+ if (database != null && !database.isClosed()) {
+ return database.retrieveLastPublishedStateBundle();
+ } else {
+ return ClusterStateBundle.empty();
+ }
+ }
+ }
+
public void saveWantedStates(Context context) throws InterruptedException {
log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Checking whether wanted states have changed compared to zookeeper version.");
Map<Node, NodeState> wantedStates = new TreeMap<>();
@@ -399,7 +433,9 @@ public class DatabaseHandler {
public boolean loadStartTimestamps(ContentCluster cluster) throws InterruptedException {
log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Retrieving start timestamps");
synchronized (databaseMonitor) {
- if (database == null || database.isClosed()) return false;
+ if (database == null || database.isClosed()) {
+ return false;
+ }
currentlyStored.startTimestamps = database.retrieveStartTimestamps();
}
Map<Node, Long> startTimestamps = currentlyStored.startTimestamps;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
index 5399430fd59..fe1d9a2a5cd 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
@@ -1,7 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core.database;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.ContentCluster;
+import com.yahoo.vespa.clustercontroller.core.rpc.EnvelopedClusterStateBundleCodec;
+import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.ACL;
@@ -128,6 +132,7 @@ public class ZooKeeperDatabase extends Database {
createNode(zooKeeperRoot, "wantedstates", new byte[0]);
createNode(zooKeeperRoot, "starttimestamps", new byte[0]);
createNode(zooKeeperRoot, "latestversion", new Integer(0).toString().getBytes(utf8));
+ createNode(zooKeeperRoot, "published_state_bundle", new byte[0]); // TODO dedupe string constants
byte val[] = String.valueOf(nodeIndex).getBytes(utf8);
deleteNodeIfExists(getMyIndexPath());
log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex +
@@ -165,6 +170,15 @@ public class ZooKeeperDatabase extends Database {
return (!sessionOpen || watcher.getState().equals(Watcher.Event.KeeperState.Expired));
}
+ private void maybeLogExceptionWarning(Exception e, String message) {
+ if (sessionOpen && reportErrors) {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ log.log(LogLevel.WARNING, String.format("Fleetcontroller %s: %s. Exception: %s\n%s",
+ nodeIndex, message, e.getMessage(), sw.toString()));
+ }
+ }
+
public boolean storeMasterVote(int wantedMasterIndex) throws InterruptedException {
byte val[] = String.valueOf(wantedMasterIndex).getBytes(utf8);
try{
@@ -174,11 +188,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to create our ephemeral node and store master vote:\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to create our ephemeral node and store master vote");
}
return false;
}
@@ -191,11 +201,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store latest system state version used " + version + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to store latest system state version used " + version);
return false;
}
}
@@ -211,11 +217,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve latest system state version used. Returning null.\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to retrieve latest system state version used. Returning null");
return null;
}
}
@@ -242,11 +244,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store wanted states in zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to store wanted states in ZooKeeper");
return false;
}
}
@@ -276,11 +274,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve wanted states from zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to retrieve wanted states from ZooKeeper");
return null;
}
}
@@ -301,11 +295,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store start timestamps in zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to store start timestamps in ZooKeeper");
return false;
}
}
@@ -336,12 +326,45 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve start timestamps from zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to retrieve start timestamps from ZooKeeper");
return null;
}
}
+
+ @Override
+ public boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) throws InterruptedException {
+ EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec();
+ byte[] encodedBundle = envelopedBundleCodec.encodeWithEnvelope(stateBundle);
+ try{
+ log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Storing published state bundle %s at '%spublished_state_bundle'",
+ nodeIndex, stateBundle, zooKeeperRoot));
+ // TODO CAS on expected zknode version
+ session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, -1);
+ } catch (InterruptedException e) {
+ throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
+ } catch (Exception e) {
+ maybeLogExceptionWarning(e, "Failed to store start timestamps in ZooKeeper");
+ return false;
+ }
+ return true;
+ }
+
+    /**
+     * Reads the 'published_state_bundle' node below the ZooKeeper root and decodes it.
+     *
+     * @return the decoded bundle, or a baseline-only bundle wrapping an empty state if the node
+     *         holds no data or if reading/decoding fails
+     * @throws InterruptedException if the thread is interrupted while talking to ZooKeeper
+     */
+    @Override
+    public ClusterStateBundle retrieveLastPublishedStateBundle() throws InterruptedException {
+        final String bundlePath = zooKeeperRoot + "published_state_bundle";
+        try {
+            byte[] encoded = session.getData(bundlePath, false, new Stat());
+            boolean hasPayload = (encoded != null) && (encoded.length != 0);
+            if (hasPayload) {
+                return new SlimeClusterStateBundleCodec().decodeWithEnvelope(encoded);
+            }
+        } catch (InterruptedException e) {
+            throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
+        } catch (Exception e) {
+            // Deliberately best-effort: fall through to the empty baseline below.
+            maybeLogExceptionWarning(e,
+                    "Failed to retrieve last published cluster state bundle from ZooKeeper, will use an empty state as baseline");
+        }
+        return ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState());
+    }
+
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java
new file mode 100644
index 00000000000..64dfcccebc9
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java
@@ -0,0 +1,14 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.database;
+
+import com.yahoo.vespa.clustercontroller.core.ContentCluster;
+
+public class ZooKeeperDatabaseFactory implements DatabaseFactory {
+
+ @Override
+ public Database create(Params params) throws Exception {
+ return new ZooKeeperDatabase(params.cluster, params.nodeIndex, params.dbAddress,
+ params.dbSessionTimeout, params.listener);
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
index 40fe91f0c1e..764bb3a0d92 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
@@ -5,6 +5,9 @@ import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
public interface SystemStateListener {
- void handleNewSystemState(ClusterStateBundle states);
+ // TODO consider rename to bundle
+ void handleNewPublishedState(ClusterStateBundle states);
+
+ default void handleNewCandidateState(ClusterStateBundle states) {}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java
new file mode 100644
index 00000000000..ab98952cddb
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java
@@ -0,0 +1,19 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+/**
+ * Cluster state bundle codec which opaquely encodes/decodes both the bundle
+ * as well as the metadata required to correctly perform compression/decompression.
+ *
+ * Useful for embedding an opaque bundle blob somewhere without needing to care about
+ * any of the associated metadata.
+ */
+public interface EnvelopedClusterStateBundleCodec {
+
+ byte[] encodeWithEnvelope(ClusterStateBundle stateBundle);
+
+ ClusterStateBundle decodeWithEnvelope(byte[] encodedClusterStateBundle);
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
index f6034cb34ff..1c391f9aacf 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
@@ -18,8 +18,11 @@ import java.util.Map;
*
* LZ4 compression is transparently applied during encoding and decompression is
* subsequently applied during decoding.
+ *
+ * Implements optional Slime-based enveloping via the *WithEnvelope methods, which removes
+ * the need for the caller to explicitly track compression metadata.
*/
-public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec {
+public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec, EnvelopedClusterStateBundleCodec {
private static final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024);
@@ -55,4 +58,30 @@ public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec {
return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(baseline), derivedStates);
}
+
+ // Technically the Slime enveloping could be its own class that is bundle codec independent, but
+ // realistically there won't be any other implementations. Can be trivially factored out if required.
+ @Override
+ public byte[] encodeWithEnvelope(ClusterStateBundle stateBundle) {
+ EncodedClusterStateBundle toEnvelope = encode(stateBundle);
+
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+ root.setLong("compression-type", toEnvelope.getCompression().type().getCode());
+ root.setLong("uncompressed-size", toEnvelope.getCompression().uncompressedSize());
+ root.setData("data", toEnvelope.getCompression().data());
+ return BinaryFormat.encode(slime);
+ }
+
+ @Override
+ public ClusterStateBundle decodeWithEnvelope(byte[] encodedClusterStateBundle) {
+ // We expect ZK writes to be atomic, letting us not worry about partially written or corrupted blobs.
+ Slime slime = BinaryFormat.decode(encodedClusterStateBundle);
+ Inspector root = slime.get();
+ CompressionType compressionType = CompressionType.valueOf((byte)root.field("compression-type").asLong());
+ int uncompressedSize = (int)root.field("uncompressed-size").asLong();
+ byte[] data = root.field("data").asData();
+ return decode(EncodedClusterStateBundle.fromCompressionBuffer(
+ new Compressor.Compression(compressionType, uncompressedSize, data)));
+ }
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java
new file mode 100644
index 00000000000..f929cd0a605
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java
@@ -0,0 +1,107 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vespa.clustercontroller.core.database.Database;
+import com.yahoo.vespa.clustercontroller.core.database.DatabaseFactory;
+import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
+import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class DatabaseHandlerTest {
+
+ static class Fixture {
+ final ClusterFixture clusterFixture = ClusterFixture.forFlatCluster(10);
+ final FleetController mockController = mock(FleetController.class);
+ final Database mockDatabase = mock(Database.class);
+ final Timer mockTimer = mock(Timer.class);
+ final DatabaseFactory mockDbFactory = (params) -> mockDatabase;
+ final String databaseAddress = "localhost:0";
+ final Object monitor = new Object();
+ final ClusterStateBundle dummyBundle;
+
+ Fixture() throws Exception {
+ dummyBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
+
+ when(mockDatabase.isClosed()).thenReturn(false);
+ when(mockDatabase.storeMasterVote(anyInt())).thenReturn(true);
+ when(mockDatabase.storeLastPublishedStateBundle(any())).thenReturn(true);
+ when(mockTimer.getCurrentTimeInMillis()).thenReturn(1000000L);
+ }
+
+ DatabaseHandler.Context createMockContext() {
+ return new DatabaseHandler.Context() {
+ @Override
+ public ContentCluster getCluster() {
+ return clusterFixture.cluster();
+ }
+
+ @Override
+ public FleetController getFleetController() {
+ return mockController;
+ }
+
+ @Override
+ public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener() {
+ return null;
+ }
+
+ @Override
+ public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener() {
+ return null;
+ }
+ };
+ }
+
+ DatabaseHandler createHandler() throws Exception {
+ return new DatabaseHandler(mockDbFactory, mockTimer, databaseAddress, 0, monitor);
+ }
+ }
+
+ @Test
+ public void can_store_latest_cluster_state_bundle() throws Exception {
+ Fixture f = new Fixture();
+ DatabaseHandler handler = f.createHandler();
+ handler.doNextZooKeeperTask(f.createMockContext()); // Database setup step
+ handler.saveLatestClusterStateBundle(f.createMockContext(), f.dummyBundle);
+ handler.doNextZooKeeperTask(f.createMockContext()); // Bundle store step
+
+ verify(f.mockDatabase).storeLastPublishedStateBundle(eq(f.dummyBundle));
+ }
+
+ @Test
+ public void can_load_latest_cluster_state_bundle() throws Exception {
+ Fixture f = new Fixture();
+ DatabaseHandler handler = f.createHandler();
+ handler.doNextZooKeeperTask(f.createMockContext()); // Database setup step
+
+ when(f.mockDatabase.retrieveLastPublishedStateBundle()).thenReturn(f.dummyBundle);
+
+ ClusterStateBundle retrievedBundle = handler.getLatestClusterStateBundle();
+ assertThat(retrievedBundle, equalTo(f.dummyBundle));
+ }
+
+ // FIXME I don't like the semantics of this, but it mirrors the legacy behavior for the
+ // rest of the DB load operations exposed by the DatabaseHandler.
+ @Test
+ public void empty_bundle_is_returned_if_no_db_connection() throws Exception {
+ Fixture f = new Fixture();
+ DatabaseHandler handler = f.createHandler();
+ // Note: no DB setup step
+
+ ClusterStateBundle retrievedBundle = handler.getLatestClusterStateBundle();
+ assertThat(retrievedBundle, equalTo(ClusterStateBundle.empty()));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index a2da6bd1c6c..6d59a672e86 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -1,8 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
-import com.yahoo.compress.CompressionType;
-import com.yahoo.compress.Compressor;
import com.yahoo.jrt.*;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.slobrok.api.BackOffPolicy;
@@ -10,15 +8,13 @@ import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.jrt.slobrok.api.SlobrokList;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.*;
-import com.yahoo.vespa.clustercontroller.core.rpc.EncodedClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCUtil;
-import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
/**
*
@@ -34,6 +30,7 @@ public class DummyVdsNode {
private NodeType type;
private int index;
private NodeState nodeState;
+ private String hostInfo = "{}";
private Supervisor supervisor;
private Acceptor acceptor;
private Register register;
@@ -67,10 +64,10 @@ public class DummyVdsNode {
private final List<Req> waitingRequests = new LinkedList<>();
/**
- * History of received system states.
+ * History of received cluster states.
* Any access to this list or to its members must be synchronized on the timer variable.
*/
- private List<ClusterState> systemState = new LinkedList<>();
+ private List<ClusterStateBundle> clusterStateBundles = new LinkedList<>();
private Thread messageResponder = new Thread() {
public void run() {
@@ -216,8 +213,10 @@ public class DummyVdsNode {
/** Returns the latest system state version received, or empty if none are received yet. */
private Optional<Integer> getLatestSystemStateVersion() {
synchronized(timer) {
- if (systemState.isEmpty()) return Optional.empty();
- return Optional.of(systemState.get(0).getVersion());
+ if (clusterStateBundles.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(clusterStateBundles.get(0).getVersion());
}
}
@@ -260,7 +259,7 @@ public class DummyVdsNode {
log.log(LogLevel.DEBUG, "Dummy node " + this + " answering pending node state request.");
req.request.returnValues().add(new StringValue(nodeState.serialize()));
if (req.request.methodName().equals("getnodestate3")) {
- req.request.returnValues().add(new StringValue("Dummy node host info"));
+ req.request.returnValues().add(new StringValue(hostInfo));
}
req.request.returnRequest();
++setNodeStateReplies;
@@ -268,14 +267,19 @@ public class DummyVdsNode {
waitingRequests.clear();
}
- public void setNodeState(NodeState state) {
+ public void setNodeState(NodeState state, String hostInfo) {
log.log(LogLevel.DEBUG, "Dummy node " + this + " got new state: " + state);
synchronized(timer) {
this.nodeState = state;
+ this.hostInfo = hostInfo;
replyToPendingNodeStateRequests();
}
}
+ public void setNodeState(NodeState state) {
+ setNodeState(state, "{}");
+ }
+
public void setNodeState(State state) {
setNodeState(new NodeState(type, state));
}
@@ -287,16 +291,22 @@ public class DummyVdsNode {
}
public List<ClusterState> getSystemStatesReceived() {
- List<ClusterState> states = new ArrayList<>();
synchronized(timer) {
- states.addAll(systemState);
+ return clusterStateBundles.stream()
+ .map(ClusterStateBundle::getBaselineClusterState)
+ .collect(Collectors.toList());
+ }
+ }
+
+ public ClusterStateBundle getClusterStateBundle() {
+ synchronized(timer) {
+ return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0));
}
- return states;
}
public ClusterState getClusterState() {
synchronized(timer) {
- return (systemState.isEmpty() ? null : systemState.get(0));
+ return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0).getBaselineClusterState());
}
}
@@ -482,7 +492,7 @@ public class DummyVdsNode {
ClusterState newState = new ClusterState(req.parameters().get(0).asString());
synchronized(timer) {
updateStartTimestamps(newState);
- systemState.add(0, newState);
+ clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState)));
timer.notifyAll();
}
req.returnValues().add(new Int32Value(1));
@@ -505,7 +515,7 @@ public class DummyVdsNode {
ClusterState newState = new ClusterState(req.parameters().get(0).asString());
synchronized(timer) {
updateStartTimestamps(newState);
- systemState.add(0, newState);
+ clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState)));
timer.notifyAll();
}
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + newState);
@@ -523,13 +533,12 @@ public class DummyVdsNode {
return;
}
ClusterStateBundle stateBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req);
- ClusterState newState = stateBundle.getBaselineClusterState();
synchronized(timer) {
- updateStartTimestamps(newState);
- systemState.add(0, newState);
+ updateStartTimestamps(stateBundle.getBaselineClusterState());
+ clusterStateBundles.add(0, stateBundle);
timer.notifyAll();
}
- log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + newState);
+ log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + stateBundle);
} catch (Exception e) {
log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage());
e.printStackTrace(System.err);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
index 19d87b878d4..21d9b0a7a1f 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
@@ -18,6 +18,7 @@ import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
import com.yahoo.vespa.clustercontroller.core.rpc.RpcServer;
import com.yahoo.vespa.clustercontroller.core.rpc.SlobrokClient;
@@ -157,7 +158,7 @@ public abstract class FleetControllerTest implements Waiter {
status = new StatusPageServer(timer, timer, options.httpPort);
}
RpcServer rpcServer = new RpcServer(timer, timer, options.clusterName, options.fleetControllerIndex, options.slobrokBackOffPolicy);
- DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
+ DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater);
SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer);
MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer);
@@ -311,6 +312,8 @@ public abstract class FleetControllerTest implements Waiter {
public ClusterState waitForStableSystem() throws Exception { return waiter.waitForStableSystem(); }
public ClusterState waitForStableSystem(int nodeCount) throws Exception { return waiter.waitForStableSystem(nodeCount); }
public ClusterState waitForState(String state) throws Exception { return waiter.waitForState(state); }
+ public ClusterState waitForStateInAllSpaces(String state) throws Exception { return waiter.waitForStateInAllSpaces(state); }
+ public ClusterState waitForStateInSpace(String space, String state) throws Exception { return waiter.waitForStateInSpace(space, state); }
public ClusterState waitForState(String state, int timeoutMS) throws Exception { return waiter.waitForState(state, timeoutMS); }
public ClusterState waitForInitProgressPassed(Node n, double progress) { return waiter.waitForInitProgressPassed(n, progress); }
public ClusterState waitForClusterStateIncludingNodesWithMinUsedBits(int bitcount, int nodecount) { return waiter.waitForClusterStateIncludingNodesWithMinUsedBits(bitcount, nodecount); }
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
index dc79a92e68b..b2464e83f95 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
@@ -9,6 +9,8 @@ import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vdslib.state.NodeState;
+import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import org.junit.Ignore;
import org.junit.Rule;
@@ -483,7 +485,7 @@ public class MasterElectionTest extends FleetControllerTest {
// no guarantees that it has been published to any nodes yet.
final long preElectionVersionNumber = fleetControllers.get(0).getSystemState().getVersion();
- // Nuke controller 1, leaving controller 1 in charge.
+ // Nuke controller 0, leaving controller 1 in charge.
// It should have observed the most recently written version number and increase this
// number before publishing its own new state.
fleetControllers.get(0).shutdown();
@@ -495,4 +497,47 @@ public class MasterElectionTest extends FleetControllerTest {
assertThat(postElectionVersionNumber, greaterThan(preElectionVersionNumber));
}
+ @Test
+ public void previously_published_state_is_taken_into_account_for_default_space_when_controller_bootstraps() throws Exception {
+ startingTest("MasterElectionTest::previously_published_state_is_taken_into_account_for_default_space_when_controller_bootstraps");
+ FleetControllerOptions options = new FleetControllerOptions("mycluster");
+ options.enableMultipleBucketSpaces = true;
+ options.masterZooKeeperCooldownPeriod = 1;
+ options.minTimeBeforeFirstSystemStateBroadcast = 10000;
+ setUpFleetController(3, true, options);
+ setUpVdsNodes(true, new DummyVdsNodeOptions());
+ fleetController = fleetControllers.get(0); // Required to prevent waitForStableSystem from NPE'ing
+ waitForMaster(0);
+ waitForStableSystem();
+ log.info("Waiting for full maintenance mode in default space");
+ waitForStateInSpace("default", "version:\\d+ distributor:10 storage:10 .0.s:m .1.s:m .2.s:m .3.s:m .4.s:m .5.s:m .6.s:m .7.s:m .8.s:m .9.s:m");
+
+ log.info("Responding with zero global merges pending from all distributors");
+ final int ackVersion = fleetControllers.get(0).getClusterStateBundle().getVersion();
+ // ACKing with no merge state in host info (implied: no pending merges) should not cause
+ // a new state to be published before the last node has ACKed. Consequently there should
+ // not be any race potential where a new version is published concurrently with our attempts
+ // at ACKing a previous one.
+ this.nodes.stream().filter(DummyVdsNode::isDistributor).forEach(node -> {
+ node.setNodeState(new NodeState(NodeType.DISTRIBUTOR, State.UP),
+ String.format("{\"cluster-state-version\":%d}", ackVersion));
+ });
+ waitForStateInAllSpaces("version:\\d+ distributor:10 storage:10");
+
+ log.info("Bundle before restart cycle: " + fleetControllers.get(0).getClusterStateBundle());
+ log.info("Doing restart cycle of controller 0");
+ fleetControllers.get(0).shutdown();
+ waitForMaster(1);
+ waitForCompleteCycle(1);
+
+ fleetControllers.set(0, createFleetController(usingFakeTimer, fleetControllers.get(0).getOptions(), true, null));
+ waitForMaster(0);
+ waitForCompleteCycle(0);
+
+ // We should NOT publish a state where all storage nodes are in Maintenance, since they were
+ // marked as Up in the last published cluster state.
+ log.info("Bundle after restart cycle: " + fleetControllers.get(0).getClusterStateBundle());
+ waitForStateInAllSpaces("version:\\d+ distributor:10 storage:10");
+ }
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index b10a8101c37..32de3591f2d 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -5,6 +5,7 @@ import com.yahoo.jrt.*;
import com.yahoo.vdslib.distribution.ConfiguredNode;
import com.yahoo.vdslib.state.*;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory;
import com.yahoo.vespa.clustercontroller.core.testutils.StateWaiter;
import com.yahoo.vespa.clustercontroller.utils.util.NoMetricReporter;
import org.junit.Before;
@@ -46,7 +47,7 @@ public class StateChangeTest extends FleetControllerTest {
ContentCluster cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution,
options.minStorageNodesUp, options.minRatioOfStorageNodesUp);
NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, eventLog);
- DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
+ DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
StateChangeHandler stateGenerator = new StateChangeHandler(timer, eventLog, metricUpdater);
SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer);
MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer);
@@ -955,8 +956,9 @@ public class StateChangeTest extends FleetControllerTest {
StateWaiter waiter = new StateWaiter(timer);
fleetController.addSystemStateListener(waiter);
- // Ensure all nodes have been seen by fleetcontroller and that it has had enough time to possibly have sent a cluster state
- waiter.waitForState("version:\\d+ distributor:10 (\\.\\d+\\.t:\\d+ )*storage:10 (\\.\\d+\\.t:\\d+ )*.1.s:d( \\.\\d+\\.t:\\d+)*", timeoutMS);
+ // Ensure all nodes have been seen by fleetcontroller and that it has had enough time to possibly have sent a cluster state
+ // Note: this is a candidate state and therefore NOT versioned yet
+ waiter.waitForState("^distributor:10 (\\.\\d+\\.t:\\d+ )*storage:10 (\\.\\d+\\.t:\\d+ )*.1.s:d( \\.\\d+\\.t:\\d+)*", timeoutMS);
waitForCompleteCycle();
new StateMessageChecker(nodes) {
@Override int expectedMessageCount(final DummyVdsNode node) { return 0; }
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
index 2a3be1e7194..95b699d4de4 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
@@ -303,6 +303,23 @@ public class StateVersionTrackerTest {
assertFalse(tracker.bucketSpaceMergeCompletionStateHasChanged());
}
+ @Test
+ public void setting_zookeeper_retrieved_bundle_sets_current_versioned_state_and_resets_candidate_state() {
+ final StateVersionTracker versionTracker = createWithMockedMetrics();
+ versionTracker.setVersionRetrievedFromZooKeeper(100);
+ versionTracker.updateLatestCandidateStateBundle(
+ ClusterStateBundle.ofBaselineOnly(stateWithoutAnnotations("distributor:1 storage:1")));
+ versionTracker.promoteCandidateToVersionedState(12345);
+
+ ClusterStateBundle zkBundle = ClusterStateBundle.ofBaselineOnly(stateWithoutAnnotations("version:199 distributor:2 storage:2"));
+ versionTracker.setVersionRetrievedFromZooKeeper(200);
+ versionTracker.setClusterStateBundleRetrievedFromZooKeeper(zkBundle);
+
+ assertThat(versionTracker.getLatestCandidateState(), equalTo(AnnotatedClusterState.emptyState()));
+ assertThat(versionTracker.getVersionedClusterStateBundle(), equalTo(zkBundle));
+ assertThat(versionTracker.getCurrentVersion(), equalTo(200)); // Not from bundle, but from explicitly stored version
+ }
+
private HostInfo createHostInfo(long bucketsPending) {
return HostInfo.createHostInfo(
"{\n" +
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java
new file mode 100644
index 00000000000..e5200b63a7e
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperDatabaseTest.java
@@ -0,0 +1,76 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vespa.clustercontroller.core.database.Database;
+import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabase;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class ZooKeeperDatabaseTest {
+ // Per-test fixture: owns a ZooKeeperTestServer and, after createDatabase(), a ZooKeeperDatabase connected to it.
+ private static class Fixture implements AutoCloseable {
+ final ZooKeeperTestServer zkServer;
+ ClusterFixture clusterFixture;
+ ZooKeeperDatabase zkDatabase; // null until createDatabase() is called, and again after closeDatabaseIfOpen()
+ int nodeIndex = 1;
+ Duration sessionTimeout = Duration.ofSeconds(60);
+ Database.DatabaseListener mockListener = mock(Database.DatabaseListener.class); // Mockito mock handed to the database as its listener
+
+ Fixture() throws IOException {
+ zkServer = new ZooKeeperTestServer();
+ clusterFixture = ClusterFixture.forFlatCluster(10);
+ }
+ // Closes any database opened earlier, then opens a new one against the test server's address.
+ void createDatabase() throws Exception {
+ closeDatabaseIfOpen();
+ zkDatabase = new ZooKeeperDatabase(clusterFixture.cluster(), nodeIndex, zkServer.getAddress(),
+ (int)sessionTimeout.toMillis(), mockListener); // session timeout is passed as int milliseconds
+ }
+
+ ZooKeeperDatabase db() { return zkDatabase; }
+ // No-op when no database is currently open.
+ void closeDatabaseIfOpen() {
+ if (zkDatabase != null) {
+ zkDatabase.close();
+ zkDatabase = null;
+ }
+ }
+ // Closes the database first (if open), then shuts down the ZooKeeper test server.
+ @Override
+ public void close() {
+ closeDatabaseIfOpen();
+ zkServer.shutdown(true);
+ }
+ }
+ // Stores a bundle built from a baseline state string plus two named state mappings and expects an equal bundle back.
+ @Test
+ public void can_store_and_load_cluster_state_bundle_from_database() throws Exception {
+ try (Fixture f = new Fixture()) {
+ f.createDatabase();
+ ClusterStateBundle bundleToStore = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
+ f.db().storeLastPublishedStateBundle(bundleToStore);
+
+ ClusterStateBundle bundleReceived = f.db().retrieveLastPublishedStateBundle();
+ assertThat(bundleReceived, equalTo(bundleToStore));
+ }
+ }
+ // With nothing stored beforehand, retrieval is expected to return ClusterStateBundle.empty().
+ @Test
+ public void empty_state_bundle_is_returned_if_no_bundle_already_stored_in_database() throws Exception {
+ try (Fixture f = new Fixture()) {
+ f.createDatabase();
+ ClusterStateBundle bundleReceived = f.db().retrieveLastPublishedStateBundle();
+
+ assertThat(bundleReceived, equalTo(ClusterStateBundle.empty()));
+ }
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java
index f2fe494ce61..b19b1d780bf 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java
@@ -18,6 +18,12 @@ public class SlimeClusterStateBundleCodecTest {
return codec.decode(encoded);
}
+ private static ClusterStateBundle roundtripEncodeWithEnvelope(ClusterStateBundle stateBundle) { // test helper: encode, then decode
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ byte[] encoded = codec.encodeWithEnvelope(stateBundle);
+ return codec.decodeWithEnvelope(encoded); // same codec instance decodes the bytes it just produced
+ }
+
@Test
public void baseline_only_bundle_can_be_round_trip_encoded() {
ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
@@ -53,4 +59,17 @@ public class SlimeClusterStateBundleCodecTest {
assertThat(decodedBundle, equalTo(stateBundle));
}
+ @Test
+ public void uncompressed_enveloped_bundle_can_be_roundtrip_encoded() {
+ // Insufficient length and too much entropy to be compressed; presumably this exercises the uncompressed envelope path.
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:3");
+ assertThat(roundtripEncodeWithEnvelope(stateBundle), equalTo(stateBundle));
+ }
+
+ @Test
+ public void compressable_enveloped_bundle_can_be_roundtrip_encoded() {
+ ClusterStateBundle stateBundle = makeCompressableBundle(); // helper not shown in this hunk
+ assertThat(roundtripEncodeWithEnvelope(stateBundle), equalTo(stateBundle)); // decoded bundle must equal the input
+ }
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
index 730b03ff7c9..7811e68c381 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
@@ -23,7 +23,8 @@ public class StateWaiter implements SystemStateListener {
this.timer = timer;
}
- public void handleNewSystemState(ClusterStateBundle state) {
+ @Override
+ public void handleNewPublishedState(ClusterStateBundle state) {
synchronized(timer) {
current = state.getBaselineClusterState();
@@ -32,6 +33,13 @@ public class StateWaiter implements SystemStateListener {
}
}
+ @Override
+ public void handleNewCandidateState(ClusterStateBundle states) {
+ // Tests using this (deprecated) waiter class do not distinguish candidate states from
+ // published ones, so candidates are forwarded to the published-state handler unchanged.
+ handleNewPublishedState(states);
+ }
+
public int getStateUpdates() { return Math.max(0, stateUpdates); }
public ClusterState getCurrentSystemState() {
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
index 6396b27aa79..d140ef998b6 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
@@ -4,12 +4,17 @@ package com.yahoo.vespa.clustercontroller.core.testutils;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.DummyVdsNode;
import com.yahoo.vespa.clustercontroller.core.FleetController;
import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -19,14 +24,14 @@ import java.util.regex.Pattern;
public interface WaitCondition {
/** Return null if met, why not if it is not met. */
- public String isConditionMet();
+ String isConditionMet();
- public abstract class StateWait implements WaitCondition {
+ abstract class StateWait implements WaitCondition {
private final Object monitor;
protected ClusterState currentState;
private final SystemStateListener listener = new SystemStateListener() {
@Override
- public void handleNewSystemState(ClusterStateBundle state) {
+ public void handleNewPublishedState(ClusterStateBundle state) {
synchronized (monitor) {
currentState = state.getBaselineClusterState();
monitor.notifyAll();
@@ -36,7 +41,9 @@ public interface WaitCondition {
public StateWait(FleetController fc, Object monitor) {
this.monitor = monitor;
- fc.addSystemStateListener(listener);
+ synchronized (this.monitor) {
+ fc.addSystemStateListener(listener);
+ }
}
public ClusterState getCurrentState() {
@@ -46,11 +53,13 @@ public interface WaitCondition {
}
}
- public class RegexStateMatcher extends StateWait {
+ class RegexStateMatcher extends StateWait {
private final Pattern pattern;
private Collection<DummyVdsNode> nodesToCheck;
private ClusterState lastCheckedState;
+ private boolean checkAllSpaces = false;
+ private Set<String> checkSpaceSubset = Collections.emptySet();
public RegexStateMatcher(String regex, FleetController fc, Object monitor) {
super(fc, monitor);
@@ -62,19 +71,51 @@ public interface WaitCondition {
return this;
}
+ public RegexStateMatcher checkAllSpaces(boolean checkAllSpaces) {
+ this.checkAllSpaces = checkAllSpaces; // when true, isConditionMet() requires the pattern to match every state in each checked node's bundle
+ return this; // returns this matcher so calls can be chained
+ }
+
+ public RegexStateMatcher checkSpaceSubset(Set<String> spaces) {
+ this.checkSpaceSubset = spaces; // if non-empty (and checkAllSpaces is false), isConditionMet() matches the pattern against these derived bucket space states
+ return this; // returns this matcher so calls can be chained
+ }
+
+ private static List<ClusterState> statesInBundle(ClusterStateBundle bundle) { // flattens a bundle into a list of its cluster states
+ List<ClusterState> states = new ArrayList<>(3); // 3 is only an initial-capacity hint
+ states.add(bundle.getBaselineClusterState()); // baseline state always comes first
+ bundle.getDerivedBucketSpaceStates().forEach((space, state) -> states.add(state.getClusterState())); // then one entry per derived bucket space
+ return states;
+ }
+
@Override
public String isConditionMet() {
if (currentState != null) {
lastCheckedState = currentState;
Matcher m = pattern.matcher(lastCheckedState.toString());
- if (m.matches()) {
+ if (m.matches() || !checkSpaceSubset.isEmpty()) {
if (nodesToCheck != null) {
for (DummyVdsNode node : nodesToCheck) {
if (node.getClusterState() == null) {
return "Node " + node + " has not received a cluster state yet";
}
- if (! pattern.matcher(withoutTimestamps(node.getClusterState().toString())).matches()) {
- return "Node " + node + " state mismatch.\n wanted: " + pattern + "\n is: " + node.getClusterState().toString();
+ // TODO refactor, simplify
+ boolean match;
+ if (checkAllSpaces) {
+ match = statesInBundle(node.getClusterStateBundle()).stream()
+ .allMatch(state -> pattern.matcher(withoutTimestamps(state.toString())).matches());
+ } else if (!checkSpaceSubset.isEmpty()) {
+ match = checkSpaceSubset.stream().allMatch(space -> {
+ String state = node.getClusterStateBundle().getDerivedBucketSpaceStates()
+ .getOrDefault(space, AnnotatedClusterState.emptyState()).getClusterState().toString();
+ return pattern.matcher(withoutTimestamps(state)).matches();
+ });
+ } else {
+ match = pattern.matcher(withoutTimestamps(node.getClusterState().toString())).matches();
+ }
+
+ if (!match) {
+ return "Node " + node + " state mismatch.\n wanted: " + pattern + "\n is: " + node.getClusterStateBundle().toString();
}
if (node.getStateCommunicationVersion() > 0) {
if (!node.hasPendingGetNodeStateRequest()) {
@@ -141,7 +182,7 @@ public interface WaitCondition {
}
}
- public static class MinUsedBitsMatcher extends StateWait {
+ class MinUsedBitsMatcher extends StateWait {
private final int bitCount;
private final int nodeCount;
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java
index a0af1ad2d0e..fb49b1cbf47 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java
@@ -7,28 +7,32 @@ import com.yahoo.vdslib.state.Node;
import com.yahoo.vespa.clustercontroller.core.DummyVdsNode;
import com.yahoo.vespa.clustercontroller.core.FleetController;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.logging.Logger;
public interface Waiter {
- public interface DataRetriever {
- public Object getMonitor();
- public FleetController getFleetController();
- public List<DummyVdsNode> getDummyNodes();
- public int getTimeoutMS();
+ interface DataRetriever {
+ Object getMonitor();
+ FleetController getFleetController();
+ List<DummyVdsNode> getDummyNodes();
+ int getTimeoutMS();
}
- public ClusterState waitForState(String state) throws Exception;
- public ClusterState waitForState(String state, int timeoutMS) throws Exception;
- public ClusterState waitForStableSystem() throws Exception;
- public ClusterState waitForStableSystem(int nodeCount) throws Exception;
- public ClusterState waitForInitProgressPassed(Node n, double progress);
- public ClusterState waitForClusterStateIncludingNodesWithMinUsedBits(int bitcount, int nodecount);
- public void wait(WaitCondition c, WaitTask wt, int timeoutMS);
+ ClusterState waitForState(String state) throws Exception;
+ ClusterState waitForStateInSpace(String space, String state) throws Exception;
+ ClusterState waitForStateInAllSpaces(String state) throws Exception;
+ ClusterState waitForState(String state, int timeoutMS) throws Exception;
+ ClusterState waitForStableSystem() throws Exception;
+ ClusterState waitForStableSystem(int nodeCount) throws Exception;
+ ClusterState waitForInitProgressPassed(Node n, double progress);
+ ClusterState waitForClusterStateIncludingNodesWithMinUsedBits(int bitcount, int nodecount);
+ void wait(WaitCondition c, WaitTask wt, int timeoutMS);
- public static class Impl implements Waiter {
+ class Impl implements Waiter {
private static final Logger log = Logger.getLogger(Impl.class.getName());
private final DataRetriever data;
@@ -37,16 +41,33 @@ public interface Waiter {
this.data = data;
}
- public ClusterState waitForState(String state) throws Exception { return waitForState(state, data.getTimeoutMS()); }
- public ClusterState waitForState(String state, int timeoutMS) throws Exception {
+ // TODO refactor. Shared implementation behind the public waitForState* variants: waits until the regex matcher is satisfied or timeoutMS expires.
+ private ClusterState waitForState(String state, int timeoutMS, boolean checkAllSpaces, Set<String> checkSpaces) {
 LinkedList<DummyVdsNode> nodesToCheck = new LinkedList<>();
 for(DummyVdsNode node : data.getDummyNodes()) {
 if (node.isConnected()) nodesToCheck.add(node);
 }
- WaitCondition.StateWait swc = new WaitCondition.RegexStateMatcher(state, data.getFleetController(), data.getMonitor()).includeNotifyingNodes(nodesToCheck);
+ WaitCondition.StateWait swc = new WaitCondition.RegexStateMatcher(
+ state, data.getFleetController(), data.getMonitor())
+ .includeNotifyingNodes(nodesToCheck) // only the connected dummy nodes collected above
+ .checkAllSpaces(checkAllSpaces)
+ .checkSpaceSubset(checkSpaces); // an empty set disables per-space matching
 wait(swc, new WaitTask.StateResender(data.getFleetController()), timeoutMS);
 return swc.getCurrentState();
 }
+
+ public ClusterState waitForState(String state) throws Exception {
+ return waitForState(state, data.getTimeoutMS()); // uses the timeout supplied by the DataRetriever
+ }
+ public ClusterState waitForStateInAllSpaces(String state) {
+ return waitForState(state, data.getTimeoutMS(), true, Collections.emptySet()); // checkAllSpaces=true, no space subset
+ }
+ public ClusterState waitForStateInSpace(String space, String state) {
+ return waitForState(state, data.getTimeoutMS(), false, Collections.singleton(space)); // per-node matching is restricted to the one named space
+ }
+ public ClusterState waitForState(String state, int timeoutMS) {
+ return waitForState(state, timeoutMS, false, Collections.emptySet()); // no per-space matching: checkAllSpaces off, empty subset
+ }
public ClusterState waitForStableSystem() throws Exception {
return waitForStableSystem(data.getDummyNodes().size() / 2);
}