aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
blob: fa3ff68b0337a10eb9c72d5c842dcac3ed6e77ef (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;

import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;

import java.util.List;

/**
 * Keeps track of the active cluster state and handles the transition edges from
 * one state to the next. In particular, it ensures that states have strictly increasing
 * version numbers.
 *
 * Wraps ClusterStateView to ensure its knowledge of available nodes stays up to date.
 */
public class StateVersionTracker {

    // We always increment the version _before_ publishing, so the effective first cluster
    // state version when starting from 1 will be 2. This matches legacy behavior and a bunch
    // of existing tests expect it.
    private int currentVersion = 1;
    // Version most recently applied via setVersionRetrievedFromZooKeeper(); 0 until then.
    private int lastZooKeeperVersion = 0;

    // The lowest published distribution bit count for the lifetime of this controller.
    // TODO this mirrors legacy behavior, but should be moved into stable ZK state.
    private int lowestObservedDistributionBits = 16;

    // Unversioned form of the most recently promoted candidate. New candidates are compared
    // against this to decide whether a new version is warranted.
    private ClusterStateBundle currentUnversionedState = ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState());
    // Most recent bundle given to updateLatestCandidateStateBundle(); not necessarily promoted.
    private ClusterStateBundle latestCandidateState = ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState());
    // Current versioned bundle: either the last promoted candidate (with its version set) or a
    // bundle restored from ZooKeeper. Initially the same (empty) object as latestCandidateState.
    private ClusterStateBundle currentClusterState = latestCandidateState;

    // Rebuilt from the versioned baseline state every time a candidate is promoted.
    private ClusterStateView clusterStateView;
    private final ClusterStatsChangeTracker clusterStatsChangeTracker;

    private final ClusterStateHistory clusterStateHistory = new ClusterStateHistory();
    private double minMergeCompletionRatio;

    /**
     * Creates a tracker whose initial view is built from the empty baseline state.
     *
     * @param minMergeCompletionRatio ratio passed on to the stats change tracker and to
     *                                merge-pending checkers created by this tracker
     */
    StateVersionTracker(double minMergeCompletionRatio) {
        clusterStateView = ClusterStateView.create(currentUnversionedState.getBaselineClusterState());
        clusterStatsChangeTracker = new ClusterStatsChangeTracker(clusterStateView.getStatsAggregator().getAggregatedStats(),
                minMergeCompletionRatio);
        this.minMergeCompletionRatio = minMergeCompletionRatio;
    }

    /**
     * Adopts the version read from ZooKeeper as the current version. The value is clamped
     * to a minimum of 1, and is also remembered as the last known ZooKeeper version.
     */
    void setVersionRetrievedFromZooKeeper(final int version) {
        this.currentVersion = Math.max(1, version);
        this.lastZooKeeperVersion = this.currentVersion;
    }

    /**
     * Installs a bundle read from ZooKeeper as the current versioned state. Both the
     * unversioned reference state and the latest candidate are reset to empty bundles.
     */
    void setClusterStateBundleRetrievedFromZooKeeper(ClusterStateBundle bundle) {
        // There is an edge where the bundle version will mismatch with the version set
        // via setVersionRetrievedFromZooKeeper() if the controller (or ZK) crashes before
        // it can write both sequentially. But since we use the ZK-written version explicitly
        // when choosing a new version for our own published states, it should not matter in
        // practice. Worst case is that the current state reflects the same version that a
        // previous controller had, but we will never publish this state ourselves; publishing
        // only happens after we've generated our own, new candidate state and overwritten
        // the empty states set below. Publishing also, as mentioned, sets a version based on
        // the ZK version, not the version stored in the bundle itself.
        currentClusterState = bundle;
        currentUnversionedState = ClusterStateBundle.empty();
        latestCandidateState = ClusterStateBundle.empty();
    }

    /**
     * Sets limit on how many cluster states can be kept in the in-memory queue. Once
     * the list exceeds this limit, the oldest state is repeatedly removed until the limit
     * is no longer exceeded.
     *
     * Takes effect upon the next invocation of promoteCandidateToVersionedState().
     */
    void setMaxHistoryEntryCount(int maxHistoryEntryCount) {
        this.clusterStateHistory.setMaxHistoryEntryCount(maxHistoryEntryCount);
    }

    /**
     * Updates the minimum merge completion ratio. The new value is used by subsequent calls
     * to createMergePendingChecker(), and is passed to the stats change tracker on the next
     * candidate promotion.
     */
    void setMinMergeCompletionRatio(double minMergeCompletionRatio) {
        this.minMergeCompletionRatio = minMergeCompletionRatio;
    }

    /** Returns the version of the current versioned state (or the version restored from ZooKeeper). */
    int getCurrentVersion() {
        return this.currentVersion;
    }

    /**
     * Returns true while the current version has not advanced past the last version read
     * from ZooKeeper, i.e. no candidate has been promoted since that version was applied.
     */
    boolean hasReceivedNewVersionFromZooKeeper() {
        return currentVersion <= lastZooKeeperVersion;
    }

    /** Returns the lowest distribution bit count seen across all promoted states. */
    int getLowestObservedDistributionBits() {
        return lowestObservedDistributionBits;
    }

    /** Returns the annotated baseline state of the current versioned bundle. */
    AnnotatedClusterState getAnnotatedVersionedClusterState() {
        return currentClusterState.getBaselineAnnotatedState();
    }

    /** Returns the baseline cluster state of the current versioned bundle. */
    public ClusterState getVersionedClusterState() {
        return currentClusterState.getBaselineClusterState();
    }

    /** Returns the stats aggregator of the view built from the current versioned state. */
    public ClusterStatsAggregator getAggregatedClusterStats() {
        return clusterStateView.getStatsAggregator();
    }

    /** Returns the current versioned cluster state bundle. */
    public ClusterStateBundle getVersionedClusterStateBundle() {
        return currentClusterState;
    }

    /**
     * Replaces the latest candidate state bundle. Candidates are expected to be unversioned;
     * a version is only assigned when the candidate is promoted through
     * promoteCandidateToVersionedState(). Also calls syncAggregatedStats() on the stats
     * change tracker.
     */
    public void updateLatestCandidateStateBundle(final ClusterStateBundle candidateBundle) {
        // Validate the incoming bundle rather than the one being replaced. This catches a
        // versioned candidate at the call that introduces it. Checking the outgoing bundle
        // only fired on the following call, and never for the final candidate before promotion.
        assert candidateBundle.getBaselineClusterState().getVersion() == 0
                : "Candidate cluster state bundle must be unversioned, but has version "
                  + candidateBundle.getBaselineClusterState().getVersion();
        latestCandidateState = candidateBundle;
        clusterStatsChangeTracker.syncAggregatedStats();
    }

    /**
     * Returns the last state provided to updateLatestCandidateStateBundle, which _may or may not_ be
     * a published state. Primary use case for this function is a caller which is interested in
     * changes that may not be reflected in the published state. The best example of this would
     * be node state changes when a cluster is marked as Down.
     */
    public AnnotatedClusterState getLatestCandidateState() {
        return latestCandidateState.getBaselineAnnotatedState();
    }

    /** Returns the last bundle provided to updateLatestCandidateStateBundle(). */
    public ClusterStateBundle getLatestCandidateStateBundle() {
        return latestCandidateState;
    }

    /** Returns the recorded history of promoted cluster states. */
    public List<ClusterStateHistoryEntry> getClusterStateHistory() {
        return clusterStateHistory.getHistory();
    }

    /** Returns true if the latest candidate is not similar to the last promoted (unversioned) state. */
    boolean candidateChangedEnoughFromCurrentToWarrantPublish() {
        // Neither latestCandidateState nor currentUnversionedState has a version set, so the
        // similarity is only done on structural state metadata.
        return !currentUnversionedState.similarTo(latestCandidateState);
    }

    /**
     * Promotes the latest candidate to the current versioned state using version
     * currentVersion + 1, and records the result in the state history.
     *
     * @param currentTimeMs timestamp stored with the history entry
     */
    void promoteCandidateToVersionedState(final long currentTimeMs) {
        final int newVersion = currentVersion + 1;
        updateStatesForNewVersion(latestCandidateState, newVersion);
        currentVersion = newVersion;

        recordCurrentStateInHistoryAtTime(currentTimeMs);
    }

    /**
     * Installs a versioned clone of the given bundle as the current state and keeps the original
     * as the unversioned reference. It then updates the lowest observed distribution bits,
     * rebuilds the cluster state view and feeds its aggregated stats to the change tracker.
     */
    private void updateStatesForNewVersion(final ClusterStateBundle newStateBundle, final int newVersion) {
        currentClusterState = newStateBundle.clonedWithVersionSet(newVersion);
        currentUnversionedState = newStateBundle; // TODO should we clone..? ClusterState really should be made immutable
        lowestObservedDistributionBits = Math.min(
                lowestObservedDistributionBits,
                newStateBundle.getBaselineClusterState().getDistributionBitCount());
        // TODO should this take place in updateLatestCandidateStateBundle instead? I.e. does it require a consolidated state?
        clusterStateView = ClusterStateView.create(currentClusterState.getBaselineClusterState());
        clusterStatsChangeTracker.updateAggregatedStats(clusterStateView.getStatsAggregator().getAggregatedStats(),
                minMergeCompletionRatio);
    }

    /** Appends the current versioned bundle to the history with the given timestamp. */
    private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) {
        clusterStateHistory.add(currentClusterState, currentTimeMs);
    }

    /** Forwards updated host info for a node to the current cluster state view. */
    void handleUpdatedHostInfo(final NodeInfo node, final HostInfo hostInfo) {
        // TODO the wiring here isn't unit tested. Need mockable integration points.
        clusterStateView.handleUpdatedHostInfo(node, hostInfo);
    }

    /** Returns whether the stats change tracker reports that aggregated stats have changed. */
    boolean bucketSpaceMergeCompletionStateHasChanged() {
        return clusterStatsChangeTracker.statsHaveChanged();
    }

    /** Creates a merge-pending checker from the current view's stats, using the current merge completion ratio. */
    MergePendingChecker createMergePendingChecker() {
        return clusterStateView.getStatsAggregator().createMergePendingChecker(minMergeCompletionRatio);
    }

    /*
    TODO test and implement
      - derived default space down-condition can only _keep_ a node in maintenance (down), not transition it from up -> maintenance
    */

}