// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using document::test::makeDocumentBucket; using namespace ::testing; namespace storage { struct BouncerTest : public Test { std::unique_ptr _config; std::unique_ptr _node; std::unique_ptr _upper; Bouncer* _manager; DummyStorageLink* _lower; BouncerTest(); void SetUp() override; void TearDown() override; void setUpAsNode(const lib::NodeType& type); using Priority = api::StorageMessage::Priority; static constexpr int RejectionDisabledConfigValue = -1; void expectMessageBouncedWithRejection() const; void expect_message_bounced_with_node_down_abort() const; void expect_message_bounced_with_shutdown_abort() const; void expectMessageNotBounced() const; }; BouncerTest::BouncerTest() : _node(), _upper(), _manager(nullptr), _lower(nullptr) { } void BouncerTest::setUpAsNode(const lib::NodeType& type) { _config = StorageConfigSet::make_node_config(type == lib::NodeType::STORAGE); if (type == lib::NodeType::STORAGE) { _node = std::make_unique(NodeIndex(2), _config->config_uri()); } else { _node = std::make_unique(NodeIndex(2), _config->config_uri()); } _upper = std::make_unique(); using StorBouncerConfig = vespa::config::content::core::StorBouncerConfig; auto& cfg_uri = _config->config_uri(); auto cfg = config::ConfigGetter::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext()); _manager = new Bouncer(_node->getComponentRegister(), *cfg); _lower = new DummyStorageLink(); _upper->push_back(std::unique_ptr(_manager)); _upper->push_back(std::unique_ptr(_lower)); _upper->open(); _node->getClock().setAbsoluteTimeInSeconds(10); } void BouncerTest::SetUp() { setUpAsNode(lib::NodeType::STORAGE); } void BouncerTest::TearDown() { _manager = nullptr; _lower = nullptr; if (_upper) { _upper->close(); _upper->flush(); _upper.reset(); } _node.reset(); } std::shared_ptr createDummyFeedMessage(api::Timestamp timestamp, api::StorageMessage::Priority priority = 0) { auto cmd = std::make_shared( makeDocumentBucket(document::BucketId(0)), document::DocumentId("id:ns:foo::bar"), timestamp); cmd->setPriority(priority); return cmd; } std::shared_ptr createDummyFeedMessage(api::Timestamp timestamp, document::BucketSpace bucketSpace) { auto cmd = std::make_shared( document::Bucket(bucketSpace, document::BucketId(0)), document::DocumentId("id:ns:foo::bar"), timestamp); cmd->setPriority(BouncerTest::Priority(0)); return cmd; } std::shared_ptr create_dummy_get_message() { return std::make_shared( document::Bucket(document::FixedBucketSpaces::default_space(), document::BucketId(0)), document::DocumentId("id:ns:foo::bar"), document::AllFields::NAME); } TEST_F(BouncerTest, future_timestamp) { EXPECT_EQ(0, _manager->metrics().clock_skew_aborts.getValue()); // Fail when future timestamps (more than 5 seconds) are received. { _upper->sendDown(createDummyFeedMessage(16 * 1000000)); ASSERT_EQ(1, _upper->getNumReplies()); EXPECT_EQ(0, _upper->getNumCommands()); EXPECT_EQ(api::ReturnCode::REJECTED, dynamic_cast(*_upper->getReply(0)).getResult().getResult()); _upper->reset(); } EXPECT_EQ(1, _manager->metrics().clock_skew_aborts.getValue()); // Verify that 1 second clock skew is OK { _upper->sendDown(createDummyFeedMessage(11 * 1000000)); EXPECT_EQ(0, _upper->getNumReplies()); EXPECT_EQ(1, _lower->getNumCommands()); _lower->reset(); } // Verify that past is OK { _upper->sendDown(createDummyFeedMessage(5 * 1000000)); EXPECT_EQ(1, _lower->getNumCommands()); } EXPECT_EQ(1, _manager->metrics().clock_skew_aborts.getValue()); } TEST_F(BouncerTest, allow_notify_bucket_change_even_when_distributor_down) { lib::NodeState state(lib::NodeType::DISTRIBUTOR, lib::State::DOWN); _node->getNodeStateUpdater().setReportedNodeState(state); // Trigger Bouncer state update auto clusterState = std::make_shared( "distributor:3 storage:3"); _node->getNodeStateUpdater().setClusterState(clusterState); document::BucketId bucket(16, 1234); api::BucketInfo info(0x1, 0x2, 0x3); auto cmd = std::make_shared(makeDocumentBucket(bucket), info); _upper->sendDown(cmd); EXPECT_EQ(0, _upper->getNumReplies()); EXPECT_EQ(1, _lower->getNumCommands()); } void BouncerTest::expectMessageBouncedWithRejection() const { ASSERT_EQ(1, _upper->getNumReplies()); EXPECT_EQ(0, _upper->getNumCommands()); EXPECT_EQ(api::ReturnCode::REJECTED, dynamic_cast(*_upper->getReply(0)).getResult().getResult()); EXPECT_EQ(size_t(0), _lower->getNumCommands()); } void BouncerTest::expect_message_bounced_with_node_down_abort() const { ASSERT_EQ(1, _upper->getNumReplies()); EXPECT_EQ(0, _upper->getNumCommands()); auto& reply = dynamic_cast(*_upper->getReply(0)); EXPECT_EQ(api::ReturnCode(api::ReturnCode::ABORTED, "We don't allow command of type MessageType(12, Remove) " "when node is in state Down (on storage.2)"), reply.getResult()); EXPECT_EQ(0, _lower->getNumCommands()); } void BouncerTest::expect_message_bounced_with_shutdown_abort() const { ASSERT_EQ(1, _upper->getNumReplies()); EXPECT_EQ(0, _upper->getNumCommands()); auto& reply = dynamic_cast(*_upper->getReply(0)); EXPECT_EQ(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down"), reply.getResult()); EXPECT_EQ(0, _lower->getNumCommands()); } void BouncerTest::expectMessageNotBounced() const { EXPECT_EQ(size_t(0), _upper->getNumReplies()); EXPECT_EQ(size_t(1), _lower->getNumCommands()); } TEST_F(BouncerTest, priority_rejection_is_disabled_by_default_in_config) { _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(255))); expectMessageNotBounced(); } TEST_F(BouncerTest, read_only_operations_are_not_rejected) { // StatBucket is an external operation, but it's not a mutating operation // and should therefore not be blocked. auto cmd = std::make_shared(makeDocumentBucket(document::BucketId(16, 5)), ""); cmd->setPriority(Priority(2)); _upper->sendDown(cmd); expectMessageNotBounced(); } TEST_F(BouncerTest, internal_operations_are_not_rejected) { document::BucketId bucket(16, 1234); api::BucketInfo info(0x1, 0x2, 0x3); auto cmd = std::make_shared(makeDocumentBucket(bucket), info); cmd->setPriority(Priority(2)); _upper->sendDown(cmd); expectMessageNotBounced(); } namespace { std::shared_ptr makeClusterStateBundle(const vespalib::string &baselineState, const std::map &derivedStates) { lib::ClusterStateBundle::BucketSpaceStateMapping derivedBucketSpaceStates; for (const auto &entry : derivedStates) { derivedBucketSpaceStates[entry.first] = std::make_shared(entry.second); } return std::make_shared(lib::ClusterState(baselineState), std::move(derivedBucketSpaceStates)); } } TEST_F(BouncerTest, abort_request_when_derived_bucket_space_node_state_is_marked_down) { EXPECT_EQ(0, _manager->metrics().unavailable_node_aborts.getValue()); auto state = makeClusterStateBundle("distributor:3 storage:3", {{ document::FixedBucketSpaces::default_space(), "distributor:3 storage:3 .2.s:d" }}); _node->getNodeStateUpdater().setClusterStateBundle(state); _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space())); expect_message_bounced_with_node_down_abort(); EXPECT_EQ(1, _manager->metrics().unavailable_node_aborts.getValue()); _upper->reset(); _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::global_space())); expectMessageNotBounced(); EXPECT_EQ(1, _manager->metrics().unavailable_node_aborts.getValue()); } TEST_F(BouncerTest, client_operations_are_allowed_through_on_cluster_state_down_distributor) { TearDown(); setUpAsNode(lib::NodeType::DISTRIBUTOR); // Distributor states never vary across bucket spaces, so not necessary to test with // anything except baseline state here. auto state = makeClusterStateBundle("distributor:3 .2.s:d storage:3", {}); _node->getNodeStateUpdater().setClusterStateBundle(state); _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space())); expectMessageNotBounced(); EXPECT_EQ(0, _manager->metrics().unavailable_node_aborts.getValue()); } TEST_F(BouncerTest, cluster_state_activation_commands_are_not_bounced) { TearDown(); setUpAsNode(lib::NodeType::DISTRIBUTOR); auto state = makeClusterStateBundle("version:10 distributor:3 .2.s:d storage:3", {}); // Our index (2) is down _node->getNodeStateUpdater().setClusterStateBundle(state); auto activate_cmd = std::make_shared(11); _upper->sendDown(activate_cmd); expectMessageNotBounced(); } TEST_F(BouncerTest, allow_get_operations_when_node_is_in_maintenance_mode) { auto state = makeClusterStateBundle("version:10 distributor:3 storage:3 .2.s:m", {}); // Our index is 2 _node->getNodeStateUpdater().setClusterStateBundle(state); _upper->sendDown(create_dummy_get_message()); expectMessageNotBounced(); EXPECT_EQ(0, _manager->metrics().unavailable_node_aborts.getValue()); } namespace { std::shared_ptr make_remove_with_used_bits(uint8_t n_bits) { auto id = document::DocumentId("id:ns:foo::bar"); auto raw_bucket = id.getGlobalId().convertToBucketId(); return std::make_shared( makeDocumentBucket(document::BucketId(n_bits, raw_bucket.getRawId())), id, api::Timestamp(1000)); } } TEST_F(BouncerTest, operation_with_too_few_bucket_bits_is_rejected) { auto cmd = make_remove_with_used_bits(spi::BucketLimits::MinUsedBits - 1); _upper->sendDown(std::move(cmd)); expectMessageBouncedWithRejection(); } TEST_F(BouncerTest, operation_with_sufficient_bucket_bits_is_not_rejected) { auto cmd = make_remove_with_used_bits(spi::BucketLimits::MinUsedBits); _upper->sendDown(std::move(cmd)); expectMessageNotBounced(); } TEST_F(BouncerTest, requests_are_rejected_after_close) { _manager->close(); _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space())); expect_message_bounced_with_shutdown_abort(); } TEST_F(BouncerTest, replies_are_swallowed_after_close) { _manager->close(); auto req = createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space()); auto reply = req->makeReply(); _upper->sendDown(std::move(reply)); EXPECT_EQ(0, _upper->getNumCommands()); EXPECT_EQ(0, _upper->getNumReplies()); EXPECT_EQ(0, _lower->getNumCommands()); EXPECT_EQ(0, _lower->getNumReplies()); } } // storage