summaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne Juul <arnej@verizonmedia.com>2021-08-20 10:19:53 +0000
committerArne Juul <arnej@verizonmedia.com>2021-08-20 10:19:53 +0000
commit8e7e1ecbd1117723d02b51df80861e3a05b4fb79 (patch)
tree96e36a04bc0727873abe81592520a960b507f8ba /slobrok
parentc55a73fbc972fc1194877dc895ff90906a57d569 (diff)
use new MappingMonitor
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp74
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h42
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.cpp5
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.h1
4 files changed, 56 insertions, 66 deletions
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
index 485e6d27a1c..fe7c7dd2c36 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
@@ -11,10 +11,6 @@ namespace slobrok {
#pragma GCC diagnostic ignored "-Winline"
void LocalRpcMonitorMap::DelayedTasks::PerformTask() {
- {
- std::vector<MUP> deleteAfterSwap;
- std::swap(deleteAfterSwap, _deleteList);
- }
std::vector<Event> todo;
std::swap(todo, _queue);
for (const auto & entry : todo) {
@@ -29,12 +25,13 @@ void LocalRpcMonitorMap::DelayedTasks::PerformTask() {
}
}
-LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor)
+LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor,
+ MappingMonitorFactory mappingMonitorFactory)
: _delayedTasks(supervisor.GetScheduler(), *this),
_map(),
_dispatcher(),
_history(),
- _supervisor(supervisor),
+ _mappingMonitor(mappingMonitorFactory(*this)),
_subscription(MapSubscription::subscribe(_dispatcher, _history))
{
}
@@ -42,15 +39,20 @@ LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor)
LocalRpcMonitorMap::~LocalRpcMonitorMap() = default;
LocalRpcMonitorMap::PerService *
-LocalRpcMonitorMap::lookup(ManagedRpcServer *rpcsrv) {
- auto iter = _map.find(rpcsrv->getName());
+LocalRpcMonitorMap::lookup(const ServiceMapping &mapping) {
+ LOG(spam, "lookup %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ auto iter = _map.find(mapping.name);
if (iter == _map.end()) {
- return nullptr;
+ LOG_ABORT("not in map");
}
PerService & psd = iter->second;
- if (psd.srv.get() != rpcsrv) {
- return nullptr;
+ if (psd.spec != mapping.spec) {
+ LOG_ABORT("conflict in map: %s->%s");
}
+ LOG(spam, "found in map: %s->%s [%s,%s]",
+ iter->first.c_str(), psd.spec.c_str(),
+ psd.up ? "up" : "down",
+ psd.localOnly ? "local" : "global");
return &psd;
}
@@ -66,7 +68,7 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping,
auto old = _map.find(mapping.name);
if (old != _map.end()) {
const PerService & exists = old->second;
- if (exists.spec() == mapping.spec) {
+ if (exists.spec == mapping.spec) {
LOG(debug, "added mapping %s->%s was already present",
mapping.name.c_str(), mapping.spec.c_str());
inflight->doneHandler(OkState(0, "already registered"));
@@ -74,13 +76,14 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping,
}
LOG(warning, "tried addLocal for mapping %s->%s, but already had conflicting mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str(),
- exists.name().c_str(), exists.spec().c_str());
+ mapping.name.c_str(), exists.spec.c_str());
inflight->doneHandler(OkState(FRTE_RPC_METHOD_FAILED, "conflict"));
return;
}
auto [ iter, was_inserted ] =
_map.try_emplace(mapping.name, localService(mapping, std::move(inflight)));
LOG_ASSERT(was_inserted);
+ _mappingMonitor->start(mapping);
}
void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
@@ -97,41 +100,45 @@ void LocalRpcMonitorMap::doAdd(const ServiceMapping &mapping) {
auto old = _map.find(mapping.name);
if (old != _map.end()) {
PerService & exists = old->second;
- if (exists.spec() == mapping.spec) {
+ if (exists.spec == mapping.spec) {
LOG(debug, "added mapping %s->%s was already present",
mapping.name.c_str(), mapping.spec.c_str());
exists.localOnly = false;
return;
}
PerService removed = std::move(exists);
+ ServiceMapping toRemove{old->first, removed.spec};
+ _mappingMonitor->stop(toRemove);
_map.erase(old);
LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str(),
- removed.name().c_str(), removed.spec().c_str());
+ toRemove.name.c_str(), removed.spec.c_str());
if (removed.inflight) {
auto target = std::move(removed.inflight);
target->doneHandler(OkState(13, "conflict during initialization"));
}
if (removed.up) {
- _dispatcher.remove(removed.mapping());
+ _dispatcher.remove(toRemove);
}
- _delayedTasks.deleteLater(std::move(removed.srv));
}
auto [ iter, was_inserted ] =
_map.try_emplace(mapping.name, globalService(mapping));
LOG_ASSERT(was_inserted);
+ _mappingMonitor->start(mapping);
}
void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) {
auto iter = _map.find(mapping.name);
if (iter != _map.end()) {
PerService removed = std::move(iter->second);
+ ServiceMapping toRemove{iter->first, removed.spec};
+ _mappingMonitor->stop(toRemove);
_map.erase(iter);
LOG(debug, "remove: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str());
- if (mapping.spec != removed.spec()) {
+ if (mapping.spec != removed.spec) {
LOG(warning, "inconsistent specs for name '%s': had '%s', but was asked to remove '%s'",
mapping.name.c_str(),
- removed.spec().c_str(),
+ removed.spec.c_str(),
mapping.spec.c_str());
}
if (removed.inflight) {
@@ -139,53 +146,48 @@ void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) {
target->doneHandler(OkState(13, "removed during initialization"));
}
if (removed.up) {
- _dispatcher.remove(removed.mapping());
+ _dispatcher.remove(toRemove);
}
- _delayedTasks.deleteLater(std::move(removed.srv));
} else {
LOG(debug, "tried to remove non-existing mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str());
}
}
-void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string) {
- if (auto *psd = lookup(rpcsrv)) {
- LOG(debug, "failed: %s->%s", psd->name().c_str(), psd->spec().c_str());
+void LocalRpcMonitorMap::down(const ServiceMapping& mapping) {
+ if (auto *psd = lookup(mapping)) {
+ LOG(debug, "failed: %s->%s", mapping.name.c_str(), psd->spec.c_str());
if (psd->inflight) {
auto target = std::move(psd->inflight);
target->doneHandler(OkState(13, "failed check using listNames callback"));
}
if (psd->localOnly) {
PerService removed = std::move(*psd);
- auto iter = _map.find(removed.name());
+ auto iter = _map.find(mapping.name);
+ _mappingMonitor->stop(mapping);
_map.erase(iter);
if (removed.up) {
- _dispatcher.remove(removed.mapping());
+ _dispatcher.remove(mapping);
}
- _delayedTasks.deleteLater(std::move(removed.srv));
} else if (psd->up) {
psd->up = false;
- _dispatcher.remove(psd->mapping());
+ _dispatcher.remove(mapping);
}
}
}
-void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) {
- if (auto *psd = lookup(rpcsrv)) {
- LOG(debug, "ok: %s->%s", psd->name().c_str(), psd->spec().c_str());
+void LocalRpcMonitorMap::up(const ServiceMapping& mapping) {
+ if (auto *psd = lookup(mapping)) {
+ LOG(debug, "ok: %s->%s", mapping.name.c_str(), psd->spec.c_str());
if (psd->inflight) {
auto target = std::move(psd->inflight);
target->doneHandler(OkState());
}
if (! psd->up) {
psd->up = true;
- _dispatcher.add(psd->mapping());
+ _dispatcher.add(mapping);
}
}
}
-FRT_Supervisor * LocalRpcMonitorMap::getSupervisor() {
- return &_supervisor;
-}
-
} // namespace slobrok
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
index 8961e21f386..a0971a8c80b 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
@@ -2,10 +2,10 @@
#pragma once
#include "cmd.h"
-#include "i_rpc_server_manager.h"
#include "managed_rpc_server.h"
#include "map_listener.h"
#include "map_source.h"
+#include "mapping_monitor.h"
#include "named_service.h"
#include "proxy_map_source.h"
#include "service_map_history.h"
@@ -24,8 +24,8 @@ namespace slobrok {
* Tracks up/down status for name->spec combinations
* that are considered for publication locally.
**/
-class LocalRpcMonitorMap : public IRpcServerManager,
- public MapListener
+class LocalRpcMonitorMap : public MapListener,
+ public MappingMonitorOwner
{
private:
enum class EventType { ADD, REMOVE };
@@ -42,16 +42,9 @@ private:
};
class DelayedTasks : public FNET_Task {
- using MUP = std::unique_ptr<ManagedRpcServer>;
- std::vector<MUP> _deleteList;
std::vector<Event> _queue;
LocalRpcMonitorMap &_target;
public:
- void deleteLater(MUP rpcsrv) {
- _deleteList.emplace_back(std::move(rpcsrv));
- ScheduleNow();
- }
-
void handleLater(Event event) {
_queue.emplace_back(std::move(event));
ScheduleNow();
@@ -61,7 +54,6 @@ private:
DelayedTasks(FNET_Scheduler *scheduler, LocalRpcMonitorMap &target)
: FNET_Task(scheduler),
- _deleteList(),
_queue(),
_target(target)
{}
@@ -75,17 +67,9 @@ private:
bool up;
bool localOnly;
std::unique_ptr<ScriptCommand> inflight;
- std::unique_ptr<ManagedRpcServer> srv;
-
- vespalib::string name() const { return srv->getName(); }
- vespalib::string spec() const { return srv->getSpec(); }
- ServiceMapping mapping() const { return ServiceMapping{srv->getName(), srv->getSpec()}; }
+ vespalib::string spec;
};
- std::unique_ptr<ManagedRpcServer> managedFor(const ServiceMapping &mapping) {
- return std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this);
- }
-
PerService localService(const ServiceMapping &mapping,
std::unique_ptr<ScriptCommand> inflight)
{
@@ -93,7 +77,7 @@ private:
.up = false,
.localOnly = true,
.inflight = std::move(inflight),
- .srv = managedFor(mapping)
+ .spec = mapping.spec
};
}
@@ -102,7 +86,7 @@ private:
.up = false,
.localOnly = false,
.inflight = {},
- .srv = managedFor(mapping)
+ .spec = mapping.spec
};
}
@@ -111,16 +95,17 @@ private:
Map _map;
ProxyMapSource _dispatcher;
ServiceMapHistory _history;
- FRT_Supervisor &_supervisor;
+ MappingMonitor::UP _mappingMonitor;
std::unique_ptr<MapSubscription> _subscription;
- PerService *lookup(ManagedRpcServer *rpcsrv);
-
void doAdd(const ServiceMapping &mapping);
void doRemove(const ServiceMapping &mapping);
+
+ PerService * lookup(const ServiceMapping &mapping);
public:
- LocalRpcMonitorMap(FRT_Supervisor &_supervisor);
+ LocalRpcMonitorMap(FRT_Supervisor &supervisor,
+ MappingMonitorFactory mappingMonitorFactory);
~LocalRpcMonitorMap();
MapSource &dispatcher() { return _dispatcher; }
@@ -133,9 +118,8 @@ public:
void add(const ServiceMapping &mapping) override;
void remove(const ServiceMapping &mapping) override;
- void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override;
- void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override;
- FRT_Supervisor *getSupervisor() override;
+ void up(const ServiceMapping& mapping) override;
+ void down(const ServiceMapping& mapping) override;
};
//-----------------------------------------------------------------------------
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp
index 1b52b85f736..d8a3e62f44e 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.cpp
+++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp
@@ -110,7 +110,10 @@ SBEnv::SBEnv(const ConfigShim &shim)
_health(),
_metrics(_rpcHooks, *_transport),
_components(),
- _localRpcMonitorMap(*_supervisor),
+ _localRpcMonitorMap(*_supervisor,
+ [this] (MappingMonitorOwner &owner) {
+ return std::make_unique<RpcMappingMonitor>(*_supervisor, owner);
+ }),
_rpcsrvmanager(*this),
_exchanger(*this, _rpcsrvmap),
_rpcsrvmap()
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h
index b744a2d1d92..576e5d4b30c 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.h
+++ b/slobrok/src/vespa/slobrok/server/sbenv.h
@@ -2,6 +2,7 @@
#pragma once
#include "named_service.h"
+#include "rpc_mapping_monitor.h"
#include "rpc_server_map.h"
#include "rpc_server_manager.h"
#include "remote_slobrok.h"