diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-13 06:45:15 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-13 06:49:13 +0000 |
commit | e7c958c4abd98a8ef43ab191d9457c089a00031f (patch) | |
tree | b86a25aeb298804b2acab49d18b154dcd0f1d862 /searchcore | |
parent | 3c457c690d1e306a0560cfb724e4c892c00fe9e4 (diff) |
Use the forward scheduler in proton also for disk-mem-util-sampler.
Diffstat (limited to 'searchcore')
4 files changed, 23 insertions, 30 deletions
diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp index 0c06d27c916..5879eafd0d0 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp +++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/searchcore/proton/common/hw_info.h> +#include <vespa/searchcore/proton/common/scheduledexecutor.h> #include <vespa/searchcore/proton/common/i_transient_resource_usage_provider.h> #include <vespa/searchcore/proton/server/disk_mem_usage_sampler.h> #include <vespa/searchcore/proton/test/transport_helper.h> @@ -39,20 +40,23 @@ public: struct DiskMemUsageSamplerTest : public ::testing::Test { Transport transport; + ScheduledExecutor executor; std::unique_ptr<DiskMemUsageSampler> sampler; DiskMemUsageSamplerTest() : transport(), - sampler(std::make_unique<DiskMemUsageSampler>(transport.transport(), ".", DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()))) + executor(transport.transport()), + sampler(std::make_unique<DiskMemUsageSampler>(".", make_hw_info())) { + sampler->setConfig(DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()), executor); sampler->add_transient_usage_provider(std::make_shared<MyProvider>(50, 200)); sampler->add_transient_usage_provider(std::make_shared<MyProvider>(100, 150)); } - ~DiskMemUsageSamplerTest() { - sampler.reset(); - } + ~DiskMemUsageSamplerTest(); const DiskMemUsageFilter& filter() const { return sampler->writeFilter(); } }; +DiskMemUsageSamplerTest::~DiskMemUsageSamplerTest() = default; + TEST_F(DiskMemUsageSamplerTest, resource_usage_is_sampled) { // Poll for up to 20 seconds to get a sample. diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp index 40cd6238393..9b6ec85ce9b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp @@ -10,26 +10,21 @@ using vespalib::makeLambdaTask; namespace proton { -DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::string &path_in, const Config &config) - : _filter(config.hwInfo), +DiskMemUsageSampler::DiskMemUsageSampler(const std::string &path_in, const HwInfo &hwInfo) + : _filter(hwInfo), _path(path_in), _sampleInterval(60s), - _lastSampleTime(vespalib::steady_clock::now()), - _periodicTimer(std::make_unique<ScheduledExecutor>(transport)), + _lastSampleTime(), _lock(), _periodicHandle(), _transient_usage_providers() { - setConfig(config); } -DiskMemUsageSampler::~DiskMemUsageSampler() -{ - _periodicHandle.reset(); -} +DiskMemUsageSampler::~DiskMemUsageSampler() = default; void -DiskMemUsageSampler::setConfig(const Config &config) +DiskMemUsageSampler::setConfig(const Config &config, IScheduledExecutor & executor) { _periodicHandle.reset(); _filter.setConfig(config.filterConfig); @@ -37,7 +32,7 @@ DiskMemUsageSampler::setConfig(const Config &config) sampleAndReportUsage(); _lastSampleTime = vespalib::steady_clock::now(); vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval); - _periodicHandle = _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() { + _periodicHandle = executor.scheduleAtFixedRate(makeLambdaTask([this]() { if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) { return; } diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h index 2b7d3ab759f..7b1ba40e30c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h @@ -2,8 +2,8 @@ #pragma once -#include <vespa/vespalib/util/time.h> #include "disk_mem_usage_filter.h" +#include <vespa/searchcore/proton/common/i_scheduled_executor.h> class FNET_Transport; @@ -12,7 +12,6 @@ namespace vespalib { class IDestructorCallback; } namespace proton { class ITransientResourceUsageProvider; -class ScheduledExecutor; /* * Class to sample disk and memory usage used for filtering write operations. @@ -22,7 +21,6 @@ class DiskMemUsageSampler { std::filesystem::path _path; vespalib::duration _sampleInterval; vespalib::steady_time _lastSampleTime; - std::unique_ptr<ScheduledExecutor> _periodicTimer; std::mutex _lock; std::unique_ptr<vespalib::IDestructorCallback> _periodicHandle; std::vector<std::shared_ptr<const ITransientResourceUsageProvider>> _transient_usage_providers; @@ -41,8 +39,7 @@ public: : filterConfig(), sampleInterval(60s), hwInfo() - { - } + { } Config(double memoryLimit_in, double diskLimit_in, @@ -51,17 +48,14 @@ public: : filterConfig(memoryLimit_in, diskLimit_in), sampleInterval(sampleInterval_in), hwInfo(hwInfo_in) - { - } + { } }; - DiskMemUsageSampler(FNET_Transport & transport, - const std::string &path_in, - const Config &config); + DiskMemUsageSampler(const std::string &path_in, const HwInfo &config); ~DiskMemUsageSampler(); - void setConfig(const Config &config); + void setConfig(const Config &config, IScheduledExecutor & executor); const DiskMemUsageFilter &writeFilter() const { return _filter; } IDiskMemUsageNotifier ¬ifier() { return _filter; } @@ -69,5 +63,4 @@ public: void remove_transient_usage_provider(std::shared_ptr<const ITransientResourceUsageProvider> provider); }; - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 44dfbbfba98..acda83de1dc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -297,8 +297,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) setBucketCheckSumType(protonConfig); setFS4Compression(protonConfig); - _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(_transport, protonConfig.basedir, - diskMemUsageSamplerConfig(protonConfig, hwInfo)); + _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(protonConfig.basedir, hwInfo); _tls = std::make_unique<TLS>(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext); _metricsEngine->addMetricsHook(*_metricsHook); @@ -348,6 +347,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _shared_service = std::make_unique<SharedThreadingService>( SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport, *_persistenceEngine); _scheduler = std::make_unique<ScheduledForwardExecutor>(_transport, _shared_service->shared()); + _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, hwInfo), *_scheduler); vespalib::string fileConfigId; _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw()); @@ -406,7 +406,7 @@ Proton::applyConfig(const BootstrapConfig::SP & configSnapshot) protonConfig.search.memory.limiter.minhits); const std::shared_ptr<const DocumentTypeRepo> repo = configSnapshot->getDocumentTypeRepoSP(); - _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo())); + _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo()), *_scheduler); if (_memoryFlushConfigUpdater) { _memoryFlushConfigUpdater->setConfig(protonConfig.flush.memory); _flushEngine->kick(); @@ -470,6 +470,7 @@ Proton::~Proton() _diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get()); } _sessionPruneHandle.reset(); + _diskMemUsageSampler.reset(); _scheduler.reset(); _executor.shutdown(); _executor.sync(); |