summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/mergeoperationtest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/mergeoperationtest.cpp')
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp62
1 files changed, 56 insertions, 6 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 65ee5254193..54bd06c98e0 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -1,5 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/dummystoragelink.h>
+
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/test/make_document_bucket.h>
@@ -12,6 +12,7 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <charconv>
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
@@ -37,6 +38,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil {
}
std::shared_ptr<MergeOperation> setup_minimal_merge_op();
+ std::shared_ptr<MergeOperation> setup_simple_merge_op(const std::vector<uint16_t>& nodes);
std::shared_ptr<MergeOperation> setup_simple_merge_op();
void assert_simple_merge_bucket_command();
void assert_simple_delete_bucket_command();
@@ -47,13 +49,13 @@ std::shared_ptr<MergeOperation>
MergeOperationTest::setup_minimal_merge_op()
{
document::BucketId bucket_id(16, 1);
- auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
+ auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), {0, 1, 2}));
op->setIdealStateManager(&getIdealStateManager());
return op;
}
std::shared_ptr<MergeOperation>
-MergeOperationTest::setup_simple_merge_op()
+MergeOperationTest::setup_simple_merge_op(const std::vector<uint16_t>& nodes)
{
getClock().setAbsoluteTimeInSeconds(10);
@@ -64,12 +66,18 @@ MergeOperationTest::setup_simple_merge_op()
enable_cluster_state("distributor:1 storage:3");
- auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
+ auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes));
op->setIdealStateManager(&getIdealStateManager());
op->start(_sender, framework::MilliSecTime(0));
return op;
}
+std::shared_ptr<MergeOperation>
+MergeOperationTest::setup_simple_merge_op()
+{
+ return setup_simple_merge_op({0, 1, 2});
+}
+
void
MergeOperationTest::assert_simple_merge_bucket_command()
{
@@ -150,8 +158,10 @@ std::string getNodeList(std::string state, uint32_t redundancy, std::string exis
num.erase(pos);
trusted = true;
}
- bucketDB[i] = BucketCopy(0, atoi(num.c_str()),
- api::BucketInfo(1, 2, 3));
+ uint16_t node;
+ [[maybe_unused]] auto [ptr, ec] = std::from_chars(num.data(), num.data() + num.size(), node);
+ assert(ec == std::errc{});
+ bucketDB[i] = BucketCopy(0, node, api::BucketInfo(1, 2, 3));
bucketDB[i].setTrusted(trusted);
}
std::vector<MergeMetaData> nodes(st.size());
@@ -553,4 +563,44 @@ TEST_F(MergeOperationTest, on_throttled_updates_metrics)
EXPECT_EQ(1, metrics->throttled.getValue());
}
+TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all_nodes_support_feature) {
+ setup_stripe(Redundancy(4), NodeCount(4), "distributor:1 storage:4");
+ NodeSupportedFeatures with_unordered;
+ with_unordered.unordered_merge_chaining = true;
+
+ set_node_supported_features(1, with_unordered);
+ set_node_supported_features(2, with_unordered);
+
+ auto config = make_config();
+ config->set_use_unordered_merge_chaining(true);
+ configure_stripe(std::move(config));
+
+ // Only nodes {1, 2} support unordered merging; merges should be ordered (sent to lowest index node 1).
+ setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
+ "cluster state version: 0, nodes: [2, 1, 3], chain: [], "
+ "reasons to start: ) => 1",
+ _sender.getLastCommand(true));
+
+ // All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2)
+ setup_simple_merge_op({1, 2});
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, "
+ "cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), "
+ "reasons to start: ) => 2",
+ _sender.getLastCommand(true));
+
+ _sender.clear();
+
+ config = make_config();
+ config->set_use_unordered_merge_chaining(false);
+ configure_stripe(std::move(config));
+
+ // If config is not enabled, should send ordered even if nodes support the feature.
+ setup_simple_merge_op({2, 1});
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, "
+ "cluster state version: 0, nodes: [2, 1], chain: [], "
+ "reasons to start: ) => 1",
+ _sender.getLastCommand(true));
+}
+
} // storage::distributor