diff options
3 files changed, 105 insertions, 19 deletions
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index 5c201f86f90..45939cf3e63 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -35,13 +35,23 @@ struct StateManagerTest : public CppUnit::TestFixture { void testReportedNodeState(); void current_cluster_state_version_is_included_in_host_info_json(); void can_explicitly_send_get_node_state_reply(); + void explicit_node_state_send_without_pending_request_immediately_replies_on_next_request(); + void immediate_node_state_reply_sending_is_tracked_per_controller(); CPPUNIT_TEST_SUITE(StateManagerTest); CPPUNIT_TEST(testSystemState); CPPUNIT_TEST(testReportedNodeState); CPPUNIT_TEST(current_cluster_state_version_is_included_in_host_info_json); CPPUNIT_TEST(can_explicitly_send_get_node_state_reply); + CPPUNIT_TEST(explicit_node_state_send_without_pending_request_immediately_replies_on_next_request); + CPPUNIT_TEST(immediate_node_state_reply_sending_is_tracked_per_controller); CPPUNIT_TEST_SUITE_END(); + + void mark_reported_node_state_up(); + void send_down_get_node_state_request(uint16_t controller_index); + void assert_ok_get_node_state_reply_sent_and_clear(); + void clear_sent_replies(); + void mark_reply_observed_from_n_controllers(uint16_t n); }; CPPUNIT_TEST_SUITE_REGISTRATION(StateManagerTest); @@ -255,26 +265,82 @@ void StateManagerTest::current_cluster_state_version_is_included_in_host_info_js CPPUNIT_ASSERT_EQUAL(123, version); } -void StateManagerTest::can_explicitly_send_get_node_state_reply() { - { - auto lock = _manager->grabStateChangeLock(); - _manager->setReportedNodeState(NodeState(NodeType::STORAGE, State::UP)); - } - // Send down a GetNodeState with the same state as we currently have. This - // ensures that the StateManager doesn't auto-reply with the current state - // to inform the caller that the state has changed. 
+void StateManagerTest::mark_reported_node_state_up() { + auto lock = _manager->grabStateChangeLock(); + _manager->setReportedNodeState(NodeState(NodeType::STORAGE, State::UP)); +} + +void StateManagerTest::send_down_get_node_state_request(uint16_t controller_index) { auto cmd = std::make_shared<api::GetNodeStateCommand>( std::make_unique<NodeState>(NodeType::STORAGE, State::UP)); cmd->setTimeout(10000000); + cmd->setSourceIndex(controller_index); _upper->sendDown(cmd); +} +void StateManagerTest::assert_ok_get_node_state_reply_sent_and_clear() { + CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); + std::shared_ptr<api::StorageReply> reply; + GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper + CPPUNIT_ASSERT_EQUAL(api::MessageType::GETNODESTATE_REPLY, reply->getType()); +} + +void StateManagerTest::clear_sent_replies() { + _upper->getRepliesOnce(); +} + +void StateManagerTest::mark_reply_observed_from_n_controllers(uint16_t n) { + for (uint16_t i = 0; i < n; ++i) { + send_down_get_node_state_request(i); + assert_ok_get_node_state_reply_sent_and_clear(); + } +} + +void StateManagerTest::can_explicitly_send_get_node_state_reply() { + mark_reported_node_state_up(); + // Must "pre-trigger" that a controller has already received a GetNodeState + // reply, or an immediate reply will be sent by default when the first request + // from a controller is observed. 
+ mark_reply_observed_from_n_controllers(1); + + send_down_get_node_state_request(0); CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); _manager->immediately_send_get_node_state_replies(); + assert_ok_get_node_state_reply_sent_and_clear(); +} - std::shared_ptr<api::StorageReply> reply; - GET_ONLY_OK_REPLY(reply); - CPPUNIT_ASSERT_EQUAL(api::MessageType::GETNODESTATE_REPLY, reply->getType()); +void StateManagerTest::explicit_node_state_send_without_pending_request_immediately_replies_on_next_request() { + mark_reported_node_state_up(); + mark_reply_observed_from_n_controllers(1); + + // No pending requests at this time + _manager->immediately_send_get_node_state_replies(); + + send_down_get_node_state_request(0); + assert_ok_get_node_state_reply_sent_and_clear(); + // Sending a new request should now _not_ immediately receive a reply + send_down_get_node_state_request(0); + CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); +} + +void StateManagerTest::immediate_node_state_reply_sending_is_tracked_per_controller() { + mark_reported_node_state_up(); + mark_reply_observed_from_n_controllers(3); + + _manager->immediately_send_get_node_state_replies(); + + send_down_get_node_state_request(0); + send_down_get_node_state_request(1); + send_down_get_node_state_request(2); + CPPUNIT_ASSERT_EQUAL(size_t(3), _upper->getNumReplies()); + clear_sent_replies(); + + // Sending a new request should now _not_ immediately receive a reply + send_down_get_node_state_request(0); + send_down_get_node_state_request(1); + send_down_get_node_state_request(2); + CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); } } // storage diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index e643350aa65..693e56133df 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -32,13 +32,10 @@ 
StateManager::StateManager(StorageComponentRegister& compReg, bool testMode) : StorageLink("State manager"), framework::HtmlStatusReporter("systemstate", "Node and system state"), - _noThreadTestMode(testMode), _component(compReg, "statemanager"), _metricManager(metricManager), _stateLock(), _listenerLock(), - _grabbedExternalLock(false), - _notifyingListeners(false), _nodeState(std::make_shared<lib::NodeState>(_component.getNodeType(), lib::State::INITIALIZING)), _nextNodeState(), _systemState(std::make_shared<const ClusterStateBundle>(lib::ClusterState())), @@ -50,7 +47,11 @@ StateManager::StateManager(StorageComponentRegister& compReg, _progressLastInitStateSend(-1), _systemStateHistory(), _systemStateHistorySize(50), - _hostInfo(std::move(hostInfo)) + _hostInfo(std::move(hostInfo)), + _controllers_observed_explicit_node_state(), + _noThreadTestMode(testMode), + _grabbedExternalLock(false), + _notifyingListeners(false) { _nodeState->setMinUsedBits(58); _nodeState->setStartTimestamp( @@ -453,8 +454,11 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd) std::shared_ptr<api::GetNodeStateReply> reply; { vespalib::LockGuard lock(_stateLock); + const bool is_up_to_date = (_controllers_observed_explicit_node_state.find(cmd->getSourceIndex()) + != _controllers_observed_explicit_node_state.end()); if (cmd->getExpectedState() != nullptr - && (*cmd->getExpectedState() == *_nodeState || sentReply)) + && (*cmd->getExpectedState() == *_nodeState || sentReply) + && is_up_to_date) { LOG(debug, "Received get node state request with timeout of " "%u milliseconds. 
Scheduling to be answered in " @@ -478,6 +482,7 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd) + mark_controller_as_having_observed_explicit_node_state(cmd->getSourceIndex()); lock.unlock(); std::string nodeInfo(getNodeInfo()); reply->setNodeInfo(nodeInfo); } } if (reply) { @@ -486,6 +491,10 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd) return true; } +void StateManager::mark_controller_as_having_observed_explicit_node_state(uint16_t controller_index) { + _controllers_observed_explicit_node_state.emplace(controller_index); +} + void StateManager::setClusterStateBundle(const ClusterStateBundle& c) { @@ -545,6 +554,7 @@ StateManager::sendGetNodeStateReplies(framework::MilliSecTime olderThanTime, replies.emplace_back(std::make_shared<api::GetNodeStateReply>(*it->second, *_nodeState)); auto eraseIt = it++; + mark_controller_as_having_observed_explicit_node_state(eraseIt->second->getSourceIndex()); _queuedStateRequests.erase(eraseIt); } else { ++it; } @@ -631,6 +641,11 @@ StateManager::getNodeInfo() const void StateManager::immediately_send_get_node_state_replies() { LOG(debug, "Immediately replying to all pending GetNodeState requests"); + { + vespalib::MonitorGuard guard(_stateLock); + // Next GetNodeState request from any controller will be replied to instantly + _controllers_observed_explicit_node_state.clear(); + } sendGetNodeStateReplies(); } diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index f38232c821c..a10e8d01ee5 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -24,6 +24,7 @@ #include <vespa/vespalib/objects/floatingpointtype.h> #include <deque> #include <map> +#include <unordered_set> #include <list> #include <atomic> @@ -41,13 +42,10 @@ class StateManager : public NodeStateUpdater, private framework::Runnable, private vespalib::JsonStreamTypes { 
- bool _noThreadTestMode; StorageComponent _component; metrics::MetricManager& _metricManager; vespalib::Monitor _stateLock; vespalib::Lock _listenerLock; - bool _grabbedExternalLock; - std::atomic<bool> _notifyingListeners; std::shared_ptr<lib::NodeState> _nodeState; std::shared_ptr<lib::NodeState> _nextNodeState; using ClusterStateBundle = lib::ClusterStateBundle; @@ -65,6 +63,12 @@ class StateManager : public NodeStateUpdater, uint32_t _systemStateHistorySize; std::unique_ptr<HostInfo> _hostInfo; framework::Thread::UP _thread; + // Controllers that have observed a GetNodeState response sent _after_ + // immediately_send_get_node_state_replies() has been invoked. + std::unordered_set<uint16_t> _controllers_observed_explicit_node_state; + bool _noThreadTestMode; + bool _grabbedExternalLock; + std::atomic<bool> _notifyingListeners; public: explicit StateManager(StorageComponentRegister&, metrics::MetricManager&, @@ -102,6 +106,7 @@ private: bool sendGetNodeStateReplies( framework::MilliSecTime olderThanTime = framework::MilliSecTime(0), uint16_t index = 0xffff); + void mark_controller_as_having_observed_explicit_node_state(uint16_t controller_index); lib::Node thisNode() const; |