diff options
author | Arne H Juul <arnej27959@users.noreply.github.com> | 2021-08-11 10:37:21 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-11 10:37:21 +0200 |
commit | 14901467ad821d5dfa0a326ebbdd944c275eb6c9 (patch) | |
tree | 422ee535a3031d44ca1f20efbd4e0b98b1e32e21 | |
parent | 2b0575e6c4ea9c96d4bbe46e1f27ec4efcdbc51d (diff) | |
parent | 1ac3fc5e1497f5a4fc9229137520d37ed311625d (diff) |
Merge pull request #18691 from vespa-engine/arnej/add-local-rpc-monitor-map
add LocalRpcMonitorMap serving local history
-rw-r--r-- | slobrok/src/vespa/slobrok/server/CMakeLists.txt | 1 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp | 111 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h | 62 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.cpp | 2 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.h | 4 |
5 files changed, 179 insertions, 1 deletions
diff --git a/slobrok/src/vespa/slobrok/server/CMakeLists.txt b/slobrok/src/vespa/slobrok/server/CMakeLists.txt index 153ce681f86..e75b94d26bc 100644 --- a/slobrok/src/vespa/slobrok/server/CMakeLists.txt +++ b/slobrok/src/vespa/slobrok/server/CMakeLists.txt @@ -6,6 +6,7 @@ vespa_add_library(slobrok_slobrokserver exchange_manager.cpp i_monitored_server.cpp i_rpc_server_manager.cpp + local_rpc_monitor_map.cpp managed_rpc_server.cpp map_diff.cpp map_listener.cpp diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp new file mode 100644 index 00000000000..50a11c7b521 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp @@ -0,0 +1,111 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "local_rpc_monitor_map.h" +#include "sbenv.h" +#include <vespa/log/log.h> +LOG_SETUP(".slobrok.server.local_rpc_monitor_map"); + +namespace slobrok { + +LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor) + : _map(), + _history(), + _supervisor(supervisor) +{ +} + +LocalRpcMonitorMap::~LocalRpcMonitorMap() = default; + +LocalRpcMonitorMap::PerService * +LocalRpcMonitorMap::lookup(const ServiceMapping &mapping) { + auto iter = _map.find(mapping.name); + if (iter != _map.end()) { + PerService & psd = iter->second; + LOG_ASSERT(psd.spec() == mapping.spec); + return &psd; + } + return nullptr; +} + +ServiceMapHistory & LocalRpcMonitorMap::history() { + return _history; +} + +void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { + LOG(debug, "try add: mapping %s->%s", + mapping.name.c_str(), mapping.spec.c_str()); + auto old = _map.find(mapping.name); + if (old != _map.end()) { + PerService & exists = old->second; + if (exists.spec() == mapping.spec) { + LOG(debug, "added mapping %s->%s was already present", + mapping.name.c_str(), mapping.spec.c_str()); + return; + } + LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s", + mapping.name.c_str(), mapping.spec.c_str(), + exists.name().c_str(), exists.spec().c_str()); + if (exists.up) { + _history.remove(exists.mapping()); + } + _map.erase(old); + } + auto [ iter, was_inserted ] = + _map.try_emplace(mapping.name, + false, + std::make_unique<ManagedRpcServer>(mapping.name, + mapping.spec, + *this)); + LOG_ASSERT(was_inserted); +} + +void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) { + auto iter = _map.find(mapping.name); + if (iter != _map.end()) { + LOG(debug, "remove: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + PerService & exists = iter->second; + if (mapping.spec != exists.spec()) { + LOG(warning, "inconsistent specs for name '%s': had '%s', but was asked to remove '%s'", + mapping.name.c_str(), + exists.spec().c_str(), + mapping.spec.c_str()); + } + if (exists.up) { + _history.remove(exists.mapping()); + } + _map.erase(iter); + } else { + LOG(debug, "tried to remove non-existing mapping %s->%s", + mapping.name.c_str(), mapping.spec.c_str()); + } +} + +void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string) { + ServiceMapping mapping{rpcsrv->getName(), rpcsrv->getSpec()}; + auto * psd = lookup(mapping); + LOG_ASSERT(psd); + LOG_ASSERT(psd->srv.get() == rpcsrv); + LOG(debug, "failed: %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + if (psd->up) { + psd->up = false; + _history.remove(mapping); + } +} + +void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) { + ServiceMapping mapping{rpcsrv->getName(), rpcsrv->getSpec()}; + auto * psd = lookup(mapping); + LOG_ASSERT(psd); + LOG_ASSERT(psd->srv.get() == rpcsrv); + LOG(debug, "ok: %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + if (! psd->up) { + psd->up = true; + _history.add(mapping); + } +} + +FRT_Supervisor * LocalRpcMonitorMap::getSupervisor() { + return &_supervisor; +} + +} // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h new file mode 100644 index 00000000000..3f732d6875b --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h @@ -0,0 +1,62 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "map_listener.h" +#include "map_source.h" +#include "service_mapping.h" +#include "service_map_history.h" +#include "i_rpc_server_manager.h" +#include "managed_rpc_server.h" +#include "map_listener.h" +#include "named_service.h" +#include <vector> +#include <memory> +#include <map> + +namespace slobrok { + +/** + * @class LocalRpcMonitorMap + * @brief A collection of ManagedRpcServer objects + * + * Tracks up/down status for name->spec combinations + * that are considered for publication locally. + **/ +class LocalRpcMonitorMap : public IRpcServerManager, + public MapListener +{ +private: + struct PerService { + bool up; + std::unique_ptr<ManagedRpcServer> srv; + vespalib::string name() { return srv->getName(); } + vespalib::string spec() { return srv->getSpec(); } + ServiceMapping mapping() { return ServiceMapping{srv->getName(), srv->getSpec()}; } + }; + + using Map = std::map<vespalib::string, PerService>; + + Map _map; + ServiceMapHistory _history; + FRT_Supervisor &_supervisor; + + PerService * lookup(const ServiceMapping &mapping); + +public: + LocalRpcMonitorMap(FRT_Supervisor &_supervisor); + ~LocalRpcMonitorMap(); + + ServiceMapHistory & history(); + + void add(const ServiceMapping &mapping) override; + void remove(const ServiceMapping &mapping) override; + + void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override; + void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override; + FRT_Supervisor *getSupervisor() override; +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index 78c34f1bdb4..6bf5bbd985d 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -110,12 +110,14 @@ SBEnv::SBEnv(const ConfigShim &shim) _health(), _metrics(_rpcHooks, *_transport), _components(), + _localRpcMonitorMap(*_supervisor), _rpcsrvmanager(*this), _exchanger(*this, _rpcsrvmap), _rpcsrvmap() { srandom(time(nullptr) ^ getpid()); _rpcsrvmap.proxy().registerListener(_globalVisibleHistory); + _rpcsrvmap.proxy().registerListener(_localRpcMonitorMap); _rpcHooks.initRPC(getSupervisor()); } diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h index 6836715854a..853e72e83c5 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.h +++ b/slobrok/src/vespa/slobrok/server/sbenv.h @@ -8,6 +8,7 @@ #include "exchange_manager.h" #include "configshim.h" #include "ok_state.h" +#include "local_rpc_monitor_map.h" #include "metrics_producer.h" #include <vespa/config-slobroks.h> #include <vespa/slobrok/cfg.h> @@ -54,6 +55,7 @@ private: vespalib::SimpleHealthProducer _health; MetricsProducer _metrics; vespalib::SimpleComponentConfigProducer _components; + LocalRpcMonitorMap _localRpcMonitorMap; ServiceMapHistory _globalVisibleHistory; public: @@ -77,7 +79,7 @@ public: } ServiceMapHistory& localHistory() { - return _globalVisibleHistory; + return _localRpcMonitorMap.history(); } const std::string & mySpec() const { return _me; } |