diff options
Diffstat (limited to 'clustercontroller-core')
21 files changed, 757 insertions, 102 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java index f935b8b2cc7..9bf36cca947 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java @@ -28,6 +28,10 @@ public class AnnotatedClusterState { return new AnnotatedClusterState(ClusterState.emptyState(), Optional.empty(), emptyNodeStateReasons()); } + public static AnnotatedClusterState withoutAnnotations(ClusterState state) { + return new AnnotatedClusterState(state, Optional.empty(), emptyNodeStateReasons()); + } + static Map<Node, NodeStateReason> emptyNodeStateReasons() { return Collections.emptyMap(); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java new file mode 100644 index 00000000000..09273ee5656 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java @@ -0,0 +1,149 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; + +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * A cluster state bundle is a wrapper around the baseline ("source of truth") cluster + * state and any bucket space specific states that may be derived from it. + * + * The baseline state represents the generated state of the _nodes_ in the cluster, + * while the per-space states represent possible transformations that make sense in + * the context of that particular bucket space. The most prominent example is + * transforming nodes in the default bucket space into maintenance mode if they have + * merges pending in the global space. + * + * The baseline state is identical to the legacy, global cluster state that the + * cluster controller has historically produced as its only output. + */ +public class ClusterStateBundle { + + private final AnnotatedClusterState baselineState; + private final Map<String, ClusterState> derivedBucketSpaceStates; + + public static class Builder { + private final AnnotatedClusterState baselineState; + private ClusterStateDeriver stateDeriver; + private Set<String> bucketSpaces; + + public Builder(AnnotatedClusterState baselineState) { + this.baselineState = baselineState; + } + + public Builder stateDeriver(ClusterStateDeriver stateDeriver) { + this.stateDeriver = stateDeriver; + return this; + } + + public Builder bucketSpaces(Set<String> bucketSpaces) { + this.bucketSpaces = bucketSpaces; + return this; + } + + public Builder bucketSpaces(String... bucketSpaces) { + this.bucketSpaces = new TreeSet<>(Arrays.asList(bucketSpaces)); + return this; + } + + public ClusterStateBundle deriveAndBuild() { + if (stateDeriver == null || bucketSpaces == null || bucketSpaces.isEmpty()) { + return ClusterStateBundle.ofBaselineOnly(baselineState); + } + Map<String, ClusterState> derived = bucketSpaces.stream() + .collect(Collectors.toMap( + Function.identity(), + s -> stateDeriver.derivedFrom(baselineState.getClusterState(), s))); + return new ClusterStateBundle(baselineState, derived); + } + } + + private ClusterStateBundle(AnnotatedClusterState baselineState, Map<String, ClusterState> derivedBucketSpaceStates) { + this.baselineState = baselineState; + this.derivedBucketSpaceStates = Collections.unmodifiableMap(derivedBucketSpaceStates); + } + + public static Builder builder(AnnotatedClusterState baselineState) { + return new Builder(baselineState); + } + + public static ClusterStateBundle of(AnnotatedClusterState baselineState, Map<String, ClusterState> derivedBucketSpaceStates) { + return new ClusterStateBundle(baselineState, derivedBucketSpaceStates); + } + + public static ClusterStateBundle ofBaselineOnly(AnnotatedClusterState baselineState) { + return new ClusterStateBundle(baselineState, Collections.emptyMap()); + } + + public AnnotatedClusterState getBaselineAnnotatedState() { + return baselineState; + } + + public ClusterState getBaselineClusterState() { + return baselineState.getClusterState(); + } + + public Map<String, ClusterState> getDerivedBucketSpaceStates() { + return derivedBucketSpaceStates; + } + + public ClusterStateBundle cloneWithMapper(Function<ClusterState, ClusterState> mapper) { + AnnotatedClusterState clonedBaseline = new AnnotatedClusterState( + mapper.apply(baselineState.getClusterState().clone()), + baselineState.getClusterStateReason(), + baselineState.getNodeStateReasons()); + Map<String, ClusterState> clonedDerived = derivedBucketSpaceStates.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> mapper.apply(e.getValue().clone()))); + return new ClusterStateBundle(clonedBaseline, clonedDerived); + } + + public ClusterStateBundle clonedWithVersionSet(int version) { + return cloneWithMapper(state -> { + state.setVersion(version); + return state; + }); + } + + public boolean similarTo(ClusterStateBundle other) { + if (!baselineState.getClusterState().similarToIgnoringInitProgress(other.baselineState.getClusterState())) { + return false; + } + // FIXME we currently treat mismatching bucket space sets as unchanged to avoid breaking some tests + return derivedBucketSpaceStates.entrySet().stream() + .allMatch(entry -> other.derivedBucketSpaceStates.getOrDefault(entry.getKey(), entry.getValue()) + .similarToIgnoringInitProgress(entry.getValue())); + } + + public int getVersion() { + return baselineState.getClusterState().getVersion(); + } + + @Override + public String toString() { + if (derivedBucketSpaceStates.isEmpty()) { + return String.format("ClusterStateBundle('%s')", baselineState); + } + Map<String, ClusterState> orderedStates = new TreeMap<>(derivedBucketSpaceStates); + return String.format("ClusterStateBundle('%s', %s)", baselineState, orderedStates.entrySet().stream() + .map(e -> String.format("%s '%s'", e.getKey(), e.getValue())) + .collect(Collectors.joining(", "))); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterStateBundle that = (ClusterStateBundle) o; + return Objects.equals(baselineState, that.baselineState) && + Objects.equals(derivedBucketSpaceStates, that.derivedBucketSpaceStates); + } + + @Override + public int hashCode() { + return Objects.hash(baselineState, derivedBucketSpaceStates); + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java new file mode 100644 index 00000000000..9e42e1da649 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateDeriver.java @@ -0,0 +1,18 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; + +/** + * Bucket-space aware transformation factory to "derive" new cluster states from an + * existing state. + */ +public interface ClusterStateDeriver { + /** + * @param state Baseline cluster state used as a source for deriving a new state. + * MUST NOT be modified explicitly or implicitly. + * @param bucketSpace The name of the bucket space for which the state should be derived + * @return A cluster state instance representing the derived state, or <em>state</em> unchanged. + */ + ClusterState derivedFrom(ClusterState state, String bucketSpace); +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java index de0ba6bf33c..450513343b0 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java @@ -18,7 +18,7 @@ public interface Communicator { void getNodeState(NodeInfo node, Waiter<GetNodeStateRequest> waiter); - void setSystemState(ClusterState state, NodeInfo node, Waiter<SetClusterStateRequest> waiter); + void setSystemState(ClusterStateBundle states, NodeInfo node, Waiter<SetClusterStateRequest> waiter); void shutdown(); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index ba56d75ab4c..1059434aac3 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +import com.yahoo.document.FixedBucketSpaces; import com.yahoo.jrt.ListenFailedException; import com.yahoo.log.LogLevel; import com.yahoo.vdslib.distribution.ConfiguredNode; @@ -24,18 +25,11 @@ import org.apache.commons.lang.exception.ExceptionUtils; import java.io.FileNotFoundException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.TimeZone; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAddedOrRemovedListener, SystemStateListener, Runnable, RemoteClusterControllerTaskScheduler { @@ -69,7 +63,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private boolean waitingForCycle = false; private StatusPageServer.PatternRequestRouter statusRequestRouter = new StatusPageServer.PatternRequestRouter(); - private final List<com.yahoo.vdslib.state.ClusterState> newStates = new ArrayList<>(); + private final List<ClusterStateBundle> newStates = new ArrayList<>(); private long configGeneration = -1; private long nextConfigGeneration = -1; private Queue<RemoteClusterControllerTask> remoteTasks = new LinkedList<>(); @@ -84,6 +78,11 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // Invariant: queued task versions are monotonically increasing with queue position private Queue<VersionDependentTaskCompletion> taskCompletionQueue = new ArrayDeque<>(); + // Legacy behavior is an empty set of explicitly configured bucket spaces, which means that + // only a baseline cluster state will be sent from the controller and no per-space state + // deriving is done. + private Set<String> configuredBucketSpaces = Collections.emptySet(); + private final RunDataExtractor dataExtractor = new RunDataExtractor() { @Override public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); } @@ -233,7 +232,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // Always give cluster state listeners the current state, in case acceptable state has come before listener is registered. com.yahoo.vdslib.state.ClusterState state = getSystemState(); if (state == null) throw new NullPointerException("Cluster state should never be null at this point"); - listener.handleNewSystemState(state); + listener.handleNewSystemState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state))); } } @@ -350,16 +349,18 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd stateChangeHandler.handleReturnedRpcAddress(node); } - public void handleNewSystemState(com.yahoo.vdslib.state.ClusterState state) { + @Override + public void handleNewSystemState(ClusterStateBundle stateBundle) { verifyInControllerThread(); - newStates.add(state); - metricUpdater.updateClusterStateMetrics(cluster, state); - systemStateBroadcaster.handleNewSystemState(state); + ClusterState baselineState = stateBundle.getBaselineClusterState(); + newStates.add(stateBundle); + metricUpdater.updateClusterStateMetrics(cluster, baselineState); + systemStateBroadcaster.handleNewClusterStates(stateBundle); // Iff master, always store new version in ZooKeeper _before_ publishing to any // nodes so that a cluster controller crash after publishing but before a successful // ZK store will not risk reusing the same version number. if (masterElectionHandler.isMaster()) { - storeClusterStateVersionToZooKeeper(state); + storeClusterStateVersionToZooKeeper(baselineState); } } @@ -425,8 +426,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // Check retirement changes for (ConfiguredNode node : newNodes) { - if (node.retired() != cluster.getConfiguredNodes().get(node.index()).retired()) + if (node.retired() != cluster.getConfiguredNodes().get(node.index()).retired()) { return true; + } } return false; @@ -441,10 +443,20 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd cluster.setSlobrokGenerationCount(0); } + // TODO don't hardcode bucket spaces + if (options.enableMultipleBucketSpaces) { + configuredBucketSpaces = Collections.unmodifiableSet( + Stream.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()) + .collect(Collectors.toSet())); + } else { + configuredBucketSpaces = Collections.emptySet(); + } + communicator.propagateOptions(options); - if (nodeLookup instanceof SlobrokClient) - ((SlobrokClient)nodeLookup).setSlobrokConnectionSpecs(options.slobrokConnectionSpecs); + if (nodeLookup instanceof SlobrokClient) { + ((SlobrokClient) nodeLookup).setSlobrokConnectionSpecs(options.slobrokConnectionSpecs); + } eventLog.setMaxSize(options.eventLogMaxSize, options.eventNodeLogMaxSize); cluster.setPollingFrequency(options.statePollingFrequency); cluster.setDistribution(options.storageDistribution); @@ -625,7 +637,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd return false; } - private boolean broadcastClusterStateToEligibleNodes() throws InterruptedException { + private boolean broadcastClusterStateToEligibleNodes() { boolean sentAny = false; // Give nodes a fair chance to respond first time to state gathering requests, so we don't // disturb system when we take over. Allow anyways if we have states from all nodes. @@ -638,7 +650,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // Reset timer to only see warning once. firstAllowedStateBroadcast = currentTime; } - sentAny = systemStateBroadcaster.broadcastNewState(database, databaseContext, communicator, this); + sentAny = systemStateBroadcaster.broadcastNewState(databaseContext, communicator); if (sentAny) { nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates; } @@ -649,9 +661,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private void propagateNewStatesToListeners() { if ( ! newStates.isEmpty()) { synchronized (systemStateListeners) { - for (ClusterState state : newStates) { + for (ClusterStateBundle stateBundle : newStates) { for(SystemStateListener listener : systemStateListeners) { - listener.handleNewSystemState(state); + listener.handleNewSystemState(stateBundle); } } newStates.clear(); @@ -778,7 +790,13 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if (mustRecomputeCandidateClusterState()) { stateChangeHandler.unsetStateChangedFlag(); final AnnotatedClusterState candidate = computeCurrentAnnotatedState(); - stateVersionTracker.updateLatestCandidateState(candidate); + // TODO test multiple bucket spaces configured + // TODO what interaction do we want between generated and derived states wrt. auto group take-downs? + final ClusterStateBundle candidateBundle = ClusterStateBundle.builder(candidate) + .bucketSpaces(configuredBucketSpaces) + .stateDeriver(createBucketSpaceStateDeriver()) + .deriveAndBuild(); + stateVersionTracker.updateLatestCandidateStateBundle(candidateBundle); if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish() || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()) @@ -787,8 +805,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd final AnnotatedClusterState before = stateVersionTracker.getAnnotatedVersionedClusterState(); stateVersionTracker.promoteCandidateToVersionedState(timeNowMs); + // TODO also emit derived state edges events emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs); - handleNewSystemState(stateVersionTracker.getVersionedClusterState()); + handleNewSystemState(stateVersionTracker.getVersionedClusterStateBundle()); stateWasChanged = true; } } @@ -804,6 +823,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd return stateWasChanged; } + private ClusterStateDeriver createBucketSpaceStateDeriver() { + return new MaintenanceWhenPendingGlobalMerges((space, index) -> false); // TODO wire with stats aggregator + } + /** * Move tasks that are dependent on the most recently generated state being published into * a completion queue with a dependency on the provided version argument. Once that version @@ -862,7 +885,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } private boolean mustRecomputeCandidateClusterState() { - return stateChangeHandler.stateMayHaveChanged() || stateVersionTracker.hasReceivedNewVersionFromZooKeeper(); + return stateChangeHandler.stateMayHaveChanged() + || stateVersionTracker.hasReceivedNewVersionFromZooKeeper() + || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged(); } private boolean handleLeadershipEdgeTransitions() throws InterruptedException { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java index d8c853f45cb..3bf8edafbe3 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java @@ -119,6 +119,9 @@ public class FleetControllerOptions implements Cloneable { private Duration maxDeferredTaskVersionWaitTime = Duration.ofSeconds(30); + // TODO replace this flag with a set of bucket spaces instead + public boolean enableMultipleBucketSpaces = false; + // TODO: Replace usage of this by usage where the nodes are explicitly passed (below) public FleetControllerOptions(String clusterName) { this.clusterName = clusterName; @@ -225,6 +228,7 @@ public class FleetControllerOptions implements Cloneable { sb.append("<tr><td><nobr>Maximum node event log size</nobr></td><td align=\"right\">").append(eventNodeLogMaxSize).append("</td></tr>"); sb.append("<tr><td><nobr>Wanted distribution bits</nobr></td><td align=\"right\">").append(distributionBits).append("</td></tr>"); sb.append("<tr><td><nobr>Max deferred task version wait time</nobr></td><td align=\"right\">").append(maxDeferredTaskVersionWaitTime.toMillis()).append("ms</td></tr>"); + sb.append("<tr><td><nobr>Multiple bucket spaces enabled</nobr></td><td align=\"right\">").append(enableMultipleBucketSpaces).append("</td></tr>"); sb.append("</table>"); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java new file mode 100644 index 00000000000..16401a79f08 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMerges.java @@ -0,0 +1,66 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.document.FixedBucketSpaces; +import com.yahoo.vdslib.state.*; + +import java.util.HashSet; +import java.util.Set; + +/** + * Cluster state deriver which checks if nodes have merges pending for globally + * distributed documents, and if they do, sets the node in maintenance mode in the + * default bucket space. This allows merges to complete for global documents before + * the default space documents that have references to them are made searchable. + * + * Note: bucket spaces are currently hard-coded. + */ +public class MaintenanceWhenPendingGlobalMerges implements ClusterStateDeriver { + + // TODO make these configurable + private static final String bucketSpaceToCheck = FixedBucketSpaces.globalSpace(); + private static final String bucketSpaceToDerive = FixedBucketSpaces.defaultSpace(); + + private final MergePendingChecker mergePendingChecker; + + public MaintenanceWhenPendingGlobalMerges(MergePendingChecker mergePendingChecker) { + this.mergePendingChecker = mergePendingChecker; + } + + @Override + public ClusterState derivedFrom(ClusterState baselineState, String bucketSpace) { + ClusterState derivedState = baselineState.clone(); + if (!bucketSpace.equals(bucketSpaceToDerive)) { + return derivedState; + } + Set<Integer> incompleteNodeIndices = nodesWithMergesNotDone(baselineState); + if (incompleteNodeIndices.isEmpty()) { + return derivedState; // Nothing to do + } + incompleteNodeIndices.forEach(nodeIndex -> derivedState.setNodeState(Node.ofStorage(nodeIndex), + new NodeState(NodeType.STORAGE, State.MAINTENANCE))); + return derivedState; + } + + private Set<Integer> nodesWithMergesNotDone(ClusterState baselineState) { + final Set<Integer> incompleteNodes = new HashSet<>(); + final int nodeCount = baselineState.getNodeCount(NodeType.STORAGE); + for (int nodeIndex = 0; nodeIndex < nodeCount; ++nodeIndex) { + // FIXME should only set nodes into maintenance if they've not yet been up in the cluster + // state since they came back as Reported state Up! + // Must be implemented before this state deriver is enabled in production. + if (contentNodeIsAvailable(baselineState, nodeIndex) && hasMergesNotDone(bucketSpaceToCheck, nodeIndex)) { + incompleteNodes.add(nodeIndex); + } + } + return incompleteNodes; + } + + private boolean contentNodeIsAvailable(ClusterState state, int nodeIndex) { + return state.getNodeState(Node.ofStorage(nodeIndex)).getState().oneOf("uir"); + } + + private boolean hasMergesNotDone(String bucketSpace, int nodeIndex) { + return mergePendingChecker.hasMergesPending(bucketSpace, nodeIndex); + } +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java new file mode 100644 index 00000000000..e1b844c53de --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MergePendingChecker.java @@ -0,0 +1,13 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +/** + * Implementations provide functionality for checking if a particular bucket space + * has merges reported as pending from the cluster's distributor nodes. It is up + * to the implementation to determine the exact semantics of what "pending" implies. + */ +public interface MergePendingChecker { + + boolean hasMergesPending(String bucketSpace, int contentNodeIndex); + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java index 902189fca8b..fffe1c95124 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java @@ -28,9 +28,9 @@ public class StateVersionTracker { // TODO this mirrors legacy behavior, but should be moved into stable ZK state. private int lowestObservedDistributionBits = 16; - private ClusterState currentUnversionedState = ClusterState.emptyState(); - private AnnotatedClusterState latestCandidateState = AnnotatedClusterState.emptyState(); - private AnnotatedClusterState currentClusterState = latestCandidateState; + private ClusterStateBundle currentUnversionedState = ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState()); + private ClusterStateBundle latestCandidateState = ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState()); + private ClusterStateBundle currentClusterState = latestCandidateState; private ClusterStateView clusterStateView; @@ -38,7 +38,7 @@ public class StateVersionTracker { private int maxHistoryEntryCount = 50; StateVersionTracker() { - clusterStateView = ClusterStateView.create(currentUnversionedState); + clusterStateView = ClusterStateView.create(currentUnversionedState.getBaselineClusterState()); } void setVersionRetrievedFromZooKeeper(final int version) { @@ -70,26 +70,30 @@ public class StateVersionTracker { } AnnotatedClusterState getAnnotatedVersionedClusterState() { - return currentClusterState; + return currentClusterState.getBaselineAnnotatedState(); } public ClusterState getVersionedClusterState() { - return currentClusterState.getClusterState(); + return currentClusterState.getBaselineClusterState(); + } + + public ClusterStateBundle getVersionedClusterStateBundle() { + return currentClusterState; } - public void updateLatestCandidateState(final AnnotatedClusterState candidate) { - assert(latestCandidateState.getClusterState().getVersion() == 0); - latestCandidateState = candidate; + public void updateLatestCandidateStateBundle(final ClusterStateBundle candidateBundle) { + assert(latestCandidateState.getBaselineClusterState().getVersion() == 0); + latestCandidateState = candidateBundle; } /** - * Returns the last state provided to updateLatestCandidateState, which _may or may not_ be + * Returns the last state provided to updateLatestCandidateStateBundle, which _may or may not_ be * a published state. Primary use case for this function is a caller which is interested in * changes that may not be reflected in the published state. The best example of this would * be node state changes when a cluster is marked as Down. */ public AnnotatedClusterState getLatestCandidateState() { - return latestCandidateState; + return latestCandidateState.getBaselineAnnotatedState(); } public List<ClusterStateHistoryEntry> getClusterStateHistory() { @@ -97,7 +101,9 @@ public class StateVersionTracker { } boolean candidateChangedEnoughFromCurrentToWarrantPublish() { - return !currentUnversionedState.similarToIgnoringInitProgress(latestCandidateState.getClusterState()); + // Neither latestCandidateState nor currentUnversionedState has a version set, so the + // similarity is only done on structural state metadata. + return !currentUnversionedState.similarTo(latestCandidateState); } void promoteCandidateToVersionedState(final long currentTimeMs) { @@ -108,23 +114,19 @@ public class StateVersionTracker { recordCurrentStateInHistoryAtTime(currentTimeMs); } - private void updateStatesForNewVersion(final AnnotatedClusterState newState, final int newVersion) { - currentClusterState = new AnnotatedClusterState( - newState.getClusterState().clone(), // Because we mutate version below - newState.getClusterStateReason(), - newState.getNodeStateReasons()); - currentClusterState.getClusterState().setVersion(newVersion); - currentUnversionedState = newState.getClusterState().clone(); + private void updateStatesForNewVersion(final ClusterStateBundle newStateBundle, final int newVersion) { + currentClusterState = newStateBundle.clonedWithVersionSet(newVersion); + currentUnversionedState = newStateBundle; // TODO should we clone..? ClusterState really should be made immutable lowestObservedDistributionBits = Math.min( lowestObservedDistributionBits, - newState.getClusterState().getDistributionBitCount()); - // TODO should this take place in updateLatestCandidateState instead? I.e. does it require a consolidated state? - clusterStateView = ClusterStateView.create(currentClusterState.getClusterState()); + newStateBundle.getBaselineClusterState().getDistributionBitCount()); + // TODO should this take place in updateLatestCandidateStateBundle instead? I.e. does it require a consolidated state? + clusterStateView = ClusterStateView.create(currentClusterState.getBaselineClusterState()); } private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) { clusterStateHistory.addFirst(new ClusterStateHistoryEntry( - currentClusterState.getClusterState(), currentTimeMs)); + currentClusterState.getBaselineClusterState(), currentTimeMs)); while (clusterStateHistory.size() > maxHistoryEntryCount) { clusterStateHistory.removeLast(); } @@ -135,4 +137,13 @@ public class StateVersionTracker { clusterStateView.handleUpdatedHostInfo(hostnames, node, hostInfo); } + boolean bucketSpaceMergeCompletionStateHasChanged() { + return false; // TODO wire changes in merge info + } + + /* + TODO test and implement + - derived default space down-condition can only _keep_ a node in maintenance (down), not transition it from up -> maintenance + */ + } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 024c5d21607..45a14ee19cd 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -18,7 +18,7 @@ public class SystemStateBroadcaster { private final Timer timer; private final Object monitor; - private ClusterState systemState; + private ClusterStateBundle clusterStateBundle; private final List<SetClusterStateRequest> replies = new LinkedList<>(); private final static long minTimeBetweenNodeErrorLogging = 10 * 60 * 1000; @@ -32,12 +32,12 @@ public class SystemStateBroadcaster { this.monitor = monitor; } - public void handleNewSystemState(ClusterState state) { - systemState = state; + public void handleNewClusterStates(ClusterStateBundle state) { + clusterStateBundle = state; } public ClusterState getClusterState() { - return systemState; + return clusterStateBundle.getBaselineClusterState(); } private void reportNodeError(boolean nodeOk, NodeInfo info, String message) { @@ -79,7 +79,7 @@ public class SystemStateBroadcaster { } private boolean nodeNeedsClusterState(NodeInfo node) { - if (node.getSystemStateVersionAcknowledged() == systemState.getVersion()) { + if (node.getSystemStateVersionAcknowledged() == clusterStateBundle.getVersion()) { return false; // No point in sending if node already has updated system state } if (node.getRpcAddress() == null || node.isRpcAddressOutdated()) { @@ -102,7 +102,7 @@ public class SystemStateBroadcaster { } private boolean newestStateAlreadySentToNode(NodeInfo node) { - return (node.getNewestSystemStateVersionSent() == systemState.getVersion()); + return (node.getNewestSystemStateVersionSent() == clusterStateBundle.getVersion()); } /** @@ -113,42 +113,45 @@ public class SystemStateBroadcaster { void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database, DatabaseHandler.Context dbContext, FleetController fleetController) throws InterruptedException { - if ((systemState == null) || (lastClusterStateInSync == systemState.getVersion())) { + final int currentStateVersion = clusterStateBundle.getVersion(); + if ((clusterStateBundle == null) || (lastClusterStateInSync == currentStateVersion)) { return; // Nothing to do for the current state } boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream() .filter(NodeInfo::isDistributor) .anyMatch(this::nodeNeedsClusterState); - if (!anyOutdatedDistributorNodes && (systemState.getVersion() > lastClusterStateInSync)) { + if (!anyOutdatedDistributorNodes && (currentStateVersion > lastClusterStateInSync)) { log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state"); - lastClusterStateInSync = systemState.getVersion(); + lastClusterStateInSync = currentStateVersion; fleetController.handleAllDistributorsInSync(database, dbContext); } } - public boolean broadcastNewState(DatabaseHandler database, - DatabaseHandler.Context dbContext, - Communicator communicator, - FleetController fleetController) throws InterruptedException { - if (systemState == null) return false; + public boolean broadcastNewState(DatabaseHandler.Context dbContext, Communicator communicator) { + if (clusterStateBundle == null) { + return false; + } + + ClusterState baselineState = clusterStateBundle.getBaselineClusterState(); - if (!systemState.isOfficial()) { - log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", systemState.getVersion())); - systemState.setOfficial(true); + if (!baselineState.isOfficial()) { + log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", baselineState.getVersion())); + baselineState.setOfficial(true); } List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext); for (NodeInfo node : recipients) { if (nodeNeedsToObserveStartupTimestamps(node)) { - ClusterState newState = buildModifiedClusterState(dbContext); - log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion() - + " to node " + node + ": " + newState); - communicator.setSystemState(newState, node, waiter); + // TODO this is the same for all nodes, compute only once + ClusterStateBundle modifiedBundle = clusterStateBundle.cloneWithMapper(state -> buildModifiedClusterState(state, dbContext)); + log.log(LogLevel.DEBUG, "Sending modified cluster state version " + baselineState.getVersion() + + " to node " + node + ": " + modifiedBundle); + communicator.setSystemState(modifiedBundle, node, waiter); } else { - log.log(LogLevel.DEBUG, "Sending system state version " + systemState.getVersion() + " to node " + node + log.log(LogLevel.DEBUG, "Sending system state version " + baselineState.getVersion() + " to node " + node + ". (went down time " + node.getWentDownWithStartTime() + ", node start time " + node.getStartTimestamp() + ")"); - communicator.setSystemState(systemState, node, waiter); + communicator.setSystemState(clusterStateBundle, node, waiter); } } @@ -157,12 +160,12 @@ public class SystemStateBroadcaster { public int lastClusterStateVersionInSync() { return lastClusterStateInSync; } - private boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) { + private static boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) { return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp(); } - private ClusterState buildModifiedClusterState(DatabaseHandler.Context dbContext) { - ClusterState newState = systemState.clone(); + private static ClusterState buildModifiedClusterState(ClusterState sourceState, DatabaseHandler.Context dbContext) { + ClusterState newState = sourceState.clone(); for (NodeInfo n : dbContext.getCluster().getNodeInfo()) { NodeState ns = newState.getNodeState(n.getNode()); if (!n.isDistributor() && ns.getStartTimestamp() == 0) { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java index 07f42301695..c89bb6a136a 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java @@ -28,10 +28,10 @@ public class DatabaseHandler { private static Logger log = Logger.getLogger(DatabaseHandler.class.getName()); public interface Context { - public ContentCluster getCluster(); - public FleetController getFleetController(); - public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener(); - public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener(); + ContentCluster getCluster(); + FleetController getFleetController(); + NodeAddedOrRemovedListener getNodeAddedOrRemovedListener(); + NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener(); } private class Data { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java index 617f91cb837..40fe91f0c1e 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java @@ -1,10 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core.listeners; -import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; public interface SystemStateListener { - void handleNewSystemState(ClusterState state); + void handleNewSystemState(ClusterStateBundle states); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java index 1887c1dd2ad..0cc5ef36e3c 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java @@ -157,8 +157,9 @@ public class RPCCommunicator implements Communicator { } @Override - public void setSystemState(ClusterState state, NodeInfo node, Waiter<SetClusterStateRequest> externalWaiter) { - RPCSetClusterStateWaiter waiter = new RPCSetClusterStateWaiter(externalWaiter, timer); + public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> externalWaiter) { + final RPCSetClusterStateWaiter waiter = new RPCSetClusterStateWaiter(externalWaiter, timer); + final ClusterState baselineState = stateBundle.getBaselineClusterState(); Target connection = getConnection(node); if (!connection.isValid()) { @@ -172,17 +173,17 @@ public class RPCCommunicator implements Communicator { Request req; if (node.getVersion() == 0) { req = new Request("setsystemstate"); - req.parameters().add(new StringValue(state.toString(true))); + req.parameters().add(new StringValue(baselineState.toString(true))); } else { req = new Request("setsystemstate2"); - req.parameters().add(new StringValue(state.toString(false))); + req.parameters().add(new StringValue(baselineState.toString(false))); } - RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, state.getVersion()); + RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion()); waiter.setRequest(stateRequest); connection.invokeAsync(req, 60, waiter); - node.setSystemStateVersionSent(state); + node.setSystemStateVersionSent(baselineState); } // protected for testing. diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java index 16e9675d586..4862c83f2a6 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java @@ -149,6 +149,11 @@ class ClusterFixture { return this; } + ClusterFixture assignDummyRpcAddresses() { + cluster.getNodeInfo().forEach(ni -> ni.setRpcAddress("tcp/localhost:0")); + return this; + } + static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) { Map<NodeType, Integer> maxTransitionTime = new TreeMap<>(); maxTransitionTime.put(NodeType.DISTRIBUTOR, distributorTransitionTime); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java new file mode 100644 index 00000000000..04a0451bc6d --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java @@ -0,0 +1,99 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.*; +import org.junit.Test; + +import java.text.ParseException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ClusterStateBundleTest { + + private static ClusterState stateOf(String state) { + try { + return new ClusterState(state); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } + + private static ClusterStateBundle createTestBundle(boolean modifyDefaultSpace) { + return ClusterStateBundle + .builder(AnnotatedClusterState.withoutAnnotations(stateOf("distributor:2 storage:2"))) + .bucketSpaces("default", "global", "narnia") + .stateDeriver((state, space) -> { + ClusterState derived = state.clone(); + if (space.equals("default") && modifyDefaultSpace) { + derived.setNodeState(Node.ofStorage(0), new NodeState(NodeType.STORAGE, State.DOWN)); + } else if (space.equals("narnia")) { + derived.setNodeState(Node.ofDistributor(0), new NodeState(NodeType.DISTRIBUTOR, State.DOWN)); + } + return derived; + }) + .deriveAndBuild(); + } + + private static ClusterStateBundle createTestBundle() { + return createTestBundle(true); + } + + @Test + public void builder_creates_baseline_state_and_derived_state_per_space() { + ClusterStateBundle bundle = createTestBundle(); + assertThat(bundle.getBaselineClusterState(), equalTo(stateOf("distributor:2 storage:2"))); + assertThat(bundle.getDerivedBucketSpaceStates().size(), equalTo(3)); + assertThat(bundle.getDerivedBucketSpaceStates().get("default"), equalTo(stateOf("distributor:2 storage:2 .0.s:d"))); + assertThat(bundle.getDerivedBucketSpaceStates().get("global"), equalTo(stateOf("distributor:2 storage:2"))); + assertThat(bundle.getDerivedBucketSpaceStates().get("narnia"), equalTo(stateOf("distributor:2 .0.s:d storage:2"))); + } + + @Test + public void version_clone_sets_version_for_all_spaces() { + ClusterStateBundle bundle = createTestBundle().clonedWithVersionSet(123); + assertThat(bundle.getBaselineClusterState(), equalTo(stateOf("version:123 distributor:2 storage:2"))); + assertThat(bundle.getDerivedBucketSpaceStates().size(), equalTo(3)); + assertThat(bundle.getDerivedBucketSpaceStates().get("default"), equalTo(stateOf("version:123 distributor:2 storage:2 .0.s:d"))); + assertThat(bundle.getDerivedBucketSpaceStates().get("global"), equalTo(stateOf("version:123 distributor:2 storage:2"))); + assertThat(bundle.getDerivedBucketSpaceStates().get("narnia"), equalTo(stateOf("version:123 distributor:2 .0.s:d storage:2"))); + } + + @Test + public void same_bundle_instance_considered_similar() { + ClusterStateBundle bundle = createTestBundle(); + assertTrue(bundle.similarTo(bundle)); + } + + @Test + public void similarity_test_considers_all_bucket_spaces() { + ClusterStateBundle bundle = createTestBundle(false); + ClusterStateBundle unchangedBundle = createTestBundle(false); + + assertTrue(bundle.similarTo(unchangedBundle)); + assertTrue(unchangedBundle.similarTo(bundle)); + + ClusterStateBundle changedBundle = createTestBundle(true); + assertFalse(bundle.similarTo(changedBundle)); + assertFalse(changedBundle.similarTo(bundle)); + } + + @Test + public void toString_without_bucket_space_states_prints_only_baseline_state() { + ClusterStateBundle bundle = ClusterStateBundle.ofBaselineOnly( + AnnotatedClusterState.withoutAnnotations(stateOf("distributor:2 storage:2"))); + assertThat(bundle.toString(), equalTo("ClusterStateBundle('distributor:2 storage:2')")); + } + + @Test + public void toString_includes_all_bucket_space_states() { + ClusterStateBundle bundle = createTestBundle(); + assertThat(bundle.toString(), equalTo("ClusterStateBundle('distributor:2 storage:2', " + + "default 'distributor:2 storage:2 .0.s:d', " + + "global 'distributor:2 storage:2', " + + "narnia 'distributor:2 .0.s:d storage:2')")); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java index c500b4c7390..5d200d65516 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java @@ -86,9 +86,10 @@ public class DummyCommunicator implements Communicator, NodeLookup { } @Override - public void setSystemState(ClusterState state, NodeInfo node, Waiter<SetClusterStateRequest> waiter) { - DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, state); - node.setSystemStateVersionSent(state); + public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> waiter) { + ClusterState baselineState = stateBundle.getBaselineClusterState(); + DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState); + node.setSystemStateVersionSent(baselineState); req.setReply(new SetClusterStateRequest.Reply()); if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) { waiter.done(req); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java new file mode 100644 index 00000000000..6c51f251096 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java @@ -0,0 +1,71 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.document.FixedBucketSpaces; +import com.yahoo.vdslib.state.ClusterState; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MaintenancehenPendingGlobalMergesTest { + + private static class Fixture { + public MergePendingChecker mockPendingChecker = mock(MergePendingChecker.class); + public MaintenanceWhenPendingGlobalMerges deriver = new MaintenanceWhenPendingGlobalMerges(mockPendingChecker); + } + + private static String defaultSpace() { + return FixedBucketSpaces.defaultSpace(); + } + + private static String globalSpace() { + return FixedBucketSpaces.globalSpace(); + } + + @Test + public void no_nodes_set_to_maintenance_in_global_bucket_space_state() { + Fixture f = new Fixture(); + when(f.mockPendingChecker.hasMergesPending(eq(globalSpace()), anyInt())).thenReturn(true); // False returned by default otherwise + ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:2 storage:2"), globalSpace()); + assertThat(derived, equalTo(ClusterState.stateFromString("distributor:2 storage:2"))); + } + + @Test + public void content_nodes_with_global_merge_pending_set_to_maintenance_in_default_space_state() { + Fixture f = new Fixture(); + when(f.mockPendingChecker.hasMergesPending(globalSpace(), 1)).thenReturn(true); + when(f.mockPendingChecker.hasMergesPending(globalSpace(), 3)).thenReturn(true); + ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:5 storage:5"), defaultSpace()); + assertThat(derived, equalTo(ClusterState.stateFromString("distributor:5 storage:5 .1.s:m .3.s:m"))); + } + + @Test + public void no_nodes_set_to_maintenance_when_no_merges_pending() { + Fixture f = new Fixture(); + ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:5 storage:5"), defaultSpace()); + assertThat(derived, equalTo(ClusterState.stateFromString("distributor:5 storage:5"))); + } + + @Test + public void default_space_merges_do_not_count_towards_maintenance() { + Fixture f = new Fixture(); + when(f.mockPendingChecker.hasMergesPending(eq(defaultSpace()), anyInt())).thenReturn(true); + ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:2 storage:2"), defaultSpace()); + assertThat(derived, equalTo(ClusterState.stateFromString("distributor:2 storage:2"))); + } + + @Test + public void nodes_only_set_to_maintenance_when_marked_up_init_or_retiring() { + Fixture f = new Fixture(); + when(f.mockPendingChecker.hasMergesPending(eq(globalSpace()), anyInt())).thenReturn(true); + ClusterState derived = f.deriver.derivedFrom(ClusterState.stateFromString("distributor:5 storage:5 .1.s:m .2.s:r .3.s:i .4.s:d"), defaultSpace()); + // TODO reconsider role of retired here... It should not have merges pending towards it in the general case, but may be out of sync + assertThat(derived, equalTo(ClusterState.stateFromString("distributor:5 storage:5 .0.s:m .1.s:m .2.s:m .3.s:m .4.s:d"))); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java index d27acf8bc17..0f4d1fcdefc 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java @@ -8,6 +8,7 @@ import com.yahoo.vdslib.state.NodeType; import com.yahoo.vdslib.state.State; import org.junit.Test; +import java.text.ParseException; import java.util.Arrays; import java.util.Optional; @@ -24,6 +25,10 @@ public class StateVersionTrackerTest { return new AnnotatedClusterState(state, Optional.empty(), AnnotatedClusterState.emptyNodeStateReasons()); } + private static ClusterStateBundle stateBundleWithoutAnnotations(String stateStr) { + return ClusterStateBundle.ofBaselineOnly(stateWithoutAnnotations(stateStr)); + } + private static StateVersionTracker createWithMockedMetrics() { return new StateVersionTracker(); } @@ -32,7 +37,7 @@ public class StateVersionTrackerTest { final AnnotatedClusterState state, final long timeMs) { - versionTracker.updateLatestCandidateState(state); + versionTracker.updateLatestCandidateStateBundle(ClusterStateBundle.ofBaselineOnly(state)); versionTracker.promoteCandidateToVersionedState(timeMs); } @@ -89,14 +94,14 @@ public class StateVersionTrackerTest { @Test public void diff_from_initial_state_implies_changed_state() { final StateVersionTracker versionTracker = createWithMockedMetrics(); - versionTracker.updateLatestCandidateState(stateWithoutAnnotations("cluster:d")); + versionTracker.updateLatestCandidateStateBundle(stateBundleWithoutAnnotations("cluster:d")); assertTrue(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()); } private static boolean stateChangedBetween(String fromState, String toState) { final StateVersionTracker versionTracker = createWithMockedMetrics(); updateAndPromote(versionTracker, stateWithoutAnnotations(fromState), 123); - versionTracker.updateLatestCandidateState(stateWithoutAnnotations(toState)); + versionTracker.updateLatestCandidateStateBundle(stateBundleWithoutAnnotations(toState)); return versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish(); } @@ -165,7 +170,7 @@ public class StateVersionTrackerTest { new Node(NodeType.STORAGE, 0), new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(15)); updateAndPromote(versionTracker, stateWithMinBits, 123); - versionTracker.updateLatestCandidateState(stateWithoutAnnotations("distributor:2 storage:2")); + versionTracker.updateLatestCandidateStateBundle(stateBundleWithoutAnnotations("distributor:2 storage:2")); assertFalse(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()); } @@ -221,12 +226,45 @@ public class StateVersionTrackerTest { final StateVersionTracker versionTracker = createWithMockedMetrics(); AnnotatedClusterState candidate = stateWithoutAnnotations("distributor:2 storage:2"); - versionTracker.updateLatestCandidateState(candidate); + versionTracker.updateLatestCandidateStateBundle(ClusterStateBundle.ofBaselineOnly(candidate)); assertThat(versionTracker.getLatestCandidateState(), equalTo(candidate)); candidate = stateWithoutAnnotations("distributor:3 storage:3"); - versionTracker.updateLatestCandidateState(candidate); + versionTracker.updateLatestCandidateStateBundle(ClusterStateBundle.ofBaselineOnly(candidate)); assertThat(versionTracker.getLatestCandidateState(), equalTo(candidate)); } + private static ClusterState stateOf(String state) { + return ClusterState.stateFromString(state); + } + + private static ClusterStateBundle baselineBundle(boolean alteredDefaultState) { + return ClusterStateBundle + .builder(AnnotatedClusterState.withoutAnnotations(stateOf("distributor:1 storage:1"))) + .bucketSpaces("default") + .stateDeriver((state, space) -> { + ClusterState derived = state.clone(); + if (alteredDefaultState) { + derived.setNodeState(Node.ofStorage(0), new NodeState(NodeType.STORAGE, State.DOWN)); + } + return derived; + }) + .deriveAndBuild(); + } + + @Test + public void version_change_check_takes_derived_states_into_account() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + versionTracker.updateLatestCandidateStateBundle(baselineBundle(false)); + versionTracker.promoteCandidateToVersionedState(1234); + + // Not marked changed with no changes across bucket spaces + versionTracker.updateLatestCandidateStateBundle(baselineBundle(false)); + assertFalse(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()); + + // Changing state in default space marks as sufficiently changed + versionTracker.updateLatestCandidateStateBundle(baselineBundle(true)); + assertTrue(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()); + } + } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java new file mode 100644 index 00000000000..93aac5c83ed --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -0,0 +1,145 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.*; +import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; +import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener; +import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; +import org.junit.Test; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class SystemStateBroadcasterTest { + + private static class Fixture { + FakeTimer timer = new FakeTimer(); + final Object monitor = new Object(); + SystemStateBroadcaster broadcaster = new SystemStateBroadcaster(timer, monitor); + Communicator mockCommunicator = mock(Communicator.class); + + void simulateNodePartitionedAwaySilently(ClusterFixture cf) { + cf.cluster().getNodeInfo(Node.ofStorage(0)).setStartTimestamp(600); + cf.cluster().getNodeInfo(Node.ofStorage(1)).setStartTimestamp(700); + // Simulate a distributor being partitioned away from the controller without actually going down. It will + // need to observe all startup timestamps to infer if it should fetch bucket info from nodes. + cf.cluster().getNodeInfo(Node.ofDistributor(0)).setStartTimestamp(500); // FIXME multiple sources of timestamps are... rather confusing + cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 1000); + cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.DOWN).setStartTimestamp(500), 2000); + cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 3000); + } + } + + private static DatabaseHandler.Context dbContextFrom(ContentCluster cluster) { + return new DatabaseHandler.Context() { + @Override + public ContentCluster getCluster() { + return cluster; + } + + @Override + public FleetController getFleetController() { + return null; // We assume the broadcaster doesn't use this for our test purposes + } + + @Override + public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener() { + return null; + } + + @Override + public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener() { + return null; + } + }; + } + + private static Stream<NodeInfo> clusterNodeInfos(ContentCluster c, Node... nodes) { + return Stream.of(nodes).map(c::getNodeInfo); + } + + private static class StateMapping { + final String bucketSpace; + final ClusterState state; + + StateMapping(String bucketSpace, ClusterState state) { + this.bucketSpace = bucketSpace; + this.state = state; + } + } + + private static StateMapping mapping(String bucketSpace, String state) { + return new StateMapping(bucketSpace, ClusterState.stateFromString(state)); + } + + private static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) { + return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)), + Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state))); + } + + @Test + public void always_publish_baseline_cluster_state() { + Fixture f = new Fixture(); + ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2"); + ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + f.broadcaster.handleNewClusterStates(stateBundle); + f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any())); + } + + @Test + public void non_observed_startup_timestamps_are_published_per_node_for_baseline_state() { + Fixture f = new Fixture(); + ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2"); + ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + f.simulateNodePartitionedAwaySilently(cf); + f.broadcaster.handleNewClusterStates(stateBundle); + f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + + clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> { + // Only distributor 0 should observe startup timestamps + verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()); + }); + ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700"); + verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); + } + + @Test + public void bucket_space_states_are_published_verbatim_when_no_additional_timestamps_needed() { + Fixture f = new Fixture(); + ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2", + mapping("default", "distributor:2 storage:2 .0.s:d"), + mapping("upsidedown", "distributor:2 .0.s:d storage:2")); + ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + f.broadcaster.handleNewClusterStates(stateBundle); + f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + + cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any())); + } + + @Test + public void non_observed_startup_timestamps_are_published_per_bucket_space_state() { + Fixture f = new Fixture(); + ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2", + mapping("default", "distributor:2 storage:2 .0.s:d"), + mapping("upsidedown", "distributor:2 .0.s:d storage:2")); + ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + f.simulateNodePartitionedAwaySilently(cf); + f.broadcaster.handleNewClusterStates(stateBundle); + f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + + clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> { + // Only distributor 0 should observe startup timestamps + verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()); + }); + ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700", + mapping("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"), + mapping("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700")); + verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); + } +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java index 3b64d3bc1cb..730b03ff7c9 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.clustercontroller.core.testutils; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.FakeTimer; import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener; @@ -22,9 +23,9 @@ public class StateWaiter implements SystemStateListener { this.timer = timer; } - public void handleNewSystemState(ClusterState state) { + public void handleNewSystemState(ClusterStateBundle state) { synchronized(timer) { - current = state; + current = state.getBaselineClusterState(); ++stateUpdates; timer.notifyAll(); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java index ebe0c0edb4b..6396b27aa79 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.clustercontroller.core.testutils; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.DummyVdsNode; import com.yahoo.vespa.clustercontroller.core.FleetController; import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener; @@ -25,9 +26,9 @@ public interface WaitCondition { protected ClusterState currentState; private final SystemStateListener listener = new SystemStateListener() { @Override - public void handleNewSystemState(ClusterState state) { + public void handleNewSystemState(ClusterStateBundle state) { synchronized (monitor) { - currentState = state; + currentState = state.getBaselineClusterState(); monitor.notifyAll(); } } |