diff options
13 files changed, 252 insertions, 81 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp index 556f5e20fb2..902dd6454f1 100644 --- a/storage/src/tests/distributor/distributor_stripe_test.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test.cpp @@ -179,6 +179,16 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil { configure_stripe(builder); } + void configure_implicitly_clear_priority_on_schedule(bool implicitly_clear) { + ConfigBuilder builder; + builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear; + configure_stripe(builder); + } + + bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept { + return _stripe->_scheduler->implicitly_clear_priority_on_schedule(); + } + void configureMaxClusterClockSkew(int seconds); void configure_mutation_sequencing(bool enabled); void configure_merge_busy_inhibit_duration(int seconds); @@ -888,6 +898,17 @@ TEST_F(DistributorStripeTest, max_activation_inhibited_out_of_sync_groups_config EXPECT_EQ(getConfig().max_activation_inhibited_out_of_sync_groups(), 0); } +TEST_F(DistributorStripeTest, implicitly_clear_priority_on_schedule_config_is_propagated_to_scheduler) +{ + setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + configure_implicitly_clear_priority_on_schedule(true); + EXPECT_TRUE(scheduler_has_implicitly_clear_priority_on_schedule_set()); + + configure_implicitly_clear_priority_on_schedule(false); + EXPECT_FALSE(scheduler_has_implicitly_clear_priority_on_schedule_set()); +} + TEST_F(DistributorStripeTest, wanted_split_bit_count_is_lower_bounded) { setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h index f708fbd41aa..2809a81f79b 100644 --- a/storage/src/tests/distributor/maintenancemocks.h +++ b/storage/src/tests/distributor/maintenancemocks.h @@ -4,6 +4,7 @@ #include <vespa/document/test/make_bucket_space.h> #include <vespa/storage/distributor/maintenance/maintenanceprioritygenerator.h> #include <vespa/storage/distributor/maintenance/maintenanceoperationgenerator.h> +#include <vespa/storage/distributor/maintenance/pending_window_checker.h> #include <vespa/storage/distributor/operationstarter.h> #include <vespa/storage/distributor/operations/operation.h> #include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> @@ -122,4 +123,16 @@ public: } }; +class MockPendingWindowChecker : public PendingWindowChecker { + bool _allow = true; +public: + void allow_operations(bool allow) noexcept { + _allow = allow; + } + + bool may_allow_operation_with_priority(OperationStarter::Priority) const noexcept override { + return _allow; + } +}; + } diff --git a/storage/src/tests/distributor/maintenanceschedulertest.cpp b/storage/src/tests/distributor/maintenanceschedulertest.cpp index a97ffeef24b..44b842a2622 100644 --- a/storage/src/tests/distributor/maintenanceschedulertest.cpp +++ b/storage/src/tests/distributor/maintenanceschedulertest.cpp @@ -17,61 +17,111 @@ using document::BucketId; using Priority = MaintenancePriority; using WaitTimeMs = MaintenanceScheduler::WaitTimeMs; -struct MaintenanceSchedulerTest : Test { - std::unique_ptr<SimpleBucketPriorityDatabase> _priorityDb; - std::unique_ptr<MockMaintenanceOperationGenerator> _operationGenerator; - std::unique_ptr<MockOperationStarter> _operationStarter; - std::unique_ptr<MaintenanceScheduler> _scheduler; +struct MaintenanceSchedulerTest : TestWithParam<bool> { + SimpleBucketPriorityDatabase _priority_db; + MockMaintenanceOperationGenerator _operation_generator; + MockOperationStarter _operation_starter; + MockPendingWindowChecker _pending_window_checker; + MaintenanceScheduler _scheduler; - void SetUp() override; + MaintenanceSchedulerTest() + : _priority_db(), + _operation_generator(), + _operation_starter(), + _pending_window_checker(), + _scheduler(_operation_generator, _priority_db, _pending_window_checker, _operation_starter) + {} + + void SetUp() override { + _scheduler.set_implicitly_clear_priority_on_schedule(GetParam()); + } }; -void -MaintenanceSchedulerTest::SetUp() -{ - _priorityDb = std::make_unique<SimpleBucketPriorityDatabase>(); - _operationGenerator = std::make_unique<MockMaintenanceOperationGenerator>(); - _operationStarter = std::make_unique<MockOperationStarter>(); - _scheduler = std::make_unique<MaintenanceScheduler>(*_operationGenerator, *_priorityDb, *_operationStarter); +TEST_P(MaintenanceSchedulerTest, priority_cleared_after_scheduled) { + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGHEST)); + _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE); + EXPECT_EQ("", _priority_db.toString()); } -TEST_F(MaintenanceSchedulerTest, priority_cleared_after_scheduled) { - _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGHEST)); - _scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE); - EXPECT_EQ("", _priorityDb->toString()); +TEST_P(MaintenanceSchedulerTest, operation_is_scheduled) { + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM)); + _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE); + EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 100\n", + _operation_starter.toString()); } -TEST_F(MaintenanceSchedulerTest, operation_is_scheduled) { - _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM)); - _scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE); - EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 100\n", - _operationStarter->toString()); +TEST_P(MaintenanceSchedulerTest, operation_is_not_scheduled_if_pending_ops_not_accepted) { + if (!GetParam()) { + return; // Only works when implicit clearing is enabled + } + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::MEDIUM)); + _pending_window_checker.allow_operations(false); + _scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE); + EXPECT_EQ("", _operation_starter.toString()); + // Priority DB entry is not cleared + EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri MEDIUM)\n", + _priority_db.toString()); } -TEST_F(MaintenanceSchedulerTest, no_operations_to_schedule) { - WaitTimeMs waitMs(_scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); +TEST_P(MaintenanceSchedulerTest, no_operations_to_schedule) { + WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); EXPECT_EQ(WaitTimeMs(1), waitMs); - EXPECT_EQ("", _operationStarter->toString()); + EXPECT_EQ("", _operation_starter.toString()); } -TEST_F(MaintenanceSchedulerTest, suppress_low_priorities_in_emergency_mode) { - _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::VERY_HIGH)); - _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 2)), Priority::HIGHEST)); - EXPECT_EQ(WaitTimeMs(0), _scheduler->tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE)); - EXPECT_EQ(WaitTimeMs(1), _scheduler->tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE)); +TEST_P(MaintenanceSchedulerTest, suppress_low_priorities_in_emergency_mode) { + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::VERY_HIGH)); + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 2)), Priority::HIGHEST)); + EXPECT_EQ(WaitTimeMs(0), _scheduler.tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE)); + EXPECT_EQ(WaitTimeMs(1), _scheduler.tick(MaintenanceScheduler::RECOVERY_SCHEDULING_MODE)); EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)), pri 0\n", - _operationStarter->toString()); + _operation_starter.toString()); EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri VERY_HIGH)\n", - _priorityDb->toString()); + _priority_db.toString()); +} + +TEST_P(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started) { + if (GetParam()) { + return; // Only works when implicit clearing is NOT enabled + } + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH)); + _operation_starter.setShouldStartOperations(false); + WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); + EXPECT_EQ(WaitTimeMs(1), waitMs); + EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri HIGH)\n", + _priority_db.toString()); } -TEST_F(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started) { - _priorityDb->setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH)); - _operationStarter->setShouldStartOperations(false); - WaitTimeMs waitMs(_scheduler->tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); +TEST_P(MaintenanceSchedulerTest, priority_cleared_if_operation_not_started_inside_pending_window) { + if (!GetParam()) { + return; // Only works when implicit clearing is enabled + } + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH)); + _operation_starter.setShouldStartOperations(false); + WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); + EXPECT_EQ(WaitTimeMs(1), waitMs); + EXPECT_EQ("", _priority_db.toString()); +} + +TEST_P(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started_inside_pending_window_for_highest_pri) { + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGHEST)); + _operation_starter.setShouldStartOperations(false); + WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); + EXPECT_EQ(WaitTimeMs(1), waitMs); + EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri HIGHEST)\n", + _priority_db.toString()); +} + +TEST_P(MaintenanceSchedulerTest, priority_not_cleared_if_operation_not_started_outside_pending_window) { + _priority_db.setPriority(PrioritizedBucket(makeDocumentBucket(BucketId(16, 1)), Priority::HIGH)); + _operation_starter.setShouldStartOperations(false); + _pending_window_checker.allow_operations(false); + WaitTimeMs waitMs(_scheduler.tick(MaintenanceScheduler::NORMAL_SCHEDULING_MODE)); EXPECT_EQ(WaitTimeMs(1), waitMs); EXPECT_EQ("PrioritizedBucket(Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri HIGH)\n", - _priorityDb->toString()); + _priority_db.toString()); } +VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(ImplicitClearOfDbPri, MaintenanceSchedulerTest, ::testing::Values(false, true)); + } diff --git a/storage/src/tests/distributor/throttlingoperationstartertest.cpp b/storage/src/tests/distributor/throttlingoperationstartertest.cpp index eac345a243f..4e99388327b 100644 --- a/storage/src/tests/distributor/throttlingoperationstartertest.cpp +++ b/storage/src/tests/distributor/throttlingoperationstartertest.cpp @@ -21,7 +21,7 @@ const MockOperation& as_mock_operation(const Operation& operation) { struct ThrottlingOperationStarterTest : Test { std::shared_ptr<Operation> createMockOperation() { - return std::shared_ptr<Operation>(new MockOperation(makeDocumentBucket(BucketId(16, 1)))); + return std::make_shared<MockOperation>(makeDocumentBucket(BucketId(16, 1))); } std::unique_ptr<MockOperationStarter> _starterImpl; @@ -48,14 +48,12 @@ ThrottlingOperationStarterTest::TearDown() TEST_F(ThrottlingOperationStarterTest, operation_not_throttled_when_slot_available) { auto operation = createMockOperation(); - EXPECT_TRUE(_operationStarter->start(operation, - OperationStarter::Priority(0))); + EXPECT_TRUE(_operationStarter->start(operation, OperationStarter::Priority(0))); EXPECT_FALSE(as_mock_operation(*operation).get_was_throttled()); } TEST_F(ThrottlingOperationStarterTest, operation_starting_is_forwarded_to_implementation) { - ASSERT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(0))); + ASSERT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0))); EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 0\n", _starterImpl->toString()); } @@ -63,8 +61,8 @@ TEST_F(ThrottlingOperationStarterTest, operation_starting_is_forwarded_to_implem TEST_F(ThrottlingOperationStarterTest, operation_throttled_when_no_available_slots) { _operationStarter->setMaxPendingRange(0, 0); auto operation = createMockOperation(); - EXPECT_FALSE(_operationStarter->start(operation, - OperationStarter::Priority(0))); + EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0))); + EXPECT_FALSE(_operationStarter->start(operation, OperationStarter::Priority(0))); EXPECT_TRUE(as_mock_operation(*operation).get_was_throttled()); } @@ -88,32 +86,35 @@ TEST_F(ThrottlingOperationStarterTest, throttling_with_max_pending_range) { TEST_F(ThrottlingOperationStarterTest, starting_operations_fills_up_pending_window) { _operationStarter->setMaxPendingRange(1, 3); - EXPECT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(255))); - EXPECT_FALSE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(255))); - EXPECT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(100))); - EXPECT_FALSE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(100))); - EXPECT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(0))); - EXPECT_FALSE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(0))); + EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255))); + EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255))); + + EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255))); + EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255))); + + EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(100))); + EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(100))); + + EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255))); + EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(100))); + + EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0))); + EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0))); + + EXPECT_FALSE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(0))); + EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0))); } TEST_F(ThrottlingOperationStarterTest, finishing_operations_allows_more_to_start) { _operationStarter->setMaxPendingRange(1, 1); - EXPECT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(255))); - EXPECT_FALSE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(255))); + EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255))); + EXPECT_FALSE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255))); EXPECT_FALSE(_starterImpl->getOperations().empty()); _starterImpl->getOperations().pop_back(); - EXPECT_TRUE(_operationStarter->start(createMockOperation(), - OperationStarter::Priority(255))); + EXPECT_TRUE(_operationStarter->may_allow_operation_with_priority(OperationStarter::Priority(255))); + EXPECT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(255))); EXPECT_FALSE(_starterImpl->getOperations().empty()); } diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index 6de3f5a4698..a23d00ee6a3 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -49,6 +49,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _enable_metadata_only_fetch_phase_for_inconsistent_updates(false), _prioritize_global_bucket_merges(true), _enable_revert(true), + _implicitly_clear_priority_on_schedule(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -169,6 +170,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _prioritize_global_bucket_merges = config.prioritizeGlobalBucketMerges; _max_activation_inhibited_out_of_sync_groups = config.maxActivationInhibitedOutOfSyncGroups; _enable_revert = config.enableRevert; + _implicitly_clear_priority_on_schedule = config.implicitlyClearBucketPriorityOnSchedule; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index e50b2cf7771..7b4e082d1ed 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -264,6 +264,9 @@ public: bool enable_revert() const noexcept { return _enable_revert; } + [[nodiscard]] bool implicitly_clear_priority_on_schedule() const noexcept { + return _implicitly_clear_priority_on_schedule; + } uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; } @@ -320,6 +323,7 @@ private: bool _enable_metadata_only_fetch_phase_for_inconsistent_updates; bool _prioritize_global_bucket_merges; bool _enable_revert; + bool _implicitly_clear_priority_on_schedule; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 3b324d6ddd2..8a9fdf74802 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -280,3 +280,9 @@ max_activation_inhibited_out_of_sync_groups int default=0 ## CPU cores available. If > 0, the number of stripes is explicitly overridden. ## Stripe counts must be a power of two. num_distributor_stripes int default=0 restart + +## If set, the maintenance scheduler will implicitly clear entries from its internal +## bucket maintenance priority database even when no operation can be started for the +## bucket due to being blocked by concurrent operations. This avoids potential head-of-line +## blocking of later buckets in the priority database. +implicitly_clear_bucket_priority_on_schedule bool default=false diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index d87b2db1638..18a19288ad3 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -62,7 +62,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)), _blockingStarter(std::make_unique<BlockingOperationStarter>(_component, *_operation_sequencer, *_throttlingStarter)), - _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)), + _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, + *_throttlingStarter, *_blockingStarter)), _schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE), _recoveryTimeStarted(_component.getClock()), _tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN), @@ -709,7 +710,9 @@ DistributorStripe::doNonCriticalTick(framework::ThreadIndex) // did any useful work with incoming data, this check must be performed _after_ the call. if (!should_inhibit_current_maintenance_scan_tick()) { scanNextBucket(); - startNextMaintenanceOperation(); + if (!_bucketDBUpdater.hasPendingClusterState()) { + startNextMaintenanceOperation(); + } if (isInRecoveryMode()) { signalWorkWasDone(); } @@ -757,6 +760,7 @@ DistributorStripe::propagate_config_snapshot_to_internal_components() getConfig().allowStaleReadsDuringClusterStateTransitions()); _externalOperationHandler.set_use_weak_internal_read_consistency_for_gets( getConfig().use_weak_internal_read_consistency_for_client_gets()); + _scheduler->set_implicitly_clear_priority_on_schedule(getConfig().implicitly_clear_priority_on_schedule()); } void diff --git a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp index ee5e1278c50..c2fef4f781f 100644 --- a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp +++ b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.cpp @@ -2,6 +2,7 @@ #include "maintenancescheduler.h" #include "maintenanceoperationgenerator.h" +#include "pending_window_checker.h" #include <vespa/storage/distributor/operationstarter.h> #include <vespa/storage/distributor/operations/idealstate/idealstateoperation.h> @@ -13,17 +14,20 @@ namespace storage::distributor { MaintenanceScheduler::MaintenanceScheduler( MaintenanceOperationGenerator& operationGenerator, BucketPriorityDatabase& priorityDb, + const PendingWindowChecker& pending_window_checker, OperationStarter& operationStarter) : _operationGenerator(operationGenerator), _priorityDb(priorityDb), - _operationStarter(operationStarter) + _pending_window_checker(pending_window_checker), + _operationStarter(operationStarter), + _implicitly_clear_priority_on_schedule(false) { } PrioritizedBucket MaintenanceScheduler::getMostImportantBucket() { - BucketPriorityDatabase::const_iterator mostImportant(_priorityDb.begin()); + auto mostImportant = _priorityDb.begin(); if (mostImportant == _priorityDb.end()) { return PrioritizedBucket::INVALID; } @@ -38,10 +42,21 @@ MaintenanceScheduler::tick(SchedulingMode currentMode) if (!possibleToSchedule(mostImportant, currentMode)) { return WaitTimeMs(1); } + // Bucket activations are so important to do ASAP that we _want_ to block further + // maintenance scheduling until we're able to schedule the next possible bucket. + // The inverse is the case for other maintenance operations. + const bool is_activation = has_bucket_activation_priority(mostImportant); + if (_implicitly_clear_priority_on_schedule && !is_activation) { + // If we can't start the operation, move on to the next bucket. Bucket will be + // re-prioritized when the distributor stripe next scans it. + clearPriority(mostImportant); + } if (!startOperation(mostImportant)) { return WaitTimeMs(1); } - clearPriority(mostImportant); + if (!_implicitly_clear_priority_on_schedule || is_activation) { + clearPriority(mostImportant); + } return WaitTimeMs(0); } @@ -49,11 +64,19 @@ bool MaintenanceScheduler::possibleToSchedule(const PrioritizedBucket& bucket, SchedulingMode currentMode) const { + if (!bucket.valid()) { + return false; + } + // If pending window is full nothing of equal or lower priority can be scheduled, so no point in trying. + if (_implicitly_clear_priority_on_schedule && + !_pending_window_checker.may_allow_operation_with_priority(convertToOperationPriority(bucket.getPriority()))) + { + return false; + } if (currentMode == RECOVERY_SCHEDULING_MODE) { - return (bucket.valid() - && possibleToScheduleInEmergency(bucket)); + return possibleToScheduleInEmergency(bucket); } else { - return bucket.valid(); + return true; } } @@ -104,4 +127,10 @@ MaintenanceScheduler::startOperation(const PrioritizedBucket& bucket) return _operationStarter.start(operation, operationPriority); } +bool +MaintenanceScheduler::has_bucket_activation_priority(const PrioritizedBucket& bucket) const noexcept +{ + return (bucket.getPriority() == MaintenancePriority::HIGHEST); +} + } diff --git a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h index 33eb97292bb..115bf12a64f 100644 --- a/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h +++ b/storage/src/vespa/storage/distributor/maintenance/maintenancescheduler.h @@ -9,6 +9,7 @@ namespace storage::distributor { class MaintenanceOperationGenerator; class BucketPriorityDatabase; +class PendingWindowChecker; class MaintenanceScheduler { @@ -22,10 +23,18 @@ public: MaintenanceScheduler(MaintenanceOperationGenerator& operationGenerator, BucketPriorityDatabase& priorityDb, + const PendingWindowChecker& pending_window_checker, OperationStarter& operationStarter); WaitTimeMs tick(SchedulingMode currentMode); + void set_implicitly_clear_priority_on_schedule(bool implicitly_clear) noexcept { + _implicitly_clear_priority_on_schedule = implicitly_clear; + } + [[nodiscard]] bool implicitly_clear_priority_on_schedule() const noexcept { + return _implicitly_clear_priority_on_schedule; + } + private: MaintenanceScheduler(const MaintenanceScheduler&); MaintenanceScheduler& operator=(const MaintenanceScheduler&); @@ -35,12 +44,14 @@ private: bool possibleToScheduleInEmergency(const PrioritizedBucket& bucket) const; void clearPriority(const PrioritizedBucket& bucket); bool startOperation(const PrioritizedBucket& bucket); - OperationStarter::Priority convertToOperationPriority( - MaintenancePriority::Priority priority) const; + OperationStarter::Priority convertToOperationPriority(MaintenancePriority::Priority priority) const; + bool has_bucket_activation_priority(const PrioritizedBucket&) const noexcept; MaintenanceOperationGenerator& _operationGenerator; - BucketPriorityDatabase& _priorityDb; - OperationStarter& _operationStarter; + BucketPriorityDatabase& _priorityDb; + const PendingWindowChecker& _pending_window_checker; + OperationStarter& _operationStarter; + bool _implicitly_clear_priority_on_schedule; }; } diff --git a/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h b/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h new file mode 100644 index 00000000000..8a333a3073c --- /dev/null +++ b/storage/src/vespa/storage/distributor/maintenance/pending_window_checker.h @@ -0,0 +1,19 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/storage/distributor/operationstarter.h> + +namespace storage::distributor { + +/** + * Allows for easily and cheaply checking if an operation with a given internal maintenance + * priority could possibly be started "downstream" due to there being available capacity + * in the maintenance pending window. + */ +class PendingWindowChecker { +public: + virtual ~PendingWindowChecker() = default; + [[nodiscard]] virtual bool may_allow_operation_with_priority(OperationStarter::Priority) const noexcept = 0; +}; + +} diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp index ccf8273e386..47404bb365f 100644 --- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp +++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp @@ -10,6 +10,8 @@ ThrottlingOperationStarter::ThrottlingOperation::~ThrottlingOperation() _operationStarter.signalOperationFinished(*this); } +ThrottlingOperationStarter::~ThrottlingOperationStarter() = default; + bool ThrottlingOperationStarter::canStart(uint32_t currentOperationCount, Priority priority) const { @@ -20,10 +22,9 @@ ThrottlingOperationStarter::canStart(uint32_t currentOperationCount, Priority pr } bool -ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, - Priority priority) +ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority) { - if (!canStart(_pendingCount, priority)) { + if (!may_allow_operation_with_priority(priority)) { operation->on_throttled(); return false; } @@ -32,6 +33,12 @@ ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, return _starterImpl.start(wrappedOp, priority); } +bool +ThrottlingOperationStarter::may_allow_operation_with_priority(Priority priority) const noexcept +{ + return canStart(_pendingCount, priority); +} + void ThrottlingOperationStarter::signalOperationFinished(const Operation& op) { diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h index e63fead8458..00d1237b2f7 100644 --- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h +++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h @@ -2,23 +2,24 @@ #pragma once #include "operationstarter.h" +#include <vespa/storage/distributor/maintenance/pending_window_checker.h> #include <vespa/vespalib/util/hdr_abort.h> #include <vespa/storage/distributor/operations/operation.h> namespace storage::distributor { -class ThrottlingOperationStarter : public OperationStarter +class ThrottlingOperationStarter : public OperationStarter, public PendingWindowChecker { class ThrottlingOperation : public Operation { public: ThrottlingOperation(const Operation::SP& operation, - ThrottlingOperationStarter& operationStarter) + ThrottlingOperationStarter& operationStarter) : _operation(operation), _operationStarter(operationStarter) {} - ~ThrottlingOperation(); + ~ThrottlingOperation() override; private: Operation::SP _operation; ThrottlingOperationStarter& _operationStarter; @@ -53,7 +54,7 @@ class ThrottlingOperationStarter : public OperationStarter HDR_ABORT("should not be reached"); } void onReceive(DistributorStripeMessageSender&, - const std::shared_ptr<api::StorageReply>&) override { + const std::shared_ptr<api::StorageReply>&) override { HDR_ABORT("should not be reached"); } }; @@ -66,9 +67,12 @@ public: _maxPending(UINT32_MAX), _pendingCount(0) {} + ~ThrottlingOperationStarter() override; bool start(const std::shared_ptr<Operation>& operation, Priority priority) override; + bool may_allow_operation_with_priority(Priority priority) const noexcept override; + bool canStart(uint32_t currentOperationCount, Priority priority) const; void setMaxPendingRange(uint32_t minPending, uint32_t maxPending) { |