about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@verizonmedia.com> 2020-04-20 12:46:06 +0200
committer: GitHub <noreply@github.com> 2020-04-20 12:46:06 +0200
commit: eb5129002b5a3053882273707666b7c7786b4e6d (patch)
tree: d3ce07c6407b955d0a68522f2422e6c6f12f0e98
parent: 33c94080e9f5e1672bfbd8ceb66cd550f2e91815 (diff)
parent: 0aade2bf351d67e56eea952211ee28efc87d2f5f (diff)
Merge pull request #12970 from vespa-engine/vekterli/partially-inhibit-maintenance-scans-when-node-has-heavy-message-load
Temporarily inhibit maintenance scans when node has heavy message load
-rw-r--r-- storage/src/tests/distributor/distributortest.cpp 8
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.cpp 5
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.h 5
-rw-r--r-- storage/src/vespa/storage/config/stor-distributormanager.def 9
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.cpp 29
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.h 6
-rw-r--r-- storageframework/src/tests/thread/tickingthreadtest.cpp 15
-rw-r--r-- storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp 69
-rw-r--r-- storageframework/src/vespa/storageframework/generic/thread/tickingthread.h 19
9 files changed, 95 insertions, 70 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 421fd2cb4b6..b110e99f8a4 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -700,6 +700,14 @@ TEST_F(DistributorTest, replica_counting_mode_config_is_propagated_to_metric_upd
EXPECT_EQ(ConfigBuilder::MinimumReplicaCountingMode::ANY, currentReplicaCountingMode());
}
+TEST_F(DistributorTest, max_consecutively_inhibited_maintenance_ticks_config_is_propagated_to_internal_config) {
+ setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
+ ConfigBuilder builder;
+ builder.maxConsecutivelyInhibitedMaintenanceTicks = 123;
+ getConfig().configure(builder);
+ EXPECT_EQ(getConfig().max_consecutively_inhibited_maintenance_ticks(), 123);
+}
+
TEST_F(DistributorTest, bucket_activation_is_enabled_by_default) {
setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
EXPECT_FALSE(getConfig().isBucketActivationDisabled());
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 9f51d70ce60..0c9988421a3 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -22,6 +22,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_maxIdealStateOperations(100),
_idealStateChunkSize(1000),
_maxNodesPerMerge(16),
+ _max_consecutively_inhibited_maintenance_ticks(20),
_lastGarbageCollectionChange(vespalib::duration::zero()),
_garbageCollectionInterval(0),
_minPendingMaintenanceOps(100),
@@ -44,7 +45,8 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_use_weak_internal_read_consistency_for_client_gets(false),
_enable_metadata_only_fetch_phase_for_inconsistent_updates(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
-{ }
+{
+}
DistributorConfiguration::~DistributorConfiguration() = default;
@@ -123,6 +125,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_docCountJoinLimit = config.joincount;
_minimalBucketSplit = config.minsplitcount;
_maxNodesPerMerge = config.maximumNodesPerMerge;
+ _max_consecutively_inhibited_maintenance_ticks = config.maxConsecutivelyInhibitedMaintenanceTicks;
_garbageCollectionInterval = std::chrono::seconds(config.garbagecollection.interval);
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index b8e99165d69..0c4b1f5756c 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -242,6 +242,10 @@ public:
return _enable_metadata_only_fetch_phase_for_inconsistent_updates;
}
+ uint32_t max_consecutively_inhibited_maintenance_ticks() const noexcept {
+ return _max_consecutively_inhibited_maintenance_ticks;
+ }
+
bool containsTimeStatement(const std::string& documentSelection) const;
private:
@@ -258,6 +262,7 @@ private:
uint32_t _maxIdealStateOperations;
uint32_t _idealStateChunkSize;
uint32_t _maxNodesPerMerge;
+ uint32_t _max_consecutively_inhibited_maintenance_ticks;
std::string _garbageCollectionSelection;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 915b9b6b304..71059d717a6 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -236,4 +236,11 @@ use_weak_internal_read_consistency_for_client_gets bool default=false
## Setting this option to true always implicitly enables the fast update restart
## feature, so it's not required to set that config to true, nor will setting it
## to false actually disable the feature.
-enable_metadata_only_fetch_phase_for_inconsistent_updates bool default=false
\ No newline at end of file
+enable_metadata_only_fetch_phase_for_inconsistent_updates bool default=false
+
+## If a distributor main thread tick is constantly processing requests or responses
+## originating from other nodes, setting this value above zero will prevent implicit
+## maintenance scans from being done as part of the tick for up to N rounds of ticking.
+## This is to reduce the amount of CPU spent on ideal state calculations and bucket DB
+## accesses when the distributor is heavily loaded with feed operations.
+max_consecutively_inhibited_maintenance_ticks int default=20
\ No newline at end of file
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index cce3d2d1acb..5dade47c2a6 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -105,6 +105,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_ownershipSafeTimeCalc(std::make_unique<OwnershipTransferSafeTimePointCalculator>(0s)), // Set by config later
_db_memory_sample_interval(30s),
_last_db_memory_sample_time_point(),
+ _inhibited_maintenance_tick_count(0),
_must_send_updated_host_info(false),
_use_btree_database(use_btree_database)
{
@@ -622,7 +623,7 @@ Distributor::signalWorkWasDone()
}
bool
-Distributor::workWasDone()
+Distributor::workWasDone() const noexcept
{
return !_tickResult.waitWanted();
}
@@ -848,17 +849,39 @@ Distributor::doNonCriticalTick(framework::ThreadIndex)
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
handleStatusRequests();
startExternalOperations();
- if (!initializing()) {
+ if (initializing()) {
+ _bucketDBUpdater.resendDelayedMessages();
+ return _tickResult;
+ }
+ // Ordering note: since maintenance inhibiting checks whether startExternalOperations()
+ // did any useful work with incoming data, this check must be performed _after_ the call.
+ if (!should_inhibit_current_maintenance_scan_tick()) {
scanNextBucket();
startNextMaintenanceOperation();
if (isInRecoveryMode()) {
signalWorkWasDone();
}
+ mark_maintenance_tick_as_no_longer_inhibited();
+ _bucketDBUpdater.resendDelayedMessages();
+ } else {
+ mark_current_maintenance_tick_as_inhibited();
}
- _bucketDBUpdater.resendDelayedMessages();
return _tickResult;
}
+bool Distributor::should_inhibit_current_maintenance_scan_tick() const noexcept {
+ return (workWasDone() && (_inhibited_maintenance_tick_count
+ < getConfig().max_consecutively_inhibited_maintenance_ticks()));
+}
+
+void Distributor::mark_current_maintenance_tick_as_inhibited() noexcept {
+ ++_inhibited_maintenance_tick_count;
+}
+
+void Distributor::mark_maintenance_tick_as_no_longer_inhibited() noexcept {
+ _inhibited_maintenance_tick_count = 0;
+}
+
void
Distributor::enableNextConfig()
{
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index bf780434edf..36d399b0a9f 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -233,12 +233,15 @@ private:
void maybe_update_bucket_db_memory_usage_stats();
void scanAllBuckets();
MaintenanceScanner::ScanResult scanNextBucket();
+ bool should_inhibit_current_maintenance_scan_tick() const noexcept;
+ void mark_current_maintenance_tick_as_inhibited() noexcept;
+ void mark_maintenance_tick_as_no_longer_inhibited() noexcept;
void enableNextConfig();
void fetchStatusRequests();
void fetchExternalMessages();
void startNextMaintenanceOperation();
void signalWorkWasDone();
- bool workWasDone();
+ bool workWasDone() const noexcept;
void enterRecoveryMode();
void leaveRecoveryMode();
@@ -336,6 +339,7 @@ private:
std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc;
std::chrono::steady_clock::duration _db_memory_sample_interval;
std::chrono::steady_clock::time_point _last_db_memory_sample_time_point;
+ size_t _inhibited_maintenance_tick_count;
bool _must_send_updated_host_info;
const bool _use_btree_database;
};
diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp
index c42a9c17283..f69f77e8a07 100644
--- a/storageframework/src/tests/thread/tickingthreadtest.cpp
+++ b/storageframework/src/tests/thread/tickingthreadtest.cpp
@@ -234,21 +234,6 @@ TEST(TickingThreadTest, test_lock_critical_ticks)
}
}
-TEST(TickingThreadTest, test_fails_on_start_without_threads)
-{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
- int threadCount = 0;
- MyApp app(threadCount, true);
- try{
- app.start(testReg.getThreadPoolImpl());
- FAIL() << "Expected starting without threads to fail";
- } catch (vespalib::Exception& e) {
- EXPECT_EQ(vespalib::string("Makes no sense to start threadpool without threads"),
- e.getMessage());
- }
-}
-
namespace {
RealClock clock;
diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp
index e8651811511..b006e0501f3 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp
+++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp
@@ -4,22 +4,23 @@
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/stllike/asciistream.h>
-namespace storage {
-namespace framework {
+namespace storage::framework {
ThreadWaitInfo ThreadWaitInfo::MORE_WORK_ENQUEUED(false);
ThreadWaitInfo ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN(true);
void
ThreadWaitInfo::merge(const ThreadWaitInfo& other) {
- if (!other._waitWanted) _waitWanted = false;
+ if (!other._waitWanted) {
+ _waitWanted = false;
+ }
}
/**
* \brief Implementation actually doing lock handling, waiting, and allowing a
* global synchronization point where no thread is currently running.
*/
-class TickingThreadRunner : public Runnable {
+class TickingThreadRunner final : public Runnable {
vespalib::Monitor& _monitor;
TickingThread& _tickingThread;
uint32_t _threadIndex;
@@ -70,10 +71,10 @@ private:
if (info.waitWanted()) {
_state = 'w';
cycle = WAIT_CYCLE;
- if (ticksExecutedAfterWait >= handle.getTicksBeforeWait()) {
+ if (ticksExecutedAfterWait >= handle.getTicksBeforeWait()) {
guard.wait(handle.getWaitTime());
ticksExecutedAfterWait = 0;
- }
+ }
}
if (_wantToFreeze) {
_state = 'f';
@@ -101,26 +102,26 @@ private:
}
};
-class TickingThreadPoolImpl : public TickingThreadPool {
+class TickingThreadPoolImpl final : public TickingThreadPool {
vespalib::string _name;
vespalib::Monitor _monitor;
- std::atomic_uint_least64_t _waitTime ;
+ std::atomic_uint_least64_t _waitTime;
std::atomic_uint _ticksBeforeWait;
std::atomic_uint_least64_t _maxProcessTime;
std::vector<TickingThreadRunner::SP> _tickers;
- std::vector<std::shared_ptr<Thread> > _threads;
+ std::vector<std::shared_ptr<Thread>> _threads;
- struct FreezeGuard : public TickingLockGuard::Impl {
+ struct FreezeGuard final : public TickingLockGuard::Impl {
TickingThreadPoolImpl& _pool;
- FreezeGuard(TickingThreadPoolImpl& pool) : _pool(pool) { _pool.freeze(); }
- ~FreezeGuard() { _pool.thaw(); }
+ explicit FreezeGuard(TickingThreadPoolImpl& pool) : _pool(pool) { _pool.freeze(); }
+ ~FreezeGuard() override { _pool.thaw(); }
void broadcast() override {}
};
- struct CriticalGuard : public TickingLockGuard::Impl {
+ struct CriticalGuard final : public TickingLockGuard::Impl {
vespalib::MonitorGuard _guard;
- CriticalGuard(vespalib::Monitor& m) : _guard(m) {}
+ explicit CriticalGuard(vespalib::Monitor& m) : _guard(m) {}
void broadcast() override { _guard.broadcast(); }
};
@@ -133,7 +134,7 @@ public:
_ticksBeforeWait(ticksBeforeWait),
_maxProcessTime(maxProcessTime.getTime()) {}
- ~TickingThreadPoolImpl() {
+ ~TickingThreadPoolImpl() override {
stop();
}
@@ -151,15 +152,11 @@ public:
void addThread(TickingThread& ticker) override {
ThreadIndex index = _tickers.size();
ticker.newThreadCreated(index);
- _tickers.push_back(TickingThreadRunner::SP(new TickingThreadRunner(_monitor, ticker, index)));
+ _tickers.emplace_back(std::make_shared<TickingThreadRunner>(_monitor, ticker, index));
}
void start(ThreadPool& pool) override {
- if (_tickers.empty()) {
- throw vespalib::IllegalStateException(
- "Makes no sense to start threadpool without threads",
- VESPA_STRLOC);
- }
+ assert(!_tickers.empty());
for (uint32_t i=0; i<_tickers.size(); ++i) {
vespalib::asciistream ost;
ost << _name.c_str() << " thread " << i;
@@ -173,24 +170,23 @@ public:
}
TickingLockGuard freezeAllTicks() override {
- return TickingLockGuard(std::unique_ptr<TickingLockGuard::Impl>(new FreezeGuard(*this)));
+ return TickingLockGuard(std::make_unique<FreezeGuard>(*this));
}
TickingLockGuard freezeCriticalTicks() override {
- return TickingLockGuard(std::unique_ptr<TickingLockGuard::Impl>(
- new CriticalGuard(_monitor)));
+ return TickingLockGuard(std::make_unique<CriticalGuard>(_monitor));
}
void stop() override {
- for (uint32_t i=0; i<_threads.size(); ++i) {
- _threads[i]->interrupt();
+ for (auto& t : _threads) {
+ t->interrupt();
}
{
vespalib::MonitorGuard guard(_monitor);
guard.broadcast();
}
- for (uint32_t i=0; i<_threads.size(); ++i) {
- _threads[i]->join();
+ for (auto& t : _threads) {
+ t->join();
}
}
@@ -204,14 +200,14 @@ public:
private:
void freeze() {
- for (uint32_t i=0; i<_tickers.size(); ++i) {
- _tickers[i]->freeze();
+ for (auto& t : _tickers) {
+ t->freeze();
}
}
void thaw() {
- for (uint32_t i=0; i<_tickers.size(); ++i) {
- _tickers[i]->thaw();
+ for (auto& t : _tickers) {
+ t->thaw();
}
}
};
@@ -223,12 +219,7 @@ TickingThreadPool::createDefault(
int ticksBeforeWait,
MilliSecTime maxProcessTime)
{
- return TickingThreadPool::UP(new TickingThreadPoolImpl(
- name,
- waitTime,
- ticksBeforeWait,
- maxProcessTime));
+ return std::make_unique<TickingThreadPoolImpl>(name, waitTime, ticksBeforeWait, maxProcessTime);
}
-} // framework
-} // storage
+} // storage::framework
diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h
index b03385da50e..477c17b1cc3 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h
@@ -26,7 +26,7 @@
namespace storage::framework {
struct ThreadPool;
-typedef uint32_t ThreadIndex;
+using ThreadIndex = uint32_t;
/**
* \brief Information returned from tick functions to indicate whether thread
@@ -34,21 +34,21 @@ typedef uint32_t ThreadIndex;
*/
class ThreadWaitInfo {
bool _waitWanted;
- ThreadWaitInfo(bool waitBeforeNextTick) : _waitWanted(waitBeforeNextTick) {}
+ explicit ThreadWaitInfo(bool waitBeforeNextTick) : _waitWanted(waitBeforeNextTick) {}
public:
static ThreadWaitInfo MORE_WORK_ENQUEUED;
static ThreadWaitInfo NO_MORE_CRITICAL_WORK_KNOWN;
void merge(const ThreadWaitInfo& other);
- bool waitWanted() { return _waitWanted; }
+ bool waitWanted() const noexcept { return _waitWanted; }
};
/**
* \brief Simple superclass to implement for ticking threads.
*/
struct TickingThread {
- virtual ~TickingThread() {}
+ virtual ~TickingThread() = default;
virtual ThreadWaitInfo doCriticalTick(ThreadIndex) = 0;
virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex) = 0;
@@ -58,17 +58,17 @@ struct TickingThread {
/** \brief Delete to allow threads to tick again. */
struct TickingLockGuard {
struct Impl {
- virtual ~Impl() {}
+ virtual ~Impl() = default;
virtual void broadcast() = 0;
};
- TickingLockGuard(std::unique_ptr<Impl> impl) : _impl(std::move(impl)) {}
+ explicit TickingLockGuard(std::unique_ptr<Impl> impl) : _impl(std::move(impl)) {}
void broadcast() { _impl->broadcast(); }
private:
std::unique_ptr<Impl> _impl;
};
struct ThreadLock {
- virtual ~ThreadLock() { }
+ virtual ~ThreadLock() = default;
virtual TickingLockGuard freezeAllTicks() = 0;
virtual TickingLockGuard freezeCriticalTicks() = 0;
};
@@ -77,7 +77,7 @@ struct ThreadLock {
* \brief Thread pool set up by the application to control the threads.
*/
struct TickingThreadPool : public ThreadLock {
- typedef std::unique_ptr<TickingThreadPool> UP;
+ using UP = std::unique_ptr<TickingThreadPool>;
static TickingThreadPool::UP createDefault(
vespalib::stringref name,
@@ -90,8 +90,7 @@ struct TickingThreadPool : public ThreadLock {
MilliSecTime maxProcessTime,
int ticksBeforeWait) = 0;
-
- virtual ~TickingThreadPool() {}
+ ~TickingThreadPool() override = default;
/** All threads must be added before starting the threads. */
virtual void addThread(TickingThread& ticker) = 0;