diff options
author | Arne H Juul <arnej@yahooinc.com> | 2021-09-10 07:53:42 +0000 |
---|---|---|
committer | Arne H Juul <arnej@yahooinc.com> | 2021-09-10 10:01:17 +0000 |
commit | f514ebda393a385787091a8d26e4d8e159424735 (patch) | |
tree | 7084bab39bbb6bc5fc4dde27a97661ac1609d047 /slobrok | |
parent | 23dd3ea6e797560374407a1d03e3ab41c13f0191 (diff) |
rewrite to use new logic
Diffstat (limited to 'slobrok')
-rw-r--r-- | slobrok/src/vespa/slobrok/server/rpchooks.cpp | 116 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/rpchooks.h | 6 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.cpp | 10 |
3 files changed, 131 insertions, 1 deletions
diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.cpp b/slobrok/src/vespa/slobrok/server/rpchooks.cpp index d40fd93afd6..d5797902833 100644 --- a/slobrok/src/vespa/slobrok/server/rpchooks.cpp +++ b/slobrok/src/vespa/slobrok/server/rpchooks.cpp @@ -238,6 +238,9 @@ RPCHooks::rpc_listNamesServed(FRT_RPCRequest *req) void RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req) { + if (useNewLogic()) { + return new_registerRpcServer(req); + } FRT_Values &args = *req->GetParams(); const char *dName = args[0]._string._str; const char *dSpec = args[1]._string._str; @@ -270,9 +273,31 @@ RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req) completer.doRequest(); } +void RPCHooks::new_registerRpcServer(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + const char *dName = args[0]._string._str; + const char *dSpec = args[1]._string._str; + LOG(debug, "RPC: invoked registerRpcServer(%s,%s)", dName, dSpec); + _cnts.registerReqs++; + ServiceMapping mapping{dName, dSpec}; + // can we say now, that this will fail? + if (_env.consensusMap().wouldConflict(mapping)) { + req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected"); + LOG(info, "cannot register %s at %s: conflict", dName, dSpec); + return; + } + auto script = ScriptCommand::makeRegCompleter(_env, dName, dSpec, req); + req->Detach(); + _env.localMonitorMap().addLocal(mapping, std::make_unique<ScriptCommandWrapper>(std::move(script))); + return; +} + void RPCHooks::rpc_unregisterRpcServer(FRT_RPCRequest *req) { + if (useNewLogic()) { + return new_unregisterRpcServer(req); + } FRT_Values &args = *req->GetParams(); const char *dName = args[0]._string._str; const char *dSpec = args[1]._string._str; @@ -288,6 +313,23 @@ RPCHooks::rpc_unregisterRpcServer(FRT_RPCRequest *req) return; } +void RPCHooks::new_unregisterRpcServer(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + const char *dName = args[0]._string._str; + const char *dSpec = args[1]._string._str; + ServiceMapping mapping{dName, dSpec}; + bool ok = ! _env.consensusMap().wouldConflict(mapping); + if (! ok) { + req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected"); + } + _env.localMonitorMap().removeLocal(mapping); + _env.exchangeManager().forwardRemove(dName, dSpec); + LOG(debug, "unregisterRpcServer(%s,%s) %s", + dName, dSpec, + ok ? "OK" : "failed"); + _cnts.otherReqs++; + return; +} void RPCHooks::rpc_addPeer(FRT_RPCRequest *req) @@ -332,6 +374,9 @@ RPCHooks::rpc_removePeer(FRT_RPCRequest *req) void RPCHooks::rpc_wantAdd(FRT_RPCRequest *req) { + if (useNewLogic()) { + return new_wantAdd(req); + } FRT_Values &args = *req->GetParams(); const char *remsb = args[0]._string._str; const char *dName = args[1]._string._str; @@ -351,10 +396,38 @@ RPCHooks::rpc_wantAdd(FRT_RPCRequest *req) return; } +void RPCHooks::new_wantAdd(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + const char *remsb = args[0]._string._str; + const char *dName = args[1]._string._str; + const char *dSpec = args[2]._string._str; + FRT_Values &retval = *req->GetReturn(); + ServiceMapping mapping{dName, dSpec}; + bool conflict = ( + _env.consensusMap().wouldConflict(mapping) + || + _env.localMonitorMap().wouldConflict(mapping) + ); + if (conflict) { + retval.AddInt32(13); + retval.AddString("conflict detected"); + req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected"); + } else { + retval.AddInt32(0); + retval.AddString("ok"); + } + LOG(debug, "%s->wantAdd(%s,%s) %s", + remsb, dName, dSpec, conflict ? "conflict" : "OK"); + _cnts.wantAddReqs++; + return; +} void RPCHooks::rpc_doRemove(FRT_RPCRequest *req) { + if (useNewLogic()) { + return new_doRemove(req); + } FRT_Values &args = *req->GetParams(); const char *rname = args[0]._string._str; const char *dname = args[1]._string._str; @@ -374,9 +447,27 @@ RPCHooks::rpc_doRemove(FRT_RPCRequest *req) return; } +void RPCHooks::new_doRemove(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + const char *rname = args[0]._string._str; + const char *dname = args[1]._string._str; + const char *dspec = args[2]._string._str; + FRT_Values &retval = *req->GetReturn(); + ServiceMapping mapping{dname, dspec}; + _env.localMonitorMap().removeLocal(mapping); + retval.AddInt32(0); + retval.AddString("ok"); + LOG(debug, "%s->doRemove(%s,%s)", rname, dname, dspec); + _cnts.doRemoveReqs++; + return; +} + void RPCHooks::rpc_doAdd(FRT_RPCRequest *req) { + if (useNewLogic()) { + return new_doAdd(req); + } FRT_Values &args = *req->GetParams(); const char *rname = args[0]._string._str; const char *dname = args[1]._string._str; @@ -396,6 +487,30 @@ RPCHooks::rpc_doAdd(FRT_RPCRequest *req) return; } +void RPCHooks::new_doAdd(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + const char *remsb = args[0]._string._str; + const char *dName = args[1]._string._str; + const char *dSpec = args[2]._string._str; + FRT_Values &retval = *req->GetReturn(); + ServiceMapping mapping{dName, dSpec}; + bool ok = true; + if (_env.consensusMap().wouldConflict(mapping)) { + retval.AddInt32(13); + retval.AddString("conflict detected"); + req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected"); + ok = false; + } else { + retval.AddInt32(0); + retval.AddString("ok"); + auto script = ScriptCommand::makeIgnoreCmd(_env, dName, dSpec); + _env.localMonitorMap().addLocal(mapping, std::make_unique<ScriptCommandWrapper>(std::move(script))); + } + LOG(debug, "%s->doAdd(%s,%s) %s", + remsb, dName, dSpec, ok ? "OK" : "failed"); + _cnts.wantAddReqs++; + return; +} void RPCHooks::rpc_lookupRpcServer(FRT_RPCRequest *req) @@ -465,6 +580,7 @@ RPCHooks::rpc_lookupManaged(FRT_RPCRequest *req) FRT_Values &args = *req->GetParams(); const char *name = args[0]._string._str; LOG(debug, "RPC: lookupManaged(%s)", name); + // TODO: use local history here const auto & visible = _env.globalHistory(); auto diff = visible.makeDiffFrom(0); for (const auto & entry : diff.updated) { diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.h b/slobrok/src/vespa/slobrok/server/rpchooks.h index 19ae64df5f1..e8f6c65ea47 100644 --- a/slobrok/src/vespa/slobrok/server/rpchooks.h +++ b/slobrok/src/vespa/slobrok/server/rpchooks.h @@ -62,6 +62,12 @@ private: void rpc_lookupRpcServer(FRT_RPCRequest *req); + void new_registerRpcServer(FRT_RPCRequest *req); + void new_unregisterRpcServer(FRT_RPCRequest *req); + void new_wantAdd(FRT_RPCRequest *req); + void new_doRemove(FRT_RPCRequest *req); + void new_doAdd(FRT_RPCRequest *req); + void rpc_registerRpcServer(FRT_RPCRequest *req); void rpc_unregisterRpcServer(FRT_RPCRequest *req); diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index 9fb5511948c..ebb9935877f 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -121,11 +121,19 @@ SBEnv::SBEnv(const ConfigShim &shim, bool useNewConsensusLogic) _exchanger(*this, _rpcsrvmap), _rpcsrvmap() { + if (useNewLogic()) { + srandom(time(nullptr) ^ getpid()); + // note: feedback loop between these two: + _localMonitorSubscription = MapSubscription::subscribe(_consensusMap, _localRpcMonitorMap); + _consensusSubscription = MapSubscription::subscribe(_localRpcMonitorMap.dispatcher(), _consensusMap); + _globalHistorySubscription = MapSubscription::subscribe(_consensusMap, _globalVisibleHistory); + _rpcHooks.initRPC(getSupervisor()); + return; + } srandom(time(nullptr) ^ getpid()); // note: feedback loop between these two: _localMonitorSubscription = MapSubscription::subscribe(_consensusMap, _localRpcMonitorMap); _consensusSubscription = MapSubscription::subscribe(_localRpcMonitorMap.dispatcher(), _consensusMap); - // TODO: use consensus as source here: _globalHistorySubscription = MapSubscription::subscribe(_rpcsrvmap.proxy(), _globalVisibleHistory); _rpcHooks.initRPC(getSupervisor()); } |