summaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne Juul <arnej@verizonmedia.com>2021-08-18 14:25:38 +0000
committerArne Juul <arnej@verizonmedia.com>2021-08-18 20:26:20 +0000
commit3731e975304f19d07d4614d17998cee39fe82ce3 (patch)
treef2630fdd05770215e3c6f02317b607e9d999c438 /slobrok
parente1544cad33ce650a30bb017f99bba71f29136c84 (diff)
delay add() and remove() for untangling
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp39
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h46
2 files changed, 70 insertions, 15 deletions
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
index 3bea2018761..9013717f8b7 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
@@ -10,8 +10,29 @@ namespace slobrok {
#pragma GCC diagnostic ignored "-Winline"
+void LocalRpcMonitorMap::DelayedTasks::PerformTask() {
+ {
+ std::vector<MUP> deleteAfterSwap;
+ std::swap(deleteAfterSwap, _deleteList);
+ }
+ std::vector<Event> todo;
+ std::swap(todo, _queue);
+ for (const auto & entry : todo) {
+ switch (entry.type) {
+ case EventType::ADD:
+ _target.doAdd(entry.mapping);
+ break;
+ case EventType::REMOVE:
+ _target.doRemove(entry.mapping);
+ break;
+ default:
+ abort();
+ }
+ }
+}
+
LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor)
- : _delete(supervisor.GetScheduler()),
+ : _delayedTasks(supervisor.GetScheduler(), *this),
_map(),
_dispatcher(),
_history(),
@@ -65,6 +86,14 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping,
}
void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
+ _delayedTasks.handleLater(Event::add(mapping));
+}
+
+void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) {
+ _delayedTasks.handleLater(Event::remove(mapping));
+}
+
+void LocalRpcMonitorMap::doAdd(const ServiceMapping &mapping) {
LOG(debug, "try add: mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str());
auto old = _map.find(mapping.name);
@@ -88,14 +117,14 @@ void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
if (removed.up) {
_dispatcher.remove(removed.mapping());
}
- _delete.later(std::move(removed.srv));
+ _delayedTasks.deleteLater(std::move(removed.srv));
}
auto [ iter, was_inserted ] =
_map.try_emplace(mapping.name, globalService(mapping));
LOG_ASSERT(was_inserted);
}
-void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) {
+void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) {
auto iter = _map.find(mapping.name);
if (iter != _map.end()) {
PerService removed = std::move(iter->second);
@@ -114,7 +143,7 @@ void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) {
if (removed.up) {
_dispatcher.remove(removed.mapping());
}
- _delete.later(std::move(removed.srv));
+ _delayedTasks.deleteLater(std::move(removed.srv));
} else {
LOG(debug, "tried to remove non-existing mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str());
@@ -135,7 +164,7 @@ void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::strin
if (removed.up) {
_dispatcher.remove(removed.mapping());
}
- _delete.later(std::move(removed.srv));
+ _delayedTasks.deleteLater(std::move(removed.srv));
} else if (psd->up) {
psd->up = false;
_dispatcher.remove(psd->mapping());
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
index 5557bca62f6..424ea82c6b5 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
@@ -29,26 +29,48 @@ class LocalRpcMonitorMap : public IRpcServerManager,
public MapListener
{
private:
- class DeleteTask : public FNET_Task {
+ enum class EventType { ADD, REMOVE };
+
+ struct Event {
+ EventType type;
+ ServiceMapping mapping;
+ static Event add(const ServiceMapping &value) {
+ return Event{EventType::ADD, value};
+ }
+ static Event remove(const ServiceMapping &value) {
+ return Event{EventType::REMOVE, value};
+ }
+ };
+
+ class DelayedTasks : public FNET_Task {
using MUP = std::unique_ptr<ManagedRpcServer>;
- std::vector<MUP> _deleteList;
+ std::vector<MUP> _deleteList;
+ std::vector<Event> _queue;
+ LocalRpcMonitorMap &_target;
public:
- void later(MUP rpcsrv) {
+ void deleteLater(MUP rpcsrv) {
_deleteList.emplace_back(std::move(rpcsrv));
ScheduleNow();
}
- void PerformTask() override {
- std::vector<MUP> deleteAfterSwap;
- std::swap(deleteAfterSwap, _deleteList);
+
+ void handleLater(Event event) {
+ _queue.emplace_back(std::move(event));
+ ScheduleNow();
}
- DeleteTask(FNET_Scheduler *scheduler)
+
+ void PerformTask() override;
+
+ DelayedTasks(FNET_Scheduler *scheduler, LocalRpcMonitorMap &target)
: FNET_Task(scheduler),
- _deleteList()
+ _deleteList(),
+ _queue(),
+ _target(target)
{}
- ~DeleteTask() { Kill(); }
+
+ ~DelayedTasks() { Kill(); }
};
- DeleteTask _delete;
+ DelayedTasks _delayedTasks;
struct PerService {
bool up;
@@ -94,6 +116,9 @@ private:
std::unique_ptr<MapSubscription> _subscription;
PerService *lookup(ManagedRpcServer *rpcsrv);
+
+ void doAdd(const ServiceMapping &mapping);
+ void doRemove(const ServiceMapping &mapping);
public:
LocalRpcMonitorMap(FRT_Supervisor &_supervisor);
@@ -105,6 +130,7 @@ public:
/** for use by register API, will call doneHandler() on inflight script */
void addLocal(const ServiceMapping &mapping,
std::unique_ptr<ScriptCommand> inflight);
+
void add(const ServiceMapping &mapping) override;
void remove(const ServiceMapping &mapping) override;