aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStatsAggregator.java
blob: f1c19bac9b674a66a7e91f7e31215c816ae8b899 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Class that stores content cluster stats (with bucket space stats per node) for
 * the current cluster state version.
 *
 * Each distributor reports bucket space stats for the different content nodes.
 * These reports arrive with getnodestate RPC calls
 * and eventually end up as calls to updateForDistributor().
 * No assumptions are made about the sequence of getnodestate calls.
 * For instance, it's perfectly fine for the calls to arrive in the
 * following order:
 *   distributor 0
 *   distributor 1
 *   distributor 1
 *   distributor 0
 *   distributor 2
 *   ... etc
 *
 * @author hakonhall
 */
public class ClusterStatsAggregator {

    // Indices of the distributors whose reports are accepted. This is a private
    // snapshot of the set given to the constructor.
    private final Set<Integer> distributors;

    // Distributors that have not yet delivered any report to updateForDistributor().
    private final Set<Integer> nonUpdatedDistributors;

    // Maps the distributor node index to the most recent stats reported by that
    // distributor (which in turn holds stats per content node).
    private final Map<Integer, ContentClusterStats> distributorToStats = new HashMap<>();

    // Running sum, per content node, of the ContentClusterStats held in distributorToStats.
    // It exists only as an optimization so the sum need not be recomputed on every read,
    // and it MUST be kept in sync with distributorToStats.
    private final ContentClusterStats aggregatedStats;

    /**
     * Creates an aggregator for the given distributors and storage nodes.
     *
     * @param distributors indices of the distributors to accept reports from; the set is copied,
     *                     so later changes made by the caller do not affect this aggregator
     * @param storageNodes passed on to the {@link ContentClusterStats} that holds the aggregate
     */
    ClusterStatsAggregator(Set<Integer> distributors, Set<Integer> storageNodes) {
        // Copy rather than alias: nonUpdatedDistributors is a snapshot, so the set used for
        // filtering must be one too, otherwise the two could drift apart.
        this.distributors = new HashSet<>(distributors);
        nonUpdatedDistributors = new HashSet<>(this.distributors);
        aggregatedStats = new ContentClusterStats(storageNodes);
    }

    /**
     * Returns a live view of the aggregated state. The view reads this aggregator's
     * fields on each call, so it reflects updates made after it was obtained.
     *
     * @return a view where {@code hasUpdatesFromAllDistributors()} is true once every known
     *         distributor has been updated at least once, and {@code getStats()} returns the
     *         aggregated stats object itself (not a copy)
     */
    public AggregatedClusterStats getAggregatedStats() {
        return new AggregatedClusterStats() {

            @Override
            public boolean hasUpdatesFromAllDistributors() {
                return nonUpdatedDistributors.isEmpty();
            }

            @Override
            public ContentClusterStats getStats() {
                return aggregatedStats;
            }

        };
    }

    /**
     * Sums the stats most recently reported by one distributor across all content nodes.
     *
     * @param distributorIndex index of the distributor to sum stats for
     * @return a new {@link ContentNodeStats}, created with {@code distributorIndex}, holding the
     *         sum; if no stats are stored for the distributor, nothing is added to it
     */
    public ContentNodeStats getAggregatedStatsForDistributor(int distributorIndex) {
        ContentNodeStats result = new ContentNodeStats(distributorIndex);
        ContentClusterStats distributorStats = distributorToStats.get(distributorIndex);
        if (distributorStats != null) {
            for (ContentNodeStats distributorStat : distributorStats) {
                result.add(distributorStat);
            }
        }
        return result;
    }

    /**
     * Creates a merge pending checker backed by the live view from {@link #getAggregatedStats()}.
     *
     * @param minMergeCompletionRatio passed unchanged to {@link AggregatedStatsMergePendingChecker}
     * @return a new {@link AggregatedStatsMergePendingChecker}
     */
    MergePendingChecker createMergePendingChecker(double minMergeCompletionRatio) {
        return new AggregatedStatsMergePendingChecker(getAggregatedStats(), minMergeCompletionRatio);
    }

    /**
     * Update the aggregator with the newest available stats from a distributor.
     * Reports from distributors that were not given at construction time are ignored.
     * A new report from a distributor replaces that distributor's previous report.
     *
     * @param distributorIndex index of the reporting distributor
     * @param clusterStats     the stats reported by that distributor
     */
    void updateForDistributor(int distributorIndex, ContentClusterStats clusterStats) {
        if (!distributors.contains(distributorIndex)) {
            return;
        }
        nonUpdatedDistributors.remove(distributorIndex);
        addStatsFromDistributor(distributorIndex, clusterStats);
    }

    /**
     * Stores {@code clusterStats} as the latest report from the distributor and adjusts the
     * aggregate: the new report is added and the report it replaces, if any, is subtracted.
     * Only content nodes present in the aggregate are visited, so stats for other content
     * nodes in {@code clusterStats} do not contribute.
     */
    private void addStatsFromDistributor(int distributorIndex, ContentClusterStats clusterStats) {
        ContentClusterStats prevClusterStats = distributorToStats.put(distributorIndex, clusterStats);

        for (ContentNodeStats contentNode : aggregatedStats) {
            Integer nodeIndex = contentNode.getNodeIndex();

            // The new contribution is added before the old one is subtracted.
            // NOTE(review): presumably this avoids transiently negative values - confirm
            // against ContentNodeStats.subtract().
            ContentNodeStats statsToAdd = clusterStats.getNodeStats(nodeIndex);
            if (statsToAdd != null) {
                contentNode.add(statsToAdd);
            }

            if (prevClusterStats != null) {
                ContentNodeStats statsToSubtract = prevClusterStats.getNodeStats(nodeIndex);
                if (statsToSubtract != null) {
                    contentNode.subtract(statsToSubtract);
                }
            }
        }
    }

}