aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-07 15:31:31 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-07 16:05:30 +0000
commit81853e7e81313a374dea7f2dec377f437393c47c (patch)
tree44c4362357059e5c3dce8cf37689ed61c73f338a
parentf2ececfb183ef73d6a53d74ee84798e0dc7eaf36 (diff)
Block ideal state operations towards buckets that are locked
Prevents ideal state ops from modifying buckets that are being used in a read-for-write context. Move `OperationSequencer` to main `Distributor` to more easily facilitate sharing of it across components.
-rw-r--r--storage/src/tests/distributor/blockingoperationstartertest.cpp5
-rw-r--r--storage/src/tests/distributor/distributor_message_sender_stub.cpp3
-rw-r--r--storage/src/tests/distributor/distributor_message_sender_stub.h10
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp44
-rw-r--r--storage/src/tests/distributor/maintenancemocks.h6
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp24
-rw-r--r--storage/src/tests/distributor/operation_sequencer_test.cpp14
-rw-r--r--storage/src/tests/distributor/splitbuckettest.cpp42
-rw-r--r--storage/src/vespa/storage/distributor/blockingoperationstarter.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/blockingoperationstarter.h6
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h7
-rw-r--r--storage/src/vespa/storage/distributor/distributormessagesender.h2
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h6
-rw-r--r--storage/src/vespa/storage/distributor/operation_sequencer.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operation_sequencer.h2
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp17
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h10
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h3
28 files changed, 199 insertions, 57 deletions
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp
index 9a9e04f0f33..5203fec2462 100644
--- a/storage/src/tests/distributor/blockingoperationstartertest.cpp
+++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp
@@ -2,6 +2,7 @@
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storage/distributor/blockingoperationstarter.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
#include <tests/distributor/maintenancemocks.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -26,6 +27,7 @@ struct BlockingOperationStarterTest : Test {
std::unique_ptr<MockOperationStarter> _starterImpl;
std::unique_ptr<StorageComponentRegisterImpl> _compReg;
std::unique_ptr<PendingMessageTracker> _messageTracker;
+ std::unique_ptr<OperationSequencer> _operation_sequencer;
std::unique_ptr<BlockingOperationStarter> _operationStarter;
void SetUp() override;
@@ -39,7 +41,8 @@ BlockingOperationStarterTest::SetUp()
_compReg->setClock(_clock);
_clock.setAbsoluteTimeInSeconds(1);
_messageTracker = std::make_unique<PendingMessageTracker>(*_compReg);
- _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_starterImpl);
+ _operation_sequencer = std::make_unique<OperationSequencer>();
+ _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_operation_sequencer, *_starterImpl);
}
TEST_F(BlockingOperationStarterTest, operation_not_blocked_when_no_messages_pending) {
diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.cpp b/storage/src/tests/distributor/distributor_message_sender_stub.cpp
index df894f1bb2c..893ebcd158a 100644
--- a/storage/src/tests/distributor/distributor_message_sender_stub.cpp
+++ b/storage/src/tests/distributor/distributor_message_sender_stub.cpp
@@ -12,7 +12,8 @@ namespace storage {
DistributorMessageSenderStub::DistributorMessageSenderStub()
: _stub_impl(),
_cluster_name("storage"),
- _pending_message_tracker(nullptr)
+ _pending_message_tracker(nullptr),
+ _operation_sequencer(nullptr)
{}
DistributorMessageSenderStub::~DistributorMessageSenderStub() = default;
diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h
index e69673a9366..0b5ab5ae9a7 100644
--- a/storage/src/tests/distributor/distributor_message_sender_stub.h
+++ b/storage/src/tests/distributor/distributor_message_sender_stub.h
@@ -13,6 +13,7 @@ class DistributorMessageSenderStub : public distributor::DistributorMessageSende
MessageSenderStub _stub_impl;
vespalib::string _cluster_name;
distributor::PendingMessageTracker* _pending_message_tracker;
+ distributor::OperationSequencer* _operation_sequencer;
public:
DistributorMessageSenderStub();
@@ -94,6 +95,15 @@ public:
void setPendingMessageTracker(distributor::PendingMessageTracker& tracker) {
_pending_message_tracker = &tracker;
}
+
+ const distributor::OperationSequencer& operation_sequencer() const noexcept override {
+ assert(_operation_sequencer);
+ return *_operation_sequencer;
+ }
+
+ void set_operation_sequencer(distributor::OperationSequencer& op_seq) {
+ _operation_sequencer = &op_seq;
+ }
};
}
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index d66b1315fde..d5374907723 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/bucketdbupdater.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/storageapi/message/stat.h>
#include <vespa/storageapi/message/visitor.h>
#include <vespa/storageapi/message/bucketsplitting.h>
@@ -41,16 +42,18 @@ struct IdealStateManagerTest : Test, DistributorTestUtil {
bool checkBlock(const IdealStateOperation& op,
const document::Bucket& bucket,
- const PendingMessageTracker& tracker) const
+ const PendingMessageTracker& tracker,
+ const OperationSequencer& op_seq) const
{
- return op.checkBlock(bucket, tracker);
+ return op.checkBlock(bucket, tracker, op_seq);
}
bool checkBlockForAllNodes(const IdealStateOperation& op,
const document::Bucket& bucket,
- const PendingMessageTracker& tracker) const
+ const PendingMessageTracker& tracker,
+ const OperationSequencer& op_seq) const
{
- return op.checkBlockForAllNodes(bucket, tracker);
+ return op.checkBlockForAllNodes(bucket, tracker, op_seq);
}
std::vector<document::BucketSpace> _bucketSpaces;
@@ -178,6 +181,7 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info)
framework::defaultimplementation::FakeClock clock;
PendingMessageTracker tracker(_node->getComponentRegister());
+ OperationSequencer op_seq;
document::BucketId bid(16, 1234);
std::vector<document::BucketId> buckets;
@@ -193,14 +197,14 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info)
{
RemoveBucketOperation op("storage",
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4)));
- EXPECT_TRUE(op.isBlocked(tracker));
+ EXPECT_TRUE(op.isBlocked(tracker, op_seq));
}
{
// Don't trigger on requests to other nodes.
RemoveBucketOperation op("storage",
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5)));
- EXPECT_FALSE(op.isBlocked(tracker));
+ EXPECT_FALSE(op.isBlocked(tracker, op_seq));
}
// Don't block on null-bucket messages that aren't RequestBucketInfo.
@@ -213,7 +217,7 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info)
{
RemoveBucketOperation op("storage",
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
- EXPECT_FALSE(op.isBlocked(tracker));
+ EXPECT_FALSE(op.isBlocked(tracker, op_seq));
}
}
@@ -221,6 +225,7 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket)
setupDistributor(2, 10, "distributor:1 storage:2");
framework::defaultimplementation::FakeClock clock;
PendingMessageTracker tracker(_node->getComponentRegister());
+ OperationSequencer op_seq;
document::BucketId bid(16, 1234);
{
@@ -232,9 +237,30 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket)
RemoveBucketOperation op("storage",
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
// Not blocked for exact node match.
- EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker));
+ EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker, op_seq));
// But blocked for bucket match!
- EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker));
+ EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker, op_seq));
+ }
+}
+
+TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) {
+ setupDistributor(2, 10, "distributor:1 storage:2");
+ framework::defaultimplementation::FakeClock clock;
+ PendingMessageTracker tracker(_node->getComponentRegister());
+ OperationSequencer op_seq;
+ const auto bucket = makeDocumentBucket(document::BucketId(16, 1234));
+
+ {
+ auto msg = std::make_shared<api::JoinBucketsCommand>(bucket);
+ msg->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ tracker.insert(msg);
+ }
+ auto token = op_seq.try_acquire(bucket);
+ EXPECT_TRUE(token.valid());
+ {
+ RemoveBucketOperation op("storage", BucketAndNodes(bucket, toVector<uint16_t>(0)));
+ EXPECT_TRUE(checkBlock(op, bucket, tracker, op_seq));
+ EXPECT_TRUE(checkBlockForAllNodes(op, bucket, tracker, op_seq));
}
}
diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h
index c88e477e90e..2bfb4ebb40f 100644
--- a/storage/src/tests/distributor/maintenancemocks.h
+++ b/storage/src/tests/distributor/maintenancemocks.h
@@ -51,7 +51,7 @@ public:
}
void onStart(DistributorMessageSender&) override {}
void onReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply>&) override {}
- bool isBlocked(const PendingMessageTracker&) const override {
+ bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override {
return _shouldBlock;
}
void setShouldBlock(bool shouldBlock) {
@@ -64,7 +64,7 @@ class MockMaintenanceOperationGenerator
{
public:
MaintenanceOperation::SP generate(const document::Bucket&bucket) const override {
- return MaintenanceOperation::SP(new MockOperation(bucket));
+ return std::make_shared<MockOperation>(bucket);
}
std::vector<MaintenanceOperation::SP> generateAll(
@@ -73,7 +73,7 @@ public:
{
(void) tracker;
std::vector<MaintenanceOperation::SP> ret;
- ret.push_back(MaintenanceOperation::SP(new MockOperation(bucket)));
+ ret.emplace_back(std::make_shared<MockOperation>(bucket));
return ret;
}
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index ccd70309a88..852c2ef8754 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -6,6 +6,7 @@
#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -17,11 +18,13 @@ namespace storage::distributor {
struct MergeOperationTest : Test, DistributorTestUtil {
std::unique_ptr<PendingMessageTracker> _pendingTracker;
+ OperationSequencer _operation_sequencer;
void SetUp() override {
createLinks();
_pendingTracker = std::make_unique<PendingMessageTracker>(getComponentRegister());
_sender.setPendingMessageTracker(*_pendingTracker);
+ _sender.set_operation_sequencer(_operation_sequencer);
}
void TearDown() override {
@@ -397,18 +400,31 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_any_busy_target_node) {
// Should not block on nodes _not_ included in operation node set
_pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10));
- EXPECT_FALSE(op.isBlocked(*_pendingTracker));
+ EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
// Node 1 is included in operation node set and should cause a block
_pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10));
- EXPECT_TRUE(op.isBlocked(*_pendingTracker));
+ EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
getClock().addSecondsToTime(11);
- EXPECT_FALSE(op.isBlocked(*_pendingTracker)); // No longer busy
+ EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // No longer busy
// Should block on other operation nodes than the first listed as well
_pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10));
- EXPECT_TRUE(op.isBlocked(*_pendingTracker));
+ EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+}
+
+TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ enableDistributorClusterState("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)));
+ EXPECT_TRUE(token.valid());
+ EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
}
TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) {
diff --git a/storage/src/tests/distributor/operation_sequencer_test.cpp b/storage/src/tests/distributor/operation_sequencer_test.cpp
index 37651be05a4..7c9a22b5226 100644
--- a/storage/src/tests/distributor/operation_sequencer_test.cpp
+++ b/storage/src/tests/distributor/operation_sequencer_test.cpp
@@ -61,8 +61,11 @@ TEST_F(OperationSequencerTest, releasing_handle_allows_for_getting_new_handles_f
}
TEST_F(OperationSequencerTest, cannot_get_handle_for_gid_contained_in_locked_bucket) {
- auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1));
+ EXPECT_FALSE(sequencer.is_blocked(bucket));
+ auto bucket_handle = sequencer.try_acquire(bucket);
EXPECT_TRUE(bucket_handle.valid());
+ EXPECT_TRUE(sequencer.is_blocked(bucket));
auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd"));
EXPECT_FALSE(doc_handle.valid());
ASSERT_TRUE(doc_handle.is_blocked());
@@ -78,10 +81,12 @@ TEST_F(OperationSequencerTest, can_get_handle_for_gid_not_contained_in_active_bu
}
TEST_F(OperationSequencerTest, releasing_bucket_lock_allows_gid_handles_to_be_acquired) {
- auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1));
+ auto bucket_handle = sequencer.try_acquire(bucket);
bucket_handle.release();
auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd"));
EXPECT_TRUE(doc_handle.valid());
+ EXPECT_FALSE(sequencer.is_blocked(bucket));
}
TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_separate_bucket_space) {
@@ -91,4 +96,9 @@ TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_s
EXPECT_TRUE(doc_handle.valid());
}
+TEST_F(OperationSequencerTest, is_blocked_is_bucket_space_aware) {
+ auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1)));
+ EXPECT_FALSE(sequencer.is_blocked(document::Bucket(global_space(), document::BucketId(16, 1))));
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp
index c876f3e7ee4..9bb2e8b04a7 100644
--- a/storage/src/tests/distributor/splitbuckettest.cpp
+++ b/storage/src/tests/distributor/splitbuckettest.cpp
@@ -1,13 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/dummystoragelink.h>
-#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/storage/distributor/operations/idealstate/splitoperation.h>
#include <vespa/document/base/documentid.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storage/distributor/idealstatemanager.h>
-#include <tests/distributor/distributortestutil.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/idealstatemanager.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
+#include <vespa/storage/distributor/operations/idealstate/splitoperation.h>
+#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <tests/common/dummystoragelink.h>
+#include <tests/distributor/distributortestutil.h>
#include <vespa/vespalib/gtest/gtest.h>
using document::test::makeDocumentBucket;
@@ -260,6 +261,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
compReg.setClock(clock);
clock.setAbsoluteTimeInSeconds(1);
PendingMessageTracker tracker(compReg);
+ OperationSequencer op_seq;
enableDistributorClusterState("distributor:1 storage:2");
@@ -281,17 +283,39 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
splitCount,
splitByteSize);
- EXPECT_TRUE(op.isBlocked(tracker));
+ EXPECT_TRUE(op.isBlocked(tracker, op_seq));
// Now, pretend there's a join for another node in the same bucket. This
// will happen when a join is partially completed.
tracker.clearMessagesForNode(0);
- EXPECT_FALSE(op.isBlocked(tracker));
+ EXPECT_FALSE(op.isBlocked(tracker, op_seq));
joinCmd->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
tracker.insert(joinCmd);
- EXPECT_TRUE(op.isBlocked(tracker));
+ EXPECT_TRUE(op.isBlocked(tracker, op_seq));
+}
+
+TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
+ StorageComponentRegisterImpl compReg;
+ framework::defaultimplementation::FakeClock clock;
+ compReg.setClock(clock);
+ clock.setAbsoluteTimeInSeconds(1);
+ PendingMessageTracker tracker(compReg);
+ OperationSequencer op_seq;
+
+ enableDistributorClusterState("distributor:1 storage:2");
+
+ document::BucketId source_bucket(16, 1);
+ insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, 250);
+
+ SplitOperation op("storage", BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)),
+ maxSplitBits, splitCount, splitByteSize);
+
+ EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket));
+ EXPECT_TRUE(token.valid());
+ EXPECT_TRUE(op.isBlocked(tracker, op_seq));
}
} // storage::distributor
diff --git a/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp b/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp
index 0e4bc511b26..25c38888098 100644
--- a/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp
+++ b/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp
@@ -7,7 +7,7 @@ namespace storage::distributor {
bool
BlockingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority)
{
- if (operation->isBlocked(_messageTracker)) {
+ if (operation->isBlocked(_messageTracker, _operation_sequencer)) {
return true;
}
return _starterImpl.start(operation, priority);
diff --git a/storage/src/vespa/storage/distributor/blockingoperationstarter.h b/storage/src/vespa/storage/distributor/blockingoperationstarter.h
index a64a8f5b6bb..e79ae6b4a79 100644
--- a/storage/src/vespa/storage/distributor/blockingoperationstarter.h
+++ b/storage/src/vespa/storage/distributor/blockingoperationstarter.h
@@ -7,13 +7,16 @@
namespace storage::distributor {
class PendingMessageTracker;
+class OperationSequencer;
class BlockingOperationStarter : public OperationStarter
{
public:
BlockingOperationStarter(PendingMessageTracker& messageTracker,
+ OperationSequencer& operation_sequencer,
OperationStarter& starterImpl)
: _messageTracker(messageTracker),
+ _operation_sequencer(operation_sequencer),
_starterImpl(starterImpl)
{}
BlockingOperationStarter(const BlockingOperationStarter&) = delete;
@@ -22,7 +25,8 @@ public:
bool start(const std::shared_ptr<Operation>& operation, Priority priority) override;
private:
PendingMessageTracker& _messageTracker;
- OperationStarter& _starterImpl;
+ OperationSequencer& _operation_sequencer;
+ OperationStarter& _starterImpl;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 235384c9473..40e108dcda7 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -5,6 +5,7 @@
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
+#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
#include "throttlingoperationstarter.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
@@ -79,14 +80,15 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_metrics(std::make_shared<DistributorMetricSet>()),
_operationOwner(*this, _component.getClock()),
_maintenanceOperationOwner(*this, _component.getClock()),
+ _operation_sequencer(std::make_unique<OperationSequencer>()),
_pendingMessageTracker(compReg),
_bucketDBUpdater(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, *this, compReg),
_distributorStatusDelegate(compReg, *this, *this),
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
_idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies),
_messageSender(messageSender),
- _externalOperationHandler(_component, _component,
- getMetrics(), getMessageSender(), *this, _component,
+ _externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
+ *_operation_sequencer, *this, _component,
_idealStateManager, _operationOwner),
_threadPool(threadPool),
_initializingIsUp(true),
@@ -95,7 +97,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()),
_scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)),
_throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)),
- _blockingStarter(std::make_unique<BlockingOperationStarter>(_pendingMessageTracker, *_throttlingStarter)),
+ _blockingStarter(std::make_unique<BlockingOperationStarter>(_pendingMessageTracker, *_operation_sequencer,
+ *_throttlingStarter)),
_scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)),
_schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE),
_recoveryTimeStarted(_component.getClock()),
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index e95df863210..70affc59ce3 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -34,6 +34,7 @@ namespace storage::distributor {
class BlockingOperationStarter;
class BucketPriorityDatabase;
class DistributorBucketSpaceRepo;
+class OperationSequencer;
class OwnershipTransferSafeTimePointCalculator;
class SimpleMaintenanceScanner;
class ThrottlingOperationStarter;
@@ -76,6 +77,10 @@ public:
return _pendingMessageTracker;
}
+ const OperationSequencer& operation_sequencer() const noexcept override {
+ return *_operation_sequencer;
+ }
+
const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override;
/**
@@ -274,6 +279,7 @@ private:
OperationOwner _operationOwner;
OperationOwner _maintenanceOperationOwner;
+ std::unique_ptr<OperationSequencer> _operation_sequencer;
PendingMessageTracker _pendingMessageTracker;
BucketDBUpdater _bucketDBUpdater;
StatusReporterDelegate _distributorStatusDelegate;
@@ -310,7 +316,6 @@ private:
DoneInitializeHandler& _doneInitializeHandler;
bool _doneInitializing;
-
std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb;
std::unique_ptr<SimpleMaintenanceScanner> _scanner;
std::unique_ptr<ThrottlingOperationStarter> _throttlingStarter;
diff --git a/storage/src/vespa/storage/distributor/distributormessagesender.h b/storage/src/vespa/storage/distributor/distributormessagesender.h
index 151413e98ef..ddf17058950 100644
--- a/storage/src/vespa/storage/distributor/distributormessagesender.h
+++ b/storage/src/vespa/storage/distributor/distributormessagesender.h
@@ -8,6 +8,7 @@ namespace storage::lib { class NodeType; }
namespace storage::distributor {
class PendingMessageTracker;
+class OperationSequencer;
class DistributorMessageSender : public MessageSender {
public:
@@ -21,6 +22,7 @@ public:
virtual int getDistributorIndex() const = 0;
virtual const vespalib::string& getClusterName() const = 0;
virtual const PendingMessageTracker& getPendingMessageTracker() const = 0;
+ virtual const OperationSequencer& operation_sequencer() const noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 5fe5e51202a..cf00718b1fd 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -3,6 +3,7 @@
#include "bucket_space_distribution_context.h"
#include "externaloperationhandler.h"
#include "distributor.h"
+#include "operation_sequencer.h"
#include <vespa/document/base/documentid.h>
#include <vespa/storage/distributor/operations/external/putoperation.h>
#include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h>
@@ -54,12 +55,16 @@ public:
const PendingMessageTracker& getPendingMessageTracker() const override {
abort(); // Never called by the messages using this component.
}
+ const OperationSequencer& operation_sequencer() const noexcept override {
+ abort(); // Never called by the messages using this component.
+ }
};
ExternalOperationHandler::ExternalOperationHandler(DistributorNodeContext& node_ctx,
DistributorOperationContext& op_ctx,
DistributorMetricSet& metrics,
ChainedMessageSender& msg_sender,
+ OperationSequencer& operation_sequencer,
NonTrackingMessageSender& non_tracking_sender,
DocumentSelectionParser& parser,
const MaintenanceOperationGenerator& gen,
@@ -68,6 +73,7 @@ ExternalOperationHandler::ExternalOperationHandler(DistributorNodeContext& node_
_op_ctx(op_ctx),
_metrics(metrics),
_msg_sender(msg_sender),
+ _operation_sequencer(operation_sequencer),
_parser(parser),
_direct_dispatch_sender(std::make_unique<DirectDispatchSender>(node_ctx, non_tracking_sender)),
_operationGenerator(gen),
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index 0b6e9d970aa..1a02ff92f58 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "operation_sequencer.h"
#include <vespa/document/bucket/bucketid.h>
#include <vespa/document/bucket/bucketidfactory.h>
#include <vespa/vdslib/state/clusterstate.h>
@@ -23,6 +22,8 @@ class DistributorMetricSet;
class Distributor;
class MaintenanceOperationGenerator;
class DirectDispatchSender;
+class SequencingHandle;
+class OperationSequencer;
class OperationOwner;
class ExternalOperationHandler : public api::MessageHandler
@@ -44,6 +45,7 @@ public:
DistributorOperationContext& op_ctx,
DistributorMetricSet& metrics,
ChainedMessageSender& msg_sender,
+ OperationSequencer& operation_sequencer,
NonTrackingMessageSender& non_tracking_sender,
DocumentSelectionParser& parser,
const MaintenanceOperationGenerator& gen,
@@ -89,10 +91,10 @@ private:
DistributorOperationContext& _op_ctx;
DistributorMetricSet& _metrics;
ChainedMessageSender& _msg_sender;
+ OperationSequencer& _operation_sequencer;
DocumentSelectionParser& _parser;
std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender;
const MaintenanceOperationGenerator& _operationGenerator;
- OperationSequencer _operation_sequencer;
Operation::SP _op;
TimePoint _rejectFeedBeforeTimeReached;
OperationOwner& _distributor_operation_owner;
diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.cpp b/storage/src/vespa/storage/distributor/operation_sequencer.cpp
index aa48cb6c950..2eeba79a053 100644
--- a/storage/src/vespa/storage/distributor/operation_sequencer.cpp
+++ b/storage/src/vespa/storage/distributor/operation_sequencer.cpp
@@ -47,6 +47,10 @@ SequencingHandle OperationSequencer::try_acquire(const document::Bucket& bucket)
}
}
+bool OperationSequencer::is_blocked(const document::Bucket& bucket) const noexcept {
+ return (_active_buckets.find(bucket) != _active_buckets.end());
+}
+
void OperationSequencer::release(const SequencingHandle& handle) {
assert(handle.valid());
if (handle.has_gid()) {
diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.h b/storage/src/vespa/storage/distributor/operation_sequencer.h
index b813071a1c8..f9ff6b32e0b 100644
--- a/storage/src/vespa/storage/distributor/operation_sequencer.h
+++ b/storage/src/vespa/storage/distributor/operation_sequencer.h
@@ -130,6 +130,8 @@ public:
SequencingHandle try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id);
SequencingHandle try_acquire(const document::Bucket& bucket);
+
+ bool is_blocked(const document::Bucket&) const noexcept;
private:
void release(const SequencingHandle& handle);
};
diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h
index 37412b08d2e..5f346a0104f 100644
--- a/storage/src/vespa/storage/distributor/operationowner.h
+++ b/storage/src/vespa/storage/distributor/operationowner.h
@@ -48,6 +48,10 @@ public:
return _sender.getPendingMessageTracker();
}
+ const OperationSequencer& operation_sequencer() const noexcept override {
+ return _sender.operation_sequencer();
+ }
+
private:
OperationOwner& _owner;
DistributorMessageSender& _sender;
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 46eb104db56..2d6e3bd30e2 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -88,6 +88,10 @@ struct IntermediateMessageSender : DistributorMessageSender {
const PendingMessageTracker& getPendingMessageTracker() const override {
return forward.getPendingMessageTracker();
}
+
+ const OperationSequencer& operation_sequencer() const noexcept override {
+ return forward.operation_sequencer();
+ }
};
IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index 1a2eb8c6215..42d9e3d4a3d 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/distributor_bucket_space_repo.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.operation");
@@ -200,8 +201,12 @@ checkNullBucketRequestBucketInfoMessage(uint16_t node,
bool
IdealStateOperation::checkBlock(const document::Bucket &bucket,
- const PendingMessageTracker& tracker) const
+ const PendingMessageTracker& tracker,
+ const OperationSequencer& seq) const
{
+ if (seq.is_blocked(bucket)) {
+ return true;
+ }
IdealStateOpChecker ichk(*this);
const std::vector<uint16_t>& nodes(getNodes());
for (auto node : nodes) {
@@ -219,8 +224,12 @@ IdealStateOperation::checkBlock(const document::Bucket &bucket,
bool
IdealStateOperation::checkBlockForAllNodes(
const document::Bucket &bucket,
- const PendingMessageTracker& tracker) const
+ const PendingMessageTracker& tracker,
+ const OperationSequencer& seq) const
{
+ if (seq.is_blocked(bucket)) {
+ return true;
+ }
IdealStateOpChecker ichk(*this);
// Check messages sent to _any node_ for _this_ particular bucket.
tracker.checkPendingMessages(bucket, ichk);
@@ -238,9 +247,9 @@ IdealStateOperation::checkBlockForAllNodes(
bool
-IdealStateOperation::isBlocked(const PendingMessageTracker& tracker) const
+IdealStateOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const
{
- return checkBlock(getBucket(), tracker);
+ return checkBlock(getBucket(), tracker, op_seq);
}
std::string
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index 08b545057cf..dcdc2f32374 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -182,7 +182,7 @@ public:
* Returns true if we are blocked to start this operation given
* the pending messages.
*/
- bool isBlocked(const PendingMessageTracker& pendingMessages) const override;
+ bool isBlocked(const PendingMessageTracker& pendingMessages, const OperationSequencer&) const override;
/**
Returns the priority we should send messages with.
@@ -235,8 +235,12 @@ protected:
* operations to other nodes for this bucket, these will not be part of
* the set of messages checked.
*/
- bool checkBlock(const document::Bucket &bucket, const PendingMessageTracker& tracker) const;
- bool checkBlockForAllNodes(const document::Bucket &bucket, const PendingMessageTracker& tracker) const;
+ bool checkBlock(const document::Bucket& bucket,
+ const PendingMessageTracker& tracker,
+ const OperationSequencer&) const;
+ bool checkBlockForAllNodes(const document::Bucket& bucket,
+ const PendingMessageTracker& tracker,
+ const OperationSequencer&) const;
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index 130e039a43e..292df7c5e07 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -150,10 +150,10 @@ JoinOperation::getJoinBucket(size_t idx) const
}
bool
-JoinOperation::isBlocked(const PendingMessageTracker& tracker) const
+JoinOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const
{
- return (checkBlock(getBucket(), tracker) ||
- checkBlock(getJoinBucket(0), tracker) ||
- (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), tracker)));
+ return (checkBlock(getBucket(), tracker, op_seq) ||
+ checkBlock(getJoinBucket(0), tracker, op_seq) ||
+ (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), tracker, op_seq)));
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
index 55a7c05264a..86372231058 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
@@ -41,7 +41,8 @@ public:
return JOIN_BUCKET;
}
- bool isBlocked(const PendingMessageTracker& pendingMessages) const override;
+ bool isBlocked(const PendingMessageTracker& pendingMessages,
+ const OperationSequencer& op_seq) const override;
protected:
using NodeToBuckets = std::map<uint16_t, std::vector<document::BucketId>>;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 445d0972937..16b510c22be 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -233,7 +233,7 @@ MergeOperation::deleteSourceOnlyNodes(
BucketAndNodes(getBucket(), sourceOnlyNodes)));
// Must not send removes to source only copies if something has caused
// pending load to the copy after the merge was sent!
- if (_removeOperation->isBlocked(sender.getPendingMessageTracker())) {
+ if (_removeOperation->isBlocked(sender.getPendingMessageTracker(), sender.operation_sequencer())) {
LOG(debug,
"Source only removal for %s was blocked by a pending "
"operation",
@@ -324,14 +324,15 @@ bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri)
return IdealStateOperation::shouldBlockThisOperation(messageType, pri);
}
-bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker) const {
+bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker,
+ const OperationSequencer& op_seq) const {
const auto& node_info = pending_tracker.getNodeInfo();
for (auto node : getNodes()) {
if (node_info.isBusy(node)) {
return true;
}
}
- return IdealStateOperation::isBlocked(pending_tracker);
+ return IdealStateOperation::isBlocked(pending_tracker, op_seq);
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index d09bc0ba5c4..a5f7d352eea 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -47,7 +47,7 @@ public:
std::vector<MergeMetaData>&);
bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override;
- bool isBlocked(const PendingMessageTracker& pendingMessages) const override;
+ bool isBlocked(const PendingMessageTracker& pendingMessages, const OperationSequencer&) const override;
private:
static void addIdealNodes(
const std::vector<uint16_t>& idealNodes,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 508dcf13916..a74dc1c0d65 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -164,9 +164,9 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP
}
bool
-SplitOperation::isBlocked(const PendingMessageTracker& tracker) const
+SplitOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const
{
- return checkBlockForAllNodes(getBucket(), tracker);
+ return checkBlockForAllNodes(getBucket(), tracker, op_seq);
}
bool
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
index 6959929bf01..9e24852886d 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
@@ -20,7 +20,7 @@ public:
const char* getName() const override { return "split"; };
Type getType() const override { return SPLIT_BUCKET; }
uint32_t getMaxSplitBits() const { return _maxBits; }
- bool isBlocked(const PendingMessageTracker&) const override;
+ bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override;
bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
protected:
MessageTracker _tracker;
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index 538f15d2bf2..e9320817a8e 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -18,6 +18,7 @@ class StorageComponent;
namespace distributor {
class PendingMessageTracker;
+class OperationSequencer;
class Operation
{
@@ -61,7 +62,7 @@ public:
* Returns true if we are blocked to start this operation given
* the pending messages.
*/
- virtual bool isBlocked(const PendingMessageTracker&) const {
+ virtual bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const {
return false;
}