aboutsummaryrefslogtreecommitdiffstats
path: root/config
diff options
context:
space:
mode:
Diffstat (limited to 'config')
-rw-r--r--config/src/tests/frt/frt.cpp1
-rw-r--r--config/src/tests/frtconnectionpool/frtconnectionpool.cpp8
-rw-r--r--config/src/vespa/config/frt/connection.h1
-rw-r--r--config/src/vespa/config/frt/frtconnection.cpp46
-rw-r--r--config/src/vespa/config/frt/frtconnection.h35
-rw-r--r--config/src/vespa/config/frt/frtconnectionpool.cpp4
6 files changed, 43 insertions, 52 deletions
diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp
index 5cfa125051a..4e77b289f49 100644
--- a/config/src/tests/frt/frt.cpp
+++ b/config/src/tests/frt/frt.cpp
@@ -125,7 +125,6 @@ struct Response {
waiter->RequestDone(req);
}
const vespalib::string & getAddress() const override { return address; }
- void setTransientDelay(duration delay) override { (void) delay; }
};
ConnectionMock::ConnectionMock(std::unique_ptr<Response> answer)
diff --git a/config/src/tests/frtconnectionpool/frtconnectionpool.cpp b/config/src/tests/frtconnectionpool/frtconnectionpool.cpp
index 7cd385deb8e..9c3498a92a1 100644
--- a/config/src/tests/frtconnectionpool/frtconnectionpool.cpp
+++ b/config/src/tests/frtconnectionpool/frtconnectionpool.cpp
@@ -5,7 +5,6 @@
#include <vespa/fnet/frt/error.h>
#include <vespa/fnet/transport.h>
#include <vespa/fastos/thread.h>
-#include <vespa/vespalib/util/size_literals.h>
#include <sstream>
#include <set>
#include <unistd.h>
@@ -218,9 +217,10 @@ void Test::testSetErrorAllHashBased() {
*/
void Test::testSuspensionTimeout() {
const ServerSpec spec(_sources);
- FRTConnectionPool sourcePool(_transport, spec, timingValues);
- Connection* source = sourcePool.getCurrent();
- source->setTransientDelay(1s);
+ TimingValues short_transient_delay;
+ short_transient_delay.transientDelay = 1s;
+ FRTConnectionPool sourcePool(_transport, spec, short_transient_delay);
+ FRTConnection* source = dynamic_cast<FRTConnection *>(sourcePool.getCurrent());
source->setError(FRTE_RPC_CONNECTION);
for (int i = 0; i < 9; i++) {
EXPECT_NOT_EQUAL(source->getAddress(), sourcePool.getCurrent()->getAddress());
diff --git a/config/src/vespa/config/frt/connection.h b/config/src/vespa/config/frt/connection.h
index ea4863123fc..328e9ede67a 100644
--- a/config/src/vespa/config/frt/connection.h
+++ b/config/src/vespa/config/frt/connection.h
@@ -16,7 +16,6 @@ public:
virtual void setError(int errorCode) = 0;
virtual void invoke(FRT_RPCRequest * req, duration timeout, FRT_IRequestWait * waiter) = 0;
virtual const vespalib::string & getAddress() const = 0;
- virtual void setTransientDelay(duration delay) = 0;
virtual ~Connection() = default;
};
diff --git a/config/src/vespa/config/frt/frtconnection.cpp b/config/src/vespa/config/frt/frtconnection.cpp
index c7e21842851..466239200d4 100644
--- a/config/src/vespa/config/frt/frtconnection.cpp
+++ b/config/src/vespa/config/frt/frtconnection.cpp
@@ -14,19 +14,21 @@ namespace config {
FRTConnection::FRTConnection(const vespalib::string& address, FRT_Supervisor& supervisor, const TimingValues & timingValues)
: _address(address),
+ _transientDelay(timingValues.transientDelay),
+ _fatalDelay(timingValues.fatalDelay),
_supervisor(supervisor),
+ _lock(),
_target(0),
_suspendedUntil(),
_suspendWarned(),
_transientFailures(0),
- _fatalFailures(0),
- _transientDelay(timingValues.transientDelay),
- _fatalDelay(timingValues.fatalDelay)
+ _fatalFailures(0)
{
}
FRTConnection::~FRTConnection()
{
+ std::lock_guard guard(_lock);
if (_target != nullptr) {
LOG(debug, "Shutting down %s", _address.c_str());
_target->SubRef();
@@ -37,6 +39,7 @@ FRTConnection::~FRTConnection()
FRT_Target *
FRTConnection::getTarget()
{
+ std::lock_guard guard(_lock);
if (_target == nullptr) {
_target = _supervisor.GetTarget(_address.c_str());
} else if ( ! _target->IsValid()) {
@@ -78,41 +81,36 @@ FRTConnection::setError(int errorCode)
void FRTConnection::setSuccess()
{
+ std::lock_guard guard(_lock);
_transientFailures = 0;
_fatalFailures = 0;
- _suspendedUntil = system_time();
+ _suspendedUntil = steady_time();
+}
+
+namespace {
+
+constexpr uint32_t MAX_DELAY_MULTIPLIER = 6u;
+constexpr vespalib::duration WARN_INTERVAL = 10s;
+
}
void FRTConnection::calculateSuspension(ErrorType type)
{
duration delay = duration::zero();
+ steady_time now = steady_clock::now();
+ std::lock_guard guard(_lock);
switch(type) {
case TRANSIENT:
- _transientFailures.fetch_add(1);
- delay = _transientFailures.load(std::memory_order_relaxed) * getTransientDelay();
- if (delay > getMaxTransientDelay()) {
- delay = getMaxTransientDelay();
- }
+ delay = std::min(MAX_DELAY_MULTIPLIER, ++_transientFailures) * _transientDelay;
LOG(warning, "Connection to %s failed or timed out", _address.c_str());
break;
case FATAL:
- _fatalFailures.fetch_add(1);
- delay = _fatalFailures.load(std::memory_order_relaxed) * getFatalDelay();
- if (delay > getMaxFatalDelay()) {
- delay = getMaxFatalDelay();
- }
+ delay = std::min(MAX_DELAY_MULTIPLIER, ++_fatalFailures) * _fatalDelay;
break;
}
- system_time now = system_clock::now();
- /*
- * On Darwin, the std::chrono::steady_clock period (std::nano) is
- * not exactly divisible by the std::chrono::system_clock period
- * (std::micro). Thus we need to use std::chrono::duration_cast to
- * convert from steady_time::duration to system_time::duration.
- */
- _suspendedUntil = now + std::chrono::duration_cast<system_time::duration>(delay);
- if (_suspendWarned < (now - 5s)) {
- LOG(warning, "FRT Connection %s suspended until %s", _address.c_str(), vespalib::to_string(_suspendedUntil).c_str());
+ _suspendedUntil = now + delay;
+ if (_suspendWarned < (now - WARN_INTERVAL)) {
+ LOG(warning, "FRT Connection %s suspended until %s", _address.c_str(), vespalib::to_string(to_utc(_suspendedUntil)).c_str());
_suspendWarned = now;
}
}
diff --git a/config/src/vespa/config/frt/frtconnection.h b/config/src/vespa/config/frt/frtconnection.h
index 0e491d61bbf..a85c29863f0 100644
--- a/config/src/vespa/config/frt/frtconnection.h
+++ b/config/src/vespa/config/frt/frtconnection.h
@@ -3,7 +3,6 @@
#include "connection.h"
#include <vespa/config/common/timingvalues.h>
-#include <atomic>
#include <memory>
class FRT_Supervisor;
@@ -12,19 +11,6 @@ class FRT_Target;
namespace config {
class FRTConnection : public Connection {
-private:
- const vespalib::string _address;
- FRT_Supervisor& _supervisor;
- FRT_Target* _target;
- vespalib::system_time _suspendedUntil;
- vespalib::system_time _suspendWarned;
- std::atomic<int> _transientFailures;
- std::atomic<int> _fatalFailures;
- duration _transientDelay;
- duration _fatalDelay;
-
- FRT_Target * getTarget();
-
public:
typedef std::shared_ptr<FRTConnection> SP;
enum ErrorType { TRANSIENT, FATAL };
@@ -37,15 +23,24 @@ public:
FRT_RPCRequest * allocRPCRequest() override;
void invoke(FRT_RPCRequest * req, duration timeout, FRT_IRequestWait * waiter) override;
const vespalib::string & getAddress() const override { return _address; }
- vespalib::system_time getSuspendedUntil() { return _suspendedUntil; }
+ vespalib::steady_time getSuspendedUntil() const { return _suspendedUntil; }
void setError(int errorCode) override;
void setSuccess();
+private:
+ FRT_Target * getTarget();
+
void calculateSuspension(ErrorType type);
- duration getTransientDelay() { return _transientDelay; }
- duration getMaxTransientDelay() { return getTransientDelay() * 6; }
- void setTransientDelay(duration delay) override { _transientDelay = delay; }
- duration getFatalDelay() { return _fatalDelay; }
- duration getMaxFatalDelay() { return getFatalDelay() * 6; }
+
+ const vespalib::string _address;
+ const duration _transientDelay;
+ const duration _fatalDelay;
+ FRT_Supervisor& _supervisor;
+ std::mutex _lock;
+ FRT_Target* _target;
+ vespalib::steady_time _suspendedUntil;
+ vespalib::steady_time _suspendWarned;
+ uint32_t _transientFailures;
+ uint32_t _fatalFailures;
};
} // namespace config
diff --git a/config/src/vespa/config/frt/frtconnectionpool.cpp b/config/src/vespa/config/frt/frtconnectionpool.cpp
index 916784896ab..21d6f0dbe90 100644
--- a/config/src/vespa/config/frt/frtconnectionpool.cpp
+++ b/config/src/vespa/config/frt/frtconnectionpool.cpp
@@ -132,9 +132,9 @@ std::vector<FRTConnection *>
FRTConnectionPool::getReadySources() const
{
std::vector<FRTConnection*> readySources;
+ auto timestamp = vespalib::steady_clock::now();
for (const auto & entry : _connections) {
FRTConnection* source = entry.second.get();
- vespalib::system_time timestamp = vespalib::system_clock::now();
if (source->getSuspendedUntil() < timestamp) {
readySources.push_back(source);
}
@@ -146,9 +146,9 @@ std::vector<FRTConnection *>
FRTConnectionPool::getSuspendedSources() const
{
std::vector<FRTConnection*> suspendedSources;
+ auto timestamp = vespalib::steady_clock::now();
for (const auto & entry : _connections) {
FRTConnection* source = entry.second.get();
- vespalib::system_time timestamp = vespalib::system_clock::now();
if (source->getSuspendedUntil() >= timestamp) {
suspendedSources.push_back(source);
}