From 0d6e7d3599dfc66277a184bde28009530edb0d30 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Tue, 13 Dec 2022 09:46:19 +0100 Subject: Revert "Use the forward scheduler in proton also for disk-mem-util-sampler" --- .../disk_mem_usage_sampler_test.cpp | 12 ++++------- .../proton/server/disk_mem_usage_sampler.cpp | 22 ++++++++++++-------- .../proton/server/disk_mem_usage_sampler.h | 19 ++++++++++++----- .../proton/server/maintenancecontroller.h | 24 +++++++++++----------- .../src/vespa/searchcore/proton/server/proton.cpp | 7 +++---- 5 files changed, 47 insertions(+), 37 deletions(-) (limited to 'searchcore') diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp index 5879eafd0d0..0c06d27c916 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp +++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include -#include #include #include #include @@ -40,23 +39,20 @@ public: struct DiskMemUsageSamplerTest : public ::testing::Test { Transport transport; - ScheduledExecutor executor; std::unique_ptr sampler; DiskMemUsageSamplerTest() : transport(), - executor(transport.transport()), - sampler(std::make_unique(".", make_hw_info())) + sampler(std::make_unique(transport.transport(), ".", DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()))) { - sampler->setConfig(DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()), executor); sampler->add_transient_usage_provider(std::make_shared(50, 200)); sampler->add_transient_usage_provider(std::make_shared(100, 150)); } - ~DiskMemUsageSamplerTest(); + ~DiskMemUsageSamplerTest() { + sampler.reset(); + } const DiskMemUsageFilter& filter() const { return sampler->writeFilter(); } }; -DiskMemUsageSamplerTest::~DiskMemUsageSamplerTest() = default; - TEST_F(DiskMemUsageSamplerTest, resource_usage_is_sampled) { // Poll for up to 20 seconds to get a sample. diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp index 70718b54e50..40cd6238393 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp @@ -1,7 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "disk_mem_usage_sampler.h" -#include +#include #include #include #include @@ -10,21 +10,26 @@ using vespalib::makeLambdaTask; namespace proton { -DiskMemUsageSampler::DiskMemUsageSampler(const std::string &path_in, const HwInfo &hwInfo) - : _filter(hwInfo), +DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::string &path_in, const Config &config) + : _filter(config.hwInfo), _path(path_in), _sampleInterval(60s), - _lastSampleTime(), + _lastSampleTime(vespalib::steady_clock::now()), + _periodicTimer(std::make_unique(transport)), _lock(), _periodicHandle(), _transient_usage_providers() { + setConfig(config); } -DiskMemUsageSampler::~DiskMemUsageSampler() = default; +DiskMemUsageSampler::~DiskMemUsageSampler() +{ + _periodicHandle.reset(); +} void -DiskMemUsageSampler::setConfig(const Config &config, IScheduledExecutor & executor) +DiskMemUsageSampler::setConfig(const Config &config) { _periodicHandle.reset(); _filter.setConfig(config.filterConfig); @@ -32,7 +37,7 @@ DiskMemUsageSampler::setConfig(const Config &config, IScheduledExecutor & execut sampleAndReportUsage(); _lastSampleTime = vespalib::steady_clock::now(); vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval); - _periodicHandle = executor.scheduleAtFixedRate(makeLambdaTask([this]() { + _periodicHandle = _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() { if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) { return; } @@ -76,7 +81,8 @@ uint64_t attemptSampleDirectoryDiskUsageOnce(const fs::path &path) { uint64_t result = 0; - for (const auto &elem : fs::recursive_directory_iterator(path, fs::directory_options::skip_permission_denied)) { + for (const auto &elem : fs::recursive_directory_iterator(path, + fs::directory_options::skip_permission_denied)) { if (fs::is_regular_file(elem.path()) && !fs::is_symlink(elem.path())) { std::error_code fsize_err; const auto size = fs::file_size(elem.path(), fsize_err); diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h index c2e8f055d09..2b7d3ab759f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h @@ -2,14 +2,17 @@ #pragma once +#include #include "disk_mem_usage_filter.h" -#include + +class FNET_Transport; namespace vespalib { class IDestructorCallback; } namespace proton { class ITransientResourceUsageProvider; +class ScheduledExecutor; /* * Class to sample disk and memory usage used for filtering write operations. @@ -19,6 +22,7 @@ class DiskMemUsageSampler { std::filesystem::path _path; vespalib::duration _sampleInterval; vespalib::steady_time _lastSampleTime; + std::unique_ptr _periodicTimer; std::mutex _lock; std::unique_ptr _periodicHandle; std::vector> _transient_usage_providers; @@ -37,7 +41,8 @@ public: : filterConfig(), sampleInterval(60s), hwInfo() - { } + { + } Config(double memoryLimit_in, double diskLimit_in, @@ -46,14 +51,17 @@ public: : filterConfig(memoryLimit_in, diskLimit_in), sampleInterval(sampleInterval_in), hwInfo(hwInfo_in) - { } + { + } }; - DiskMemUsageSampler(const std::string &path_in, const HwInfo &config); + DiskMemUsageSampler(FNET_Transport & transport, + const std::string &path_in, + const Config &config); ~DiskMemUsageSampler(); - void setConfig(const Config &config, IScheduledExecutor & executor); + void setConfig(const Config &config); const DiskMemUsageFilter &writeFilter() const { return _filter; } IDiskMemUsageNotifier ¬ifier() { return _filter; } @@ -61,4 +69,5 @@ public: void remove_transient_usage_provider(std::shared_ptr provider); }; + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index af2f7e8a0fc..0717a4ddc87 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -25,7 +25,7 @@ namespace searchcorespi::index { namespace proton { class MaintenanceJobRunner; -class IScheduledExecutor; +class ScheduledExecutor; /** * Class that controls the bucket moving between ready and notready sub databases @@ -81,17 +81,17 @@ private: using Guard = std::lock_guard; using TaskHandle = std::unique_ptr; - ISyncableThreadService &_masterThread; - vespalib::MonitoredRefCount &_refCount; - MaintenanceDocumentSubDB _readySubDB; - MaintenanceDocumentSubDB _remSubDB; - MaintenanceDocumentSubDB _notReadySubDB; - std::unique_ptr _periodicTimer; - std::vector _periodicTaskHandles; - State _state; - const DocTypeName &_docTypeName; - JobList _jobs; - mutable Mutex _jobsLock; + ISyncableThreadService &_masterThread; + vespalib::MonitoredRefCount &_refCount; + MaintenanceDocumentSubDB _readySubDB; + MaintenanceDocumentSubDB _remSubDB; + MaintenanceDocumentSubDB _notReadySubDB; + std::unique_ptr _periodicTimer; + std::vector _periodicTaskHandles; + State _state; + const DocTypeName &_docTypeName; + JobList _jobs; + mutable Mutex _jobsLock; void addJobsToPeriodicTimer(); void restart(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index acda83de1dc..44dfbbfba98 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -297,7 +297,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) setBucketCheckSumType(protonConfig); setFS4Compression(protonConfig); - _diskMemUsageSampler = std::make_unique(protonConfig.basedir, hwInfo); + _diskMemUsageSampler = std::make_unique(_transport, protonConfig.basedir, + diskMemUsageSamplerConfig(protonConfig, hwInfo)); _tls = std::make_unique(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext); _metricsEngine->addMetricsHook(*_metricsHook); @@ -347,7 +348,6 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _shared_service = std::make_unique( SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport, *_persistenceEngine); _scheduler = std::make_unique(_transport, _shared_service->shared()); - _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, hwInfo), *_scheduler); vespalib::string fileConfigId; _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw()); @@ -406,7 +406,7 @@ Proton::applyConfig(const BootstrapConfig::SP & configSnapshot) protonConfig.search.memory.limiter.minhits); const std::shared_ptr repo = configSnapshot->getDocumentTypeRepoSP(); - _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo()), *_scheduler); + _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo())); if (_memoryFlushConfigUpdater) { _memoryFlushConfigUpdater->setConfig(protonConfig.flush.memory); _flushEngine->kick(); @@ -470,7 +470,6 @@ Proton::~Proton() _diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get()); } _sessionPruneHandle.reset(); - _diskMemUsageSampler.reset(); _scheduler.reset(); _executor.shutdown(); _executor.sync(); -- cgit v1.2.3