diff options
-rw-r--r-- | storage/src/tests/common/testnodestateupdater.cpp | 8 | ||||
-rw-r--r-- | storage/src/tests/common/testnodestateupdater.h | 1 | ||||
-rw-r--r-- | storage/src/tests/storageserver/bouncertest.cpp | 58 | ||||
-rw-r--r-- | storage/src/vespa/storage/storageserver/bouncer.cpp | 57 | ||||
-rw-r--r-- | storage/src/vespa/storage/storageserver/bouncer.h | 6 |
5 files changed, 113 insertions, 17 deletions
diff --git a/storage/src/tests/common/testnodestateupdater.cpp b/storage/src/tests/common/testnodestateupdater.cpp index 0547980b847..c4afda1a5ad 100644 --- a/storage/src/tests/common/testnodestateupdater.cpp +++ b/storage/src/tests/common/testnodestateupdater.cpp @@ -24,7 +24,13 @@ TestNodeStateUpdater::getClusterStateBundle() const void TestNodeStateUpdater::setClusterState(lib::ClusterState::CSP c) { - _clusterStateBundle = std::make_shared<const lib::ClusterStateBundle>(*c); + setClusterStateBundle(std::make_shared<const lib::ClusterStateBundle>(*c)); +} + +void +TestNodeStateUpdater::setClusterStateBundle(std::shared_ptr<const lib::ClusterStateBundle> clusterStateBundle) +{ + _clusterStateBundle = std::move(clusterStateBundle); for (uint32_t i = 0; i < _listeners.size(); ++i) { _listeners[i]->handleNewState(); } diff --git a/storage/src/tests/common/testnodestateupdater.h b/storage/src/tests/common/testnodestateupdater.h index 5e3001abee4..e0c636d2715 100644 --- a/storage/src/tests/common/testnodestateupdater.h +++ b/storage/src/tests/common/testnodestateupdater.h @@ -42,6 +42,7 @@ public: } void setClusterState(lib::ClusterState::CSP c); + void setClusterStateBundle(std::shared_ptr<const lib::ClusterStateBundle> clusterStateBundle); size_t explicit_node_state_reply_send_invocations() const noexcept { return _explicit_node_state_reply_send_invocations; diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp index 43683132bc9..f8c80773a2e 100644 --- a/storage/src/tests/storageserver/bouncertest.cpp +++ b/storage/src/tests/storageserver/bouncertest.cpp @@ -9,6 +9,7 @@ #include <tests/common/teststorageapp.h> #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/config/common/exceptions.h> @@ -38,6 +39,7 @@ struct BouncerTest : public CppUnit::TestFixture { void readOnlyOperationsAreNotRejected(); void internalOperationsAreNotRejected(); void outOfBoundsConfigValuesThrowException(); + void abortOnlyOnDefaultBucketSpace(); CPPUNIT_TEST_SUITE(BouncerTest); CPPUNIT_TEST(testFutureTimestamp); @@ -50,6 +52,7 @@ struct BouncerTest : public CppUnit::TestFixture { CPPUNIT_TEST(readOnlyOperationsAreNotRejected); CPPUNIT_TEST(internalOperationsAreNotRejected); CPPUNIT_TEST(outOfBoundsConfigValuesThrowException); + CPPUNIT_TEST(abortOnlyOnDefaultBucketSpace); CPPUNIT_TEST_SUITE_END(); using Priority = api::StorageMessage::Priority; @@ -64,7 +67,12 @@ struct BouncerTest : public CppUnit::TestFixture { api::Timestamp timestamp, Priority priority = 0); + std::shared_ptr<api::StorageCommand> createDummyFeedMessage( + api::Timestamp timestamp, + document::BucketSpace bucketSpace); + void assertMessageBouncedWithRejection(); + void assertMessageBouncedWithAbort(); void assertMessageNotBounced(); }; @@ -120,6 +128,18 @@ BouncerTest::createDummyFeedMessage(api::Timestamp timestamp, return cmd; } +std::shared_ptr<api::StorageCommand> +BouncerTest::createDummyFeedMessage(api::Timestamp timestamp, + document::BucketSpace bucketSpace) +{ + auto cmd = std::make_shared<api::RemoveCommand>( + document::Bucket(bucketSpace, document::BucketId(0)), + document::DocumentId("doc:foo:bar"), + timestamp); + cmd->setPriority(Priority(0)); + return cmd; +} + void BouncerTest::testFutureTimestamp() { @@ -189,6 +209,17 @@ BouncerTest::assertMessageBouncedWithRejection() } void +BouncerTest::assertMessageBouncedWithAbort() +{ + CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED, + static_cast<api::RemoveReply&>(*_upper->getReply(0)). + getResult().getResult()); + CPPUNIT_ASSERT_EQUAL(size_t(0), _lower->getNumCommands()); +} + +void BouncerTest::assertMessageNotBounced() { CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); @@ -283,5 +314,32 @@ BouncerTest::outOfBoundsConfigValuesThrowException() } catch (config::InvalidConfigException) {} } + +namespace { + +std::shared_ptr<const lib::ClusterStateBundle> +makeClusterStateBundle(const vespalib::string &baselineState, const std::map<document::BucketSpace, vespalib::string> &derivedStates) +{ + lib::ClusterStateBundle::BucketSpaceStateMapping derivedBucketSpaceStates; + for (const auto &entry : derivedStates) { + derivedBucketSpaceStates[entry.first] = std::make_shared<const lib::ClusterState>(entry.second); + } + return std::make_shared<const lib::ClusterStateBundle>(lib::ClusterState(baselineState), std::move(derivedBucketSpaceStates)); +} + +} + +void +BouncerTest::abortOnlyOnDefaultBucketSpace() +{ + auto state = makeClusterStateBundle("distributor:3 storage:3", {{ document::FixedBucketSpaces::default_space(), "distributor:3 storage:3 .2.s:d" }}); + _node->getNodeStateUpdater().setClusterStateBundle(state); + _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space())); + assertMessageBouncedWithAbort(); + _upper->reset(); + _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::global_space())); + assertMessageNotBounced(); +} + } // storage diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp index 65e04ad7fdb..620376cfb1d 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.cpp +++ b/storage/src/vespa/storage/storageserver/bouncer.cpp @@ -20,7 +20,8 @@ Bouncer::Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & co _config(new vespa::config::content::core::StorBouncerConfig()), _component(compReg, "bouncer"), _lock(), - _nodeState("s:i"), + _baselineNodeState("s:i"), + _derivedNodeStates(), _clusterState(&lib::State::UP), _configFetcher(configUri.getContext()), _metrics(std::make_unique<BouncerMetrics>()) @@ -55,7 +56,7 @@ Bouncer::print(std::ostream& out, bool verbose, const std::string& indent) const { (void) verbose; (void) indent; - out << "Bouncer(" << _nodeState << ")"; + out << "Bouncer(" << _baselineNodeState << ")"; } void @@ -233,7 +234,7 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) int feedPriorityLowerBound; { vespalib::LockGuard lock(_lock); - state = &_nodeState.getState(); + state = &getDerivedNodeState(msg->getBucket().getBucketSpace()).getState(); maxClockSkewInSeconds = _config->maxClockSkewSeconds; abortLoadWhenClusterDown = _config->stopExternalLoadWhenClusterDown; isInAvailableState = state->oneOf( @@ -283,25 +284,51 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) return false; } +namespace { + +lib::NodeState +deriveNodeState(const lib::NodeState &reportedNodeState, + const lib::NodeState ¤tNodeState, + bool useWantedStateIfPossible) +{ + if (useWantedStateIfPossible) { + // If current node state is more strict than our own reported state, + // set node state to our current state + if (reportedNodeState.getState().maySetWantedStateForThisNodeState(currentNodeState.getState())) { + return currentNodeState; + } + } + return reportedNodeState; +} + +} + void Bouncer::handleNewState() { vespalib::LockGuard lock(_lock); - _nodeState = *_component.getStateUpdater().getReportedNodeState(); + const auto reportedNodeState = *_component.getStateUpdater().getReportedNodeState(); const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); const auto &clusterState = *clusterStateBundle->getBaselineClusterState(); _clusterState = &clusterState.getClusterState(); - if (_config->useWantedStateIfPossible) { - // If current node state is more strict than our own reported state, - // set node state to our current state - lib::NodeState currState = clusterState. - getNodeState(lib::Node(_component.getNodeType(), - _component.getIndex())); - if (_nodeState.getState().maySetWantedStateForThisNodeState( - currState.getState())) - { - _nodeState = currState; - } + const lib::Node node(_component.getNodeType(), _component.getIndex()); + const bool useWantedStateIfPossible = _config->useWantedStateIfPossible; + _baselineNodeState = deriveNodeState(reportedNodeState, clusterState.getNodeState(node), useWantedStateIfPossible); + _derivedNodeStates.clear(); + for (const auto &derivedClusterState : clusterStateBundle->getDerivedClusterStates()) { + _derivedNodeStates[derivedClusterState.first] = + deriveNodeState(reportedNodeState, derivedClusterState.second->getNodeState(node), useWantedStateIfPossible); + } +} + +const lib::NodeState & +Bouncer::getDerivedNodeState(document::BucketSpace bucketSpace) +{ + auto itr = _derivedNodeStates.find(bucketSpace); + if (itr != _derivedNodeStates.end()) { + return itr->second; + } else { + return _baselineNodeState; } } diff --git a/storage/src/vespa/storage/storageserver/bouncer.h b/storage/src/vespa/storage/storageserver/bouncer.h index b46bf3fedc6..a5646a12787 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.h +++ b/storage/src/vespa/storage/storageserver/bouncer.h @@ -18,6 +18,7 @@ #include <vespa/storage/common/storagelink.h> #include <vespa/storage/config/config-stor-bouncer.h> #include <vespa/vespalib/util/sync.h> +#include <unordered_map> namespace config { class ConfigUri; } @@ -32,7 +33,9 @@ class Bouncer : public StorageLink, std::unique_ptr<vespa::config::content::core::StorBouncerConfig> _config; StorageComponent _component; vespalib::Lock _lock; - lib::NodeState _nodeState; + lib::NodeState _baselineNodeState; + using BucketSpaceNodeStateMapping = std::unordered_map<document::BucketSpace, lib::NodeState, document::BucketSpace::hash>; + BucketSpaceNodeStateMapping _derivedNodeStates; const lib::State* _clusterState; config::ConfigFetcher _configFetcher; std::unique_ptr<BouncerMetrics> _metrics; @@ -84,6 +87,7 @@ private: bool onDown(const std::shared_ptr<api::StorageMessage>&) override; void handleNewState() override; + const lib::NodeState &getDerivedNodeState(document::BucketSpace bucketSpace); }; |