summaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne Juul <arnej@verizonmedia.com>2021-08-16 12:15:37 +0000
committerArne Juul <arnej@verizonmedia.com>2021-08-17 08:58:55 +0000
commit296fe4d0e9bac2539384676e3b1ea80e21328727 (patch)
treea9190a603591bcd671cd4ea9092299f2d6df6ac0 /slobrok
parent814cc1414ce1560ca3cb1b60e118e0ed694d1bd9 (diff)
new LocalRpcMonitorMap::addLocal() API
* for mappings that are not in the consensus map yet * handles inflight script, this will be used to send answer to "register" RPC requests
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp46
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h32
2 files changed, 71 insertions, 7 deletions
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
index b904daca069..3f8a3bf6747 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
@@ -7,6 +7,8 @@ LOG_SETUP(".slobrok.server.local_rpc_monitor_map");
namespace slobrok {
+LocalRpcMonitorMap::PerService::~PerService() = default;
+
LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor)
: _map(),
_dispatcher(),
@@ -31,6 +33,31 @@ ServiceMapHistory & LocalRpcMonitorMap::history() {
return _history;
}
+void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping,
+ std::unique_ptr<ScriptCommand> inflight)
+{
+ LOG(debug, "try local add: mapping %s->%s",
+ mapping.name.c_str(), mapping.spec.c_str());
+ auto old = _map.find(mapping.name);
+ if (old != _map.end()) {
+ PerService & exists = old->second;
+ if (exists.spec() == mapping.spec) {
+ LOG(debug, "added mapping %s->%s was already present",
+ mapping.name.c_str(), mapping.spec.c_str());
+ inflight->doneHandler(OkState(0, "already registered"));
+ return;
+ }
+ LOG(warning, "tried addLocal for mapping %s->%s, but already had conflicting mapping %s->%s",
+ mapping.name.c_str(), mapping.spec.c_str(),
+ exists.name().c_str(), exists.spec().c_str());
+ inflight->doneHandler(OkState(FRTE_RPC_METHOD_FAILED, "conflict"));
+ return;
+ }
+ auto [ iter, was_inserted ] =
+ _map.try_emplace(mapping.name, localService(mapping, std::move(inflight)));
+ LOG_ASSERT(was_inserted);
+}
+
void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
LOG(debug, "try add: mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str());
@@ -40,6 +67,7 @@ void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
if (exists.spec() == mapping.spec) {
LOG(debug, "added mapping %s->%s was already present",
mapping.name.c_str(), mapping.spec.c_str());
+ exists.localOnly = false;
return;
}
LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s",
@@ -51,11 +79,7 @@ void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
_map.erase(old);
}
auto [ iter, was_inserted ] =
- _map.try_emplace(mapping.name,
- false,
- std::make_unique<ManagedRpcServer>(mapping.name,
- mapping.spec,
- *this));
+ _map.try_emplace(mapping.name, globalService(mapping));
LOG_ASSERT(was_inserted);
}
@@ -86,10 +110,18 @@ void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::strin
auto &psd = lookup(mapping);
LOG_ASSERT(psd.srv.get() == rpcsrv);
LOG(debug, "failed: %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ if (psd.inflight) {
+ auto target = std::move(psd.inflight);
+ target->doneHandler(OkState(13, "failed check using listNames callback"));
+ }
if (psd.up) {
psd.up = false;
_dispatcher.remove(mapping);
}
+ if (psd.localOnly) {
+ auto iter = _map.find(mapping.name);
+ _map.erase(iter);
+ }
}
void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) {
@@ -97,6 +129,10 @@ void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) {
auto &psd = lookup(mapping);
LOG_ASSERT(psd.srv.get() == rpcsrv);
LOG(debug, "ok: %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ if (psd.inflight) {
+ auto target = std::move(psd.inflight);
+ target->doneHandler(OkState());
+ }
if (! psd.up) {
psd.up = true;
_dispatcher.add(mapping);
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
index fa69b24c5d9..e88425ceb16 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "cmd.h"
#include "i_rpc_server_manager.h"
#include "managed_rpc_server.h"
#include "map_listener.h"
@@ -29,13 +30,37 @@ class LocalRpcMonitorMap : public IRpcServerManager,
{
private:
struct PerService {
- bool up;
- std::unique_ptr<ManagedRpcServer> srv;
+ bool up = false;
+ bool localOnly = false;
+ std::unique_ptr<ScriptCommand> inflight = {};
+ std::unique_ptr<ManagedRpcServer> srv = {};
+ ~PerService();
+ PerService() = default;
+
+ PerService(PerService &&) = default;
+ PerService& operator=(PerService &&) = default;
+
vespalib::string name() { return srv->getName(); }
vespalib::string spec() { return srv->getSpec(); }
ServiceMapping mapping() { return ServiceMapping{srv->getName(), srv->getSpec()}; }
};
+ PerService localService(const ServiceMapping &mapping,
+ std::unique_ptr<ScriptCommand> inflight)
+ {
+ PerService result;
+ result.localOnly = true;
+ result.inflight = std::move(inflight);
+ result.srv = std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this);
+ return result;
+ }
+
+ PerService globalService(const ServiceMapping &mapping) {
+ PerService result;
+ result.srv = std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this);
+ return result;
+ }
+
using Map = std::map<vespalib::string, PerService>;
Map _map;
@@ -53,6 +78,9 @@ public:
MapSource &dispatcher() { return _dispatcher; }
ServiceMapHistory & history();
+ /** for use by register API, will call doneHandler() on inflight script */
+ void addLocal(const ServiceMapping &mapping,
+ std::unique_ptr<ScriptCommand> inflight);
void add(const ServiceMapping &mapping) override;
void remove(const ServiceMapping &mapping) override;