diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-13 13:35:24 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-13 13:35:24 +0100 |
commit | 26d8b8f992c3c3b6bee102f3b6c4551f20b9aad7 (patch) | |
tree | 66b5029386a7ce394589e650ff17b5f110865aef | |
parent | 4c4e2be6e6870db6c95c591108c4780da4696a66 (diff) |
Revert "Revert "Use the forward scheduler in proton also for disk-mem-util-sampler"""
5 files changed, 41 insertions, 46 deletions
diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp index 0c06d27c916..5879eafd0d0 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp +++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/searchcore/proton/common/hw_info.h> +#include <vespa/searchcore/proton/common/scheduledexecutor.h> #include <vespa/searchcore/proton/common/i_transient_resource_usage_provider.h> #include <vespa/searchcore/proton/server/disk_mem_usage_sampler.h> #include <vespa/searchcore/proton/test/transport_helper.h> @@ -39,20 +40,23 @@ public: struct DiskMemUsageSamplerTest : public ::testing::Test { Transport transport; + ScheduledExecutor executor; std::unique_ptr<DiskMemUsageSampler> sampler; DiskMemUsageSamplerTest() : transport(), - sampler(std::make_unique<DiskMemUsageSampler>(transport.transport(), ".", DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()))) + executor(transport.transport()), + sampler(std::make_unique<DiskMemUsageSampler>(".", make_hw_info())) { + sampler->setConfig(DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()), executor); sampler->add_transient_usage_provider(std::make_shared<MyProvider>(50, 200)); sampler->add_transient_usage_provider(std::make_shared<MyProvider>(100, 150)); } - ~DiskMemUsageSamplerTest() { - sampler.reset(); - } + ~DiskMemUsageSamplerTest(); const DiskMemUsageFilter& filter() const { return sampler->writeFilter(); } }; +DiskMemUsageSamplerTest::~DiskMemUsageSamplerTest() = default; + TEST_F(DiskMemUsageSamplerTest, resource_usage_is_sampled) { // Poll for up to 20 seconds to get a sample. diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp index 40cd6238393..4e900c83821 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp @@ -1,7 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "disk_mem_usage_sampler.h" -#include <vespa/searchcore/proton/common/scheduledexecutor.h> +#include <vespa/searchcore/proton/common/i_scheduled_executor.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/searchcore/proton/common/i_transient_resource_usage_provider.h> #include <filesystem> @@ -10,26 +10,26 @@ using vespalib::makeLambdaTask; namespace proton { -DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::string &path_in, const Config &config) - : _filter(config.hwInfo), +DiskMemUsageSampler::DiskMemUsageSampler(const std::string &path_in, const HwInfo &hwInfo) + : _filter(hwInfo), _path(path_in), _sampleInterval(60s), - _lastSampleTime(vespalib::steady_clock::now()), - _periodicTimer(std::make_unique<ScheduledExecutor>(transport)), + _lastSampleTime(), _lock(), _periodicHandle(), _transient_usage_providers() { - setConfig(config); } -DiskMemUsageSampler::~DiskMemUsageSampler() -{ +DiskMemUsageSampler::~DiskMemUsageSampler() = default; + +void +DiskMemUsageSampler::close() { _periodicHandle.reset(); } void -DiskMemUsageSampler::setConfig(const Config &config) +DiskMemUsageSampler::setConfig(const Config &config, IScheduledExecutor & executor) { _periodicHandle.reset(); _filter.setConfig(config.filterConfig); @@ -37,7 +37,7 @@ DiskMemUsageSampler::setConfig(const Config &config) sampleAndReportUsage(); _lastSampleTime = vespalib::steady_clock::now(); vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval); - _periodicHandle = _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() { + _periodicHandle = executor.scheduleAtFixedRate(makeLambdaTask([this]() { if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) { return; } @@ -81,8 +81,7 @@ uint64_t attemptSampleDirectoryDiskUsageOnce(const fs::path &path) { uint64_t result = 0; - for (const auto &elem : fs::recursive_directory_iterator(path, - fs::directory_options::skip_permission_denied)) { + for (const auto &elem : fs::recursive_directory_iterator(path, fs::directory_options::skip_permission_denied)) { if (fs::is_regular_file(elem.path()) && !fs::is_symlink(elem.path())) { std::error_code fsize_err; const auto size = fs::file_size(elem.path(), fsize_err); diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h index 2b7d3ab759f..ed0853ac100 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h @@ -2,17 +2,14 @@ #pragma once -#include <vespa/vespalib/util/time.h> #include "disk_mem_usage_filter.h" - -class FNET_Transport; +#include <vespa/searchcore/proton/common/i_scheduled_executor.h> namespace vespalib { class IDestructorCallback; } namespace proton { class ITransientResourceUsageProvider; -class ScheduledExecutor; /* * Class to sample disk and memory usage used for filtering write operations. @@ -22,7 +19,6 @@ class DiskMemUsageSampler { std::filesystem::path _path; vespalib::duration _sampleInterval; vespalib::steady_time _lastSampleTime; - std::unique_ptr<ScheduledExecutor> _periodicTimer; std::mutex _lock; std::unique_ptr<vespalib::IDestructorCallback> _periodicHandle; std::vector<std::shared_ptr<const ITransientResourceUsageProvider>> _transient_usage_providers; @@ -41,8 +37,7 @@ public: : filterConfig(), sampleInterval(60s), hwInfo() - { - } + { } Config(double memoryLimit_in, double diskLimit_in, @@ -51,17 +46,14 @@ public: : filterConfig(memoryLimit_in, diskLimit_in), sampleInterval(sampleInterval_in), hwInfo(hwInfo_in) - { - } + { } }; - DiskMemUsageSampler(FNET_Transport & transport, - const std::string &path_in, - const Config &config); - + DiskMemUsageSampler(const std::string &path_in, const HwInfo &config); ~DiskMemUsageSampler(); + void close(); - void setConfig(const Config &config); + void setConfig(const Config &config, IScheduledExecutor & executor); const DiskMemUsageFilter &writeFilter() const { return _filter; } IDiskMemUsageNotifier ¬ifier() { return _filter; } @@ -69,5 +61,4 @@ public: void remove_transient_usage_provider(std::shared_ptr<const ITransientResourceUsageProvider> provider); }; - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index 0717a4ddc87..af2f7e8a0fc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -25,7 +25,7 @@ namespace searchcorespi::index { namespace proton { class MaintenanceJobRunner; -class ScheduledExecutor; +class IScheduledExecutor; /** * Class that controls the bucket moving between ready and notready sub databases @@ -81,17 +81,17 @@ private: using Guard = std::lock_guard<Mutex>; using TaskHandle = std::unique_ptr<vespalib::IDestructorCallback>; - ISyncableThreadService &_masterThread; - vespalib::MonitoredRefCount &_refCount; - MaintenanceDocumentSubDB _readySubDB; - MaintenanceDocumentSubDB _remSubDB; - MaintenanceDocumentSubDB _notReadySubDB; - std::unique_ptr<ScheduledExecutor> _periodicTimer; - std::vector<TaskHandle> _periodicTaskHandles; - State _state; - const DocTypeName &_docTypeName; - JobList _jobs; - mutable Mutex _jobsLock; + ISyncableThreadService &_masterThread; + vespalib::MonitoredRefCount &_refCount; + MaintenanceDocumentSubDB _readySubDB; + MaintenanceDocumentSubDB _remSubDB; + MaintenanceDocumentSubDB _notReadySubDB; + std::unique_ptr<IScheduledExecutor> _periodicTimer; + std::vector<TaskHandle> _periodicTaskHandles; + State _state; + const DocTypeName &_docTypeName; + JobList _jobs; + mutable Mutex _jobsLock; void addJobsToPeriodicTimer(); void restart(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 44dfbbfba98..94b14780df5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -297,8 +297,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) setBucketCheckSumType(protonConfig); setFS4Compression(protonConfig); - _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(_transport, protonConfig.basedir, - diskMemUsageSamplerConfig(protonConfig, hwInfo)); + _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(protonConfig.basedir, hwInfo); _tls = std::make_unique<TLS>(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext); _metricsEngine->addMetricsHook(*_metricsHook); @@ -348,6 +347,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _shared_service = std::make_unique<SharedThreadingService>( SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport, *_persistenceEngine); _scheduler = std::make_unique<ScheduledForwardExecutor>(_transport, _shared_service->shared()); + _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, hwInfo), *_scheduler); vespalib::string fileConfigId; _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw()); @@ -406,7 +406,7 @@ Proton::applyConfig(const BootstrapConfig::SP & configSnapshot) protonConfig.search.memory.limiter.minhits); const std::shared_ptr<const DocumentTypeRepo> repo = configSnapshot->getDocumentTypeRepoSP(); - _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo())); + _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo()), *_scheduler); if (_memoryFlushConfigUpdater) { _memoryFlushConfigUpdater->setConfig(protonConfig.flush.memory); _flushEngine->kick(); @@ -470,6 +470,7 @@ Proton::~Proton() _diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get()); } _sessionPruneHandle.reset(); + _diskMemUsageSampler->close(); _scheduler.reset(); _executor.shutdown(); _executor.sync(); |