summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-07 15:31:31 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-07 16:05:30 +0000
commit81853e7e81313a374dea7f2dec377f437393c47c (patch)
tree44c4362357059e5c3dce8cf37689ed61c73f338a /storage/src/tests/distributor
parentf2ececfb183ef73d6a53d74ee84798e0dc7eaf36 (diff)
Block ideal state operations towards buckets that are locked
Prevents ideal state ops from modifying buckets that are being used in a read-for-write context. Move `OperationSequencer` to main `Distributor` to more easily facilitate sharing of it across components.
Diffstat (limited to 'storage/src/tests/distributor')
-rw-r--r--storage/src/tests/distributor/blockingoperationstartertest.cpp5
-rw-r--r--storage/src/tests/distributor/distributor_message_sender_stub.cpp3
-rw-r--r--storage/src/tests/distributor/distributor_message_sender_stub.h10
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp44
-rw-r--r--storage/src/tests/distributor/maintenancemocks.h6
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp24
-rw-r--r--storage/src/tests/distributor/operation_sequencer_test.cpp14
-rw-r--r--storage/src/tests/distributor/splitbuckettest.cpp42
8 files changed, 119 insertions, 29 deletions
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp
index 9a9e04f0f33..5203fec2462 100644
--- a/storage/src/tests/distributor/blockingoperationstartertest.cpp
+++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp
@@ -2,6 +2,7 @@
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storage/distributor/blockingoperationstarter.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
#include <tests/distributor/maintenancemocks.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -26,6 +27,7 @@ struct BlockingOperationStarterTest : Test {
std::unique_ptr<MockOperationStarter> _starterImpl;
std::unique_ptr<StorageComponentRegisterImpl> _compReg;
std::unique_ptr<PendingMessageTracker> _messageTracker;
+ std::unique_ptr<OperationSequencer> _operation_sequencer;
std::unique_ptr<BlockingOperationStarter> _operationStarter;
void SetUp() override;
@@ -39,7 +41,8 @@ BlockingOperationStarterTest::SetUp()
_compReg->setClock(_clock);
_clock.setAbsoluteTimeInSeconds(1);
_messageTracker = std::make_unique<PendingMessageTracker>(*_compReg);
- _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_starterImpl);
+ _operation_sequencer = std::make_unique<OperationSequencer>();
+ _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_operation_sequencer, *_starterImpl);
}
TEST_F(BlockingOperationStarterTest, operation_not_blocked_when_no_messages_pending) {
diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.cpp b/storage/src/tests/distributor/distributor_message_sender_stub.cpp
index df894f1bb2c..893ebcd158a 100644
--- a/storage/src/tests/distributor/distributor_message_sender_stub.cpp
+++ b/storage/src/tests/distributor/distributor_message_sender_stub.cpp
@@ -12,7 +12,8 @@ namespace storage {
DistributorMessageSenderStub::DistributorMessageSenderStub()
: _stub_impl(),
_cluster_name("storage"),
- _pending_message_tracker(nullptr)
+ _pending_message_tracker(nullptr),
+ _operation_sequencer(nullptr)
{}
DistributorMessageSenderStub::~DistributorMessageSenderStub() = default;
diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h
index e69673a9366..0b5ab5ae9a7 100644
--- a/storage/src/tests/distributor/distributor_message_sender_stub.h
+++ b/storage/src/tests/distributor/distributor_message_sender_stub.h
@@ -13,6 +13,7 @@ class DistributorMessageSenderStub : public distributor::DistributorMessageSende
MessageSenderStub _stub_impl;
vespalib::string _cluster_name;
distributor::PendingMessageTracker* _pending_message_tracker;
+ distributor::OperationSequencer* _operation_sequencer;
public:
DistributorMessageSenderStub();
@@ -94,6 +95,15 @@ public:
void setPendingMessageTracker(distributor::PendingMessageTracker& tracker) {
_pending_message_tracker = &tracker;
}
+
+ const distributor::OperationSequencer& operation_sequencer() const noexcept override {
+ assert(_operation_sequencer);
+ return *_operation_sequencer;
+ }
+
+ void set_operation_sequencer(distributor::OperationSequencer& op_seq) {
+ _operation_sequencer = &op_seq;
+ }
};
}
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index d66b1315fde..d5374907723 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/bucketdbupdater.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/storageapi/message/stat.h>
#include <vespa/storageapi/message/visitor.h>
#include <vespa/storageapi/message/bucketsplitting.h>
@@ -41,16 +42,18 @@ struct IdealStateManagerTest : Test, DistributorTestUtil {
bool checkBlock(const IdealStateOperation& op,
const document::Bucket& bucket,
- const PendingMessageTracker& tracker) const
+ const PendingMessageTracker& tracker,
+ const OperationSequencer& op_seq) const
{
- return op.checkBlock(bucket, tracker);
+ return op.checkBlock(bucket, tracker, op_seq);
}
bool checkBlockForAllNodes(const IdealStateOperation& op,
const document::Bucket& bucket,
- const PendingMessageTracker& tracker) const
+ const PendingMessageTracker& tracker,
+ const OperationSequencer& op_seq) const
{
- return op.checkBlockForAllNodes(bucket, tracker);
+ return op.checkBlockForAllNodes(bucket, tracker, op_seq);
}
std::vector<document::BucketSpace> _bucketSpaces;
@@ -178,6 +181,7 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info)
framework::defaultimplementation::FakeClock clock;
PendingMessageTracker tracker(_node->getComponentRegister());
+ OperationSequencer op_seq;
document::BucketId bid(16, 1234);
std::vector<document::BucketId> buckets;
@@ -193,14 +197,14 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info)
{
RemoveBucketOperation op("storage",
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4)));
- EXPECT_TRUE(op.isBlocked(tracker));
+ EXPECT_TRUE(op.isBlocked(tracker, op_seq));
}
{
// Don't trigger on requests to other nodes.
RemoveBucketOperation op("storage",
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5)));
- EXPECT_FALSE(op.isBlocked(tracker));
+ EXPECT_FALSE(op.isBlocked(tracker, op_seq));
}
// Don't block on null-bucket messages that aren't RequestBucketInfo.
@@ -213,7 +217,7 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info)
{
RemoveBucketOperation op("storage",
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
- EXPECT_FALSE(op.isBlocked(tracker));
+ EXPECT_FALSE(op.isBlocked(tracker, op_seq));
}
}
@@ -221,6 +225,7 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket)
setupDistributor(2, 10, "distributor:1 storage:2");
framework::defaultimplementation::FakeClock clock;
PendingMessageTracker tracker(_node->getComponentRegister());
+ OperationSequencer op_seq;
document::BucketId bid(16, 1234);
{
@@ -232,9 +237,30 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket)
RemoveBucketOperation op("storage",
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
// Not blocked for exact node match.
- EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker));
+ EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker, op_seq));
// But blocked for bucket match!
- EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker));
+ EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker, op_seq));
+ }
+}
+
+TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) {
+ setupDistributor(2, 10, "distributor:1 storage:2");
+ framework::defaultimplementation::FakeClock clock;
+ PendingMessageTracker tracker(_node->getComponentRegister());
+ OperationSequencer op_seq;
+ const auto bucket = makeDocumentBucket(document::BucketId(16, 1234));
+
+ {
+ auto msg = std::make_shared<api::JoinBucketsCommand>(bucket);
+ msg->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ tracker.insert(msg);
+ }
+ auto token = op_seq.try_acquire(bucket);
+ EXPECT_TRUE(token.valid());
+ {
+ RemoveBucketOperation op("storage", BucketAndNodes(bucket, toVector<uint16_t>(0)));
+ EXPECT_TRUE(checkBlock(op, bucket, tracker, op_seq));
+ EXPECT_TRUE(checkBlockForAllNodes(op, bucket, tracker, op_seq));
}
}
diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h
index c88e477e90e..2bfb4ebb40f 100644
--- a/storage/src/tests/distributor/maintenancemocks.h
+++ b/storage/src/tests/distributor/maintenancemocks.h
@@ -51,7 +51,7 @@ public:
}
void onStart(DistributorMessageSender&) override {}
void onReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply>&) override {}
- bool isBlocked(const PendingMessageTracker&) const override {
+ bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override {
return _shouldBlock;
}
void setShouldBlock(bool shouldBlock) {
@@ -64,7 +64,7 @@ class MockMaintenanceOperationGenerator
{
public:
MaintenanceOperation::SP generate(const document::Bucket&bucket) const override {
- return MaintenanceOperation::SP(new MockOperation(bucket));
+ return std::make_shared<MockOperation>(bucket);
}
std::vector<MaintenanceOperation::SP> generateAll(
@@ -73,7 +73,7 @@ public:
{
(void) tracker;
std::vector<MaintenanceOperation::SP> ret;
- ret.push_back(MaintenanceOperation::SP(new MockOperation(bucket)));
+ ret.emplace_back(std::make_shared<MockOperation>(bucket));
return ret;
}
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index ccd70309a88..852c2ef8754 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -6,6 +6,7 @@
#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -17,11 +18,13 @@ namespace storage::distributor {
struct MergeOperationTest : Test, DistributorTestUtil {
std::unique_ptr<PendingMessageTracker> _pendingTracker;
+ OperationSequencer _operation_sequencer;
void SetUp() override {
createLinks();
_pendingTracker = std::make_unique<PendingMessageTracker>(getComponentRegister());
_sender.setPendingMessageTracker(*_pendingTracker);
+ _sender.set_operation_sequencer(_operation_sequencer);
}
void TearDown() override {
@@ -397,18 +400,31 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_any_busy_target_node) {
// Should not block on nodes _not_ included in operation node set
_pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10));
- EXPECT_FALSE(op.isBlocked(*_pendingTracker));
+ EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
// Node 1 is included in operation node set and should cause a block
_pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10));
- EXPECT_TRUE(op.isBlocked(*_pendingTracker));
+ EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
getClock().addSecondsToTime(11);
- EXPECT_FALSE(op.isBlocked(*_pendingTracker)); // No longer busy
+ EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // No longer busy
// Should block on other operation nodes than the first listed as well
_pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10));
- EXPECT_TRUE(op.isBlocked(*_pendingTracker));
+ EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+}
+
+TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ enableDistributorClusterState("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)));
+ EXPECT_TRUE(token.valid());
+ EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
}
TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) {
diff --git a/storage/src/tests/distributor/operation_sequencer_test.cpp b/storage/src/tests/distributor/operation_sequencer_test.cpp
index 37651be05a4..7c9a22b5226 100644
--- a/storage/src/tests/distributor/operation_sequencer_test.cpp
+++ b/storage/src/tests/distributor/operation_sequencer_test.cpp
@@ -61,8 +61,11 @@ TEST_F(OperationSequencerTest, releasing_handle_allows_for_getting_new_handles_f
}
TEST_F(OperationSequencerTest, cannot_get_handle_for_gid_contained_in_locked_bucket) {
- auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1));
+ EXPECT_FALSE(sequencer.is_blocked(bucket));
+ auto bucket_handle = sequencer.try_acquire(bucket);
EXPECT_TRUE(bucket_handle.valid());
+ EXPECT_TRUE(sequencer.is_blocked(bucket));
auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd"));
EXPECT_FALSE(doc_handle.valid());
ASSERT_TRUE(doc_handle.is_blocked());
@@ -78,10 +81,12 @@ TEST_F(OperationSequencerTest, can_get_handle_for_gid_not_contained_in_active_bu
}
TEST_F(OperationSequencerTest, releasing_bucket_lock_allows_gid_handles_to_be_acquired) {
- auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1));
+ auto bucket_handle = sequencer.try_acquire(bucket);
bucket_handle.release();
auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd"));
EXPECT_TRUE(doc_handle.valid());
+ EXPECT_FALSE(sequencer.is_blocked(bucket));
}
TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_separate_bucket_space) {
@@ -91,4 +96,9 @@ TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_s
EXPECT_TRUE(doc_handle.valid());
}
+TEST_F(OperationSequencerTest, is_blocked_is_bucket_space_aware) {
+ auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ EXPECT_FALSE(sequencer.is_blocked(document::Bucket(global_space(), document::BucketId(16, 1))));
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp
index c876f3e7ee4..9bb2e8b04a7 100644
--- a/storage/src/tests/distributor/splitbuckettest.cpp
+++ b/storage/src/tests/distributor/splitbuckettest.cpp
@@ -1,13 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/dummystoragelink.h>
-#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/storage/distributor/operations/idealstate/splitoperation.h>
#include <vespa/document/base/documentid.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storage/distributor/idealstatemanager.h>
-#include <tests/distributor/distributortestutil.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/idealstatemanager.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
+#include <vespa/storage/distributor/operations/idealstate/splitoperation.h>
+#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <tests/common/dummystoragelink.h>
+#include <tests/distributor/distributortestutil.h>
#include <vespa/vespalib/gtest/gtest.h>
using document::test::makeDocumentBucket;
@@ -260,6 +261,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
compReg.setClock(clock);
clock.setAbsoluteTimeInSeconds(1);
PendingMessageTracker tracker(compReg);
+ OperationSequencer op_seq;
enableDistributorClusterState("distributor:1 storage:2");
@@ -281,17 +283,39 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
splitCount,
splitByteSize);
- EXPECT_TRUE(op.isBlocked(tracker));
+ EXPECT_TRUE(op.isBlocked(tracker, op_seq));
// Now, pretend there's a join for another node in the same bucket. This
// will happen when a join is partially completed.
tracker.clearMessagesForNode(0);
- EXPECT_FALSE(op.isBlocked(tracker));
+ EXPECT_FALSE(op.isBlocked(tracker, op_seq));
joinCmd->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
tracker.insert(joinCmd);
- EXPECT_TRUE(op.isBlocked(tracker));
+ EXPECT_TRUE(op.isBlocked(tracker, op_seq));
+}
+
+TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
+ StorageComponentRegisterImpl compReg;
+ framework::defaultimplementation::FakeClock clock;
+ compReg.setClock(clock);
+ clock.setAbsoluteTimeInSeconds(1);
+ PendingMessageTracker tracker(compReg);
+ OperationSequencer op_seq;
+
+ enableDistributorClusterState("distributor:1 storage:2");
+
+ document::BucketId source_bucket(16, 1);
+ insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, 250);
+
+ SplitOperation op("storage", BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)),
+ maxSplitBits, splitCount, splitByteSize);
+
+ EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket));
+ EXPECT_TRUE(token.valid());
+ EXPECT_TRUE(op.isBlocked(tracker, op_seq));
}
} // storage::distributor