aboutsummaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne H Juul <arnej@yahooinc.com>2021-09-16 14:20:38 +0000
committerArne H Juul <arnej@yahooinc.com>2021-09-20 10:28:52 +0000
commit5ed228b3247ca234ad49904515f2db547f7d0f9f (patch)
treed2eb0156a2709881139fc64172078b62705a115b /slobrok
parent595ddada60f248862012b7e43095f54482ec6004 (diff)
stop sending old-sync RPC calls
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/exchange_manager.cpp14
-rw-r--r--slobrok/src/vespa/slobrok/server/remote_slobrok.cpp176
-rw-r--r--slobrok/src/vespa/slobrok/server/remote_slobrok.h7
3 files changed, 4 insertions, 193 deletions
diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
index 94e951ca252..c0340e2432d 100644
--- a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
+++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
@@ -128,22 +128,8 @@ void
ExchangeManager::healthCheck()
{
auto newWorldList = env().consensusMap().currentConsensus();
- if (! _env.useNewLogic()) {
- auto oldWorldServices = env().rpcServerMap().allManaged();
- ServiceMappingList oldWorldList;
- for (const auto *nsp : oldWorldServices) {
- oldWorldList.emplace_back(nsp->getName(), nsp->getSpec());
- }
- std::sort(oldWorldList.begin(), oldWorldList.end());
- vespalib::string diff = diffLists(oldWorldList, newWorldList);
- if (! diff.empty()) {
- LOG(warning, "Diff from old world rpcServerMap to new world consensus map: %s",
- diff.c_str());
- }
- }
for (const auto & [ name, partner ] : _partners) {
partner->maybeStartFetch();
- partner->maybePushMine();
auto remoteList = partner->remoteMap().allMappings();
// 0 is expected (when remote is down)
if (remoteList.size() != 0) {
diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
index d867d955dac..2e7c9683286 100644
--- a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
+++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
@@ -26,11 +26,7 @@ RemoteSlobrok::RemoteSlobrok(const std::string &name, const std::string &spec,
_failCnt(0),
_consensusSubscription(MapSubscription::subscribe(_serviceMapMirror, _exchanger.env().consensusMap())),
_remAddPeerReq(nullptr),
- _remListReq(nullptr),
- _remAddReq(nullptr),
- _remRemReq(nullptr),
- _remFetchReq(nullptr),
- _pending()
+ _remFetchReq(nullptr)
{
_rpcserver.healthCheck();
}
@@ -38,8 +34,6 @@ RemoteSlobrok::RemoteSlobrok(const std::string &name, const std::string &spec,
void RemoteSlobrok::shutdown() {
_reconnecter.disable();
- _pending.clear();
-
if (_remote != nullptr) {
_remote->SubRef();
_remote = nullptr;
@@ -51,15 +45,6 @@ void RemoteSlobrok::shutdown() {
if (_remAddPeerReq != nullptr) {
_remAddPeerReq->Abort();
}
- if (_remListReq != nullptr) {
- _remListReq->Abort();
- }
- if (_remAddReq != nullptr) {
- _remAddReq->Abort();
- }
- if (_remRemReq != nullptr) {
- _remRemReq->Abort();
- }
_serviceMapMirror.clear();
}
@@ -68,39 +53,6 @@ RemoteSlobrok::~RemoteSlobrok() {
// _rpcserver destructor called automatically
}
-void
-RemoteSlobrok::doPending()
-{
- if (_remAddReq != nullptr) return;
- if (_remRemReq != nullptr) return;
-
- if (_remote == nullptr) return;
-
- if ( ! _pending.empty() ) {
- std::unique_ptr<NamedService> todo = std::move(_pending.front());
- _pending.pop_front();
-
- const NamedService *rpcsrv = _exchanger.rpcServerMap().lookup(todo->getName());
-
- if (rpcsrv == nullptr) {
- _remRemReq = getSupervisor()->AllocRPCRequest();
- _remRemReq->SetMethodName("slobrok.internal.doRemove");
- _remRemReq->GetParams()->AddString(_exchanger.env().mySpec().c_str());
- _remRemReq->GetParams()->AddString(todo->getName().c_str());
- _remRemReq->GetParams()->AddString(todo->getSpec().c_str());
- _remote->InvokeAsync(_remRemReq, 2.0, this);
- } else {
- _remAddReq = getSupervisor()->AllocRPCRequest();
- _remAddReq->SetMethodName("slobrok.internal.doAdd");
- _remAddReq->GetParams()->AddString(_exchanger.env().mySpec().c_str());
- _remAddReq->GetParams()->AddString(todo->getName().c_str());
- _remAddReq->GetParams()->AddString(rpcsrv->getSpec().c_str());
- _remote->InvokeAsync(_remAddReq, 2.0, this);
- }
- // XXX should save this and pick up on RequestDone()
- }
-}
-
void RemoteSlobrok::maybeStartFetch() {
if (_remFetchReq != nullptr) return;
if (_remote == nullptr) return;
@@ -168,21 +120,6 @@ void RemoteSlobrok::handleFetchResult() {
}
}
-
-
-void
-RemoteSlobrok::pushMine()
-{
- // all mine
- std::vector<const NamedService *> mine = _exchanger.rpcServerMap().allManaged();
- while (mine.size() > 0) {
- const NamedService *now = mine.back();
- mine.pop_back();
- _pending.push_back(std::make_unique<NamedService>(now->getName(), now->getSpec()));
- }
- doPending();
-}
-
void
RemoteSlobrok::RequestDone(FRT_RPCRequest *req)
{
@@ -190,7 +127,6 @@ RemoteSlobrok::RequestDone(FRT_RPCRequest *req)
handleFetchResult();
return;
}
- FRT_Values &answer = *(req->GetReturn());
if (req == _remAddPeerReq) {
// handle response after asking remote slobrok to add me as a peer:
if (req->IsError()) {
@@ -201,96 +137,15 @@ RemoteSlobrok::RequestDone(FRT_RPCRequest *req)
myname, myspec, getName().c_str(), getSpec().c_str(), req->GetErrorMessage());
req->SubRef();
_remAddPeerReq = nullptr;
- goto retrylater;
+ fail();
+ return;
}
req->SubRef();
_remAddPeerReq = nullptr;
- // next step is to ask the remote to send its list of managed names:
- LOG_ASSERT(_remListReq == nullptr);
- _remListReq = getSupervisor()->AllocRPCRequest();
- _remListReq->SetMethodName("slobrok.internal.listManagedRpcServers");
- if (_remote != nullptr) {
- _remote->InvokeAsync(_remListReq, 3.0, this);
- }
- // when _remListReq is returned, our managed list is added
- } else if (req == _remListReq) {
- // handle the list sent from the remote:
- if (req->IsError()
- || strcmp(answer.GetTypeString(), "SS") != 0)
- {
- LOG(error, "error listing remote slobrok %s at %s: %s",
- getName().c_str(), getSpec().c_str(), req->GetErrorMessage());
- req->SubRef();
- _remListReq = nullptr;
- goto retrylater;
- }
- uint32_t numNames = answer.GetValue(0)._string_array._len;
- uint32_t numSpecs = answer.GetValue(1)._string_array._len;
-
- if (numNames != numSpecs) {
- LOG(error, "inconsistent array lengths from %s at %s", getName().c_str(), getSpec().c_str());
- req->SubRef();
- _remListReq = nullptr;
- goto retrylater;
- }
- FRT_StringValue *names = answer.GetValue(0)._string_array._pt;
- FRT_StringValue *specs = answer.GetValue(1)._string_array._pt;
-
- for (uint32_t idx = 0; idx < numNames; idx++) {
- _rpcsrvmanager.addRemote(names[idx]._str, specs[idx]._str);
- }
- req->SubRef();
- _remListReq = nullptr;
-
- // next step is to push the ones I own:
- maybeStartFetch();
- maybePushMine();
- } else if (req == _remAddReq) {
- // handle response after pushing some name that we managed:
- if (req->IsError() && (req->GetErrorCode() == FRTE_RPC_CONNECTION ||
- req->GetErrorCode() == FRTE_RPC_TIMEOUT))
- {
- LOG(error, "connection error adding to remote slobrok: %s", req->GetErrorMessage());
- req->SubRef();
- _remAddReq = nullptr;
- goto retrylater;
- }
- if (req->IsError()) {
- FRT_Values &args = *req->GetParams();
- const char *rpcsrvname = args[1]._string._str;
- const char *rpcsrvspec = args[2]._string._str;
- LOG(warning, "error adding [%s -> %s] to remote slobrok: %s",
- rpcsrvname, rpcsrvspec, req->GetErrorMessage());
- _rpcsrvmanager.removeLocal(rpcsrvname, rpcsrvspec);
- }
- req->SubRef();
- _remAddReq = nullptr;
- doPending();
- } else if (req == _remRemReq) {
- // handle response after pushing some remove we had pending:
- if (req->IsError() && (req->GetErrorCode() == FRTE_RPC_CONNECTION ||
- req->GetErrorCode() == FRTE_RPC_TIMEOUT))
- {
- LOG(error, "connection error adding to remote slobrok: %s", req->GetErrorMessage());
- req->SubRef();
- _remRemReq = nullptr;
- goto retrylater;
- }
- if (req->IsError()) {
- LOG(warning, "error removing on remote slobrok: %s", req->GetErrorMessage());
- }
- req->SubRef();
- _remRemReq = nullptr;
- doPending();
} else {
LOG(error, "got unknown request back in RequestDone()");
LOG_ASSERT(req == nullptr);
}
-
- return;
- retrylater:
- fail();
- return;
}
@@ -321,25 +176,6 @@ RemoteSlobrok::fail()
_reconnecter.scheduleTryConnect();
}
-
-void
-RemoteSlobrok::maybePushMine()
-{
- if (_remote != nullptr &&
- _remAddPeerReq == nullptr &&
- _remListReq == nullptr &&
- _remAddReq == nullptr &&
- _remRemReq == nullptr)
- {
- LOG(debug, "spamming remote at %s with my names", getName().c_str());
- pushMine();
- } else {
- LOG(debug, "not pushing mine, as we have: remote %p r.a.p.r=%p r.l.r=%p r.a.r=%p r.r.r=%p",
- _remote, _remAddPeerReq, _remListReq, _remAddReq, _remRemReq);
- }
-}
-
-
void
RemoteSlobrok::notifyOkRpcSrv(ManagedRpcServer *rpcsrv)
{
@@ -357,10 +193,7 @@ RemoteSlobrok::notifyOkRpcSrv(ManagedRpcServer *rpcsrv)
_remote = getSupervisor()->GetTarget(getSpec().c_str());
maybeStartFetch();
- // at this point, we will do (in sequence):
- // ask peer to connect to us too;
- // ask peer for its list of managed rpcservers, adding to our database
- // add our managed rpcserver on peer
+ // at this point, we will ask peer to connect to us too;
// any failure will cause disconnect and retry.
_remAddPeerReq = getSupervisor()->AllocRPCRequest();
@@ -368,7 +201,6 @@ RemoteSlobrok::notifyOkRpcSrv(ManagedRpcServer *rpcsrv)
_remAddPeerReq->GetParams()->AddString(_exchanger.env().mySpec().c_str());
_remAddPeerReq->GetParams()->AddString(_exchanger.env().mySpec().c_str());
_remote->InvokeAsync(_remAddPeerReq, 3.0, this);
- // when _remAddPeerReq is returned, our managed list is added via doAdd()
}
void
diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.h b/slobrok/src/vespa/slobrok/server/remote_slobrok.h
index b980aa90de0..d9086015f3a 100644
--- a/slobrok/src/vespa/slobrok/server/remote_slobrok.h
+++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.h
@@ -53,14 +53,8 @@ private:
std::unique_ptr<MapSubscription> _consensusSubscription;
FRT_RPCRequest *_remAddPeerReq;
- FRT_RPCRequest *_remListReq;
- FRT_RPCRequest *_remAddReq;
- FRT_RPCRequest *_remRemReq;
FRT_RPCRequest *_remFetchReq;
- std::deque<std::unique_ptr<NamedService>> _pending;
- void pushMine();
- void doPending();
void handleFetchResult();
public:
@@ -72,7 +66,6 @@ public:
void fail();
bool isConnected() const { return (_remote != nullptr); }
void tryConnect();
- void maybePushMine();
void maybeStartFetch();
void invokeAsync(FRT_RPCRequest *req, double timeout, FRT_IRequestWait *rwaiter);
const std::string & getName() const { return _rpcserver.getName(); }