diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-04-20 12:46:06 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-20 12:46:06 +0200 |
commit | eb5129002b5a3053882273707666b7c7786b4e6d (patch) | |
tree | d3ce07c6407b955d0a68522f2422e6c6f12f0e98 | |
parent | 33c94080e9f5e1672bfbd8ceb66cd550f2e91815 (diff) | |
parent | 0aade2bf351d67e56eea952211ee28efc87d2f5f (diff) |
Merge pull request #12970 from vespa-engine/vekterli/partially-inhibit-maintenance-scans-when-node-has-heavy-message-load
Temporarily inhibit maintenance scans when node has heavy message load
9 files changed, 95 insertions, 70 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 421fd2cb4b6..b110e99f8a4 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -700,6 +700,14 @@ TEST_F(DistributorTest, replica_counting_mode_config_is_propagated_to_metric_upd EXPECT_EQ(ConfigBuilder::MinimumReplicaCountingMode::ANY, currentReplicaCountingMode()); } +TEST_F(DistributorTest, max_consecutively_inhibited_maintenance_ticks_config_is_propagated_to_internal_config) { + setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); + ConfigBuilder builder; + builder.maxConsecutivelyInhibitedMaintenanceTicks = 123; + getConfig().configure(builder); + EXPECT_EQ(getConfig().max_consecutively_inhibited_maintenance_ticks(), 123); +} + TEST_F(DistributorTest, bucket_activation_is_enabled_by_default) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); EXPECT_FALSE(getConfig().isBucketActivationDisabled()); diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index 9f51d70ce60..0c9988421a3 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -22,6 +22,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _maxIdealStateOperations(100), _idealStateChunkSize(1000), _maxNodesPerMerge(16), + _max_consecutively_inhibited_maintenance_ticks(20), _lastGarbageCollectionChange(vespalib::duration::zero()), _garbageCollectionInterval(0), _minPendingMaintenanceOps(100), @@ -44,7 +45,8 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _use_weak_internal_read_consistency_for_client_gets(false), _enable_metadata_only_fetch_phase_for_inconsistent_updates(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) -{ } +{ +} DistributorConfiguration::~DistributorConfiguration() = default; @@ -123,6 +125,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _docCountJoinLimit = config.joincount; _minimalBucketSplit = config.minsplitcount; _maxNodesPerMerge = config.maximumNodesPerMerge; + _max_consecutively_inhibited_maintenance_ticks = config.maxConsecutivelyInhibitedMaintenanceTicks; _garbageCollectionInterval = std::chrono::seconds(config.garbagecollection.interval); diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index b8e99165d69..0c4b1f5756c 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -242,6 +242,10 @@ public: return _enable_metadata_only_fetch_phase_for_inconsistent_updates; } + uint32_t max_consecutively_inhibited_maintenance_ticks() const noexcept { + return _max_consecutively_inhibited_maintenance_ticks; + } + bool containsTimeStatement(const std::string& documentSelection) const; private: @@ -258,6 +262,7 @@ private: uint32_t _maxIdealStateOperations; uint32_t _idealStateChunkSize; uint32_t _maxNodesPerMerge; + uint32_t _max_consecutively_inhibited_maintenance_ticks; std::string _garbageCollectionSelection; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 915b9b6b304..71059d717a6 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -236,4 +236,11 @@ use_weak_internal_read_consistency_for_client_gets bool default=false ## Setting this option to true always implicitly enables the fast update restart ## feature, so it's not required to set that config to true, nor will setting it ## to false actually disable the feature. -enable_metadata_only_fetch_phase_for_inconsistent_updates bool default=false
\ No newline at end of file +enable_metadata_only_fetch_phase_for_inconsistent_updates bool default=false + +## If a distributor main thread tick is constantly processing requests or responses +## originating from other nodes, setting this value above zero will prevent implicit +## maintenance scans from being done as part of the tick for up to N rounds of ticking. +## This is to reduce the amount of CPU spent on ideal state calculations and bucket DB +## accesses when the distributor is heavily loaded with feed operations. +max_consecutively_inhibited_maintenance_ticks int default=20
\ No newline at end of file diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index cce3d2d1acb..5dade47c2a6 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -105,6 +105,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _ownershipSafeTimeCalc(std::make_unique<OwnershipTransferSafeTimePointCalculator>(0s)), // Set by config later _db_memory_sample_interval(30s), _last_db_memory_sample_time_point(), + _inhibited_maintenance_tick_count(0), _must_send_updated_host_info(false), _use_btree_database(use_btree_database) { @@ -622,7 +623,7 @@ Distributor::signalWorkWasDone() } bool -Distributor::workWasDone() +Distributor::workWasDone() const noexcept { return !_tickResult.waitWanted(); } @@ -848,17 +849,39 @@ Distributor::doNonCriticalTick(framework::ThreadIndex) _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; handleStatusRequests(); startExternalOperations(); - if (!initializing()) { + if (initializing()) { + _bucketDBUpdater.resendDelayedMessages(); + return _tickResult; + } + // Ordering note: since maintenance inhibiting checks whether startExternalOperations() + // did any useful work with incoming data, this check must be performed _after_ the call. + if (!should_inhibit_current_maintenance_scan_tick()) { scanNextBucket(); startNextMaintenanceOperation(); if (isInRecoveryMode()) { signalWorkWasDone(); } + mark_maintenance_tick_as_no_longer_inhibited(); + _bucketDBUpdater.resendDelayedMessages(); + } else { + mark_current_maintenance_tick_as_inhibited(); } - _bucketDBUpdater.resendDelayedMessages(); return _tickResult; } +bool Distributor::should_inhibit_current_maintenance_scan_tick() const noexcept { + return (workWasDone() && (_inhibited_maintenance_tick_count + < getConfig().max_consecutively_inhibited_maintenance_ticks())); +} + +void Distributor::mark_current_maintenance_tick_as_inhibited() noexcept { + ++_inhibited_maintenance_tick_count; +} + +void Distributor::mark_maintenance_tick_as_no_longer_inhibited() noexcept { + _inhibited_maintenance_tick_count = 0; +} + void Distributor::enableNextConfig() { diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index bf780434edf..36d399b0a9f 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -233,12 +233,15 @@ private: void maybe_update_bucket_db_memory_usage_stats(); void scanAllBuckets(); MaintenanceScanner::ScanResult scanNextBucket(); + bool should_inhibit_current_maintenance_scan_tick() const noexcept; + void mark_current_maintenance_tick_as_inhibited() noexcept; + void mark_maintenance_tick_as_no_longer_inhibited() noexcept; void enableNextConfig(); void fetchStatusRequests(); void fetchExternalMessages(); void startNextMaintenanceOperation(); void signalWorkWasDone(); - bool workWasDone(); + bool workWasDone() const noexcept; void enterRecoveryMode(); void leaveRecoveryMode(); @@ -336,6 +339,7 @@ private: std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc; std::chrono::steady_clock::duration _db_memory_sample_interval; std::chrono::steady_clock::time_point _last_db_memory_sample_time_point; + size_t _inhibited_maintenance_tick_count; bool _must_send_updated_host_info; const bool _use_btree_database; }; diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp index c42a9c17283..f69f77e8a07 100644 --- a/storageframework/src/tests/thread/tickingthreadtest.cpp +++ b/storageframework/src/tests/thread/tickingthreadtest.cpp @@ -234,21 +234,6 @@ TEST(TickingThreadTest, test_lock_critical_ticks) } } -TEST(TickingThreadTest, test_fails_on_start_without_threads) -{ - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); - int threadCount = 0; - MyApp app(threadCount, true); - try{ - app.start(testReg.getThreadPoolImpl()); - FAIL() << "Expected starting without threads to fail"; - } catch (vespalib::Exception& e) { - EXPECT_EQ(vespalib::string("Makes no sense to start threadpool without threads"), - e.getMessage()); - } -} - namespace { RealClock clock; diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp index e8651811511..b006e0501f3 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp +++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp @@ -4,22 +4,23 @@ #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/stllike/asciistream.h> -namespace storage { -namespace framework { +namespace storage::framework { ThreadWaitInfo ThreadWaitInfo::MORE_WORK_ENQUEUED(false); ThreadWaitInfo ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN(true); void ThreadWaitInfo::merge(const ThreadWaitInfo& other) { - if (!other._waitWanted) _waitWanted = false; + if (!other._waitWanted) { + _waitWanted = false; + } } /** * \brief Implementation actually doing lock handling, waiting, and allowing a * global synchronization point where no thread is currently running. */ -class TickingThreadRunner : public Runnable { +class TickingThreadRunner final : public Runnable { vespalib::Monitor& _monitor; TickingThread& _tickingThread; uint32_t _threadIndex; @@ -70,10 +71,10 @@ private: if (info.waitWanted()) { _state = 'w'; cycle = WAIT_CYCLE; - if (ticksExecutedAfterWait >= handle.getTicksBeforeWait()) { + if (ticksExecutedAfterWait >= handle.getTicksBeforeWait()) { guard.wait(handle.getWaitTime()); ticksExecutedAfterWait = 0; - } + } } if (_wantToFreeze) { _state = 'f'; @@ -101,26 +102,26 @@ private: } }; -class TickingThreadPoolImpl : public TickingThreadPool { +class TickingThreadPoolImpl final : public TickingThreadPool { vespalib::string _name; vespalib::Monitor _monitor; - std::atomic_uint_least64_t _waitTime ; + std::atomic_uint_least64_t _waitTime; std::atomic_uint _ticksBeforeWait; std::atomic_uint_least64_t _maxProcessTime; std::vector<TickingThreadRunner::SP> _tickers; - std::vector<std::shared_ptr<Thread> > _threads; + std::vector<std::shared_ptr<Thread>> _threads; - struct FreezeGuard : public TickingLockGuard::Impl { + struct FreezeGuard final : public TickingLockGuard::Impl { TickingThreadPoolImpl& _pool; - FreezeGuard(TickingThreadPoolImpl& pool) : _pool(pool) { _pool.freeze(); } - ~FreezeGuard() { _pool.thaw(); } + explicit FreezeGuard(TickingThreadPoolImpl& pool) : _pool(pool) { _pool.freeze(); } + ~FreezeGuard() override { _pool.thaw(); } void broadcast() override {} }; - struct CriticalGuard : public TickingLockGuard::Impl { + struct CriticalGuard final : public TickingLockGuard::Impl { vespalib::MonitorGuard _guard; - CriticalGuard(vespalib::Monitor& m) : _guard(m) {} + explicit CriticalGuard(vespalib::Monitor& m) : _guard(m) {} void broadcast() override { _guard.broadcast(); } }; @@ -133,7 +134,7 @@ public: _ticksBeforeWait(ticksBeforeWait), _maxProcessTime(maxProcessTime.getTime()) {} - ~TickingThreadPoolImpl() { + ~TickingThreadPoolImpl() override { stop(); } @@ -151,15 +152,11 @@ public: void addThread(TickingThread& ticker) override { ThreadIndex index = _tickers.size(); ticker.newThreadCreated(index); - _tickers.push_back(TickingThreadRunner::SP(new TickingThreadRunner(_monitor, ticker, index))); + _tickers.emplace_back(std::make_shared<TickingThreadRunner>(_monitor, ticker, index)); } void start(ThreadPool& pool) override { - if (_tickers.empty()) { - throw vespalib::IllegalStateException( - "Makes no sense to start threadpool without threads", - VESPA_STRLOC); - } + assert(!_tickers.empty()); for (uint32_t i=0; i<_tickers.size(); ++i) { vespalib::asciistream ost; ost << _name.c_str() << " thread " << i; @@ -173,24 +170,23 @@ public: } TickingLockGuard freezeAllTicks() override { - return TickingLockGuard(std::unique_ptr<TickingLockGuard::Impl>(new FreezeGuard(*this))); + return TickingLockGuard(std::make_unique<FreezeGuard>(*this)); } TickingLockGuard freezeCriticalTicks() override { - return TickingLockGuard(std::unique_ptr<TickingLockGuard::Impl>( - new CriticalGuard(_monitor))); + return TickingLockGuard(std::make_unique<CriticalGuard>(_monitor)); } void stop() override { - for (uint32_t i=0; i<_threads.size(); ++i) { - _threads[i]->interrupt(); + for (auto& t : _threads) { + t->interrupt(); } { vespalib::MonitorGuard guard(_monitor); guard.broadcast(); } - for (uint32_t i=0; i<_threads.size(); ++i) { - _threads[i]->join(); + for (auto& t : _threads) { + t->join(); } } @@ -204,14 +200,14 @@ public: private: void freeze() { - for (uint32_t i=0; i<_tickers.size(); ++i) { - _tickers[i]->freeze(); + for (auto& t : _tickers) { + t->freeze(); } } void thaw() { - for (uint32_t i=0; i<_tickers.size(); ++i) { - _tickers[i]->thaw(); + for (auto& t : _tickers) { + t->thaw(); } } }; @@ -223,12 +219,7 @@ TickingThreadPool::createDefault( int ticksBeforeWait, MilliSecTime maxProcessTime) { - return TickingThreadPool::UP(new TickingThreadPoolImpl( - name, - waitTime, - ticksBeforeWait, - maxProcessTime)); + return std::make_unique<TickingThreadPoolImpl>(name, waitTime, ticksBeforeWait, maxProcessTime); } -} // framework -} // storage +} // storage::framework diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h index b03385da50e..477c17b1cc3 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h +++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h @@ -26,7 +26,7 @@ namespace storage::framework { struct ThreadPool; -typedef uint32_t ThreadIndex; +using ThreadIndex = uint32_t; /** * \brief Information returned from tick functions to indicate whether thread @@ -34,21 +34,21 @@ typedef uint32_t ThreadIndex; */ class ThreadWaitInfo { bool _waitWanted; - ThreadWaitInfo(bool waitBeforeNextTick) : _waitWanted(waitBeforeNextTick) {} + explicit ThreadWaitInfo(bool waitBeforeNextTick) : _waitWanted(waitBeforeNextTick) {} public: static ThreadWaitInfo MORE_WORK_ENQUEUED; static ThreadWaitInfo NO_MORE_CRITICAL_WORK_KNOWN; void merge(const ThreadWaitInfo& other); - bool waitWanted() { return _waitWanted; } + bool waitWanted() const noexcept { return _waitWanted; } }; /** * \brief Simple superclass to implement for ticking threads. */ struct TickingThread { - virtual ~TickingThread() {} + virtual ~TickingThread() = default; virtual ThreadWaitInfo doCriticalTick(ThreadIndex) = 0; virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex) = 0; @@ -58,17 +58,17 @@ struct TickingThread { /** \brief Delete to allow threads to tick again. */ struct TickingLockGuard { struct Impl { - virtual ~Impl() {} + virtual ~Impl() = default; virtual void broadcast() = 0; }; - TickingLockGuard(std::unique_ptr<Impl> impl) : _impl(std::move(impl)) {} + explicit TickingLockGuard(std::unique_ptr<Impl> impl) : _impl(std::move(impl)) {} void broadcast() { _impl->broadcast(); } private: std::unique_ptr<Impl> _impl; }; struct ThreadLock { - virtual ~ThreadLock() { } + virtual ~ThreadLock() = default; virtual TickingLockGuard freezeAllTicks() = 0; virtual TickingLockGuard freezeCriticalTicks() = 0; }; @@ -77,7 +77,7 @@ struct ThreadLock { * \brief Thread pool set up by the application to control the threads. */ struct TickingThreadPool : public ThreadLock { - typedef std::unique_ptr<TickingThreadPool> UP; + using UP = std::unique_ptr<TickingThreadPool>; static TickingThreadPool::UP createDefault( vespalib::stringref name, @@ -90,8 +90,7 @@ struct TickingThreadPool : public ThreadLock { MilliSecTime maxProcessTime, int ticksBeforeWait) = 0; - - virtual ~TickingThreadPool() {} + ~TickingThreadPool() override = default; /** All threads must be added before starting the threads. */ virtual void addThread(TickingThread& ticker) = 0; |