Diffstat (limited to 'storage/src/tests')
 storage/src/tests/distributor/CMakeLists.txt                        |  1
 storage/src/tests/distributor/blockingoperationstartertest.cpp      |  3
 storage/src/tests/distributor/distributor_stripe_test.cpp           | 17
 storage/src/tests/distributor/distributor_stripe_test_util.cpp      |  9
 storage/src/tests/distributor/distributor_stripe_test_util.h        |  2
 storage/src/tests/distributor/mergeoperationtest.cpp                | 62
 storage/src/tests/distributor/mock_tickable_stripe.h                |  4
 storage/src/tests/distributor/node_supported_features_repo_test.cpp | 52
 storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp  | 53
 storage/src/tests/distributor/top_level_distributor_test_util.cpp   |  6
 storage/src/tests/storageserver/mergethrottlertest.cpp              | 58
 11 files changed, 242 insertions(+), 25 deletions(-)
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 7348cfc328b..bee7650aebd 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -25,6 +25,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
mergelimitertest.cpp
mergeoperationtest.cpp
multi_thread_stripe_access_guard_test.cpp
+ node_supported_features_repo_test.cpp
nodeinfotest.cpp
nodemaintenancestatstrackertest.cpp
operation_sequencer_test.cpp
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp
index 15aada37c9b..7c97c962a97 100644
--- a/storage/src/tests/distributor/blockingoperationstartertest.cpp
+++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp
@@ -100,6 +100,9 @@ struct FakeDistributorStripeOperationContext : public DistributorStripeOperation
const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const override {
abort();
}
+ const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override {
+ abort();
+ }
};
struct BlockingOperationStarterTest : Test {
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
index 902dd6454f1..8c2ebc983fa 100644
--- a/storage/src/tests/distributor/distributor_stripe_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -185,6 +185,12 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
configure_stripe(builder);
}
+ void configure_use_unordered_merge_chaining(bool use_unordered) {
+ ConfigBuilder builder;
+ builder.useUnorderedMergeChaining = use_unordered;
+ configure_stripe(builder);
+ }
+
bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
return _stripe->_scheduler->implicitly_clear_priority_on_schedule();
}
@@ -982,4 +988,15 @@ TEST_F(DistributorStripeTest, closing_aborts_gets_started_outside_stripe_thread)
EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult());
}
+TEST_F(DistributorStripeTest, use_unordered_merge_chaining_config_is_propagated_to_internal_config)
+{
+ setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_use_unordered_merge_chaining(true);
+ EXPECT_TRUE(getConfig().use_unordered_merge_chaining());
+
+ configure_use_unordered_merge_chaining(false);
+ EXPECT_FALSE(getConfig().use_unordered_merge_chaining());
+}
+
}
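
Editor's note: the new test only checks that the raw config flag survives the round-trip into the stripe's internal configuration. A minimal sketch of that mapping, assuming the stripe's config object simply stores the flag behind the accessor used above (class and field names here are illustrative, not the production code):

    // Assumed names for illustration only. `cfg.useUnorderedMergeChaining` matches
    // the builder field set in configure_use_unordered_merge_chaining() above.
    struct StripeConfigSketch {
        bool _use_unordered_merge_chaining = false;

        template <typename ConfigType>
        void configure(const ConfigType& cfg) {
            _use_unordered_merge_chaining = cfg.useUnorderedMergeChaining;
        }
        bool use_unordered_merge_chaining() const noexcept {
            return _use_unordered_merge_chaining;
        }
    };
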
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
index c5c51e64e68..b96b2dda1cb 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -9,8 +9,10 @@
#include <vespa/storage/distributor/distributor_stripe_component.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/ideal_state_total_metrics.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
using document::test::makeBucketSpace;
using document::test::makeDocumentBucket;
@@ -526,6 +528,13 @@ DistributorStripeTestUtil::db_memory_sample_interval() const noexcept {
return _stripe->db_memory_sample_interval();
}
+void
+DistributorStripeTestUtil::set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features) {
+ vespalib::hash_map<uint16_t, NodeSupportedFeatures> new_features;
+ new_features[node] = features;
+ _stripe->update_node_supported_features_repo(_stripe->node_supported_features_repo().make_union_of(new_features));
+}
+
const lib::Distribution&
DistributorStripeTestUtil::getDistribution() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution();
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index b1e90821e3b..3226c16aba3 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -26,6 +26,7 @@ class DocumentSelectionParser;
class ExternalOperationHandler;
class IdealStateManager;
class IdealStateMetricSet;
+class NodeSupportedFeatures;
class Operation;
class StripeBucketDBUpdater;
@@ -150,6 +151,7 @@ public:
[[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept;
[[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept;
[[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
+ void set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features);
const lib::Distribution& getDistribution() const;
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 65ee5254193..54bd06c98e0 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -1,5 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/dummystoragelink.h>
+
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/test/make_document_bucket.h>
@@ -12,6 +12,7 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <charconv>
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
@@ -37,6 +38,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil {
}
std::shared_ptr<MergeOperation> setup_minimal_merge_op();
+ std::shared_ptr<MergeOperation> setup_simple_merge_op(const std::vector<uint16_t>& nodes);
std::shared_ptr<MergeOperation> setup_simple_merge_op();
void assert_simple_merge_bucket_command();
void assert_simple_delete_bucket_command();
@@ -47,13 +49,13 @@ std::shared_ptr<MergeOperation>
MergeOperationTest::setup_minimal_merge_op()
{
document::BucketId bucket_id(16, 1);
- auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
+ auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), {0, 1, 2}));
op->setIdealStateManager(&getIdealStateManager());
return op;
}
std::shared_ptr<MergeOperation>
-MergeOperationTest::setup_simple_merge_op()
+MergeOperationTest::setup_simple_merge_op(const std::vector<uint16_t>& nodes)
{
getClock().setAbsoluteTimeInSeconds(10);
@@ -64,12 +66,18 @@ MergeOperationTest::setup_simple_merge_op()
enable_cluster_state("distributor:1 storage:3");
- auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
+ auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes));
op->setIdealStateManager(&getIdealStateManager());
op->start(_sender, framework::MilliSecTime(0));
return op;
}
+std::shared_ptr<MergeOperation>
+MergeOperationTest::setup_simple_merge_op()
+{
+ return setup_simple_merge_op({0, 1, 2});
+}
+
void
MergeOperationTest::assert_simple_merge_bucket_command()
{
@@ -150,8 +158,10 @@ std::string getNodeList(std::string state, uint32_t redundancy, std::string exis
num.erase(pos);
trusted = true;
}
- bucketDB[i] = BucketCopy(0, atoi(num.c_str()),
- api::BucketInfo(1, 2, 3));
+ uint16_t node;
+ [[maybe_unused]] auto [ptr, ec] = std::from_chars(num.data(), num.data() + num.size(), node);
+ assert(ec == std::errc{});
+ bucketDB[i] = BucketCopy(0, node, api::BucketInfo(1, 2, 3));
bucketDB[i].setTrusted(trusted);
}
std::vector<MergeMetaData> nodes(st.size());
@@ -553,4 +563,44 @@ TEST_F(MergeOperationTest, on_throttled_updates_metrics)
EXPECT_EQ(1, metrics->throttled.getValue());
}
+TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all_nodes_support_feature) {
+ setup_stripe(Redundancy(4), NodeCount(4), "distributor:1 storage:4");
+ NodeSupportedFeatures with_unordered;
+ with_unordered.unordered_merge_chaining = true;
+
+ set_node_supported_features(1, with_unordered);
+ set_node_supported_features(2, with_unordered);
+
+ auto config = make_config();
+ config->set_use_unordered_merge_chaining(true);
+ configure_stripe(std::move(config));
+
+ // Only nodes {1, 2} support unordered merging; merges should be ordered (sent to lowest index node 1).
+ setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
+ "cluster state version: 0, nodes: [2, 1, 3], chain: [], "
+ "reasons to start: ) => 1",
+ _sender.getLastCommand(true));
+
+ // All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2)
+ setup_simple_merge_op({1, 2});
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, "
+ "cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), "
+ "reasons to start: ) => 2",
+ _sender.getLastCommand(true));
+
+ _sender.clear();
+
+ config = make_config();
+ config->set_use_unordered_merge_chaining(false);
+ configure_stripe(std::move(config));
+
+ // If config is not enabled, should send ordered even if nodes support the feature.
+ setup_simple_merge_op({2, 1});
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, "
+ "cluster state version: 0, nodes: [2, 1], chain: [], "
+ "reasons to start: ) => 1",
+ _sender.getLastCommand(true));
+}
+
} // storage::distributor
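
Editor's note: the test above encodes the send-side rule that an unordered merge is only issued when the config flag is enabled and every node taking part in the merge advertises support. A rough sketch of that predicate, with helper names assumed (the production MergeOperation also handles ideal-state ordering, source-only nodes and chain construction):

    #include <vespa/storage/distributor/node_supported_features_repo.h>
    #include <cstdint>
    #include <vector>

    using storage::distributor::NodeSupportedFeaturesRepo;

    // True only if every node involved in the merge reports support for
    // unordered merge chaining.
    bool all_nodes_support_unordered_merge(const NodeSupportedFeaturesRepo& repo,
                                           const std::vector<uint16_t>& nodes) {
        for (uint16_t node : nodes) {
            if (!repo.node_supported_features(node).unordered_merge_chaining) {
                return false;
            }
        }
        return true;
    }

    // config_allows_unordered corresponds to getConfig().use_unordered_merge_chaining().
    bool should_use_unordered_forwarding(bool config_allows_unordered,
                                         const NodeSupportedFeaturesRepo& repo,
                                         const std::vector<uint16_t>& nodes) {
        return config_allows_unordered && all_nodes_support_unordered_merge(repo, nodes);
    }

With unordered forwarding the command goes straight to the first node in ideal-state order (node 2 above) instead of the lowest-indexed node, which is why the expected receivers differ between the three sub-cases of the test.
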
diff --git a/storage/src/tests/distributor/mock_tickable_stripe.h b/storage/src/tests/distributor/mock_tickable_stripe.h
index 38fc0c599a2..ec2f978c029 100644
--- a/storage/src/tests/distributor/mock_tickable_stripe.h
+++ b/storage/src/tests/distributor/mock_tickable_stripe.h
@@ -33,6 +33,10 @@ struct MockTickableStripe : TickableStripe {
void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); }
void clear_read_only_bucket_repo_databases() override { abort(); }
+ void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo>) override {
+ abort();
+ }
+
void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); }
StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); }
void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
diff --git a/storage/src/tests/distributor/node_supported_features_repo_test.cpp b/storage/src/tests/distributor/node_supported_features_repo_test.cpp
new file mode 100644
index 00000000000..990e0fc50a3
--- /dev/null
+++ b/storage/src/tests/distributor/node_supported_features_repo_test.cpp
@@ -0,0 +1,52 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/storage/distributor/node_supported_features_repo.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace ::testing;
+
+namespace storage::distributor {
+
+struct NodeSupportedFeaturesRepoTest : Test {
+ using FeatureMap = vespalib::hash_map<uint16_t, NodeSupportedFeatures>;
+ NodeSupportedFeaturesRepo _repo;
+
+ static NodeSupportedFeatures set_features() noexcept {
+ NodeSupportedFeatures f;
+ f.unordered_merge_chaining = true;
+ return f;
+ }
+
+ static NodeSupportedFeatures unset_features() noexcept {
+ return {};
+ }
+};
+
+TEST_F(NodeSupportedFeaturesRepoTest, feature_set_is_empty_by_default) {
+ EXPECT_EQ(_repo.node_supported_features(0), unset_features());
+ EXPECT_EQ(_repo.node_supported_features(12345), unset_features());
+}
+
+TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_can_add_new_feature_mapping) {
+ FeatureMap fm;
+ fm[1] = set_features();
+ fm[60] = set_features();
+ auto new_repo = _repo.make_union_of(fm);
+ EXPECT_EQ(new_repo->node_supported_features(0), unset_features());
+ EXPECT_EQ(new_repo->node_supported_features(1), set_features());
+ EXPECT_EQ(new_repo->node_supported_features(60), set_features());
+}
+
+TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_updates_existing_feature_mappings) {
+ FeatureMap fm;
+ fm[1] = set_features();
+ fm[60] = set_features();
+ auto new_repo = _repo.make_union_of(fm);
+ fm[1] = unset_features();
+ new_repo = new_repo->make_union_of(fm);
+ EXPECT_EQ(new_repo->node_supported_features(1), unset_features());
+ EXPECT_EQ(new_repo->node_supported_features(60), set_features());
+}
+
+}
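
Editor's note: the repo implementation itself is not part of this diff; judging from how the tests use it, it behaves as an immutable map with copy-and-union semantics. A minimal sketch under that assumption (the real classes live in node_supported_features_repo.h and may differ):

    #include <vespa/vespalib/stllike/hash_map.hpp>
    #include <cstdint>
    #include <memory>

    struct NodeSupportedFeatures {
        bool unordered_merge_chaining = false;
        bool operator==(const NodeSupportedFeatures& rhs) const noexcept {
            return unordered_merge_chaining == rhs.unordered_merge_chaining;
        }
    };

    class NodeSupportedFeaturesRepo {
        vespalib::hash_map<uint16_t, NodeSupportedFeatures> _node_features;
    public:
        // Unknown nodes fall back to an all-false feature set
        // (matches feature_set_is_empty_by_default above).
        const NodeSupportedFeatures& node_supported_features(uint16_t node) const noexcept {
            static const NodeSupportedFeatures default_features;
            auto iter = _node_features.find(node);
            return (iter != _node_features.end()) ? iter->second : default_features;
        }
        // Returns a new repo where the given mappings overwrite or extend the existing
        // ones; the current repo is never mutated (matches the make_union_of tests).
        std::shared_ptr<const NodeSupportedFeaturesRepo>
        make_union_of(const vespalib::hash_map<uint16_t, NodeSupportedFeatures>& node_features) const {
            auto new_repo = std::make_shared<NodeSupportedFeaturesRepo>(*this);
            for (const auto& entry : node_features) {
                new_repo->_node_features[entry.first] = entry.second;
            }
            return new_repo;
        }
    };
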
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index fe8a607c9ae..3ed5e9f4a8d 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/top_level_bucket_db_updater.h>
#include <vespa/storage/distributor/bucket_space_distribution_context.h>
#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/storage/distributor/pending_bucket_space_db_transition.h>
#include <vespa/storage/distributor/outdated_nodes_map.h>
#include <vespa/storage/storageutil/distributorstatecache.h>
@@ -119,6 +120,21 @@ public:
invalid_bucket_count));
}
+ void fake_bucket_reply(const lib::ClusterState &state,
+ const api::StorageCommand &cmd,
+ uint32_t bucket_count,
+ const std::function<void(api::RequestBucketInfoReply&)>& reply_decorator)
+ {
+ ASSERT_EQ(cmd.getType(), MessageType::REQUESTBUCKETINFO);
+ const api::StorageMessageAddress& address(*cmd.getAddress());
+ auto reply = make_fake_bucket_reply(state,
+ dynamic_cast<const RequestBucketInfoCommand &>(cmd),
+ address.getIndex(),
+ bucket_count, 0);
+ reply_decorator(*reply);
+ bucket_db_updater().onRequestBucketInfoReply(reply);
+ }
+
void send_fake_reply_for_single_bucket_request(
const api::RequestBucketInfoCommand& rbi)
{
@@ -232,7 +248,7 @@ public:
}
}
- api::StorageMessageAddress storage_address(uint16_t node) {
+ static api::StorageMessageAddress storage_address(uint16_t node) {
static vespalib::string _storage("storage");
return api::StorageMessageAddress(&_storage, lib::NodeType::STORAGE, node);
}
@@ -1299,7 +1315,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
for (uint32_t i = 0; i < 3; ++i) {
- nodes.push_back(api::MergeBucketCommand::Node(i));
+ nodes.emplace_back(i);
}
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
@@ -2662,4 +2678,37 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl
EXPECT_FALSE(def_rs.is_routable());
}
+TEST_F(BucketDBUpdaterSnapshotTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) {
+ lib::ClusterState state("distributor:1 storage:3");
+ set_cluster_state(state);
+ uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1;
+
+ // Known feature sets are initially empty.
+ auto stripes = distributor_stripes();
+ for (auto* s : stripes) {
+ for (uint16_t i : {0, 1, 2}) {
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining);
+ }
+ }
+
+ ASSERT_EQ(expected_msgs, _sender.commands().size());
+ for (uint32_t i = 0; i < _sender.commands().size(); i++) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i),
+ dummy_buckets_to_return, [i](auto& reply) noexcept {
+ // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported.
+ // Node 0 does not support the fanciness.
+ if (i > 0) {
+ reply.supported_node_features().unordered_merge_chaining = true;
+ }
+ }));
+ }
+
+ // Node features should be propagated to all stripes
+ for (auto* s : stripes) {
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining);
+ }
+}
+
}
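
Editor's note: the propagation path this test exercises is that each bucket info reply carries the sending node's feature set, the updater folds it into its repo, and the resulting snapshot is pushed to every stripe. A hedged sketch of that step (function and parameter names assumed; the production TopLevelBucketDBUpdater does this while processing RequestBucketInfoReply messages and handles far more state):

    #include <vespa/storage/distributor/node_supported_features_repo.h>
    #include <vespa/vespalib/stllike/hash_map.hpp>
    #include <cstdint>
    #include <memory>
    #include <vector>

    using namespace storage::distributor;

    // `features` would come from reply.supported_node_features() for the node
    // that answered; TickableStripe is the interface extended in
    // mock_tickable_stripe.h above.
    void propagate_node_features(uint16_t node, const NodeSupportedFeatures& features,
                                 std::shared_ptr<const NodeSupportedFeaturesRepo>& repo,
                                 const std::vector<TickableStripe*>& stripes) {
        vespalib::hash_map<uint16_t, NodeSupportedFeatures> updated;
        updated[node] = features;
        repo = repo->make_union_of(updated); // immutable union, see the repo test above
        for (auto* stripe : stripes) {
            stripe->update_node_supported_features_repo(repo); // pushed to every stripe
        }
    }
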
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
index 636a09d1f6e..2a61141865a 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -115,13 +115,13 @@ TopLevelDistributorTestUtil::handle_top_level_message(const std::shared_ptr<api:
void
TopLevelDistributorTestUtil::close()
{
- _component.reset(0);
- if (_distributor.get()) {
+ _component.reset();
+ if (_distributor) {
_stripe_pool->stop_and_join(); // Must be tagged as stopped prior to onClose
_distributor->onClose();
}
_sender.clear();
- _node.reset(0);
+ _node.reset();
_config = getStandardConfig(false);
}
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index e8f8e425af4..0f844ab6b4f 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -52,15 +52,18 @@ struct MergeBuilder {
~MergeBuilder();
MergeBuilder& nodes(uint16_t n0) {
+ _nodes.clear();
_nodes.push_back(n0);
return *this;
}
MergeBuilder& nodes(uint16_t n0, uint16_t n1) {
+ _nodes.clear();
_nodes.push_back(n0);
_nodes.push_back(n1);
return *this;
}
MergeBuilder& nodes(uint16_t n0, uint16_t n1, uint16_t n2) {
+ _nodes.clear();
_nodes.push_back(n0);
_nodes.push_back(n1);
_nodes.push_back(n2);
@@ -146,7 +149,8 @@ struct MergeThrottlerTest : Test {
api::ReturnCode::Result expectedResultCode);
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void receive_chained_merge_with_full_queue(bool disable_queue_limits);
+ void fill_up_throttler_active_window_and_queue(uint16_t node_idx);
+ void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
auto& queue = _throttlers[throttler_idx]->getMergeQueue();
@@ -1197,7 +1201,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
}
void
-MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits)
+MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd)
{
// Note: uses node with index 1 to not be the first node in chain
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
@@ -1218,10 +1222,15 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
// Send down another merge with non-empty chain. It should _not_ be busy bounced
// (if limits disabled) as it has already been accepted into another node's merge window.
{
- std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
+ std::vector<MergeBucketCommand::Node> nodes({{2}, {1}, {0}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1);
- cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0
+ if (!unordered_fwd) {
+ cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0
+ } else {
+ cmd->setChain(std::vector<uint16_t>({2})); // Forwarded from node 2, i.e. _not_ the lowest index
+ }
+ cmd->set_use_unordered_forwarding(unordered_fwd);
_topLinks[1]->sendDown(cmd);
}
}
@@ -1249,11 +1258,34 @@ TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disa
EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge
}
+TEST_F(MergeThrottlerTest, forwarded_unordered_merge_is_directly_accepted_into_active_window) {
+ // Unordered forwarding is orthogonal to disabled chain limits config, so we implicitly test that too.
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true, true));
+
+ // Unordered merge is immediately forwarded to the next node
+ _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ auto fwd = std::dynamic_pointer_cast<api::MergeBucketCommand>(
+ _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET));
+ ASSERT_TRUE(fwd);
+ EXPECT_TRUE(fwd->use_unordered_forwarding());
+ EXPECT_EQ(fwd->getChain(), std::vector<uint16_t>({2, 1}));
+}
+
+TEST_F(MergeThrottlerTest, non_forwarded_unordered_merge_is_enqueued_if_active_window_full)
+{
+ fill_throttler_queue_with_n_commands(1, 0); // Fill active window entirely
+ {
+ std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {0}});
+ auto cmd = std::make_shared<MergeBucketCommand>(
+ makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1);
+ cmd->set_use_unordered_forwarding(true);
+ _topLinks[1]->sendDown(cmd);
+ }
+ waitUntilMergeQueueIs(*_throttlers[1], 1, _messageWaitTime); // Should be in queue, not active window
+}
+
TEST_F(MergeThrottlerTest, broken_cycle) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(1);
- nodes.push_back(0);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({1, 0, 2});
{
std::vector<uint16_t> chain;
chain.push_back(0);
@@ -1268,10 +1300,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) {
// Send cycled merge which will be executed
{
- std::vector<uint16_t> chain;
- chain.push_back(0);
- chain.push_back(1);
- chain.push_back(2);
+ std::vector<uint16_t> chain({0, 1, 2});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xfeef00)), nodes, 1234, 1, chain);
_topLinks[1]->sendDown(cmd);
@@ -1425,9 +1454,10 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure)
void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) {
size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount();
for (size_t i = 0; i < max_pending + queued_count; ++i) {
- _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)).create());
+ _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i))
+ .nodes(throttler_index, throttler_index + 1)
+ .create());
}
-
// Wait till we have max_pending merge forwards and queued_count enqueued.
_topLinks[throttler_index]->waitForMessages(max_pending, _messageWaitTime);
waitUntilMergeQueueIs(*_throttlers[throttler_index], queued_count, _messageWaitTime);
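
Editor's note: the two new throttler tests pin down an asymmetry worth spelling out: an unordered merge that was already forwarded (non-empty chain) goes straight into the active window even when it is full, while a first-hop unordered merge still queues. A simplified sketch of that admission rule with an assumed helper name (the real MergeThrottler also weighs priorities, queue limits and backpressure before making this call):

    #include <vespa/storageapi/message/bucket.h>

    bool admit_into_active_window(const storage::api::MergeBucketCommand& cmd,
                                  bool active_window_full) {
        if (!active_window_full) {
            return true; // room available; forwarding mode does not matter
        }
        // An unordered merge with a non-empty chain has already been accepted into
        // the forwarding node's merge window, so it is let straight in rather than
        // queued or busy-bounced here.
        return cmd.use_unordered_forwarding() && !cmd.getChain().empty();
    }
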