summaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne H Juul <arnej27959@users.noreply.github.com>2021-08-16 10:44:48 +0200
committerGitHub <noreply@github.com>2021-08-16 10:44:48 +0200
commitae9bd93fa7e7d8ce721eb2a75fb5600678b2c7c4 (patch)
tree2cca25e81e46f6e066b517208f4a3a0f0ac3a3cb /slobrok
parentb71ef7c4e1a0f44f000126fcd50353e0cf63bc3b (diff)
parent2bddc841afefd89d3de25d83f6b97b465923e455 (diff)
Merge pull request #18743 from vespa-engine/arnej/wire-consensus-map
Arnej/wire consensus map
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/exchange_manager.cpp2
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp11
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h12
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.cpp1
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.h6
5 files changed, 24 insertions, 8 deletions
diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
index 632c823a2c1..7e693a9ea3d 100644
--- a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
+++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
@@ -39,6 +39,7 @@ ExchangeManager::addPartner(const std::string & name, const std::string & spec)
auto [ it, wasNew ] = _partners.emplace(name, std::make_unique<RemoteSlobrok>(name, spec, *this));
LOG_ASSERT(wasNew);
RemoteSlobrok & partner = *it->second;
+ partner.remoteMap().registerListener(_env.consensusMap());
partner.tryConnect();
return OkState();
}
@@ -51,6 +52,7 @@ ExchangeManager::removePartner(const std::string & name)
LOG_ASSERT(oldremote);
_partners.erase(name);
oldremote->shutdown();
+ oldremote->remoteMap().unregisterListener(_env.consensusMap());
}
std::vector<std::string>
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
index 50a11c7b521..7141da29b6d 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
@@ -9,9 +9,11 @@ namespace slobrok {
LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor)
: _map(),
+ _dispatcher(),
_history(),
_supervisor(supervisor)
{
+ _dispatcher.registerListener(_history);
}
LocalRpcMonitorMap::~LocalRpcMonitorMap() = default;
@@ -46,7 +48,7 @@ void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
mapping.name.c_str(), mapping.spec.c_str(),
exists.name().c_str(), exists.spec().c_str());
if (exists.up) {
- _history.remove(exists.mapping());
+ _dispatcher.remove(exists.mapping());
}
_map.erase(old);
}
@@ -69,9 +71,10 @@ void LocalRpcMonitorMap::remove(const ServiceMapping &mapping) {
mapping.name.c_str(),
exists.spec().c_str(),
mapping.spec.c_str());
+ return;
}
if (exists.up) {
- _history.remove(exists.mapping());
+ _dispatcher.remove(exists.mapping());
}
_map.erase(iter);
} else {
@@ -88,7 +91,7 @@ void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::strin
LOG(debug, "failed: %s->%s", mapping.name.c_str(), mapping.spec.c_str());
if (psd->up) {
psd->up = false;
- _history.remove(mapping);
+ _dispatcher.remove(mapping);
}
}
@@ -100,7 +103,7 @@ void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) {
LOG(debug, "ok: %s->%s", mapping.name.c_str(), mapping.spec.c_str());
if (! psd->up) {
psd->up = true;
- _history.add(mapping);
+ _dispatcher.add(mapping);
}
}
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
index 3f732d6875b..7d3ed6b466a 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
@@ -1,14 +1,16 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "map_listener.h"
-#include "map_source.h"
-#include "service_mapping.h"
-#include "service_map_history.h"
#include "i_rpc_server_manager.h"
#include "managed_rpc_server.h"
#include "map_listener.h"
+#include "map_listener.h"
+#include "map_source.h"
#include "named_service.h"
+#include "proxy_map_source.h"
+#include "service_map_history.h"
+#include "service_mapping.h"
+
#include <vector>
#include <memory>
#include <map>
@@ -37,6 +39,7 @@ private:
using Map = std::map<vespalib::string, PerService>;
Map _map;
+ ProxyMapSource _dispatcher;
ServiceMapHistory _history;
FRT_Supervisor &_supervisor;
@@ -46,6 +49,7 @@ public:
LocalRpcMonitorMap(FRT_Supervisor &_supervisor);
~LocalRpcMonitorMap();
+ MapSource &dispatcher() { return _dispatcher; }
ServiceMapHistory & history();
void add(const ServiceMapping &mapping) override;
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp
index 6bf5bbd985d..bf7466bb397 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.cpp
+++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp
@@ -118,6 +118,7 @@ SBEnv::SBEnv(const ConfigShim &shim)
srandom(time(nullptr) ^ getpid());
_rpcsrvmap.proxy().registerListener(_globalVisibleHistory);
_rpcsrvmap.proxy().registerListener(_localRpcMonitorMap);
+ _localRpcMonitorMap.dispatcher().registerListener(_consensusMap);
_rpcHooks.initRPC(getSupervisor());
}
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h
index 853e72e83c5..18ac40c2ba7 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.h
+++ b/slobrok/src/vespa/slobrok/server/sbenv.h
@@ -10,6 +10,7 @@
#include "ok_state.h"
#include "local_rpc_monitor_map.h"
#include "metrics_producer.h"
+#include "union_service_map.h"
#include <vespa/config-slobroks.h>
#include <vespa/slobrok/cfg.h>
#include <vespa/vespalib/net/simple_health_producer.h>
@@ -56,6 +57,7 @@ private:
MetricsProducer _metrics;
vespalib::SimpleComponentConfigProducer _components;
LocalRpcMonitorMap _localRpcMonitorMap;
+ UnionServiceMap _consensusMap;
ServiceMapHistory _globalVisibleHistory;
public:
@@ -82,6 +84,10 @@ public:
return _localRpcMonitorMap.history();
}
+ UnionServiceMap& consensusMap() {
+ return _consensusMap;
+ }
+
const std::string & mySpec() const { return _me; }
bool isSuspended() const { return false; }