diff options
Diffstat (limited to 'config')
-rw-r--r-- | config/src/tests/frt/frt.cpp | 1 | ||||
-rw-r--r-- | config/src/tests/frtconnectionpool/frtconnectionpool.cpp | 8 | ||||
-rw-r--r-- | config/src/vespa/config/frt/connection.h | 1 | ||||
-rw-r--r-- | config/src/vespa/config/frt/frtconnection.cpp | 46 | ||||
-rw-r--r-- | config/src/vespa/config/frt/frtconnection.h | 35 | ||||
-rw-r--r-- | config/src/vespa/config/frt/frtconnectionpool.cpp | 4 |
6 files changed, 43 insertions, 52 deletions
diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp index 5cfa125051a..4e77b289f49 100644 --- a/config/src/tests/frt/frt.cpp +++ b/config/src/tests/frt/frt.cpp @@ -125,7 +125,6 @@ struct Response { waiter->RequestDone(req); } const vespalib::string & getAddress() const override { return address; } - void setTransientDelay(duration delay) override { (void) delay; } }; ConnectionMock::ConnectionMock(std::unique_ptr<Response> answer) diff --git a/config/src/tests/frtconnectionpool/frtconnectionpool.cpp b/config/src/tests/frtconnectionpool/frtconnectionpool.cpp index 7cd385deb8e..9c3498a92a1 100644 --- a/config/src/tests/frtconnectionpool/frtconnectionpool.cpp +++ b/config/src/tests/frtconnectionpool/frtconnectionpool.cpp @@ -5,7 +5,6 @@ #include <vespa/fnet/frt/error.h> #include <vespa/fnet/transport.h> #include <vespa/fastos/thread.h> -#include <vespa/vespalib/util/size_literals.h> #include <sstream> #include <set> #include <unistd.h> @@ -218,9 +217,10 @@ void Test::testSetErrorAllHashBased() { */ void Test::testSuspensionTimeout() { const ServerSpec spec(_sources); - FRTConnectionPool sourcePool(_transport, spec, timingValues); - Connection* source = sourcePool.getCurrent(); - source->setTransientDelay(1s); + TimingValues short_transient_delay; + short_transient_delay.transientDelay = 1s; + FRTConnectionPool sourcePool(_transport, spec, short_transient_delay); + FRTConnection* source = dynamic_cast<FRTConnection *>(sourcePool.getCurrent()); source->setError(FRTE_RPC_CONNECTION); for (int i = 0; i < 9; i++) { EXPECT_NOT_EQUAL(source->getAddress(), sourcePool.getCurrent()->getAddress()); diff --git a/config/src/vespa/config/frt/connection.h b/config/src/vespa/config/frt/connection.h index ea4863123fc..328e9ede67a 100644 --- a/config/src/vespa/config/frt/connection.h +++ b/config/src/vespa/config/frt/connection.h @@ -16,7 +16,6 @@ public: virtual void setError(int errorCode) = 0; virtual void invoke(FRT_RPCRequest * req, duration timeout, FRT_IRequestWait * waiter) = 0; virtual const vespalib::string & getAddress() const = 0; - virtual void setTransientDelay(duration delay) = 0; virtual ~Connection() = default; }; diff --git a/config/src/vespa/config/frt/frtconnection.cpp b/config/src/vespa/config/frt/frtconnection.cpp index c7e21842851..466239200d4 100644 --- a/config/src/vespa/config/frt/frtconnection.cpp +++ b/config/src/vespa/config/frt/frtconnection.cpp @@ -14,19 +14,21 @@ namespace config { FRTConnection::FRTConnection(const vespalib::string& address, FRT_Supervisor& supervisor, const TimingValues & timingValues) : _address(address), + _transientDelay(timingValues.transientDelay), + _fatalDelay(timingValues.fatalDelay), _supervisor(supervisor), + _lock(), _target(0), _suspendedUntil(), _suspendWarned(), _transientFailures(0), - _fatalFailures(0), - _transientDelay(timingValues.transientDelay), - _fatalDelay(timingValues.fatalDelay) + _fatalFailures(0) { } FRTConnection::~FRTConnection() { + std::lock_guard guard(_lock); if (_target != nullptr) { LOG(debug, "Shutting down %s", _address.c_str()); _target->SubRef(); @@ -37,6 +39,7 @@ FRTConnection::~FRTConnection() FRT_Target * FRTConnection::getTarget() { + std::lock_guard guard(_lock); if (_target == nullptr) { _target = _supervisor.GetTarget(_address.c_str()); } else if ( ! _target->IsValid()) { @@ -78,41 +81,36 @@ FRTConnection::setError(int errorCode) void FRTConnection::setSuccess() { + std::lock_guard guard(_lock); _transientFailures = 0; _fatalFailures = 0; - _suspendedUntil = system_time(); + _suspendedUntil = steady_time(); +} + +namespace { + +constexpr uint32_t MAX_DELAY_MULTIPLIER = 6u; +constexpr vespalib::duration WARN_INTERVAL = 10s; + } void FRTConnection::calculateSuspension(ErrorType type) { duration delay = duration::zero(); + steady_time now = steady_clock::now(); + std::lock_guard guard(_lock); switch(type) { case TRANSIENT: - _transientFailures.fetch_add(1); - delay = _transientFailures.load(std::memory_order_relaxed) * getTransientDelay(); - if (delay > getMaxTransientDelay()) { - delay = getMaxTransientDelay(); - } + delay = std::min(MAX_DELAY_MULTIPLIER, ++_transientFailures) * _transientDelay; LOG(warning, "Connection to %s failed or timed out", _address.c_str()); break; case FATAL: - _fatalFailures.fetch_add(1); - delay = _fatalFailures.load(std::memory_order_relaxed) * getFatalDelay(); - if (delay > getMaxFatalDelay()) { - delay = getMaxFatalDelay(); - } + delay = std::min(MAX_DELAY_MULTIPLIER, ++_fatalFailures) * _fatalDelay; break; } - system_time now = system_clock::now(); - /* - * On Darwin, the std::chrono::steady_clock period (std::nano) is - * not exactly divisible by the std::chrono::system_clock period - * (std::micro). Thus we need to use std::chrono::duration_cast to - * convert from steady_time::duration to system_time::duration. - */ - _suspendedUntil = now + std::chrono::duration_cast<system_time::duration>(delay); - if (_suspendWarned < (now - 5s)) { - LOG(warning, "FRT Connection %s suspended until %s", _address.c_str(), vespalib::to_string(_suspendedUntil).c_str()); + _suspendedUntil = now + delay; + if (_suspendWarned < (now - WARN_INTERVAL)) { + LOG(warning, "FRT Connection %s suspended until %s", _address.c_str(), vespalib::to_string(to_utc(_suspendedUntil)).c_str()); _suspendWarned = now; } } diff --git a/config/src/vespa/config/frt/frtconnection.h b/config/src/vespa/config/frt/frtconnection.h index 0e491d61bbf..a85c29863f0 100644 --- a/config/src/vespa/config/frt/frtconnection.h +++ b/config/src/vespa/config/frt/frtconnection.h @@ -3,7 +3,6 @@ #include "connection.h" #include <vespa/config/common/timingvalues.h> -#include <atomic> #include <memory> class FRT_Supervisor; @@ -12,19 +11,6 @@ class FRT_Target; namespace config { class FRTConnection : public Connection { -private: - const vespalib::string _address; - FRT_Supervisor& _supervisor; - FRT_Target* _target; - vespalib::system_time _suspendedUntil; - vespalib::system_time _suspendWarned; - std::atomic<int> _transientFailures; - std::atomic<int> _fatalFailures; - duration _transientDelay; - duration _fatalDelay; - - FRT_Target * getTarget(); - public: typedef std::shared_ptr<FRTConnection> SP; enum ErrorType { TRANSIENT, FATAL }; @@ -37,15 +23,24 @@ public: FRT_RPCRequest * allocRPCRequest() override; void invoke(FRT_RPCRequest * req, duration timeout, FRT_IRequestWait * waiter) override; const vespalib::string & getAddress() const override { return _address; } - vespalib::system_time getSuspendedUntil() { return _suspendedUntil; } + vespalib::steady_time getSuspendedUntil() const { return _suspendedUntil; } void setError(int errorCode) override; void setSuccess(); +private: + FRT_Target * getTarget(); + void calculateSuspension(ErrorType type); - duration getTransientDelay() { return _transientDelay; } - duration getMaxTransientDelay() { return getTransientDelay() * 6; } - void setTransientDelay(duration delay) override { _transientDelay = delay; } - duration getFatalDelay() { return _fatalDelay; } - duration getMaxFatalDelay() { return getFatalDelay() * 6; } + + const vespalib::string _address; + const duration _transientDelay; + const duration _fatalDelay; + FRT_Supervisor& _supervisor; + std::mutex _lock; + FRT_Target* _target; + vespalib::steady_time _suspendedUntil; + vespalib::steady_time _suspendWarned; + uint32_t _transientFailures; + uint32_t _fatalFailures; }; } // namespace config diff --git a/config/src/vespa/config/frt/frtconnectionpool.cpp b/config/src/vespa/config/frt/frtconnectionpool.cpp index 916784896ab..21d6f0dbe90 100644 --- a/config/src/vespa/config/frt/frtconnectionpool.cpp +++ b/config/src/vespa/config/frt/frtconnectionpool.cpp @@ -132,9 +132,9 @@ std::vector<FRTConnection *> FRTConnectionPool::getReadySources() const { std::vector<FRTConnection*> readySources; + auto timestamp = vespalib::steady_clock::now(); for (const auto & entry : _connections) { FRTConnection* source = entry.second.get(); - vespalib::system_time timestamp = vespalib::system_clock::now(); if (source->getSuspendedUntil() < timestamp) { readySources.push_back(source); } @@ -146,9 +146,9 @@ std::vector<FRTConnection *> FRTConnectionPool::getSuspendedSources() const { std::vector<FRTConnection*> suspendedSources; + auto timestamp = vespalib::steady_clock::now(); for (const auto & entry : _connections) { FRTConnection* source = entry.second.get(); - vespalib::system_time timestamp = vespalib::system_clock::now(); if (source->getSuspendedUntil() >= timestamp) { suspendedSources.push_back(source); } |