summaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/CMakeLists.txt1
-rw-r--r--slobrok/src/apps/slobrok/slobrok.cpp7
-rw-r--r--slobrok/src/tests/local_rpc_monitor_map/CMakeLists.txt9
-rw-r--r--slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp331
-rw-r--r--slobrok/src/vespa/slobrok/server/cmd.cpp13
-rw-r--r--slobrok/src/vespa/slobrok/server/cmd.h1
-rw-r--r--slobrok/src/vespa/slobrok/server/exchange_manager.cpp24
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp86
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h22
-rw-r--r--slobrok/src/vespa/slobrok/server/rpchooks.cpp123
-rw-r--r--slobrok/src/vespa/slobrok/server/rpchooks.h8
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.cpp17
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.h3
-rw-r--r--slobrok/src/vespa/slobrok/server/union_service_map.cpp13
-rw-r--r--slobrok/src/vespa/slobrok/server/union_service_map.h2
15 files changed, 631 insertions, 29 deletions
diff --git a/slobrok/CMakeLists.txt b/slobrok/CMakeLists.txt
index 6acbf5d5134..c6c6313cf68 100644
--- a/slobrok/CMakeLists.txt
+++ b/slobrok/CMakeLists.txt
@@ -19,6 +19,7 @@ vespa_define_module(
TESTS
src/tests/backoff
src/tests/configure
+ src/tests/local_rpc_monitor_map
src/tests/mirrorapi
src/tests/registerapi
src/tests/service_map_history
diff --git a/slobrok/src/apps/slobrok/slobrok.cpp b/slobrok/src/apps/slobrok/slobrok.cpp
index 63212745644..b2748762a12 100644
--- a/slobrok/src/apps/slobrok/slobrok.cpp
+++ b/slobrok/src/apps/slobrok/slobrok.cpp
@@ -50,6 +50,7 @@ App::Main()
{
uint32_t portnum = 2773;
vespalib::string cfgId;
+ bool useNewLogic = false;
int argi = 1;
const char* optArg;
@@ -63,7 +64,7 @@ App::Main()
portnum = atoi(optArg);
break;
case 'N':
- // ignore flag for now
+ useNewLogic = true;
break;
default:
LOG(error, "unknown option letter '%c'", c);
@@ -75,11 +76,11 @@ App::Main()
if (cfgId.empty()) {
LOG(debug, "no config id specified");
ConfigShim shim(portnum);
- mainobj = std::make_unique<SBEnv>(shim);
+ mainobj = std::make_unique<SBEnv>(shim, useNewLogic);
} else {
ConfigShim shim(portnum, cfgId);
shim.enableStateServer(true);
- mainobj = std::make_unique<SBEnv>(shim);
+ mainobj = std::make_unique<SBEnv>(shim, useNewLogic);
}
hook_sigterm();
res = mainobj->MainLoop();
diff --git a/slobrok/src/tests/local_rpc_monitor_map/CMakeLists.txt b/slobrok/src/tests/local_rpc_monitor_map/CMakeLists.txt
new file mode 100644
index 00000000000..aa30021939c
--- /dev/null
+++ b/slobrok/src/tests/local_rpc_monitor_map/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(slobrok_local_rpc_monitor_map_test_app TEST
+ SOURCES
+ local_rpc_monitor_map_test.cpp
+ DEPENDS
+ slobrok_slobrokserver
+ GTest::GTest
+)
+vespa_add_test(NAME slobrok_local_rpc_monitor_map_test_app COMMAND slobrok_local_rpc_monitor_map_test_app)
diff --git a/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp b/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp
new file mode 100644
index 00000000000..9782f6ccbdc
--- /dev/null
+++ b/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp
@@ -0,0 +1,331 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/slobrok/server/local_rpc_monitor_map.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/util/time.h>
+#include <vespa/fnet/scheduler.h>
+#include <map>
+
+using namespace vespalib;
+using namespace slobrok;
+using vespalib::make_string_short::fmt;
+
+struct MapCall {
+ vespalib::string name;
+ ServiceMapping mapping;
+ ServiceMapping old;
+ static MapCall add(const ServiceMapping &m) { return {"add", m, {"",""}}; }
+ static MapCall remove(const ServiceMapping &m) { return {"remove", m, {"",""}}; }
+ static MapCall update(const ServiceMapping &o, const ServiceMapping &m) { return {"update", m, o}; }
+ void check(const MapCall &rhs) const {
+ EXPECT_EQ(name, rhs.name);
+ EXPECT_EQ(mapping, rhs.mapping);
+ EXPECT_EQ(old, rhs.old);
+ }
+ ~MapCall();
+};
+MapCall::~MapCall() = default;
+
+struct MonitorCall {
+ vespalib::string name;
+ ServiceMapping mapping;
+ bool hurry;
+ static MonitorCall start(const ServiceMapping &m, bool h) { return {"start", m, h}; }
+ static MonitorCall stop(const ServiceMapping &m) { return {"stop", m, false}; }
+ void check(const MonitorCall &rhs) const {
+ EXPECT_EQ(name, rhs.name);
+ EXPECT_EQ(mapping, rhs.mapping);
+ EXPECT_EQ(hurry, rhs.hurry);
+ }
+ ~MonitorCall();
+};
+MonitorCall::~MonitorCall() = default;
+
+template <typename Call>
+class CallLog {
+private:
+ std::vector<Call> _calls;
+ size_t _checked;
+public:
+ CallLog() noexcept : _calls(), _checked(0) {}
+ ~CallLog() { EXPECT_EQ(_calls.size(), _checked); }
+ void log(Call call) { _calls.push_back(call); }
+ void expect(std::initializer_list<Call> list) {
+ ASSERT_EQ(list.size(), (_calls.size() - _checked));
+ for (const auto &call: list) {
+ call.check(_calls[_checked++]);
+ }
+ }
+};
+
+struct MapLog : CallLog<MapCall>, MapListener {
+ void add(const ServiceMapping &mapping) override {
+ log(MapCall::add(mapping));
+ }
+ void remove(const ServiceMapping &mapping) override {
+ log(MapCall::remove(mapping));
+ }
+ void update(const ServiceMapping &old_mapping,
+ const ServiceMapping &new_mapping) override
+ {
+ log(MapCall::update(old_mapping, new_mapping));
+ }
+};
+
+struct MonitorLog : CallLog<MonitorCall>, MappingMonitor {
+ void start(const ServiceMapping& mapping, bool hurry) override {
+ log(MonitorCall::start(mapping, hurry));
+ }
+ void stop(const ServiceMapping& mapping) override {
+ log(MonitorCall::stop(mapping));
+ }
+};
+
+struct MyMappingMonitor : MappingMonitor {
+ MonitorLog &monitor;
+ MyMappingMonitor(MonitorLog &m) : monitor(m) {}
+ void start(const ServiceMapping& mapping, bool hurry) override {
+ monitor.start(mapping, hurry);
+ }
+ void stop(const ServiceMapping& mapping) override {
+ monitor.stop(mapping);
+ }
+};
+
+struct LocalRpcMonitorMapTest : public ::testing::Test {
+ steady_time time;
+ FNET_Scheduler scheduler;
+ MonitorLog monitor_log;
+ MapLog map_log;
+ LocalRpcMonitorMap map;
+ std::unique_ptr<MapSubscription> subscription;
+ ServiceMapping mapping;
+ ServiceMapping mapping_conflict;
+ LocalRpcMonitorMapTest()
+ : time(duration::zero()),
+ scheduler(&time, &time), monitor_log(), map_log(),
+ map(&scheduler, [this](auto &owner)
+ {
+ EXPECT_EQ(&owner, &map);
+ return std::make_unique<MyMappingMonitor>(monitor_log);
+ }),
+ subscription(MapSubscription::subscribe(map.dispatcher(), map_log)),
+ mapping("dummy_service", "dummy_spec"),
+ mapping_conflict("dummy_service", "conflicting_dummy_spec")
+ {}
+ void tick(duration elapsed = FNET_Scheduler::tick_ms) {
+ time += elapsed;
+ scheduler.CheckTasks();
+ }
+ void add_mapping(const ServiceMapping &m, bool is_up) {
+ map.add(m); // <- add from consensus map
+ monitor_log.expect({});
+ tick(0ms); // <- process delayed add event
+ monitor_log.expect({MonitorCall::start(m, false)});
+ map_log.expect({});
+ if (is_up) {
+ map.up(m); // <- up from monitor
+ map_log.expect({MapCall::add(m)});
+ } else {
+ map.down(m); // <- down from monitor
+ map_log.expect({});
+ }
+ }
+ void flip_up_state(const ServiceMapping &m, bool was_up, size_t cnt) {
+ for (size_t i = 0; i < cnt; ++i) {
+ if (was_up) {
+ map.up(m);
+ map_log.expect({});
+ map.down(m);
+ map_log.expect({MapCall::remove(m)});
+ } else {
+ map.down(m);
+ map_log.expect({});
+ map.up(m);
+ map_log.expect({MapCall::add(m)});
+ }
+ was_up = !was_up;
+ }
+ monitor_log.expect({});
+ }
+ void remove_mapping(const ServiceMapping &m, bool was_up) {
+ map.remove(m); // <- remove from consensus map
+ monitor_log.expect({});
+ tick(0ms); // <- process delayed remove event
+ monitor_log.expect({MonitorCall::stop(m)});
+ if (was_up) {
+ map_log.expect({MapCall::remove(m)});
+ } else {
+ map_log.expect({});
+ }
+ }
+ ~LocalRpcMonitorMapTest();
+};
+LocalRpcMonitorMapTest::~LocalRpcMonitorMapTest() = default;
+
+struct MyAddLocalHandler : LocalRpcMonitorMap::AddLocalCompletionHandler {
+ std::unique_ptr<OkState> &state;
+ bool &handler_deleted;
+ MyAddLocalHandler(std::unique_ptr<OkState> &s, bool &hd)
+ : state(s), handler_deleted(hd) {}
+ void doneHandler(OkState result) override {
+ state = std::make_unique<OkState>(result);
+ }
+ ~MyAddLocalHandler() override {
+ handler_deleted = true;
+ }
+};
+
+TEST_F(LocalRpcMonitorMapTest, external_add_remove_while_up) {
+ add_mapping(mapping, true);
+ remove_mapping(mapping, true);
+}
+
+TEST_F(LocalRpcMonitorMapTest, external_add_remove_while_down) {
+ add_mapping(mapping, false);
+ remove_mapping(mapping, false);
+}
+
+TEST_F(LocalRpcMonitorMapTest, server_up_down_up_down) {
+ add_mapping(mapping, true);
+ flip_up_state(mapping, true, 3);
+ remove_mapping(mapping, false);
+}
+
+TEST_F(LocalRpcMonitorMapTest, server_down_up_down_up) {
+ add_mapping(mapping, false);
+ flip_up_state(mapping, false, 3);
+ remove_mapping(mapping, true);
+}
+
+TEST_F(LocalRpcMonitorMapTest, multi_mapping) {
+ ServiceMapping m1("dummy_service1", "dummy_spec1");
+ ServiceMapping m2("dummy_service2", "dummy_spec2");
+ ServiceMapping m3("dummy_service3", "dummy_spec3");
+ add_mapping(m1, true);
+ add_mapping(m2, false);
+ add_mapping(m3, true);
+ flip_up_state(m1, true, 3);
+ flip_up_state(m2, false, 3);
+ flip_up_state(m3, true, 3);
+ remove_mapping(m1, false);
+ remove_mapping(m2, true);
+ remove_mapping(m3, false);
+}
+
+TEST_F(LocalRpcMonitorMapTest, local_add_ok) {
+ std::unique_ptr<OkState> state;
+ bool handler_deleted;
+ map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state, handler_deleted));
+ monitor_log.expect({MonitorCall::start(mapping, true)});
+ map_log.expect({});
+ map.up(mapping);
+ monitor_log.expect({});
+ map_log.expect({MapCall::add(mapping)});
+ ASSERT_TRUE(state);
+ EXPECT_TRUE(state->ok());
+ ASSERT_TRUE(handler_deleted);
+}
+
+TEST_F(LocalRpcMonitorMapTest, local_add_already_up) {
+ std::unique_ptr<OkState> state;
+ bool handler_deleted;
+ add_mapping(mapping, true);
+ map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state, handler_deleted));
+ monitor_log.expect({});
+ map_log.expect({});
+ ASSERT_TRUE(state);
+ EXPECT_TRUE(state->ok());
+ ASSERT_TRUE(handler_deleted);
+}
+
+TEST_F(LocalRpcMonitorMapTest, local_add_unknown_comes_up) {
+ std::unique_ptr<OkState> state;
+ bool handler_deleted;
+ add_mapping(mapping, false);
+ map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state, handler_deleted));
+ monitor_log.expect({MonitorCall::stop(mapping), MonitorCall::start(mapping, true)});
+ map_log.expect({});
+ EXPECT_FALSE(state);
+ map.up(mapping);
+ map_log.expect({MapCall::add(mapping)});
+ ASSERT_TRUE(state);
+ EXPECT_TRUE(state->ok());
+ ASSERT_TRUE(handler_deleted);
+}
+
+TEST_F(LocalRpcMonitorMapTest, local_add_unknown_goes_down) {
+ std::unique_ptr<OkState> state;
+ bool handler_deleted;
+ add_mapping(mapping, false);
+ map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state, handler_deleted));
+ monitor_log.expect({MonitorCall::stop(mapping), MonitorCall::start(mapping, true)});
+ map_log.expect({});
+ EXPECT_FALSE(state);
+ map.down(mapping);
+ map_log.expect({});
+ ASSERT_TRUE(state);
+ EXPECT_FALSE(state->ok());
+ ASSERT_TRUE(handler_deleted);
+}
+
+TEST_F(LocalRpcMonitorMapTest, local_add_conflict) {
+ std::unique_ptr<OkState> state;
+ bool handler_deleted;
+ add_mapping(mapping, true);
+ map.addLocal(mapping_conflict, std::make_unique<MyAddLocalHandler>(state, handler_deleted));
+ monitor_log.expect({});
+ map_log.expect({});
+ ASSERT_TRUE(state);
+ EXPECT_TRUE(state->failed());
+ ASSERT_TRUE(handler_deleted);
+}
+
+TEST_F(LocalRpcMonitorMapTest, local_multi_add) {
+ std::unique_ptr<OkState> state1;
+ bool handler_deleted1;
+ std::unique_ptr<OkState> state2;
+ bool handler_deleted2;
+ map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state1, handler_deleted1));
+ monitor_log.expect({MonitorCall::start(mapping, true)});
+ map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state2, handler_deleted2));
+ monitor_log.expect({});
+ map_log.expect({});
+ EXPECT_FALSE(state1);
+ EXPECT_FALSE(state2);
+ map.up(mapping);
+ monitor_log.expect({});
+ map_log.expect({MapCall::add(mapping)});
+ ASSERT_TRUE(state1);
+ ASSERT_TRUE(state2);
+ EXPECT_TRUE(state1->ok());
+ EXPECT_TRUE(state2->ok());
+ ASSERT_TRUE(handler_deleted1);
+ ASSERT_TRUE(handler_deleted2);
+}
+
+TEST_F(LocalRpcMonitorMapTest, local_remove) {
+ add_mapping(mapping, true);
+ map.removeLocal(mapping);
+ monitor_log.expect({MonitorCall::stop(mapping), MonitorCall::start(mapping, false)});
+ map_log.expect({MapCall::remove(mapping)});
+ map.up(mapping); // timeout case (should normally not happen)
+ map_log.expect({MapCall::add(mapping)});
+}
+
+TEST_F(LocalRpcMonitorMapTest, local_add_local_remove) {
+ std::unique_ptr<OkState> state;
+ bool handler_deleted;
+ map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state, handler_deleted));
+ monitor_log.expect({MonitorCall::start(mapping, true)});
+ map_log.expect({});
+ map.removeLocal(mapping);
+ monitor_log.expect({MonitorCall::stop(mapping)});
+ map_log.expect({});
+ ASSERT_TRUE(state);
+ EXPECT_TRUE(state->failed());
+ ASSERT_TRUE(handler_deleted);
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/slobrok/src/vespa/slobrok/server/cmd.cpp b/slobrok/src/vespa/slobrok/server/cmd.cpp
index b809f655a9d..df856189d89 100644
--- a/slobrok/src/vespa/slobrok/server/cmd.cpp
+++ b/slobrok/src/vespa/slobrok/server/cmd.cpp
@@ -62,6 +62,15 @@ ScriptCommand::makeIgnoreCmd(SBEnv &env, const std::string & name, const std::st
return ScriptCommand(std::move(data));
}
+ScriptCommand
+ScriptCommand::makeRegCompleter(SBEnv &env,
+ const std::string &name, const std::string &spec,
+ FRT_RPCRequest *req)
+{
+ auto data = std::make_unique<ScriptData>(env, name, spec, req);
+ data->_state = ScriptData::XCH_DOADD;
+ return ScriptCommand(std::move(data));
+}
void
ScriptCommand::doRequest()
@@ -124,7 +133,9 @@ ScriptCommand::doneHandler(OkState result)
LOG(debug, "done doAdd(%s,%s)", name_p, spec_p);
data._state = ScriptData::RDC_INVAL;
// all OK
- data.registerRequest->Return();
+ if (data.registerRequest != nullptr) {
+ data.registerRequest->Return();
+ }
cleanupReservation(data);
return;
} else if (data._state == ScriptData::XCH_IGNORE) {
diff --git a/slobrok/src/vespa/slobrok/server/cmd.h b/slobrok/src/vespa/slobrok/server/cmd.h
index d790ae93f5c..e7f42f75e42 100644
--- a/slobrok/src/vespa/slobrok/server/cmd.h
+++ b/slobrok/src/vespa/slobrok/server/cmd.h
@@ -26,6 +26,7 @@ public:
static ScriptCommand makeRegRpcSrvCmd(SBEnv &env, const std::string &name, const std::string &spec, FRT_RPCRequest *req);
static ScriptCommand makeIgnoreCmd(SBEnv &env, const std::string &name, const std::string &spec);
+ static ScriptCommand makeRegCompleter(SBEnv &env, const std::string &name, const std::string &spec, FRT_RPCRequest *req);
void doneHandler(OkState result);
void doRequest();
diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
index 18da01ee526..ccfb8d3bd63 100644
--- a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
+++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
@@ -127,17 +127,19 @@ ExchangeManager::diffLists(const ServiceMappingList &lhs, const ServiceMappingLi
void
ExchangeManager::healthCheck()
{
- auto oldWorldServices = env().rpcServerMap().allManaged();
- ServiceMappingList oldWorldList;
- for (const auto *nsp : oldWorldServices) {
- oldWorldList.emplace_back(nsp->getName(), nsp->getSpec());
- }
- std::sort(oldWorldList.begin(), oldWorldList.end());
auto newWorldList = env().consensusMap().currentConsensus();
- vespalib::string diff = diffLists(oldWorldList, newWorldList);
- if (! diff.empty()) {
- LOG(warning, "Diff from old world rpcServerMap to new world consensus map: %s",
- diff.c_str());
+ if (! _env.useNewLogic()) {
+ auto oldWorldServices = env().rpcServerMap().allManaged();
+ ServiceMappingList oldWorldList;
+ for (const auto *nsp : oldWorldServices) {
+ oldWorldList.emplace_back(nsp->getName(), nsp->getSpec());
+ }
+ std::sort(oldWorldList.begin(), oldWorldList.end());
+ vespalib::string diff = diffLists(oldWorldList, newWorldList);
+ if (! diff.empty()) {
+ LOG(warning, "Diff from old world rpcServerMap to new world consensus map: %s",
+ diff.c_str());
+ }
}
for (const auto & [ name, partner ] : _partners) {
partner->maybeStartFetch();
@@ -145,7 +147,7 @@ ExchangeManager::healthCheck()
auto remoteList = partner->remoteMap().allMappings();
// 0 is expected (when remote is down)
if (remoteList.size() != 0) {
- diff = diffLists(newWorldList, remoteList);
+ vespalib::string diff = diffLists(newWorldList, remoteList);
if (! diff.empty()) {
LOG(warning, "Diff from consensus map to peer slobrok mirror: %s",
diff.c_str());
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
index 454d123eead..16e47371cbb 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
@@ -10,6 +10,26 @@ namespace slobrok {
#pragma GCC diagnostic ignored "-Winline"
+namespace {
+
+struct ChainedAddLocalCompletionHandler : LocalRpcMonitorMap::AddLocalCompletionHandler {
+ std::unique_ptr<AddLocalCompletionHandler> first;
+ std::unique_ptr<AddLocalCompletionHandler> second;
+
+ ChainedAddLocalCompletionHandler(std::unique_ptr<AddLocalCompletionHandler> f,
+ std::unique_ptr<AddLocalCompletionHandler> s)
+ : first(std::move(f)), second(std::move(s))
+ {}
+
+ void doneHandler(OkState result) override {
+ first->doneHandler(result);
+ second->doneHandler(result);
+ }
+ ~ChainedAddLocalCompletionHandler() override {}
+};
+
+}
+
void LocalRpcMonitorMap::DelayedTasks::PerformTask() {
std::vector<Event> todo;
std::swap(todo, _queue);
@@ -25,9 +45,9 @@ void LocalRpcMonitorMap::DelayedTasks::PerformTask() {
}
}
-LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor,
+LocalRpcMonitorMap::LocalRpcMonitorMap(FNET_Scheduler *scheduler,
MappingMonitorFactory mappingMonitorFactory)
- : _delayedTasks(supervisor.GetScheduler(), *this),
+ : _delayedTasks(scheduler, *this),
_map(),
_dispatcher(),
_history(),
@@ -82,18 +102,37 @@ ServiceMapHistory & LocalRpcMonitorMap::history() {
return _history;
}
+bool LocalRpcMonitorMap::wouldConflict(const ServiceMapping &mapping) const {
+ auto iter = _map.find(mapping.name);
+ if (iter == _map.end()) {
+ return false; // no mapping, no conflict
+ }
+ return (iter->second.spec != mapping.spec);
+}
+
void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping,
- std::unique_ptr<ScriptCommand> inflight)
+ std::unique_ptr<AddLocalCompletionHandler> inflight)
{
LOG(debug, "try local add: mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str());
auto old = _map.find(mapping.name);
if (old != _map.end()) {
- const PerService & exists = old->second;
+ PerService & exists = old->second;
if (exists.spec == mapping.spec) {
LOG(debug, "added mapping %s->%s was already present",
mapping.name.c_str(), mapping.spec.c_str());
- inflight->doneHandler(OkState(0, "already registered"));
+ if (exists.up) {
+ inflight->doneHandler(OkState(0, "already registered"));
+ } else if (exists.inflight) {
+ auto newInflight = std::make_unique<ChainedAddLocalCompletionHandler>(
+ std::move(exists.inflight),
+ std::move(inflight));
+ exists.inflight = std::move(newInflight);
+ } else {
+ _mappingMonitor->stop(mapping);
+ exists.inflight = std::move(inflight);
+ _mappingMonitor->start(mapping, true);
+ }
return;
}
LOG(warning, "tried addLocal for mapping %s->%s, but already had conflicting mapping %s->%s",
@@ -105,6 +144,43 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping,
addToMap(mapping, localService(mapping, std::move(inflight)), true);
}
+void LocalRpcMonitorMap::removeLocal(const ServiceMapping &mapping) {
+ LOG(debug, "try local remove: mapping %s->%s",
+ mapping.name.c_str(), mapping.spec.c_str());
+ auto old = _map.find(mapping.name);
+ if (old == _map.end()) {
+ return; // already removed, OK
+ }
+ PerService & exists = old->second;
+ if (exists.spec != mapping.spec) {
+ LOG(warning, "tried removeLocal for mapping %s->%s, but already had conflicting mapping %s->%s",
+ mapping.name.c_str(), mapping.spec.c_str(),
+ mapping.name.c_str(), exists.spec.c_str());
+ return; // unregister for old, conflicting mapping
+ }
+ if (exists.localOnly) {
+ // we can just remove it
+ auto removed = removeFromMap(old);
+ if (removed.inflight) {
+ auto target = std::move(removed.inflight);
+ target->doneHandler(OkState(13, "removed during initialization"));
+ }
+ if (removed.up) {
+ _dispatcher.remove(removed.mapping);
+ }
+ return;
+ }
+ // also exists in consensus map, so we can't just remove it
+ // instead, pretend it's down and delay next ping
+ _mappingMonitor->stop(mapping);
+ if (exists.up) {
+ exists.up = false;
+ _dispatcher.remove(mapping);
+ }
+ _mappingMonitor->start(mapping, false);
+ return;
+}
+
void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
_delayedTasks.handleLater(Event::add(mapping));
}
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
index 3b2c74648d2..e3d081eacc9 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
@@ -27,6 +27,13 @@ namespace slobrok {
class LocalRpcMonitorMap : public MapListener,
public MappingMonitorOwner
{
+public:
+ // Interface used to signal the result of addLocal
+ struct AddLocalCompletionHandler {
+ virtual void doneHandler(OkState result) = 0;
+ virtual ~AddLocalCompletionHandler() {}
+ };
+
private:
enum class EventType { ADD, REMOVE };
@@ -66,12 +73,12 @@ private:
struct PerService {
bool up;
bool localOnly;
- std::unique_ptr<ScriptCommand> inflight;
+ std::unique_ptr<AddLocalCompletionHandler> inflight;
vespalib::string spec;
};
PerService localService(const ServiceMapping &mapping,
- std::unique_ptr<ScriptCommand> inflight)
+ std::unique_ptr<AddLocalCompletionHandler> inflight)
{
return PerService{
.up = false,
@@ -109,22 +116,27 @@ private:
ServiceMapping mapping;
bool up;
bool localOnly;
- std::unique_ptr<ScriptCommand> inflight;
+ std::unique_ptr<AddLocalCompletionHandler> inflight;
};
RemovedData removeFromMap(Map::iterator iter);
public:
- LocalRpcMonitorMap(FRT_Supervisor &supervisor,
+ LocalRpcMonitorMap(FNET_Scheduler *scheduler,
MappingMonitorFactory mappingMonitorFactory);
~LocalRpcMonitorMap();
MapSource &dispatcher() { return _dispatcher; }
ServiceMapHistory & history();
+ bool wouldConflict(const ServiceMapping &mapping) const;
+
/** for use by register API, will call doneHandler() on inflight script */
void addLocal(const ServiceMapping &mapping,
- std::unique_ptr<ScriptCommand> inflight);
+ std::unique_ptr<AddLocalCompletionHandler> inflight);
+
+ /** for use by unregister API */
+ void removeLocal(const ServiceMapping &mapping);
void add(const ServiceMapping &mapping) override;
void remove(const ServiceMapping &mapping) override;
diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.cpp b/slobrok/src/vespa/slobrok/server/rpchooks.cpp
index 4ed173deaa1..ab1d5246ddc 100644
--- a/slobrok/src/vespa/slobrok/server/rpchooks.cpp
+++ b/slobrok/src/vespa/slobrok/server/rpchooks.cpp
@@ -37,6 +37,12 @@ public:
~MetricsReport() override { Kill(); }
};
+struct ScriptCommandWrapper : LocalRpcMonitorMap::AddLocalCompletionHandler {
+ ScriptCommand script;
+ ScriptCommandWrapper(ScriptCommand &&script_in) : script(std::move(script_in)) {}
+ void doneHandler(OkState result) override { script.doneHandler(result); }
+};
+
} // namespace <unnamed>
//-----------------------------------------------------------------------------
@@ -215,6 +221,10 @@ RPCHooks::initRPC(FRT_Supervisor *supervisor)
}
+bool RPCHooks::useNewLogic() const {
+ return _env.useNewLogic();
+}
+
void
RPCHooks::rpc_listNamesServed(FRT_RPCRequest *req)
{
@@ -228,6 +238,9 @@ RPCHooks::rpc_listNamesServed(FRT_RPCRequest *req)
void
RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req)
{
+ if (useNewLogic()) {
+ return new_registerRpcServer(req);
+ }
FRT_Values &args = *req->GetParams();
const char *dName = args[0]._string._str;
const char *dSpec = args[1]._string._str;
@@ -238,7 +251,7 @@ RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req)
// TODO: run only this path, and complete the request instead of ignoring
auto script = ScriptCommand::makeIgnoreCmd(_env, dName, dSpec);
ServiceMapping mapping{dName, dSpec};
- _env.localMonitorMap().addLocal(mapping, std::make_unique<ScriptCommand>(std::move(script)));
+ _env.localMonitorMap().addLocal(mapping, std::make_unique<ScriptCommandWrapper>(std::move(script)));
}
// is this already OK?
if (_rpcsrvmanager.alreadyManaged(dName, dSpec)) {
@@ -260,9 +273,34 @@ RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req)
completer.doRequest();
}
+void RPCHooks::new_registerRpcServer(FRT_RPCRequest *req) {
+ FRT_Values &args = *req->GetParams();
+ const char *dName = args[0]._string._str;
+ const char *dSpec = args[1]._string._str;
+ LOG(debug, "RPC: invoked registerRpcServer(%s,%s)", dName, dSpec);
+ _cnts.registerReqs++;
+ ServiceMapping mapping{dName, dSpec};
+ // can we say now, that this will fail?
+ if (_env.consensusMap().wouldConflict(mapping)) {
+ req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected");
+ LOG(info, "cannot register %s at %s: conflict", dName, dSpec);
+ return;
+ }
+ auto script = ScriptCommand::makeRegCompleter(_env, dName, dSpec, req);
+ req->Detach();
+ _env.localMonitorMap().addLocal(mapping, std::make_unique<ScriptCommandWrapper>(std::move(script)));
+ // TODO: remove this
+ script = ScriptCommand::makeRegRpcSrvCmd(_env, dName, dSpec, nullptr);
+ script.doRequest();
+ return;
+}
+
void
RPCHooks::rpc_unregisterRpcServer(FRT_RPCRequest *req)
{
+ if (useNewLogic()) {
+ return new_unregisterRpcServer(req);
+ }
FRT_Values &args = *req->GetParams();
const char *dName = args[0]._string._str;
const char *dSpec = args[1]._string._str;
@@ -278,6 +316,17 @@ RPCHooks::rpc_unregisterRpcServer(FRT_RPCRequest *req)
return;
}
+void RPCHooks::new_unregisterRpcServer(FRT_RPCRequest *req) {
+ FRT_Values &args = *req->GetParams();
+ const char *dName = args[0]._string._str;
+ const char *dSpec = args[1]._string._str;
+ ServiceMapping mapping{dName, dSpec};
+ _env.localMonitorMap().removeLocal(mapping);
+ _env.exchangeManager().forwardRemove(dName, dSpec);
+ LOG(debug, "unregisterRpcServer(%s,%s)", dName, dSpec);
+ _cnts.otherReqs++;
+ return;
+}
void
RPCHooks::rpc_addPeer(FRT_RPCRequest *req)
@@ -322,6 +371,9 @@ RPCHooks::rpc_removePeer(FRT_RPCRequest *req)
void
RPCHooks::rpc_wantAdd(FRT_RPCRequest *req)
{
+ if (useNewLogic()) {
+ return new_wantAdd(req);
+ }
FRT_Values &args = *req->GetParams();
const char *remsb = args[0]._string._str;
const char *dName = args[1]._string._str;
@@ -341,10 +393,38 @@ RPCHooks::rpc_wantAdd(FRT_RPCRequest *req)
return;
}
+void RPCHooks::new_wantAdd(FRT_RPCRequest *req) {
+ FRT_Values &args = *req->GetParams();
+ const char *remsb = args[0]._string._str;
+ const char *dName = args[1]._string._str;
+ const char *dSpec = args[2]._string._str;
+ FRT_Values &retval = *req->GetReturn();
+ ServiceMapping mapping{dName, dSpec};
+ bool conflict = (
+ _env.consensusMap().wouldConflict(mapping)
+ ||
+ _env.localMonitorMap().wouldConflict(mapping)
+ );
+ if (conflict) {
+ retval.AddInt32(13);
+ retval.AddString("conflict detected");
+ req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected");
+ } else {
+ retval.AddInt32(0);
+ retval.AddString("ok");
+ }
+ LOG(debug, "%s->wantAdd(%s,%s) %s",
+ remsb, dName, dSpec, conflict ? "conflict" : "OK");
+ _cnts.wantAddReqs++;
+ return;
+}
void
RPCHooks::rpc_doRemove(FRT_RPCRequest *req)
{
+ if (useNewLogic()) {
+ return new_doRemove(req);
+ }
FRT_Values &args = *req->GetParams();
const char *rname = args[0]._string._str;
const char *dname = args[1]._string._str;
@@ -364,9 +444,27 @@ RPCHooks::rpc_doRemove(FRT_RPCRequest *req)
return;
}
+void RPCHooks::new_doRemove(FRT_RPCRequest *req) {
+ FRT_Values &args = *req->GetParams();
+ const char *rname = args[0]._string._str;
+ const char *dname = args[1]._string._str;
+ const char *dspec = args[2]._string._str;
+ FRT_Values &retval = *req->GetReturn();
+ ServiceMapping mapping{dname, dspec};
+ _env.localMonitorMap().removeLocal(mapping);
+ retval.AddInt32(0);
+ retval.AddString("ok");
+ LOG(debug, "%s->doRemove(%s,%s)", rname, dname, dspec);
+ _cnts.doRemoveReqs++;
+ return;
+}
+
void
RPCHooks::rpc_doAdd(FRT_RPCRequest *req)
{
+ if (useNewLogic()) {
+ return new_doAdd(req);
+ }
FRT_Values &args = *req->GetParams();
const char *rname = args[0]._string._str;
const char *dname = args[1]._string._str;
@@ -386,6 +484,28 @@ RPCHooks::rpc_doAdd(FRT_RPCRequest *req)
return;
}
+void RPCHooks::new_doAdd(FRT_RPCRequest *req) {
+ FRT_Values &args = *req->GetParams();
+ const char *remsb = args[0]._string._str;
+ const char *dName = args[1]._string._str;
+ const char *dSpec = args[2]._string._str;
+ FRT_Values &retval = *req->GetReturn();
+ ServiceMapping mapping{dName, dSpec};
+ bool ok = true;
+ if (_env.consensusMap().wouldConflict(mapping)) {
+ retval.AddInt32(13);
+ retval.AddString("conflict detected");
+ req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected");
+ ok = false;
+ } else {
+ retval.AddInt32(0);
+ retval.AddString("ok");
+ }
+ LOG(debug, "%s->doAdd(%s,%s) %s",
+ remsb, dName, dSpec, ok ? "OK" : "failed");
+ _cnts.doAddReqs++;
+ return;
+}
void
RPCHooks::rpc_lookupRpcServer(FRT_RPCRequest *req)
@@ -455,6 +575,7 @@ RPCHooks::rpc_lookupManaged(FRT_RPCRequest *req)
FRT_Values &args = *req->GetParams();
const char *name = args[0]._string._str;
LOG(debug, "RPC: lookupManaged(%s)", name);
+ // TODO: use local history here
const auto & visible = _env.globalHistory();
auto diff = visible.makeDiffFrom(0);
for (const auto & entry : diff.updated) {
diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.h b/slobrok/src/vespa/slobrok/server/rpchooks.h
index a41e473b183..e8f6c65ea47 100644
--- a/slobrok/src/vespa/slobrok/server/rpchooks.h
+++ b/slobrok/src/vespa/slobrok/server/rpchooks.h
@@ -58,8 +58,16 @@ public:
void countFailedHeartbeat() { _cnts.heartBeatFails++; }
private:
+ bool useNewLogic() const;
+
void rpc_lookupRpcServer(FRT_RPCRequest *req);
+ void new_registerRpcServer(FRT_RPCRequest *req);
+ void new_unregisterRpcServer(FRT_RPCRequest *req);
+ void new_wantAdd(FRT_RPCRequest *req);
+ void new_doRemove(FRT_RPCRequest *req);
+ void new_doAdd(FRT_RPCRequest *req);
+
void rpc_registerRpcServer(FRT_RPCRequest *req);
void rpc_unregisterRpcServer(FRT_RPCRequest *req);
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp
index 1f54716c29c..ebb9935877f 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.cpp
+++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp
@@ -97,12 +97,15 @@ ConfigTask::PerformTask()
} // namespace slobrok::<unnamed>
-SBEnv::SBEnv(const ConfigShim &shim)
+SBEnv::SBEnv(const ConfigShim &shim) : SBEnv(shim, false) {}
+
+SBEnv::SBEnv(const ConfigShim &shim, bool useNewConsensusLogic)
: _transport(std::make_unique<FNET_Transport>(TransportConfig().drop_empty_buffers(true))),
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
_configShim(shim),
_configurator(shim.factory().create(*this)),
_shuttingDown(false),
+ _useNewLogic(useNewConsensusLogic),
_partnerList(),
_me(createSpec(_configShim.portNumber())),
_rpcHooks(*this, _rpcsrvmap, _rpcsrvmanager),
@@ -110,7 +113,7 @@ SBEnv::SBEnv(const ConfigShim &shim)
_health(),
_metrics(_rpcHooks, *_transport),
_components(),
- _localRpcMonitorMap(*_supervisor,
+ _localRpcMonitorMap(getScheduler(),
[this] (MappingMonitorOwner &owner) {
return std::make_unique<RpcMappingMonitor>(*_supervisor, owner);
}),
@@ -118,11 +121,19 @@ SBEnv::SBEnv(const ConfigShim &shim)
_exchanger(*this, _rpcsrvmap),
_rpcsrvmap()
{
+ if (useNewLogic()) {
+ srandom(time(nullptr) ^ getpid());
+ // note: feedback loop between these two:
+ _localMonitorSubscription = MapSubscription::subscribe(_consensusMap, _localRpcMonitorMap);
+ _consensusSubscription = MapSubscription::subscribe(_localRpcMonitorMap.dispatcher(), _consensusMap);
+ _globalHistorySubscription = MapSubscription::subscribe(_consensusMap, _globalVisibleHistory);
+ _rpcHooks.initRPC(getSupervisor());
+ return;
+ }
srandom(time(nullptr) ^ getpid());
// note: feedback loop between these two:
_localMonitorSubscription = MapSubscription::subscribe(_consensusMap, _localRpcMonitorMap);
_consensusSubscription = MapSubscription::subscribe(_localRpcMonitorMap.dispatcher(), _consensusMap);
- // TODO: use consensus as source here:
_globalHistorySubscription = MapSubscription::subscribe(_rpcsrvmap.proxy(), _globalVisibleHistory);
_rpcHooks.initRPC(getSupervisor());
}
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h
index 44b7305814c..c6fd8905131 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.h
+++ b/slobrok/src/vespa/slobrok/server/sbenv.h
@@ -44,6 +44,7 @@ private:
ConfigShim _configShim;
Configurator::UP _configurator;
bool _shuttingDown;
+ const bool _useNewLogic;
SBEnv(const SBEnv &); // Not used
SBEnv &operator=(const SBEnv &); // Not used
@@ -71,6 +72,7 @@ private:
public:
explicit SBEnv(const ConfigShim &shim);
+ SBEnv(const ConfigShim &shim, bool useNewConsensusLogic);
~SBEnv();
FNET_Transport *getTransport() { return _transport.get(); }
@@ -105,6 +107,7 @@ public:
bool isSuspended() const { return false; }
bool isShuttingDown() const { return _shuttingDown; }
+ bool useNewLogic() const { return _useNewLogic; }
int MainLoop();
diff --git a/slobrok/src/vespa/slobrok/server/union_service_map.cpp b/slobrok/src/vespa/slobrok/server/union_service_map.cpp
index baf94a6fa69..9abfc237d56 100644
--- a/slobrok/src/vespa/slobrok/server/union_service_map.cpp
+++ b/slobrok/src/vespa/slobrok/server/union_service_map.cpp
@@ -20,6 +20,19 @@ ServiceMappingList UnionServiceMap::currentConsensus() const {
return result;
}
+bool UnionServiceMap::wouldConflict(const ServiceMapping &mapping) const {
+ const vespalib::string &key = mapping.name;
+ auto iter = _mappings.find(key);
+ if (iter == _mappings.end()) {
+ return false;
+ }
+ const Mappings &values = iter->second;
+ if (values.size() != 1) {
+ return true;
+ }
+ return (values[0].spec != mapping.spec);
+}
+
void UnionServiceMap::add(const ServiceMapping &mapping)
{
const vespalib::string &key = mapping.name;
diff --git a/slobrok/src/vespa/slobrok/server/union_service_map.h b/slobrok/src/vespa/slobrok/server/union_service_map.h
index d5bcbfaed94..67d3221849d 100644
--- a/slobrok/src/vespa/slobrok/server/union_service_map.h
+++ b/slobrok/src/vespa/slobrok/server/union_service_map.h
@@ -36,6 +36,8 @@ public:
ServiceMappingList currentConsensus() const;
+ bool wouldConflict(const ServiceMapping &mapping) const;
+
void add(const ServiceMapping &mapping) override;
void remove(const ServiceMapping &mapping) override;
void update(const ServiceMapping &old_mapping,