aboutsummaryrefslogtreecommitdiffstats
path: root/config/src/vespa/config/frt/frtconnection.cpp
blob: d9b48bc8bbe3b5139447c68640d29c72d196c98f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "frtconnection.h"
#include <vespa/config/common/errorcode.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/fnet/frt/rpcrequest.h>

#include <vespa/log/log.h>
LOG_SETUP(".config.frt.frtconnection");

using namespace vespalib;

namespace config {

FRTConnection::FRTConnection(const vespalib::string& address, FRT_Supervisor& supervisor, const TimingValues & timingValues)
    : _address(address),
      _transientDelay(timingValues.transientDelay),
      _fatalDelay(timingValues.fatalDelay),
      _supervisor(supervisor),
      _lock(),
      _target(0),
      _suspendedUntil(),
      _suspendWarned(),
      _transientFailures(0),
      _fatalFailures(0)
{
}

FRTConnection::~FRTConnection()
{
    if (_target != nullptr) {
        LOG(debug, "Shutting down %s", _address.c_str());
        _target->internal_subref();
        _target = nullptr;
    }
}

FRT_Target *
FRTConnection::getTarget()
{
    std::lock_guard guard(_lock);
    if (_target == nullptr) {
        _target = _supervisor.GetTarget(_address.c_str());
    } else if ( ! _target->IsValid()) {
        _target->internal_subref();
        _target = _supervisor.GetTarget(_address.c_str());
    }
    _target->internal_addref();
    return _target;
}

void
FRTConnection::invoke(FRT_RPCRequest * req, duration timeout, FRT_IRequestWait * waiter)
{
    FRT_Target * target = getTarget();
    target->InvokeAsync(req, vespalib::to_s(timeout), waiter);
    target->internal_subref();
}

void
FRTConnection::setError(int errorCode)
{
    switch(errorCode) {
    case FRTE_RPC_CONNECTION:
    case FRTE_RPC_TIMEOUT:
        calculateSuspension(TRANSIENT); break;
    case ErrorCode::UNKNOWN_CONFIG:
    case ErrorCode::UNKNOWN_DEFINITION:
    case ErrorCode::UNKNOWN_VERSION:
    case ErrorCode::UNKNOWN_CONFIGID:
    case ErrorCode::UNKNOWN_DEF_MD5:
    case ErrorCode::ILLEGAL_NAME:
    case ErrorCode::ILLEGAL_VERSION:
    case ErrorCode::ILLEGAL_CONFIGID:
    case ErrorCode::ILLEGAL_DEF_MD5:
    case ErrorCode::ILLEGAL_CONFIG_MD5:
    case ErrorCode::ILLEGAL_TIMEOUT:
    case ErrorCode::OUTDATED_CONFIG:
    case ErrorCode::INTERNAL_ERROR:
        calculateSuspension(FATAL); break;
    }
}

void FRTConnection::setSuccess()
{
    std::lock_guard guard(_lock);
    _transientFailures = 0;
    _fatalFailures = 0;
    _suspendedUntil = steady_time();
}

namespace {

constexpr uint32_t MAX_DELAY_MULTIPLIER = 6u;
constexpr vespalib::duration WARN_INTERVAL = 10s;

}

void FRTConnection::calculateSuspension(ErrorType type)
{
    duration delay = duration::zero();
    steady_time now = steady_clock::now();
    std::lock_guard guard(_lock);
    switch(type) {
    case TRANSIENT:
        delay = std::min(MAX_DELAY_MULTIPLIER, ++_transientFailures) * _transientDelay;
        LOG(debug, "Connection to %s failed or timed out", _address.c_str());
        break;
    case FATAL:
        delay = std::min(MAX_DELAY_MULTIPLIER, ++_fatalFailures) * _fatalDelay;
        break;
    }
    _suspendedUntil = now + delay;
    if (_suspendWarned < (now - WARN_INTERVAL)) {
        LOG(debug, "FRT Connection %s suspended until %s", _address.c_str(), vespalib::to_string(to_utc(_suspendedUntil)).c_str());
        _suspendWarned = now;
    }
}

FRT_RPCRequest *
FRTConnection::allocRPCRequest() {
    return _supervisor.AllocRPCRequest();
}

}