summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-02-20 13:55:59 +0100
committerTor Brede Vekterli <vekterli@oath.com>2018-02-22 14:17:22 +0100
commit4cd6e2a3c02fe49409fa0a888af87e84ec93fa7a (patch)
tree8b90a25de4e201374c973a53f3e0329416447b53 /clustercontroller-core
parent49f310b5545eaac8aa8567f7d0a5b1944505e011 (diff)
Initial support for per bucket space cluster states in cluster controller
Multiple spaces are only enabled in tests, so the controller still only generates the legacy baseline cluster state, maintaining today's behavior.
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java149
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java18
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java2
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java79
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java66
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java13
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java57
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java55
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java8
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java13
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java5
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java99
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java7
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java71
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java50
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java145
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java5
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java5
21 files changed, 757 insertions, 102 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java
index f935b8b2cc7..9bf36cca947 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java
@@ -28,6 +28,10 @@ public class AnnotatedClusterState {
return new AnnotatedClusterState(ClusterState.emptyState(), Optional.empty(), emptyNodeStateReasons());
}
+ public static AnnotatedClusterState withoutAnnotations(ClusterState state) {
+ return new AnnotatedClusterState(state, Optional.empty(), emptyNodeStateReasons());
+ }
+
static Map<Node, NodeStateReason> emptyNodeStateReasons() {
return Collections.emptyMap();
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
new file mode 100644
index 00000000000..09273ee5656
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
@@ -0,0 +1,149 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A cluster state bundle is a wrapper around the baseline ("source of truth") cluster
+ * state and any bucket space specific states that may be derived from it.
+ *
+ * The baseline state represents the generated state of the _nodes_ in the cluster,
+ * while the per-space states represent possible transformations that make sense in
+ * the context of that particular bucket space. The most prominent example is
+ * transforming nodes in the default bucket space into maintenance mode if they have
+ * merges pending in the global space.
+ *
+ * The baseline state is identical to the legacy, global cluster state that the
+ * cluster controller has historically produced as its only output.
+ */
+public class ClusterStateBundle {
+
+ private final AnnotatedClusterState baselineState;
+ private final Map<String, ClusterState> derivedBucketSpaceStates;
+
+ public static class Builder {
+ private final AnnotatedClusterState baselineState;
+ private ClusterStateDeriver stateDeriver;
+ private Set<String> bucketSpaces;
+
+ public Builder(AnnotatedClusterState baselineState) {
+ this.baselineState = baselineState;
+ }
+
+ public Builder stateDeriver(ClusterStateDeriver stateDeriver) {
+ this.stateDeriver = stateDeriver;
+ return this;
+ }
+
+ public Builder bucketSpaces(Set<String> bucketSpaces) {
+ this.bucketSpaces = bucketSpaces;
+ return this;
+ }
+
+ public Builder bucketSpaces(String... bucketSpaces) {
+ this.bucketSpaces = new TreeSet<>(Arrays.asList(bucketSpaces));
+ return this;
+ }
+
+ public ClusterStateBundle deriveAndBuild() {
+ if (stateDeriver == null || bucketSpaces == null || bucketSpaces.isEmpty()) {
+ return ClusterStateBundle.ofBaselineOnly(baselineState);
+ }
+ Map<String, ClusterState> derived = bucketSpaces.stream()
+ .collect(Collectors.toMap(
+ Function.identity(),
+ s -> stateDeriver.derivedFrom(baselineState.getClusterState(), s)));
+ return new ClusterStateBundle(baselineState, derived);
+ }
+ }
+
+ private ClusterStateBundle(AnnotatedClusterState baselineState, Map<String, ClusterState> derivedBucketSpaceStates) {
+ this.baselineState = baselineState;
+ this.derivedBucketSpaceStates = Collections.unmodifiableMap(derivedBucketSpaceStates);
+ }
+
+ public static Builder builder(AnnotatedClusterState baselineState) {
+ return new Builder(baselineState);
+ }
+
+ public static ClusterStateBundle of(AnnotatedClusterState baselineState, Map<String, ClusterState> derivedBucketSpaceStates) {
+ return new ClusterStateBundle(baselineState, derivedBucketSpaceStates);
+ }
+
+ public static ClusterStateBundle ofBaselineOnly(AnnotatedClusterState baselineState) {
+ return new ClusterStateBundle(baselineState, Collections.emptyMap());
+ }
+
+ public AnnotatedClusterState getBaselineAnnotatedState() {
+ return baselineState;
+ }
+
+ public ClusterState getBaselineClusterState() {
+ return baselineState.getClusterState();
+ }
+
+ public Map<String, ClusterState> getDerivedBucketSpaceStates() {
+ return derivedBucketSpaceStates;
+ }
+
+ public ClusterStateBundle cloneWithMapper(Function<ClusterState, ClusterState> mapper) {
+ AnnotatedClusterState clonedBaseline = new AnnotatedClusterState(
+ mapper.apply(baselineState.getClusterState().clone()),
+ baselineState.getClusterStateReason(),
+ baselineState.getNodeStateReasons());
+ Map<String, ClusterState> clonedDerived = derivedBucketSpaceStates.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey(), e -> mapper.apply(e.getValue().clone())));
+ return new ClusterStateBundle(clonedBaseline, clonedDerived);
+ }
+
+ public ClusterStateBundle clonedWithVersionSet(int version) {
+ return cloneWithMapper(state -> {
+ state.setVersion(version);
+ return state;
+ });
+ }
+
+ public boolean similarTo(ClusterStateBundle other) {
+ if (!baselineState.getClusterState().similarToIgnoringInitProgress(other.baselineState.getClusterState())) {
+ return false;
+ }
+ // FIXME we currently treat mismatching bucket space sets as unchanged to avoid breaking some tests
+ return derivedBucketSpaceStates.entrySet().stream()
+ .allMatch(entry -> other.derivedBucketSpaceStates.getOrDefault(entry.getKey(), entry.getValue())
+ .similarToIgnoringInitProgress(entry.getValue()));
+ }
+
+ public int getVersion() {
+ return baselineState.getClusterState().getVersion();
+ }
+
+ @Override
+ public String toString() {
+ if (derivedBucketSpaceStates.isEmpty()) {
+ return String.format("ClusterStateBundle('%s')", baselineState);
+ }
+ Map<String, ClusterState> orderedStates = new TreeMap<>(derivedBucketSpaceStates);
+ return String.format("ClusterStateBundle('%s', %s)", baselineState, orderedStates.entrySet().stream()
+ .map(e -> String.format("%s '%s'", e.getKey(), e.getValue()))
+ .collect(Collectors.joining(", ")));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ClusterStateBundle that = (ClusterStateBundle) o;
+ return Objects.equals(baselineState, that.baselineState) &&
+ Objects.equals(derivedBucketSpaceStates, that.derivedBucketSpaceStates);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(baselineState, derivedBucketSpaceStates);
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java
new file mode 100644
index 00000000000..9e42e1da649
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java
@@ -0,0 +1,18 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+
+/**
+ * Bucket-space aware transformation factory to "derive" new cluster states from an
+ * existing state.
+ */
+public interface ClusterStateDeriver {
+ /**
+ * @param state Baseline cluster state used as a source for deriving a new state.
+ * MUST NOT be modified explicitly or implicitly.
+ * @param bucketSpace The name of the bucket space for which the state should be derived
+ * @return A cluster state instance representing the derived state, or <em>state</em> unchanged.
+ */
+ ClusterState derivedFrom(ClusterState state, String bucketSpace);
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java
index de0ba6bf33c..450513343b0 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java
@@ -18,7 +18,7 @@ public interface Communicator {
void getNodeState(NodeInfo node, Waiter<GetNodeStateRequest> waiter);
- void setSystemState(ClusterState state, NodeInfo node, Waiter<SetClusterStateRequest> waiter);
+ void setSystemState(ClusterStateBundle states, NodeInfo node, Waiter<SetClusterStateRequest> waiter);
void shutdown();
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index ba56d75ab4c..1059434aac3 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.distribution.ConfiguredNode;
@@ -24,18 +25,11 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import java.io.FileNotFoundException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.TimeZone;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAddedOrRemovedListener, SystemStateListener,
Runnable, RemoteClusterControllerTaskScheduler {
@@ -69,7 +63,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private boolean waitingForCycle = false;
private StatusPageServer.PatternRequestRouter statusRequestRouter = new StatusPageServer.PatternRequestRouter();
- private final List<com.yahoo.vdslib.state.ClusterState> newStates = new ArrayList<>();
+ private final List<ClusterStateBundle> newStates = new ArrayList<>();
private long configGeneration = -1;
private long nextConfigGeneration = -1;
private Queue<RemoteClusterControllerTask> remoteTasks = new LinkedList<>();
@@ -84,6 +78,11 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Invariant: queued task versions are monotonically increasing with queue position
private Queue<VersionDependentTaskCompletion> taskCompletionQueue = new ArrayDeque<>();
+ // Legacy behavior is an empty set of explicitly configured bucket spaces, which means that
+ // only a baseline cluster state will be sent from the controller and no per-space state
+ // deriving is done.
+ private Set<String> configuredBucketSpaces = Collections.emptySet();
+
private final RunDataExtractor dataExtractor = new RunDataExtractor() {
@Override
public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); }
@@ -233,7 +232,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Always give cluster state listeners the current state, in case acceptable state has come before listener is registered.
com.yahoo.vdslib.state.ClusterState state = getSystemState();
if (state == null) throw new NullPointerException("Cluster state should never be null at this point");
- listener.handleNewSystemState(state);
+ listener.handleNewSystemState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state)));
}
}
@@ -350,16 +349,18 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
stateChangeHandler.handleReturnedRpcAddress(node);
}
- public void handleNewSystemState(com.yahoo.vdslib.state.ClusterState state) {
+ @Override
+ public void handleNewSystemState(ClusterStateBundle stateBundle) {
verifyInControllerThread();
- newStates.add(state);
- metricUpdater.updateClusterStateMetrics(cluster, state);
- systemStateBroadcaster.handleNewSystemState(state);
+ ClusterState baselineState = stateBundle.getBaselineClusterState();
+ newStates.add(stateBundle);
+ metricUpdater.updateClusterStateMetrics(cluster, baselineState);
+ systemStateBroadcaster.handleNewClusterStates(stateBundle);
// Iff master, always store new version in ZooKeeper _before_ publishing to any
// nodes so that a cluster controller crash after publishing but before a successful
// ZK store will not risk reusing the same version number.
if (masterElectionHandler.isMaster()) {
- storeClusterStateVersionToZooKeeper(state);
+ storeClusterStateVersionToZooKeeper(baselineState);
}
}
@@ -425,8 +426,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Check retirement changes
for (ConfiguredNode node : newNodes) {
- if (node.retired() != cluster.getConfiguredNodes().get(node.index()).retired())
+ if (node.retired() != cluster.getConfiguredNodes().get(node.index()).retired()) {
return true;
+ }
}
return false;
@@ -441,10 +443,20 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
cluster.setSlobrokGenerationCount(0);
}
+ // TODO don't hardcode bucket spaces
+ if (options.enableMultipleBucketSpaces) {
+ configuredBucketSpaces = Collections.unmodifiableSet(
+ Stream.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace())
+ .collect(Collectors.toSet()));
+ } else {
+ configuredBucketSpaces = Collections.emptySet();
+ }
+
communicator.propagateOptions(options);
- if (nodeLookup instanceof SlobrokClient)
- ((SlobrokClient)nodeLookup).setSlobrokConnectionSpecs(options.slobrokConnectionSpecs);
+ if (nodeLookup instanceof SlobrokClient) {
+ ((SlobrokClient) nodeLookup).setSlobrokConnectionSpecs(options.slobrokConnectionSpecs);
+ }
eventLog.setMaxSize(options.eventLogMaxSize, options.eventNodeLogMaxSize);
cluster.setPollingFrequency(options.statePollingFrequency);
cluster.setDistribution(options.storageDistribution);
@@ -625,7 +637,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
return false;
}
- private boolean broadcastClusterStateToEligibleNodes() throws InterruptedException {
+ private boolean broadcastClusterStateToEligibleNodes() {
boolean sentAny = false;
// Give nodes a fair chance to respond first time to state gathering requests, so we don't
// disturb system when we take over. Allow anyways if we have states from all nodes.
@@ -638,7 +650,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Reset timer to only see warning once.
firstAllowedStateBroadcast = currentTime;
}
- sentAny = systemStateBroadcaster.broadcastNewState(database, databaseContext, communicator, this);
+ sentAny = systemStateBroadcaster.broadcastNewState(databaseContext, communicator);
if (sentAny) {
nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates;
}
@@ -649,9 +661,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private void propagateNewStatesToListeners() {
if ( ! newStates.isEmpty()) {
synchronized (systemStateListeners) {
- for (ClusterState state : newStates) {
+ for (ClusterStateBundle stateBundle : newStates) {
for(SystemStateListener listener : systemStateListeners) {
- listener.handleNewSystemState(state);
+ listener.handleNewSystemState(stateBundle);
}
}
newStates.clear();
@@ -778,7 +790,13 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if (mustRecomputeCandidateClusterState()) {
stateChangeHandler.unsetStateChangedFlag();
final AnnotatedClusterState candidate = computeCurrentAnnotatedState();
- stateVersionTracker.updateLatestCandidateState(candidate);
+ // TODO test multiple bucket spaces configured
+ // TODO what interaction do we want between generated and derived states wrt. auto group take-downs?
+ final ClusterStateBundle candidateBundle = ClusterStateBundle.builder(candidate)
+ .bucketSpaces(configuredBucketSpaces)
+ .stateDeriver(createBucketSpaceStateDeriver())
+ .deriveAndBuild();
+ stateVersionTracker.updateLatestCandidateStateBundle(candidateBundle);
if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()
|| stateVersionTracker.hasReceivedNewVersionFromZooKeeper())
@@ -787,8 +805,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
final AnnotatedClusterState before = stateVersionTracker.getAnnotatedVersionedClusterState();
stateVersionTracker.promoteCandidateToVersionedState(timeNowMs);
+ // TODO also emit derived state edges events
emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs);
- handleNewSystemState(stateVersionTracker.getVersionedClusterState());
+ handleNewSystemState(stateVersionTracker.getVersionedClusterStateBundle());
stateWasChanged = true;
}
}
@@ -804,6 +823,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
return stateWasChanged;
}
+ private ClusterStateDeriver createBucketSpaceStateDeriver() {
+ return new MaintenanceWhenPendingGlobalMerges((space, index) -> false); // TODO wire with stats aggregator
+ }
+
/**
* Move tasks that are dependent on the most recently generated state being published into
* a completion queue with a dependency on the provided version argument. Once that version
@@ -862,7 +885,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
private boolean mustRecomputeCandidateClusterState() {
- return stateChangeHandler.stateMayHaveChanged() || stateVersionTracker.hasReceivedNewVersionFromZooKeeper();
+ return stateChangeHandler.stateMayHaveChanged()
+ || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()
+ || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged();
}
private boolean handleLeadershipEdgeTransitions() throws InterruptedException {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
index d8c853f45cb..3bf8edafbe3 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
@@ -119,6 +119,9 @@ public class FleetControllerOptions implements Cloneable {
private Duration maxDeferredTaskVersionWaitTime = Duration.ofSeconds(30);
+ // TODO replace this flag with a set of bucket spaces instead
+ public boolean enableMultipleBucketSpaces = false;
+
// TODO: Replace usage of this by usage where the nodes are explicitly passed (below)
public FleetControllerOptions(String clusterName) {
this.clusterName = clusterName;
@@ -225,6 +228,7 @@ public class FleetControllerOptions implements Cloneable {
sb.append("<tr><td><nobr>Maximum node event log size</nobr></td><td align=\"right\">").append(eventNodeLogMaxSize).append("</td></tr>");
sb.append("<tr><td><nobr>Wanted distribution bits</nobr></td><td align=\"right\">").append(distributionBits).append("</td></tr>");
sb.append("<tr><td><nobr>Max deferred task version wait time</nobr></td><td align=\"right\">").append(maxDeferredTaskVersionWaitTime.toMillis()).append("ms</td></tr>");
+ sb.append("<tr><td><nobr>Multiple bucket spaces enabled</nobr></td><td align=\"right\">").append(enableMultipleBucketSpaces).append("</td></tr>");
sb.append("</table>");
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java
new file mode 100644
index 00000000000..16401a79f08
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java
@@ -0,0 +1,66 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.vdslib.state.*;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Cluster state deriver which checks if nodes have merges pending for globally
+ * distributed documents, and if they do, sets the node in maintenance mode in the
+ * default bucket space. This allows merges to complete for global documents before
+ * the default space documents that have references to them are made searchable.
+ *
+ * Note: bucket spaces are currently hard-coded.
+ */
+public class MaintenanceWhenPendingGlobalMerges implements ClusterStateDeriver {
+
+ // TODO make these configurable
+ private static final String bucketSpaceToCheck = FixedBucketSpaces.globalSpace();
+ private static final String bucketSpaceToDerive = FixedBucketSpaces.defaultSpace();
+
+ private final MergePendingChecker mergePendingChecker;
+
+ public MaintenanceWhenPendingGlobalMerges(MergePendingChecker mergePendingChecker) {
+ this.mergePendingChecker = mergePendingChecker;
+ }
+
+ @Override
+ public ClusterState derivedFrom(ClusterState baselineState, String bucketSpace) {
+ ClusterState derivedState = baselineState.clone();
+ if (!bucketSpace.equals(bucketSpaceToDerive)) {
+ return derivedState;
+ }
+ Set<Integer> incompleteNodeIndices = nodesWithMergesNotDone(baselineState);
+ if (incompleteNodeIndices.isEmpty()) {
+ return derivedState; // Nothing to do
+ }
+ incompleteNodeIndices.forEach(nodeIndex -> derivedState.setNodeState(Node.ofStorage(nodeIndex),
+ new NodeState(NodeType.STORAGE, State.MAINTENANCE)));
+ return derivedState;
+ }
+
+ private Set<Integer> nodesWithMergesNotDone(ClusterState baselineState) {
+ final Set<Integer> incompleteNodes = new HashSet<>();
+ final int nodeCount = baselineState.getNodeCount(NodeType.STORAGE);
+ for (int nodeIndex = 0; nodeIndex < nodeCount; ++nodeIndex) {
+ // FIXME should only set nodes into maintenance if they've not yet been up in the cluster
+ // state since they came back as Reported state Up!
+ // Must be implemented before this state deriver is enabled in production.
+ if (contentNodeIsAvailable(baselineState, nodeIndex) && hasMergesNotDone(bucketSpaceToCheck, nodeIndex)) {
+ incompleteNodes.add(nodeIndex);
+ }
+ }
+ return incompleteNodes;
+ }
+
+ private boolean contentNodeIsAvailable(ClusterState state, int nodeIndex) {
+ return state.getNodeState(Node.ofStorage(nodeIndex)).getState().oneOf("uir");
+ }
+
+ private boolean hasMergesNotDone(String bucketSpace, int nodeIndex) {
+ return mergePendingChecker.hasMergesPending(bucketSpace, nodeIndex);
+ }
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java
new file mode 100644
index 00000000000..e1b844c53de
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java
@@ -0,0 +1,13 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+/**
+ * Implementations provide functionality for checking if a particular bucket space
+ * has merges reported as pending from the cluster's distributor nodes. It is up
+ * to the implementation to determine the exact semantics of what "pending" implies.
+ */
+public interface MergePendingChecker {
+
+ boolean hasMergesPending(String bucketSpace, int contentNodeIndex);
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
index 902189fca8b..fffe1c95124 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
@@ -28,9 +28,9 @@ public class StateVersionTracker {
// TODO this mirrors legacy behavior, but should be moved into stable ZK state.
private int lowestObservedDistributionBits = 16;
- private ClusterState currentUnversionedState = ClusterState.emptyState();
- private AnnotatedClusterState latestCandidateState = AnnotatedClusterState.emptyState();
- private AnnotatedClusterState currentClusterState = latestCandidateState;
+ private ClusterStateBundle currentUnversionedState = ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState());
+ private ClusterStateBundle latestCandidateState = ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState());
+ private ClusterStateBundle currentClusterState = latestCandidateState;
private ClusterStateView clusterStateView;
@@ -38,7 +38,7 @@ public class StateVersionTracker {
private int maxHistoryEntryCount = 50;
StateVersionTracker() {
- clusterStateView = ClusterStateView.create(currentUnversionedState);
+ clusterStateView = ClusterStateView.create(currentUnversionedState.getBaselineClusterState());
}
void setVersionRetrievedFromZooKeeper(final int version) {
@@ -70,26 +70,30 @@ public class StateVersionTracker {
}
AnnotatedClusterState getAnnotatedVersionedClusterState() {
- return currentClusterState;
+ return currentClusterState.getBaselineAnnotatedState();
}
public ClusterState getVersionedClusterState() {
- return currentClusterState.getClusterState();
+ return currentClusterState.getBaselineClusterState();
+ }
+
+ public ClusterStateBundle getVersionedClusterStateBundle() {
+ return currentClusterState;
}
- public void updateLatestCandidateState(final AnnotatedClusterState candidate) {
- assert(latestCandidateState.getClusterState().getVersion() == 0);
- latestCandidateState = candidate;
+ public void updateLatestCandidateStateBundle(final ClusterStateBundle candidateBundle) {
+ assert(latestCandidateState.getBaselineClusterState().getVersion() == 0);
+ latestCandidateState = candidateBundle;
}
/**
- * Returns the last state provided to updateLatestCandidateState, which _may or may not_ be
+ * Returns the last state provided to updateLatestCandidateStateBundle, which _may or may not_ be
* a published state. Primary use case for this function is a caller which is interested in
* changes that may not be reflected in the published state. The best example of this would
* be node state changes when a cluster is marked as Down.
*/
public AnnotatedClusterState getLatestCandidateState() {
- return latestCandidateState;
+ return latestCandidateState.getBaselineAnnotatedState();
}
public List<ClusterStateHistoryEntry> getClusterStateHistory() {
@@ -97,7 +101,9 @@ public class StateVersionTracker {
}
boolean candidateChangedEnoughFromCurrentToWarrantPublish() {
- return !currentUnversionedState.similarToIgnoringInitProgress(latestCandidateState.getClusterState());
+ // Neither latestCandidateState nor currentUnversionedState has a version set, so the
+ // similarity is only done on structural state metadata.
+ return !currentUnversionedState.similarTo(latestCandidateState);
}
void promoteCandidateToVersionedState(final long currentTimeMs) {
@@ -108,23 +114,19 @@ public class StateVersionTracker {
recordCurrentStateInHistoryAtTime(currentTimeMs);
}
- private void updateStatesForNewVersion(final AnnotatedClusterState newState, final int newVersion) {
- currentClusterState = new AnnotatedClusterState(
- newState.getClusterState().clone(), // Because we mutate version below
- newState.getClusterStateReason(),
- newState.getNodeStateReasons());
- currentClusterState.getClusterState().setVersion(newVersion);
- currentUnversionedState = newState.getClusterState().clone();
+ private void updateStatesForNewVersion(final ClusterStateBundle newStateBundle, final int newVersion) {
+ currentClusterState = newStateBundle.clonedWithVersionSet(newVersion);
+ currentUnversionedState = newStateBundle; // TODO should we clone..? ClusterState really should be made immutable
lowestObservedDistributionBits = Math.min(
lowestObservedDistributionBits,
- newState.getClusterState().getDistributionBitCount());
- // TODO should this take place in updateLatestCandidateState instead? I.e. does it require a consolidated state?
- clusterStateView = ClusterStateView.create(currentClusterState.getClusterState());
+ newStateBundle.getBaselineClusterState().getDistributionBitCount());
+ // TODO should this take place in updateLatestCandidateStateBundle instead? I.e. does it require a consolidated state?
+ clusterStateView = ClusterStateView.create(currentClusterState.getBaselineClusterState());
}
private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) {
clusterStateHistory.addFirst(new ClusterStateHistoryEntry(
- currentClusterState.getClusterState(), currentTimeMs));
+ currentClusterState.getBaselineClusterState(), currentTimeMs));
while (clusterStateHistory.size() > maxHistoryEntryCount) {
clusterStateHistory.removeLast();
}
@@ -135,4 +137,13 @@ public class StateVersionTracker {
clusterStateView.handleUpdatedHostInfo(hostnames, node, hostInfo);
}
+ boolean bucketSpaceMergeCompletionStateHasChanged() {
+ return false; // TODO wire changes in merge info
+ }
+
+ /*
+ TODO test and implement
+ - derived default space down-condition can only _keep_ a node in maintenance (down), not transition it from up -> maintenance
+ */
+
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 024c5d21607..45a14ee19cd 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -18,7 +18,7 @@ public class SystemStateBroadcaster {
private final Timer timer;
private final Object monitor;
- private ClusterState systemState;
+ private ClusterStateBundle clusterStateBundle;
private final List<SetClusterStateRequest> replies = new LinkedList<>();
private final static long minTimeBetweenNodeErrorLogging = 10 * 60 * 1000;
@@ -32,12 +32,12 @@ public class SystemStateBroadcaster {
this.monitor = monitor;
}
- public void handleNewSystemState(ClusterState state) {
- systemState = state;
+ public void handleNewClusterStates(ClusterStateBundle state) {
+ clusterStateBundle = state;
}
public ClusterState getClusterState() {
- return systemState;
+ return clusterStateBundle.getBaselineClusterState();
}
private void reportNodeError(boolean nodeOk, NodeInfo info, String message) {
@@ -79,7 +79,7 @@ public class SystemStateBroadcaster {
}
private boolean nodeNeedsClusterState(NodeInfo node) {
- if (node.getSystemStateVersionAcknowledged() == systemState.getVersion()) {
+ if (node.getSystemStateVersionAcknowledged() == clusterStateBundle.getVersion()) {
return false; // No point in sending if node already has updated system state
}
if (node.getRpcAddress() == null || node.isRpcAddressOutdated()) {
@@ -102,7 +102,7 @@ public class SystemStateBroadcaster {
}
private boolean newestStateAlreadySentToNode(NodeInfo node) {
- return (node.getNewestSystemStateVersionSent() == systemState.getVersion());
+ return (node.getNewestSystemStateVersionSent() == clusterStateBundle.getVersion());
}
/**
@@ -113,42 +113,45 @@ public class SystemStateBroadcaster {
void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database,
DatabaseHandler.Context dbContext,
FleetController fleetController) throws InterruptedException {
- if ((systemState == null) || (lastClusterStateInSync == systemState.getVersion())) {
+ final int currentStateVersion = clusterStateBundle.getVersion();
+ if ((clusterStateBundle == null) || (lastClusterStateInSync == currentStateVersion)) {
return; // Nothing to do for the current state
}
boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream()
.filter(NodeInfo::isDistributor)
.anyMatch(this::nodeNeedsClusterState);
- if (!anyOutdatedDistributorNodes && (systemState.getVersion() > lastClusterStateInSync)) {
+ if (!anyOutdatedDistributorNodes && (currentStateVersion > lastClusterStateInSync)) {
log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state");
- lastClusterStateInSync = systemState.getVersion();
+ lastClusterStateInSync = currentStateVersion;
fleetController.handleAllDistributorsInSync(database, dbContext);
}
}
- public boolean broadcastNewState(DatabaseHandler database,
- DatabaseHandler.Context dbContext,
- Communicator communicator,
- FleetController fleetController) throws InterruptedException {
- if (systemState == null) return false;
+ public boolean broadcastNewState(DatabaseHandler.Context dbContext, Communicator communicator) {
+ if (clusterStateBundle == null) {
+ return false;
+ }
+
+ ClusterState baselineState = clusterStateBundle.getBaselineClusterState();
- if (!systemState.isOfficial()) {
- log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", systemState.getVersion()));
- systemState.setOfficial(true);
+ if (!baselineState.isOfficial()) {
+ log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", baselineState.getVersion()));
+ baselineState.setOfficial(true);
}
List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext);
for (NodeInfo node : recipients) {
if (nodeNeedsToObserveStartupTimestamps(node)) {
- ClusterState newState = buildModifiedClusterState(dbContext);
- log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion()
- + " to node " + node + ": " + newState);
- communicator.setSystemState(newState, node, waiter);
+ // TODO this is the same for all nodes, compute only once
+ ClusterStateBundle modifiedBundle = clusterStateBundle.cloneWithMapper(state -> buildModifiedClusterState(state, dbContext));
+ log.log(LogLevel.DEBUG, "Sending modified cluster state version " + baselineState.getVersion()
+ + " to node " + node + ": " + modifiedBundle);
+ communicator.setSystemState(modifiedBundle, node, waiter);
} else {
- log.log(LogLevel.DEBUG, "Sending system state version " + systemState.getVersion() + " to node " + node
+ log.log(LogLevel.DEBUG, "Sending system state version " + baselineState.getVersion() + " to node " + node
+ ". (went down time " + node.getWentDownWithStartTime() + ", node start time " + node.getStartTimestamp() + ")");
- communicator.setSystemState(systemState, node, waiter);
+ communicator.setSystemState(clusterStateBundle, node, waiter);
}
}
@@ -157,12 +160,12 @@ public class SystemStateBroadcaster {
public int lastClusterStateVersionInSync() { return lastClusterStateInSync; }
- private boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
+ private static boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp();
}
- private ClusterState buildModifiedClusterState(DatabaseHandler.Context dbContext) {
- ClusterState newState = systemState.clone();
+ private static ClusterState buildModifiedClusterState(ClusterState sourceState, DatabaseHandler.Context dbContext) {
+ ClusterState newState = sourceState.clone();
for (NodeInfo n : dbContext.getCluster().getNodeInfo()) {
NodeState ns = newState.getNodeState(n.getNode());
if (!n.isDistributor() && ns.getStartTimestamp() == 0) {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
index 07f42301695..c89bb6a136a 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
@@ -28,10 +28,10 @@ public class DatabaseHandler {
private static Logger log = Logger.getLogger(DatabaseHandler.class.getName());
public interface Context {
- public ContentCluster getCluster();
- public FleetController getFleetController();
- public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener();
- public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener();
+ ContentCluster getCluster();
+ FleetController getFleetController();
+ NodeAddedOrRemovedListener getNodeAddedOrRemovedListener();
+ NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener();
}
private class Data {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
index 617f91cb837..40fe91f0c1e 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
@@ -1,10 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core.listeners;
-import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
public interface SystemStateListener {
- void handleNewSystemState(ClusterState state);
+ void handleNewSystemState(ClusterStateBundle states);
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index 1887c1dd2ad..0cc5ef36e3c 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -157,8 +157,9 @@ public class RPCCommunicator implements Communicator {
}
@Override
- public void setSystemState(ClusterState state, NodeInfo node, Waiter<SetClusterStateRequest> externalWaiter) {
- RPCSetClusterStateWaiter waiter = new RPCSetClusterStateWaiter(externalWaiter, timer);
+ public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> externalWaiter) {
+ final RPCSetClusterStateWaiter waiter = new RPCSetClusterStateWaiter(externalWaiter, timer);
+ final ClusterState baselineState = stateBundle.getBaselineClusterState();
Target connection = getConnection(node);
if (!connection.isValid()) {
@@ -172,17 +173,17 @@ public class RPCCommunicator implements Communicator {
Request req;
if (node.getVersion() == 0) {
req = new Request("setsystemstate");
- req.parameters().add(new StringValue(state.toString(true)));
+ req.parameters().add(new StringValue(baselineState.toString(true)));
} else {
req = new Request("setsystemstate2");
- req.parameters().add(new StringValue(state.toString(false)));
+ req.parameters().add(new StringValue(baselineState.toString(false)));
}
- RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, state.getVersion());
+ RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion());
waiter.setRequest(stateRequest);
connection.invokeAsync(req, 60, waiter);
- node.setSystemStateVersionSent(state);
+ node.setSystemStateVersionSent(baselineState);
}
// protected for testing.
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
index 16e9675d586..4862c83f2a6 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
@@ -149,6 +149,11 @@ class ClusterFixture {
return this;
}
+ ClusterFixture assignDummyRpcAddresses() {
+ cluster.getNodeInfo().forEach(ni -> ni.setRpcAddress("tcp/localhost:0"));
+ return this;
+ }
+
static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) {
Map<NodeType, Integer> maxTransitionTime = new TreeMap<>();
maxTransitionTime.put(NodeType.DISTRIBUTOR, distributorTransitionTime);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java
new file mode 100644
index 00000000000..04a0451bc6d
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java
@@ -0,0 +1,99 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.*;
+import org.junit.Test;
+
+import java.text.ParseException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterStateBundleTest {
+
+ private static ClusterState stateOf(String state) {
+ try {
+ return new ClusterState(state);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static ClusterStateBundle createTestBundle(boolean modifyDefaultSpace) {
+ return ClusterStateBundle
+ .builder(AnnotatedClusterState.withoutAnnotations(stateOf("distributor:2 storage:2")))
+ .bucketSpaces("default", "global", "narnia")
+ .stateDeriver((state, space) -> {
+ ClusterState derived = state.clone();
+ if (space.equals("default") && modifyDefaultSpace) {
+ derived.setNodeState(Node.ofStorage(0), new NodeState(NodeType.STORAGE, State.DOWN));
+ } else if (space.equals("narnia")) {
+ derived.setNodeState(Node.ofDistributor(0), new NodeState(NodeType.DISTRIBUTOR, State.DOWN));
+ }
+ return derived;
+ })
+ .deriveAndBuild();
+ }
+
+ private static ClusterStateBundle createTestBundle() {
+ return createTestBundle(true);
+ }
+
+ @Test
+ public void builder_creates_baseline_state_and_derived_state_per_space() {
+ ClusterStateBundle bundle = createTestBundle();
+ assertThat(bundle.getBaselineClusterState(), equalTo(stateOf("distributor:2 storage:2")));
+ assertThat(bundle.getDerivedBucketSpaceStates().size(), equalTo(3));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("default"), equalTo(stateOf("distributor:2 storage:2 .0.s:d")));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("global"), equalTo(stateOf("distributor:2 storage:2")));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("narnia"), equalTo(stateOf("distributor:2 .0.s:d storage:2")));
+ }
+
+ @Test
+ public void version_clone_sets_version_for_all_spaces() {
+ ClusterStateBundle bundle = createTestBundle().clonedWithVersionSet(123);
+ assertThat(bundle.getBaselineClusterState(), equalTo(stateOf("version:123 distributor:2 storage:2")));
+ assertThat(bundle.getDerivedBucketSpaceStates().size(), equalTo(3));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("default"), equalTo(stateOf("version:123 distributor:2 storage:2 .0.s:d")));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("global"), equalTo(stateOf("version:123 distributor:2 storage:2")));
+ assertThat(bundle.getDerivedBucketSpaceStates().get("narnia"), equalTo(stateOf("version:123 distributor:2 .0.s:d storage:2")));
+ }
+
+ @Test
+ public void same_bundle_instance_considered_similar() {
+ ClusterStateBundle bundle = createTestBundle();
+ assertTrue(bundle.similarTo(bundle));
+ }
+
+ @Test
+ public void similarity_test_considers_all_bucket_spaces() {
+ ClusterStateBundle bundle = createTestBundle(false);
+ ClusterStateBundle unchangedBundle = createTestBundle(false);
+
+ assertTrue(bundle.similarTo(unchangedBundle));
+ assertTrue(unchangedBundle.similarTo(bundle));
+
+ ClusterStateBundle changedBundle = createTestBundle(true);
+ assertFalse(bundle.similarTo(changedBundle));
+ assertFalse(changedBundle.similarTo(bundle));
+ }
+
+ @Test
+ public void toString_without_bucket_space_states_prints_only_baseline_state() {
+ ClusterStateBundle bundle = ClusterStateBundle.ofBaselineOnly(
+ AnnotatedClusterState.withoutAnnotations(stateOf("distributor:2 storage:2")));
+ assertThat(bundle.toString(), equalTo("ClusterStateBundle('distributor:2 storage:2')"));
+ }
+
+ @Test
+ public void toString_includes_all_bucket_space_states() {
+ ClusterStateBundle bundle = createTestBundle();
+ assertThat(bundle.toString(), equalTo("ClusterStateBundle('distributor:2 storage:2', " +
+ "default 'distributor:2 storage:2 .0.s:d', " +
+ "global 'distributor:2 storage:2', " +
+ "narnia 'distributor:2 .0.s:d storage:2')"));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
index c500b4c7390..5d200d65516 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
@@ -86,9 +86,10 @@ public class DummyCommunicator implements Communicator, NodeLookup {
}
@Override
- public void setSystemState(ClusterState state, NodeInfo node, Waiter<SetClusterStateRequest> waiter) {
- DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, state);
- node.setSystemStateVersionSent(state);
+ public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> waiter) {
+ ClusterState baselineState = stateBundle.getBaselineClusterState();
+ DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState);
+ node.setSystemStateVersionSent(baselineState);
req.setReply(new SetClusterStateRequest.Reply());
if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) {
waiter.done(req);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java
new file mode 100644
index 00000000000..6c51f251096
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java
@@ -0,0 +1,71 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.vdslib.state.ClusterState;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MaintenancehenPendingGlobalMergesTest {
+
+ private static class Fixture {
+ public MergePendingChecker mockPendingChecker = mock(MergePendingChecker.class);
+ public MaintenanceWhenPendingGlobalMerges deriver = new MaintenanceWhenPendingGlobalMerges(mockPendingChecker);
+ }
+
+ private static String defaultSpace() {
+ return FixedBucketSpaces.defaultSpace();
+ }
+
+ private static String globalSpace() {
+ return FixedBucketSpaces.globalSpace();
+ }
+
+ @Test
+ public void no_nodes_set_to_maintenance_in_global_bucket_space_state() {
+ Fixture f = new Fixture();
+ when(f.mockPendingChecker.hasMergesPending(eq(globalSpace()), anyInt())).thenReturn(true); // False returned by default otherwise
+ ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:2 storage:2"), globalSpace());
+ assertThat(derived, equalTo(ClusterState.stateFromString("distributor:2 storage:2")));
+ }
+
+ @Test
+ public void content_nodes_with_global_merge_pending_set_to_maintenance_in_default_space_state() {
+ Fixture f = new Fixture();
+ when(f.mockPendingChecker.hasMergesPending(globalSpace(), 1)).thenReturn(true);
+ when(f.mockPendingChecker.hasMergesPending(globalSpace(), 3)).thenReturn(true);
+ ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:5 storage:5"), defaultSpace());
+ assertThat(derived, equalTo(ClusterState.stateFromString("distributor:5 storage:5 .1.s:m .3.s:m")));
+ }
+
+ @Test
+ public void no_nodes_set_to_maintenance_when_no_merges_pending() {
+ Fixture f = new Fixture();
+ ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:5 storage:5"), defaultSpace());
+ assertThat(derived, equalTo(ClusterState.stateFromString("distributor:5 storage:5")));
+ }
+
+ @Test
+ public void default_space_merges_do_not_count_towards_maintenance() {
+ Fixture f = new Fixture();
+ when(f.mockPendingChecker.hasMergesPending(eq(defaultSpace()), anyInt())).thenReturn(true);
+ ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:2 storage:2"), defaultSpace());
+ assertThat(derived, equalTo(ClusterState.stateFromString("distributor:2 storage:2")));
+ }
+
+ @Test
+ public void nodes_only_set_to_maintenance_when_marked_up_init_or_retiring() {
+ Fixture f = new Fixture();
+ when(f.mockPendingChecker.hasMergesPending(eq(globalSpace()), anyInt())).thenReturn(true);
+ ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:5 storage:5 .1.s:m .2.s:r .3.s:i .4.s:d"), defaultSpace());
+ // TODO reconsider role of retired here... It should not have merges pending towards it in the general case, but may be out of sync
+ assertThat(derived, equalTo(ClusterState.stateFromString("distributor:5 storage:5 .0.s:m .1.s:m .2.s:m .3.s:m .4.s:d")));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
index d27acf8bc17..0f4d1fcdefc 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java
@@ -8,6 +8,7 @@ import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import org.junit.Test;
+import java.text.ParseException;
import java.util.Arrays;
import java.util.Optional;
@@ -24,6 +25,10 @@ public class StateVersionTrackerTest {
return new AnnotatedClusterState(state, Optional.empty(), AnnotatedClusterState.emptyNodeStateReasons());
}
+ private static ClusterStateBundle stateBundleWithoutAnnotations(String stateStr) {
+ return ClusterStateBundle.ofBaselineOnly(stateWithoutAnnotations(stateStr));
+ }
+
private static StateVersionTracker createWithMockedMetrics() {
return new StateVersionTracker();
}
@@ -32,7 +37,7 @@ public class StateVersionTrackerTest {
final AnnotatedClusterState state,
final long timeMs)
{
- versionTracker.updateLatestCandidateState(state);
+ versionTracker.updateLatestCandidateStateBundle(ClusterStateBundle.ofBaselineOnly(state));
versionTracker.promoteCandidateToVersionedState(timeMs);
}
@@ -89,14 +94,14 @@ public class StateVersionTrackerTest {
@Test
public void diff_from_initial_state_implies_changed_state() {
final StateVersionTracker versionTracker = createWithMockedMetrics();
- versionTracker.updateLatestCandidateState(stateWithoutAnnotations("cluster:d"));
+ versionTracker.updateLatestCandidateStateBundle(stateBundleWithoutAnnotations("cluster:d"));
assertTrue(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish());
}
private static boolean stateChangedBetween(String fromState, String toState) {
final StateVersionTracker versionTracker = createWithMockedMetrics();
updateAndPromote(versionTracker, stateWithoutAnnotations(fromState), 123);
- versionTracker.updateLatestCandidateState(stateWithoutAnnotations(toState));
+ versionTracker.updateLatestCandidateStateBundle(stateBundleWithoutAnnotations(toState));
return versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish();
}
@@ -165,7 +170,7 @@ public class StateVersionTrackerTest {
new Node(NodeType.STORAGE, 0),
new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(15));
updateAndPromote(versionTracker, stateWithMinBits, 123);
- versionTracker.updateLatestCandidateState(stateWithoutAnnotations("distributor:2 storage:2"));
+ versionTracker.updateLatestCandidateStateBundle(stateBundleWithoutAnnotations("distributor:2 storage:2"));
assertFalse(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish());
}
@@ -221,12 +226,45 @@ public class StateVersionTrackerTest {
final StateVersionTracker versionTracker = createWithMockedMetrics();
AnnotatedClusterState candidate = stateWithoutAnnotations("distributor:2 storage:2");
- versionTracker.updateLatestCandidateState(candidate);
+ versionTracker.updateLatestCandidateStateBundle(ClusterStateBundle.ofBaselineOnly(candidate));
assertThat(versionTracker.getLatestCandidateState(), equalTo(candidate));
candidate = stateWithoutAnnotations("distributor:3 storage:3");
- versionTracker.updateLatestCandidateState(candidate);
+ versionTracker.updateLatestCandidateStateBundle(ClusterStateBundle.ofBaselineOnly(candidate));
assertThat(versionTracker.getLatestCandidateState(), equalTo(candidate));
}
+ private static ClusterState stateOf(String state) {
+ return ClusterState.stateFromString(state);
+ }
+
+ private static ClusterStateBundle baselineBundle(boolean alteredDefaultState) {
+ return ClusterStateBundle
+ .builder(AnnotatedClusterState.withoutAnnotations(stateOf("distributor:1 storage:1")))
+ .bucketSpaces("default")
+ .stateDeriver((state, space) -> {
+ ClusterState derived = state.clone();
+ if (alteredDefaultState) {
+ derived.setNodeState(Node.ofStorage(0), new NodeState(NodeType.STORAGE, State.DOWN));
+ }
+ return derived;
+ })
+ .deriveAndBuild();
+ }
+
+ @Test
+ public void version_change_check_takes_derived_states_into_account() {
+ final StateVersionTracker versionTracker = createWithMockedMetrics();
+ versionTracker.updateLatestCandidateStateBundle(baselineBundle(false));
+ versionTracker.promoteCandidateToVersionedState(1234);
+
+ // Not marked changed with no changes across bucket spaces
+ versionTracker.updateLatestCandidateStateBundle(baselineBundle(false));
+ assertFalse(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish());
+
+ // Changing state in default space marks as sufficiently changed
+ versionTracker.updateLatestCandidateStateBundle(baselineBundle(true));
+ assertTrue(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish());
+ }
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
new file mode 100644
index 00000000000..93aac5c83ed
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -0,0 +1,145 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.*;
+import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
+import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
+import org.junit.Test;
+
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class SystemStateBroadcasterTest {
+
+ private static class Fixture {
+ FakeTimer timer = new FakeTimer();
+ final Object monitor = new Object();
+ SystemStateBroadcaster broadcaster = new SystemStateBroadcaster(timer, monitor);
+ Communicator mockCommunicator = mock(Communicator.class);
+
+ void simulateNodePartitionedAwaySilently(ClusterFixture cf) {
+ cf.cluster().getNodeInfo(Node.ofStorage(0)).setStartTimestamp(600);
+ cf.cluster().getNodeInfo(Node.ofStorage(1)).setStartTimestamp(700);
+ // Simulate a distributor being partitioned away from the controller without actually going down. It will
+ // need to observe all startup timestamps to infer if it should fetch bucket info from nodes.
+ cf.cluster().getNodeInfo(Node.ofDistributor(0)).setStartTimestamp(500); // FIXME multiple sources of timestamps are... rather confusing
+ cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 1000);
+ cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.DOWN).setStartTimestamp(500), 2000);
+ cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 3000);
+ }
+ }
+
+ private static DatabaseHandler.Context dbContextFrom(ContentCluster cluster) {
+ return new DatabaseHandler.Context() {
+ @Override
+ public ContentCluster getCluster() {
+ return cluster;
+ }
+
+ @Override
+ public FleetController getFleetController() {
+ return null; // We assume the broadcaster doesn't use this for our test purposes
+ }
+
+ @Override
+ public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener() {
+ return null;
+ }
+
+ @Override
+ public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener() {
+ return null;
+ }
+ };
+ }
+
+ private static Stream<NodeInfo> clusterNodeInfos(ContentCluster c, Node... nodes) {
+ return Stream.of(nodes).map(c::getNodeInfo);
+ }
+
+ private static class StateMapping {
+ final String bucketSpace;
+ final ClusterState state;
+
+ StateMapping(String bucketSpace, ClusterState state) {
+ this.bucketSpace = bucketSpace;
+ this.state = state;
+ }
+ }
+
+ private static StateMapping mapping(String bucketSpace, String state) {
+ return new StateMapping(bucketSpace, ClusterState.stateFromString(state));
+ }
+
+ private static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) {
+ return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)),
+ Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state)));
+ }
+
+ @Test
+ public void always_publish_baseline_cluster_state() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2");
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
+ }
+
+ @Test
+ public void non_observed_startup_timestamps_are_published_per_node_for_baseline_state() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2");
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.simulateNodePartitionedAwaySilently(cf);
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+
+ clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
+ // Only distributor 0 should observe startup timestamps
+ verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any());
+ });
+ ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700");
+ verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
+ }
+
+ @Test
+ public void bucket_space_states_are_published_verbatim_when_no_additional_timestamps_needed() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2",
+ mapping("default", "distributor:2 storage:2 .0.s:d"),
+ mapping("upsidedown", "distributor:2 .0.s:d storage:2"));
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+
+ cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
+ }
+
+ @Test
+ public void non_observed_startup_timestamps_are_published_per_bucket_space_state() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2",
+ mapping("default", "distributor:2 storage:2 .0.s:d"),
+ mapping("upsidedown", "distributor:2 .0.s:d storage:2"));
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.simulateNodePartitionedAwaySilently(cf);
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+
+ clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
+ // Only distributor 0 should observe startup timestamps
+ verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any());
+ });
+ ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700",
+ mapping("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"),
+ mapping("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700"));
+ verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
+ }
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
index 3b64d3bc1cb..730b03ff7c9 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.clustercontroller.core.testutils;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.FakeTimer;
import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener;
@@ -22,9 +23,9 @@ public class StateWaiter implements SystemStateListener {
this.timer = timer;
}
- public void handleNewSystemState(ClusterState state) {
+ public void handleNewSystemState(ClusterStateBundle state) {
synchronized(timer) {
- current = state;
+ current = state.getBaselineClusterState();
++stateUpdates;
timer.notifyAll();
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
index ebe0c0edb4b..6396b27aa79 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.clustercontroller.core.testutils;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.DummyVdsNode;
import com.yahoo.vespa.clustercontroller.core.FleetController;
import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener;
@@ -25,9 +26,9 @@ public interface WaitCondition {
protected ClusterState currentState;
private final SystemStateListener listener = new SystemStateListener() {
@Override
- public void handleNewSystemState(ClusterState state) {
+ public void handleNewSystemState(ClusterStateBundle state) {
synchronized (monitor) {
- currentState = state;
+ currentState = state.getBaselineClusterState();
monitor.notifyAll();
}
}