summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- storage/src/tests/distributor/distributor_stripe_test.cpp 21
-rw-r--r-- storage/src/tests/distributor/maintenancemocks.h 13
-rw-r--r-- storage/src/tests/distributor/maintenanceschedulertest.cpp 124
-rw-r--r-- storage/src/tests/distributor/throttlingoperationstartertest.cpp 51
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.cpp 2
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.h 4
-rw-r--r-- storage/src/vespa/storage/config/stor-distributormanager.def 6
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.cpp 8
-rw-r--r-- storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp 41
-rw-r--r-- storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h 19
-rw-r--r-- storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h 19
-rw-r--r-- storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp 13
-rw-r--r-- storage/src/vespa/storage/distributor/throttlingoperationstarter.h 12
13 files changed, 252 insertions, 81 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
index 556f5e20fb2..902dd6454f1 100644
--- a/storage/src/tests/distributor/distributor_stripe_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -179,6 +179,16 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
configure_stripe(builder);
}
+ void configure_implicitly_clear_priority_on_schedule(bool implicitly_clear) {
+ ConfigBuilder builder;
+ builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear;
+ configure_stripe(builder);
+ }
+
+ bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
+ return _stripe->_scheduler->implicitly_clear_priority_on_schedule();
+ }
+
void configureMaxClusterClockSkew(int seconds);
void configure_mutation_sequencing(bool enabled);
void configure_merge_busy_inhibit_duration(int seconds);
@@ -888,6 +898,17 @@ TEST_F(DistributorStripeTest, max_activation_inhibited_out_of_sync_groups_config
EXPECT_EQ(getConfig().max_activation_inhibited_out_of_sync_groups(), 0);
}
+TEST_F(DistributorStripeTest, implicitly_clear_priority_on_schedule_config_is_propagated_to_scheduler)
+{
+ setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_implicitly_clear_priority_on_schedule(true);
+ EXPECT_TRUE(scheduler_has_implicitly_clear_priority_on_schedule_set());
+
+ configure_implicitly_clear_priority_on_schedule(false);
+ EXPECT_FALSE(scheduler_has_implicitly_clear_priority_on_schedule_set());
+}
+
TEST_F(DistributorStripeTest, wanted_split_bit_count_is_lower_bounded)
{
setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h
index f708fbd41aa..2809a81f79b 100644
--- a/storage/src/tests/distributor/maintenancemocks.h
+++ b/storage/src/tests/distributor/maintenancemocks.h
@@ -4,6 +4,7 @@
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/distributor/maintenance/maintenanceprioritygenerator.h>
#include <vespa/storage/distributor/maintenance/maintenanceoperationgenerator.h>
+#include <vespa/storage/distributor/maintenance/pending_window_checker.h>
#include <vespa/storage/distributor/operationstarter.h>
#include <vespa/storage/distributor/operations/operation.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
@@ -122,4 +123,16 @@ public:
}
};
+class MockPendingWindowChecker : public PendingWindowChecker {
+ bool _allow = true;
+public:
+ void allow_operations(bool allow) noexcept {
+ _allow = allow;
+ }
+
+ bool may_allow_operation_with_priority(OperationStarter::Priority) const noexcept override {
+ return _allow;
+ }
+};
+
}
diff --git a/storage/src/tests/distributor/maintenanceschedulertest.cpp b/storage/src/tests/distributor/maintenanceschedulertest.cpp
index a97ffeef24b..44b842a2622 100644
--- a/storage/src/tests/distributor/maintenanceschedulertest.cpp
+++ b/storage/src/tests/distributor/maintenanceschedulertest.cpp
@@ -17,61 +17,111 @@ using document::BucketId;
using Priority = MaintenancePriority;
using WaitTimeMs = MaintenanceScheduler::WaitTimeMs;
-struct MaintenanceSchedulerTest : Test {
- std::unique_ptr<SimpleBucketPriorityDatabase> _priorityDb;
- std::unique_ptr<MockMaintenanceOperationGenerator> _operationGenerator;
- std::unique_ptr<MockOperationStarter> _operationStarter;
- std::unique_ptr<MaintenanceScheduler> _scheduler;
+struct MaintenanceSchedulerTest : TestWithParam<bool> {
+ SimpleBucketPriorityDatabase _priority_db;
+ MockMaintenanceOperationGenerator _operation_generator;
+ MockOperationStarter _operation_starter;
+ MockPendingWindowChecker _pending_window_checker;
+ MaintenanceScheduler _scheduler;
- void SetUp() override;
+ MaintenanceSchedulerTest()
+ : _priority_db(),
+ _operation_generator(),
+ _operation_starter(),
+ _pending_window_checker(),
+ _scheduler(_operation_generator, _priority_db, _pending_window_checker, _operation_starter)
+ {}
+
+ void SetUp() override {
+ _scheduler.set_implicitly_clear_priority_on_schedule(GetParam());
+ }
};
-void
-MaintenanceSchedulerTest::SetUp()
-{
- _priorityDb = std::make_unique<SimpleBucketPriorityDatabase>();
- _operationGenerator = std::make_unique<MockMaintenanceOperationGenerator>();
- _operationStarter = std::make_unique<MockOperationStarter>();
- _scheduler = std::make_unique<MaintenanceScheduler>(*_operationGenerator, *_priorityDb, *_operationStarter);
+TEST_P(MaintenanceSchedulerTest, priority_cleared_after_scheduled) {
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGHEST));
+ _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE);
+ EXPECT_EQ("", _priority_db.toString());
}
-TEST_F(MaintenanceSchedulerTest, priority_cleared_after_scheduled) {
- _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGHEST));
- _scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE);
- EXPECT_EQ("", _priorityDb->toString());
+TEST_P(MaintenanceSchedulerTest, operation_is_scheduled) {
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM));
+ _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE);
+ EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 100\n",
+ _operation_starter.toString());
}
-TEST_F(MaintenanceSchedulerTest, operation_is_scheduled) {
- _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM));
- _scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE);
- EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 100\n",
- _operationStarter->toString());
+TEST_P(MaintenanceSchedulerTest, operation_is_not_scheduled_if_pending_ops_not_accepted) {
+ if (!GetParam()) {
+ return; // Only works when implicit clearing is enabled
+ }
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM));
+ _pending_window_checker.allow_operations(false);
+ _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE);
+ EXPECT_EQ("", _operation_starter.toString());
+ // Priority DB entry is not cleared
+ EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri MEDIUM)\n",
+ _priority_db.toString());
}
-TEST_F(MaintenanceSchedulerTest, no_operations_to_schedule) {
- WaitTimeMs waitMs(_scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
+TEST_P(MaintenanceSchedulerTest, no_operations_to_schedule) {
+ WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
EXPECT_EQ(WaitTimeMs(1), waitMs);
- EXPECT_EQ("", _operationStarter->toString());
+ EXPECT_EQ("", _operation_starter.toString());
}
-TEST_F(MaintenanceSchedulerTest, suppress_low_priorities_in_emergency_mode) {
- _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::VERY_HIGH));
- _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 2)), Priority::HIGHEST));
- EXPECT_EQ(WaitTimeMs(0), _scheduler->tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE));
- EXPECT_EQ(WaitTimeMs(1), _scheduler->tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE));
+TEST_P(MaintenanceSchedulerTest, suppress_low_priorities_in_emergency_mode) {
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::VERY_HIGH));
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 2)), Priority::HIGHEST));
+ EXPECT_EQ(WaitTimeMs(0), _scheduler.tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE));
+ EXPECT_EQ(WaitTimeMs(1), _scheduler.tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE));
EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)), pri 0\n",
- _operationStarter->toString());
+ _operation_starter.toString());
EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri VERY_HIGH)\n",
- _priorityDb->toString());
+ _priority_db.toString());
+}
+
+TEST_P(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started) {
+ if (GetParam()) {
+ return; // Only works when implicit clearing is NOT enabled
+ }
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH));
+ _operation_starter.setShouldStartOperations(false);
+ WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
+ EXPECT_EQ(WaitTimeMs(1), waitMs);
+ EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri HIGH)\n",
+ _priority_db.toString());
}
-TEST_F(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started) {
- _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH));
- _operationStarter->setShouldStartOperations(false);
- WaitTimeMs waitMs(_scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
+TEST_P(MaintenanceSchedulerTest, priority_cleared_if_operation_not_started_inside_pending_window) {
+ if (!GetParam()) {
+ return; // Only works when implicit clearing is enabled
+ }
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH));
+ _operation_starter.setShouldStartOperations(false);
+ WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
+ EXPECT_EQ(WaitTimeMs(1), waitMs);
+ EXPECT_EQ("", _priority_db.toString());
+}
+
+TEST_P(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started_inside_pending_window_for_highest_pri) {
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGHEST));
+ _operation_starter.setShouldStartOperations(false);
+ WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
+ EXPECT_EQ(WaitTimeMs(1), waitMs);
+ EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri HIGHEST)\n",
+ _priority_db.toString());
+}
+
+TEST_P(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started_outside_pending_window) {
+ _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH));
+ _operation_starter.setShouldStartOperations(false);
+ _pending_window_checker.allow_operations(false);
+ WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE));
EXPECT_EQ(WaitTimeMs(1), waitMs);
EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri HIGH)\n",
- _priorityDb->toString());
+ _priority_db.toString());
}
+VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(ImplicitClearOfDbPri, MaintenanceSchedulerTest, ::testing::Values(false, true));
+
}
diff --git a/storage/src/tests/distributor/throttlingoperationstartertest.cpp b/storage/src/tests/distributor/throttlingoperationstartertest.cpp
index eac345a243f..4e99388327b 100644
--- a/storage/src/tests/distributor/throttlingoperationstartertest.cpp
+++ b/storage/src/tests/distributor/throttlingoperationstartertest.cpp
@@ -21,7 +21,7 @@ const MockOperation& as_mock_operation(const Operation& operation) {
struct ThrottlingOperationStarterTest : Test {
std::shared_ptr<Operation> createMockOperation() {
- return std::shared_ptr<Operation>(new MockOperation(makeDocumentBucket(BucketId(16, 1))));
+ return std::make_shared<MockOperation>(makeDocumentBucket(BucketId(16, 1)));
}
std::unique_ptr<MockOperationStarter> _starterImpl;
@@ -48,14 +48,12 @@ ThrottlingOperationStarterTest::TearDown()
TEST_F(ThrottlingOperationStarterTest, operation_not_throttled_when_slot_available) {
auto operation = createMockOperation();
- EXPECT_TRUE(_operationStarter->start(operation,
- OperationStarter::Priority(0)));
+ EXPECT_TRUE(_operationStarter->start(operation, OperationStarter::Priority(0)));
EXPECT_FALSE(as_mock_operation(*operation).get_was_throttled());
}
TEST_F(ThrottlingOperationStarterTest, operation_starting_is_forwarded_to_implementation) {
- ASSERT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(0)));
+ ASSERT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0)));
EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 0\n",
_starterImpl->toString());
}
@@ -63,8 +61,8 @@ TEST_F(ThrottlingOperationStarterTest, operation_starting_is_forwarded_to_implem
TEST_F(ThrottlingOperationStarterTest, operation_throttled_when_no_available_slots) {
_operationStarter->setMaxPendingRange(0, 0);
auto operation = createMockOperation();
- EXPECT_FALSE(_operationStarter->start(operation,
- OperationStarter::Priority(0)));
+ EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0)));
+ EXPECT_FALSE(_operationStarter->start(operation, OperationStarter::Priority(0)));
EXPECT_TRUE(as_mock_operation(*operation).get_was_throttled());
}
@@ -88,32 +86,35 @@ TEST_F(ThrottlingOperationStarterTest, throttling_with_max_pending_range) {
TEST_F(ThrottlingOperationStarterTest, starting_operations_fills_up_pending_window) {
_operationStarter->setMaxPendingRange(1, 3);
- EXPECT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(255)));
- EXPECT_FALSE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(255)));
- EXPECT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(100)));
- EXPECT_FALSE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(100)));
- EXPECT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(0)));
- EXPECT_FALSE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(0)));
+ EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255)));
+ EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255)));
+ // After one started operation, Priority(255) is expected to be throttled...
+ EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255)));
+ EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255)));
+ // ...while Priority(100) is still expected to be let through.
+ EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(100)));
+ EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(100)));
+ // After two started operations, Priority(100) is expected to be throttled as well...
+ EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(100)));
+ EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(100)));
+ // ...while Priority(0) is still expected to be let through.
+ EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0)));
+ EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0)));
+ // After three started operations, even Priority(0) is expected to be throttled.
+ EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0)));
+ EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0)));
}
TEST_F(ThrottlingOperationStarterTest, finishing_operations_allows_more_to_start) {
_operationStarter->setMaxPendingRange(1, 1);
- EXPECT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(255)));
- EXPECT_FALSE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(255)));
+ EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255)));
+ EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255)));
EXPECT_FALSE(_starterImpl->getOperations().empty());
_starterImpl->getOperations().pop_back();
- EXPECT_TRUE(_operationStarter->start(createMockOperation(),
- OperationStarter::Priority(255)));
+ EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255)));
+ EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255)));
EXPECT_FALSE(_starterImpl->getOperations().empty());
}
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 6de3f5a4698..a23d00ee6a3 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -49,6 +49,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_enable_metadata_only_fetch_phase_for_inconsistent_updates(false),
_prioritize_global_bucket_merges(true),
_enable_revert(true),
+ _implicitly_clear_priority_on_schedule(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -169,6 +170,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_prioritize_global_bucket_merges = config.prioritizeGlobalBucketMerges;
_max_activation_inhibited_out_of_sync_groups = config.maxActivationInhibitedOutOfSyncGroups;
_enable_revert = config.enableRevert;
+ _implicitly_clear_priority_on_schedule = config.implicitlyClearBucketPriorityOnSchedule;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index e50b2cf7771..7b4e082d1ed 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -264,6 +264,9 @@ public:
bool enable_revert() const noexcept {
return _enable_revert;
}
+ [[nodiscard]] bool implicitly_clear_priority_on_schedule() const noexcept {
+ return _implicitly_clear_priority_on_schedule;
+ }
uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; }
@@ -320,6 +323,7 @@ private:
bool _enable_metadata_only_fetch_phase_for_inconsistent_updates;
bool _prioritize_global_bucket_merges;
bool _enable_revert;
+ bool _implicitly_clear_priority_on_schedule;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 3b324d6ddd2..8a9fdf74802 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -280,3 +280,9 @@ max_activation_inhibited_out_of_sync_groups int default=0
## CPU cores available. If > 0, the number of stripes is explicitly overridden.
## Stripe counts must be a power of two.
num_distributor_stripes int default=0 restart
+
+## If set, the maintenance scheduler will implicitly clear entries from its internal
+## bucket maintenance priority database even when no operation can be started for the
+## bucket due to being blocked by concurrent operations. This avoids potential head-of-line
+## blocking of later buckets in the priority database.
+implicitly_clear_bucket_priority_on_schedule bool default=false
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index d87b2db1638..18a19288ad3 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -62,7 +62,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)),
_blockingStarter(std::make_unique<BlockingOperationStarter>(_component, *_operation_sequencer,
*_throttlingStarter)),
- _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)),
+ _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb,
+ *_throttlingStarter, *_blockingStarter)),
_schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE),
_recoveryTimeStarted(_component.getClock()),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
@@ -709,7 +710,9 @@ DistributorStripe::doNonCriticalTick(framework::ThreadIndex)
// did any useful work with incoming data, this check must be performed _after_ the call.
if (!should_inhibit_current_maintenance_scan_tick()) {
scanNextBucket();
- startNextMaintenanceOperation();
+ if (!_bucketDBUpdater.hasPendingClusterState()) {
+ startNextMaintenanceOperation();
+ }
if (isInRecoveryMode()) {
signalWorkWasDone();
}
@@ -757,6 +760,7 @@ DistributorStripe::propagate_config_snapshot_to_internal_components()
getConfig().allowStaleReadsDuringClusterStateTransitions());
_externalOperationHandler.set_use_weak_internal_read_consistency_for_gets(
getConfig().use_weak_internal_read_consistency_for_client_gets());
+ _scheduler->set_implicitly_clear_priority_on_schedule(getConfig().implicitly_clear_priority_on_schedule());
}
void
diff --git a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp
index ee5e1278c50..c2fef4f781f 100644
--- a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp
+++ b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp
@@ -2,6 +2,7 @@
#include "maintenancescheduler.h"
#include "maintenanceoperationgenerator.h"
+#include "pending_window_checker.h"
#include <vespa/storage/distributor/operationstarter.h>
#include <vespa/storage/distributor/operations/idealstate/idealstateoperation.h>
@@ -13,17 +14,20 @@ namespace storage::distributor {
MaintenanceScheduler::MaintenanceScheduler(
MaintenanceOperationGenerator& operationGenerator,
BucketPriorityDatabase& priorityDb,
+ const PendingWindowChecker& pending_window_checker,
OperationStarter& operationStarter)
: _operationGenerator(operationGenerator),
_priorityDb(priorityDb),
- _operationStarter(operationStarter)
+ _pending_window_checker(pending_window_checker),
+ _operationStarter(operationStarter),
+ _implicitly_clear_priority_on_schedule(false)
{
}
PrioritizedBucket
MaintenanceScheduler::getMostImportantBucket()
{
- BucketPriorityDatabase::const_iterator mostImportant(_priorityDb.begin());
+ auto mostImportant = _priorityDb.begin();
if (mostImportant == _priorityDb.end()) {
return PrioritizedBucket::INVALID;
}
@@ -38,10 +42,21 @@ MaintenanceScheduler::tick(SchedulingMode currentMode)
if (!possibleToSchedule(mostImportant, currentMode)) {
return WaitTimeMs(1);
}
+ // Bucket activations are so important to do ASAP that we _want_ to block further
+ // maintenance scheduling until we're able to schedule the next possible bucket.
+ // The inverse is the case for other maintenance operations.
+ const bool is_activation = has_bucket_activation_priority(mostImportant);
+ if (_implicitly_clear_priority_on_schedule && !is_activation) {
+ // Clear the priority up front so that, if the operation can't be started, we move on to
+ // the next bucket. Bucket will be re-prioritized when the distributor stripe next scans it.
+ clearPriority(mostImportant);
+ }
if (!startOperation(mostImportant)) {
return WaitTimeMs(1);
}
- clearPriority(mostImportant);
+ if (!_implicitly_clear_priority_on_schedule || is_activation) {
+ clearPriority(mostImportant);
+ }
return WaitTimeMs(0);
}
@@ -49,11 +64,19 @@ bool
MaintenanceScheduler::possibleToSchedule(const PrioritizedBucket& bucket,
SchedulingMode currentMode) const
{
+ if (!bucket.valid()) {
+ return false;
+ }
+ // If the pending window is full, nothing of equal or lower priority can be scheduled, so there is no point in trying.
+ if (_implicitly_clear_priority_on_schedule &&
+ !_pending_window_checker.may_allow_operation_with_priority(convertToOperationPriority(bucket.getPriority())))
+ {
+ return false;
+ }
if (currentMode == RECOVERY_SCHEDULING_MODE) {
- return (bucket.valid()
- && possibleToScheduleInEmergency(bucket));
+ return possibleToScheduleInEmergency(bucket);
} else {
- return bucket.valid();
+ return true;
}
}
@@ -104,4 +127,10 @@ MaintenanceScheduler::startOperation(const PrioritizedBucket& bucket)
return _operationStarter.start(operation, operationPriority);
}
+bool
+MaintenanceScheduler::has_bucket_activation_priority(const PrioritizedBucket& bucket) const noexcept
+{
+ return (bucket.getPriority() == MaintenancePriority::HIGHEST);
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h
index 33eb97292bb..115bf12a64f 100644
--- a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h
+++ b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h
@@ -9,6 +9,7 @@ namespace storage::distributor {
class MaintenanceOperationGenerator;
class BucketPriorityDatabase;
+class PendingWindowChecker;
class MaintenanceScheduler
{
@@ -22,10 +23,18 @@ public:
MaintenanceScheduler(MaintenanceOperationGenerator& operationGenerator,
BucketPriorityDatabase& priorityDb,
+ const PendingWindowChecker& pending_window_checker,
OperationStarter& operationStarter);
WaitTimeMs tick(SchedulingMode currentMode);
+ void set_implicitly_clear_priority_on_schedule(bool implicitly_clear) noexcept {
+ _implicitly_clear_priority_on_schedule = implicitly_clear;
+ }
+ [[nodiscard]] bool implicitly_clear_priority_on_schedule() const noexcept {
+ return _implicitly_clear_priority_on_schedule;
+ }
+
private:
MaintenanceScheduler(const MaintenanceScheduler&);
MaintenanceScheduler& operator=(const MaintenanceScheduler&);
@@ -35,12 +44,14 @@ private:
bool possibleToScheduleInEmergency(const PrioritizedBucket& bucket) const;
void clearPriority(const PrioritizedBucket& bucket);
bool startOperation(const PrioritizedBucket& bucket);
- OperationStarter::Priority convertToOperationPriority(
- MaintenancePriority::Priority priority) const;
+ OperationStarter::Priority convertToOperationPriority(MaintenancePriority::Priority priority) const;
+ bool has_bucket_activation_priority(const PrioritizedBucket&) const noexcept;
MaintenanceOperationGenerator& _operationGenerator;
- BucketPriorityDatabase& _priorityDb;
- OperationStarter& _operationStarter;
+ BucketPriorityDatabase& _priorityDb;
+ const PendingWindowChecker& _pending_window_checker;
+ OperationStarter& _operationStarter;
+ bool _implicitly_clear_priority_on_schedule;
};
}
diff --git a/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h b/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h
new file mode 100644
index 00000000000..8a333a3073c
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h
@@ -0,0 +1,19 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/distributor/operationstarter.h>
+
+namespace storage::distributor {
+
+/**
+ * Allows for easily and cheaply checking if an operation with a given internal maintenance
+ * priority could possibly be started "downstream" due to there being available capacity
+ * in the maintenance pending window.
+ */
+class PendingWindowChecker {
+public:
+ virtual ~PendingWindowChecker() = default;
+ [[nodiscard]] virtual bool may_allow_operation_with_priority(OperationStarter::Priority) const noexcept = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
index ccf8273e386..47404bb365f 100644
--- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
+++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
@@ -10,6 +10,8 @@ ThrottlingOperationStarter::ThrottlingOperation::~ThrottlingOperation()
_operationStarter.signalOperationFinished(*this);
}
+ThrottlingOperationStarter::~ThrottlingOperationStarter() = default;
+
bool
ThrottlingOperationStarter::canStart(uint32_t currentOperationCount, Priority priority) const
{
@@ -20,10 +22,9 @@ ThrottlingOperationStarter::canStart(uint32_t currentOperationCount, Priority pr
}
bool
-ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation,
- Priority priority)
+ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority)
{
- if (!canStart(_pendingCount, priority)) {
+ if (!may_allow_operation_with_priority(priority)) {
operation->on_throttled();
return false;
}
@@ -32,6 +33,12 @@ ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation,
return _starterImpl.start(wrappedOp, priority);
}
+bool
+ThrottlingOperationStarter::may_allow_operation_with_priority(Priority priority) const noexcept
+{
+ return canStart(_pendingCount, priority);
+}
+
void
ThrottlingOperationStarter::signalOperationFinished(const Operation& op)
{
diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
index e63fead8458..00d1237b2f7 100644
--- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
+++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
@@ -2,23 +2,24 @@
#pragma once
#include "operationstarter.h"
+#include <vespa/storage/distributor/maintenance/pending_window_checker.h>
#include <vespa/vespalib/util/hdr_abort.h>
#include <vespa/storage/distributor/operations/operation.h>
namespace storage::distributor {
-class ThrottlingOperationStarter : public OperationStarter
+class ThrottlingOperationStarter : public OperationStarter, public PendingWindowChecker
{
class ThrottlingOperation : public Operation
{
public:
ThrottlingOperation(const Operation::SP& operation,
- ThrottlingOperationStarter& operationStarter)
+ ThrottlingOperationStarter& operationStarter)
: _operation(operation),
_operationStarter(operationStarter)
{}
- ~ThrottlingOperation();
+ ~ThrottlingOperation() override;
private:
Operation::SP _operation;
ThrottlingOperationStarter& _operationStarter;
@@ -53,7 +54,7 @@ class ThrottlingOperationStarter : public OperationStarter
HDR_ABORT("should not be reached");
}
void onReceive(DistributorStripeMessageSender&,
- const std::shared_ptr<api::StorageReply>&) override {
+ const std::shared_ptr<api::StorageReply>&) override {
HDR_ABORT("should not be reached");
}
};
@@ -66,9 +67,12 @@ public:
_maxPending(UINT32_MAX),
_pendingCount(0)
{}
+ ~ThrottlingOperationStarter() override;
bool start(const std::shared_ptr<Operation>& operation, Priority priority) override;
+ bool may_allow_operation_with_priority(Priority priority) const noexcept override;
+
bool canStart(uint32_t currentOperationCount, Priority priority) const;
void setMaxPendingRange(uint32_t minPending, uint32_t maxPending) {