diff options
author | Arne H Juul <arnej27959@users.noreply.github.com> | 2021-08-16 10:44:48 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-16 10:44:48 +0200 |
commit | ae9bd93fa7e7d8ce721eb2a75fb5600678b2c7c4 (patch) | |
tree | 2cca25e81e46f6e066b517208f4a3a0f0ac3a3cb /slobrok | |
parent | b71ef7c4e1a0f44f000126fcd50353e0cf63bc3b (diff) | |
parent | 2bddc841afefd89d3de25d83f6b97b465923e455 (diff) |
Merge pull request #18743 from vespa-engine/arnej/wire-consensus-map
Arnej/wire consensus map
Diffstat (limited to 'slobrok')
5 files changed, 24 insertions, 8 deletions
diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp index 632c823a2c1..7e693a9ea3d 100644 --- a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp +++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp @@ -39,6 +39,7 @@ ExchangeManager::addPartner(const std::string & name, const std::string & spec) auto [ it, wasNew ] = _partners.emplace(name, std::make_unique<RemoteSlobrok>(name, spec, *this)); LOG_ASSERT(wasNew); RemoteSlobrok & partner = *it->second; + partner.remoteMap().registerListener(_env.consensusMap()); partner.tryConnect(); return OkState(); } @@ -51,6 +52,7 @@ ExchangeManager::removePartner(const std::string & name) LOG_ASSERT(oldremote); _partners.erase(name); oldremote->shutdown(); + oldremote->remoteMap().unregisterListener(_env.consensusMap()); } std::vector<std::string> diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp index 50a11c7b521..7141da29b6d 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp @@ -9,9 +9,11 @@ namespace slobrok { LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor) : _map(), + _dispatcher(), _history(), _supervisor(supervisor) { + _dispatcher.registerListener(_history); } LocalRpcMonitorMap::~LocalRpcMonitorMap() = default; @@ -46,7 +48,7 @@ void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { mapping.name.c_str(), mapping.spec.c_str(), exists.name().c_str(), exists.spec().c_str()); if (exists.up) { - _history.remove(exists.mapping()); + _dispatcher.remove(exists.mapping()); } _map.erase(old); } @@ -69,9 +71,10 @@ void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) { mapping.name.c_str(), exists.spec().c_str(), mapping.spec.c_str()); + return; } if (exists.up) { - _history.remove(exists.mapping()); + _dispatcher.remove(exists.mapping()); } _map.erase(iter); } else { @@ -88,7 +91,7 @@ void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::strin LOG(debug, "failed: %s->%s", mapping.name.c_str(), mapping.spec.c_str()); if (psd->up) { psd->up = false; - _history.remove(mapping); + _dispatcher.remove(mapping); } } @@ -100,7 +103,7 @@ void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) { LOG(debug, "ok: %s->%s", mapping.name.c_str(), mapping.spec.c_str()); if (! psd->up) { psd->up = true; - _history.add(mapping); + _dispatcher.add(mapping); } } diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h index 3f732d6875b..7d3ed6b466a 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h @@ -1,14 +1,16 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "map_listener.h" -#include "map_source.h" -#include "service_mapping.h" -#include "service_map_history.h" #include "i_rpc_server_manager.h" #include "managed_rpc_server.h" #include "map_listener.h" +#include "map_listener.h" +#include "map_source.h" #include "named_service.h" +#include "proxy_map_source.h" +#include "service_map_history.h" +#include "service_mapping.h" + #include <vector> #include <memory> #include <map> @@ -37,6 +39,7 @@ private: using Map = std::map<vespalib::string, PerService>; Map _map; + ProxyMapSource _dispatcher; ServiceMapHistory _history; FRT_Supervisor &_supervisor; @@ -46,6 +49,7 @@ public: LocalRpcMonitorMap(FRT_Supervisor &_supervisor); ~LocalRpcMonitorMap(); + MapSource &dispatcher() { return _dispatcher; } ServiceMapHistory & history(); void add(const ServiceMapping &mapping) override; diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index 6bf5bbd985d..bf7466bb397 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -118,6 +118,7 @@ SBEnv::SBEnv(const ConfigShim &shim) srandom(time(nullptr) ^ getpid()); _rpcsrvmap.proxy().registerListener(_globalVisibleHistory); _rpcsrvmap.proxy().registerListener(_localRpcMonitorMap); + _localRpcMonitorMap.dispatcher().registerListener(_consensusMap); _rpcHooks.initRPC(getSupervisor()); } diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h index 853e72e83c5..18ac40c2ba7 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.h +++ b/slobrok/src/vespa/slobrok/server/sbenv.h @@ -10,6 +10,7 @@ #include "ok_state.h" #include "local_rpc_monitor_map.h" #include "metrics_producer.h" +#include "union_service_map.h" #include <vespa/config-slobroks.h> #include <vespa/slobrok/cfg.h> #include <vespa/vespalib/net/simple_health_producer.h> @@ -56,6 +57,7 @@ private: MetricsProducer _metrics; vespalib::SimpleComponentConfigProducer _components; LocalRpcMonitorMap _localRpcMonitorMap; + UnionServiceMap _consensusMap; ServiceMapHistory _globalVisibleHistory; public: @@ -82,6 +84,10 @@ public: return _localRpcMonitorMap.history(); } + UnionServiceMap& consensusMap() { + return _consensusMap; + } + const std::string & mySpec() const { return _me; } bool isSuspended() const { return false; } |