aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-06-24 12:07:08 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-06-24 21:07:27 +0000
commit889f5f60aed5ef60d486fea07b051052da004970 (patch)
tree94bafea581f9c156cd101ec87595dcd1a10fd80b
parent4bb24d31ec0014ebd9e3dce7d4acf11ae98b9ac6 (diff)
Avoid race condition regression introduced in #18179
We would previously check for the presence of pending null-bucket `RequestBucketInfoCommand`s to determine if a pending cluster state was present. We would also attempt to block all bucket delete operations from starting if _any_ operation was pending towards that bucket on a given node, including bucket info requests. The former was rewritten to instead explicitly consider pending cluster state checks instead, as checking null buckets no longer works when using stripes. Unfortunately, due to a long-standing bug with message tracking of `RequestBucketInfoCommand`s, these would _always_ be marked as pending towards the null bucket. Since all ideal state ops would be blocked by null-bucket info requests, this would avoid starting any ideal state op as long as _any_ other op had an info request pending for the target node. This had the desirable (but not explicitly coded for) side effect of inhibiting bucket deletions from racing with half-finished merge operations. It also had the undesirable effect of needlessly blocking ops for completely unrelated buckets. With these changes, we now explicitly handle bucket info requests for single buckets in the `PendingMessageTracker`, allowing inhibition of deletions to work as expected. Also add an explicit check for pending info requests for all ideal state ops to mirror the old behavior (but now per-bucket instead of globally...!).
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp49
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp72
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp9
-rw-r--r--storageapi/src/vespa/storageapi/message/bucket.cpp4
5 files changed, 135 insertions, 5 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 52a8bfc41b6..33baa572c64 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/dummystoragelink.h>
#include <tests/distributor/distributortestutil.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
@@ -8,15 +10,19 @@
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/document/test/make_document_bucket.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/vespalib/gtest/gtest.h>
using document::test::makeDocumentBucket;
+using document::test::makeBucketSpace;
using namespace ::testing;
namespace storage::distributor {
+namespace {
+vespalib::string _g_storage("storage");
+}
+
struct MergeOperationTest : Test, DistributorTestUtil {
OperationSequencer _operation_sequencer;
@@ -252,8 +258,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
// at will.
auto msg = std::make_shared<api::SetBucketStateCommand>(
makeDocumentBucket(bucket), api::SetBucketStateCommand::ACTIVE);
- vespalib::string storage("storage");
- msg->setAddress(api::StorageMessageAddress::create(&storage, lib::NodeType::STORAGE, 1));
+ msg->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1));
pending_message_tracker().insert(msg);
sendReply(op);
@@ -457,4 +462,42 @@ TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) {
_sender.getLastCommand(true));
}
+TEST_F(MergeOperationTest, merge_operation_is_blocked_by_request_bucket_info_to_any_node_in_chain) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ document::BucketId bucket_id(16, 1);
+ addNodesToBucketDB(bucket_id, "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ enableDistributorClusterState("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ // Not initially blocked
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
+
+ auto info_cmd = std::make_shared<api::RequestBucketInfoCommand>(
+ makeBucketSpace(), std::vector<document::BucketId>({bucket_id}));
+ info_cmd->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1)); // 1 is in chain
+ pending_message_tracker().insert(info_cmd);
+
+ // Now blocked by info request
+ EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer));
+}
+
+TEST_F(MergeOperationTest, merge_operation_is_not_blocked_by_request_bucket_info_to_unrelated_bucket) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ document::BucketId bucket_id(16, 1);
+ document::BucketId other_bucket_id(16, 2);
+ addNodesToBucketDB(bucket_id, "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ enableDistributorClusterState("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ auto info_cmd = std::make_shared<api::RequestBucketInfoCommand>(
+ makeBucketSpace(), std::vector<document::BucketId>({other_bucket_id}));
+ info_cmd->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1));
+ pending_message_tracker().insert(info_cmd);
+
+ // Not blocked; bucket info request is for another bucket
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 72365c61597..20ffd216e3d 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -2,16 +2,20 @@
#include <vespa/document/base/testdocman.h>
#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
+#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
#include <tests/common/dummystoragelink.h>
+#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <gmock/gmock.h>
using document::test::makeDocumentBucket;
+using document::test::makeBucketSpace;
using namespace ::testing;
namespace storage::distributor {
@@ -517,4 +521,72 @@ TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_stat
EXPECT_EQ(state, TaskRunState::Aborted);
}
+TEST_F(PendingMessageTrackerTest, request_bucket_info_with_no_buckets_tracked_as_null_bucket) {
+ Fixture f;
+ auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, lib::ClusterState(), "");
+ msg->setAddress(makeStorageAddress(2));
+ f.tracker().insert(msg);
+
+ // Tracked as null bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator);
+ EXPECT_EQ("Request bucket info -> 2\n", enumerator.str());
+ }
+
+ // Nothing to a specific bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId(16, 1234)), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+
+ auto reply = std::shared_ptr<api::StorageReply>(msg->makeReply());
+ f.tracker().reply(*reply);
+
+ // No longer tracked as null bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+}
+
+TEST_F(PendingMessageTrackerTest, request_bucket_info_with_bucket_tracked_with_superbucket) {
+ Fixture f;
+ document::BucketId bucket(16, 1234);
+ auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), std::vector<document::BucketId>({bucket}));
+ msg->setAddress(makeStorageAddress(3));
+ f.tracker().insert(msg);
+
+ // Not tracked as null bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+ // Tracked for superbucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(bucket), enumerator);
+ EXPECT_EQ("Request bucket info -> 3\n", enumerator.str());
+ }
+ // Not tracked for other buckets
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId(16, 2345)), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+
+ auto reply = std::shared_ptr<api::StorageReply>(msg->makeReply());
+ f.tracker().reply(*reply);
+
+ // No longer tracked for specified bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(bucket), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index f11d1c26da2..ad5467d9156 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -223,6 +223,12 @@ IdealStateOperation::shouldBlockThisOperation(uint32_t messageType,
return true;
}
}
+ // Also block on pending bucket-specific RequestBucketInfo since this usually
+ // means there's a semi-completed merge being processed for the bucket, but
+ // there will not be a pending merge command for it at the time.
+ if (messageType == api::MessageType::REQUESTBUCKETINFO_ID) {
+ return true;
+ }
return false;
}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index f84eafe8d85..cb76b6d50b2 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -90,8 +90,13 @@ PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg)
{
std::lock_guard guard(_lock);
if (msg->getAddress()) {
+ // TODO STRIPE reevaluate if getBucket() on RequestBucketInfo msgs should transparently return superbucket..!
+ document::Bucket bucket = (msg->getType() != api::MessageType::REQUESTBUCKETINFO)
+ ? msg->getBucket()
+ : document::Bucket(msg->getBucket().getBucketSpace(),
+ dynamic_cast<api::RequestBucketInfoCommand&>(*msg).super_bucket_id());
_messages.emplace(currentTime(), msg->getType().getId(), msg->getPriority(), msg->getMsgId(),
- msg->getBucket(), msg->getAddress()->getIndex());
+ bucket, msg->getAddress()->getIndex());
_nodeInfo.incPending(msg->getAddress()->getIndex());
@@ -119,7 +124,7 @@ PendingMessageTracker::reply(const api::StorageReply& r)
if (code == api::ReturnCode::BUSY || code == api::ReturnCode::TIMEOUT) {
_nodeInfo.setBusy(r.getAddress()->getIndex(), _nodeBusyDuration);
}
- LOG(debug, "Erased message with id %" PRIu64, msgId);
+ LOG(debug, "Erased message with id %" PRIu64 " for bucket %s", msgId, bucket.toString().c_str());
msgs.erase(msgId);
auto deferred_tasks = get_deferred_ops_if_bucket_writes_drained(bucket);
// Deferred tasks may try to send messages, which in turn will invoke the PendingMessageTracker.
diff --git a/storageapi/src/vespa/storageapi/message/bucket.cpp b/storageapi/src/vespa/storageapi/message/bucket.cpp
index 2323a1ab0a4..0f03c63ca93 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.cpp
+++ b/storageapi/src/vespa/storageapi/message/bucket.cpp
@@ -493,6 +493,8 @@ RequestBucketInfoCommand::print(std::ostream& out, bool verbose,
if (hasSystemState()) {
out << "distributor " << _distributor << " in ";
_state->print(out, verbose, indent + " ");
+ } else if (super_bucket_id().isSet()) {
+ out << ", super bucket " << super_bucket_id() << ". ";
}
if (verbose && !_buckets.empty()) {
out << "\n" << indent << " Specified buckets:\n" << indent << " ";
@@ -529,6 +531,8 @@ RequestBucketInfoReply::print(std::ostream& out, bool verbose,
out << "RequestBucketInfoReply(" << _buckets.size();
if (_full_bucket_fetch) {
out << ", full fetch";
+ } else if (super_bucket_id().isSet()) {
+ out << ", super bucket " << super_bucket_id();
}
if (verbose) {
out << "\n" << indent << " ";