aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java149
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java18
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java2
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java79
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java66
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java13
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java57
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java55
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java8
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java13
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java5
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java99
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java7
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java71
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java50
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java145
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java5
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java5
21 files changed, 757 insertions, 102 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java
index f935b8b2cc7..9bf36cca947 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java
@@ -28,6 +28,10 @@ public class AnnotatedClusterState {
return new AnnotatedClusterState(ClusterState.emptyState(), Optional.empty(), emptyNodeStateReasons());
}
+ public static AnnotatedClusterState withoutAnnotations(ClusterState state) {
+ return new AnnotatedClusterState(state, Optional.empty(), emptyNodeStateReasons());
+ }
+
static Map<Node, NodeStateReason> emptyNodeStateReasons() {
return Collections.emptyMap();
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
new file mode 100644
index 00000000000..09273ee5656
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
@@ -0,0 +1,149 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A cluster state bundle is a wrapper around the baseline ("source of truth") cluster
+ * state and any bucket space specific states that may be derived from it.
+ *
+ * The baseline state represents the generated state of the _nodes_ in the cluster,
+ * while the per-space states represent possible transformations that make sense in
+ * the context of that particular bucket space. The most prominent example is
+ * transforming nodes in the default bucket space into maintenance mode if they have
+ * merges pending in the global space.
+ *
+ * The baseline state is identical to the legacy, global cluster state that the
+ * cluster controller has historically produced as its only output.
+ */
+public class ClusterStateBundle {
+
+ private final AnnotatedClusterState baselineState;
+ private final Map<String, ClusterState> derivedBucketSpaceStates;
+
+ public static class Builder {
+ private final AnnotatedClusterState baselineState;
+ private ClusterStateDeriver stateDeriver;
+ private Set<String> bucketSpaces;
+
+ public Builder(AnnotatedClusterState baselineState) {
+ this.baselineState = baselineState;
+ }
+
+ public Builder stateDeriver(ClusterStateDeriver stateDeriver) {
+ this.stateDeriver = stateDeriver;
+ return this;
+ }
+
+ public Builder bucketSpaces(Set<String> bucketSpaces) {
+ this.bucketSpaces = bucketSpaces;
+ return this;
+ }
+
+ public Builder bucketSpaces(String... bucketSpaces) {
+ this.bucketSpaces = new TreeSet<>(Arrays.asList(bucketSpaces));
+ return this;
+ }
+
+ public ClusterStateBundle deriveAndBuild() {
+ if (stateDeriver == null || bucketSpaces == null || bucketSpaces.isEmpty()) {
+ return ClusterStateBundle.ofBaselineOnly(baselineState);
+ }
+ Map<String, ClusterState> derived = bucketSpaces.stream()
+ .collect(Collectors.toMap(
+ Function.identity(),
+ s -> stateDeriver.derivedFrom(baselineState.getClusterState(), s)));
+ return new ClusterStateBundle(baselineState, derived);
+ }
+ }
+
+ private ClusterStateBundle(AnnotatedClusterState baselineState, Map<String, ClusterState> derivedBucketSpaceStates) {
+ this.baselineState = baselineState;
+ this.derivedBucketSpaceStates = Collections.unmodifiableMap(derivedBucketSpaceStates);
+ }
+
+ public static Builder builder(AnnotatedClusterState baselineState) {
+ return new Builder(baselineState);
+ }
+
+ public static ClusterStateBundle of(AnnotatedClusterState baselineState, Map<String, ClusterState> derivedBucketSpaceStates) {
+ return new ClusterStateBundle(baselineState, derivedBucketSpaceStates);
+ }
+
+ public static ClusterStateBundle ofBaselineOnly(AnnotatedClusterState baselineState) {
+ return new ClusterStateBundle(baselineState, Collections.emptyMap());
+ }
+
+ public AnnotatedClusterState getBaselineAnnotatedState() {
+ return baselineState;
+ }
+
+ public ClusterState getBaselineClusterState() {
+ return baselineState.getClusterState();
+ }
+
+ public Map<String, ClusterState> getDerivedBucketSpaceStates() {
+ return derivedBucketSpaceStates;
+ }
+
+ public ClusterStateBundle cloneWithMapper(Function<ClusterState, ClusterState> mapper) {
+ AnnotatedClusterState clonedBaseline = new AnnotatedClusterState(
+ mapper.apply(baselineState.getClusterState().clone()),
+ baselineState.getClusterStateReason(),
+ baselineState.getNodeStateReasons());
+ Map<String, ClusterState> clonedDerived = derivedBucketSpaceStates.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey(), e -> mapper.apply(e.getValue().clone())));
+ return new ClusterStateBundle(clonedBaseline, clonedDerived);
+ }
+
+ public ClusterStateBundle clonedWithVersionSet(int version) {
+ return cloneWithMapper(state -> {
+ state.setVersion(version);
+ return state;
+ });
+ }
+
+ public boolean similarTo(ClusterStateBundle other) {
+ if (!baselineState.getClusterState().similarToIgnoringInitProgress(other.baselineState.getClusterState())) {
+ return false;
+ }
+ // FIXME we currently treat mismatching bucket space sets as unchanged to avoid breaking some tests
+ return derivedBucketSpaceStates.entrySet().stream()
+ .allMatch(entry -> other.derivedBucketSpaceStates.getOrDefault(entry.getKey(), entry.getValue())
+ .similarToIgnoringInitProgress(entry.getValue()));
+ }
+
+ public int getVersion() {
+ return baselineState.getClusterState().getVersion();
+ }
+
+ @Override
+ public String toString() {
+ if (derivedBucketSpaceStates.isEmpty()) {
+ return String.format("ClusterStateBundle('%s')", baselineState);
+ }
+ Map<String, ClusterState> orderedStates = new TreeMap<>(derivedBucketSpaceStates);
+ return String.format("ClusterStateBundle('%s', %s)", baselineState, orderedStates.entrySet().stream()
+ .map(e -> String.format("%s '%s'", e.getKey(), e.getValue()))
+ .collect(Collectors.joining(", ")));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ClusterStateBundle that = (ClusterStateBundle) o;
+ return Objects.equals(baselineState, that.baselineState) &&
+ Objects.equals(derivedBucketSpaceStates, that.derivedBucketSpaceStates);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(baselineState, derivedBucketSpaceStates);
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java
new file mode 100644
index 00000000000..9e42e1da649
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java
@@ -0,0 +1,18 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+
+/**
+ * Bucket-space aware transformation factory to "derive" new cluster states from an
+ * existing state.
+ */
+public interface ClusterStateDeriver {
+ /**
+ * @param state Baseline cluster state used as a source for deriving a new state.
+ * MUST NOT be modified explicitly or implicitly.
+ * @param bucketSpace The name of the bucket space for which the state should be derived
+ * @return A cluster state instance representing the derived state, or <em>state</em> unchanged.
+ */
+ ClusterState derivedFrom(ClusterState state, String bucketSpace);
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java
index de0ba6bf33c..450513343b0 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java
@@ -18,7 +18,7 @@ public interface Communicator {
void getNodeState(NodeInfo node, Waiter<GetNodeStateRequest> waiter);
- void setSystemState(ClusterState state, NodeInfo node, Waiter<SetClusterStateRequest> waiter);
+ void setSystemState(ClusterStateBundle states, NodeInfo node, Waiter<SetClusterStateRequest> waiter);
void shutdown();
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index ba56d75ab4c..1059434aac3 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.distribution.ConfiguredNode;
@@ -24,18 +25,11 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import java.io.FileNotFoundException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.TimeZone;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAddedOrRemovedListener, SystemStateListener,
Runnable, RemoteClusterControllerTaskScheduler {
@@ -69,7 +63,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private boolean waitingForCycle = false;
private StatusPageServer.PatternRequestRouter statusRequestRouter = new StatusPageServer.PatternRequestRouter();
- private final List<com.yahoo.vdslib.state.ClusterState> newStates = new ArrayList<>();
+ private final List<ClusterStateBundle> newStates = new ArrayList<>();
private long configGeneration = -1;
private long nextConfigGeneration = -1;
private Queue<RemoteClusterControllerTask> remoteTasks = new LinkedList<>();
@@ -84,6 +78,11 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Invariant: queued task versions are monotonically increasing with queue position
private Queue<VersionDependentTaskCompletion> taskCompletionQueue = new ArrayDeque<>();
+ // Legacy behavior is an empty set of explicitly configured bucket spaces, which means that
+ // only a baseline cluster state will be sent from the controller and no per-space state
+ // deriving is done.
+ private Set<String> configuredBucketSpaces = Collections.emptySet();
+
private final RunDataExtractor dataExtractor = new RunDataExtractor() {
@Override
public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); }
@@ -233,7 +232,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Always give cluster state listeners the current state, in case acceptable state has come before listener is registered.
com.yahoo.vdslib.state.ClusterState state = getSystemState();
if (state == null) throw new NullPointerException("Cluster state should never be null at this point");
- listener.handleNewSystemState(state);
+ listener.handleNewSystemState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state)));
}
}
@@ -350,16 +349,18 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
stateChangeHandler.handleReturnedRpcAddress(node);
}
- public void handleNewSystemState(com.yahoo.vdslib.state.ClusterState state) {
+ @Override
+ public void handleNewSystemState(ClusterStateBundle stateBundle) {
verifyInControllerThread();
- newStates.add(state);
- metricUpdater.updateClusterStateMetrics(cluster, state);
- systemStateBroadcaster.handleNewSystemState(state);
+ ClusterState baselineState = stateBundle.getBaselineClusterState();
+ newStates.add(stateBundle);
+ metricUpdater.updateClusterStateMetrics(cluster, baselineState);
+ systemStateBroadcaster.handleNewClusterStates(stateBundle);
// Iff master, always store new version in ZooKeeper _before_ publishing to any
// nodes so that a cluster controller crash after publishing but before a successful
// ZK store will not risk reusing the same version number.
if (masterElectionHandler.isMaster()) {
- storeClusterStateVersionToZooKeeper(state);
+ storeClusterStateVersionToZooKeeper(baselineState);
}
}
@@ -425,8 +426,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Check retirement changes
for (ConfiguredNode node : newNodes) {
- if (node.retired() != cluster.getConfiguredNodes().get(node.index()).retired())
+ if (node.retired() != cluster.getConfiguredNodes().get(node.index()).retired()) {
return true;
+ }
}
return false;
@@ -441,10 +443,20 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
cluster.setSlobrokGenerationCount(0);
}
+ // TODO don't hardcode bucket spaces
+ if (options.enableMultipleBucketSpaces) {
+ configuredBucketSpaces = Collections.unmodifiableSet(
+ Stream.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace())
+ .collect(Collectors.toSet()));
+ } else {
+ configuredBucketSpaces = Collections.emptySet();
+ }
+
communicator.propagateOptions(options);
- if (nodeLookup instanceof SlobrokClient)
- ((SlobrokClient)nodeLookup).setSlobrokConnectionSpecs(options.slobrokConnectionSpecs);
+ if (nodeLookup instanceof SlobrokClient) {
+ ((SlobrokClient) nodeLookup).setSlobrokConnectionSpecs(options.slobrokConnectionSpecs);
+ }
eventLog.setMaxSize(options.eventLogMaxSize, options.eventNodeLogMaxSize);
cluster.setPollingFrequency(options.statePollingFrequency);
cluster.setDistribution(options.storageDistribution);
@@ -625,7 +637,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
return false;
}
- private boolean broadcastClusterStateToEligibleNodes() throws InterruptedException {
+ private boolean broadcastClusterStateToEligibleNodes() {
boolean sentAny = false;
// Give nodes a fair chance to respond first time to state gathering requests, so we don't
// disturb system when we take over. Allow anyways if we have states from all nodes.
@@ -638,7 +650,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Reset timer to only see warning once.
firstAllowedStateBroadcast = currentTime;
}
- sentAny = systemStateBroadcaster.broadcastNewState(database, databaseContext, communicator, this);
+ sentAny = systemStateBroadcaster.broadcastNewState(databaseContext, communicator);
if (sentAny) {
nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates;
}
@@ -649,9 +661,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private void propagateNewStatesToListeners() {
if ( ! newStates.isEmpty()) {
synchronized (systemStateListeners) {
- for (ClusterState state : newStates) {
+ for (ClusterStateBundle stateBundle : newStates) {
for(SystemStateListener listener : systemStateListeners) {
- listener.handleNewSystemState(state);
+ listener.handleNewSystemState(stateBundle);
}
}
newStates.clear();
@@ -778,7 +790,13 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if (mustRecomputeCandidateClusterState()) {
stateChangeHandler.unsetStateChangedFlag();
final AnnotatedClusterState candidate = computeCurrentAnnotatedState();
- stateVersionTracker.updateLatestCandidateState(candidate);
+ // TODO test multiple bucket spaces configured
+ // TODO what interaction do we want between generated and derived states wrt. auto group take-downs?
+ final ClusterStateBundle candidateBundle = ClusterStateBundle.builder(candidate)
+ .bucketSpaces(configuredBucketSpaces)
+ .stateDeriver(createBucketSpaceStateDeriver())
+ .deriveAndBuild();
+ stateVersionTracker.updateLatestCandidateStateBundle(candidateBundle);
if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()
|| stateVersionTracker.hasReceivedNewVersionFromZooKeeper())
@@ -787,8 +805,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
final AnnotatedClusterState before = stateVersionTracker.getAnnotatedVersionedClusterState();
stateVersionTracker.promoteCandidateToVersionedState(timeNowMs);
+ // TODO also emit derived state edges events
emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs);
- handleNewSystemState(stateVersionTracker.getVersionedClusterState());
+ handleNewSystemState(stateVersionTracker.getVersionedClusterStateBundle());
stateWasChanged = true;
}
}
@@ -804,6 +823,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
return stateWasChanged;
}
+ private ClusterStateDeriver createBucketSpaceStateDeriver() {
+ return new MaintenanceWhenPendingGlobalMerges((space, index) -> false); // TODO wire with stats aggregator
+ }
+
/**
* Move tasks that are dependent on the most recently generated state being published into
* a completion queue with a dependency on the provided version argument. Once that version
@@ -862,7 +885,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
private boolean mustRecomputeCandidateClusterState() {
- return stateChangeHandler.stateMayHaveChanged() || stateVersionTracker.hasReceivedNewVersionFromZooKeeper();
+ return stateChangeHandler.stateMayHaveChanged()
+ || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()
+ || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged();
}
private boolean handleLeadershipEdgeTransitions() throws InterruptedException {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
index d8c853f45cb..3bf8edafbe3 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
@@ -119,6 +119,9 @@ public class FleetControllerOptions implements Cloneable {
private Duration maxDeferredTaskVersionWaitTime = Duration.ofSeconds(30);
+ // TODO replace this flag with a set of bucket spaces instead
+ public boolean enableMultipleBucketSpaces = false;
+
// TODO: Replace usage of this by usage where the nodes are explicitly passed (below)
public FleetControllerOptions(String clusterName) {
this.clusterName = clusterName;
@@ -225,6 +228,7 @@ public class FleetControllerOptions implements Cloneable {
sb.append("<tr><td><nobr>Maximum node event log size</nobr></td><td align=\"right\">").append(eventNodeLogMaxSize).append("</td></tr>");
sb.append("<tr><td><nobr>Wanted distribution bits</nobr></td><td align=\"right\">").append(distributionBits).append("</td></tr>");
sb.append("<tr><td><nobr>Max deferred task version wait time</nobr></td><td align=\"right\">").append(maxDeferredTaskVersionWaitTime.toMillis()).append("ms</td></tr>");
+ sb.append("<tr><td><nobr>Multiple bucket spaces enabled</nobr></td><td align=\"right\">").append(enableMultipleBucketSpaces).append("</td></tr>");
sb.append("</table>");
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java
new file mode 100644
index 00000000000..16401a79f08
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java
@@ -0,0 +1,66 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.vdslib.state.*;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Cluster state deriver which checks if nodes have merges pending for globally
+ * distributed documents, and if they do, sets the node in maintenance mode in the
+ * default bucket space. This allows merges to complete for global documents before
+ * the default space documents that have references to them are made searchable.
+ *
+ * Note: bucket spaces are currently hard-coded.
+ */
+public class MaintenanceWhenPendingGlobalMerges implements ClusterStateDeriver {
+
+ // TODO make these configurable
+ private static final String bucketSpaceToCheck = FixedBucketSpaces.globalSpace();
+ private static final String bucketSpaceToDerive = FixedBucketSpaces.defaultSpace();
+
+ private final MergePendingChecker mergePendingChecker;
+
+ public MaintenanceWhenPendingGlobalMerges(MergePendingChecker mergePendingChecker) {
+ this.mergePendingChecker = mergePendingChecker;
+ }
+
+ @Override
+ public ClusterState derivedFrom(ClusterState baselineState, String bucketSpace) {
+ ClusterState derivedState = baselineState.clone();
+ if (!bucketSpace.equals(bucketSpaceToDerive)) {
+ return derivedState;
+ }
+ Set<Integer> incompleteNodeIndices = nodesWithMergesNotDone(baselineState);
+ if (incompleteNodeIndices.isEmpty()) {
+ return derivedState; // Nothing to do
+ }
+ incompleteNodeIndices.forEach(nodeIndex -> derivedState.setNodeState(Node.ofStorage(nodeIndex),
+ new NodeState(NodeType.STORAGE, State.MAINTENANCE)));
+ return derivedState;
+ }
+
+ private Set<Integer> nodesWithMergesNotDone(ClusterState baselineState) {
+ final Set<Integer> incompleteNodes = new HashSet<>();
+ final int nodeCount = baselineState.getNodeCount(NodeType.STORAGE);
+ for (int nodeIndex = 0; nodeIndex < nodeCount; ++nodeIndex) {
+ // FIXME should only set nodes into maintenance if they've not yet been up in the cluster
+ // state since they came back as Reported state Up!
+ // Must be implemented before this state deriver is enabled in production.
+ if (contentNodeIsAvailable(baselineState, nodeIndex) && hasMergesNotDone(bucketSpaceToCheck, nodeIndex)) {
+ incompleteNodes.add(nodeIndex);
+ }
+ }
+ return incompleteNodes;
+ }
+
+ private boolean contentNodeIsAvailable(ClusterState state, int nodeIndex) {
+ return state.getNodeState(Node.ofStorage(nodeIndex)).getState().oneOf("uir");
+ }
+
+ private boolean hasMergesNotDone(String bucketSpace, int nodeIndex) {
+ return mergePendingChecker.hasMergesPending(bucketSpace, nodeIndex);
+ }
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java
new file mode 100644
index 00000000000..e1b844c53de
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java
@@ -0,0 +1,13 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+/**
+ * Implementations provide functionality for checking if a particular bucket space
+ * has merges reported as pending from the cluster's distributor nodes. It is up
+ * to the implementation to determine the exact semantics of what "pending" implies.
+ */
+public interface MergePendingChecker {
+
+ boolean hasMergesPending(String bucketSpace, int contentNodeIndex);
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
index 902189fca8b..fffe1c95124 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
@@ -28,9 +28,9 @@ public class StateVersionTracker {
// TODO this mirrors legacy behavior, but should be moved into stable ZK state.
private int lowestObservedDistributionBits = 16;
- private ClusterState currentUnversionedState = ClusterState.emptyState();
- private AnnotatedClusterState latestCandidateState = AnnotatedClusterState.emptyState();
- private AnnotatedClusterState currentClusterState = latestCandidateState;
+ private ClusterStateBundle currentUnversionedState = ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState());
+ private ClusterStateBundle latestCandidateState = ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState());
+ private ClusterStateBundle currentClusterState = latestCandidateState;
private ClusterStateView clusterStateView;
@@ -38,7 +38,7 @@ public class StateVersionTracker {
private int maxHistoryEntryCount = 50;
StateVersionTracker() {
- clusterStateView = ClusterStateView.create(currentUnversionedState);
+ clusterStateView = ClusterStateView.create(currentUnversionedState.getBaselineClusterState());
}
void setVersionRetrievedFromZooKeeper(final int version) {
@@ -70,26 +70,30 @@ public class StateVersionTracker {
}
AnnotatedClusterState getAnnotatedVersionedClusterState() {
- return currentClusterState;
+ return currentClusterState.getBaselineAnnotatedState();
}
public ClusterState getVersionedClusterState() {
- return currentClusterState.getClusterState();
+ return currentClusterState.getBaselineClusterState();
+ }
+
+ public ClusterStateBundle getVersionedClusterStateBundle() {
+ return currentClusterState;
}
- public void updateLatestCandidateState(final AnnotatedClusterState candidate) {
- assert(latestCandidateState.getClusterState().getVersion() == 0);
- latestCandidateState = candidate;
+ public void updateLatestCandidateStateBundle(final ClusterStateBundle candidateBundle) {
+ assert(latestCandidateState.getBaselineClusterState().getVersion() == 0);
+ latestCandidateState = candidateBundle;
}
/**
- * Returns the last state provided to updateLatestCandidateState, which _may or may not_ be
+ * Returns the last state provided to updateLatestCandidateStateBundle, which _may or may not_ be
* a published state. Primary use case for this function is a caller which is interested in
* changes that may not be reflected in the published state. The best example of this would
* be node state changes when a cluster is marked as Down.
*/
public AnnotatedClusterState getLatestCandidateState() {
- return latestCandidateState;
+ return latestCandidateState.getBaselineAnnotatedState();
}
public List<ClusterStateHistoryEntry> getClusterStateHistory() {
@@ -97,7 +101,9 @@ public class StateVersionTracker {
}
boolean candidateChangedEnoughFromCurrentToWarrantPublish() {
- return !currentUnversionedState.similarToIgnoringInitProgress(latestCandidateState.getClusterState());
+ // Neither latestCandidateState nor currentUnversionedState has a version set, so the
+ // similarity is only done on structural state metadata.
+ return !currentUnversionedState.similarTo(latestCandidateState);
}
void promoteCandidateToVersionedState(final long currentTimeMs) {
@@ -108,23 +114,19 @@ public class StateVersionTracker {
recordCurrentStateInHistoryAtTime(currentTimeMs);
}
- private void updateStatesForNewVersion(final AnnotatedClusterState newState, final int newVersion) {
- currentClusterState = new AnnotatedClusterState(
- newState.getClusterState().clone(), // Because we mutate version below
- newState.getClusterStateReason(),
- newState.getNodeStateReasons());
- currentClusterState.getClusterState().setVersion(newVersion);
- currentUnversionedState = newState.getClusterState().clone();
+ private void updateStatesForNewVersion(final ClusterStateBundle newStateBundle, final int newVersion) {
+ currentClusterState = newStateBundle.clonedWithVersionSet(newVersion);
+ currentUnversionedState = newStateBundle; // TODO should we clone..? ClusterState really should be made immutable
lowestObservedDistributionBits = Math.min(
lowestObservedDistributionBits,
- newState.getClusterState().getDistributionBitCount());
- // TODO should this take place in updateLatestCandidateState instead? I.e. does it require a consolidated state?
- clusterStateView = ClusterStateView.create(currentClusterState.getClusterState());
+ newStateBundle.getBaselineClusterState().getDistributionBitCount());
+ // TODO should this take place in updateLatestCandidateStateBundle instead? I.e. does it require a consolidated state?
+ clusterStateView = ClusterStateView.create(currentClusterState.getBaselineClusterState());
}
private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) {
clusterStateHistory.addFirst(new ClusterStateHistoryEntry(
- currentClusterState.getClusterState(), currentTimeMs));
+ currentClusterState.getBaselineClusterState(), currentTimeMs));
while (clusterStateHistory.size() > maxHistoryEntryCount) {
clusterStateHistory.removeLast();
}
@@ -135,4 +137,13 @@ public class StateVersionTracker {
clusterStateView.handleUpdatedHostInfo(hostnames, node, hostInfo);
}
+ boolean bucketSpaceMergeCompletionStateHasChanged() {
+ return false; // TODO wire changes in merge info
+ }
+
+ /*
+ TODO test and implement
+ - derived default space down-condition can only _keep_ a node in maintenance (down), not transition it from up -> maintenance
+ */
+
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 024c5d21607..45a14ee19cd 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -18,7 +18,7 @@ public class SystemStateBroadcaster {
private final Timer timer;
private final Object monitor;
- private ClusterState systemState;
+ private ClusterStateBundle clusterStateBundle;
private final List<SetClusterStateRequest> replies = new LinkedList<>();
private final static long minTimeBetweenNodeErrorLogging = 10 * 60 * 1000;
@@ -32,12 +32,12 @@ public class SystemStateBroadcaster {
this.monitor = monitor;
}
- public void handleNewSystemState(ClusterState state) {
- systemState = state;
+ public void handleNewClusterStates(ClusterStateBundle state) {
+ clusterStateBundle = state;
}
public ClusterState getClusterState() {
- return systemState;
+ return clusterStateBundle.getBaselineClusterState();
}
private void reportNodeError(boolean nodeOk, NodeInfo info, String message) {
@@ -79,7 +79,7 @@ public class SystemStateBroadcaster {
}
private boolean nodeNeedsClusterState(NodeInfo node) {
- if (node.getSystemStateVersionAcknowledged() == systemState.getVersion()) {
+ if (node.getSystemStateVersionAcknowledged() == clusterStateBundle.getVersion()) {
return false; // No point in sending if node already has updated system state
}
if (node.getRpcAddress() == null || node.isRpcAddressOutdated()) {
@@ -102,7 +102,7 @@ public class SystemStateBroadcaster {
}
private boolean newestStateAlreadySentToNode(NodeInfo node) {
- return (node.getNewestSystemStateVersionSent() == systemState.getVersion());
+ return (node.getNewestSystemStateVersionSent() == clusterStateBundle.getVersion());
}
/**
@@ -113,42 +113,45 @@ public class SystemStateBroadcaster {
void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database,
DatabaseHandler.Context dbContext,
FleetController fleetController) throws InterruptedException {
- if ((systemState == null) || (lastClusterStateInSync == systemState.getVersion())) {
+ final int currentStateVersion = clusterStateBundle.getVersion();
+ if ((clusterStateBundle == null) || (lastClusterStateInSync == currentStateVersion)) {
return; // Nothing to do for the current state
}
boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream()
.filter(NodeInfo::isDistributor)
.anyMatch(this::nodeNeedsClusterState);
- if (!anyOutdatedDistributorNodes && (systemState.getVersion() > lastClusterStateInSync)) {
+ if (!anyOutdatedDistributorNodes && (currentStateVersion > lastClusterStateInSync)) {
log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state");
- lastClusterStateInSync = systemState.getVersion();
+ lastClusterStateInSync = currentStateVersion;
fleetController.handleAllDistributorsInSync(database, dbContext);
}
}
- public boolean broadcastNewState(DatabaseHandler database,
- DatabaseHandler.Context dbContext,
- Communicator communicator,
- FleetController fleetController) throws InterruptedException {
- if (systemState == null) return false;
+ public boolean broadcastNewState(DatabaseHandler.Context dbContext, Communicator communicator) {
+ if (clusterStateBundle == null) {
+ return false;
+ }
+
+ ClusterState baselineState = clusterStateBundle.getBaselineClusterState();
- if (!systemState.isOfficial()) {
- log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", systemState.getVersion()));
- systemState.setOfficial(true);
+ if (!baselineState.isOfficial()) {
+ log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", baselineState.getVersion()));
+ baselineState.setOfficial(true);
}
List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext);
for (NodeInfo node : recipients) {
if (nodeNeedsToObserveStartupTimestamps(node)) {
- ClusterState newState = buildModifiedClusterState(dbContext);
- log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion()
- + " to node " + node + ": " + newState);
- communicator.setSystemState(newState, node, waiter);
+ // TODO this is the same for all nodes, compute only once
+ ClusterStateBundle modifiedBundle = clusterStateBundle.cloneWithMapper(state -> buildModifiedClusterState(state, dbContext));
+ log.log(LogLevel.DEBUG, "Sending modified cluster state version " + baselineState.getVersion()
+ + " to node " + node + ": " + modifiedBundle);
+ communicator.setSystemState(modifiedBundle, node, waiter);
} else {
- log.log(LogLevel.DEBUG, "Sending system state version " + systemState.getVersion() + " to node " + node
+ log.log(LogLevel.DEBUG, "Sending system state version " + baselineState.getVersion() + " to node " + node
+ ". (went down time " + node.getWentDownWithStartTime() + ", node start time " + node.getStartTimestamp() + ")");
- communicator.setSystemState(systemState, node, waiter);
+ communicator.setSystemState(clusterStateBundle, node, waiter);
}
}
@@ -157,12 +160,12 @@ public class SystemStateBroadcaster {
public int lastClusterStateVersionInSync() { return lastClusterStateInSync; }
- private boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
+ private static boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp();
}
- private ClusterState buildModifiedClusterState(DatabaseHandler.Context dbContext) {
- ClusterState newState = systemState.clone();
+ private static ClusterState buildModifiedClusterState(ClusterState sourceState, DatabaseHandler.Context dbContext) {
+ ClusterState newState = sourceState.clone();
for (NodeInfo n : dbContext.getCluster().getNodeInfo()) {
NodeState ns = newState.getNodeState(n.getNode());
if (!n.isDistributor() && ns.getStartTimestamp() == 0) {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
index 07f42301695..c89bb6a136a 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
@@ -28,10 +28,10 @@ public class DatabaseHandler {
private static Logger log = Logger.getLogger(DatabaseHandler.class.getName());
public interface Context {
- public ContentCluster getCluster();
- public FleetController getFleetController();
- public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener();
- public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener();
+ ContentCluster getCluster();
+ FleetController getFleetController();
+ NodeAddedOrRemovedListener getNodeAddedOrRemovedListener();
+ NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener();
}
private class Data {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
index 617f91cb837..40fe91f0c1e 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
@@ -1,10 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core.listeners;
-import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
public interface SystemStateListener {
- void handleNewSystemState(ClusterState state);
+ void handleNewSystemState(ClusterStateBundle states);
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index 1887c1dd2ad..0cc5ef36e3c 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -157,8 +157,9 @@ public class RPCCommunicator implements Communicator {
}
@Override
- public void setSystemState(ClusterState state, NodeInfo node, Waiter<SetClusterStateRequest> externalWaiter) {
- RPCSetClusterStateWaiter waiter = new RPCSetClusterStateWaiter(externalWaiter, timer);
+ public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> externalWaiter) {
+ final RPCSetClusterStateWaiter waiter = new RPCSetClusterStateWaiter(externalWaiter, timer);
+ final ClusterState baselineState = stateBundle.getBaselineClusterState();
Target connection = getConnection(node);
if (!connection.isValid()) {
@@ -172,17 +173,17 @@ public class RPCCommunicator implements Communicator {
Request req;
if (node.getVersion() == 0) {
req = new Request("setsystemstate");
- req.parameters().add(new StringValue(state.toString(true)));
+ req.parameters().add(new StringValue(baselineState.toString(true)));
} else {
req = new Request("setsystemstate2");
- req.parameters().add(new StringValue(state.toString(false)));
+ req.parameters().add(new StringValue(baselineState.toString(false)));
}
- RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, state.getVersion());
+ RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion());
waiter.setRequest(stateRequest);
connection.invokeAsync(req, 60, waiter);
- node.setSystemStateVersionSent(state);
+ node.setSystemStateVersionSent(baselineState);
}
// protected for testing.
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
index 16e9675d586..4862c83f2a6 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
@@ -149,6 +149,11 @@ class ClusterFixture {
return this;
}
+ ClusterFixture assignDummyRpcAddresses() {
+ cluster.getNodeInfo().forEach(ni -> ni.setRpcAddress("tcp/localhost:0"));
+ return this;
+ }
+
static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) {
Map<NodeType, Integer> maxTransitionTime = new TreeMap<>();
maxTransitionTime.put(NodeType.DISTRIBUTOR, distributorTransitionTime);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java
new file mode 100644
index 00000000000..04a0451bc6d
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java
@@ -0,0 +1,99 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.*;
+import org.junit.Test;
+
+import java.text.ParseException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterStateBundleTest {
+
+ private static ClusterState stateOf(String state) {
+ try {
+ return new ClusterState(state);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static ClusterStateBundle createTestBundle(boolean modifyDefaultSpace) {
+ return ClusterStateBundle
+ .builder(AnnotatedClusterState.withoutAnnotations(stateOf("distributor:2 storage:2")))
+ .bucketSpaces("default", "global", "narnia")
+ .stateDeriver((state, space) -> {
+ ClusterState derived = state.clone();
+ if (space.equals("default") && modifyDefaultSpace) {
+ derived.setNodeState(Node.ofStorage(0), new NodeState(NodeType.STORAGE, State.DOWN));
+ } else if (space.equals("narnia")) {
+ derived.setNodeState(Node.ofDistributor(0), new NodeState(NodeType.DISTRIBUTOR, State.DOWN));
+ }
+ return derived;
+ })
+ .deriveAndBuild();
+ }
+
+ private static ClusterStateBundle createTestBundle() {
+ return createTestBundle(true);
+ }
+
+ @Test
+ public void builder_creates_baseline_state_and_derived_state_per_space() {
+ ClusterStateBundle bundle = createTestBundle();
+ assertThat(bundle.getBaselineClusterState(), equalTo(stateOf("distributor:2 storage:2")));
+ assertThat(bundle.getDerivedBucketSpaceStates().size(), equalTo(3));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("default"), equalTo(stateOf("distributor:2 storage:2 .0.s:d")));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("global"), equalTo(stateOf("distributor:2 storage:2")));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("narnia"), equalTo(stateOf("distributor:2 .0.s:d storage:2")));
+ }
+
+ @Test
+ public void version_clone_sets_version_for_all_spaces() {
+ ClusterStateBundle bundle = createTestBundle().clonedWithVersionSet(123);
+ assertThat(bundle.getBaselineClusterState(), equalTo(stateOf("version:123 distributor:2 storage:2")));
+ assertThat(bundle.getDerivedBucketSpaceStates().size(), equalTo(3));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("default"), equalTo(stateOf("version:123 distributor:2 storage:2 .0.s:d")));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("global"), equalTo(stateOf("version:123 distributor:2 storage:2")));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("narnia"), equalTo(stateOf("version:123 distributor:2 .0.s:d storage:2")));
+ }
+
+ @Test
+ public void same_bundle_instance_considered_similar() {
+ ClusterStateBundle bundle = createTestBundle();
+ assertTrue(bundle.similarTo(bundle));
+ }
+
+ @Test
+ public void similarity_test_considers_all_bucket_spaces() {
+ ClusterStateBundle bundle = createTestBundle(false);
+ ClusterStateBundle unchangedBundle = createTestBundle(false);
+
+ assertTrue(bundle.similarTo(unchangedBundle));
+ assertTrue(unchangedBundle.similarTo(bundle));
+
+ ClusterStateBundle changedBundle = createTestBundle(true);
+ assertFalse(bundle.similarTo(changedBundle));
+ assertFalse(changedBundle.similarTo(bundle));
+ }
+
+ @Test
+ public void toString_without_bucket_space_states_prints_only_baseline_state() {
+ ClusterStateBundle bundle = ClusterStateBundle.ofBaselineOnly(
+ AnnotatedClusterState.withoutAnnotations(stateOf("distributor:2 storage:2")));
+ assertThat(bundle.toString(), equalTo("ClusterStateBundle('distributor:2 storage:2')"));
+ }
+
+ @Test
+ public void toString_includes_all_bucket_space_states() {
+ ClusterStateBundle bundle = createTestBundle();
+ assertThat(bundle.toString(), equalTo("ClusterStateBundle('distributor:2 storage:2', " +
+ "default 'distributor:2 storage:2 .0.s:d', " +
+ "global 'distributor:2 storage:2', " +
+ "narnia 'distributor:2 .0.s:d storage:2')"));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
index c500b4c7390..5d200d65516 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
@@ -86,9 +86,10 @@ public class DummyCommunicator implements Communicator, NodeLookup {
}
@Override
- public void setSystemState(ClusterState state, NodeInfo node, Waiter<SetClusterStateRequest> waiter) {
- DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, state);
- node.setSystemStateVersionSent(state);
+ public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> waiter) {
+ ClusterState baselineState = stateBundle.getBaselineClusterState();
+ DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState);
+ node.setSystemStateVersionSent(baselineState);
req.setReply(new SetClusterStateRequest.Reply());
if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) {
waiter.done(req);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java
new file mode 100644
index 00000000000..6c51f251096
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java
@@ -0,0 +1,71 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.vdslib.state.ClusterState;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MaintenancehenPendingGlobalMergesTest {
+
+ private static class Fixture {
+ public MergePendingChecker mockPendingChecker = mock(MergePendingChecker.class);
+ public MaintenanceWhenPendingGlobalMerges deriver = new MaintenanceWhenPendingGlobalMerges(mockPendingChecker);
+ }
+
+ private static String defaultSpace() {
+ return FixedBucketSpaces.defaultSpace();
+ }
+
+ private static String globalSpace() {
+ return FixedBucketSpaces.globalSpace();
+ }
+
+ @Test
+ public void no_nodes_set_to_maintenance_in_global_bucket_space_state() {
+ Fixture f = new Fixture();
+ when(f.mockPendingChecker.hasMergesPending(eq(globalSpace()), anyInt())).thenReturn(true); // False returned by default otherwise
+ ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:2 storage:2"), globalSpace());
+ assertThat(derived, equalTo(ClusterState.stateFromString("distributor:2 storage:2")));
+ }
+
+ @Test
+ public void content_nodes_with_global_merge_pending_set_to_maintenance_in_default_space_state() {
+ Fixture f = new Fixture();
+ when(f.mockPendingChecker.hasMergesPending(globalSpace(), 1)).thenReturn(true);
+ when(f.mockPendingChecker.hasMergesPending(globalSpace(), 3)).thenReturn(true);
+ ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:5 storage:5"), defaultSpace());
+ assertThat(derived, equalTo(ClusterState.stateFromString("distributor:5 storage:5 .1.s:m .3.s:m")));
+ }
+
+ @Test
+ public void no_nodes_set_to_maintenance_when_no_merges_pending() {
+ Fixture f = new Fixture();
+ ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:5 storage:5"), defaultSpace());
+ assertThat(derived, equalTo(ClusterState.stateFromString("distributor:5 storage:5")));
+ }
+
+ @Test
+ public void default_space_merges_do_not_count_towards_maintenance() {
+ Fixture f = new Fixture();
+ when(f.mockPendingChecker.hasMergesPending(eq(defaultSpace()), anyInt())).thenReturn(true);
+ ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:2 storage:2"), defaultSpace());
+ assertThat(derived, equalTo(ClusterState.stateFromString("distributor:2 storage:2")));
+ }
+
+ @Test
+ public void nodes_only_set_to_maintenance_when_marked_up_init_or_retiring() {
+ Fixture f = new Fixture();
+ when(f.mockPendingChecker.hasMergesPending(eq(globalSpace()), anyInt())).thenReturn(true);
+ ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:5 storage:5 .1.s:m .2.s:r .3.s:i .4.s:d"), defaultSpace());
+ // TODO reconsider role of retired here... It should not have merges pending towards it in the general case, but may be out of sync
+ assertThat(derived, equalTo(ClusterState.stateFromString("distributor:5 storage:5 .0.s:m .1.s:m .2.s:m .3.s:m .4.s:d")));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
index d27acf8bc17..0f4d1fcdefc 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
@@ -8,6 +8,7 @@ import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import org.junit.Test;
+import java.text.ParseException;
import java.util.Arrays;
import java.util.Optional;
@@ -24,6 +25,10 @@ public class StateVersionTrackerTest {
return new AnnotatedClusterState(state, Optional.empty(), AnnotatedClusterState.emptyNodeStateReasons());
}
+ private static ClusterStateBundle stateBundleWithoutAnnotations(String stateStr) {
+ return ClusterStateBundle.ofBaselineOnly(stateWithoutAnnotations(stateStr));
+ }
+
private static StateVersionTracker createWithMockedMetrics() {
return new StateVersionTracker();
}
@@ -32,7 +37,7 @@ public class StateVersionTrackerTest {
final AnnotatedClusterState state,
final long timeMs)
{
- versionTracker.updateLatestCandidateState(state);
+ versionTracker.updateLatestCandidateStateBundle(ClusterStateBundle.ofBaselineOnly(state));
versionTracker.promoteCandidateToVersionedState(timeMs);
}
@@ -89,14 +94,14 @@ public class StateVersionTrackerTest {
@Test
public void diff_from_initial_state_implies_changed_state() {
final StateVersionTracker versionTracker = createWithMockedMetrics();
- versionTracker.updateLatestCandidateState(stateWithoutAnnotations("cluster:d"));
+ versionTracker.updateLatestCandidateStateBundle(stateBundleWithoutAnnotations("cluster:d"));
assertTrue(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish());
}
private static boolean stateChangedBetween(String fromState, String toState) {
final StateVersionTracker versionTracker = createWithMockedMetrics();
updateAndPromote(versionTracker, stateWithoutAnnotations(fromState), 123);
- versionTracker.updateLatestCandidateState(stateWithoutAnnotations(toState));
+ versionTracker.updateLatestCandidateStateBundle(stateBundleWithoutAnnotations(toState));
return versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish();
}
@@ -165,7 +170,7 @@ public class StateVersionTrackerTest {
new Node(NodeType.STORAGE, 0),
new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(15));
updateAndPromote(versionTracker, stateWithMinBits, 123);
- versionTracker.updateLatestCandidateState(stateWithoutAnnotations("distributor:2 storage:2"));
+ versionTracker.updateLatestCandidateStateBundle(stateBundleWithoutAnnotations("distributor:2 storage:2"));
assertFalse(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish());
}
@@ -221,12 +226,45 @@ public class StateVersionTrackerTest {
final StateVersionTracker versionTracker = createWithMockedMetrics();
AnnotatedClusterState candidate = stateWithoutAnnotations("distributor:2 storage:2");
- versionTracker.updateLatestCandidateState(candidate);
+ versionTracker.updateLatestCandidateStateBundle(ClusterStateBundle.ofBaselineOnly(candidate));
assertThat(versionTracker.getLatestCandidateState(), equalTo(candidate));
candidate = stateWithoutAnnotations("distributor:3 storage:3");
- versionTracker.updateLatestCandidateState(candidate);
+ versionTracker.updateLatestCandidateStateBundle(ClusterStateBundle.ofBaselineOnly(candidate));
assertThat(versionTracker.getLatestCandidateState(), equalTo(candidate));
}
+ private static ClusterState stateOf(String state) {
+ return ClusterState.stateFromString(state);
+ }
+
+ private static ClusterStateBundle baselineBundle(boolean alteredDefaultState) {
+ return ClusterStateBundle
+ .builder(AnnotatedClusterState.withoutAnnotations(stateOf("distributor:1 storage:1")))
+ .bucketSpaces("default")
+ .stateDeriver((state, space) -> {
+ ClusterState derived = state.clone();
+ if (alteredDefaultState) {
+ derived.setNodeState(Node.ofStorage(0), new NodeState(NodeType.STORAGE, State.DOWN));
+ }
+ return derived;
+ })
+ .deriveAndBuild();
+ }
+
+ @Test
+ public void version_change_check_takes_derived_states_into_account() {
+ final StateVersionTracker versionTracker = createWithMockedMetrics();
+ versionTracker.updateLatestCandidateStateBundle(baselineBundle(false));
+ versionTracker.promoteCandidateToVersionedState(1234);
+
+ // Not marked changed with no changes across bucket spaces
+ versionTracker.updateLatestCandidateStateBundle(baselineBundle(false));
+ assertFalse(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish());
+
+ // Changing state in default space marks as sufficiently changed
+ versionTracker.updateLatestCandidateStateBundle(baselineBundle(true));
+ assertTrue(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish());
+ }
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
new file mode 100644
index 00000000000..93aac5c83ed
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -0,0 +1,145 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.*;
+import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
+import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
+import org.junit.Test;
+
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class SystemStateBroadcasterTest {
+
+ private static class Fixture {
+ FakeTimer timer = new FakeTimer();
+ final Object monitor = new Object();
+ SystemStateBroadcaster broadcaster = new SystemStateBroadcaster(timer, monitor);
+ Communicator mockCommunicator = mock(Communicator.class);
+
+ void simulateNodePartitionedAwaySilently(ClusterFixture cf) {
+ cf.cluster().getNodeInfo(Node.ofStorage(0)).setStartTimestamp(600);
+ cf.cluster().getNodeInfo(Node.ofStorage(1)).setStartTimestamp(700);
+ // Simulate a distributor being partitioned away from the controller without actually going down. It will
+ // need to observe all startup timestamps to infer if it should fetch bucket info from nodes.
+ cf.cluster().getNodeInfo(Node.ofDistributor(0)).setStartTimestamp(500); // FIXME multiple sources of timestamps are... rather confusing
+ cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 1000);
+ cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.DOWN).setStartTimestamp(500), 2000);
+ cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 3000);
+ }
+ }
+
+ private static DatabaseHandler.Context dbContextFrom(ContentCluster cluster) {
+ return new DatabaseHandler.Context() {
+ @Override
+ public ContentCluster getCluster() {
+ return cluster;
+ }
+
+ @Override
+ public FleetController getFleetController() {
+ return null; // We assume the broadcaster doesn't use this for our test purposes
+ }
+
+ @Override
+ public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener() {
+ return null;
+ }
+
+ @Override
+ public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener() {
+ return null;
+ }
+ };
+ }
+
+ private static Stream<NodeInfo> clusterNodeInfos(ContentCluster c, Node... nodes) {
+ return Stream.of(nodes).map(c::getNodeInfo);
+ }
+
+ private static class StateMapping {
+ final String bucketSpace;
+ final ClusterState state;
+
+ StateMapping(String bucketSpace, ClusterState state) {
+ this.bucketSpace = bucketSpace;
+ this.state = state;
+ }
+ }
+
+ private static StateMapping mapping(String bucketSpace, String state) {
+ return new StateMapping(bucketSpace, ClusterState.stateFromString(state));
+ }
+
+ private static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) {
+ return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)),
+ Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state)));
+ }
+
+ @Test
+ public void always_publish_baseline_cluster_state() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2");
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
+ }
+
+ @Test
+ public void non_observed_startup_timestamps_are_published_per_node_for_baseline_state() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2");
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.simulateNodePartitionedAwaySilently(cf);
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+
+ clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
+ // Only distributor 0 should observe startup timestamps
+ verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any());
+ });
+ ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700");
+ verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
+ }
+
+ @Test
+ public void bucket_space_states_are_published_verbatim_when_no_additional_timestamps_needed() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2",
+ mapping("default", "distributor:2 storage:2 .0.s:d"),
+ mapping("upsidedown", "distributor:2 .0.s:d storage:2"));
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+
+ cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
+ }
+
+ @Test
+ public void non_observed_startup_timestamps_are_published_per_bucket_space_state() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2",
+ mapping("default", "distributor:2 storage:2 .0.s:d"),
+ mapping("upsidedown", "distributor:2 .0.s:d storage:2"));
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.simulateNodePartitionedAwaySilently(cf);
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+
+ clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
+ // Only distributor 0 should observe startup timestamps
+ verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any());
+ });
+ ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700",
+ mapping("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"),
+ mapping("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700"));
+ verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
+ }
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
index 3b64d3bc1cb..730b03ff7c9 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.clustercontroller.core.testutils;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.FakeTimer;
import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener;
@@ -22,9 +23,9 @@ public class StateWaiter implements SystemStateListener {
this.timer = timer;
}
- public void handleNewSystemState(ClusterState state) {
+ public void handleNewSystemState(ClusterStateBundle state) {
synchronized(timer) {
- current = state;
+ current = state.getBaselineClusterState();
++stateUpdates;
timer.notifyAll();
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
index ebe0c0edb4b..6396b27aa79 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.clustercontroller.core.testutils;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.DummyVdsNode;
import com.yahoo.vespa.clustercontroller.core.FleetController;
import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener;
@@ -25,9 +26,9 @@ public interface WaitCondition {
protected ClusterState currentState;
private final SystemStateListener listener = new SystemStateListener() {
@Override
- public void handleNewSystemState(ClusterState state) {
+ public void handleNewSystemState(ClusterStateBundle state) {
synchronized (monitor) {
- currentState = state;
+ currentState = state.getBaselineClusterState();
monitor.notifyAll();
}
}