diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-09-30 09:37:07 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-10-14 10:27:34 +0000 |
commit | 0e59cdfc303a4a2273f0e606e04060fabfe7dd5e (patch) | |
tree | 31e3f23322d1bdf073a964b75ce204d2438c3f4d /storage | |
parent | e796dcf2409e6c9bfef432eded6cd90d69ce0c27 (diff) |
Don't let a blocked maintenance operation inhibit remaining maintenance queue
The old maintenance scheduler behavior is to only remove a bucket from the
priority DB if its maintenance operation was successfully started. Failing
to start an operation could happen from both max pending throttling as well
as operation/bucket-specific blocking behavior. Since the scheduler would
encounter the same bucket as the one previously blocked upon its next tick
invocation, a single blocked bucket would run the risk of head-of-line
stalling the rest of the remaining maintenance queue (assuming the ongoing
DB scan did not encounter any higher priority buckets).
This commit changes the following aspects of maintenance scheduling:
* Always clear entries from the priority DB before trying to start an
operation. A blocked operation will be retried the next time the
regular bucket DB scan encounters the bucket.
* Avoid trying to start (and clear) inherently doomed operations by
_not_ trying to schedule any operations if it would be blocked due
to too many pending maintenance operations anyway. Introduces a
new `PendingWindowChecker` interface for this purpose.
* Explicitly inhibit all maintenance scheduling if a pending cluster
state is present. Operations are already _implicitly_ blocked from
starting if there's a pending cluster state, but this would cause
the priority DB from being pointlessly cleared.
Diffstat (limited to 'storage')
9 files changed, 150 insertions, 72 deletions
diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h index f708fbd41aa..2809a81f79b 100644 --- a/storage/src/tests/distributor/maintenancemocks.h +++ b/storage/src/tests/distributor/maintenancemocks.h @@ -4,6 +4,7 @@ #include <vespa/document/test/make_bucket_space.h> #include <vespa/storage/distributor/maintenance/maintenanceprioritygenerator.h> #include <vespa/storage/distributor/maintenance/maintenanceoperationgenerator.h> +#include <vespa/storage/distributor/maintenance/pending_window_checker.h> #include <vespa/storage/distributor/operationstarter.h> #include <vespa/storage/distributor/operations/operation.h> #include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> @@ -122,4 +123,16 @@ public: } }; +class MockPendingWindowChecker : public PendingWindowChecker { + bool _allow = true; +public: + void allow_operations(bool allow) noexcept { + _allow = allow; + } + + bool may_allow_operation_with_priority(OperationStarter::Priority) const noexcept override { + return _allow; + } +}; + } diff --git a/storage/src/tests/distributor/maintenanceschedulertest.cpp b/storage/src/tests/distributor/maintenanceschedulertest.cpp index a97ffeef24b..8f13e10fedc 100644 --- a/storage/src/tests/distributor/maintenanceschedulertest.cpp +++ b/storage/src/tests/distributor/maintenanceschedulertest.cpp @@ -18,60 +18,77 @@ using Priority = MaintenancePriority; using WaitTimeMs = MaintenanceScheduler::WaitTimeMs; struct MaintenanceSchedulerTest : Test { - std::unique_ptr<SimpleBucketPriorityDatabase> _priorityDb; - std::unique_ptr<MockMaintenanceOperationGenerator> _operationGenerator; - std::unique_ptr<MockOperationStarter> _operationStarter; - std::unique_ptr<MaintenanceScheduler> _scheduler; + SimpleBucketPriorityDatabase _priority_db; + MockMaintenanceOperationGenerator _operation_generator; + MockOperationStarter _operation_starter; + MockPendingWindowChecker _pending_window_checker; + MaintenanceScheduler _scheduler; - void SetUp() override; + MaintenanceSchedulerTest() + : _priority_db(), + _operation_generator(), + _operation_starter(), + _pending_window_checker(), + _scheduler(_operation_generator, _priority_db, _pending_window_checker, _operation_starter) + {} }; -void -MaintenanceSchedulerTest::SetUp() -{ - _priorityDb = std::make_unique<SimpleBucketPriorityDatabase>(); - _operationGenerator = std::make_unique<MockMaintenanceOperationGenerator>(); - _operationStarter = std::make_unique<MockOperationStarter>(); - _scheduler = std::make_unique<MaintenanceScheduler>(*_operationGenerator, *_priorityDb, *_operationStarter); -} - TEST_F(MaintenanceSchedulerTest, priority_cleared_after_scheduled) { - _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGHEST)); - _scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE); - EXPECT_EQ("", _priorityDb->toString()); + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGHEST)); + _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE); + EXPECT_EQ("", _priority_db.toString()); } TEST_F(MaintenanceSchedulerTest, operation_is_scheduled) { - _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM)); - _scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE); + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM)); + _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE); EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 100\n", - _operationStarter->toString()); + _operation_starter.toString()); +} + +TEST_F(MaintenanceSchedulerTest, operation_is_not_scheduled_if_pending_ops_not_accepted) { + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM)); + _pending_window_checker.allow_operations(false); + _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE); + EXPECT_EQ("", _operation_starter.toString()); + // Priority DB entry is not cleared + EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri MEDIUM)\n", + _priority_db.toString()); } TEST_F(MaintenanceSchedulerTest, no_operations_to_schedule) { - WaitTimeMs waitMs(_scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); + WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); EXPECT_EQ(WaitTimeMs(1), waitMs); - EXPECT_EQ("", _operationStarter->toString()); + EXPECT_EQ("", _operation_starter.toString()); } TEST_F(MaintenanceSchedulerTest, suppress_low_priorities_in_emergency_mode) { - _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::VERY_HIGH)); - _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 2)), Priority::HIGHEST)); - EXPECT_EQ(WaitTimeMs(0), _scheduler->tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE)); - EXPECT_EQ(WaitTimeMs(1), _scheduler->tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE)); + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::VERY_HIGH)); + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 2)), Priority::HIGHEST)); + EXPECT_EQ(WaitTimeMs(0), _scheduler.tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE)); + EXPECT_EQ(WaitTimeMs(1), _scheduler.tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE)); EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)), pri 0\n", - _operationStarter->toString()); + _operation_starter.toString()); EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri VERY_HIGH)\n", - _priorityDb->toString()); + _priority_db.toString()); +} + +TEST_F(MaintenanceSchedulerTest, priority_cleared_if_operation_not_started_inside_pending_window) { + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH)); + _operation_starter.setShouldStartOperations(false); + WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); + EXPECT_EQ(WaitTimeMs(1), waitMs); + EXPECT_EQ("", _priority_db.toString()); } -TEST_F(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started) { - _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH)); - _operationStarter->setShouldStartOperations(false); - WaitTimeMs waitMs(_scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); +TEST_F(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started_outside_pending_window) { + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH)); + _operation_starter.setShouldStartOperations(false); + _pending_window_checker.allow_operations(false); + WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); EXPECT_EQ(WaitTimeMs(1), waitMs); EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri HIGH)\n", - _priorityDb->toString()); + _priority_db.toString()); } } diff --git a/storage/src/tests/distributor/throttlingoperationstartertest.cpp b/storage/src/tests/distributor/throttlingoperationstartertest.cpp index eac345a243f..4e99388327b 100644 --- a/storage/src/tests/distributor/throttlingoperationstartertest.cpp +++ b/storage/src/tests/distributor/throttlingoperationstartertest.cpp @@ -21,7 +21,7 @@ const MockOperation& as_mock_operation(const Operation& operation) { struct ThrottlingOperationStarterTest : Test { std::shared_ptr<Operation> createMockOperation() { - return std::shared_ptr<Operation>(new MockOperation(makeDocumentBucket(BucketId(16, 1)))); + return std::make_shared<MockOperation>(makeDocumentBucket(BucketId(16, 1))); } std::unique_ptr<MockOperationStarter> _starterImpl; @@ -48,14 +48,12 @@ ThrottlingOperationStarterTest::TearDown() TEST_F(ThrottlingOperationStarterTest, operation_not_throttled_when_slot_available) { auto operation = createMockOperation(); - EXPECT_TRUE(_operationStarter->start(operation, - OperationStarter::Priority(0))); + EXPECT_TRUE(_operationStarter->start(operation, OperationStarter::Priority(0))); EXPECT_FALSE(as_mock_operation(*operation).get_was_throttled()); } TEST_F(ThrottlingOperationStarterTest, operation_starting_is_forwarded_to_implementation) { - ASSERT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(0))); + ASSERT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0))); EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 0\n", _starterImpl->toString()); } @@ -63,8 +61,8 @@ TEST_F(ThrottlingOperationStarterTest, operation_starting_is_forwarded_to_implem TEST_F(ThrottlingOperationStarterTest, operation_throttled_when_no_available_slots) { _operationStarter->setMaxPendingRange(0, 0); auto operation = createMockOperation(); - EXPECT_FALSE(_operationStarter->start(operation, - OperationStarter::Priority(0))); + EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0))); + EXPECT_FALSE(_operationStarter->start(operation, OperationStarter::Priority(0))); EXPECT_TRUE(as_mock_operation(*operation).get_was_throttled()); } @@ -88,32 +86,35 @@ TEST_F(ThrottlingOperationStarterTest, throttling_with_max_pending_range) { TEST_F(ThrottlingOperationStarterTest, starting_operations_fills_up_pending_window) { _operationStarter->setMaxPendingRange(1, 3); - EXPECT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(255))); - EXPECT_FALSE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(255))); - EXPECT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(100))); - EXPECT_FALSE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(100))); - EXPECT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(0))); - EXPECT_FALSE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(0))); + EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255))); + EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255))); + + EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255))); + EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255))); + + EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(100))); + EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(100))); + + EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255))); + EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(100))); + + EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0))); + EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0))); + + EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0))); + EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0))); } TEST_F(ThrottlingOperationStarterTest, finishing_operations_allows_more_to_start) { _operationStarter->setMaxPendingRange(1, 1); - EXPECT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(255))); - EXPECT_FALSE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(255))); + EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255))); + EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255))); EXPECT_FALSE(_starterImpl->getOperations().empty()); _starterImpl->getOperations().pop_back(); - EXPECT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(255))); + EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255))); + EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255))); EXPECT_FALSE(_starterImpl->getOperations().empty()); } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index d87b2db1638..e45ef13f4ef 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -62,7 +62,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)), _blockingStarter(std::make_unique<BlockingOperationStarter>(_component, *_operation_sequencer, *_throttlingStarter)), - _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)), + _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, + *_throttlingStarter, *_blockingStarter)), _schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE), _recoveryTimeStarted(_component.getClock()), _tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN), @@ -709,7 +710,9 @@ DistributorStripe::doNonCriticalTick(framework::ThreadIndex) // did any useful work with incoming data, this check must be performed _after_ the call. if (!should_inhibit_current_maintenance_scan_tick()) { scanNextBucket(); - startNextMaintenanceOperation(); + if (!_bucketDBUpdater.hasPendingClusterState()) { + startNextMaintenanceOperation(); + } if (isInRecoveryMode()) { signalWorkWasDone(); } diff --git a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp index ee5e1278c50..d910c80d360 100644 --- a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp +++ b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp @@ -2,6 +2,7 @@ #include "maintenancescheduler.h" #include "maintenanceoperationgenerator.h" +#include "pending_window_checker.h" #include <vespa/storage/distributor/operationstarter.h> #include <vespa/storage/distributor/operations/idealstate/idealstateoperation.h> @@ -13,9 +14,11 @@ namespace storage::distributor { MaintenanceScheduler::MaintenanceScheduler( MaintenanceOperationGenerator& operationGenerator, BucketPriorityDatabase& priorityDb, + const PendingWindowChecker& pending_window_checker, OperationStarter& operationStarter) : _operationGenerator(operationGenerator), _priorityDb(priorityDb), + _pending_window_checker(pending_window_checker), _operationStarter(operationStarter) { } @@ -23,7 +26,7 @@ MaintenanceScheduler::MaintenanceScheduler( PrioritizedBucket MaintenanceScheduler::getMostImportantBucket() { - BucketPriorityDatabase::const_iterator mostImportant(_priorityDb.begin()); + auto mostImportant = _priorityDb.begin(); if (mostImportant == _priorityDb.end()) { return PrioritizedBucket::INVALID; } @@ -38,10 +41,12 @@ MaintenanceScheduler::tick(SchedulingMode currentMode) if (!possibleToSchedule(mostImportant, currentMode)) { return WaitTimeMs(1); } + // If we can't start the operation, move on to the next bucket. Bucket will be + // re-prioritized when the distributor stripe next scans it. + clearPriority(mostImportant); if (!startOperation(mostImportant)) { return WaitTimeMs(1); } - clearPriority(mostImportant); return WaitTimeMs(0); } @@ -49,11 +54,17 @@ bool MaintenanceScheduler::possibleToSchedule(const PrioritizedBucket& bucket, SchedulingMode currentMode) const { + if (!bucket.valid()) { + return false; + } + // If pending window is full nothing of equal or lower priority can be scheduled, so no point in trying. + if (!_pending_window_checker.may_allow_operation_with_priority(convertToOperationPriority(bucket.getPriority()))) { + return false; + } if (currentMode == RECOVERY_SCHEDULING_MODE) { - return (bucket.valid() - && possibleToScheduleInEmergency(bucket)); + return possibleToScheduleInEmergency(bucket); } else { - return bucket.valid(); + return true; } } diff --git a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h index 33eb97292bb..982a90d47d8 100644 --- a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h +++ b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h @@ -9,6 +9,7 @@ namespace storage::distributor { class MaintenanceOperationGenerator; class BucketPriorityDatabase; +class PendingWindowChecker; class MaintenanceScheduler { @@ -22,6 +23,7 @@ public: MaintenanceScheduler(MaintenanceOperationGenerator& operationGenerator, BucketPriorityDatabase& priorityDb, + const PendingWindowChecker& pending_window_checker, OperationStarter& operationStarter); WaitTimeMs tick(SchedulingMode currentMode); @@ -40,6 +42,7 @@ private: MaintenanceOperationGenerator& _operationGenerator; BucketPriorityDatabase& _priorityDb; + const PendingWindowChecker& _pending_window_checker; OperationStarter& _operationStarter; }; diff --git a/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h b/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h new file mode 100644 index 00000000000..8a333a3073c --- /dev/null +++ b/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h @@ -0,0 +1,19 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/storage/distributor/operationstarter.h> + +namespace storage::distributor { + +/** + * Allows for easily and cheaply checking if an operation with a given internal maintenance + * priority could possibly be started "downstream" due to there being available capacity + * in the maintenance pending window. + */ +class PendingWindowChecker { +public: + virtual ~PendingWindowChecker() = default; + [[nodiscard]] virtual bool may_allow_operation_with_priority(OperationStarter::Priority) const noexcept = 0; +}; + +} diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp index ccf8273e386..47404bb365f 100644 --- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp +++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp @@ -10,6 +10,8 @@ ThrottlingOperationStarter::ThrottlingOperation::~ThrottlingOperation() _operationStarter.signalOperationFinished(*this); } +ThrottlingOperationStarter::~ThrottlingOperationStarter() = default; + bool ThrottlingOperationStarter::canStart(uint32_t currentOperationCount, Priority priority) const { @@ -20,10 +22,9 @@ ThrottlingOperationStarter::canStart(uint32_t currentOperationCount, Priority pr } bool -ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, - Priority priority) +ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority) { - if (!canStart(_pendingCount, priority)) { + if (!may_allow_operation_with_priority(priority)) { operation->on_throttled(); return false; } @@ -32,6 +33,12 @@ ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, return _starterImpl.start(wrappedOp, priority); } +bool +ThrottlingOperationStarter::may_allow_operation_with_priority(Priority priority) const noexcept +{ + return canStart(_pendingCount, priority); +} + void ThrottlingOperationStarter::signalOperationFinished(const Operation& op) { diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h index e63fead8458..00d1237b2f7 100644 --- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h +++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h @@ -2,23 +2,24 @@ #pragma once #include "operationstarter.h" +#include <vespa/storage/distributor/maintenance/pending_window_checker.h> #include <vespa/vespalib/util/hdr_abort.h> #include <vespa/storage/distributor/operations/operation.h> namespace storage::distributor { -class ThrottlingOperationStarter : public OperationStarter +class ThrottlingOperationStarter : public OperationStarter, public PendingWindowChecker { class ThrottlingOperation : public Operation { public: ThrottlingOperation(const Operation::SP& operation, - ThrottlingOperationStarter& operationStarter) + ThrottlingOperationStarter& operationStarter) : _operation(operation), _operationStarter(operationStarter) {} - ~ThrottlingOperation(); + ~ThrottlingOperation() override; private: Operation::SP _operation; ThrottlingOperationStarter& _operationStarter; @@ -53,7 +54,7 @@ class ThrottlingOperationStarter : public OperationStarter HDR_ABORT("should not be reached"); } void onReceive(DistributorStripeMessageSender&, - const std::shared_ptr<api::StorageReply>&) override { + const std::shared_ptr<api::StorageReply>&) override { HDR_ABORT("should not be reached"); } }; @@ -66,9 +67,12 @@ public: _maxPending(UINT32_MAX), _pendingCount(0) {} + ~ThrottlingOperationStarter() override; bool start(const std::shared_ptr<Operation>& operation, Priority priority) override; + bool may_allow_operation_with_priority(Priority priority) const noexcept override; + bool canStart(uint32_t currentOperationCount, Priority priority) const; void setMaxPendingRange(uint32_t minPending, uint32_t maxPending) { |