diff options
Diffstat (limited to 'slobrok/src')
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp | 46 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h | 32 |
2 files changed, 71 insertions, 7 deletions
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp index b904daca069..3f8a3bf6747 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp @@ -7,6 +7,8 @@ LOG_SETUP(".slobrok.server.local_rpc_monitor_map"); namespace slobrok { +LocalRpcMonitorMap::PerService::~PerService() = default; + LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor) : _map(), _dispatcher(), @@ -31,6 +33,31 @@ ServiceMapHistory & LocalRpcMonitorMap::history() { return _history; } +void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping, + std::unique_ptr<ScriptCommand> inflight) +{ + LOG(debug, "try local add: mapping %s->%s", + mapping.name.c_str(), mapping.spec.c_str()); + auto old = _map.find(mapping.name); + if (old != _map.end()) { + PerService & exists = old->second; + if (exists.spec() == mapping.spec) { + LOG(debug, "added mapping %s->%s was already present", + mapping.name.c_str(), mapping.spec.c_str()); + inflight->doneHandler(OkState(0, "already registered")); + return; + } + LOG(warning, "tried addLocal for mapping %s->%s, but already had conflicting mapping %s->%s", + mapping.name.c_str(), mapping.spec.c_str(), + exists.name().c_str(), exists.spec().c_str()); + inflight->doneHandler(OkState(FRTE_RPC_METHOD_FAILED, "conflict")); + return; + } + auto [ iter, was_inserted ] = + _map.try_emplace(mapping.name, localService(mapping, std::move(inflight))); + LOG_ASSERT(was_inserted); +} + void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { LOG(debug, "try add: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); @@ -40,6 +67,7 @@ void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { if (exists.spec() == mapping.spec) { LOG(debug, "added mapping %s->%s was already present", mapping.name.c_str(), mapping.spec.c_str()); + exists.localOnly = false; return; } LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s", @@ -51,11 +79,7 @@ void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { _map.erase(old); } auto [ iter, was_inserted ] = - _map.try_emplace(mapping.name, - false, - std::make_unique<ManagedRpcServer>(mapping.name, - mapping.spec, - *this)); + _map.try_emplace(mapping.name, globalService(mapping)); LOG_ASSERT(was_inserted); } @@ -86,10 +110,18 @@ void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::strin auto &psd = lookup(mapping); LOG_ASSERT(psd.srv.get() == rpcsrv); LOG(debug, "failed: %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + if (psd.inflight) { + auto target = std::move(psd.inflight); + target->doneHandler(OkState(13, "failed check using listNames callback")); + } if (psd.up) { psd.up = false; _dispatcher.remove(mapping); } + if (psd.localOnly) { + auto iter = _map.find(mapping.name); + _map.erase(iter); + } } void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) { @@ -97,6 +129,10 @@ void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) { auto &psd = lookup(mapping); LOG_ASSERT(psd.srv.get() == rpcsrv); LOG(debug, "ok: %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + if (psd.inflight) { + auto target = std::move(psd.inflight); + target->doneHandler(OkState()); + } if (! psd.up) { psd.up = true; _dispatcher.add(mapping); diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h index fa69b24c5d9..e88425ceb16 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "cmd.h" #include "i_rpc_server_manager.h" #include "managed_rpc_server.h" #include "map_listener.h" @@ -29,13 +30,37 @@ class LocalRpcMonitorMap : public IRpcServerManager, { private: struct PerService { - bool up; - std::unique_ptr<ManagedRpcServer> srv; + bool up = false; + bool localOnly = false; + std::unique_ptr<ScriptCommand> inflight = {}; + std::unique_ptr<ManagedRpcServer> srv = {}; + ~PerService(); + PerService() = default; + + PerService(PerService &&) = default; + PerService& operator=(PerService &&) = default; + vespalib::string name() { return srv->getName(); } vespalib::string spec() { return srv->getSpec(); } ServiceMapping mapping() { return ServiceMapping{srv->getName(), srv->getSpec()}; } }; + PerService localService(const ServiceMapping &mapping, + std::unique_ptr<ScriptCommand> inflight) + { + PerService result; + result.localOnly = true; + result.inflight = std::move(inflight); + result.srv = std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this); + return result; + } + + PerService globalService(const ServiceMapping &mapping) { + PerService result; + result.srv = std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this); + return result; + } + using Map = std::map<vespalib::string, PerService>; Map _map; @@ -53,6 +78,9 @@ public: MapSource &dispatcher() { return _dispatcher; } ServiceMapHistory & history(); + /** for use by register API, will call doneHandler() on inflight script */ + void addLocal(const ServiceMapping &mapping, + std::unique_ptr<ScriptCommand> inflight); void add(const ServiceMapping &mapping) override; void remove(const ServiceMapping &mapping) override; |