diff options
author | Arne Juul <arnej@verizonmedia.com> | 2021-08-20 10:19:53 +0000 |
---|---|---|
committer | Arne Juul <arnej@verizonmedia.com> | 2021-08-20 10:19:53 +0000 |
commit | 8e7e1ecbd1117723d02b51df80861e3a05b4fb79 (patch) | |
tree | 96e36a04bc0727873abe81592520a960b507f8ba /slobrok | |
parent | c55a73fbc972fc1194877dc895ff90906a57d569 (diff) |
use new MappingMonitor
Diffstat (limited to 'slobrok')
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp | 74 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h | 42 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.cpp | 5 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.h | 1 |
4 files changed, 56 insertions, 66 deletions
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp index 485e6d27a1c..fe7c7dd2c36 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp @@ -11,10 +11,6 @@ namespace slobrok { #pragma GCC diagnostic ignored "-Winline" void LocalRpcMonitorMap::DelayedTasks::PerformTask() { - { - std::vector<MUP> deleteAfterSwap; - std::swap(deleteAfterSwap, _deleteList); - } std::vector<Event> todo; std::swap(todo, _queue); for (const auto & entry : todo) { @@ -29,12 +25,13 @@ void LocalRpcMonitorMap::DelayedTasks::PerformTask() { } } -LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor) +LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor, + MappingMonitorFactory mappingMonitorFactory) : _delayedTasks(supervisor.GetScheduler(), *this), _map(), _dispatcher(), _history(), - _supervisor(supervisor), + _mappingMonitor(mappingMonitorFactory(*this)), _subscription(MapSubscription::subscribe(_dispatcher, _history)) { } @@ -42,15 +39,20 @@ LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor) LocalRpcMonitorMap::~LocalRpcMonitorMap() = default; LocalRpcMonitorMap::PerService * -LocalRpcMonitorMap::lookup(ManagedRpcServer *rpcsrv) { - auto iter = _map.find(rpcsrv->getName()); +LocalRpcMonitorMap::lookup(const ServiceMapping &mapping) { + LOG(spam, "lookup %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + auto iter = _map.find(mapping.name); if (iter == _map.end()) { - return nullptr; + LOG_ABORT("not in map"); } PerService & psd = iter->second; - if (psd.srv.get() != rpcsrv) { - return nullptr; + if (psd.spec != mapping.spec) { + LOG_ABORT("conflict in map: %s->%s"); } + LOG(spam, "found in map: %s->%s [%s,%s]", + iter->first.c_str(), psd.spec.c_str(), + psd.up ? "up" : "down", + psd.localOnly ? "local" : "global"); return &psd; } @@ -66,7 +68,7 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping, auto old = _map.find(mapping.name); if (old != _map.end()) { const PerService & exists = old->second; - if (exists.spec() == mapping.spec) { + if (exists.spec == mapping.spec) { LOG(debug, "added mapping %s->%s was already present", mapping.name.c_str(), mapping.spec.c_str()); inflight->doneHandler(OkState(0, "already registered")); @@ -74,13 +76,14 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping, } LOG(warning, "tried addLocal for mapping %s->%s, but already had conflicting mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str(), - exists.name().c_str(), exists.spec().c_str()); + mapping.name.c_str(), exists.spec.c_str()); inflight->doneHandler(OkState(FRTE_RPC_METHOD_FAILED, "conflict")); return; } auto [ iter, was_inserted ] = _map.try_emplace(mapping.name, localService(mapping, std::move(inflight))); LOG_ASSERT(was_inserted); + _mappingMonitor->start(mapping); } void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { @@ -97,41 +100,45 @@ void LocalRpcMonitorMap::doAdd(const ServiceMapping &mapping) { auto old = _map.find(mapping.name); if (old != _map.end()) { PerService & exists = old->second; - if (exists.spec() == mapping.spec) { + if (exists.spec == mapping.spec) { LOG(debug, "added mapping %s->%s was already present", mapping.name.c_str(), mapping.spec.c_str()); exists.localOnly = false; return; } PerService removed = std::move(exists); + ServiceMapping toRemove{old->first, removed.spec}; + _mappingMonitor->stop(toRemove); _map.erase(old); LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str(), - removed.name().c_str(), removed.spec().c_str()); + toRemove.name.c_str(), removed.spec.c_str()); if (removed.inflight) { auto target = std::move(removed.inflight); target->doneHandler(OkState(13, "conflict during initialization")); } if (removed.up) { - _dispatcher.remove(removed.mapping()); + _dispatcher.remove(toRemove); } - _delayedTasks.deleteLater(std::move(removed.srv)); } auto [ iter, was_inserted ] = _map.try_emplace(mapping.name, globalService(mapping)); LOG_ASSERT(was_inserted); + _mappingMonitor->start(mapping); } void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) { auto iter = _map.find(mapping.name); if (iter != _map.end()) { PerService removed = std::move(iter->second); + ServiceMapping toRemove{iter->first, removed.spec}; + _mappingMonitor->stop(toRemove); _map.erase(iter); LOG(debug, "remove: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); - if (mapping.spec != removed.spec()) { + if (mapping.spec != removed.spec) { LOG(warning, "inconsistent specs for name '%s': had '%s', but was asked to remove '%s'", mapping.name.c_str(), - removed.spec().c_str(), + removed.spec.c_str(), mapping.spec.c_str()); } if (removed.inflight) { @@ -139,53 +146,48 @@ void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) { target->doneHandler(OkState(13, "removed during initialization")); } if (removed.up) { - _dispatcher.remove(removed.mapping()); + _dispatcher.remove(toRemove); } - _delayedTasks.deleteLater(std::move(removed.srv)); } else { LOG(debug, "tried to remove non-existing mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); } } -void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string) { - if (auto *psd = lookup(rpcsrv)) { - LOG(debug, "failed: %s->%s", psd->name().c_str(), psd->spec().c_str()); +void LocalRpcMonitorMap::down(const ServiceMapping& mapping) { + if (auto *psd = lookup(mapping)) { + LOG(debug, "failed: %s->%s", mapping.name.c_str(), psd->spec.c_str()); if (psd->inflight) { auto target = std::move(psd->inflight); target->doneHandler(OkState(13, "failed check using listNames callback")); } if (psd->localOnly) { PerService removed = std::move(*psd); - auto iter = _map.find(removed.name()); + auto iter = _map.find(mapping.name); + _mappingMonitor->stop(mapping); _map.erase(iter); if (removed.up) { - _dispatcher.remove(removed.mapping()); + _dispatcher.remove(mapping); } - _delayedTasks.deleteLater(std::move(removed.srv)); } else if (psd->up) { psd->up = false; - _dispatcher.remove(psd->mapping()); + _dispatcher.remove(mapping); } } } -void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) { - if (auto *psd = lookup(rpcsrv)) { - LOG(debug, "ok: %s->%s", psd->name().c_str(), psd->spec().c_str()); +void LocalRpcMonitorMap::up(const ServiceMapping& mapping) { + if (auto *psd = lookup(mapping)) { + LOG(debug, "ok: %s->%s", mapping.name.c_str(), psd->spec.c_str()); if (psd->inflight) { auto target = std::move(psd->inflight); target->doneHandler(OkState()); } if (! psd->up) { psd->up = true; - _dispatcher.add(psd->mapping()); + _dispatcher.add(mapping); } } } -FRT_Supervisor * LocalRpcMonitorMap::getSupervisor() { - return &_supervisor; -} - } // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h index 8961e21f386..a0971a8c80b 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h @@ -2,10 +2,10 @@ #pragma once #include "cmd.h" -#include "i_rpc_server_manager.h" #include "managed_rpc_server.h" #include "map_listener.h" #include "map_source.h" +#include "mapping_monitor.h" #include "named_service.h" #include "proxy_map_source.h" #include "service_map_history.h" @@ -24,8 +24,8 @@ namespace slobrok { * Tracks up/down status for name->spec combinations * that are considered for publication locally. **/ -class LocalRpcMonitorMap : public IRpcServerManager, - public MapListener +class LocalRpcMonitorMap : public MapListener, + public MappingMonitorOwner { private: enum class EventType { ADD, REMOVE }; @@ -42,16 +42,9 @@ private: }; class DelayedTasks : public FNET_Task { - using MUP = std::unique_ptr<ManagedRpcServer>; - std::vector<MUP> _deleteList; std::vector<Event> _queue; LocalRpcMonitorMap &_target; public: - void deleteLater(MUP rpcsrv) { - _deleteList.emplace_back(std::move(rpcsrv)); - ScheduleNow(); - } - void handleLater(Event event) { _queue.emplace_back(std::move(event)); ScheduleNow(); @@ -61,7 +54,6 @@ private: DelayedTasks(FNET_Scheduler *scheduler, LocalRpcMonitorMap &target) : FNET_Task(scheduler), - _deleteList(), _queue(), _target(target) {} @@ -75,17 +67,9 @@ private: bool up; bool localOnly; std::unique_ptr<ScriptCommand> inflight; - std::unique_ptr<ManagedRpcServer> srv; - - vespalib::string name() const { return srv->getName(); } - vespalib::string spec() const { return srv->getSpec(); } - ServiceMapping mapping() const { return ServiceMapping{srv->getName(), srv->getSpec()}; } + vespalib::string spec; }; - std::unique_ptr<ManagedRpcServer> managedFor(const ServiceMapping &mapping) { - return std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this); - } - PerService localService(const ServiceMapping &mapping, std::unique_ptr<ScriptCommand> inflight) { @@ -93,7 +77,7 @@ private: .up = false, .localOnly = true, .inflight = std::move(inflight), - .srv = managedFor(mapping) + .spec = mapping.spec }; } @@ -102,7 +86,7 @@ private: .up = false, .localOnly = false, .inflight = {}, - .srv = managedFor(mapping) + .spec = mapping.spec }; } @@ -111,16 +95,17 @@ private: Map _map; ProxyMapSource _dispatcher; ServiceMapHistory _history; - FRT_Supervisor &_supervisor; + MappingMonitor::UP _mappingMonitor; std::unique_ptr<MapSubscription> _subscription; - PerService *lookup(ManagedRpcServer *rpcsrv); - void doAdd(const ServiceMapping &mapping); void doRemove(const ServiceMapping &mapping); + + PerService * lookup(const ServiceMapping &mapping); public: - LocalRpcMonitorMap(FRT_Supervisor &_supervisor); + LocalRpcMonitorMap(FRT_Supervisor &supervisor, + MappingMonitorFactory mappingMonitorFactory); ~LocalRpcMonitorMap(); MapSource &dispatcher() { return _dispatcher; } @@ -133,9 +118,8 @@ public: void add(const ServiceMapping &mapping) override; void remove(const ServiceMapping &mapping) override; - void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override; - void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override; - FRT_Supervisor *getSupervisor() override; + void up(const ServiceMapping& mapping) override; + void down(const ServiceMapping& mapping) override; }; //----------------------------------------------------------------------------- diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index 1b52b85f736..d8a3e62f44e 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -110,7 +110,10 @@ SBEnv::SBEnv(const ConfigShim &shim) _health(), _metrics(_rpcHooks, *_transport), _components(), - _localRpcMonitorMap(*_supervisor), + _localRpcMonitorMap(*_supervisor, + [this] (MappingMonitorOwner &owner) { + return std::make_unique<RpcMappingMonitor>(*_supervisor, owner); + }), _rpcsrvmanager(*this), _exchanger(*this, _rpcsrvmap), _rpcsrvmap() diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h index b744a2d1d92..576e5d4b30c 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.h +++ b/slobrok/src/vespa/slobrok/server/sbenv.h @@ -2,6 +2,7 @@ #pragma once #include "named_service.h" +#include "rpc_mapping_monitor.h" #include "rpc_server_map.h" #include "rpc_server_manager.h" #include "remote_slobrok.h" |