diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-17 13:33:36 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-17 13:33:36 +0000 |
commit | ba95e0e4c67c5444747a062272630b46ff0750ff (patch) | |
tree | 5ba8c871d26eb5551ccb9d88b035300e6b2f5c76 | |
parent | 77e04dad6bc437de3c934b28fe9f3a1df54a01c5 (diff) |
Remove possibility to update waitTime, maxProcessTime and maxTicksBeforWait from config.
12 files changed, 39 insertions, 126 deletions
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index 2a61141865a..5dcc7f5c6ad 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -31,8 +31,8 @@ TopLevelDistributorTestUtil::~TopLevelDistributorTestUtil() = default; void TopLevelDistributorTestUtil::create_links() { - _node.reset(new TestDistributorApp(_config.getConfigId())); - _thread_pool = framework::TickingThreadPool::createDefault("distributor"); + _node = std::make_unique<TestDistributorApp>(_config.getConfigId()); + _thread_pool = framework::TickingThreadPool::createDefault("distributor", 100ms); _stripe_pool = DistributorStripePool::make_non_threaded_pool_for_testing(); _distributor.reset(new TopLevelDistributor( _node->getComponentRegister(), @@ -43,7 +43,7 @@ TopLevelDistributorTestUtil::create_links() _num_distributor_stripes, _host_info, &_message_sender)); - _component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil")); + _component = std::make_unique<storage::DistributorComponent>(_node->getComponentRegister(), "distrtestutil"); }; void diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.h b/storage/src/vespa/storage/distributor/top_level_distributor.h index 20a61acfa37..95bff77fe40 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.h +++ b/storage/src/vespa/storage/distributor/top_level_distributor.h @@ -106,8 +106,8 @@ public: bool handleStatusRequest(const DelegatedStatusRequest& request) const override; - virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override; - virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override; + framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override; + framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override; // Called by DistributorStripe threads when they want to notify the cluster controller of changed stats. // Thread safe. diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index b7016220531..6fd8cded08e 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -26,9 +26,7 @@ DistributorNode::DistributorNode( : StorageNode(configUri, context, generationFetcher, std::make_unique<HostInfo>(), !communicationManager ? NORMAL : SINGLE_THREADED_TEST_MODE), - // TODO STRIPE: Change waitTime default to 100ms when legacy mode is removed. - _threadPool(framework::TickingThreadPool::createDefault("distributor", - (num_distributor_stripes > 0) ? 100ms : 5ms)), + _threadPool(framework::TickingThreadPool::createDefault("distributor", 100ms, 1, 5s)), _stripe_pool(std::make_unique<distributor::DistributorStripePool>()), _context(context), _timestamp_mutex(), @@ -72,9 +70,6 @@ DistributorNode::handleConfigChange(vespa::config::content::core::StorDistributo { framework::TickingLockGuard guard(_threadPool->freezeAllTicks()); _context.getComponentRegister().setDistributorConfig(c); - _threadPool->updateParametersAllThreads(std::chrono::milliseconds(c.ticksWaitTimeMs), - std::chrono::milliseconds(c.maxProcessTimeMs), - c.ticksBeforeWait); } void diff --git a/storageframework/src/tests/thread/taskthreadtest.cpp b/storageframework/src/tests/thread/taskthreadtest.cpp index b7d7de3239e..32bc6e8a8fb 100644 --- a/storageframework/src/tests/thread/taskthreadtest.cpp +++ b/storageframework/src/tests/thread/taskthreadtest.cpp @@ -31,7 +31,7 @@ struct MyThread : public TaskThread<Task> { TEST(TaskThreadTest, test_normal_usage) { - TickingThreadPool::UP pool(TickingThreadPool::createDefault("testApp")); + TickingThreadPool::UP pool(TickingThreadPool::createDefault("testApp", 100ms)); MyThread t(*pool); t.addTask(Task("a", 6)); diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp index 4d6ce76a676..5fe8f25ae72 100644 --- a/storageframework/src/tests/thread/tickingthreadtest.cpp +++ b/storageframework/src/tests/thread/tickingthreadtest.cpp @@ -4,7 +4,6 @@ #include <vespa/storageframework/defaultimplementation/component/testcomponentregister.h> #include <vespa/storageframework/generic/thread/tickingthread.h> #include <vespa/vespalib/gtest/gtest.h> -#include <vespa/vespalib/util/exception.h> #include <vespa/vespalib/util/stringfmt.h> #include <thread> @@ -86,7 +85,7 @@ MyApp::MyApp(int threadCount, bool doCritOverlapTest) : _critOverlapCounter(0), _doCritOverlapTest(doCritOverlapTest), _critOverlap(false), - _threadPool(TickingThreadPool::createDefault("testApp")) + _threadPool(TickingThreadPool::createDefault("testApp", 100ms)) { for (int i=0; i<threadCount; ++i) { _threadPool->addThread(*this); @@ -100,8 +99,7 @@ MyApp::~MyApp() = default; TEST(TickingThreadTest, test_ticks_before_wait_basic) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); int threadCount = 1; MyApp app(threadCount); app.start(testReg.getThreadPoolImpl()); @@ -117,29 +115,6 @@ TEST(TickingThreadTest, test_ticks_before_wait_basic) app._threadPool->stop(); } -TEST(TickingThreadTest, test_ticks_before_wait_live_update) -{ - TestComponentRegister testReg = std::make_unique<ComponentRegisterImpl>(); - int threadCount = 1; - MyApp app(threadCount); - // Configure thread pool to send bulks of 5000 ticks each second. - long unsigned int ticksBeforeWaitMs = 5000; - app.start(testReg.getThreadPoolImpl()); - app._threadPool->updateParametersAllThreads( - 1s, 234234ms, ticksBeforeWaitMs); - - // Check that 5000 ticks are received instantly (usually <2 ms) - // (if live update is broken it will take more than an hour). - int maxAttempts = 120000; // a bit more than 120 secs - while (app.getTotalNonCritTicks() < ticksBeforeWaitMs && maxAttempts-->0) { - std::this_thread::sleep_for(1ms); - } - - EXPECT_GT(maxAttempts, 0); - EXPECT_GE(app.getTotalNonCritTicks(), ticksBeforeWaitMs); - app._threadPool->stop(); -} - TEST(TickingThreadTest, test_destroy_without_starting) { TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp index f4dacb7ccc3..7281c55ba25 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp @@ -116,15 +116,6 @@ ThreadImpl::setTickData(const ThreadTickData& tickData) _tickDataPtr = nextData; } -void -ThreadImpl::updateParameters(vespalib::duration waitTime, - vespalib::duration maxProcessTime, - int ticksBeforeWait) { - _properties.setWaitTime(waitTime); - _properties.setMaxProcessTime(maxProcessTime); - _properties.setTicksBeforeWait(ticksBeforeWait); -} - ThreadTickData ThreadImpl::AtomicThreadTickData::loadRelaxed() const noexcept { diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h index 0757c5cdf1a..e6f0a21ea20 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h @@ -75,8 +75,6 @@ public: return _properties.getTicksBeforeWait(); } - void updateParameters(vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait) override; - void setTickData(const ThreadTickData&); ThreadTickData getTickData() const; const ThreadProperties& getProperties() const { return _properties; } diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h index f11c1fbcf91..c17638a0d42 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/thread.h +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h @@ -13,13 +13,8 @@ #pragma once #include "runnable.h" -#include <vespa/vespalib/stllike/string.h> #include <condition_variable> -namespace vespalib { - class Monitor; -} - namespace storage::framework { class Thread : public ThreadHandle { @@ -29,7 +24,7 @@ public: typedef std::unique_ptr<Thread> UP; Thread(vespalib::stringref id) : _id(id) {} - virtual ~Thread() {} + virtual ~Thread() = default; virtual const vespalib::string& getId() const { return _id; } @@ -50,10 +45,6 @@ public: */ virtual void join() = 0; - virtual void updateParameters(vespalib::duration waitTime, - vespalib::duration maxProcessTime, - int ticksBeforeWait) = 0; - /** * Utility function to interrupt and join a thread, possibly broadcasting * through a monitor after the signalling face. diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp index 29be5fb07a2..480e42c91ef 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp +++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp @@ -7,34 +7,10 @@ namespace storage::framework { ThreadProperties::ThreadProperties(vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait) + : _maxProcessTime(maxProcessTime), + _waitTime(waitTime), + _ticksBeforeWait(ticksBeforeWait) { - setWaitTime(waitTime); - setMaxProcessTime(maxProcessTime); - setTicksBeforeWait(ticksBeforeWait); -} - - vespalib::duration ThreadProperties::getMaxProcessTime() const { - return _maxProcessTime.load(std::memory_order_relaxed); -} - -vespalib::duration ThreadProperties::getWaitTime() const { - return _waitTime.load(std::memory_order_relaxed); -} - -int ThreadProperties::getTicksBeforeWait() const { - return _ticksBeforeWait.load(std::memory_order_relaxed); -} - -void ThreadProperties::setMaxProcessTime(vespalib::duration maxProcessingTime) { - _maxProcessTime.store(maxProcessingTime); -} - -void ThreadProperties::setWaitTime(vespalib::duration waitTime) { - _waitTime.store(waitTime); -} - -void ThreadProperties::setTicksBeforeWait(int ticksBeforeWait) { - _ticksBeforeWait.store(ticksBeforeWait); } } diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h index a490a6f8511..7607932e079 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h +++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h @@ -34,7 +34,7 @@ private: * Time this thread should maximum use to process before a tick is * registered. (Including wait time if wait time is not set) */ - std::atomic<vespalib::duration> _maxProcessTime; + vespalib::duration _maxProcessTime; /** * Time this thread will wait in a non-interrupted wait cycle. * Used in cases where a wait cycle is registered. As long as no other @@ -42,28 +42,23 @@ private: * wait time here. The deadlock detector should add a configurable * global time period before flagging deadlock anyways. */ - std::atomic<vespalib::duration> _waitTime; + vespalib::duration _waitTime; /** * Number of ticks to be done before a wait. */ - std::atomic_uint _ticksBeforeWait; + uint32_t _ticksBeforeWait; public: ThreadProperties(vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait); - void setMaxProcessTime(vespalib::duration); - void setWaitTime(vespalib::duration); - void setTicksBeforeWait(int); - - vespalib::duration getMaxProcessTime() const; - vespalib::duration getWaitTime() const; - int getTicksBeforeWait() const; + vespalib::duration getMaxProcessTime() const { return _maxProcessTime; } + vespalib::duration getWaitTime() const { return _waitTime; } + int getTicksBeforeWait() const { return _ticksBeforeWait; } vespalib::duration getMaxCycleTime() const { - return std::max(_maxProcessTime.load(std::memory_order_relaxed), - _waitTime.load(std::memory_order_relaxed)); + return std::max(_maxProcessTime, _waitTime); } }; diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp index d7f6189d36c..1a9cb459f28 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp +++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp @@ -107,12 +107,12 @@ private: }; class TickingThreadPoolImpl final : public TickingThreadPool { - vespalib::string _name; + const vespalib::string _name; + const vespalib::duration _waitTime; + const vespalib::duration _maxProcessTime; + const uint32_t _ticksBeforeWait; std::mutex _lock; std::condition_variable _cond; - std::atomic<vespalib::duration> _waitTime; - std::atomic_uint _ticksBeforeWait; - std::atomic<vespalib::duration> _maxProcessTime; std::vector<TickingThreadRunner::SP> _tickers; std::vector<std::shared_ptr<Thread>> _threads; @@ -137,24 +137,14 @@ public: int ticksBeforeWait, vespalib::duration maxProcessTime) : _name(name), _waitTime(waitTime), - _ticksBeforeWait(ticksBeforeWait), - _maxProcessTime(maxProcessTime) {} + _maxProcessTime(maxProcessTime), + _ticksBeforeWait(ticksBeforeWait) + { } ~TickingThreadPoolImpl() override { stop(); } - void updateParametersAllThreads(vespalib::duration waitTime, vespalib::duration maxProcessTime, - int ticksBeforeWait) override { - _waitTime.store(waitTime); - _maxProcessTime.store(maxProcessTime); - _ticksBeforeWait.store(ticksBeforeWait); - // TODO: Add locking so threads not deleted while updating - for (uint32_t i=0; i<_threads.size(); ++i) { - _threads[i]->updateParameters(waitTime, maxProcessTime, ticksBeforeWait); - } - } - void addThread(TickingThread& ticker) override { ThreadIndex index = _tickers.size(); ticker.newThreadCreated(index); @@ -169,9 +159,9 @@ public: _threads.push_back(std::shared_ptr<Thread>(pool.startThread( *_tickers[i], ost.str(), - _waitTime.load(std::memory_order_relaxed), - _maxProcessTime.load(std::memory_order_relaxed), - _ticksBeforeWait.load(std::memory_order_relaxed), std::nullopt))); + _waitTime, + _maxProcessTime, + _ticksBeforeWait, std::nullopt))); } } @@ -227,4 +217,10 @@ TickingThreadPool::createDefault( return std::make_unique<TickingThreadPoolImpl>(name, waitTime, ticksBeforeWait, maxProcessTime); } +TickingThreadPool::UP +TickingThreadPool::createDefault(vespalib::stringref name, vespalib::duration waitTime) +{ + return createDefault(name, waitTime, 1, 5s); +} + } // storage::framework diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h index 1265ebd7203..9ddc47c8f3a 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h +++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h @@ -81,14 +81,10 @@ struct TickingThreadPool : public ThreadLock { // TODO STRIPE: Change waitTime default to 100ms when legacy mode is removed. static TickingThreadPool::UP createDefault( vespalib::stringref name, - vespalib::duration waitTime = 5ms, - int ticksBeforeWait = 1, - vespalib::duration maxProcessTime = 5s); - - virtual void updateParametersAllThreads( vespalib::duration waitTime, - vespalib::duration maxProcessTime, - int ticksBeforeWait) = 0; + int ticksBeforeWait, + vespalib::duration maxProcessTime); + static TickingThreadPool::UP createDefault(vespalib::stringref name, vespalib::duration waitTime); ~TickingThreadPool() override = default; |