diff options
author | Arne Juul <arnej@verizonmedia.com> | 2021-07-15 09:54:42 +0000 |
---|---|---|
committer | Arne Juul <arnej@verizonmedia.com> | 2021-08-05 08:38:27 +0000 |
commit | 5df45d85a421da415715a2bb74851ea553af2035 (patch) | |
tree | 5eb407d41a35c24fb9ea7789729d5ce571b5bf67 /slobrok | |
parent | 16277f3f494c1bcbe54e3f6d175bc536cf98f99e (diff) |
add LocalRpcMonitorMap serving local history
Diffstat (limited to 'slobrok')
-rw-r--r-- | slobrok/src/vespa/slobrok/server/CMakeLists.txt | 1 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp | 111 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h | 61 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.cpp | 2 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.h | 4 |
5 files changed, 178 insertions, 1 deletions
diff --git a/slobrok/src/vespa/slobrok/server/CMakeLists.txt b/slobrok/src/vespa/slobrok/server/CMakeLists.txt index 5b9059af932..c837b0ac106 100644 --- a/slobrok/src/vespa/slobrok/server/CMakeLists.txt +++ b/slobrok/src/vespa/slobrok/server/CMakeLists.txt @@ -6,6 +6,7 @@ vespa_add_library(slobrok_slobrokserver exchange_manager.cpp i_monitored_server.cpp i_rpc_server_manager.cpp + local_rpc_monitor_map.cpp managed_rpc_server.cpp map_diff.cpp map_listener.cpp diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp new file mode 100644 index 00000000000..c229ac9ab4d --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp @@ -0,0 +1,111 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "local_rpc_monitor_map.h" +#include "sbenv.h" +#include <vespa/log/log.h> +LOG_SETUP(".slobrok.server.local_rpc_monitor_map"); + +namespace slobrok { + +LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor) + : _map(), + _history(), + _supervisor(supervisor) +{ +} + +LocalRpcMonitorMap::~LocalRpcMonitorMap() = default; + +LocalRpcMonitorMap::PerService * +LocalRpcMonitorMap::lookup(const ServiceMapping &mapping) { + auto iter = _map.find(mapping.name); + if (iter != _map.end()) { + PerService & psd = iter->second; + if (psd.mapping.spec == mapping.spec) { + return &psd; + } + } + return nullptr; +} + +ServiceMapHistory & LocalRpcMonitorMap::localHistory() { + return _history; +} + +void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { + LOG(debug, "try add: mapping %s->%s", + mapping.name.c_str(), mapping.spec.c_str()); + auto old = _map.find(mapping.name); + if (old != _map.end()) { + PerService & exists = old->second; + if (exists.mapping.spec == mapping.spec) { + LOG(debug, "added mapping %s->%s was already present", + mapping.name.c_str(), mapping.spec.c_str()); + return; + } + LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s", + mapping.name.c_str(), mapping.spec.c_str(), + exists.mapping.name.c_str(), exists.mapping.spec.c_str()); + if (exists.up) { + _history.remove(exists.mapping); + } + _map.erase(old); + } + auto [ iter, was_inserted ] = _map.try_emplace(mapping.name, mapping); + LOG_ASSERT(was_inserted); + PerService & fill = iter->second; + fill.srv = std::make_unique<ManagedRpcServer>(mapping.name, + mapping.spec, + *this); +} + +void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) { + auto iter = _map.find(mapping.name); + if (iter != _map.end()) { + LOG(debug, "remove: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + PerService & exists = iter->second; + if (mapping.spec != exists.mapping.spec) { + LOG(warning, "inconsistent specs for name '%s': had '%s', but was asked to remove '%s'", + mapping.name.c_str(), + exists.mapping.spec.c_str(), + mapping.spec.c_str()); + } + if (exists.up) { + _history.remove(exists.mapping); + } + _map.erase(iter); + } else { + LOG(debug, "tried to remove non-existing mapping %s->%s", + mapping.name.c_str(), mapping.spec.c_str()); + } +} + +void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string) { + ServiceMapping mapping{rpcsrv->getName(), rpcsrv->getSpec()}; + auto * psd = lookup(mapping); + LOG_ASSERT(psd); + LOG_ASSERT(psd->srv.get() == rpcsrv); + LOG(debug, "failed: %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + if (psd->up) { + psd->up = false; + _history.remove(mapping); + } +} + +void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) { + ServiceMapping mapping{rpcsrv->getName(), rpcsrv->getSpec()}; + auto * psd = lookup(mapping); + LOG_ASSERT(psd); + LOG_ASSERT(psd->srv.get() == rpcsrv); + LOG(debug, "ok: %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + if (! psd->up) { + psd->up = true; + _history.add(mapping); + } +} + +FRT_Supervisor * LocalRpcMonitorMap::getSupervisor() { + return &_supervisor; +} + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h new file mode 100644 index 00000000000..7caea0bfdc2 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h @@ -0,0 +1,61 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "map_listener.h" +#include "map_source.h" +#include "service_mapping.h" +#include "service_map_history.h" +#include "i_rpc_server_manager.h" +#include "managed_rpc_server.h" +#include "map_listener.h" +#include "named_service.h" +#include <vector> +#include <memory> +#include <map> + +namespace slobrok { + +/** + * @class LocalRpcMonitorMap + * @brief A collection of ManagedRpcServer objects + * + * Tracks up/down status for name->spec combinations + * that are considered for publication locally. + **/ +class LocalRpcMonitorMap : public IRpcServerManager, + public MapListener +{ +private: + struct PerService { + const ServiceMapping mapping; + bool up = false; + std::unique_ptr<ManagedRpcServer> srv = {}; + }; + + using Map = std::map<vespalib::string, PerService>; + + Map _map; + ServiceMapHistory _history; + FRT_Supervisor &_supervisor; + vespalib::GenCnt _genFromGlobal; + + PerService * lookup(const ServiceMapping &mapping); + +public: + LocalRpcMonitorMap(FRT_Supervisor &_supervisor); + ~LocalRpcMonitorMap(); + + ServiceMapHistory & localHistory(); + + void add(const ServiceMapping &mapping) override; + void remove(const ServiceMapping &mapping) override; + + void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override; + void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override; + FRT_Supervisor *getSupervisor() override; +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index 78c34f1bdb4..6bf5bbd985d 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -110,12 +110,14 @@ SBEnv::SBEnv(const ConfigShim &shim) _health(), _metrics(_rpcHooks, *_transport), _components(), + _localRpcMonitorMap(*_supervisor), _rpcsrvmanager(*this), _exchanger(*this, _rpcsrvmap), _rpcsrvmap() { srandom(time(nullptr) ^ getpid()); _rpcsrvmap.proxy().registerListener(_globalVisibleHistory); + _rpcsrvmap.proxy().registerListener(_localRpcMonitorMap); _rpcHooks.initRPC(getSupervisor()); } diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h index 6836715854a..5b4e838f76c 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.h +++ b/slobrok/src/vespa/slobrok/server/sbenv.h @@ -8,6 +8,7 @@ #include "exchange_manager.h" #include "configshim.h" #include "ok_state.h" +#include "local_rpc_monitor_map.h" #include "metrics_producer.h" #include <vespa/config-slobroks.h> #include <vespa/slobrok/cfg.h> @@ -54,6 +55,7 @@ private: vespalib::SimpleHealthProducer _health; MetricsProducer _metrics; vespalib::SimpleComponentConfigProducer _components; + LocalRpcMonitorMap _localRpcMonitorMap; ServiceMapHistory _globalVisibleHistory; public: @@ -77,7 +79,7 @@ public: } ServiceMapHistory& localHistory() { - return _globalVisibleHistory; + return _localRpcMonitorMap.localHistory(); } const std::string & mySpec() const { return _me; } |