diff options
author | Arne H Juul <arnej@yahooinc.com> | 2021-09-16 14:20:38 +0000 |
---|---|---|
committer | Arne H Juul <arnej@yahooinc.com> | 2021-09-20 10:28:52 +0000 |
commit | 5ed228b3247ca234ad49904515f2db547f7d0f9f (patch) | |
tree | d2eb0156a2709881139fc64172078b62705a115b /slobrok | |
parent | 595ddada60f248862012b7e43095f54482ec6004 (diff) |
stop sending old-sync RPC calls
Diffstat (limited to 'slobrok')
-rw-r--r-- | slobrok/src/vespa/slobrok/server/exchange_manager.cpp | 14 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/remote_slobrok.cpp | 176 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/remote_slobrok.h | 7 |
3 files changed, 4 insertions, 193 deletions
diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp index 94e951ca252..c0340e2432d 100644 --- a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp +++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp @@ -128,22 +128,8 @@ void ExchangeManager::healthCheck() { auto newWorldList = env().consensusMap().currentConsensus(); - if (! _env.useNewLogic()) { - auto oldWorldServices = env().rpcServerMap().allManaged(); - ServiceMappingList oldWorldList; - for (const auto *nsp : oldWorldServices) { - oldWorldList.emplace_back(nsp->getName(), nsp->getSpec()); - } - std::sort(oldWorldList.begin(), oldWorldList.end()); - vespalib::string diff = diffLists(oldWorldList, newWorldList); - if (! diff.empty()) { - LOG(warning, "Diff from old world rpcServerMap to new world consensus map: %s", - diff.c_str()); - } - } for (const auto & [ name, partner ] : _partners) { partner->maybeStartFetch(); - partner->maybePushMine(); auto remoteList = partner->remoteMap().allMappings(); // 0 is expected (when remote is down) if (remoteList.size() != 0) { diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp index d867d955dac..2e7c9683286 100644 --- a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp +++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp @@ -26,11 +26,7 @@ RemoteSlobrok::RemoteSlobrok(const std::string &name, const std::string &spec, _failCnt(0), _consensusSubscription(MapSubscription::subscribe(_serviceMapMirror, _exchanger.env().consensusMap())), _remAddPeerReq(nullptr), - _remListReq(nullptr), - _remAddReq(nullptr), - _remRemReq(nullptr), - _remFetchReq(nullptr), - _pending() + _remFetchReq(nullptr) { _rpcserver.healthCheck(); } @@ -38,8 +34,6 @@ RemoteSlobrok::RemoteSlobrok(const std::string &name, const std::string &spec, void RemoteSlobrok::shutdown() { _reconnecter.disable(); - _pending.clear(); - if (_remote != nullptr) { _remote->SubRef(); _remote = nullptr; @@ -51,15 +45,6 @@ void RemoteSlobrok::shutdown() { if (_remAddPeerReq != nullptr) { _remAddPeerReq->Abort(); } - if (_remListReq != nullptr) { - _remListReq->Abort(); - } - if (_remAddReq != nullptr) { - _remAddReq->Abort(); - } - if (_remRemReq != nullptr) { - _remRemReq->Abort(); - } _serviceMapMirror.clear(); } @@ -68,39 +53,6 @@ RemoteSlobrok::~RemoteSlobrok() { // _rpcserver destructor called automatically } -void -RemoteSlobrok::doPending() -{ - if (_remAddReq != nullptr) return; - if (_remRemReq != nullptr) return; - - if (_remote == nullptr) return; - - if ( ! _pending.empty() ) { - std::unique_ptr<NamedService> todo = std::move(_pending.front()); - _pending.pop_front(); - - const NamedService *rpcsrv = _exchanger.rpcServerMap().lookup(todo->getName()); - - if (rpcsrv == nullptr) { - _remRemReq = getSupervisor()->AllocRPCRequest(); - _remRemReq->SetMethodName("slobrok.internal.doRemove"); - _remRemReq->GetParams()->AddString(_exchanger.env().mySpec().c_str()); - _remRemReq->GetParams()->AddString(todo->getName().c_str()); - _remRemReq->GetParams()->AddString(todo->getSpec().c_str()); - _remote->InvokeAsync(_remRemReq, 2.0, this); - } else { - _remAddReq = getSupervisor()->AllocRPCRequest(); - _remAddReq->SetMethodName("slobrok.internal.doAdd"); - _remAddReq->GetParams()->AddString(_exchanger.env().mySpec().c_str()); - _remAddReq->GetParams()->AddString(todo->getName().c_str()); - _remAddReq->GetParams()->AddString(rpcsrv->getSpec().c_str()); - _remote->InvokeAsync(_remAddReq, 2.0, this); - } - // XXX should save this and pick up on RequestDone() - } -} - void RemoteSlobrok::maybeStartFetch() { if (_remFetchReq != nullptr) return; if (_remote == nullptr) return; @@ -168,21 +120,6 @@ void RemoteSlobrok::handleFetchResult() { } } - - -void -RemoteSlobrok::pushMine() -{ - // all mine - std::vector<const NamedService *> mine = _exchanger.rpcServerMap().allManaged(); - while (mine.size() > 0) { - const NamedService *now = mine.back(); - mine.pop_back(); - _pending.push_back(std::make_unique<NamedService>(now->getName(), now->getSpec())); - } - doPending(); -} - void RemoteSlobrok::RequestDone(FRT_RPCRequest *req) { @@ -190,7 +127,6 @@ RemoteSlobrok::RequestDone(FRT_RPCRequest *req) handleFetchResult(); return; } - FRT_Values &answer = *(req->GetReturn()); if (req == _remAddPeerReq) { // handle response after asking remote slobrok to add me as a peer: if (req->IsError()) { @@ -201,96 +137,15 @@ RemoteSlobrok::RequestDone(FRT_RPCRequest *req) myname, myspec, getName().c_str(), getSpec().c_str(), req->GetErrorMessage()); req->SubRef(); _remAddPeerReq = nullptr; - goto retrylater; + fail(); + return; } req->SubRef(); _remAddPeerReq = nullptr; - // next step is to ask the remote to send its list of managed names: - LOG_ASSERT(_remListReq == nullptr); - _remListReq = getSupervisor()->AllocRPCRequest(); - _remListReq->SetMethodName("slobrok.internal.listManagedRpcServers"); - if (_remote != nullptr) { - _remote->InvokeAsync(_remListReq, 3.0, this); - } - // when _remListReq is returned, our managed list is added - } else if (req == _remListReq) { - // handle the list sent from the remote: - if (req->IsError() - || strcmp(answer.GetTypeString(), "SS") != 0) - { - LOG(error, "error listing remote slobrok %s at %s: %s", - getName().c_str(), getSpec().c_str(), req->GetErrorMessage()); - req->SubRef(); - _remListReq = nullptr; - goto retrylater; - } - uint32_t numNames = answer.GetValue(0)._string_array._len; - uint32_t numSpecs = answer.GetValue(1)._string_array._len; - - if (numNames != numSpecs) { - LOG(error, "inconsistent array lengths from %s at %s", getName().c_str(), getSpec().c_str()); - req->SubRef(); - _remListReq = nullptr; - goto retrylater; - } - FRT_StringValue *names = answer.GetValue(0)._string_array._pt; - FRT_StringValue *specs = answer.GetValue(1)._string_array._pt; - - for (uint32_t idx = 0; idx < numNames; idx++) { - _rpcsrvmanager.addRemote(names[idx]._str, specs[idx]._str); - } - req->SubRef(); - _remListReq = nullptr; - - // next step is to push the ones I own: - maybeStartFetch(); - maybePushMine(); - } else if (req == _remAddReq) { - // handle response after pushing some name that we managed: - if (req->IsError() && (req->GetErrorCode() == FRTE_RPC_CONNECTION || - req->GetErrorCode() == FRTE_RPC_TIMEOUT)) - { - LOG(error, "connection error adding to remote slobrok: %s", req->GetErrorMessage()); - req->SubRef(); - _remAddReq = nullptr; - goto retrylater; - } - if (req->IsError()) { - FRT_Values &args = *req->GetParams(); - const char *rpcsrvname = args[1]._string._str; - const char *rpcsrvspec = args[2]._string._str; - LOG(warning, "error adding [%s -> %s] to remote slobrok: %s", - rpcsrvname, rpcsrvspec, req->GetErrorMessage()); - _rpcsrvmanager.removeLocal(rpcsrvname, rpcsrvspec); - } - req->SubRef(); - _remAddReq = nullptr; - doPending(); - } else if (req == _remRemReq) { - // handle response after pushing some remove we had pending: - if (req->IsError() && (req->GetErrorCode() == FRTE_RPC_CONNECTION || - req->GetErrorCode() == FRTE_RPC_TIMEOUT)) - { - LOG(error, "connection error adding to remote slobrok: %s", req->GetErrorMessage()); - req->SubRef(); - _remRemReq = nullptr; - goto retrylater; - } - if (req->IsError()) { - LOG(warning, "error removing on remote slobrok: %s", req->GetErrorMessage()); - } - req->SubRef(); - _remRemReq = nullptr; - doPending(); } else { LOG(error, "got unknown request back in RequestDone()"); LOG_ASSERT(req == nullptr); } - - return; - retrylater: - fail(); - return; } @@ -321,25 +176,6 @@ RemoteSlobrok::fail() _reconnecter.scheduleTryConnect(); } - -void -RemoteSlobrok::maybePushMine() -{ - if (_remote != nullptr && - _remAddPeerReq == nullptr && - _remListReq == nullptr && - _remAddReq == nullptr && - _remRemReq == nullptr) - { - LOG(debug, "spamming remote at %s with my names", getName().c_str()); - pushMine(); - } else { - LOG(debug, "not pushing mine, as we have: remote %p r.a.p.r=%p r.l.r=%p r.a.r=%p r.r.r=%p", - _remote, _remAddPeerReq, _remListReq, _remAddReq, _remRemReq); - } -} - - void RemoteSlobrok::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) { @@ -357,10 +193,7 @@ RemoteSlobrok::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) _remote = getSupervisor()->GetTarget(getSpec().c_str()); maybeStartFetch(); - // at this point, we will do (in sequence): - // ask peer to connect to us too; - // ask peer for its list of managed rpcservers, adding to our database - // add our managed rpcserver on peer + // at this point, we will ask peer to connect to us too; // any failure will cause disconnect and retry. _remAddPeerReq = getSupervisor()->AllocRPCRequest(); @@ -368,7 +201,6 @@ RemoteSlobrok::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) _remAddPeerReq->GetParams()->AddString(_exchanger.env().mySpec().c_str()); _remAddPeerReq->GetParams()->AddString(_exchanger.env().mySpec().c_str()); _remote->InvokeAsync(_remAddPeerReq, 3.0, this); - // when _remAddPeerReq is returned, our managed list is added via doAdd() } void diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.h b/slobrok/src/vespa/slobrok/server/remote_slobrok.h index b980aa90de0..d9086015f3a 100644 --- a/slobrok/src/vespa/slobrok/server/remote_slobrok.h +++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.h @@ -53,14 +53,8 @@ private: std::unique_ptr<MapSubscription> _consensusSubscription; FRT_RPCRequest *_remAddPeerReq; - FRT_RPCRequest *_remListReq; - FRT_RPCRequest *_remAddReq; - FRT_RPCRequest *_remRemReq; FRT_RPCRequest *_remFetchReq; - std::deque<std::unique_ptr<NamedService>> _pending; - void pushMine(); - void doPending(); void handleFetchResult(); public: @@ -72,7 +66,6 @@ public: void fail(); bool isConnected() const { return (_remote != nullptr); } void tryConnect(); - void maybePushMine(); void maybeStartFetch(); void invokeAsync(FRT_RPCRequest *req, double timeout, FRT_IRequestWait *rwaiter); const std::string & getName() const { return _rpcserver.getName(); } |