summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2021-09-30 09:37:07 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2021-10-14 10:27:34 +0000
commit0e59cdfc303a4a2273f0e606e04060fabfe7dd5e (patch)
tree31e3f23322d1bdf073a964b75ce204d2438c3f4d /storage
parente796dcf2409e6c9bfef432eded6cd90d69ce0c27 (diff)
Don't let a blocked maintenance operation inhibit remaining maintenance queue
The old maintenance scheduler behavior is to only remove a bucket from the priority DB if its maintenance operation was successfully started. Failing to start an operation could happen from both max pending throttling as well as operation/bucket-specific blocking behavior. Since the scheduler would encounter the same bucket as the one previously blocked upon its next tick invocation, a single blocked bucket would run the risk of head-of-line stalling the rest of the remaining maintenance queue (assuming the ongoing DB scan did not encounter any higher priority buckets). This commit changes the following aspects of maintenance scheduling: * Always clear entries from the priority DB before trying to start an operation. A blocked operation will be retried the next time the regular bucket DB scan encounters the bucket. * Avoid trying to start (and clear) inherently doomed operations by _not_ trying to schedule any operations if it would be blocked due to too many pending maintenance operations anyway. Introduces a new `PendingWindowChecker` interface for this purpose. * Explicitly inhibit all maintenance scheduling if a pending cluster state is present. Operations are already _implicitly_ blocked from starting if there's a pending cluster state, but this would cause the priority DB from being pointlessly cleared.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/maintenancemocks.h13
-rw-r--r--storage/src/tests/distributor/maintenanceschedulertest.cpp83
-rw-r--r--storage/src/tests/distributor/throttlingoperationstartertest.cpp51
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp21
-rw-r--r--storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h3
-rw-r--r--storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h19
-rw-r--r--storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp13
-rw-r--r--storage/src/vespa/storage/distributor/throttlingoperationstarter.h12
9 files changed, 150 insertions, 72 deletions
diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h
index f708fbd41aa..2809a81f79b 100644
--- a/storage/src/tests/distributor/maintenancemocks.h
+++ b/storage/src/tests/distributor/maintenancemocks.h
@@ -4,6 +4,7 @@
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/distributor/maintenance/maintenanceprioritygenerator.h>
#include <vespa/storage/distributor/maintenance/maintenanceoperationgenerator.h>
+#include <vespa/storage/distributor/maintenance/pending_window_checker.h>
#include <vespa/storage/distributor/operationstarter.h>
#include <vespa/storage/distributor/operations/operation.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
@@ -122,4 +123,16 @@ public:
}
};
+class MockPendingWindowChecker : public PendingWindowChecker {
+ bool _allow = true;
+public:
+ void allow_operations(bool allow) noexcept {
+ _allow = allow;
+ }
+
+ bool may_allow_operation_with_priority(OperationStarter::Priority) const noexcept override {
+ return _allow;
+ }
+};
+
}
diff --git a/storage/src/tests/distributor/maintenanceschedulertest.cpp b/storage/src/tests/distributor/maintenanceschedulertest.cpp
index a97ffeef24b..8f13e10fedc 100644
--- a/storage/src/tests/distributor/maintenanceschedulertest.cpp
+++ b/storage/src/tests/distributor/maintenanceschedulertest.cpp
@@ -18,60 +18,77 @@ using Priority = MaintenancePriority;
using WaitTimeMs = MaintenanceScheduler::WaitTimeMs;
struct MaintenanceSchedulerTest : Test {
- std::unique_ptr<SimpleBucketPriorityDatabase> _priorityDb;
- std::unique_ptr<MockMaintenanceOperationGenerator> _operationGenerator;
- std::unique_ptr<MockOperationStarter> _operationStarter;
- std::unique_ptr<MaintenanceScheduler> _scheduler;
+ SimpleBucketPriorityDatabase _priority_db;
+ MockMaintenanceOperationGenerator _operation_generator;
+ MockOperationStarter _operation_starter;
+ MockPendingWindowChecker _pending_window_checker;
+ MaintenanceScheduler _scheduler;
- void SetUp() override;
+ MaintenanceSchedulerTest()
+ : _priority_db(),
+ _operation_generator(),
+ _operation_starter(),
+ _pending_window_checker(),
+ _scheduler(_operation_generator, _priority_db, _pending_window_checker, _operation_starter)
+ {}
};
-void
-MaintenanceSchedulerTest::SetUp()
-{
- _priorityDb = std::make_unique<SimpleBucketPriorityDatabase>();
- _operationGenerator = std::make_unique<MockMaintenanceOperationGenerator>();
- _operationStarter = std::make_unique<MockOperationStarter>();
- _scheduler = std::make_unique<MaintenanceScheduler>(*_operationGenerator, *_priorityDb, *_operationStarter);
-}
-
TEST_F(MaintenanceSchedulerTest, priority_cleared_after_scheduled) {
- _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGHEST));
- _scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE);
- EXPECT_EQ("", _priorityDb->toString());
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGHEST));
+ _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE);
+ EXPECT_EQ("", _priority_db.toString());
}
TEST_F(MaintenanceSchedulerTest, operation_is_scheduled) {
- _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM));
- _scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE);
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM));
+ _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE);
EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 100\n",
- _operationStarter->toString());
+ _operation_starter.toString());
+}
+
+TEST_F(MaintenanceSchedulerTest, operation_is_not_scheduled_if_pending_ops_not_accepted) {
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM));
+ _pending_window_checker.allow_operations(false);
+ _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE);
+ EXPECT_EQ("", _operation_starter.toString());
+ // Priority DB entry is not cleared
+ EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri MEDIUM)\n",
+ _priority_db.toString());
}
TEST_F(MaintenanceSchedulerTest, no_operations_to_schedule) {
- WaitTimeMs waitMs(_scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
+ WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
EXPECT_EQ(WaitTimeMs(1), waitMs);
- EXPECT_EQ("", _operationStarter->toString());
+ EXPECT_EQ("", _operation_starter.toString());
}
TEST_F(MaintenanceSchedulerTest, suppress_low_priorities_in_emergency_mode) {
- _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::VERY_HIGH));
- _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 2)), Priority::HIGHEST));
- EXPECT_EQ(WaitTimeMs(0), _scheduler->tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE));
- EXPECT_EQ(WaitTimeMs(1), _scheduler->tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE));
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::VERY_HIGH));
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 2)), Priority::HIGHEST));
+ EXPECT_EQ(WaitTimeMs(0), _scheduler.tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE));
+ EXPECT_EQ(WaitTimeMs(1), _scheduler.tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE));
EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)), pri 0\n",
- _operationStarter->toString());
+ _operation_starter.toString());
EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri VERY_HIGH)\n",
- _priorityDb->toString());
+ _priority_db.toString());
+}
+
+TEST_F(MaintenanceSchedulerTest, priority_cleared_if_operation_not_started_inside_pending_window) {
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH));
+ _operation_starter.setShouldStartOperations(false);
+ WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
+ EXPECT_EQ(WaitTimeMs(1), waitMs);
+ EXPECT_EQ("", _priority_db.toString());
}
-TEST_F(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started) {
- _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH));
- _operationStarter->setShouldStartOperations(false);
- WaitTimeMs waitMs(_scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
+TEST_F(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started_outside_pending_window) {
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH));
+ _operation_starter.setShouldStartOperations(false);
+ _pending_window_checker.allow_operations(false);
+ WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
EXPECT_EQ(WaitTimeMs(1), waitMs);
EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri HIGH)\n",
- _priorityDb->toString());
+ _priority_db.toString());
}
}
diff --git a/storage/src/tests/distributor/throttlingoperationstartertest.cpp b/storage/src/tests/distributor/throttlingoperationstartertest.cpp
index eac345a243f..4e99388327b 100644
--- a/storage/src/tests/distributor/throttlingoperationstartertest.cpp
+++ b/storage/src/tests/distributor/throttlingoperationstartertest.cpp
@@ -21,7 +21,7 @@ const MockOperation& as_mock_operation(const Operation& operation) {
struct ThrottlingOperationStarterTest : Test {
std::shared_ptr<Operation> createMockOperation() {
- return std::shared_ptr<Operation>(new MockOperation(makeDocumentBucket(BucketId(16, 1))));
+ return std::make_shared<MockOperation>(makeDocumentBucket(BucketId(16, 1)));
}
std::unique_ptr<MockOperationStarter> _starterImpl;
@@ -48,14 +48,12 @@ ThrottlingOperationStarterTest::TearDown()
TEST_F(ThrottlingOperationStarterTest, operation_not_throttled_when_slot_available) {
auto operation = createMockOperation();
- EXPECT_TRUE(_operationStarter->start(operation,
- OperationStarter::Priority(0)));
+ EXPECT_TRUE(_operationStarter->start(operation, OperationStarter::Priority(0)));
EXPECT_FALSE(as_mock_operation(*operation).get_was_throttled());
}
TEST_F(ThrottlingOperationStarterTest, operation_starting_is_forwarded_to_implementation) {
- ASSERT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(0)));
+ ASSERT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0)));
EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 0\n",
_starterImpl->toString());
}
@@ -63,8 +61,8 @@ TEST_F(ThrottlingOperationStarterTest, operation_starting_is_forwarded_to_implem
TEST_F(ThrottlingOperationStarterTest, operation_throttled_when_no_available_slots) {
_operationStarter->setMaxPendingRange(0, 0);
auto operation = createMockOperation();
- EXPECT_FALSE(_operationStarter->start(operation,
- OperationStarter::Priority(0)));
+ EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0)));
+ EXPECT_FALSE(_operationStarter->start(operation, OperationStarter::Priority(0)));
EXPECT_TRUE(as_mock_operation(*operation).get_was_throttled());
}
@@ -88,32 +86,35 @@ TEST_F(ThrottlingOperationStarterTest, throttling_with_max_pending_range) {
TEST_F(ThrottlingOperationStarterTest, starting_operations_fills_up_pending_window) {
_operationStarter->setMaxPendingRange(1, 3);
- EXPECT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(255)));
- EXPECT_FALSE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(255)));
- EXPECT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(100)));
- EXPECT_FALSE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(100)));
- EXPECT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(0)));
- EXPECT_FALSE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(0)));
+ EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255)));
+ EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255)));
+
+ EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255)));
+ EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255)));
+
+ EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(100)));
+ EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(100)));
+
+ EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255)));
+ EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(100)));
+
+ EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0)));
+ EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0)));
+
+ EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0)));
+ EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0)));
}
TEST_F(ThrottlingOperationStarterTest, finishing_operations_allows_more_to_start) {
_operationStarter->setMaxPendingRange(1, 1);
- EXPECT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(255)));
- EXPECT_FALSE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(255)));
+ EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255)));
+ EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255)));
EXPECT_FALSE(_starterImpl->getOperations().empty());
_starterImpl->getOperations().pop_back();
- EXPECT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(255)));
+ EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255)));
+ EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255)));
EXPECT_FALSE(_starterImpl->getOperations().empty());
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index d87b2db1638..e45ef13f4ef 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -62,7 +62,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)),
_blockingStarter(std::make_unique<BlockingOperationStarter>(_component, *_operation_sequencer,
*_throttlingStarter)),
- _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)),
+ _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb,
+ *_throttlingStarter, *_blockingStarter)),
_schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE),
_recoveryTimeStarted(_component.getClock()),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
@@ -709,7 +710,9 @@ DistributorStripe::doNonCriticalTick(framework::ThreadIndex)
// did any useful work with incoming data, this check must be performed _after_ the call.
if (!should_inhibit_current_maintenance_scan_tick()) {
scanNextBucket();
- startNextMaintenanceOperation();
+ if (!_bucketDBUpdater.hasPendingClusterState()) {
+ startNextMaintenanceOperation();
+ }
if (isInRecoveryMode()) {
signalWorkWasDone();
}
diff --git a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp
index ee5e1278c50..d910c80d360 100644
--- a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp
+++ b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp
@@ -2,6 +2,7 @@
#include "maintenancescheduler.h"
#include "maintenanceoperationgenerator.h"
+#include "pending_window_checker.h"
#include <vespa/storage/distributor/operationstarter.h>
#include <vespa/storage/distributor/operations/idealstate/idealstateoperation.h>
@@ -13,9 +14,11 @@ namespace storage::distributor {
MaintenanceScheduler::MaintenanceScheduler(
MaintenanceOperationGenerator& operationGenerator,
BucketPriorityDatabase& priorityDb,
+ const PendingWindowChecker& pending_window_checker,
OperationStarter& operationStarter)
: _operationGenerator(operationGenerator),
_priorityDb(priorityDb),
+ _pending_window_checker(pending_window_checker),
_operationStarter(operationStarter)
{
}
@@ -23,7 +26,7 @@ MaintenanceScheduler::MaintenanceScheduler(
PrioritizedBucket
MaintenanceScheduler::getMostImportantBucket()
{
- BucketPriorityDatabase::const_iterator mostImportant(_priorityDb.begin());
+ auto mostImportant = _priorityDb.begin();
if (mostImportant == _priorityDb.end()) {
return PrioritizedBucket::INVALID;
}
@@ -38,10 +41,12 @@ MaintenanceScheduler::tick(SchedulingMode currentMode)
if (!possibleToSchedule(mostImportant, currentMode)) {
return WaitTimeMs(1);
}
+ // If we can't start the operation, move on to the next bucket. Bucket will be
+ // re-prioritized when the distributor stripe next scans it.
+ clearPriority(mostImportant);
if (!startOperation(mostImportant)) {
return WaitTimeMs(1);
}
- clearPriority(mostImportant);
return WaitTimeMs(0);
}
@@ -49,11 +54,17 @@ bool
MaintenanceScheduler::possibleToSchedule(const PrioritizedBucket& bucket,
SchedulingMode currentMode) const
{
+ if (!bucket.valid()) {
+ return false;
+ }
+ // If pending window is full nothing of equal or lower priority can be scheduled, so no point in trying.
+ if (!_pending_window_checker.may_allow_operation_with_priority(convertToOperationPriority(bucket.getPriority()))) {
+ return false;
+ }
if (currentMode == RECOVERY_SCHEDULING_MODE) {
- return (bucket.valid()
- && possibleToScheduleInEmergency(bucket));
+ return possibleToScheduleInEmergency(bucket);
} else {
- return bucket.valid();
+ return true;
}
}
diff --git a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h
index 33eb97292bb..982a90d47d8 100644
--- a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h
+++ b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h
@@ -9,6 +9,7 @@ namespace storage::distributor {
class MaintenanceOperationGenerator;
class BucketPriorityDatabase;
+class PendingWindowChecker;
class MaintenanceScheduler
{
@@ -22,6 +23,7 @@ public:
MaintenanceScheduler(MaintenanceOperationGenerator& operationGenerator,
BucketPriorityDatabase& priorityDb,
+ const PendingWindowChecker& pending_window_checker,
OperationStarter& operationStarter);
WaitTimeMs tick(SchedulingMode currentMode);
@@ -40,6 +42,7 @@ private:
MaintenanceOperationGenerator& _operationGenerator;
BucketPriorityDatabase& _priorityDb;
+ const PendingWindowChecker& _pending_window_checker;
OperationStarter& _operationStarter;
};
diff --git a/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h b/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h
new file mode 100644
index 00000000000..8a333a3073c
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h
@@ -0,0 +1,19 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/distributor/operationstarter.h>
+
+namespace storage::distributor {
+
+/**
+ * Allows for easily and cheaply checking if an operation with a given internal maintenance
+ * priority could possibly be started "downstream" due to there being available capacity
+ * in the maintenance pending window.
+ */
+class PendingWindowChecker {
+public:
+ virtual ~PendingWindowChecker() = default;
+ [[nodiscard]] virtual bool may_allow_operation_with_priority(OperationStarter::Priority) const noexcept = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
index ccf8273e386..47404bb365f 100644
--- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
+++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
@@ -10,6 +10,8 @@ ThrottlingOperationStarter::ThrottlingOperation::~ThrottlingOperation()
_operationStarter.signalOperationFinished(*this);
}
+ThrottlingOperationStarter::~ThrottlingOperationStarter() = default;
+
bool
ThrottlingOperationStarter::canStart(uint32_t currentOperationCount, Priority priority) const
{
@@ -20,10 +22,9 @@ ThrottlingOperationStarter::canStart(uint32_t currentOperationCount, Priority pr
}
bool
-ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation,
- Priority priority)
+ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority)
{
- if (!canStart(_pendingCount, priority)) {
+ if (!may_allow_operation_with_priority(priority)) {
operation->on_throttled();
return false;
}
@@ -32,6 +33,12 @@ ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation,
return _starterImpl.start(wrappedOp, priority);
}
+bool
+ThrottlingOperationStarter::may_allow_operation_with_priority(Priority priority) const noexcept
+{
+ return canStart(_pendingCount, priority);
+}
+
void
ThrottlingOperationStarter::signalOperationFinished(const Operation& op)
{
diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
index e63fead8458..00d1237b2f7 100644
--- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
+++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
@@ -2,23 +2,24 @@
#pragma once
#include "operationstarter.h"
+#include <vespa/storage/distributor/maintenance/pending_window_checker.h>
#include <vespa/vespalib/util/hdr_abort.h>
#include <vespa/storage/distributor/operations/operation.h>
namespace storage::distributor {
-class ThrottlingOperationStarter : public OperationStarter
+class ThrottlingOperationStarter : public OperationStarter, public PendingWindowChecker
{
class ThrottlingOperation : public Operation
{
public:
ThrottlingOperation(const Operation::SP& operation,
- ThrottlingOperationStarter& operationStarter)
+ ThrottlingOperationStarter& operationStarter)
: _operation(operation),
_operationStarter(operationStarter)
{}
- ~ThrottlingOperation();
+ ~ThrottlingOperation() override;
private:
Operation::SP _operation;
ThrottlingOperationStarter& _operationStarter;
@@ -53,7 +54,7 @@ class ThrottlingOperationStarter : public OperationStarter
HDR_ABORT("should not be reached");
}
void onReceive(DistributorStripeMessageSender&,
- const std::shared_ptr<api::StorageReply>&) override {
+ const std::shared_ptr<api::StorageReply>&) override {
HDR_ABORT("should not be reached");
}
};
@@ -66,9 +67,12 @@ public:
_maxPending(UINT32_MAX),
_pendingCount(0)
{}
+ ~ThrottlingOperationStarter() override;
bool start(const std::shared_ptr<Operation>& operation, Priority priority) override;
+ bool may_allow_operation_with_priority(Priority priority) const noexcept override;
+
bool canStart(uint32_t currentOperationCount, Priority priority) const;
void setMaxPendingRange(uint32_t minPending, uint32_t maxPending) {