summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-06-09 13:15:38 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-06-09 13:15:38 +0000
commit29a607c45f0cd8e373df995589743d233075a1fe (patch)
tree3ff40e7aac766c94b04c3371fb50115644df978a /storage
parent3b5aba3a7223924498d651f8257170e35ee79825 (diff)
Block ideal state ops when a pending cluster state is present
Since distributor stripes no longer have access to the top-level pending message tracking info, it's no longer possible to infer if a pending cluster state is happening by looking at the sent messages. Instead, do this more generally (and efficiently) by looking at the potential pending cluster state directly. Rewire the `isBlocked` logic to take in an operation context instead of just a `PendingMessageTracker`, giving it access to a lot more relevant information.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/blockingoperationstartertest.cpp85
-rw-r--r--storage/src/tests/distributor/distributor_message_sender_stub.h5
-rw-r--r--storage/src/tests/distributor/distributortest.cpp9
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp13
-rw-r--r--storage/src/tests/distributor/distributortestutil.h6
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp76
-rw-r--r--storage/src/tests/distributor/maintenancemocks.h2
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp28
-rw-r--r--storage/src/tests/distributor/splitbuckettest.cpp18
-rw-r--r--storage/src/vespa/storage/distributor/blockingoperationstarter.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/blockingoperationstarter.h12
-rw-r--r--storage/src/vespa/storage/distributor/distributor_operation_context.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_component.h3
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_interface.h1
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h3
-rw-r--r--storage/src/vespa/storage/distributor/distributormessagesender.h1
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp48
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h3
29 files changed, 223 insertions, 139 deletions
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp
index 5203fec2462..861f8e72832 100644
--- a/storage/src/tests/distributor/blockingoperationstartertest.cpp
+++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storage/distributor/blockingoperationstarter.h>
+#include <vespa/storage/distributor/distributor_stripe_operation_context.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <tests/distributor/maintenancemocks.h>
@@ -13,6 +14,86 @@ using namespace ::testing;
namespace storage::distributor {
+struct FakeDistributorStripeOperationContext : public DistributorStripeOperationContext {
+
+ PendingMessageTracker& _message_tracker;
+
+ explicit FakeDistributorStripeOperationContext(PendingMessageTracker& message_tracker)
+ : _message_tracker(message_tracker)
+ {}
+
+ ~FakeDistributorStripeOperationContext() override = default;
+
+ // From DistributorOperationContext:
+ api::Timestamp generate_unique_timestamp() override {
+ abort();
+ }
+ const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override {
+ abort();
+ }
+ DistributorBucketSpaceRepo& bucket_space_repo() noexcept override {
+ abort();
+ }
+ const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override {
+ abort();
+ }
+ DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override {
+ abort();
+ }
+ const DistributorConfiguration& distributor_config() const noexcept override {
+ abort();
+ }
+ // From DistributorStripeOperationContext:
+ void update_bucket_database(const document::Bucket&, const BucketCopy&, uint32_t) override {
+ abort();
+ }
+ void update_bucket_database(const document::Bucket&, const std::vector<BucketCopy>&, uint32_t) override {
+ abort();
+ }
+ void remove_node_from_bucket_database(const document::Bucket&, uint16_t) override {
+ abort();
+ }
+ void remove_nodes_from_bucket_database(const document::Bucket&, const std::vector<uint16_t>&) override {
+ abort();
+ }
+ document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId&) const override {
+ abort();
+ }
+ void recheck_bucket_info(uint16_t, const document::Bucket&) override {
+ abort();
+ }
+ document::BucketId get_sibling(const document::BucketId&) const override {
+ abort();
+ }
+ void send_inline_split_if_bucket_too_large(document::BucketSpace, const BucketDatabase::Entry&, uint8_t) override {
+ abort();
+ }
+ OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override {
+ abort();
+ }
+ PendingMessageTracker& pending_message_tracker() noexcept override {
+ return _message_tracker;
+ }
+ const PendingMessageTracker& pending_message_tracker() const noexcept override {
+ return _message_tracker;
+ }
+ bool has_pending_message(uint16_t, const document::Bucket&, uint32_t) const override {
+ abort();
+ }
+ const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace&) const override {
+ abort();
+ }
+ const lib::ClusterStateBundle& cluster_state_bundle() const override {
+ abort();
+ }
+ bool storage_node_is_up(document::BucketSpace, uint32_t) const override {
+ abort();
+ }
+ const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const override {
+ abort();
+ }
+};
+
struct BlockingOperationStarterTest : Test {
std::shared_ptr<Operation> createMockOperation() {
return std::make_shared<MockOperation>(makeDocumentBucket(BucketId(16, 1)));
@@ -27,6 +108,7 @@ struct BlockingOperationStarterTest : Test {
std::unique_ptr<MockOperationStarter> _starterImpl;
std::unique_ptr<StorageComponentRegisterImpl> _compReg;
std::unique_ptr<PendingMessageTracker> _messageTracker;
+ std::unique_ptr<FakeDistributorStripeOperationContext> _fake_ctx;
std::unique_ptr<OperationSequencer> _operation_sequencer;
std::unique_ptr<BlockingOperationStarter> _operationStarter;
@@ -41,8 +123,9 @@ BlockingOperationStarterTest::SetUp()
_compReg->setClock(_clock);
_clock.setAbsoluteTimeInSeconds(1);
_messageTracker = std::make_unique<PendingMessageTracker>(*_compReg);
+ _fake_ctx = std::make_unique<FakeDistributorStripeOperationContext>(*_messageTracker);
_operation_sequencer = std::make_unique<OperationSequencer>();
- _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_operation_sequencer, *_starterImpl);
+ _operationStarter = std::make_unique<BlockingOperationStarter>(*_fake_ctx, *_operation_sequencer, *_starterImpl);
}
TEST_F(BlockingOperationStarterTest, operation_not_blocked_when_no_messages_pending) {
diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h
index 59a5a82b7df..18662fbce8f 100644
--- a/storage/src/tests/distributor/distributor_message_sender_stub.h
+++ b/storage/src/tests/distributor/distributor_message_sender_stub.h
@@ -87,6 +87,11 @@ public:
return dummy_cluster_context;
}
+ distributor::PendingMessageTracker& getPendingMessageTracker() override {
+ assert(_pending_message_tracker);
+ return *_pending_message_tracker;
+ }
+
const distributor::PendingMessageTracker& getPendingMessageTracker() const override {
assert(_pending_message_tracker);
return *_pending_message_tracker;
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 3d1c6165946..9c3686d3614 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -222,7 +222,6 @@ struct DistributorTest : Test, DistributorTestUtil {
}
void configureMaxClusterClockSkew(int seconds);
- void sendDownClusterStateCommand();
void replyToSingleRequestBucketInfoCommandWith1Bucket();
void sendDownDummyRemoveCommand();
void assertSingleBouncedRemoveReplyPresent();
@@ -795,12 +794,6 @@ auto make_dummy_get_command_for_bucket_1() {
}
-void DistributorTest::sendDownClusterStateCommand() {
- lib::ClusterState newState("bits:1 storage:1 distributor:1");
- auto stateCmd = std::make_shared<api::SetSystemStateCommand>(newState);
- _distributor->handleMessage(stateCmd);
-}
-
void DistributorTest::replyToSingleRequestBucketInfoCommandWith1Bucket() {
ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size());
for (uint32_t i = 0; i < _sender.commands().size(); ++i) {
@@ -847,7 +840,7 @@ TEST_F(DistributorTest, configured_safe_time_point_rejection_works_end_to_end) {
getClock().setAbsoluteTimeInSeconds(1000);
configureMaxClusterClockSkew(10);
- sendDownClusterStateCommand();
+ receive_set_system_state_command("bits:1 storage:1 distributor:1");
ASSERT_NO_FATAL_FAILURE(replyToSingleRequestBucketInfoCommandWith1Bucket());
// SetSystemStateCommand sent down chain at this point.
sendDownDummyRemoveCommand();
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index a2f32d8faa2..3ec1c95b206 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -101,6 +101,19 @@ DistributorTestUtil::triggerDistributionChange(lib::Distribution::SP distr)
}
void
+DistributorTestUtil::receive_set_system_state_command(const vespalib::string& state_str)
+{
+ auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str));
+ _distributor->handleMessage(state_cmd); // TODO move semantics
+}
+
+void
+DistributorTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ _distributor->handleMessage(msg);
+}
+
+void
DistributorTestUtil::setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo> &repo)
{
_node->getComponentRegister().setDocumentTypeRepo(repo);
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 6664b8d823d..533fd49811f 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -202,6 +202,12 @@ public:
void setSystemState(const lib::ClusterState& systemState);
+ // Invokes full cluster state transition pipeline rather than directly applying
+ // the state and just pretending everything has been completed.
+ void receive_set_system_state_command(const vespalib::string& state_str);
+
+ void handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg);
+
// Must be called prior to createLinks() to have any effect
void set_num_distributor_stripes(uint32_t n_stripes) noexcept {
_num_distributor_stripes = n_stripes;
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index 0a36e5cd0e5..e38e4b5b668 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -40,18 +40,18 @@ struct IdealStateManagerTest : Test, DistributorTestUtil {
bool checkBlock(const IdealStateOperation& op,
const document::Bucket& bucket,
- const PendingMessageTracker& tracker,
+ const DistributorStripeOperationContext& ctx,
const OperationSequencer& op_seq) const
{
- return op.checkBlock(bucket, tracker, op_seq);
+ return op.checkBlock(bucket, ctx, op_seq);
}
bool checkBlockForAllNodes(const IdealStateOperation& op,
const document::Bucket& bucket,
- const PendingMessageTracker& tracker,
+ const DistributorStripeOperationContext& ctx,
const OperationSequencer& op_seq) const
{
- return op.checkBlockForAllNodes(bucket, tracker, op_seq);
+ return op.checkBlockForAllNodes(bucket, ctx, op_seq);
}
std::vector<document::BucketSpace> _bucketSpaces;
@@ -170,92 +170,86 @@ TEST_F(IdealStateManagerTest, recheck_when_active) {
active_ideal_state_operations());
}
-TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) {
+/**
+ * Don't schedule ideal state operations when there's a pending cluster state.
+ * This subsumes the legacy behavior of blocking ideal state ops when there is a
+ * zero-bucket RequestBucketInfoCommand pending towards a node (i.e. full bucket
+ * info fetch).
+ *
+ * This is for two reasons:
+ * - Avoids race conditions where we change the bucket set concurrently with
+ * requesting bucket info.
+ * - Once we get updated bucket info it's likely that the set of ideal state ops
+ * to execute will change anyway, so it makes sense to wait until it's ready.
+ */
+TEST_F(IdealStateManagerTest, block_ideal_state_ops_when_pending_cluster_state_is_present) {
+
+ setupDistributor(2, 10, "version:1 distributor:1 storage:1 .0.s:d");
+
+ // Trigger a pending cluster state with bucket info requests towards 1 node
+ receive_set_system_state_command("version:2 distributor:1 storage:1");
- setupDistributor(2, 10, "distributor:1 storage:2");
-
- framework::defaultimplementation::FakeClock clock;
- PendingMessageTracker tracker(_node->getComponentRegister());
OperationSequencer op_seq;
-
document::BucketId bid(16, 1234);
- std::vector<document::BucketId> buckets;
-
- // RequestBucketInfoCommand does not have a specific bucketid since it's
- // sent to the entire node. It will then use a null bucketid.
- {
- auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), buckets);
- msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 4));
- tracker.insert(msg);
- }
{
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4)));
- EXPECT_TRUE(op.isBlocked(tracker, op_seq));
- }
-
- {
- // Don't trigger on requests to other nodes.
- RemoveBucketOperation op(dummy_cluster_context,
- BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5)));
- EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}
- // Don't block on null-bucket messages that aren't RequestBucketInfo.
- {
- auto msg = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", "baz");
- msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 7));
- tracker.insert(msg);
+ // Clear pending by replying with zero buckets for all bucket spaces
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size());
+ for (uint32_t i = 0; i < _sender.commands().size(); ++i) {
+ auto& bucket_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.command(i));
+ handle_top_level_message(bucket_req.makeReply());
}
{
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
- EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ EXPECT_FALSE(op.isBlocked(operation_context(), op_seq));
}
}
TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) {
setupDistributor(2, 10, "distributor:1 storage:2");
framework::defaultimplementation::FakeClock clock;
- PendingMessageTracker tracker(_node->getComponentRegister());
OperationSequencer op_seq;
document::BucketId bid(16, 1234);
{
auto msg = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(bid));
msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 4));
- tracker.insert(msg);
+ pending_message_tracker().insert(msg);
}
{
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
// Not blocked for exact node match.
- EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker, op_seq));
+ EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq));
// But blocked for bucket match!
- EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker, op_seq));
+ EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), operation_context(), op_seq));
}
}
TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) {
setupDistributor(2, 10, "distributor:1 storage:2");
framework::defaultimplementation::FakeClock clock;
- PendingMessageTracker tracker(_node->getComponentRegister());
OperationSequencer op_seq;
const auto bucket = makeDocumentBucket(document::BucketId(16, 1234));
{
auto msg = std::make_shared<api::JoinBucketsCommand>(bucket);
msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 1));
- tracker.insert(msg);
+ pending_message_tracker().insert(msg);
}
auto token = op_seq.try_acquire(bucket, "foo");
EXPECT_TRUE(token.valid());
{
RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(bucket, toVector<uint16_t>(0)));
- EXPECT_TRUE(checkBlock(op, bucket, tracker, op_seq));
- EXPECT_TRUE(checkBlockForAllNodes(op, bucket, tracker, op_seq));
+ EXPECT_TRUE(checkBlock(op, bucket, operation_context(), op_seq));
+ EXPECT_TRUE(checkBlockForAllNodes(op, bucket, operation_context(), op_seq));
}
}
diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h
index fff798d4413..1245c9bb15d 100644
--- a/storage/src/tests/distributor/maintenancemocks.h
+++ b/storage/src/tests/distributor/maintenancemocks.h
@@ -51,7 +51,7 @@ public:
}
void onStart(DistributorStripeMessageSender&) override {}
void onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply>&) override {}
- bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override {
+ bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override {
return _shouldBlock;
}
void setShouldBlock(bool shouldBlock) {
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 1026ea2855e..52a8bfc41b6 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -18,13 +18,11 @@ using namespace ::testing;
namespace storage::distributor {
struct MergeOperationTest : Test, DistributorTestUtil {
- std::unique_ptr<PendingMessageTracker> _pendingTracker;
OperationSequencer _operation_sequencer;
void SetUp() override {
createLinks();
- _pendingTracker = std::make_unique<PendingMessageTracker>(getComponentRegister());
- _sender.setPendingMessageTracker(*_pendingTracker);
+ _sender.setPendingMessageTracker(pending_message_tracker());
_sender.set_operation_sequencer(_operation_sequencer);
}
@@ -256,7 +254,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
makeDocumentBucket(bucket), api::SetBucketStateCommand::ACTIVE);
vespalib::string storage("storage");
msg->setAddress(api::StorageMessageAddress::create(&storage, lib::NodeType::STORAGE, 1));
- _pendingTracker->insert(msg);
+ pending_message_tracker().insert(msg);
sendReply(op);
// Should not be a remove here!
@@ -400,19 +398,19 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_any_busy_target_node) {
op.setIdealStateManager(&getIdealStateManager());
// Should not block on nodes _not_ included in operation node set
- _pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10));
- EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ pending_message_tracker().getNodeInfo().setBusy(3, std::chrono::seconds(10));
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
// Node 1 is included in operation node set and should cause a block
- _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10));
- EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ pending_message_tracker().getNodeInfo().setBusy(0, std::chrono::seconds(10));
+ EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer));
getClock().addSecondsToTime(11);
- EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // No longer busy
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); // No longer busy
// Should block on other operation nodes than the first listed as well
- _pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10));
- EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ pending_message_tracker().getNodeInfo().setBusy(1, std::chrono::seconds(10));
+ EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer));
}
@@ -426,8 +424,8 @@ TEST_F(MergeOperationTest, global_bucket_merges_are_not_blocked_by_busy_nodes) {
op.setIdealStateManager(&getIdealStateManager());
// Node 1 is included in operation node set but should not cause a block of global bucket merge
- _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10));
- EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ pending_message_tracker().getNodeInfo().setBusy(0, std::chrono::seconds(10));
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
}
TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) {
@@ -437,10 +435,10 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) {
MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
- EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)), "foo");
EXPECT_TRUE(token.valid());
- EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer));
}
TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) {
diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp
index 8c8da1bb197..ec58992ed3e 100644
--- a/storage/src/tests/distributor/splitbuckettest.cpp
+++ b/storage/src/tests/distributor/splitbuckettest.cpp
@@ -261,7 +261,6 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
framework::defaultimplementation::FakeClock clock;
compReg.setClock(clock);
clock.setAbsoluteTimeInSeconds(1);
- PendingMessageTracker tracker(compReg);
OperationSequencer op_seq;
enableDistributorClusterState("distributor:1 storage:2");
@@ -274,7 +273,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
joinCmd->getSourceBuckets() = joinSources;
joinCmd->setAddress(_Storage0Address);
- tracker.insert(joinCmd);
+ pending_message_tracker().insert(joinCmd);
insertBucketInfo(joinTarget, 0, 0xabc, 1000, 1234, true);
@@ -284,18 +283,18 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
splitCount,
splitByteSize);
- EXPECT_TRUE(op.isBlocked(tracker, op_seq));
+ EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
// Now, pretend there's a join for another node in the same bucket. This
// will happen when a join is partially completed.
- tracker.clearMessagesForNode(0);
- EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ pending_message_tracker().clearMessagesForNode(0);
+ EXPECT_FALSE(op.isBlocked(operation_context(), op_seq));
joinCmd->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(),
lib::NodeType::STORAGE, 1));
- tracker.insert(joinCmd);
+ pending_message_tracker().insert(joinCmd);
- EXPECT_TRUE(op.isBlocked(tracker, op_seq));
+ EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}
TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
@@ -303,7 +302,6 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
framework::defaultimplementation::FakeClock clock;
compReg.setClock(clock);
clock.setAbsoluteTimeInSeconds(1);
- PendingMessageTracker tracker(compReg);
OperationSequencer op_seq;
enableDistributorClusterState("distributor:1 storage:2");
@@ -314,10 +312,10 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
SplitOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)),
maxSplitBits, splitCount, splitByteSize);
- EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ EXPECT_FALSE(op.isBlocked(operation_context(), op_seq));
auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket), "foo");
EXPECT_TRUE(token.valid());
- EXPECT_TRUE(op.isBlocked(tracker, op_seq));
+ EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}
} // storage::distributor
diff --git a/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp b/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp
index 25c38888098..e9b53e35b61 100644
--- a/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp
+++ b/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp
@@ -7,7 +7,7 @@ namespace storage::distributor {
bool
BlockingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority)
{
- if (operation->isBlocked(_messageTracker, _operation_sequencer)) {
+ if (operation->isBlocked(_operation_context, _operation_sequencer)) {
return true;
}
return _starterImpl.start(operation, priority);
diff --git a/storage/src/vespa/storage/distributor/blockingoperationstarter.h b/storage/src/vespa/storage/distributor/blockingoperationstarter.h
index e79ae6b4a79..180e617d08d 100644
--- a/storage/src/vespa/storage/distributor/blockingoperationstarter.h
+++ b/storage/src/vespa/storage/distributor/blockingoperationstarter.h
@@ -6,16 +6,16 @@
namespace storage::distributor {
-class PendingMessageTracker;
+class DistributorStripeOperationContext;
class OperationSequencer;
class BlockingOperationStarter : public OperationStarter
{
public:
- BlockingOperationStarter(PendingMessageTracker& messageTracker,
+ BlockingOperationStarter(DistributorStripeOperationContext& ctx,
OperationSequencer& operation_sequencer,
OperationStarter& starterImpl)
- : _messageTracker(messageTracker),
+ : _operation_context(ctx),
_operation_sequencer(operation_sequencer),
_starterImpl(starterImpl)
{}
@@ -24,9 +24,9 @@ public:
bool start(const std::shared_ptr<Operation>& operation, Priority priority) override;
private:
- PendingMessageTracker& _messageTracker;
- OperationSequencer& _operation_sequencer;
- OperationStarter& _starterImpl;
+ DistributorStripeOperationContext& _operation_context;
+ OperationSequencer& _operation_sequencer;
+ OperationStarter& _starterImpl;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
index aa598835cdb..e0d481a322a 100644
--- a/storage/src/vespa/storage/distributor/distributor_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -20,7 +20,7 @@ public:
virtual ~DistributorOperationContext() {}
virtual api::Timestamp generate_unique_timestamp() = 0;
// TODO STRIPE: Access to bucket space repos is only temporary at this level.
- virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept= 0;
+ virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept = 0;
virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0;
virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0;
virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index bf78707cfd9..4f6e2d5016b 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -71,7 +71,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()),
_scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)),
_throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)),
- _blockingStarter(std::make_unique<BlockingOperationStarter>(_pendingMessageTracker, *_operation_sequencer,
+ _blockingStarter(std::make_unique<BlockingOperationStarter>(_component, *_operation_sequencer,
*_throttlingStarter)),
_scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)),
_schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE),
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.h b/storage/src/vespa/storage/distributor/distributor_stripe_component.h
index 31ee9ca88d2..e47d73cc4df 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_component.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.h
@@ -140,6 +140,9 @@ public:
PendingMessageTracker& pending_message_tracker() noexcept override {
return getDistributor().getPendingMessageTracker();
}
+ const PendingMessageTracker& pending_message_tracker() const noexcept override {
+ return getDistributor().getPendingMessageTracker();
+ }
bool has_pending_message(uint16_t node_index,
const document::Bucket& bucket,
uint32_t message_type) const override;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
index bd9a4e1de57..24db212c120 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
@@ -24,7 +24,6 @@ class PendingMessageTracker;
class DistributorStripeInterface : public DistributorStripeMessageSender
{
public:
- virtual PendingMessageTracker& getPendingMessageTracker() = 0;
virtual DistributorMetricSet& getMetrics() = 0;
virtual void enableClusterStateBundle(const lib::ClusterStateBundle& state) = 0;
virtual const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const = 0;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
index 8419abeadaa..518c83d7ffa 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
@@ -22,7 +22,7 @@ class PendingMessageTracker;
*/
class DistributorStripeOperationContext : public DistributorOperationContext {
public:
- virtual ~DistributorStripeOperationContext() {}
+ virtual ~DistributorStripeOperationContext() = default;
virtual void update_bucket_database(const document::Bucket& bucket,
const BucketCopy& changed_node,
uint32_t update_flags = 0) = 0;
@@ -41,6 +41,7 @@ public:
uint8_t pri) = 0;
virtual OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket& bucket) const = 0;
virtual PendingMessageTracker& pending_message_tracker() noexcept = 0;
+ virtual const PendingMessageTracker& pending_message_tracker() const noexcept = 0;
virtual bool has_pending_message(uint16_t node_index,
const document::Bucket& bucket,
uint32_t message_type) const = 0;
diff --git a/storage/src/vespa/storage/distributor/distributormessagesender.h b/storage/src/vespa/storage/distributor/distributormessagesender.h
index c39e3e8fe8a..c5a164ed036 100644
--- a/storage/src/vespa/storage/distributor/distributormessagesender.h
+++ b/storage/src/vespa/storage/distributor/distributormessagesender.h
@@ -26,6 +26,7 @@ public:
class DistributorStripeMessageSender : public DistributorMessageSender {
public:
+ virtual PendingMessageTracker& getPendingMessageTracker() = 0;
virtual const PendingMessageTracker& getPendingMessageTracker() const = 0;
virtual const OperationSequencer& operation_sequencer() const noexcept = 0;
};
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index e703c5bfdb8..2bc779aa47e 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -54,6 +54,9 @@ public:
const ClusterContext & cluster_context() const override {
return _node_ctx;
}
+ PendingMessageTracker& getPendingMessageTracker() override {
+ abort(); // Never called by the messages using this component.
+ }
const PendingMessageTracker& getPendingMessageTracker() const override {
abort(); // Never called by the messages using this component.
}
diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h
index d3f46343ebc..c469b35a8dc 100644
--- a/storage/src/vespa/storage/distributor/operationowner.h
+++ b/storage/src/vespa/storage/distributor/operationowner.h
@@ -43,6 +43,10 @@ public:
return _sender.cluster_context();
}
+ PendingMessageTracker& getPendingMessageTracker() override {
+ return _sender.getPendingMessageTracker();
+ }
+
const PendingMessageTracker& getPendingMessageTracker() const override {
return _sender.getPendingMessageTracker();
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 9077f3dc288..db30fcc7196 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -86,6 +86,10 @@ struct IntermediateMessageSender : DistributorStripeMessageSender {
return forward.cluster_context();
}
+ PendingMessageTracker& getPendingMessageTracker() override {
+ return forward.getPendingMessageTracker();
+ }
+
const PendingMessageTracker& getPendingMessageTracker() const override {
return forward.getPendingMessageTracker();
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index 1a48df0fd7c..f11d1c26da2 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -156,41 +156,26 @@ public:
}
};
-// TODO STRIPE replace with check for pending cluster state transition.
-// Null-bucket messages are not intercepted nor observeable by stripes,
-// only by the top-level distributor.
-bool
-checkNullBucketRequestBucketInfoMessage(uint16_t node,
- document::BucketSpace bucketSpace,
- const PendingMessageTracker& tracker)
-{
- RequestBucketInfoChecker rchk;
- // Check messages sent to null-bucket (i.e. any bucket) for the node.
- document::Bucket nullBucket(bucketSpace, document::BucketId());
- tracker.checkPendingMessages(node, nullBucket, rchk);
- return rchk.blocked;
-}
-
}
bool
IdealStateOperation::checkBlock(const document::Bucket &bucket,
- const PendingMessageTracker& tracker,
+ const DistributorStripeOperationContext& ctx,
const OperationSequencer& seq) const
{
if (seq.is_blocked(bucket)) {
return true;
}
+ if (ctx.pending_cluster_state_or_null(bucket.getBucketSpace())) {
+ return true;
+ }
IdealStateOpChecker ichk(*this);
const std::vector<uint16_t>& nodes(getNodes());
for (auto node : nodes) {
- tracker.checkPendingMessages(node, bucket, ichk);
+ ctx.pending_message_tracker().checkPendingMessages(node, bucket, ichk);
if (ichk.blocked) {
return true;
}
- if (checkNullBucketRequestBucketInfoMessage(node, bucket.getBucketSpace(), tracker)) {
- return true;
- }
}
return false;
}
@@ -198,32 +183,25 @@ IdealStateOperation::checkBlock(const document::Bucket &bucket,
bool
IdealStateOperation::checkBlockForAllNodes(
const document::Bucket &bucket,
- const PendingMessageTracker& tracker,
+ const DistributorStripeOperationContext& ctx,
const OperationSequencer& seq) const
{
if (seq.is_blocked(bucket)) {
return true;
}
- IdealStateOpChecker ichk(*this);
- // Check messages sent to _any node_ for _this_ particular bucket.
- tracker.checkPendingMessages(bucket, ichk);
- if (ichk.blocked) {
+ if (ctx.pending_cluster_state_or_null(bucket.getBucketSpace())) {
return true;
}
- const std::vector<uint16_t>& nodes(getNodes());
- for (auto node : nodes) {
- if (checkNullBucketRequestBucketInfoMessage(node, bucket.getBucketSpace(), tracker)) {
- return true;
- }
- }
- return false;
+ IdealStateOpChecker ichk(*this);
+ // Check messages sent to _any node_ for _this_ particular bucket.
+ ctx.pending_message_tracker().checkPendingMessages(bucket, ichk);
+ return ichk.blocked;
}
-
bool
-IdealStateOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const
+IdealStateOperation::isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const
{
- return checkBlock(getBucket(), tracker, op_seq);
+ return checkBlock(getBucket(), ctx, op_seq);
}
std::string
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index 0e45d7f3b3a..d41640b468e 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -182,7 +182,7 @@ public:
* Returns true if we are blocked to start this operation given
* the pending messages.
*/
- bool isBlocked(const PendingMessageTracker& pendingMessages, const OperationSequencer&) const override;
+ bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override;
/**
Returns the priority we should send messages with.
@@ -234,10 +234,10 @@ protected:
* the set of messages checked.
*/
bool checkBlock(const document::Bucket& bucket,
- const PendingMessageTracker& tracker,
+ const DistributorStripeOperationContext& ctx,
const OperationSequencer&) const;
bool checkBlockForAllNodes(const document::Bucket& bucket,
- const PendingMessageTracker& tracker,
+ const DistributorStripeOperationContext& ctx,
const OperationSequencer&) const;
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index d9e411bc44e..15d3129b309 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -161,10 +161,10 @@ JoinOperation::getJoinBucket(size_t idx) const
}
bool
-JoinOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const
+JoinOperation::isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const
{
- return (checkBlock(getBucket(), tracker, op_seq) ||
- checkBlock(getJoinBucket(0), tracker, op_seq) ||
- (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), tracker, op_seq)));
+ return (checkBlock(getBucket(), ctx, op_seq) ||
+ checkBlock(getJoinBucket(0), ctx, op_seq) ||
+ (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), ctx, op_seq)));
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
index 5796b8d3fa1..4515092cfef 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
@@ -35,7 +35,7 @@ public:
return JOIN_BUCKET;
}
- bool isBlocked(const PendingMessageTracker& pendingMessages,
+ bool isBlocked(const DistributorStripeOperationContext& ctx,
const OperationSequencer& op_seq) const override;
protected:
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 27e203a9060..749787c51b9 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -235,7 +235,7 @@ MergeOperation::deleteSourceOnlyNodes(
BucketAndNodes(getBucket(), sourceOnlyNodes));
// Must not send removes to source only copies if something has caused
// pending load to the copy after the merge was sent!
- if (_removeOperation->isBlocked(sender.getPendingMessageTracker(), sender.operation_sequencer())) {
+ if (_removeOperation->isBlocked(_manager->operation_context(), sender.operation_sequencer())) {
LOG(debug, "Source only removal for %s was blocked by a pending operation",
getBucketId().toString().c_str());
_ok = false;
@@ -324,7 +324,7 @@ bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri)
return IdealStateOperation::shouldBlockThisOperation(messageType, pri);
}
-bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker,
+bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx,
const OperationSequencer& op_seq) const {
// To avoid starvation of high priority global bucket merges, we do not consider
// these for blocking due to a node being "busy" (usually caused by a full merge
@@ -338,14 +338,14 @@ bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker,
// 2. Global bucket merges have high priority and will most likely be allowed
// to enter the merge throttler queues, displacing lower priority merges.
if (!is_global_bucket_merge()) {
- const auto& node_info = pending_tracker.getNodeInfo();
+ const auto& node_info = ctx.pending_message_tracker().getNodeInfo();
for (auto node : getNodes()) {
if (node_info.isBusy(node)) {
return true;
}
}
}
- return IdealStateOperation::isBlocked(pending_tracker, op_seq);
+ return IdealStateOperation::isBlocked(ctx, op_seq);
}
bool MergeOperation::is_global_bucket_merge() const noexcept {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index 11b5494fd9b..945b9318482 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -47,7 +47,7 @@ public:
std::vector<MergeMetaData>&);
bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override;
- bool isBlocked(const PendingMessageTracker& pendingMessages, const OperationSequencer&) const override;
+ bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override;
private:
static void addIdealNodes(
const std::vector<uint16_t>& idealNodes,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 437c4ed6033..d7f03740e4d 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -143,9 +143,9 @@ SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRep
}
bool
-SplitOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const
+SplitOperation::isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const
{
- return checkBlockForAllNodes(getBucket(), tracker, op_seq);
+ return checkBlockForAllNodes(getBucket(), ctx, op_seq);
}
bool
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
index eccbdc69869..5581edf41bd 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
@@ -20,7 +20,7 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
const char* getName() const override { return "split"; };
Type getType() const override { return SPLIT_BUCKET; }
- bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override;
+ bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override;
bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
protected:
MessageTracker _tracker;
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index 5099762fd6a..18f7214c498 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -16,6 +16,7 @@ class StorageComponent;
namespace distributor {
+class DistributorStripeOperationContext;
class PendingMessageTracker;
class OperationSequencer;
@@ -61,7 +62,7 @@ public:
* Returns true if we are blocked to start this operation given
* the pending messages.
*/
- virtual bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const {
+ virtual bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const {
return false;
}