summaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne Juul <arnej@verizonmedia.com>2021-08-13 10:54:06 +0000
committerArne Juul <arnej@verizonmedia.com>2021-08-13 12:10:57 +0000
commit4fe5d5167df46dc8bc30e87deadb951d118145c0 (patch)
treeb1d53b9421f88ebc390c7d750b46f9f9f829da78 /slobrok
parent0885348f7b3c4dd728f01370b9fd5196658f0bd5 (diff)
let RemoteSlobrok do its own initial health check
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/remote_slobrok.cpp112
-rw-r--r--slobrok/src/vespa/slobrok/server/remote_slobrok.h18
2 files changed, 70 insertions, 60 deletions
diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
index 78ca25d4723..c03a25cec1a 100644
--- a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
+++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
@@ -15,12 +15,13 @@ namespace slobrok {
//-----------------------------------------------------------------------------
-RemoteSlobrok::RemoteSlobrok(const std::string &name, const std::string &spec,
+RemoteSlobrok::RemoteSlobrok(const vespalib::string &name, const vespalib::string &spec,
ExchangeManager &manager)
- : _exchanger(manager),
+ : _name(name),
+ _spec(spec),
+ _exchanger(manager),
_rpcsrvmanager(manager._rpcsrvmanager),
_remote(nullptr),
- _rpcserver(name, spec, *this),
_reconnecter(getSupervisor()->GetScheduler(), *this),
_failCnt(0),
_remAddPeerReq(nullptr),
@@ -30,7 +31,7 @@ RemoteSlobrok::RemoteSlobrok(const std::string &name, const std::string &spec,
_remFetchReq(nullptr),
_pending()
{
- _rpcserver.healthCheck();
+ tryConnect();
}
void RemoteSlobrok::shutdown() {
@@ -43,6 +44,9 @@ void RemoteSlobrok::shutdown() {
_remote = nullptr;
}
+ if (_checkServerReq != nullptr) {
+ _checkServerReq->Abort();
+ }
if (_remFetchReq != nullptr) {
_remFetchReq->Abort();
}
@@ -63,7 +67,6 @@ void RemoteSlobrok::shutdown() {
RemoteSlobrok::~RemoteSlobrok() {
shutdown();
- // _rpcserver destructor called automatically
}
void
@@ -183,6 +186,10 @@ RemoteSlobrok::pushMine()
void
RemoteSlobrok::RequestDone(FRT_RPCRequest *req)
{
+ if (req == _checkServerReq) {
+ handleCheckServerResult();
+ return;
+ }
if (req == _remFetchReq) {
handleFetchResult();
return;
@@ -292,21 +299,6 @@ RemoteSlobrok::RequestDone(FRT_RPCRequest *req)
void
-RemoteSlobrok::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg)
-{
- if (++_failCnt > 10) {
- LOG(warning, "remote location broker at %s failed: %s",
- rpcsrv->getSpec().c_str(), errmsg.c_str());
- } else {
- LOG(debug, "remote location broker at %s failed: %s",
- rpcsrv->getSpec().c_str(), errmsg.c_str());
- }
- LOG_ASSERT(rpcsrv == &_rpcserver);
- fail();
-}
-
-
-void
RemoteSlobrok::fail()
{
// disconnect
@@ -336,42 +328,60 @@ RemoteSlobrok::maybePushMine()
}
}
-
void
-RemoteSlobrok::notifyOkRpcSrv(ManagedRpcServer *rpcsrv)
+RemoteSlobrok::tryConnect()
{
- LOG_ASSERT(rpcsrv == &_rpcserver);
- (void) rpcsrv;
-
- // connection was OK, so disable any pending reconnect
- _reconnecter.disable();
-
- if (_remote != nullptr) {
- maybeStartFetch();
- // the rest here should only be done on first notifyOk
- return;
+ if (_remote == nullptr) {
+ _remote = getSupervisor()->GetTarget(getSpec().c_str());
+ LOG_ASSERT(_checkServerReq == nullptr);
+ _checkServerReq = getSupervisor()->AllocRPCRequest();
+ _checkServerReq->SetMethodName("slobrok.callback.listNamesServed");
+ _remote->InvokeAsync(_checkServerReq, 25.0, this);
}
- _remote = getSupervisor()->GetTarget(getSpec().c_str());
- maybeStartFetch();
-
- // at this point, we will do (in sequence):
- // ask peer to connect to us too;
- // ask peer for its list of managed rpcservers, adding to our database
- // add our managed rpcserver on peer
- // any failure will cause disconnect and retry.
-
- _remAddPeerReq = getSupervisor()->AllocRPCRequest();
- _remAddPeerReq->SetMethodName("slobrok.admin.addPeer");
- _remAddPeerReq->GetParams()->AddString(_exchanger._env.mySpec().c_str());
- _remAddPeerReq->GetParams()->AddString(_exchanger._env.mySpec().c_str());
- _remote->InvokeAsync(_remAddPeerReq, 3.0, this);
- // when _remAddPeerReq is returned, our managed list is added via doAdd()
}
-void
-RemoteSlobrok::tryConnect()
-{
- _rpcserver.healthCheck();
+void RemoteSlobrok::handleCheckServerResult() {
+ LOG_ASSERT(_checkServerReq != nullptr);
+ auto & req = *_checkServerReq;
+ _checkServerReq = nullptr;
+ if (req.GetErrorCode() == FRTE_RPC_ABORT) {
+ LOG(debug, "slobrok[%s].check aborted", _name.c_str());
+ req.SubRef();
+ return;
+ }
+ bool isOk = false;
+ if (req.CheckReturnTypes("S")) {
+ const FRT_Values &answer = *req.GetReturn();
+ const FRT_StringValue *strings = answer[0]._string_array._pt;
+ for (uint32_t i = 0; i < answer[0]._string_array._len; ++i) {
+ if (strcmp(strings[i]._str, _name.c_str()) == 0) {
+ isOk = true;
+ }
+ }
+ }
+ req.SubRef();
+ if (isOk) {
+ LOG_ASSERT(_remote != nullptr);
+ // connection was OK, so disable any pending reconnect
+ _reconnecter.disable();
+ maybeStartFetch();
+ // at this point, we will do (in sequence):
+ // ask peer to connect to us too;
+ // ask peer for its list of managed rpcservers, adding to our database
+ // add our managed rpcserver on peer
+ // any failure will cause disconnect and retry.
+ _remAddPeerReq = getSupervisor()->AllocRPCRequest();
+ _remAddPeerReq->SetMethodName("slobrok.admin.addPeer");
+ _remAddPeerReq->GetParams()->AddString(_exchanger._env.mySpec().c_str());
+ _remAddPeerReq->GetParams()->AddString(_exchanger._env.mySpec().c_str());
+ _remote->InvokeAsync(_remAddPeerReq, 3.0, this);
+ // when _remAddPeerReq is returned, our managed list is added via doAdd()
+ } else {
+ if (++_failCnt > 10) {
+ LOG(warning, "remote location broker at %s failed", _spec.c_str());
+ }
+ fail();
+ }
}
FRT_Supervisor *
diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.h b/slobrok/src/vespa/slobrok/server/remote_slobrok.h
index e463ac6be21..401a2b4ab39 100644
--- a/slobrok/src/vespa/slobrok/server/remote_slobrok.h
+++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.h
@@ -23,8 +23,7 @@ class ExchangeManager;
*
* Handles one single partner slobrok
**/
-class RemoteSlobrok: public IRpcServerManager,
- public FRT_IRequestWait
+class RemoteSlobrok: public FRT_IRequestWait
{
private:
class Reconnecter : public FNET_Task
@@ -42,11 +41,12 @@ private:
void PerformTask() override;
};
+ vespalib::string _name;
+ vespalib::string _spec;
ExchangeManager &_exchanger;
RpcServerManager &_rpcsrvmanager;
FRT_Target *_remote;
ServiceMapMirror _serviceMapMirror;
- ManagedRpcServer _rpcserver;
Reconnecter _reconnecter;
int _failCnt;
@@ -55,16 +55,18 @@ private:
FRT_RPCRequest *_remAddReq;
FRT_RPCRequest *_remRemReq;
FRT_RPCRequest *_remFetchReq;
+ FRT_RPCRequest *_checkServerReq;
std::deque<std::unique_ptr<NamedService>> _pending;
void pushMine();
void doPending();
+ void handleCheckServerResult();
void handleFetchResult();
public:
RemoteSlobrok(const RemoteSlobrok&) = delete;
RemoteSlobrok& operator= (const RemoteSlobrok&) = delete;
- RemoteSlobrok(const std::string &name, const std::string &spec, ExchangeManager &manager);
+ RemoteSlobrok(const vespalib::string &name, const vespalib::string &spec, ExchangeManager &manager);
~RemoteSlobrok() override;
void fail();
@@ -73,16 +75,14 @@ public:
void maybePushMine();
void maybeStartFetch();
void invokeAsync(FRT_RPCRequest *req, double timeout, FRT_IRequestWait *rwaiter);
- const std::string & getName() const { return _rpcserver.getName(); }
- const std::string & getSpec() const { return _rpcserver.getSpec(); }
+ const vespalib::string & getName() const { return _name; }
+ const vespalib::string & getSpec() const { return _spec; }
ServiceMapMirror &remoteMap() { return _serviceMapMirror; }
void shutdown();
+ FRT_Supervisor *getSupervisor();
// interfaces implemented:
- void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override;
- void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override;
void RequestDone(FRT_RPCRequest *req) override;
- FRT_Supervisor *getSupervisor() override;
};
//-----------------------------------------------------------------------------