From 759d5ab5a2b3925fce2ca260952c1997d6988bd9 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 13 Dec 2022 10:31:35 +0100 Subject: Revert "Revert "Use the forward scheduler in proton also for disk-mem-util-sampler"" --- .../disk_mem_usage_sampler_test.cpp | 12 +++++++---- .../proton/server/disk_mem_usage_sampler.cpp | 22 ++++++++------------ .../proton/server/disk_mem_usage_sampler.h | 19 +++++------------ .../proton/server/maintenancecontroller.h | 24 +++++++++++----------- .../src/vespa/searchcore/proton/server/proton.cpp | 7 ++++--- 5 files changed, 37 insertions(+), 47 deletions(-) (limited to 'searchcore/src') diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp index 0c06d27c916..5879eafd0d0 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp +++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include +#include #include #include #include @@ -39,20 +40,23 @@ public: struct DiskMemUsageSamplerTest : public ::testing::Test { Transport transport; + ScheduledExecutor executor; std::unique_ptr sampler; DiskMemUsageSamplerTest() : transport(), - sampler(std::make_unique(transport.transport(), ".", DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()))) + executor(transport.transport()), + sampler(std::make_unique(".", make_hw_info())) { + sampler->setConfig(DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()), executor); sampler->add_transient_usage_provider(std::make_shared(50, 200)); sampler->add_transient_usage_provider(std::make_shared(100, 150)); } - ~DiskMemUsageSamplerTest() { - sampler.reset(); - } + ~DiskMemUsageSamplerTest(); const DiskMemUsageFilter& filter() const { return sampler->writeFilter(); } }; +DiskMemUsageSamplerTest::~DiskMemUsageSamplerTest() = default; + TEST_F(DiskMemUsageSamplerTest, resource_usage_is_sampled) { // Poll for up to 20 seconds to get a sample. diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp index 40cd6238393..70718b54e50 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp @@ -1,7 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "disk_mem_usage_sampler.h" -#include +#include #include #include #include @@ -10,26 +10,21 @@ using vespalib::makeLambdaTask; namespace proton { -DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::string &path_in, const Config &config) - : _filter(config.hwInfo), +DiskMemUsageSampler::DiskMemUsageSampler(const std::string &path_in, const HwInfo &hwInfo) + : _filter(hwInfo), _path(path_in), _sampleInterval(60s), - _lastSampleTime(vespalib::steady_clock::now()), - _periodicTimer(std::make_unique(transport)), + _lastSampleTime(), _lock(), _periodicHandle(), _transient_usage_providers() { - setConfig(config); } -DiskMemUsageSampler::~DiskMemUsageSampler() -{ - _periodicHandle.reset(); -} +DiskMemUsageSampler::~DiskMemUsageSampler() = default; void -DiskMemUsageSampler::setConfig(const Config &config) +DiskMemUsageSampler::setConfig(const Config &config, IScheduledExecutor & executor) { _periodicHandle.reset(); _filter.setConfig(config.filterConfig); @@ -37,7 +32,7 @@ DiskMemUsageSampler::setConfig(const Config &config) sampleAndReportUsage(); _lastSampleTime = vespalib::steady_clock::now(); vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval); - _periodicHandle = _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() { + _periodicHandle = executor.scheduleAtFixedRate(makeLambdaTask([this]() { if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) { return; } @@ -81,8 +76,7 @@ uint64_t attemptSampleDirectoryDiskUsageOnce(const fs::path &path) { uint64_t result = 0; - for (const auto &elem : fs::recursive_directory_iterator(path, - fs::directory_options::skip_permission_denied)) { + for (const auto &elem : fs::recursive_directory_iterator(path, fs::directory_options::skip_permission_denied)) { if (fs::is_regular_file(elem.path()) && !fs::is_symlink(elem.path())) { std::error_code fsize_err; const auto size = fs::file_size(elem.path(), fsize_err); diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h index 2b7d3ab759f..c2e8f055d09 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h @@ -2,17 +2,14 @@ #pragma once -#include #include "disk_mem_usage_filter.h" - -class FNET_Transport; +#include namespace vespalib { class IDestructorCallback; } namespace proton { class ITransientResourceUsageProvider; -class ScheduledExecutor; /* * Class to sample disk and memory usage used for filtering write operations. @@ -22,7 +19,6 @@ class DiskMemUsageSampler { std::filesystem::path _path; vespalib::duration _sampleInterval; vespalib::steady_time _lastSampleTime; - std::unique_ptr _periodicTimer; std::mutex _lock; std::unique_ptr _periodicHandle; std::vector> _transient_usage_providers; @@ -41,8 +37,7 @@ public: : filterConfig(), sampleInterval(60s), hwInfo() - { - } + { } Config(double memoryLimit_in, double diskLimit_in, @@ -51,17 +46,14 @@ public: : filterConfig(memoryLimit_in, diskLimit_in), sampleInterval(sampleInterval_in), hwInfo(hwInfo_in) - { - } + { } }; - DiskMemUsageSampler(FNET_Transport & transport, - const std::string &path_in, - const Config &config); + DiskMemUsageSampler(const std::string &path_in, const HwInfo &config); ~DiskMemUsageSampler(); - void setConfig(const Config &config); + void setConfig(const Config &config, IScheduledExecutor & executor); const DiskMemUsageFilter &writeFilter() const { return _filter; } IDiskMemUsageNotifier ¬ifier() { return _filter; } @@ -69,5 +61,4 @@ public: void remove_transient_usage_provider(std::shared_ptr provider); }; - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index 0717a4ddc87..af2f7e8a0fc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -25,7 +25,7 @@ namespace searchcorespi::index { namespace proton { class MaintenanceJobRunner; -class ScheduledExecutor; +class IScheduledExecutor; /** * Class that controls the bucket moving between ready and notready sub databases @@ -81,17 +81,17 @@ private: using Guard = std::lock_guard; using TaskHandle = std::unique_ptr; - ISyncableThreadService &_masterThread; - vespalib::MonitoredRefCount &_refCount; - MaintenanceDocumentSubDB _readySubDB; - MaintenanceDocumentSubDB _remSubDB; - MaintenanceDocumentSubDB _notReadySubDB; - std::unique_ptr _periodicTimer; - std::vector _periodicTaskHandles; - State _state; - const DocTypeName &_docTypeName; - JobList _jobs; - mutable Mutex _jobsLock; + ISyncableThreadService &_masterThread; + vespalib::MonitoredRefCount &_refCount; + MaintenanceDocumentSubDB _readySubDB; + MaintenanceDocumentSubDB _remSubDB; + MaintenanceDocumentSubDB _notReadySubDB; + std::unique_ptr _periodicTimer; + std::vector _periodicTaskHandles; + State _state; + const DocTypeName &_docTypeName; + JobList _jobs; + mutable Mutex _jobsLock; void addJobsToPeriodicTimer(); void restart(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 44dfbbfba98..acda83de1dc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -297,8 +297,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) setBucketCheckSumType(protonConfig); setFS4Compression(protonConfig); - _diskMemUsageSampler = std::make_unique(_transport, protonConfig.basedir, - diskMemUsageSamplerConfig(protonConfig, hwInfo)); + _diskMemUsageSampler = std::make_unique(protonConfig.basedir, hwInfo); _tls = std::make_unique(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext); _metricsEngine->addMetricsHook(*_metricsHook); @@ -348,6 +347,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _shared_service = std::make_unique( SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport, *_persistenceEngine); _scheduler = std::make_unique(_transport, _shared_service->shared()); + _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, hwInfo), *_scheduler); vespalib::string fileConfigId; _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw()); @@ -406,7 +406,7 @@ Proton::applyConfig(const BootstrapConfig::SP & configSnapshot) protonConfig.search.memory.limiter.minhits); const std::shared_ptr repo = configSnapshot->getDocumentTypeRepoSP(); - _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo())); + _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo()), *_scheduler); if (_memoryFlushConfigUpdater) { _memoryFlushConfigUpdater->setConfig(protonConfig.flush.memory); _flushEngine->kick(); @@ -470,6 +470,7 @@ Proton::~Proton() _diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get()); } _sessionPruneHandle.reset(); + _diskMemUsageSampler.reset(); _scheduler.reset(); _executor.shutdown(); _executor.sync(); -- cgit v1.2.3