summaryrefslogtreecommitdiffstats
path: root/storage/src/tests
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-06-24 12:07:08 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-06-24 21:07:27 +0000
commit889f5f60aed5ef60d486fea07b051052da004970 (patch)
tree94bafea581f9c156cd101ec87595dcd1a10fd80b /storage/src/tests
parent4bb24d31ec0014ebd9e3dce7d4acf11ae98b9ac6 (diff)
Avoid race condition regression introduced in #18179
We would previously check for the presence of pending null-bucket `RequestBucketInfoCommand`s to determine if a pending cluster state was present. We would also attempt to block all bucket delete operations from starting if _any_ operation was pending towards that bucket on a given node, including bucket info requests. The former was rewritten to instead explicitly consider pending cluster state checks instead, as checking null buckets no longer works when using stripes. Unfortunately, due to a long-standing bug with message tracking of `RequestBucketInfoCommand`s, these would _always_ be marked as pending towards the null bucket. Since all ideal state ops would be blocked by null-bucket info requests, this would avoid starting any ideal state op as long as _any_ other op had an info request pending for the target node. This had the desirable (but not explicitly coded for) side effect of inhibiting bucket deletions from racing with half-finished merge operations. It also had the undesirable effect of needlessly blocking ops for completely unrelated buckets. With these changes, we now explicitly handle bucket info requests for single buckets in the `PendingMessageTracker`, allowing inhibition of deletions to work as expected. Also add an explicit check for pending info requests for all ideal state ops to mirror the old behavior (but now per-bucket instead of globally...!).
Diffstat (limited to 'storage/src/tests')
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp49
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp72
2 files changed, 118 insertions, 3 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 52a8bfc41b6..33baa572c64 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/dummystoragelink.h>
#include <tests/distributor/distributortestutil.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
@@ -8,15 +10,19 @@
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/document/test/make_document_bucket.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/vespalib/gtest/gtest.h>
using document::test::makeDocumentBucket;
+using document::test::makeBucketSpace;
using namespace ::testing;
namespace storage::distributor {
+namespace {
+vespalib::string _g_storage("storage");
+}
+
struct MergeOperationTest : Test, DistributorTestUtil {
OperationSequencer _operation_sequencer;
@@ -252,8 +258,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
// at will.
auto msg = std::make_shared<api::SetBucketStateCommand>(
makeDocumentBucket(bucket), api::SetBucketStateCommand::ACTIVE);
- vespalib::string storage("storage");
- msg->setAddress(api::StorageMessageAddress::create(&storage, lib::NodeType::STORAGE, 1));
+ msg->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1));
pending_message_tracker().insert(msg);
sendReply(op);
@@ -457,4 +462,42 @@ TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) {
_sender.getLastCommand(true));
}
+TEST_F(MergeOperationTest, merge_operation_is_blocked_by_request_bucket_info_to_any_node_in_chain) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ document::BucketId bucket_id(16, 1);
+ addNodesToBucketDB(bucket_id, "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ enableDistributorClusterState("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ // Not initially blocked
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
+
+ auto info_cmd = std::make_shared<api::RequestBucketInfoCommand>(
+ makeBucketSpace(), std::vector<document::BucketId>({bucket_id}));
+ info_cmd->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1)); // 1 is in chain
+ pending_message_tracker().insert(info_cmd);
+
+ // Now blocked by info request
+ EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer));
+}
+
+TEST_F(MergeOperationTest, merge_operation_is_not_blocked_by_request_bucket_info_to_unrelated_bucket) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ document::BucketId bucket_id(16, 1);
+ document::BucketId other_bucket_id(16, 2);
+ addNodesToBucketDB(bucket_id, "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ enableDistributorClusterState("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ auto info_cmd = std::make_shared<api::RequestBucketInfoCommand>(
+ makeBucketSpace(), std::vector<document::BucketId>({other_bucket_id}));
+ info_cmd->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1));
+ pending_message_tracker().insert(info_cmd);
+
+ // Not blocked; bucket info request is for another bucket
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 72365c61597..20ffd216e3d 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -2,16 +2,20 @@
#include <vespa/document/base/testdocman.h>
#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
+#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
#include <tests/common/dummystoragelink.h>
+#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <gmock/gmock.h>
using document::test::makeDocumentBucket;
+using document::test::makeBucketSpace;
using namespace ::testing;
namespace storage::distributor {
@@ -517,4 +521,72 @@ TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_stat
EXPECT_EQ(state, TaskRunState::Aborted);
}
+TEST_F(PendingMessageTrackerTest, request_bucket_info_with_no_buckets_tracked_as_null_bucket) {
+ Fixture f;
+ auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, lib::ClusterState(), "");
+ msg->setAddress(makeStorageAddress(2));
+ f.tracker().insert(msg);
+
+ // Tracked as null bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator);
+ EXPECT_EQ("Request bucket info -> 2\n", enumerator.str());
+ }
+
+ // Nothing to a specific bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId(16, 1234)), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+
+ auto reply = std::shared_ptr<api::StorageReply>(msg->makeReply());
+ f.tracker().reply(*reply);
+
+ // No longer tracked as null bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+}
+
+TEST_F(PendingMessageTrackerTest, request_bucket_info_with_bucket_tracked_with_superbucket) {
+ Fixture f;
+ document::BucketId bucket(16, 1234);
+ auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), std::vector<document::BucketId>({bucket}));
+ msg->setAddress(makeStorageAddress(3));
+ f.tracker().insert(msg);
+
+ // Not tracked as null bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+ // Tracked for superbucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(bucket), enumerator);
+ EXPECT_EQ("Request bucket info -> 3\n", enumerator.str());
+ }
+ // Not tracked for other buckets
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId(16, 2345)), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+
+ auto reply = std::shared_ptr<api::StorageReply>(msg->makeReply());
+ f.tracker().reply(*reply);
+
+ // No longer tracked for specified bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(bucket), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+}
+
}