summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-06-30 14:52:07 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-06-30 14:52:07 +0000
commitb0d233d29927b9dcd22c3322ed45e2a4bf3f2337 (patch)
tree07c3745ae8c76129641051ec24cd3f9ebe15204d /documentapi
parentc1ed50f4e96f8b2c9376fa9582d65023c0f0ea06 (diff)
Make load balancer thread safe.
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/tests/policies/policies_test.cpp6
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.cpp34
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.h45
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/loadbalancerpolicy.cpp2
4 files changed, 54 insertions, 33 deletions
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp
index dfa9cbb581e..19960d148df 100644
--- a/documentapi/src/tests/policies/policies_test.cpp
+++ b/documentapi/src/tests/policies/policies_test.cpp
@@ -746,8 +746,6 @@ void Test::testLoadBalancer() {
entries.push_back(IMirrorAPI::Spec("foo/1/default", "tcp/bar:2"));
entries.push_back(IMirrorAPI::Spec("foo/2/default", "tcp/bar:3"));
- const std::vector<LoadBalancer::NodeInfo>& nodeInfo = lb.getNodeInfo();
-
for (int i = 0; i < 99; i++) {
std::pair<string, int> recipient = lb.getRecipient(entries);
EXPECT_EQUAL((i % 3), recipient.second);
@@ -766,8 +764,8 @@ void Test::testLoadBalancer() {
lb.received(1, false);
}
- EXPECT_EQUAL(421, (int)(100 * nodeInfo[0].weight / nodeInfo[1].weight));
- EXPECT_EQUAL(421, (int)(100 * nodeInfo[2].weight / nodeInfo[1].weight));
+ EXPECT_EQUAL(421, (int)(100 * lb.getWeight(0) / lb.getWeight(1)));
+ EXPECT_EQUAL(421, (int)(100 * lb.getWeight(2) / lb.getWeight(1)));
EXPECT_EQUAL(0 , lb.getRecipient(entries).second);
EXPECT_EQUAL(0 , lb.getRecipient(entries).second);
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.cpp
index 75a0e4d966c..d3ba72b54b5 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.cpp
@@ -5,12 +5,14 @@
namespace documentapi {
LoadBalancer::LoadBalancer(const string& cluster, const string& session)
- : _cluster(cluster),
+ : _mutex(),
+ _nodeInfo(),
+ _cluster(cluster),
_session(session),
_position(0)
{}
-LoadBalancer::~LoadBalancer() {}
+LoadBalancer::~LoadBalancer() = default;
uint32_t
LoadBalancer::getIndex(const string& name) const
@@ -20,9 +22,26 @@ LoadBalancer::getIndex(const string& name) const
return atoi(idx.c_str());
}
+string
+LoadBalancer::getLastSpec(size_t target) const {
+ lock_guard guard(_mutex);
+ return _nodeInfo[target].lastSpec;
+}
+
+double
+LoadBalancer::getWeight(size_t target) const {
+ lock_guard guard(_mutex);
+ return _nodeInfo[target].weight;
+}
+
std::pair<string, int>
-LoadBalancer::getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices)
-{
+LoadBalancer::getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices) {
+ lock_guard guard(_mutex);
+ return getRecipient(guard, choices);
+}
+
+std::pair<string, int>
+LoadBalancer::getRecipient(const lock_guard & guard, const slobrok::api::IMirrorAPI::SpecList& choices) {
std::pair<string, int> retVal("", -1);
if (choices.size() == 0) {
@@ -54,7 +73,7 @@ LoadBalancer::getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices)
if (retVal.second == -1) {
_position -= weightSum;
- return getRecipient(choices);
+ return getRecipient(guard, choices);
} else {
_position += 1.0;
}
@@ -63,7 +82,7 @@ LoadBalancer::getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices)
}
void
-LoadBalancer::normalizeWeights() {
+LoadBalancer::normalizeWeights(const lock_guard &) {
double lowest = -1.0;
for (uint32_t i = 0; i < _nodeInfo.size(); i++) {
@@ -88,10 +107,11 @@ LoadBalancer::normalizeWeights() {
void
LoadBalancer::received(uint32_t nodeIndex, bool busy) {
if (busy) {
+ lock_guard guard(_mutex);
NodeInfo& info = _nodeInfo[nodeIndex];
info.weight = info.weight - 0.01;
- normalizeWeights();
+ normalizeWeights(guard);
}
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.h b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.h
index 6627fc57ef2..f401fc6fabe 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.h
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.h
@@ -3,33 +3,17 @@
#include <vespa/documentapi/common.h>
#include <vespa/slobrok/imirrorapi.h>
+#include <mutex>
namespace documentapi {
class LoadBalancer {
public:
- class NodeInfo {
- public:
- NodeInfo() noexcept : valid(false), sent(0), busy(0), weight(1.0) {};
-
- bool valid;
- uint32_t sent;
- uint32_t busy;
- double weight;
- string lastSpec;
- };
-
- std::vector<NodeInfo> _nodeInfo;
- string _cluster;
- string _session;
- double _position;
-
LoadBalancer(const string& cluster, const string& session);
~LoadBalancer();
- const std::vector<NodeInfo>& getNodeInfo() const { return _nodeInfo; }
-
- uint32_t getIndex(const string& name) const;
+ string getLastSpec(size_t target) const;
+ double getWeight(size_t target) const;
/**
Returns the spec and the node index of the node we should send to.
@@ -37,9 +21,28 @@ public:
*/
std::pair<string, int> getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices);
- void normalizeWeights();
-
void received(uint32_t nodeIndex, bool busy);
+private:
+ using lock_guard = std::lock_guard<std::mutex>;
+ std::pair<string, int> getRecipient(const lock_guard & guard, const slobrok::api::IMirrorAPI::SpecList& choices);
+ void normalizeWeights(const lock_guard & guard);
+ uint32_t getIndex(const string& name) const;
+
+ class NodeInfo {
+ public:
+ NodeInfo() noexcept : weight(1.0), sent(0), busy(0), valid(false), lastSpec() {}
+
+ double weight;
+ uint32_t sent;
+ uint32_t busy;
+ bool valid;
+ string lastSpec;
+ };
+ mutable std::mutex _mutex;
+ std::vector<NodeInfo> _nodeInfo;
+ string _cluster;
+ string _session;
+ double _position;
};
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancerpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancerpolicy.cpp
index 473d04ea7d9..160ee55adae 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancerpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancerpolicy.cpp
@@ -60,7 +60,7 @@ LoadBalancerPolicy::merge(mbus::RoutingContext& context) {
bool busy = false;
for (uint32_t i = 0; i < reply->getNumErrors(); i++) {
if (reply->getError(i).getCode() == mbus::ErrorCode::SESSION_BUSY) {
- string lastSpec = _loadBalancer->getNodeInfo()[target].lastSpec;
+ string lastSpec = _loadBalancer->getLastSpec(target);
if (reply->getError(i).getMessage().find(lastSpec) == string::npos) {
LOG(debug, "Received busy with message %s, doesn't contain target %s so not updating weight.", reply->getError(i).getMessage().c_str(), lastSpec.c_str());