aboutsummaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne H Juul <arnej@yahooinc.com>2021-09-10 07:53:42 +0000
committerArne H Juul <arnej@yahooinc.com>2021-09-10 10:01:17 +0000
commitf514ebda393a385787091a8d26e4d8e159424735 (patch)
tree7084bab39bbb6bc5fc4dde27a97661ac1609d047 /slobrok
parent23dd3ea6e797560374407a1d03e3ab41c13f0191 (diff)
rewrite to use new logic
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/rpchooks.cpp116
-rw-r--r--slobrok/src/vespa/slobrok/server/rpchooks.h6
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.cpp10
3 files changed, 131 insertions, 1 deletions
diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.cpp b/slobrok/src/vespa/slobrok/server/rpchooks.cpp
index d40fd93afd6..d5797902833 100644
--- a/slobrok/src/vespa/slobrok/server/rpchooks.cpp
+++ b/slobrok/src/vespa/slobrok/server/rpchooks.cpp
@@ -238,6 +238,9 @@ RPCHooks::rpc_listNamesServed(FRT_RPCRequest *req)
void
RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req)
{
+ if (useNewLogic()) {
+ return new_registerRpcServer(req);
+ }
FRT_Values &args = *req->GetParams();
const char *dName = args[0]._string._str;
const char *dSpec = args[1]._string._str;
@@ -270,9 +273,31 @@ RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req)
completer.doRequest();
}
+void RPCHooks::new_registerRpcServer(FRT_RPCRequest *req) {
+ FRT_Values &args = *req->GetParams();
+ const char *dName = args[0]._string._str;
+ const char *dSpec = args[1]._string._str;
+ LOG(debug, "RPC: invoked registerRpcServer(%s,%s)", dName, dSpec);
+ _cnts.registerReqs++;
+ ServiceMapping mapping{dName, dSpec};
+ // can we say now, that this will fail?
+ if (_env.consensusMap().wouldConflict(mapping)) {
+ req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected");
+ LOG(info, "cannot register %s at %s: conflict", dName, dSpec);
+ return;
+ }
+ auto script = ScriptCommand::makeRegCompleter(_env, dName, dSpec, req);
+ req->Detach();
+ _env.localMonitorMap().addLocal(mapping, std::make_unique<ScriptCommandWrapper>(std::move(script)));
+ return;
+}
+
void
RPCHooks::rpc_unregisterRpcServer(FRT_RPCRequest *req)
{
+ if (useNewLogic()) {
+ return new_unregisterRpcServer(req);
+ }
FRT_Values &args = *req->GetParams();
const char *dName = args[0]._string._str;
const char *dSpec = args[1]._string._str;
@@ -288,6 +313,23 @@ RPCHooks::rpc_unregisterRpcServer(FRT_RPCRequest *req)
return;
}
+void RPCHooks::new_unregisterRpcServer(FRT_RPCRequest *req) {
+ FRT_Values &args = *req->GetParams();
+ const char *dName = args[0]._string._str;
+ const char *dSpec = args[1]._string._str;
+ ServiceMapping mapping{dName, dSpec};
+ bool ok = ! _env.consensusMap().wouldConflict(mapping);
+ if (! ok) {
+ req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected");
+ }
+ _env.localMonitorMap().removeLocal(mapping);
+ _env.exchangeManager().forwardRemove(dName, dSpec);
+ LOG(debug, "unregisterRpcServer(%s,%s) %s",
+ dName, dSpec,
+ ok ? "OK" : "failed");
+ _cnts.otherReqs++;
+ return;
+}
void
RPCHooks::rpc_addPeer(FRT_RPCRequest *req)
@@ -332,6 +374,9 @@ RPCHooks::rpc_removePeer(FRT_RPCRequest *req)
void
RPCHooks::rpc_wantAdd(FRT_RPCRequest *req)
{
+ if (useNewLogic()) {
+ return new_wantAdd(req);
+ }
FRT_Values &args = *req->GetParams();
const char *remsb = args[0]._string._str;
const char *dName = args[1]._string._str;
@@ -351,10 +396,38 @@ RPCHooks::rpc_wantAdd(FRT_RPCRequest *req)
return;
}
+void RPCHooks::new_wantAdd(FRT_RPCRequest *req) {
+ FRT_Values &args = *req->GetParams();
+ const char *remsb = args[0]._string._str;
+ const char *dName = args[1]._string._str;
+ const char *dSpec = args[2]._string._str;
+ FRT_Values &retval = *req->GetReturn();
+ ServiceMapping mapping{dName, dSpec};
+ bool conflict = (
+ _env.consensusMap().wouldConflict(mapping)
+ ||
+ _env.localMonitorMap().wouldConflict(mapping)
+ );
+ if (conflict) {
+ retval.AddInt32(13);
+ retval.AddString("conflict detected");
+ req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected");
+ } else {
+ retval.AddInt32(0);
+ retval.AddString("ok");
+ }
+ LOG(debug, "%s->wantAdd(%s,%s) %s",
+ remsb, dName, dSpec, conflict ? "conflict" : "OK");
+ _cnts.wantAddReqs++;
+ return;
+}
void
RPCHooks::rpc_doRemove(FRT_RPCRequest *req)
{
+ if (useNewLogic()) {
+ return new_doRemove(req);
+ }
FRT_Values &args = *req->GetParams();
const char *rname = args[0]._string._str;
const char *dname = args[1]._string._str;
@@ -374,9 +447,27 @@ RPCHooks::rpc_doRemove(FRT_RPCRequest *req)
return;
}
+void RPCHooks::new_doRemove(FRT_RPCRequest *req) {
+ FRT_Values &args = *req->GetParams();
+ const char *rname = args[0]._string._str;
+ const char *dname = args[1]._string._str;
+ const char *dspec = args[2]._string._str;
+ FRT_Values &retval = *req->GetReturn();
+ ServiceMapping mapping{dname, dspec};
+ _env.localMonitorMap().removeLocal(mapping);
+ retval.AddInt32(0);
+ retval.AddString("ok");
+ LOG(debug, "%s->doRemove(%s,%s)", rname, dname, dspec);
+ _cnts.doRemoveReqs++;
+ return;
+}
+
void
RPCHooks::rpc_doAdd(FRT_RPCRequest *req)
{
+ if (useNewLogic()) {
+ return new_doAdd(req);
+ }
FRT_Values &args = *req->GetParams();
const char *rname = args[0]._string._str;
const char *dname = args[1]._string._str;
@@ -396,6 +487,30 @@ RPCHooks::rpc_doAdd(FRT_RPCRequest *req)
return;
}
+void RPCHooks::new_doAdd(FRT_RPCRequest *req) {
+ FRT_Values &args = *req->GetParams();
+ const char *remsb = args[0]._string._str;
+ const char *dName = args[1]._string._str;
+ const char *dSpec = args[2]._string._str;
+ FRT_Values &retval = *req->GetReturn();
+ ServiceMapping mapping{dName, dSpec};
+ bool ok = true;
+ if (_env.consensusMap().wouldConflict(mapping)) {
+ retval.AddInt32(13);
+ retval.AddString("conflict detected");
+ req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected");
+ ok = false;
+ } else {
+ retval.AddInt32(0);
+ retval.AddString("ok");
+ auto script = ScriptCommand::makeIgnoreCmd(_env, dName, dSpec);
+ _env.localMonitorMap().addLocal(mapping, std::make_unique<ScriptCommandWrapper>(std::move(script)));
+ }
+ LOG(debug, "%s->doAdd(%s,%s) %s",
+ remsb, dName, dSpec, ok ? "OK" : "failed");
+ _cnts.wantAddReqs++;
+ return;
+}
void
RPCHooks::rpc_lookupRpcServer(FRT_RPCRequest *req)
@@ -465,6 +580,7 @@ RPCHooks::rpc_lookupManaged(FRT_RPCRequest *req)
FRT_Values &args = *req->GetParams();
const char *name = args[0]._string._str;
LOG(debug, "RPC: lookupManaged(%s)", name);
+ // TODO: use local history here
const auto & visible = _env.globalHistory();
auto diff = visible.makeDiffFrom(0);
for (const auto & entry : diff.updated) {
diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.h b/slobrok/src/vespa/slobrok/server/rpchooks.h
index 19ae64df5f1..e8f6c65ea47 100644
--- a/slobrok/src/vespa/slobrok/server/rpchooks.h
+++ b/slobrok/src/vespa/slobrok/server/rpchooks.h
@@ -62,6 +62,12 @@ private:
void rpc_lookupRpcServer(FRT_RPCRequest *req);
+ void new_registerRpcServer(FRT_RPCRequest *req);
+ void new_unregisterRpcServer(FRT_RPCRequest *req);
+ void new_wantAdd(FRT_RPCRequest *req);
+ void new_doRemove(FRT_RPCRequest *req);
+ void new_doAdd(FRT_RPCRequest *req);
+
void rpc_registerRpcServer(FRT_RPCRequest *req);
void rpc_unregisterRpcServer(FRT_RPCRequest *req);
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp
index 9fb5511948c..ebb9935877f 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.cpp
+++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp
@@ -121,11 +121,19 @@ SBEnv::SBEnv(const ConfigShim &shim, bool useNewConsensusLogic)
_exchanger(*this, _rpcsrvmap),
_rpcsrvmap()
{
+ if (useNewLogic()) {
+ srandom(time(nullptr) ^ getpid());
+ // note: feedback loop between these two:
+ _localMonitorSubscription = MapSubscription::subscribe(_consensusMap, _localRpcMonitorMap);
+ _consensusSubscription = MapSubscription::subscribe(_localRpcMonitorMap.dispatcher(), _consensusMap);
+ _globalHistorySubscription = MapSubscription::subscribe(_consensusMap, _globalVisibleHistory);
+ _rpcHooks.initRPC(getSupervisor());
+ return;
+ }
srandom(time(nullptr) ^ getpid());
// note: feedback loop between these two:
_localMonitorSubscription = MapSubscription::subscribe(_consensusMap, _localRpcMonitorMap);
_consensusSubscription = MapSubscription::subscribe(_localRpcMonitorMap.dispatcher(), _consensusMap);
- // TODO: use consensus as source here:
_globalHistorySubscription = MapSubscription::subscribe(_rpcsrvmap.proxy(), _globalVisibleHistory);
_rpcHooks.initRPC(getSupervisor());
}