diff options
author | Arne Juul <arnej@verizonmedia.com> | 2021-08-18 14:25:38 +0000 |
---|---|---|
committer | Arne Juul <arnej@verizonmedia.com> | 2021-08-18 20:26:20 +0000 |
commit | 3731e975304f19d07d4614d17998cee39fe82ce3 (patch) | |
tree | f2630fdd05770215e3c6f02317b607e9d999c438 /slobrok | |
parent | e1544cad33ce650a30bb017f99bba71f29136c84 (diff) |
delay add() and remove() for untangling
Diffstat (limited to 'slobrok')
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp | 39 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h | 46 |
2 files changed, 70 insertions, 15 deletions
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp index 3bea2018761..9013717f8b7 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp @@ -10,8 +10,29 @@ namespace slobrok { #pragma GCC diagnostic ignored "-Winline" +void LocalRpcMonitorMap::DelayedTasks::PerformTask() { + { + std::vector<MUP> deleteAfterSwap; + std::swap(deleteAfterSwap, _deleteList); + } + std::vector<Event> todo; + std::swap(todo, _queue); + for (const auto & entry : todo) { + switch (entry.type) { + case EventType::ADD: + _target.doAdd(entry.mapping); + break; + case EventType::REMOVE: + _target.doRemove(entry.mapping); + break; + default: + abort(); + } + } +} + LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor) - : _delete(supervisor.GetScheduler()), + : _delayedTasks(supervisor.GetScheduler(), *this), _map(), _dispatcher(), _history(), @@ -65,6 +86,14 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping, } void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { + _delayedTasks.handleLater(Event::add(mapping)); +} + +void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) { + _delayedTasks.handleLater(Event::remove(mapping)); +} + +void LocalRpcMonitorMap::doAdd(const ServiceMapping &mapping) { LOG(debug, "try add: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); auto old = _map.find(mapping.name); @@ -88,14 +117,14 @@ void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { if (removed.up) { _dispatcher.remove(removed.mapping()); } - _delete.later(std::move(removed.srv)); + _delayedTasks.deleteLater(std::move(removed.srv)); } auto [ iter, was_inserted ] = _map.try_emplace(mapping.name, globalService(mapping)); LOG_ASSERT(was_inserted); } -void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) { +void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) { auto iter = _map.find(mapping.name); if (iter != _map.end()) { PerService removed = std::move(iter->second); @@ -114,7 +143,7 @@ void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) { if (removed.up) { _dispatcher.remove(removed.mapping()); } - _delete.later(std::move(removed.srv)); + _delayedTasks.deleteLater(std::move(removed.srv)); } else { LOG(debug, "tried to remove non-existing mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); @@ -135,7 +164,7 @@ void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::strin if (removed.up) { _dispatcher.remove(removed.mapping()); } - _delete.later(std::move(removed.srv)); + _delayedTasks.deleteLater(std::move(removed.srv)); } else if (psd->up) { psd->up = false; _dispatcher.remove(psd->mapping()); diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h index 5557bca62f6..424ea82c6b5 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h @@ -29,26 +29,48 @@ class LocalRpcMonitorMap : public IRpcServerManager, public MapListener { private: - class DeleteTask : public FNET_Task { + enum class EventType { ADD, REMOVE }; + + struct Event { + EventType type; + ServiceMapping mapping; + static Event add(const ServiceMapping &value) { + return Event{EventType::ADD, value}; + } + static Event remove(const ServiceMapping &value) { + return Event{EventType::REMOVE, value}; + } + }; + + class DelayedTasks : public FNET_Task { using MUP = std::unique_ptr<ManagedRpcServer>; - std::vector<MUP> _deleteList; + std::vector<MUP> _deleteList; + std::vector<Event> _queue; + LocalRpcMonitorMap &_target; public: - void later(MUP rpcsrv) { + void deleteLater(MUP rpcsrv) { _deleteList.emplace_back(std::move(rpcsrv)); ScheduleNow(); } - void PerformTask() override { - std::vector<MUP> deleteAfterSwap; - std::swap(deleteAfterSwap, _deleteList); + + void handleLater(Event event) { + _queue.emplace_back(std::move(event)); + ScheduleNow(); } - DeleteTask(FNET_Scheduler *scheduler) + + void PerformTask() override; + + DelayedTasks(FNET_Scheduler *scheduler, LocalRpcMonitorMap &target) : FNET_Task(scheduler), - _deleteList() + _deleteList(), + _queue(), + _target(target) {} - ~DeleteTask() { Kill(); } + + ~DelayedTasks() { Kill(); } }; - DeleteTask _delete; + DelayedTasks _delayedTasks; struct PerService { bool up; @@ -94,6 +116,9 @@ private: std::unique_ptr<MapSubscription> _subscription; PerService *lookup(ManagedRpcServer *rpcsrv); + + void doAdd(const ServiceMapping &mapping); + void doRemove(const ServiceMapping &mapping); public: LocalRpcMonitorMap(FRT_Supervisor &_supervisor); @@ -105,6 +130,7 @@ public: /** for use by register API, will call doneHandler() on inflight script */ void addLocal(const ServiceMapping &mapping, std::unique_ptr<ScriptCommand> inflight); + void add(const ServiceMapping &mapping) override; void remove(const ServiceMapping &mapping) override; |