blob: 6fb31cc1b1c8287c5d96a4396913ed1f9f5b319f (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Class that stores content cluster stats (with bucket space stats per node) for
* the current cluster state version.
*
* Each distributor reports bucket space stats for the different content nodes.
* These reports arrive with getnodestate RPC calls,
* and eventually ends up as calls to updateForDistributor().
* No assumptions are made on the sequence of getnodestate calls.
* For instance, it's perfectly fine for the calls to arrive in the
* following order:
* distributor 0
* distributor 1
* distributor 1
* distributor 0
* distributor 2
* ... etc
*
* @author hakonhall
*/
public class ClusterStatsAggregator {
private final Set<Integer> distributors;
private final Set<Integer> nonUpdatedDistributors;
// Maps the distributor node index to a map of content node index to the
// content node's stats.
private final Map<Integer, ContentClusterStats> distributorToStats = new HashMap<>();
// This is only needed as an optimization. Is just the sum of distributorToStats' ContentClusterStats.
// Maps the content node index to the content node stats for that node.
// This MUST be kept up-to-date with distributorToStats;
private final ContentClusterStats aggregatedStats;
// This is the aggregate of aggregates across content nodes, allowing a reader to
// get a O(1) view of all merges pending in the cluster.
private final ContentNodeStats globallyAggregatedNodeStats = new ContentNodeStats(-1);
ClusterStatsAggregator(Set<Integer> distributors, Set<Integer> storageNodes) {
this.distributors = distributors;
nonUpdatedDistributors = new HashSet<>(distributors);
aggregatedStats = new ContentClusterStats(storageNodes);
}
public AggregatedClusterStats getAggregatedStats() {
return new AggregatedClusterStats() {
@Override
public boolean hasUpdatesFromAllDistributors() {
return nonUpdatedDistributors.isEmpty();
}
@Override
public ContentClusterStats getStats() {
return aggregatedStats;
}
@Override
public ContentNodeStats getGlobalStats() {
return globallyAggregatedNodeStats;
}
};
}
public ContentNodeStats getAggregatedStatsForDistributor(int distributorIndex) {
ContentNodeStats result = new ContentNodeStats(distributorIndex);
ContentClusterStats distributorStats = distributorToStats.get(distributorIndex);
if (distributorStats != null) {
for (ContentNodeStats distributorStat : distributorStats) {
result.add(distributorStat);
}
}
return result;
}
MergePendingChecker createMergePendingChecker(double minMergeCompletionRatio) {
return new AggregatedStatsMergePendingChecker(getAggregatedStats(), minMergeCompletionRatio);
}
/**
* Update the aggregator with the newest available stats from a distributor.
*/
void updateForDistributor(int distributorIndex, ContentClusterStats clusterStats) {
if (!distributors.contains(distributorIndex)) {
return;
}
nonUpdatedDistributors.remove(distributorIndex);
addStatsFromDistributor(distributorIndex, clusterStats);
}
private void addStatsFromDistributor(int distributorIndex, ContentClusterStats clusterStats) {
ContentClusterStats prevClusterStats = distributorToStats.put(distributorIndex, clusterStats);
for (ContentNodeStats contentNode : aggregatedStats) {
Integer nodeIndex = contentNode.getNodeIndex();
ContentNodeStats statsToAdd = clusterStats.getNodeStats(nodeIndex);
if (statsToAdd != null) {
contentNode.add(statsToAdd);
globallyAggregatedNodeStats.add(statsToAdd);
}
if (prevClusterStats != null) {
ContentNodeStats statsToSubtract = prevClusterStats.getNodeStats(nodeIndex);
if (statsToSubtract != null) {
contentNode.subtract(statsToSubtract);
globallyAggregatedNodeStats.subtract(statsToSubtract);
}
}
}
}
}
|