diff options
author | Arne H Juul <arnej27959@users.noreply.github.com> | 2021-08-20 15:31:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-20 15:31:33 +0200 |
commit | 87b495d4f9bda18b5189eb3039bbf0d19170de05 (patch) | |
tree | 194c7e65cecb1ab20c3c2310ae431336c0d373ac /slobrok | |
parent | 30c8dac6c619d6fe2d5858e5def4f64c64a11029 (diff) | |
parent | 78599e4d5669f1d6d354ee1fcc1106dd4ead63f5 (diff) |
Merge pull request #18809 from vespa-engine/arnej/use-external-mapping-monitor
use new MappingMonitor
Diffstat (limited to 'slobrok')
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp | 143 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h | 55 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.cpp | 5 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.h | 1 |
4 files changed, 106 insertions, 98 deletions
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp index 485e6d27a1c..70bbab37e3e 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp @@ -11,10 +11,6 @@ namespace slobrok { #pragma GCC diagnostic ignored "-Winline" void LocalRpcMonitorMap::DelayedTasks::PerformTask() { - { - std::vector<MUP> deleteAfterSwap; - std::swap(deleteAfterSwap, _deleteList); - } std::vector<Event> todo; std::swap(todo, _queue); for (const auto & entry : todo) { @@ -29,29 +25,57 @@ void LocalRpcMonitorMap::DelayedTasks::PerformTask() { } } -LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor) +LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor, + MappingMonitorFactory mappingMonitorFactory) : _delayedTasks(supervisor.GetScheduler(), *this), _map(), _dispatcher(), _history(), - _supervisor(supervisor), + _mappingMonitor(mappingMonitorFactory(*this)), _subscription(MapSubscription::subscribe(_dispatcher, _history)) { } LocalRpcMonitorMap::~LocalRpcMonitorMap() = default; -LocalRpcMonitorMap::PerService * -LocalRpcMonitorMap::lookup(ManagedRpcServer *rpcsrv) { - auto iter = _map.find(rpcsrv->getName()); +LocalRpcMonitorMap::PerService & +LocalRpcMonitorMap::lookup(const ServiceMapping &mapping) { + LOG(spam, "lookup %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + auto iter = _map.find(mapping.name); if (iter == _map.end()) { - return nullptr; + LOG_ABORT("not in map"); } PerService & psd = iter->second; - if (psd.srv.get() != rpcsrv) { - return nullptr; + if (psd.spec != mapping.spec) { + LOG_ABORT("conflict in map: %s->%s"); } - return &psd; + LOG(spam, "found in map: %s->%s [%s,%s]", + iter->first.c_str(), psd.spec.c_str(), + psd.up ? "up" : "down", + psd.localOnly ? "local" : "global"); + return psd; +} + +void LocalRpcMonitorMap::addToMap(const ServiceMapping &mapping, PerService psd) { + auto [ iter, was_inserted ] = + _map.try_emplace(mapping.name, std::move(psd)); + LOG_ASSERT(was_inserted); + _mappingMonitor->start(mapping); +} + +LocalRpcMonitorMap::RemovedData +LocalRpcMonitorMap::removeFromMap(Map::iterator iter) { + auto name = iter->first; + PerService psd = std::move(iter->second); + ServiceMapping mapping{iter->first, psd.spec}; + _mappingMonitor->stop(mapping); + _map.erase(iter); + return RemovedData { + .mapping = mapping, + .up = psd.up, + .localOnly = psd.localOnly, + .inflight = std::move(psd.inflight) + }; } ServiceMapHistory & LocalRpcMonitorMap::history() { @@ -66,7 +90,7 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping, auto old = _map.find(mapping.name); if (old != _map.end()) { const PerService & exists = old->second; - if (exists.spec() == mapping.spec) { + if (exists.spec == mapping.spec) { LOG(debug, "added mapping %s->%s was already present", mapping.name.c_str(), mapping.spec.c_str()); inflight->doneHandler(OkState(0, "already registered")); @@ -74,13 +98,11 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping, } LOG(warning, "tried addLocal for mapping %s->%s, but already had conflicting mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str(), - exists.name().c_str(), exists.spec().c_str()); + mapping.name.c_str(), exists.spec.c_str()); inflight->doneHandler(OkState(FRTE_RPC_METHOD_FAILED, "conflict")); return; } - auto [ iter, was_inserted ] = - _map.try_emplace(mapping.name, localService(mapping, std::move(inflight))); - LOG_ASSERT(was_inserted); + addToMap(mapping, localService(mapping, std::move(inflight))); } void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { @@ -97,41 +119,37 @@ void LocalRpcMonitorMap::doAdd(const ServiceMapping &mapping) { auto old = _map.find(mapping.name); if (old != _map.end()) { PerService & exists = old->second; - if (exists.spec() == mapping.spec) { + if (exists.spec == mapping.spec) { LOG(debug, "added mapping %s->%s was already present", mapping.name.c_str(), mapping.spec.c_str()); exists.localOnly = false; return; } - PerService removed = std::move(exists); - _map.erase(old); + auto removed = removeFromMap(old); LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str(), - removed.name().c_str(), removed.spec().c_str()); + removed.mapping.name.c_str(), removed.mapping.spec.c_str()); if (removed.inflight) { auto target = std::move(removed.inflight); target->doneHandler(OkState(13, "conflict during initialization")); } if (removed.up) { - _dispatcher.remove(removed.mapping()); + _dispatcher.remove(removed.mapping); } - _delayedTasks.deleteLater(std::move(removed.srv)); } - auto [ iter, was_inserted ] = - _map.try_emplace(mapping.name, globalService(mapping)); - LOG_ASSERT(was_inserted); + addToMap(mapping, globalService(mapping)); } void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) { auto iter = _map.find(mapping.name); if (iter != _map.end()) { - PerService removed = std::move(iter->second); - _map.erase(iter); - LOG(debug, "remove: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); - if (mapping.spec != removed.spec()) { + auto removed = removeFromMap(iter); + LOG(debug, "remove: mapping %s->%s", + removed.mapping.name.c_str(), removed.mapping.spec.c_str()); + if (mapping.spec != removed.mapping.spec) { LOG(warning, "inconsistent specs for name '%s': had '%s', but was asked to remove '%s'", mapping.name.c_str(), - removed.spec().c_str(), + removed.mapping.spec.c_str(), mapping.spec.c_str()); } if (removed.inflight) { @@ -139,53 +157,44 @@ void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) { target->doneHandler(OkState(13, "removed during initialization")); } if (removed.up) { - _dispatcher.remove(removed.mapping()); + _dispatcher.remove(removed.mapping); } - _delayedTasks.deleteLater(std::move(removed.srv)); } else { LOG(debug, "tried to remove non-existing mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); } } -void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string) { - if (auto *psd = lookup(rpcsrv)) { - LOG(debug, "failed: %s->%s", psd->name().c_str(), psd->spec().c_str()); - if (psd->inflight) { - auto target = std::move(psd->inflight); - target->doneHandler(OkState(13, "failed check using listNames callback")); - } - if (psd->localOnly) { - PerService removed = std::move(*psd); - auto iter = _map.find(removed.name()); - _map.erase(iter); - if (removed.up) { - _dispatcher.remove(removed.mapping()); - } - _delayedTasks.deleteLater(std::move(removed.srv)); - } else if (psd->up) { - psd->up = false; - _dispatcher.remove(psd->mapping()); - } +void LocalRpcMonitorMap::down(const ServiceMapping& mapping) { + PerService &psd = lookup(mapping); + LOG(debug, "failed: %s->%s", mapping.name.c_str(), psd.spec.c_str()); + if (psd.inflight) { + auto target = std::move(psd.inflight); + target->doneHandler(OkState(13, "failed check using listNames callback")); } -} - -void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) { - if (auto *psd = lookup(rpcsrv)) { - LOG(debug, "ok: %s->%s", psd->name().c_str(), psd->spec().c_str()); - if (psd->inflight) { - auto target = std::move(psd->inflight); - target->doneHandler(OkState()); - } - if (! psd->up) { - psd->up = true; - _dispatcher.add(psd->mapping()); + if (psd.localOnly) { + auto iter = _map.find(mapping.name); + auto removed = removeFromMap(iter); + if (removed.up) { + _dispatcher.remove(removed.mapping); } + } else if (psd.up) { + psd.up = false; + _dispatcher.remove(mapping); } } -FRT_Supervisor * LocalRpcMonitorMap::getSupervisor() { - return &_supervisor; +void LocalRpcMonitorMap::up(const ServiceMapping& mapping) { + PerService &psd = lookup(mapping); + LOG(debug, "ok: %s->%s", mapping.name.c_str(), psd.spec.c_str()); + if (psd.inflight) { + auto target = std::move(psd.inflight); + target->doneHandler(OkState()); + } + if (! psd.up) { + psd.up = true; + _dispatcher.add(mapping); + } } } // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h index 8961e21f386..8df2ca882de 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h @@ -2,10 +2,10 @@ #pragma once #include "cmd.h" -#include "i_rpc_server_manager.h" #include "managed_rpc_server.h" #include "map_listener.h" #include "map_source.h" +#include "mapping_monitor.h" #include "named_service.h" #include "proxy_map_source.h" #include "service_map_history.h" @@ -24,8 +24,8 @@ namespace slobrok { * Tracks up/down status for name->spec combinations * that are considered for publication locally. **/ -class LocalRpcMonitorMap : public IRpcServerManager, - public MapListener +class LocalRpcMonitorMap : public MapListener, + public MappingMonitorOwner { private: enum class EventType { ADD, REMOVE }; @@ -42,16 +42,9 @@ private: }; class DelayedTasks : public FNET_Task { - using MUP = std::unique_ptr<ManagedRpcServer>; - std::vector<MUP> _deleteList; std::vector<Event> _queue; LocalRpcMonitorMap &_target; public: - void deleteLater(MUP rpcsrv) { - _deleteList.emplace_back(std::move(rpcsrv)); - ScheduleNow(); - } - void handleLater(Event event) { _queue.emplace_back(std::move(event)); ScheduleNow(); @@ -61,7 +54,6 @@ private: DelayedTasks(FNET_Scheduler *scheduler, LocalRpcMonitorMap &target) : FNET_Task(scheduler), - _deleteList(), _queue(), _target(target) {} @@ -75,17 +67,9 @@ private: bool up; bool localOnly; std::unique_ptr<ScriptCommand> inflight; - std::unique_ptr<ManagedRpcServer> srv; - - vespalib::string name() const { return srv->getName(); } - vespalib::string spec() const { return srv->getSpec(); } - ServiceMapping mapping() const { return ServiceMapping{srv->getName(), srv->getSpec()}; } + vespalib::string spec; }; - std::unique_ptr<ManagedRpcServer> managedFor(const ServiceMapping &mapping) { - return std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this); - } - PerService localService(const ServiceMapping &mapping, std::unique_ptr<ScriptCommand> inflight) { @@ -93,7 +77,7 @@ private: .up = false, .localOnly = true, .inflight = std::move(inflight), - .srv = managedFor(mapping) + .spec = mapping.spec }; } @@ -102,7 +86,7 @@ private: .up = false, .localOnly = false, .inflight = {}, - .srv = managedFor(mapping) + .spec = mapping.spec }; } @@ -111,16 +95,28 @@ private: Map _map; ProxyMapSource _dispatcher; ServiceMapHistory _history; - FRT_Supervisor &_supervisor; + MappingMonitor::UP _mappingMonitor; std::unique_ptr<MapSubscription> _subscription; - PerService *lookup(ManagedRpcServer *rpcsrv); - void doAdd(const ServiceMapping &mapping); void doRemove(const ServiceMapping &mapping); - + + PerService & lookup(const ServiceMapping &mapping); + + void addToMap(const ServiceMapping &mapping, PerService psd); + + struct RemovedData { + ServiceMapping mapping; + bool up; + bool localOnly; + std::unique_ptr<ScriptCommand> inflight; + }; + + RemovedData removeFromMap(Map::iterator iter); + public: - LocalRpcMonitorMap(FRT_Supervisor &_supervisor); + LocalRpcMonitorMap(FRT_Supervisor &supervisor, + MappingMonitorFactory mappingMonitorFactory); ~LocalRpcMonitorMap(); MapSource &dispatcher() { return _dispatcher; } @@ -133,9 +129,8 @@ public: void add(const ServiceMapping &mapping) override; void remove(const ServiceMapping &mapping) override; - void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override; - void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override; - FRT_Supervisor *getSupervisor() override; + void up(const ServiceMapping& mapping) override; + void down(const ServiceMapping& mapping) override; }; //----------------------------------------------------------------------------- diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index cf612a187bb..1f54716c29c 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -110,7 +110,10 @@ SBEnv::SBEnv(const ConfigShim &shim) _health(), _metrics(_rpcHooks, *_transport), _components(), - _localRpcMonitorMap(*_supervisor), + _localRpcMonitorMap(*_supervisor, + [this] (MappingMonitorOwner &owner) { + return std::make_unique<RpcMappingMonitor>(*_supervisor, owner); + }), _rpcsrvmanager(*this), _exchanger(*this, _rpcsrvmap), _rpcsrvmap() diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h index 7bed910936f..44b7305814c 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.h +++ b/slobrok/src/vespa/slobrok/server/sbenv.h @@ -2,6 +2,7 @@ #pragma once #include "named_service.h" +#include "rpc_mapping_monitor.h" #include "rpc_server_map.h" #include "rpc_server_manager.h" #include "remote_slobrok.h" |