1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "loadbalancer.h"
namespace documentapi {
/**
 * Creates a load balancer for the given cluster and session.
 *
 * The balancer starts out with no node information and with the round-robin
 * position at zero.
 *
 * @param cluster Cluster name; its length is later used to locate the node
 *                index inside service names.
 * @param session Session name, stored as-is.
 */
LoadBalancer::LoadBalancer(const string& cluster, const string& session)
    : _mutex(),
      _nodeInfo(),
      _cluster(cluster),
      _session(session),
      _position(0.0)
{
}
// Destructor: defined out of line here, using the compiler-generated implementation.
LoadBalancer::~LoadBalancer() = default;
uint32_t
LoadBalancer::getIndex(const string& name) const
{
size_t lastSlash = name.find('/');
string idx = name.substr(_cluster.length() + 1, lastSlash);
return atoi(idx.c_str());
}
/**
 * Returns the last connection spec recorded for a node.
 *
 * Takes _mutex while reading. The index is not range checked, so it must be
 * smaller than the current number of node entries.
 *
 * @param target Index into the node table.
 * @return A copy of that node's lastSpec.
 */
string
LoadBalancer::getLastSpec(size_t target) const {
    lock_guard lock(_mutex);
    const auto& node = _nodeInfo[target];
    return node.lastSpec;
}
/**
 * Returns the current weight of a node.
 *
 * Takes _mutex while reading. The index is not range checked, so it must be
 * smaller than the current number of node entries.
 *
 * @param target Index into the node table.
 * @return That node's weight.
 */
double
LoadBalancer::getWeight(size_t target) const {
    lock_guard lock(_mutex);
    const auto& node = _nodeInfo[target];
    return node.weight;
}
/**
 * Picks the next recipient among the given choices.
 *
 * Acquires _mutex and delegates to the overload that takes the held guard.
 *
 * @param choices Candidate services to choose among.
 * @return Whatever the guarded overload returns for these choices.
 */
std::pair<string, int>
LoadBalancer::getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices) {
    lock_guard lock(_mutex);
    return getRecipient(lock, choices);
}
/**
 * Picks the next recipient among the given choices by weighted round robin.
 *
 * The choices are walked in order while the weights of their nodes are summed
 * up. The first node whose running sum exceeds _position is selected, its
 * lastSpec is updated, and _position is advanced by one. If no running sum
 * exceeds _position, the total weight is subtracted from _position and the
 * walk is repeated. Every node that is visited is marked valid, and the node
 * table grows on demand to hold the node's index.
 *
 * The unnamed lock_guard parameter is the guard the caller holds on _mutex.
 *
 * @param choices (service name, connection spec) pairs to choose among.
 * @return (connection spec, node index) of the selected node, or ("", -1) when
 *         there are no choices or the total weight of the choices is not
 *         positive.
 */
std::pair<string, int>
LoadBalancer::getRecipient(const lock_guard &, const slobrok::api::IMirrorAPI::SpecList& choices) {
    std::pair<string, int> retVal("", -1);
    if (choices.empty()) {
        return retVal;
    }
    // Wrap-around is handled by looping rather than by self-recursion, so the
    // stack depth no longer grows with how far _position is past the total weight.
    for (;;) {
        double weightSum = 0.0;
        for (const std::pair<string, string>& curr : choices) {
            const uint32_t index = getIndex(curr.first);
            // Compute the required size in size_t so that index + 1 cannot wrap to 0.
            const size_t required = static_cast<size_t>(index) + 1;
            if (_nodeInfo.size() < required) {
                _nodeInfo.resize(required);
            }
            NodeInfo& info = _nodeInfo[index];
            info.valid = true;
            weightSum += info.weight;
            if (weightSum > _position) {
                retVal.first = curr.second;
                retVal.second = static_cast<int>(index);
                info.lastSpec = retVal.first;
                _position += 1.0;
                return retVal;
            }
        }
        if (!(weightSum > 0.0)) {
            // Subtracting a non-positive (or NaN) total can never bring
            // _position below the running sum; give up instead of spinning.
            return retVal;
        }
        _position -= weightSum;
    }
}
/**
 * Rescales the weights of all valid nodes by dividing them by the smallest
 * weight found among the valid nodes.
 *
 * Entries that are not marked valid are neither considered nor modified.
 * The unnamed lock_guard parameter is the guard passed in by the caller.
 */
void
LoadBalancer::normalizeWeights(const lock_guard &) {
    // A negative value means "no valid node seen yet".
    double smallest = -1.0;
    for (const auto& node : _nodeInfo) {
        if (node.valid && (smallest < 0 || node.weight < smallest)) {
            smallest = node.weight;
        }
    }
    for (auto& node : _nodeInfo) {
        if (node.valid) {
            node.weight /= smallest;
        }
    }
}
void
LoadBalancer::received(uint32_t nodeIndex, bool busy) {
if (busy) {
lock_guard guard(_mutex);
NodeInfo& info = _nodeInfo[nodeIndex];
info.weight = info.weight - 0.01;
normalizeWeights(guard);
}
}
}
|