summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/distributortest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/distributortest.cpp')
-rw-r--r--storage/src/tests/distributor/distributortest.cpp48
1 files changed, 48 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index bfa1181eca1..3aedd31f574 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -46,6 +46,8 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(max_clock_skew_config_is_propagated_to_distributor_config);
CPPUNIT_TEST(configured_safe_time_point_rejection_works_end_to_end);
CPPUNIT_TEST(sequencing_config_is_propagated_to_distributor_config);
+ CPPUNIT_TEST(merge_busy_inhibit_duration_config_is_propagated_to_distributor_config);
+ CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker);
CPPUNIT_TEST_SUITE_END();
protected:
@@ -73,6 +75,8 @@ protected:
void max_clock_skew_config_is_propagated_to_distributor_config();
void configured_safe_time_point_rejection_works_end_to_end();
void sequencing_config_is_propagated_to_distributor_config();
+ void merge_busy_inhibit_duration_config_is_propagated_to_distributor_config();
+ void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker();
public:
void setUp() override {
@@ -177,6 +181,7 @@ private:
void assertSingleBouncedRemoveReplyPresent();
void assertNoMessageBounced();
void configure_mutation_sequencing(bool enabled);
+ void configure_merge_busy_inhibit_duration(int seconds);
};
CPPUNIT_TEST_SUITE_REGISTRATION(Distributor_Test);
@@ -819,6 +824,49 @@ void Distributor_Test::sequencing_config_is_propagated_to_distributor_config() {
CPPUNIT_ASSERT(getConfig().getSequenceMutatingOperations());
}
+void
+Distributor_Test::configure_merge_busy_inhibit_duration(int seconds) {
+ using namespace vespa::config::content::core;
+ using ConfigBuilder = StorDistributormanagerConfigBuilder;
+
+ ConfigBuilder builder;
+ builder.inhibitMergeSendingOnBusyNodeDurationSec = seconds;
+ getConfig().configure(builder);
+ _distributor->enableNextConfig();
+}
+
+void Distributor_Test::merge_busy_inhibit_duration_config_is_propagated_to_distributor_config() {
+ setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
+
+ configure_merge_busy_inhibit_duration(7);
+ CPPUNIT_ASSERT(getConfig().getInhibitMergesOnBusyNodeDuration() == std::chrono::seconds(7));
+}
+
+void Distributor_Test::merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker() {
+ setupDistributor(Redundancy(2), NodeCount(2), "storage:1 distributor:1");
+ addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t");
+
+ configure_merge_busy_inhibit_duration(100);
+ auto cmd = makeDummyRemoveCommand(); // Remove is for bucket 1
+ _distributor->handleMessage(cmd);
+
+ // Should send to content node 0
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::REMOVE, _sender.commands[0]->getType());
+ auto& fwd_cmd = dynamic_cast<api::RemoveCommand&>(*_sender.commands[0]);
+ auto reply = fwd_cmd.makeReply();
+ reply->setResult(api::ReturnCode(api::ReturnCode::BUSY));
+ _distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply)));
+
+ auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo();
+
+ CPPUNIT_ASSERT(node_info.isBusy(0));
+ getClock().addSecondsToTime(99);
+ CPPUNIT_ASSERT(node_info.isBusy(0));
+ getClock().addSecondsToTime(2);
+ CPPUNIT_ASSERT(!node_info.isBusy(0));
+}
+
}
}