summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@verizonmedia.com> 2020-12-09 12:27:30 +0000
committer: Tor Brede Vekterli <vekterli@verizonmedia.com> 2020-12-09 12:40:32 +0000
commit: cb3c47da010bd50df05827015c6b4bf400e8b90a (patch)
tree: 003f45f9617eec9625adb57f2aee6c407a0f3076 /storage
parent: 70435c1b3b7f58eba996da4e489878c4430836c0 (diff)
Bounce reindexing visitor with BUSY if merge is pending for bucket
Since reindexing visitors take a bucket lock when they arrive and wait for pending ops to drain before they start, doing so when there's a pending merge risks starving the bucket for a long time. This is because merges may linger for a long time in the merge throttling queues in the cluster. By not starting such visitors if there is a pending merge, we avoid this edge case. Functionality is already in place to inhibit merges from starting if there's an active bucket lock present.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp | 24
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp | 28
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h | 2
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp | 7
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/visitoroperation.h | 1
5 files changed, 62 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
index e8401914abf..320c55f9998 100644
--- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
+++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
@@ -8,6 +8,8 @@
#include <vespa/storage/distributor/operations/external/visitoroperation.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storage/distributor/pendingmessagetracker.h>
+#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/visitor.h>
#include <tests/distributor/distributortestutil.h>
@@ -25,6 +27,11 @@ Bucket default_bucket(BucketId id) {
return Bucket(document::FixedBucketSpaces::default_space(), id);
}
+api::StorageMessageAddress make_storage_address(uint16_t node) { // Test helper: builds a STORAGE-type message address for node index `node`.
+ static vespalib::string _storage("storage"); // NOTE(review): presumably static because the address is built from a pointer to this string and must stay valid after return - confirm
+ return {&_storage, lib::NodeType::STORAGE, node};
+}
+
}
struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
@@ -46,6 +53,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
createLinks();
setupDistributor(1, 1, "version:1 distributor:1 storage:1");
_op_owner = std::make_unique<OperationOwner>(_sender, getClock());
+ _sender.setPendingMessageTracker(getDistributor().getPendingMessageTracker());
addNodesToBucketDB(_sub_bucket, "0=1/2/3/t");
}
@@ -81,6 +89,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_that_fails_precondition_checks_is_immediately_failed) {
auto op = create_rfw_op(create_nested_visitor_op(false));
_op_owner->start(op, OperationStarter::Priority(120));
+ ASSERT_EQ("", _sender.getCommands(true));
EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
"ReturnCode(ILLEGAL_PARAMETERS, No buckets in CreateVisitorCommand for visitor 'foo')",
_sender.getLastReply());
@@ -92,6 +101,21 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_immediately_started_if_n
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
}
+TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_is_bounced_if_merge_pending_for_bucket) { // A merge already registered as pending for the bucket must make the visitor reply BUSY without sending any command.
+ auto op = create_rfw_op(create_nested_visitor_op(true));
+ std::vector<api::MergeBucketCommand::Node> nodes({{0, false}, {1, false}});
+ auto merge = std::make_shared<api::MergeBucketCommand>(default_bucket(_sub_bucket),
+ std::move(nodes),
+ api::Timestamp(123456));
+ merge->setAddress(make_storage_address(0));
+ getDistributor().getPendingMessageTracker().insert(merge); // register the merge with the tracker before the visitor is started
+ _op_owner->start(op, OperationStarter::Priority(120));
+ ASSERT_EQ("", _sender.getCommands(true)); // expects that no command was sent on behalf of the visitor
+ EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
+ "ReturnCode(BUSY, A merge operation is pending for this bucket)",
+ _sender.getLastReply());
+}
+
namespace {
struct ConcurrentMutationFixture {
diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
index 1e87172e870..5ebf20138a4 100644
--- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
@@ -2,6 +2,7 @@
#include "read_for_write_visitor_operation.h"
#include "visitoroperation.h"
+#include <vespa/storage/distributor/distributormessagesender.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/distributor/operationowner.h>
#include <cassert>
@@ -40,6 +41,11 @@ void ReadForWriteVisitorOperationStarter::onStart(DistributorMessageSender& send
assert(_visitor_op->has_sent_reply());
return;
}
+ if (bucket_has_pending_merge(*maybe_bucket, sender.getPendingMessageTracker())) {
+ LOG(debug, "A merge is pending for bucket %s, failing visitor", maybe_bucket->toString().c_str());
+ _visitor_op->fail_with_merge_pending(sender);
+ return;
+ }
auto bucket_handle = _operation_sequencer.try_acquire(*maybe_bucket);
if (!bucket_handle.valid()) {
LOG(debug, "An operation is already pending for bucket %s, failing visitor",
@@ -71,4 +77,26 @@ void ReadForWriteVisitorOperationStarter::onReceive(DistributorMessageSender& se
_visitor_op->onReceive(sender, msg);
}
+namespace {
+
+struct MergePendingChecker : PendingMessageTracker::Checker { // Checker that records whether any message passed to check() is a MergeBucket.
+ bool has_pending_merge = false; // becomes true once a MERGEBUCKET_ID message type has been seen; never reset
+ bool check(uint32_t message_type, [[maybe_unused]] uint16_t node, [[maybe_unused]] uint8_t priority) override { // only the message type is inspected; node and priority are ignored
+ if (message_type == api::MessageType::MERGEBUCKET_ID) {
+ has_pending_merge = true;
+ }
+ return true; // NOTE(review): presumably means "keep iterating"; if so, returning false after a hit would allow an early exit - confirm against PendingMessageTracker::Checker
+ }
+};
+
+}
+
+bool ReadForWriteVisitorOperationStarter::bucket_has_pending_merge(const document::Bucket& bucket, // Returns whether a MergeBucket message was seen while the tracker ran a checker over `bucket`.
+ const PendingMessageTracker& tracker) const {
+ MergePendingChecker merge_checker; // fresh checker per call; its flag starts out false
+ tracker.checkPendingMessages(bucket, merge_checker);
+ return merge_checker.has_pending_merge;
+}
+
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
index 06b4f60307e..a6b414e6fb5 100644
--- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
@@ -43,6 +43,8 @@ public:
void onStart(DistributorMessageSender& sender) override;
void onReceive(DistributorMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg) override;
+private:
+ bool bucket_has_pending_merge(const document::Bucket&, const PendingMessageTracker& tracker) const; // true if `tracker` reports a pending MergeBucket message for the given bucket
};
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
index 3d79aa176d7..5a8adb26cd8 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
@@ -887,6 +887,13 @@ VisitorOperation::fail_with_bucket_already_locked(DistributorMessageSender& send
sendReply(api::ReturnCode(api::ReturnCode::BUSY, "This bucket is already locked by another operation"), sender);
}
+void
+VisitorOperation::fail_with_merge_pending(DistributorMessageSender& sender) // Replies to the visitor with BUSY because a merge is pending for its bucket.
+{
+ assert(is_read_for_write()); // precondition: only valid for read-for-write visitors
+ sendReply(api::ReturnCode(api::ReturnCode::BUSY, "A merge operation is pending for this bucket"), sender);
+}
+
std::optional<document::Bucket>
VisitorOperation::first_bucket_to_visit() const
{
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
index 4043b0b2c50..e6ad7a042dd 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
@@ -51,6 +51,7 @@ public:
// Only valid to call if is_read_for_write() == true
void fail_with_bucket_already_locked(DistributorMessageSender& sender);
+ void fail_with_merge_pending(DistributorMessageSender& sender); // replies BUSY (merge pending for the bucket); also only valid if is_read_for_write() == true
[[nodiscard]] bool verify_command_and_expand_buckets(DistributorMessageSender& sender);