diff options
Diffstat (limited to 'slobrok')
-rw-r--r-- | slobrok/CMakeLists.txt | 1 | ||||
-rw-r--r-- | slobrok/src/apps/slobrok/slobrok.cpp | 7 | ||||
-rw-r--r-- | slobrok/src/tests/local_rpc_monitor_map/CMakeLists.txt | 9 | ||||
-rw-r--r-- | slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp | 331 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/cmd.cpp | 13 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/cmd.h | 1 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/exchange_manager.cpp | 24 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp | 86 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h | 22 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/rpchooks.cpp | 123 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/rpchooks.h | 8 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.cpp | 17 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/sbenv.h | 3 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/union_service_map.cpp | 13 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/union_service_map.h | 2 |
15 files changed, 631 insertions, 29 deletions
diff --git a/slobrok/CMakeLists.txt b/slobrok/CMakeLists.txt index 6acbf5d5134..c6c6313cf68 100644 --- a/slobrok/CMakeLists.txt +++ b/slobrok/CMakeLists.txt @@ -19,6 +19,7 @@ vespa_define_module( TESTS src/tests/backoff src/tests/configure + src/tests/local_rpc_monitor_map src/tests/mirrorapi src/tests/registerapi src/tests/service_map_history diff --git a/slobrok/src/apps/slobrok/slobrok.cpp b/slobrok/src/apps/slobrok/slobrok.cpp index 63212745644..b2748762a12 100644 --- a/slobrok/src/apps/slobrok/slobrok.cpp +++ b/slobrok/src/apps/slobrok/slobrok.cpp @@ -50,6 +50,7 @@ App::Main() { uint32_t portnum = 2773; vespalib::string cfgId; + bool useNewLogic = false; int argi = 1; const char* optArg; @@ -63,7 +64,7 @@ App::Main() portnum = atoi(optArg); break; case 'N': - // ignore flag for now + useNewLogic = true; break; default: LOG(error, "unknown option letter '%c'", c); @@ -75,11 +76,11 @@ App::Main() if (cfgId.empty()) { LOG(debug, "no config id specified"); ConfigShim shim(portnum); - mainobj = std::make_unique<SBEnv>(shim); + mainobj = std::make_unique<SBEnv>(shim, useNewLogic); } else { ConfigShim shim(portnum, cfgId); shim.enableStateServer(true); - mainobj = std::make_unique<SBEnv>(shim); + mainobj = std::make_unique<SBEnv>(shim, useNewLogic); } hook_sigterm(); res = mainobj->MainLoop(); diff --git a/slobrok/src/tests/local_rpc_monitor_map/CMakeLists.txt b/slobrok/src/tests/local_rpc_monitor_map/CMakeLists.txt new file mode 100644 index 00000000000..aa30021939c --- /dev/null +++ b/slobrok/src/tests/local_rpc_monitor_map/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(slobrok_local_rpc_monitor_map_test_app TEST + SOURCES + local_rpc_monitor_map_test.cpp + DEPENDS + slobrok_slobrokserver + GTest::GTest +) +vespa_add_test(NAME slobrok_local_rpc_monitor_map_test_app COMMAND slobrok_local_rpc_monitor_map_test_app) diff --git a/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp b/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp new file mode 100644 index 00000000000..9782f6ccbdc --- /dev/null +++ b/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp @@ -0,0 +1,331 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/slobrok/server/local_rpc_monitor_map.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/time.h> +#include <vespa/fnet/scheduler.h> +#include <map> + +using namespace vespalib; +using namespace slobrok; +using vespalib::make_string_short::fmt; + +struct MapCall { + vespalib::string name; + ServiceMapping mapping; + ServiceMapping old; + static MapCall add(const ServiceMapping &m) { return {"add", m, {"",""}}; } + static MapCall remove(const ServiceMapping &m) { return {"remove", m, {"",""}}; } + static MapCall update(const ServiceMapping &o, const ServiceMapping &m) { return {"update", m, o}; } + void check(const MapCall &rhs) const { + EXPECT_EQ(name, rhs.name); + EXPECT_EQ(mapping, rhs.mapping); + EXPECT_EQ(old, rhs.old); + } + ~MapCall(); +}; +MapCall::~MapCall() = default; + +struct MonitorCall { + vespalib::string name; + ServiceMapping mapping; + bool hurry; + static MonitorCall start(const ServiceMapping &m, bool h) { return {"start", m, h}; } + static MonitorCall stop(const ServiceMapping &m) { return {"stop", m, false}; } + void check(const MonitorCall &rhs) const { + EXPECT_EQ(name, rhs.name); + EXPECT_EQ(mapping, rhs.mapping); + EXPECT_EQ(hurry, rhs.hurry); + } + ~MonitorCall(); +}; +MonitorCall::~MonitorCall() = default; + +template <typename Call> +class CallLog { +private: + std::vector<Call> _calls; + size_t _checked; +public: + CallLog() noexcept : _calls(), _checked(0) {} + ~CallLog() { EXPECT_EQ(_calls.size(), _checked); } + void log(Call call) { _calls.push_back(call); } + void expect(std::initializer_list<Call> list) { + ASSERT_EQ(list.size(), (_calls.size() - _checked)); + for (const auto &call: list) { + call.check(_calls[_checked++]); + } + } +}; + +struct MapLog : CallLog<MapCall>, MapListener { + void add(const ServiceMapping &mapping) override { + log(MapCall::add(mapping)); + } + void remove(const ServiceMapping &mapping) override { + log(MapCall::remove(mapping)); + } + void update(const ServiceMapping &old_mapping, + const ServiceMapping &new_mapping) override + { + log(MapCall::update(old_mapping, new_mapping)); + } +}; + +struct MonitorLog : CallLog<MonitorCall>, MappingMonitor { + void start(const ServiceMapping& mapping, bool hurry) override { + log(MonitorCall::start(mapping, hurry)); + } + void stop(const ServiceMapping& mapping) override { + log(MonitorCall::stop(mapping)); + } +}; + +struct MyMappingMonitor : MappingMonitor { + MonitorLog &monitor; + MyMappingMonitor(MonitorLog &m) : monitor(m) {} + void start(const ServiceMapping& mapping, bool hurry) override { + monitor.start(mapping, hurry); + } + void stop(const ServiceMapping& mapping) override { + monitor.stop(mapping); + } +}; + +struct LocalRpcMonitorMapTest : public ::testing::Test { + steady_time time; + FNET_Scheduler scheduler; + MonitorLog monitor_log; + MapLog map_log; + LocalRpcMonitorMap map; + std::unique_ptr<MapSubscription> subscription; + ServiceMapping mapping; + ServiceMapping mapping_conflict; + LocalRpcMonitorMapTest() + : time(duration::zero()), + scheduler(&time, &time), monitor_log(), map_log(), + map(&scheduler, [this](auto &owner) + { + EXPECT_EQ(&owner, &map); + return std::make_unique<MyMappingMonitor>(monitor_log); + }), + subscription(MapSubscription::subscribe(map.dispatcher(), map_log)), + mapping("dummy_service", "dummy_spec"), + mapping_conflict("dummy_service", "conflicting_dummy_spec") + {} + void tick(duration elapsed = FNET_Scheduler::tick_ms) { + time += elapsed; + scheduler.CheckTasks(); + } + void add_mapping(const ServiceMapping &m, bool is_up) { + map.add(m); // <- add from consensus map + monitor_log.expect({}); + tick(0ms); // <- process delayed add event + monitor_log.expect({MonitorCall::start(m, false)}); + map_log.expect({}); + if (is_up) { + map.up(m); // <- up from monitor + map_log.expect({MapCall::add(m)}); + } else { + map.down(m); // <- down from monitor + map_log.expect({}); + } + } + void flip_up_state(const ServiceMapping &m, bool was_up, size_t cnt) { + for (size_t i = 0; i < cnt; ++i) { + if (was_up) { + map.up(m); + map_log.expect({}); + map.down(m); + map_log.expect({MapCall::remove(m)}); + } else { + map.down(m); + map_log.expect({}); + map.up(m); + map_log.expect({MapCall::add(m)}); + } + was_up = !was_up; + } + monitor_log.expect({}); + } + void remove_mapping(const ServiceMapping &m, bool was_up) { + map.remove(m); // <- remove from consensus map + monitor_log.expect({}); + tick(0ms); // <- process delayed remove event + monitor_log.expect({MonitorCall::stop(m)}); + if (was_up) { + map_log.expect({MapCall::remove(m)}); + } else { + map_log.expect({}); + } + } + ~LocalRpcMonitorMapTest(); +}; +LocalRpcMonitorMapTest::~LocalRpcMonitorMapTest() = default; + +struct MyAddLocalHandler : LocalRpcMonitorMap::AddLocalCompletionHandler { + std::unique_ptr<OkState> &state; + bool &handler_deleted; + MyAddLocalHandler(std::unique_ptr<OkState> &s, bool &hd) + : state(s), handler_deleted(hd) {} + void doneHandler(OkState result) override { + state = std::make_unique<OkState>(result); + } + ~MyAddLocalHandler() override { + handler_deleted = true; + } +}; + +TEST_F(LocalRpcMonitorMapTest, external_add_remove_while_up) { + add_mapping(mapping, true); + remove_mapping(mapping, true); +} + +TEST_F(LocalRpcMonitorMapTest, external_add_remove_while_down) { + add_mapping(mapping, false); + remove_mapping(mapping, false); +} + +TEST_F(LocalRpcMonitorMapTest, server_up_down_up_down) { + add_mapping(mapping, true); + flip_up_state(mapping, true, 3); + remove_mapping(mapping, false); +} + +TEST_F(LocalRpcMonitorMapTest, server_down_up_down_up) { + add_mapping(mapping, false); + flip_up_state(mapping, false, 3); + remove_mapping(mapping, true); +} + +TEST_F(LocalRpcMonitorMapTest, multi_mapping) { + ServiceMapping m1("dummy_service1", "dummy_spec1"); + ServiceMapping m2("dummy_service2", "dummy_spec2"); + ServiceMapping m3("dummy_service3", "dummy_spec3"); + add_mapping(m1, true); + add_mapping(m2, false); + add_mapping(m3, true); + flip_up_state(m1, true, 3); + flip_up_state(m2, false, 3); + flip_up_state(m3, true, 3); + remove_mapping(m1, false); + remove_mapping(m2, true); + remove_mapping(m3, false); +} + +TEST_F(LocalRpcMonitorMapTest, local_add_ok) { + std::unique_ptr<OkState> state; + bool handler_deleted; + map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state, handler_deleted)); + monitor_log.expect({MonitorCall::start(mapping, true)}); + map_log.expect({}); + map.up(mapping); + monitor_log.expect({}); + map_log.expect({MapCall::add(mapping)}); + ASSERT_TRUE(state); + EXPECT_TRUE(state->ok()); + ASSERT_TRUE(handler_deleted); +} + +TEST_F(LocalRpcMonitorMapTest, local_add_already_up) { + std::unique_ptr<OkState> state; + bool handler_deleted; + add_mapping(mapping, true); + map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state, handler_deleted)); + monitor_log.expect({}); + map_log.expect({}); + ASSERT_TRUE(state); + EXPECT_TRUE(state->ok()); + ASSERT_TRUE(handler_deleted); +} + +TEST_F(LocalRpcMonitorMapTest, local_add_unknown_comes_up) { + std::unique_ptr<OkState> state; + bool handler_deleted; + add_mapping(mapping, false); + map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state, handler_deleted)); + monitor_log.expect({MonitorCall::stop(mapping), MonitorCall::start(mapping, true)}); + map_log.expect({}); + EXPECT_FALSE(state); + map.up(mapping); + map_log.expect({MapCall::add(mapping)}); + ASSERT_TRUE(state); + EXPECT_TRUE(state->ok()); + ASSERT_TRUE(handler_deleted); +} + +TEST_F(LocalRpcMonitorMapTest, local_add_unknown_goes_down) { + std::unique_ptr<OkState> state; + bool handler_deleted; + add_mapping(mapping, false); + map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state, handler_deleted)); + monitor_log.expect({MonitorCall::stop(mapping), MonitorCall::start(mapping, true)}); + map_log.expect({}); + EXPECT_FALSE(state); + map.down(mapping); + map_log.expect({}); + ASSERT_TRUE(state); + EXPECT_FALSE(state->ok()); + ASSERT_TRUE(handler_deleted); +} + +TEST_F(LocalRpcMonitorMapTest, local_add_conflict) { + std::unique_ptr<OkState> state; + bool handler_deleted; + add_mapping(mapping, true); + map.addLocal(mapping_conflict, std::make_unique<MyAddLocalHandler>(state, handler_deleted)); + monitor_log.expect({}); + map_log.expect({}); + ASSERT_TRUE(state); + EXPECT_TRUE(state->failed()); + ASSERT_TRUE(handler_deleted); +} + +TEST_F(LocalRpcMonitorMapTest, local_multi_add) { + std::unique_ptr<OkState> state1; + bool handler_deleted1; + std::unique_ptr<OkState> state2; + bool handler_deleted2; + map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state1, handler_deleted1)); + monitor_log.expect({MonitorCall::start(mapping, true)}); + map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state2, handler_deleted2)); + monitor_log.expect({}); + map_log.expect({}); + EXPECT_FALSE(state1); + EXPECT_FALSE(state2); + map.up(mapping); + monitor_log.expect({}); + map_log.expect({MapCall::add(mapping)}); + ASSERT_TRUE(state1); + ASSERT_TRUE(state2); + EXPECT_TRUE(state1->ok()); + EXPECT_TRUE(state2->ok()); + ASSERT_TRUE(handler_deleted1); + ASSERT_TRUE(handler_deleted2); +} + +TEST_F(LocalRpcMonitorMapTest, local_remove) { + add_mapping(mapping, true); + map.removeLocal(mapping); + monitor_log.expect({MonitorCall::stop(mapping), MonitorCall::start(mapping, false)}); + map_log.expect({MapCall::remove(mapping)}); + map.up(mapping); // timeout case (should normally not happen) + map_log.expect({MapCall::add(mapping)}); +} + +TEST_F(LocalRpcMonitorMapTest, local_add_local_remove) { + std::unique_ptr<OkState> state; + bool handler_deleted; + map.addLocal(mapping, std::make_unique<MyAddLocalHandler>(state, handler_deleted)); + monitor_log.expect({MonitorCall::start(mapping, true)}); + map_log.expect({}); + map.removeLocal(mapping); + monitor_log.expect({MonitorCall::stop(mapping)}); + map_log.expect({}); + ASSERT_TRUE(state); + EXPECT_TRUE(state->failed()); + ASSERT_TRUE(handler_deleted); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/slobrok/src/vespa/slobrok/server/cmd.cpp b/slobrok/src/vespa/slobrok/server/cmd.cpp index b809f655a9d..df856189d89 100644 --- a/slobrok/src/vespa/slobrok/server/cmd.cpp +++ b/slobrok/src/vespa/slobrok/server/cmd.cpp @@ -62,6 +62,15 @@ ScriptCommand::makeIgnoreCmd(SBEnv &env, const std::string & name, const std::st return ScriptCommand(std::move(data)); } +ScriptCommand +ScriptCommand::makeRegCompleter(SBEnv &env, + const std::string &name, const std::string &spec, + FRT_RPCRequest *req) +{ + auto data = std::make_unique<ScriptData>(env, name, spec, req); + data->_state = ScriptData::XCH_DOADD; + return ScriptCommand(std::move(data)); +} void ScriptCommand::doRequest() @@ -124,7 +133,9 @@ ScriptCommand::doneHandler(OkState result) LOG(debug, "done doAdd(%s,%s)", name_p, spec_p); data._state = ScriptData::RDC_INVAL; // all OK - data.registerRequest->Return(); + if (data.registerRequest != nullptr) { + data.registerRequest->Return(); + } cleanupReservation(data); return; } else if (data._state == ScriptData::XCH_IGNORE) { diff --git a/slobrok/src/vespa/slobrok/server/cmd.h b/slobrok/src/vespa/slobrok/server/cmd.h index d790ae93f5c..e7f42f75e42 100644 --- a/slobrok/src/vespa/slobrok/server/cmd.h +++ b/slobrok/src/vespa/slobrok/server/cmd.h @@ -26,6 +26,7 @@ public: static ScriptCommand makeRegRpcSrvCmd(SBEnv &env, const std::string &name, const std::string &spec, FRT_RPCRequest *req); static ScriptCommand makeIgnoreCmd(SBEnv &env, const std::string &name, const std::string &spec); + static ScriptCommand makeRegCompleter(SBEnv &env, const std::string &name, const std::string &spec, FRT_RPCRequest *req); void doneHandler(OkState result); void doRequest(); diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp index 18da01ee526..ccfb8d3bd63 100644 --- a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp +++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp @@ -127,17 +127,19 @@ ExchangeManager::diffLists(const ServiceMappingList &lhs, const ServiceMappingLi void ExchangeManager::healthCheck() { - auto oldWorldServices = env().rpcServerMap().allManaged(); - ServiceMappingList oldWorldList; - for (const auto *nsp : oldWorldServices) { - oldWorldList.emplace_back(nsp->getName(), nsp->getSpec()); - } - std::sort(oldWorldList.begin(), oldWorldList.end()); auto newWorldList = env().consensusMap().currentConsensus(); - vespalib::string diff = diffLists(oldWorldList, newWorldList); - if (! diff.empty()) { - LOG(warning, "Diff from old world rpcServerMap to new world consensus map: %s", - diff.c_str()); + if (! _env.useNewLogic()) { + auto oldWorldServices = env().rpcServerMap().allManaged(); + ServiceMappingList oldWorldList; + for (const auto *nsp : oldWorldServices) { + oldWorldList.emplace_back(nsp->getName(), nsp->getSpec()); + } + std::sort(oldWorldList.begin(), oldWorldList.end()); + vespalib::string diff = diffLists(oldWorldList, newWorldList); + if (! diff.empty()) { + LOG(warning, "Diff from old world rpcServerMap to new world consensus map: %s", + diff.c_str()); + } } for (const auto & [ name, partner ] : _partners) { partner->maybeStartFetch(); @@ -145,7 +147,7 @@ ExchangeManager::healthCheck() auto remoteList = partner->remoteMap().allMappings(); // 0 is expected (when remote is down) if (remoteList.size() != 0) { - diff = diffLists(newWorldList, remoteList); + vespalib::string diff = diffLists(newWorldList, remoteList); if (! diff.empty()) { LOG(warning, "Diff from consensus map to peer slobrok mirror: %s", diff.c_str()); diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp index 454d123eead..16e47371cbb 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp @@ -10,6 +10,26 @@ namespace slobrok { #pragma GCC diagnostic ignored "-Winline" +namespace { + +struct ChainedAddLocalCompletionHandler : LocalRpcMonitorMap::AddLocalCompletionHandler { + std::unique_ptr<AddLocalCompletionHandler> first; + std::unique_ptr<AddLocalCompletionHandler> second; + + ChainedAddLocalCompletionHandler(std::unique_ptr<AddLocalCompletionHandler> f, + std::unique_ptr<AddLocalCompletionHandler> s) + : first(std::move(f)), second(std::move(s)) + {} + + void doneHandler(OkState result) override { + first->doneHandler(result); + second->doneHandler(result); + } + ~ChainedAddLocalCompletionHandler() override {} +}; + +} + void LocalRpcMonitorMap::DelayedTasks::PerformTask() { std::vector<Event> todo; std::swap(todo, _queue); @@ -25,9 +45,9 @@ void LocalRpcMonitorMap::DelayedTasks::PerformTask() { } } -LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor, +LocalRpcMonitorMap::LocalRpcMonitorMap(FNET_Scheduler *scheduler, MappingMonitorFactory mappingMonitorFactory) - : _delayedTasks(supervisor.GetScheduler(), *this), + : _delayedTasks(scheduler, *this), _map(), _dispatcher(), _history(), @@ -82,18 +102,37 @@ ServiceMapHistory & LocalRpcMonitorMap::history() { return _history; } +bool LocalRpcMonitorMap::wouldConflict(const ServiceMapping &mapping) const { + auto iter = _map.find(mapping.name); + if (iter == _map.end()) { + return false; // no mapping, no conflict + } + return (iter->second.spec != mapping.spec); +} + void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping, - std::unique_ptr<ScriptCommand> inflight) + std::unique_ptr<AddLocalCompletionHandler> inflight) { LOG(debug, "try local add: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); auto old = _map.find(mapping.name); if (old != _map.end()) { - const PerService & exists = old->second; + PerService & exists = old->second; if (exists.spec == mapping.spec) { LOG(debug, "added mapping %s->%s was already present", mapping.name.c_str(), mapping.spec.c_str()); - inflight->doneHandler(OkState(0, "already registered")); + if (exists.up) { + inflight->doneHandler(OkState(0, "already registered")); + } else if (exists.inflight) { + auto newInflight = std::make_unique<ChainedAddLocalCompletionHandler>( + std::move(exists.inflight), + std::move(inflight)); + exists.inflight = std::move(newInflight); + } else { + _mappingMonitor->stop(mapping); + exists.inflight = std::move(inflight); + _mappingMonitor->start(mapping, true); + } return; } LOG(warning, "tried addLocal for mapping %s->%s, but already had conflicting mapping %s->%s", @@ -105,6 +144,43 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping, addToMap(mapping, localService(mapping, std::move(inflight)), true); } +void LocalRpcMonitorMap::removeLocal(const ServiceMapping &mapping) { + LOG(debug, "try local remove: mapping %s->%s", + mapping.name.c_str(), mapping.spec.c_str()); + auto old = _map.find(mapping.name); + if (old == _map.end()) { + return; // already removed, OK + } + PerService & exists = old->second; + if (exists.spec != mapping.spec) { + LOG(warning, "tried removeLocal for mapping %s->%s, but already had conflicting mapping %s->%s", + mapping.name.c_str(), mapping.spec.c_str(), + mapping.name.c_str(), exists.spec.c_str()); + return; // unregister for old, conflicting mapping + } + if (exists.localOnly) { + // we can just remove it + auto removed = removeFromMap(old); + if (removed.inflight) { + auto target = std::move(removed.inflight); + target->doneHandler(OkState(13, "removed during initialization")); + } + if (removed.up) { + _dispatcher.remove(removed.mapping); + } + return; + } + // also exists in consensus map, so we can't just remove it + // instead, pretend it's down and delay next ping + _mappingMonitor->stop(mapping); + if (exists.up) { + exists.up = false; + _dispatcher.remove(mapping); + } + _mappingMonitor->start(mapping, false); + return; +} + void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { _delayedTasks.handleLater(Event::add(mapping)); } diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h index 3b2c74648d2..e3d081eacc9 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h @@ -27,6 +27,13 @@ namespace slobrok { class LocalRpcMonitorMap : public MapListener, public MappingMonitorOwner { +public: + // Interface used to signal the result of addLocal + struct AddLocalCompletionHandler { + virtual void doneHandler(OkState result) = 0; + virtual ~AddLocalCompletionHandler() {} + }; + private: enum class EventType { ADD, REMOVE }; @@ -66,12 +73,12 @@ private: struct PerService { bool up; bool localOnly; - std::unique_ptr<ScriptCommand> inflight; + std::unique_ptr<AddLocalCompletionHandler> inflight; vespalib::string spec; }; PerService localService(const ServiceMapping &mapping, - std::unique_ptr<ScriptCommand> inflight) + std::unique_ptr<AddLocalCompletionHandler> inflight) { return PerService{ .up = false, @@ -109,22 +116,27 @@ private: ServiceMapping mapping; bool up; bool localOnly; - std::unique_ptr<ScriptCommand> inflight; + std::unique_ptr<AddLocalCompletionHandler> inflight; }; RemovedData removeFromMap(Map::iterator iter); public: - LocalRpcMonitorMap(FRT_Supervisor &supervisor, + LocalRpcMonitorMap(FNET_Scheduler *scheduler, MappingMonitorFactory mappingMonitorFactory); ~LocalRpcMonitorMap(); MapSource &dispatcher() { return _dispatcher; } ServiceMapHistory & history(); + bool wouldConflict(const ServiceMapping &mapping) const; + /** for use by register API, will call doneHandler() on inflight script */ void addLocal(const ServiceMapping &mapping, - std::unique_ptr<ScriptCommand> inflight); + std::unique_ptr<AddLocalCompletionHandler> inflight); + + /** for use by unregister API */ + void removeLocal(const ServiceMapping &mapping); void add(const ServiceMapping &mapping) override; void remove(const ServiceMapping &mapping) override; diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.cpp b/slobrok/src/vespa/slobrok/server/rpchooks.cpp index 4ed173deaa1..ab1d5246ddc 100644 --- a/slobrok/src/vespa/slobrok/server/rpchooks.cpp +++ b/slobrok/src/vespa/slobrok/server/rpchooks.cpp @@ -37,6 +37,12 @@ public: ~MetricsReport() override { Kill(); } }; +struct ScriptCommandWrapper : LocalRpcMonitorMap::AddLocalCompletionHandler { + ScriptCommand script; + ScriptCommandWrapper(ScriptCommand &&script_in) : script(std::move(script_in)) {} + void doneHandler(OkState result) override { script.doneHandler(result); } +}; + } // namespace <unnamed> //----------------------------------------------------------------------------- @@ -215,6 +221,10 @@ RPCHooks::initRPC(FRT_Supervisor *supervisor) } +bool RPCHooks::useNewLogic() const { + return _env.useNewLogic(); +} + void RPCHooks::rpc_listNamesServed(FRT_RPCRequest *req) { @@ -228,6 +238,9 @@ RPCHooks::rpc_listNamesServed(FRT_RPCRequest *req) void RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req) { + if (useNewLogic()) { + return new_registerRpcServer(req); + } FRT_Values &args = *req->GetParams(); const char *dName = args[0]._string._str; const char *dSpec = args[1]._string._str; @@ -238,7 +251,7 @@ RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req) // TODO: run only this path, and complete the request instead of ignoring auto script = ScriptCommand::makeIgnoreCmd(_env, dName, dSpec); ServiceMapping mapping{dName, dSpec}; - _env.localMonitorMap().addLocal(mapping, std::make_unique<ScriptCommand>(std::move(script))); + _env.localMonitorMap().addLocal(mapping, std::make_unique<ScriptCommandWrapper>(std::move(script))); } // is this already OK? if (_rpcsrvmanager.alreadyManaged(dName, dSpec)) { @@ -260,9 +273,34 @@ RPCHooks::rpc_registerRpcServer(FRT_RPCRequest *req) completer.doRequest(); } +void RPCHooks::new_registerRpcServer(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + const char *dName = args[0]._string._str; + const char *dSpec = args[1]._string._str; + LOG(debug, "RPC: invoked registerRpcServer(%s,%s)", dName, dSpec); + _cnts.registerReqs++; + ServiceMapping mapping{dName, dSpec}; + // can we say now, that this will fail? + if (_env.consensusMap().wouldConflict(mapping)) { + req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected"); + LOG(info, "cannot register %s at %s: conflict", dName, dSpec); + return; + } + auto script = ScriptCommand::makeRegCompleter(_env, dName, dSpec, req); + req->Detach(); + _env.localMonitorMap().addLocal(mapping, std::make_unique<ScriptCommandWrapper>(std::move(script))); + // TODO: remove this + script = ScriptCommand::makeRegRpcSrvCmd(_env, dName, dSpec, nullptr); + script.doRequest(); + return; +} + void RPCHooks::rpc_unregisterRpcServer(FRT_RPCRequest *req) { + if (useNewLogic()) { + return new_unregisterRpcServer(req); + } FRT_Values &args = *req->GetParams(); const char *dName = args[0]._string._str; const char *dSpec = args[1]._string._str; @@ -278,6 +316,17 @@ RPCHooks::rpc_unregisterRpcServer(FRT_RPCRequest *req) return; } +void RPCHooks::new_unregisterRpcServer(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + const char *dName = args[0]._string._str; + const char *dSpec = args[1]._string._str; + ServiceMapping mapping{dName, dSpec}; + _env.localMonitorMap().removeLocal(mapping); + _env.exchangeManager().forwardRemove(dName, dSpec); + LOG(debug, "unregisterRpcServer(%s,%s)", dName, dSpec); + _cnts.otherReqs++; + return; +} void RPCHooks::rpc_addPeer(FRT_RPCRequest *req) @@ -322,6 +371,9 @@ RPCHooks::rpc_removePeer(FRT_RPCRequest *req) void RPCHooks::rpc_wantAdd(FRT_RPCRequest *req) { + if (useNewLogic()) { + return new_wantAdd(req); + } FRT_Values &args = *req->GetParams(); const char *remsb = args[0]._string._str; const char *dName = args[1]._string._str; @@ -341,10 +393,38 @@ RPCHooks::rpc_wantAdd(FRT_RPCRequest *req) return; } +void RPCHooks::new_wantAdd(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + const char *remsb = args[0]._string._str; + const char *dName = args[1]._string._str; + const char *dSpec = args[2]._string._str; + FRT_Values &retval = *req->GetReturn(); + ServiceMapping mapping{dName, dSpec}; + bool conflict = ( + _env.consensusMap().wouldConflict(mapping) + || + _env.localMonitorMap().wouldConflict(mapping) + ); + if (conflict) { + retval.AddInt32(13); + retval.AddString("conflict detected"); + req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected"); + } else { + retval.AddInt32(0); + retval.AddString("ok"); + } + LOG(debug, "%s->wantAdd(%s,%s) %s", + remsb, dName, dSpec, conflict ? "conflict" : "OK"); + _cnts.wantAddReqs++; + return; +} void RPCHooks::rpc_doRemove(FRT_RPCRequest *req) { + if (useNewLogic()) { + return new_doRemove(req); + } FRT_Values &args = *req->GetParams(); const char *rname = args[0]._string._str; const char *dname = args[1]._string._str; @@ -364,9 +444,27 @@ RPCHooks::rpc_doRemove(FRT_RPCRequest *req) return; } +void RPCHooks::new_doRemove(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + const char *rname = args[0]._string._str; + const char *dname = args[1]._string._str; + const char *dspec = args[2]._string._str; + FRT_Values &retval = *req->GetReturn(); + ServiceMapping mapping{dname, dspec}; + _env.localMonitorMap().removeLocal(mapping); + retval.AddInt32(0); + retval.AddString("ok"); + LOG(debug, "%s->doRemove(%s,%s)", rname, dname, dspec); + _cnts.doRemoveReqs++; + return; +} + void RPCHooks::rpc_doAdd(FRT_RPCRequest *req) { + if (useNewLogic()) { + return new_doAdd(req); + } FRT_Values &args = *req->GetParams(); const char *rname = args[0]._string._str; const char *dname = args[1]._string._str; @@ -386,6 +484,28 @@ RPCHooks::rpc_doAdd(FRT_RPCRequest *req) return; } +void RPCHooks::new_doAdd(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + const char *remsb = args[0]._string._str; + const char *dName = args[1]._string._str; + const char *dSpec = args[2]._string._str; + FRT_Values &retval = *req->GetReturn(); + ServiceMapping mapping{dName, dSpec}; + bool ok = true; + if (_env.consensusMap().wouldConflict(mapping)) { + retval.AddInt32(13); + retval.AddString("conflict detected"); + req->SetError(FRTE_RPC_METHOD_FAILED, "conflict detected"); + ok = false; + } else { + retval.AddInt32(0); + retval.AddString("ok"); + } + LOG(debug, "%s->doAdd(%s,%s) %s", + remsb, dName, dSpec, ok ? "OK" : "failed"); + _cnts.doAddReqs++; + return; +} void RPCHooks::rpc_lookupRpcServer(FRT_RPCRequest *req) @@ -455,6 +575,7 @@ RPCHooks::rpc_lookupManaged(FRT_RPCRequest *req) FRT_Values &args = *req->GetParams(); const char *name = args[0]._string._str; LOG(debug, "RPC: lookupManaged(%s)", name); + // TODO: use local history here const auto & visible = _env.globalHistory(); auto diff = visible.makeDiffFrom(0); for (const auto & entry : diff.updated) { diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.h b/slobrok/src/vespa/slobrok/server/rpchooks.h index a41e473b183..e8f6c65ea47 100644 --- a/slobrok/src/vespa/slobrok/server/rpchooks.h +++ b/slobrok/src/vespa/slobrok/server/rpchooks.h @@ -58,8 +58,16 @@ public: void countFailedHeartbeat() { _cnts.heartBeatFails++; } private: + bool useNewLogic() const; + void rpc_lookupRpcServer(FRT_RPCRequest *req); + void new_registerRpcServer(FRT_RPCRequest *req); + void new_unregisterRpcServer(FRT_RPCRequest *req); + void new_wantAdd(FRT_RPCRequest *req); + void new_doRemove(FRT_RPCRequest *req); + void new_doAdd(FRT_RPCRequest *req); + void rpc_registerRpcServer(FRT_RPCRequest *req); void rpc_unregisterRpcServer(FRT_RPCRequest *req); diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index 1f54716c29c..ebb9935877f 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -97,12 +97,15 @@ ConfigTask::PerformTask() } // namespace slobrok::<unnamed> -SBEnv::SBEnv(const ConfigShim &shim) +SBEnv::SBEnv(const ConfigShim &shim) : SBEnv(shim, false) {} + +SBEnv::SBEnv(const ConfigShim &shim, bool useNewConsensusLogic) : _transport(std::make_unique<FNET_Transport>(TransportConfig().drop_empty_buffers(true))), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), _configShim(shim), _configurator(shim.factory().create(*this)), _shuttingDown(false), + _useNewLogic(useNewConsensusLogic), _partnerList(), _me(createSpec(_configShim.portNumber())), _rpcHooks(*this, _rpcsrvmap, _rpcsrvmanager), @@ -110,7 +113,7 @@ SBEnv::SBEnv(const ConfigShim &shim) _health(), _metrics(_rpcHooks, *_transport), _components(), - _localRpcMonitorMap(*_supervisor, + _localRpcMonitorMap(getScheduler(), [this] (MappingMonitorOwner &owner) { return std::make_unique<RpcMappingMonitor>(*_supervisor, owner); }), @@ -118,11 +121,19 @@ SBEnv::SBEnv(const ConfigShim &shim) _exchanger(*this, _rpcsrvmap), _rpcsrvmap() { + if (useNewLogic()) { + srandom(time(nullptr) ^ getpid()); + // note: feedback loop between these two: + _localMonitorSubscription = MapSubscription::subscribe(_consensusMap, _localRpcMonitorMap); + _consensusSubscription = MapSubscription::subscribe(_localRpcMonitorMap.dispatcher(), _consensusMap); + _globalHistorySubscription = MapSubscription::subscribe(_consensusMap, _globalVisibleHistory); + _rpcHooks.initRPC(getSupervisor()); + return; + } srandom(time(nullptr) ^ getpid()); // note: feedback loop between these two: _localMonitorSubscription = MapSubscription::subscribe(_consensusMap, _localRpcMonitorMap); _consensusSubscription = MapSubscription::subscribe(_localRpcMonitorMap.dispatcher(), _consensusMap); - // TODO: use consensus as source here: _globalHistorySubscription = MapSubscription::subscribe(_rpcsrvmap.proxy(), _globalVisibleHistory); _rpcHooks.initRPC(getSupervisor()); } diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h index 44b7305814c..c6fd8905131 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.h +++ b/slobrok/src/vespa/slobrok/server/sbenv.h @@ -44,6 +44,7 @@ private: ConfigShim _configShim; Configurator::UP _configurator; bool _shuttingDown; + const bool _useNewLogic; SBEnv(const SBEnv &); // Not used SBEnv &operator=(const SBEnv &); // Not used @@ -71,6 +72,7 @@ private: public: explicit SBEnv(const ConfigShim &shim); + SBEnv(const ConfigShim &shim, bool useNewConsensusLogic); ~SBEnv(); FNET_Transport *getTransport() { return _transport.get(); } @@ -105,6 +107,7 @@ public: bool isSuspended() const { return false; } bool isShuttingDown() const { return _shuttingDown; } + bool useNewLogic() const { return _useNewLogic; } int MainLoop(); diff --git a/slobrok/src/vespa/slobrok/server/union_service_map.cpp b/slobrok/src/vespa/slobrok/server/union_service_map.cpp index baf94a6fa69..9abfc237d56 100644 --- a/slobrok/src/vespa/slobrok/server/union_service_map.cpp +++ b/slobrok/src/vespa/slobrok/server/union_service_map.cpp @@ -20,6 +20,19 @@ ServiceMappingList UnionServiceMap::currentConsensus() const { return result; } +bool UnionServiceMap::wouldConflict(const ServiceMapping &mapping) const { + const vespalib::string &key = mapping.name; + auto iter = _mappings.find(key); + if (iter == _mappings.end()) { + return false; + } + const Mappings &values = iter->second; + if (values.size() != 1) { + return true; + } + return (values[0].spec != mapping.spec); +} + void UnionServiceMap::add(const ServiceMapping &mapping) { const vespalib::string &key = mapping.name; diff --git a/slobrok/src/vespa/slobrok/server/union_service_map.h b/slobrok/src/vespa/slobrok/server/union_service_map.h index d5bcbfaed94..67d3221849d 100644 --- a/slobrok/src/vespa/slobrok/server/union_service_map.h +++ b/slobrok/src/vespa/slobrok/server/union_service_map.h @@ -36,6 +36,8 @@ public: ServiceMappingList currentConsensus() const; + bool wouldConflict(const ServiceMapping &mapping) const; + void add(const ServiceMapping &mapping) override; void remove(const ServiceMapping &mapping) override; void update(const ServiceMapping &old_mapping, |