summaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne Juul <arnej@verizonmedia.com>2021-08-18 14:25:38 +0000
committerArne Juul <arnej@verizonmedia.com>2021-08-19 21:30:24 +0000
commit5fe3186391f501839d7627a7118aef0176f3ea7a (patch)
treef2bb45c0da56f5482201e8c33807d0da098d403f /slobrok
parent9b0c8614bcf16fee334fabc72f2ecd977f3f2084 (diff)
add generic MappingMonitor API
* and RPC implementation using ManagedRpcServer as before
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/CMakeLists.txt2
-rw-r--r--slobrok/src/vespa/slobrok/server/mapping_monitor.cpp8
-rw-r--r--slobrok/src/vespa/slobrok/server/mapping_monitor.h25
-rw-r--r--slobrok/src/vespa/slobrok/server/rpc_mapping_monitor.cpp85
-rw-r--r--slobrok/src/vespa/slobrok/server/rpc_mapping_monitor.h59
-rw-r--r--slobrok/src/vespa/slobrok/server/service_mapping.h6
6 files changed, 185 insertions, 0 deletions
diff --git a/slobrok/src/vespa/slobrok/server/CMakeLists.txt b/slobrok/src/vespa/slobrok/server/CMakeLists.txt
index 20f5bda4278..b4e7d5de1d5 100644
--- a/slobrok/src/vespa/slobrok/server/CMakeLists.txt
+++ b/slobrok/src/vespa/slobrok/server/CMakeLists.txt
@@ -11,6 +11,7 @@ vespa_add_library(slobrok_slobrokserver
map_diff.cpp
map_listener.cpp
map_source.cpp
+ mapping_monitor.cpp
metrics_producer.cpp
mock_map_listener.cpp
monitor.cpp
@@ -20,6 +21,7 @@ vespa_add_library(slobrok_slobrokserver
remote_check.cpp
remote_slobrok.cpp
reserved_name.cpp
+ rpc_mapping_monitor.cpp
rpc_server_manager.cpp
rpc_server_map.cpp
rpchooks.cpp
diff --git a/slobrok/src/vespa/slobrok/server/mapping_monitor.cpp b/slobrok/src/vespa/slobrok/server/mapping_monitor.cpp
new file mode 100644
index 00000000000..359ba87bf22
--- /dev/null
+++ b/slobrok/src/vespa/slobrok/server/mapping_monitor.cpp
@@ -0,0 +1,8 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "mapping_monitor.h"
+
+namespace slobrok {
+
+} // namespace slobrok
+
diff --git a/slobrok/src/vespa/slobrok/server/mapping_monitor.h b/slobrok/src/vespa/slobrok/server/mapping_monitor.h
new file mode 100644
index 00000000000..d7e1a6975cb
--- /dev/null
+++ b/slobrok/src/vespa/slobrok/server/mapping_monitor.h
@@ -0,0 +1,25 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "service_mapping.h"
+
+namespace slobrok {
+
+struct MappingMonitorListener {
+ virtual void up(const ServiceMapping& mapping) = 0;
+ virtual void down(const ServiceMapping& mapping) = 0;
+protected:
+ ~MappingMonitorListener() = default;
+};
+
+struct MappingMonitor {
+ virtual void target(MappingMonitorListener *listener) = 0;
+ virtual void start(const ServiceMapping& mapping) = 0;
+ virtual void stop(const ServiceMapping& mapping) = 0;
+protected:
+ ~MappingMonitor() = default;
+};
+
+} // namespace slobrok
+
diff --git a/slobrok/src/vespa/slobrok/server/rpc_mapping_monitor.cpp b/slobrok/src/vespa/slobrok/server/rpc_mapping_monitor.cpp
new file mode 100644
index 00000000000..ec56db709f5
--- /dev/null
+++ b/slobrok/src/vespa/slobrok/server/rpc_mapping_monitor.cpp
@@ -0,0 +1,85 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "rpc_mapping_monitor.h"
+
+#include <vespa/log/log.h>
+LOG_SETUP(".slobrok.server.rpc_mapping_monitor");
+
+namespace slobrok {
+
+void RpcMappingMonitor::DelayedTasks::PerformTask() {
+ std::vector<MUP> deleteAfterSwap;
+ std::swap(deleteAfterSwap, _deleteList);
+}
+
+RpcMappingMonitor::RpcMappingMonitor(FRT_Supervisor &orb)
+ : _orb(orb),
+ _delayedTasks(orb.GetScheduler()),
+ _map(),
+ _target(nullptr)
+{}
+
+RpcMappingMonitor::~RpcMappingMonitor() = default;
+
+void RpcMappingMonitor::target(MappingMonitorListener *listener) {
+ if (listener == nullptr) {
+ LOG_ASSERT(_target != nullptr);
+ } else {
+ LOG_ASSERT(_target == nullptr);
+ }
+ _target = listener;
+ LOG(debug, "new target %p", _target);
+}
+
+void RpcMappingMonitor::start(const ServiceMapping& mapping) {
+ LOG(spam, "start %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ LOG_ASSERT(_map.find(mapping) == _map.end());
+ _map.emplace(mapping,
+ std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this));
+}
+
+void RpcMappingMonitor::stop(const ServiceMapping& mapping) {
+ LOG(spam, "stop %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ auto iter = _map.find(mapping);
+ LOG_ASSERT(iter != _map.end());
+ _delayedTasks.deleteLater(std::move(iter->second));
+ _map.erase(iter);
+}
+
+void RpcMappingMonitor::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) {
+ ServiceMapping mapping{rpcsrv->getName(), rpcsrv->getSpec()};
+ LOG(spam, "notifyFailed %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ Map::const_iterator iter = _map.find(mapping);
+ if (iter == _map.end()) {
+ return;
+ }
+ const MUP & managedRpcServer = iter->second;
+ if (managedRpcServer.get() != rpcsrv) {
+ return;
+ }
+ if (_target) {
+ LOG(debug, "service %s [at %s] failed: %s",
+ mapping.name.c_str(), mapping.spec.c_str(), errmsg.c_str());
+ _target->down(mapping);
+ }
+}
+
+void RpcMappingMonitor::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) {
+ ServiceMapping mapping{rpcsrv->getName(), rpcsrv->getSpec()};
+ LOG(spam, "notifyOk %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ Map::const_iterator iter = _map.find(mapping);
+ if (iter == _map.end()) {
+ return;
+ }
+ const MUP & managedRpcServer = iter->second;
+ if (managedRpcServer.get() != rpcsrv) {
+ return;
+ }
+ if (_target) {
+ LOG(debug, "service %s [at %s] up ok -> target",
+ mapping.name.c_str(), mapping.spec.c_str());
+ _target->up(mapping);
+ }
+}
+
+} // namespace slobrok
diff --git a/slobrok/src/vespa/slobrok/server/rpc_mapping_monitor.h b/slobrok/src/vespa/slobrok/server/rpc_mapping_monitor.h
new file mode 100644
index 00000000000..36043ac41b2
--- /dev/null
+++ b/slobrok/src/vespa/slobrok/server/rpc_mapping_monitor.h
@@ -0,0 +1,59 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "mapping_monitor.h"
+#include "i_rpc_server_manager.h"
+#include "managed_rpc_server.h"
+
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/task.h>
+
+#include <vector>
+#include <memory>
+#include <map>
+
+namespace slobrok {
+
+class RpcMappingMonitor : public MappingMonitor,
+ public IRpcServerManager
+{
+private:
+ using MUP = std::unique_ptr<ManagedRpcServer>;
+
+ using Map = std::map<ServiceMapping, MUP>;
+
+ class DelayedTasks : public FNET_Task {
+ std::vector<MUP> _deleteList;
+ public:
+ void deleteLater(MUP rpcsrv) {
+ _deleteList.emplace_back(std::move(rpcsrv));
+ ScheduleNow();
+ }
+ void PerformTask() override;
+ DelayedTasks(FNET_Scheduler *scheduler)
+ : FNET_Task(scheduler),
+ _deleteList()
+ {}
+ ~DelayedTasks() { Kill(); }
+ };
+
+ FRT_Supervisor& _orb;
+ DelayedTasks _delayedTasks;
+ Map _map;
+ MappingMonitorListener* _target;
+public:
+ RpcMappingMonitor(FRT_Supervisor &orb);
+ ~RpcMappingMonitor();
+
+ void target(MappingMonitorListener *listener) override;
+ void start(const ServiceMapping& mapping) override;
+ void stop(const ServiceMapping& mapping) override;
+
+ void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override;
+ void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override;
+ FRT_Supervisor *getSupervisor() override { return &_orb; }
+};
+
+} // namespace slobrok
+
diff --git a/slobrok/src/vespa/slobrok/server/service_mapping.h b/slobrok/src/vespa/slobrok/server/service_mapping.h
index 6c540b8e9b5..6bbf910ee3e 100644
--- a/slobrok/src/vespa/slobrok/server/service_mapping.h
+++ b/slobrok/src/vespa/slobrok/server/service_mapping.h
@@ -16,6 +16,12 @@ struct ServiceMapping {
bool operator== (const ServiceMapping &other) const {
return name == other.name && spec == other.spec;
}
+
+ bool operator< (const ServiceMapping &other) const {
+ if (name < other.name) return true;
+ if (other.name < name) return false;
+ return spec < other.spec;
+ }
};
typedef std::vector<ServiceMapping> ServiceMappingList;