diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-06-30 14:52:07 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-06-30 14:52:07 +0000 |
commit | b0d233d29927b9dcd22c3322ed45e2a4bf3f2337 (patch) | |
tree | 07c3745ae8c76129641051ec24cd3f9ebe15204d /documentapi | |
parent | c1ed50f4e96f8b2c9376fa9582d65023c0f0ea06 (diff) |
Make load balancer thread safe.
Diffstat (limited to 'documentapi')
4 files changed, 54 insertions, 33 deletions
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index dfa9cbb581e..19960d148df 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -746,8 +746,6 @@ void Test::testLoadBalancer() { entries.push_back(IMirrorAPI::Spec("foo/1/default", "tcp/bar:2")); entries.push_back(IMirrorAPI::Spec("foo/2/default", "tcp/bar:3")); - const std::vector<LoadBalancer::NodeInfo>& nodeInfo = lb.getNodeInfo(); - for (int i = 0; i < 99; i++) { std::pair<string, int> recipient = lb.getRecipient(entries); EXPECT_EQUAL((i % 3), recipient.second); @@ -766,8 +764,8 @@ void Test::testLoadBalancer() { lb.received(1, false); } - EXPECT_EQUAL(421, (int)(100 * nodeInfo[0].weight / nodeInfo[1].weight)); - EXPECT_EQUAL(421, (int)(100 * nodeInfo[2].weight / nodeInfo[1].weight)); + EXPECT_EQUAL(421, (int)(100 * lb.getWeight(0) / lb.getWeight(1))); + EXPECT_EQUAL(421, (int)(100 * lb.getWeight(2) / lb.getWeight(1))); EXPECT_EQUAL(0 , lb.getRecipient(entries).second); EXPECT_EQUAL(0 , lb.getRecipient(entries).second); diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.cpp index 75a0e4d966c..d3ba72b54b5 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.cpp @@ -5,12 +5,14 @@ namespace documentapi { LoadBalancer::LoadBalancer(const string& cluster, const string& session) - : _cluster(cluster), + : _mutex(), + _nodeInfo(), + _cluster(cluster), _session(session), _position(0) {} -LoadBalancer::~LoadBalancer() {} +LoadBalancer::~LoadBalancer() = default; uint32_t LoadBalancer::getIndex(const string& name) const @@ -20,9 +22,26 @@ LoadBalancer::getIndex(const string& name) const return atoi(idx.c_str()); } +string +LoadBalancer::getLastSpec(size_t target) const { + lock_guard guard(_mutex); + return _nodeInfo[target].lastSpec; +} + +double +LoadBalancer::getWeight(size_t target) const { + lock_guard guard(_mutex); + return _nodeInfo[target].weight; +} + std::pair<string, int> -LoadBalancer::getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices) -{ +LoadBalancer::getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices) { + lock_guard guard(_mutex); + return getRecipient(guard, choices); +} + +std::pair<string, int> +LoadBalancer::getRecipient(const lock_guard & guard, const slobrok::api::IMirrorAPI::SpecList& choices) { std::pair<string, int> retVal("", -1); if (choices.size() == 0) { @@ -54,7 +73,7 @@ LoadBalancer::getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices) if (retVal.second == -1) { _position -= weightSum; - return getRecipient(choices); + return getRecipient(guard, choices); } else { _position += 1.0; } @@ -63,7 +82,7 @@ LoadBalancer::getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices) } void -LoadBalancer::normalizeWeights() { +LoadBalancer::normalizeWeights(const lock_guard &) { double lowest = -1.0; for (uint32_t i = 0; i < _nodeInfo.size(); i++) { @@ -88,10 +107,11 @@ LoadBalancer::normalizeWeights() { void LoadBalancer::received(uint32_t nodeIndex, bool busy) { if (busy) { + lock_guard guard(_mutex); NodeInfo& info = _nodeInfo[nodeIndex]; info.weight = info.weight - 0.01; - normalizeWeights(); + normalizeWeights(guard); } } diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.h b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.h index 6627fc57ef2..f401fc6fabe 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancer.h @@ -3,33 +3,17 @@ #include <vespa/documentapi/common.h> #include <vespa/slobrok/imirrorapi.h> +#include <mutex> namespace documentapi { class LoadBalancer { public: - class NodeInfo { - public: - NodeInfo() noexcept : valid(false), sent(0), busy(0), weight(1.0) {}; - - bool valid; - uint32_t sent; - uint32_t busy; - double weight; - string lastSpec; - }; - - std::vector<NodeInfo> _nodeInfo; - string _cluster; - string _session; - double _position; - LoadBalancer(const string& cluster, const string& session); ~LoadBalancer(); - const std::vector<NodeInfo>& getNodeInfo() const { return _nodeInfo; } - - uint32_t getIndex(const string& name) const; + string getLastSpec(size_t target) const; + double getWeight(size_t target) const; /** Returns the spec and the node index of the node we should send to. @@ -37,9 +21,28 @@ public: */ std::pair<string, int> getRecipient(const slobrok::api::IMirrorAPI::SpecList& choices); - void normalizeWeights(); - void received(uint32_t nodeIndex, bool busy); +private: + using lock_guard = std::lock_guard<std::mutex>; + std::pair<string, int> getRecipient(const lock_guard & guard, const slobrok::api::IMirrorAPI::SpecList& choices); + void normalizeWeights(const lock_guard & guard); + uint32_t getIndex(const string& name) const; + + class NodeInfo { + public: + NodeInfo() noexcept : weight(1.0), sent(0), busy(0), valid(false), lastSpec() {} + + double weight; + uint32_t sent; + uint32_t busy; + bool valid; + string lastSpec; + }; + mutable std::mutex _mutex; + std::vector<NodeInfo> _nodeInfo; + string _cluster; + string _session; + double _position; }; } diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancerpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancerpolicy.cpp index 473d04ea7d9..160ee55adae 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancerpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/loadbalancerpolicy.cpp @@ -60,7 +60,7 @@ LoadBalancerPolicy::merge(mbus::RoutingContext& context) { bool busy = false; for (uint32_t i = 0; i < reply->getNumErrors(); i++) { if (reply->getError(i).getCode() == mbus::ErrorCode::SESSION_BUSY) { - string lastSpec = _loadBalancer->getNodeInfo()[target].lastSpec; + string lastSpec = _loadBalancer->getLastSpec(target); if (reply->getError(i).getMessage().find(lastSpec) == string::npos) { LOG(debug, "Received busy with message %s, doesn't contain target %s so not updating weight.", reply->getError(i).getMessage().c_str(), lastSpec.c_str()); |