summaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne H Juul <arnej27959@users.noreply.github.com>2021-08-20 15:31:33 +0200
committerGitHub <noreply@github.com>2021-08-20 15:31:33 +0200
commit87b495d4f9bda18b5189eb3039bbf0d19170de05 (patch)
tree194c7e65cecb1ab20c3c2310ae431336c0d373ac /slobrok
parent30c8dac6c619d6fe2d5858e5def4f64c64a11029 (diff)
parent78599e4d5669f1d6d354ee1fcc1106dd4ead63f5 (diff)
Merge pull request #18809 from vespa-engine/arnej/use-external-mapping-monitor
use new MappingMonitor
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp143
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h55
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.cpp5
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.h1
4 files changed, 106 insertions, 98 deletions
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
index 485e6d27a1c..70bbab37e3e 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
@@ -11,10 +11,6 @@ namespace slobrok {
#pragma GCC diagnostic ignored "-Winline"
void LocalRpcMonitorMap::DelayedTasks::PerformTask() {
- {
- std::vector<MUP> deleteAfterSwap;
- std::swap(deleteAfterSwap, _deleteList);
- }
std::vector<Event> todo;
std::swap(todo, _queue);
for (const auto & entry : todo) {
@@ -29,29 +25,57 @@ void LocalRpcMonitorMap::DelayedTasks::PerformTask() {
}
}
-LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor)
+LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor,
+ MappingMonitorFactory mappingMonitorFactory)
: _delayedTasks(supervisor.GetScheduler(), *this),
_map(),
_dispatcher(),
_history(),
- _supervisor(supervisor),
+ _mappingMonitor(mappingMonitorFactory(*this)),
_subscription(MapSubscription::subscribe(_dispatcher, _history))
{
}
LocalRpcMonitorMap::~LocalRpcMonitorMap() = default;
-LocalRpcMonitorMap::PerService *
-LocalRpcMonitorMap::lookup(ManagedRpcServer *rpcsrv) {
- auto iter = _map.find(rpcsrv->getName());
+LocalRpcMonitorMap::PerService &
+LocalRpcMonitorMap::lookup(const ServiceMapping &mapping) {
+ LOG(spam, "lookup %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ auto iter = _map.find(mapping.name);
if (iter == _map.end()) {
- return nullptr;
+ LOG_ABORT("not in map");
}
PerService & psd = iter->second;
- if (psd.srv.get() != rpcsrv) {
- return nullptr;
+ if (psd.spec != mapping.spec) {
+ LOG_ABORT("conflict in map: %s->%s");
}
- return &psd;
+ LOG(spam, "found in map: %s->%s [%s,%s]",
+ iter->first.c_str(), psd.spec.c_str(),
+ psd.up ? "up" : "down",
+ psd.localOnly ? "local" : "global");
+ return psd;
+}
+
+void LocalRpcMonitorMap::addToMap(const ServiceMapping &mapping, PerService psd) {
+ auto [ iter, was_inserted ] =
+ _map.try_emplace(mapping.name, std::move(psd));
+ LOG_ASSERT(was_inserted);
+ _mappingMonitor->start(mapping);
+}
+
+LocalRpcMonitorMap::RemovedData
+LocalRpcMonitorMap::removeFromMap(Map::iterator iter) {
+ auto name = iter->first;
+ PerService psd = std::move(iter->second);
+ ServiceMapping mapping{iter->first, psd.spec};
+ _mappingMonitor->stop(mapping);
+ _map.erase(iter);
+ return RemovedData {
+ .mapping = mapping,
+ .up = psd.up,
+ .localOnly = psd.localOnly,
+ .inflight = std::move(psd.inflight)
+ };
}
ServiceMapHistory & LocalRpcMonitorMap::history() {
@@ -66,7 +90,7 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping,
auto old = _map.find(mapping.name);
if (old != _map.end()) {
const PerService & exists = old->second;
- if (exists.spec() == mapping.spec) {
+ if (exists.spec == mapping.spec) {
LOG(debug, "added mapping %s->%s was already present",
mapping.name.c_str(), mapping.spec.c_str());
inflight->doneHandler(OkState(0, "already registered"));
@@ -74,13 +98,11 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping,
}
LOG(warning, "tried addLocal for mapping %s->%s, but already had conflicting mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str(),
- exists.name().c_str(), exists.spec().c_str());
+ mapping.name.c_str(), exists.spec.c_str());
inflight->doneHandler(OkState(FRTE_RPC_METHOD_FAILED, "conflict"));
return;
}
- auto [ iter, was_inserted ] =
- _map.try_emplace(mapping.name, localService(mapping, std::move(inflight)));
- LOG_ASSERT(was_inserted);
+ addToMap(mapping, localService(mapping, std::move(inflight)));
}
void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
@@ -97,41 +119,37 @@ void LocalRpcMonitorMap::doAdd(const ServiceMapping &mapping) {
auto old = _map.find(mapping.name);
if (old != _map.end()) {
PerService & exists = old->second;
- if (exists.spec() == mapping.spec) {
+ if (exists.spec == mapping.spec) {
LOG(debug, "added mapping %s->%s was already present",
mapping.name.c_str(), mapping.spec.c_str());
exists.localOnly = false;
return;
}
- PerService removed = std::move(exists);
- _map.erase(old);
+ auto removed = removeFromMap(old);
LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str(),
- removed.name().c_str(), removed.spec().c_str());
+ removed.mapping.name.c_str(), removed.mapping.spec.c_str());
if (removed.inflight) {
auto target = std::move(removed.inflight);
target->doneHandler(OkState(13, "conflict during initialization"));
}
if (removed.up) {
- _dispatcher.remove(removed.mapping());
+ _dispatcher.remove(removed.mapping);
}
- _delayedTasks.deleteLater(std::move(removed.srv));
}
- auto [ iter, was_inserted ] =
- _map.try_emplace(mapping.name, globalService(mapping));
- LOG_ASSERT(was_inserted);
+ addToMap(mapping, globalService(mapping));
}
void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) {
auto iter = _map.find(mapping.name);
if (iter != _map.end()) {
- PerService removed = std::move(iter->second);
- _map.erase(iter);
- LOG(debug, "remove: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str());
- if (mapping.spec != removed.spec()) {
+ auto removed = removeFromMap(iter);
+ LOG(debug, "remove: mapping %s->%s",
+ removed.mapping.name.c_str(), removed.mapping.spec.c_str());
+ if (mapping.spec != removed.mapping.spec) {
LOG(warning, "inconsistent specs for name '%s': had '%s', but was asked to remove '%s'",
mapping.name.c_str(),
- removed.spec().c_str(),
+ removed.mapping.spec.c_str(),
mapping.spec.c_str());
}
if (removed.inflight) {
@@ -139,53 +157,44 @@ void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) {
target->doneHandler(OkState(13, "removed during initialization"));
}
if (removed.up) {
- _dispatcher.remove(removed.mapping());
+ _dispatcher.remove(removed.mapping);
}
- _delayedTasks.deleteLater(std::move(removed.srv));
} else {
LOG(debug, "tried to remove non-existing mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str());
}
}
-void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string) {
- if (auto *psd = lookup(rpcsrv)) {
- LOG(debug, "failed: %s->%s", psd->name().c_str(), psd->spec().c_str());
- if (psd->inflight) {
- auto target = std::move(psd->inflight);
- target->doneHandler(OkState(13, "failed check using listNames callback"));
- }
- if (psd->localOnly) {
- PerService removed = std::move(*psd);
- auto iter = _map.find(removed.name());
- _map.erase(iter);
- if (removed.up) {
- _dispatcher.remove(removed.mapping());
- }
- _delayedTasks.deleteLater(std::move(removed.srv));
- } else if (psd->up) {
- psd->up = false;
- _dispatcher.remove(psd->mapping());
- }
+void LocalRpcMonitorMap::down(const ServiceMapping& mapping) {
+ PerService &psd = lookup(mapping);
+ LOG(debug, "failed: %s->%s", mapping.name.c_str(), psd.spec.c_str());
+ if (psd.inflight) {
+ auto target = std::move(psd.inflight);
+ target->doneHandler(OkState(13, "failed check using listNames callback"));
}
-}
-
-void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) {
- if (auto *psd = lookup(rpcsrv)) {
- LOG(debug, "ok: %s->%s", psd->name().c_str(), psd->spec().c_str());
- if (psd->inflight) {
- auto target = std::move(psd->inflight);
- target->doneHandler(OkState());
- }
- if (! psd->up) {
- psd->up = true;
- _dispatcher.add(psd->mapping());
+ if (psd.localOnly) {
+ auto iter = _map.find(mapping.name);
+ auto removed = removeFromMap(iter);
+ if (removed.up) {
+ _dispatcher.remove(removed.mapping);
}
+ } else if (psd.up) {
+ psd.up = false;
+ _dispatcher.remove(mapping);
}
}
-FRT_Supervisor * LocalRpcMonitorMap::getSupervisor() {
- return &_supervisor;
+void LocalRpcMonitorMap::up(const ServiceMapping& mapping) {
+ PerService &psd = lookup(mapping);
+ LOG(debug, "ok: %s->%s", mapping.name.c_str(), psd.spec.c_str());
+ if (psd.inflight) {
+ auto target = std::move(psd.inflight);
+ target->doneHandler(OkState());
+ }
+ if (! psd.up) {
+ psd.up = true;
+ _dispatcher.add(mapping);
+ }
}
} // namespace slobrok
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
index 8961e21f386..8df2ca882de 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
@@ -2,10 +2,10 @@
#pragma once
#include "cmd.h"
-#include "i_rpc_server_manager.h"
#include "managed_rpc_server.h"
#include "map_listener.h"
#include "map_source.h"
+#include "mapping_monitor.h"
#include "named_service.h"
#include "proxy_map_source.h"
#include "service_map_history.h"
@@ -24,8 +24,8 @@ namespace slobrok {
* Tracks up/down status for name->spec combinations
* that are considered for publication locally.
**/
-class LocalRpcMonitorMap : public IRpcServerManager,
- public MapListener
+class LocalRpcMonitorMap : public MapListener,
+ public MappingMonitorOwner
{
private:
enum class EventType { ADD, REMOVE };
@@ -42,16 +42,9 @@ private:
};
class DelayedTasks : public FNET_Task {
- using MUP = std::unique_ptr<ManagedRpcServer>;
- std::vector<MUP> _deleteList;
std::vector<Event> _queue;
LocalRpcMonitorMap &_target;
public:
- void deleteLater(MUP rpcsrv) {
- _deleteList.emplace_back(std::move(rpcsrv));
- ScheduleNow();
- }
-
void handleLater(Event event) {
_queue.emplace_back(std::move(event));
ScheduleNow();
@@ -61,7 +54,6 @@ private:
DelayedTasks(FNET_Scheduler *scheduler, LocalRpcMonitorMap &target)
: FNET_Task(scheduler),
- _deleteList(),
_queue(),
_target(target)
{}
@@ -75,17 +67,9 @@ private:
bool up;
bool localOnly;
std::unique_ptr<ScriptCommand> inflight;
- std::unique_ptr<ManagedRpcServer> srv;
-
- vespalib::string name() const { return srv->getName(); }
- vespalib::string spec() const { return srv->getSpec(); }
- ServiceMapping mapping() const { return ServiceMapping{srv->getName(), srv->getSpec()}; }
+ vespalib::string spec;
};
- std::unique_ptr<ManagedRpcServer> managedFor(const ServiceMapping &mapping) {
- return std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this);
- }
-
PerService localService(const ServiceMapping &mapping,
std::unique_ptr<ScriptCommand> inflight)
{
@@ -93,7 +77,7 @@ private:
.up = false,
.localOnly = true,
.inflight = std::move(inflight),
- .srv = managedFor(mapping)
+ .spec = mapping.spec
};
}
@@ -102,7 +86,7 @@ private:
.up = false,
.localOnly = false,
.inflight = {},
- .srv = managedFor(mapping)
+ .spec = mapping.spec
};
}
@@ -111,16 +95,28 @@ private:
Map _map;
ProxyMapSource _dispatcher;
ServiceMapHistory _history;
- FRT_Supervisor &_supervisor;
+ MappingMonitor::UP _mappingMonitor;
std::unique_ptr<MapSubscription> _subscription;
- PerService *lookup(ManagedRpcServer *rpcsrv);
-
void doAdd(const ServiceMapping &mapping);
void doRemove(const ServiceMapping &mapping);
-
+
+ PerService & lookup(const ServiceMapping &mapping);
+
+ void addToMap(const ServiceMapping &mapping, PerService psd);
+
+ struct RemovedData {
+ ServiceMapping mapping;
+ bool up;
+ bool localOnly;
+ std::unique_ptr<ScriptCommand> inflight;
+ };
+
+ RemovedData removeFromMap(Map::iterator iter);
+
public:
- LocalRpcMonitorMap(FRT_Supervisor &_supervisor);
+ LocalRpcMonitorMap(FRT_Supervisor &supervisor,
+ MappingMonitorFactory mappingMonitorFactory);
~LocalRpcMonitorMap();
MapSource &dispatcher() { return _dispatcher; }
@@ -133,9 +129,8 @@ public:
void add(const ServiceMapping &mapping) override;
void remove(const ServiceMapping &mapping) override;
- void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override;
- void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override;
- FRT_Supervisor *getSupervisor() override;
+ void up(const ServiceMapping& mapping) override;
+ void down(const ServiceMapping& mapping) override;
};
//-----------------------------------------------------------------------------
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp
index cf612a187bb..1f54716c29c 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.cpp
+++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp
@@ -110,7 +110,10 @@ SBEnv::SBEnv(const ConfigShim &shim)
_health(),
_metrics(_rpcHooks, *_transport),
_components(),
- _localRpcMonitorMap(*_supervisor),
+ _localRpcMonitorMap(*_supervisor,
+ [this] (MappingMonitorOwner &owner) {
+ return std::make_unique<RpcMappingMonitor>(*_supervisor, owner);
+ }),
_rpcsrvmanager(*this),
_exchanger(*this, _rpcsrvmap),
_rpcsrvmap()
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h
index 7bed910936f..44b7305814c 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.h
+++ b/slobrok/src/vespa/slobrok/server/sbenv.h
@@ -2,6 +2,7 @@
#pragma once
#include "named_service.h"
+#include "rpc_mapping_monitor.h"
#include "rpc_server_map.h"
#include "rpc_server_manager.h"
#include "remote_slobrok.h"