summaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne Juul <arnej@verizonmedia.com>2021-07-15 09:54:42 +0000
committerArne Juul <arnej@verizonmedia.com>2021-08-05 08:38:27 +0000
commit5df45d85a421da415715a2bb74851ea553af2035 (patch)
tree5eb407d41a35c24fb9ea7789729d5ce571b5bf67 /slobrok
parent16277f3f494c1bcbe54e3f6d175bc536cf98f99e (diff)
add LocalRpcMonitorMap serving local history
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/CMakeLists.txt1
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp111
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h61
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.cpp2
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.h4
5 files changed, 178 insertions, 1 deletions
diff --git a/slobrok/src/vespa/slobrok/server/CMakeLists.txt b/slobrok/src/vespa/slobrok/server/CMakeLists.txt
index 5b9059af932..c837b0ac106 100644
--- a/slobrok/src/vespa/slobrok/server/CMakeLists.txt
+++ b/slobrok/src/vespa/slobrok/server/CMakeLists.txt
@@ -6,6 +6,7 @@ vespa_add_library(slobrok_slobrokserver
exchange_manager.cpp
i_monitored_server.cpp
i_rpc_server_manager.cpp
+ local_rpc_monitor_map.cpp
managed_rpc_server.cpp
map_diff.cpp
map_listener.cpp
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
new file mode 100644
index 00000000000..c229ac9ab4d
--- /dev/null
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
@@ -0,0 +1,111 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "local_rpc_monitor_map.h"
+#include "sbenv.h"
+#include <vespa/log/log.h>
+LOG_SETUP(".slobrok.server.local_rpc_monitor_map");
+
+namespace slobrok {
+
+LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor)
+ : _map(),
+ _history(),
+ _supervisor(supervisor)
+{
+}
+
+LocalRpcMonitorMap::~LocalRpcMonitorMap() = default;
+
+LocalRpcMonitorMap::PerService *
+LocalRpcMonitorMap::lookup(const ServiceMapping &mapping) {
+ auto iter = _map.find(mapping.name);
+ if (iter != _map.end()) {
+ PerService & psd = iter->second;
+ if (psd.mapping.spec == mapping.spec) {
+ return &psd;
+ }
+ }
+ return nullptr;
+}
+
+ServiceMapHistory & LocalRpcMonitorMap::localHistory() {
+ return _history;
+}
+
+void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
+ LOG(debug, "try add: mapping %s->%s",
+ mapping.name.c_str(), mapping.spec.c_str());
+ auto old = _map.find(mapping.name);
+ if (old != _map.end()) {
+ PerService & exists = old->second;
+ if (exists.mapping.spec == mapping.spec) {
+ LOG(debug, "added mapping %s->%s was already present",
+ mapping.name.c_str(), mapping.spec.c_str());
+ return;
+ }
+ LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s",
+ mapping.name.c_str(), mapping.spec.c_str(),
+ exists.mapping.name.c_str(), exists.mapping.spec.c_str());
+ if (exists.up) {
+ _history.remove(exists.mapping);
+ }
+ _map.erase(old);
+ }
+ auto [ iter, was_inserted ] = _map.try_emplace(mapping.name, mapping);
+ LOG_ASSERT(was_inserted);
+ PerService & fill = iter->second;
+ fill.srv = std::make_unique<ManagedRpcServer>(mapping.name,
+ mapping.spec,
+ *this);
+}
+
+void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) {
+ auto iter = _map.find(mapping.name);
+ if (iter != _map.end()) {
+ LOG(debug, "remove: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ PerService & exists = iter->second;
+ if (mapping.spec != exists.mapping.spec) {
+ LOG(warning, "inconsistent specs for name '%s': had '%s', but was asked to remove '%s'",
+ mapping.name.c_str(),
+ exists.mapping.spec.c_str(),
+ mapping.spec.c_str());
+ }
+ if (exists.up) {
+ _history.remove(exists.mapping);
+ }
+ _map.erase(iter);
+ } else {
+ LOG(debug, "tried to remove non-existing mapping %s->%s",
+ mapping.name.c_str(), mapping.spec.c_str());
+ }
+}
+
+void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string) {
+ ServiceMapping mapping{rpcsrv->getName(), rpcsrv->getSpec()};
+ auto * psd = lookup(mapping);
+ LOG_ASSERT(psd);
+ LOG_ASSERT(psd->srv.get() == rpcsrv);
+ LOG(debug, "failed: %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ if (psd->up) {
+ psd->up = false;
+ _history.remove(mapping);
+ }
+}
+
+void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) {
+ ServiceMapping mapping{rpcsrv->getName(), rpcsrv->getSpec()};
+ auto * psd = lookup(mapping);
+ LOG_ASSERT(psd);
+ LOG_ASSERT(psd->srv.get() == rpcsrv);
+ LOG(debug, "ok: %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ if (! psd->up) {
+ psd->up = true;
+ _history.add(mapping);
+ }
+}
+
+FRT_Supervisor * LocalRpcMonitorMap::getSupervisor() {
+ return &_supervisor;
+}
+
+} // namespace slobrok
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
new file mode 100644
index 00000000000..7caea0bfdc2
--- /dev/null
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
@@ -0,0 +1,61 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "map_listener.h"
+#include "map_source.h"
+#include "service_mapping.h"
+#include "service_map_history.h"
+#include "i_rpc_server_manager.h"
+#include "managed_rpc_server.h"
+#include "map_listener.h"
+#include "named_service.h"
+#include <vector>
+#include <memory>
+#include <map>
+
+namespace slobrok {
+
+/**
+ * @class LocalRpcMonitorMap
+ * @brief A collection of ManagedRpcServer objects
+ *
+ * Tracks up/down status for name->spec combinations
+ * that are considered for publication locally.
+ **/
+class LocalRpcMonitorMap : public IRpcServerManager,
+ public MapListener
+{
+private:
+ struct PerService {
+ const ServiceMapping mapping;
+ bool up = false;
+ std::unique_ptr<ManagedRpcServer> srv = {};
+ };
+
+ using Map = std::map<vespalib::string, PerService>;
+
+ Map _map;
+ ServiceMapHistory _history;
+ FRT_Supervisor &_supervisor;
+ vespalib::GenCnt _genFromGlobal;
+
+ PerService * lookup(const ServiceMapping &mapping);
+
+public:
+ LocalRpcMonitorMap(FRT_Supervisor &_supervisor);
+ ~LocalRpcMonitorMap();
+
+ ServiceMapHistory & localHistory();
+
+ void add(const ServiceMapping &mapping) override;
+ void remove(const ServiceMapping &mapping) override;
+
+ void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override;
+ void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override;
+ FRT_Supervisor *getSupervisor() override;
+};
+
+//-----------------------------------------------------------------------------
+
+} // namespace slobrok
+
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp
index 78c34f1bdb4..6bf5bbd985d 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.cpp
+++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp
@@ -110,12 +110,14 @@ SBEnv::SBEnv(const ConfigShim &shim)
_health(),
_metrics(_rpcHooks, *_transport),
_components(),
+ _localRpcMonitorMap(*_supervisor),
_rpcsrvmanager(*this),
_exchanger(*this, _rpcsrvmap),
_rpcsrvmap()
{
srandom(time(nullptr) ^ getpid());
_rpcsrvmap.proxy().registerListener(_globalVisibleHistory);
+ _rpcsrvmap.proxy().registerListener(_localRpcMonitorMap);
_rpcHooks.initRPC(getSupervisor());
}
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h
index 6836715854a..5b4e838f76c 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.h
+++ b/slobrok/src/vespa/slobrok/server/sbenv.h
@@ -8,6 +8,7 @@
#include "exchange_manager.h"
#include "configshim.h"
#include "ok_state.h"
+#include "local_rpc_monitor_map.h"
#include "metrics_producer.h"
#include <vespa/config-slobroks.h>
#include <vespa/slobrok/cfg.h>
@@ -54,6 +55,7 @@ private:
vespalib::SimpleHealthProducer _health;
MetricsProducer _metrics;
vespalib::SimpleComponentConfigProducer _components;
+ LocalRpcMonitorMap _localRpcMonitorMap;
ServiceMapHistory _globalVisibleHistory;
public:
@@ -77,7 +79,7 @@ public:
}
ServiceMapHistory& localHistory() {
- return _globalVisibleHistory;
+ return _localRpcMonitorMap.localHistory();
}
const std::string & mySpec() const { return _me; }