summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-12-13 14:29:30 +0100
committerGitHub <noreply@github.com>2022-12-13 14:29:30 +0100
commit3753569f574b4eec9c868a676dfe539e77929b80 (patch)
tree2ddeed5dce158c3675f542394cf2398e1fc27a02
parent955bffcbec62f94b461d17c5011868351c45559d (diff)
parent83a00f9a6391f09a98f7a20ebbb5fc9143e5cb25 (diff)
Merge pull request #25239 from vespa-engine/revert-25238-revert-25229-revert-25228-revert-25227-balder/use-forward-scheduler-for-disk-mem-usage-sampler
Revert "Revert "Use the forward scheduler in proton also for disk-mem-util-sampler"""
-rw-r--r--searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp27
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h23
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h24
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp7
5 files changed, 46 insertions, 49 deletions
diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp
index 0c06d27c916..d01e9f8fc5f 100644
--- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp
+++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/searchcore/proton/common/hw_info.h>
+#include <vespa/searchcore/proton/common/scheduledexecutor.h>
#include <vespa/searchcore/proton/common/i_transient_resource_usage_provider.h>
#include <vespa/searchcore/proton/server/disk_mem_usage_sampler.h>
#include <vespa/searchcore/proton/test/transport_helper.h>
@@ -39,20 +40,25 @@ public:
struct DiskMemUsageSamplerTest : public ::testing::Test {
Transport transport;
+ ScheduledExecutor executor;
std::unique_ptr<DiskMemUsageSampler> sampler;
DiskMemUsageSamplerTest()
: transport(),
- sampler(std::make_unique<DiskMemUsageSampler>(transport.transport(), ".", DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info())))
+ executor(transport.transport()),
+ sampler(std::make_unique<DiskMemUsageSampler>(".", make_hw_info()))
{
+ sampler->setConfig(DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()), executor);
sampler->add_transient_usage_provider(std::make_shared<MyProvider>(50, 200));
sampler->add_transient_usage_provider(std::make_shared<MyProvider>(100, 150));
}
- ~DiskMemUsageSamplerTest() {
- sampler.reset();
- }
+ ~DiskMemUsageSamplerTest();
const DiskMemUsageFilter& filter() const { return sampler->writeFilter(); }
};
+DiskMemUsageSamplerTest::~DiskMemUsageSamplerTest() {
+ sampler->close(); // Ensure all tasks are stopped
+}
+
TEST_F(DiskMemUsageSamplerTest, resource_usage_is_sampled)
{
// Poll for up to 20 seconds to get a sample.
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
index 40cd6238393..f2fae014f0f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
@@ -1,7 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "disk_mem_usage_sampler.h"
-#include <vespa/searchcore/proton/common/scheduledexecutor.h>
+#include <vespa/searchcore/proton/common/i_scheduled_executor.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/searchcore/proton/common/i_transient_resource_usage_provider.h>
#include <filesystem>
@@ -10,26 +10,26 @@ using vespalib::makeLambdaTask;
namespace proton {
-DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::string &path_in, const Config &config)
- : _filter(config.hwInfo),
+DiskMemUsageSampler::DiskMemUsageSampler(const std::string &path_in, const HwInfo &hwInfo)
+ : _filter(hwInfo),
_path(path_in),
_sampleInterval(60s),
- _lastSampleTime(vespalib::steady_clock::now()),
- _periodicTimer(std::make_unique<ScheduledExecutor>(transport)),
+ _lastSampleTime(),
_lock(),
- _periodicHandle(),
- _transient_usage_providers()
+ _transient_usage_providers(),
+ _periodicHandle()
{
- setConfig(config);
}
-DiskMemUsageSampler::~DiskMemUsageSampler()
-{
+DiskMemUsageSampler::~DiskMemUsageSampler() = default;
+
+void
+DiskMemUsageSampler::close() {
_periodicHandle.reset();
}
void
-DiskMemUsageSampler::setConfig(const Config &config)
+DiskMemUsageSampler::setConfig(const Config &config, IScheduledExecutor & executor)
{
_periodicHandle.reset();
_filter.setConfig(config.filterConfig);
@@ -37,7 +37,7 @@ DiskMemUsageSampler::setConfig(const Config &config)
sampleAndReportUsage();
_lastSampleTime = vespalib::steady_clock::now();
vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval);
- _periodicHandle = _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() {
+ _periodicHandle = executor.scheduleAtFixedRate(makeLambdaTask([this]() {
if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) {
return;
}
@@ -81,8 +81,7 @@ uint64_t
attemptSampleDirectoryDiskUsageOnce(const fs::path &path)
{
uint64_t result = 0;
- for (const auto &elem : fs::recursive_directory_iterator(path,
- fs::directory_options::skip_permission_denied)) {
+ for (const auto &elem : fs::recursive_directory_iterator(path, fs::directory_options::skip_permission_denied)) {
if (fs::is_regular_file(elem.path()) && !fs::is_symlink(elem.path())) {
std::error_code fsize_err;
const auto size = fs::file_size(elem.path(), fsize_err);
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
index 2b7d3ab759f..16c89a253fd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
@@ -2,17 +2,14 @@
#pragma once
-#include <vespa/vespalib/util/time.h>
#include "disk_mem_usage_filter.h"
-
-class FNET_Transport;
+#include <vespa/searchcore/proton/common/i_scheduled_executor.h>
namespace vespalib { class IDestructorCallback; }
namespace proton {
class ITransientResourceUsageProvider;
-class ScheduledExecutor;
/*
* Class to sample disk and memory usage used for filtering write operations.
@@ -22,10 +19,9 @@ class DiskMemUsageSampler {
std::filesystem::path _path;
vespalib::duration _sampleInterval;
vespalib::steady_time _lastSampleTime;
- std::unique_ptr<ScheduledExecutor> _periodicTimer;
std::mutex _lock;
- std::unique_ptr<vespalib::IDestructorCallback> _periodicHandle;
std::vector<std::shared_ptr<const ITransientResourceUsageProvider>> _transient_usage_providers;
+ std::unique_ptr<vespalib::IDestructorCallback> _periodicHandle;
void sampleAndReportUsage();
uint64_t sampleDiskUsage();
@@ -41,8 +37,7 @@ public:
: filterConfig(),
sampleInterval(60s),
hwInfo()
- {
- }
+ { }
Config(double memoryLimit_in,
double diskLimit_in,
@@ -51,17 +46,14 @@ public:
: filterConfig(memoryLimit_in, diskLimit_in),
sampleInterval(sampleInterval_in),
hwInfo(hwInfo_in)
- {
- }
+ { }
};
- DiskMemUsageSampler(FNET_Transport & transport,
- const std::string &path_in,
- const Config &config);
-
+ DiskMemUsageSampler(const std::string &path_in, const HwInfo &config);
~DiskMemUsageSampler();
+ void close();
- void setConfig(const Config &config);
+ void setConfig(const Config &config, IScheduledExecutor & executor);
const DiskMemUsageFilter &writeFilter() const { return _filter; }
IDiskMemUsageNotifier &notifier() { return _filter; }
@@ -69,5 +61,4 @@ public:
void remove_transient_usage_provider(std::shared_ptr<const ITransientResourceUsageProvider> provider);
};
-
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
index 0717a4ddc87..af2f7e8a0fc 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
@@ -25,7 +25,7 @@ namespace searchcorespi::index {
namespace proton {
class MaintenanceJobRunner;
-class ScheduledExecutor;
+class IScheduledExecutor;
/**
* Class that controls the bucket moving between ready and notready sub databases
@@ -81,17 +81,17 @@ private:
using Guard = std::lock_guard<Mutex>;
using TaskHandle = std::unique_ptr<vespalib::IDestructorCallback>;
- ISyncableThreadService &_masterThread;
- vespalib::MonitoredRefCount &_refCount;
- MaintenanceDocumentSubDB _readySubDB;
- MaintenanceDocumentSubDB _remSubDB;
- MaintenanceDocumentSubDB _notReadySubDB;
- std::unique_ptr<ScheduledExecutor> _periodicTimer;
- std::vector<TaskHandle> _periodicTaskHandles;
- State _state;
- const DocTypeName &_docTypeName;
- JobList _jobs;
- mutable Mutex _jobsLock;
+ ISyncableThreadService &_masterThread;
+ vespalib::MonitoredRefCount &_refCount;
+ MaintenanceDocumentSubDB _readySubDB;
+ MaintenanceDocumentSubDB _remSubDB;
+ MaintenanceDocumentSubDB _notReadySubDB;
+ std::unique_ptr<IScheduledExecutor> _periodicTimer;
+ std::vector<TaskHandle> _periodicTaskHandles;
+ State _state;
+ const DocTypeName &_docTypeName;
+ JobList _jobs;
+ mutable Mutex _jobsLock;
void addJobsToPeriodicTimer();
void restart();
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 44dfbbfba98..94b14780df5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -297,8 +297,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
setBucketCheckSumType(protonConfig);
setFS4Compression(protonConfig);
- _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(_transport, protonConfig.basedir,
- diskMemUsageSamplerConfig(protonConfig, hwInfo));
+ _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(protonConfig.basedir, hwInfo);
_tls = std::make_unique<TLS>(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext);
_metricsEngine->addMetricsHook(*_metricsHook);
@@ -348,6 +347,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_shared_service = std::make_unique<SharedThreadingService>(
SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport, *_persistenceEngine);
_scheduler = std::make_unique<ScheduledForwardExecutor>(_transport, _shared_service->shared());
+ _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, hwInfo), *_scheduler);
vespalib::string fileConfigId;
_compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw());
@@ -406,7 +406,7 @@ Proton::applyConfig(const BootstrapConfig::SP & configSnapshot)
protonConfig.search.memory.limiter.minhits);
const std::shared_ptr<const DocumentTypeRepo> repo = configSnapshot->getDocumentTypeRepoSP();
- _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo()));
+ _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo()), *_scheduler);
if (_memoryFlushConfigUpdater) {
_memoryFlushConfigUpdater->setConfig(protonConfig.flush.memory);
_flushEngine->kick();
@@ -470,6 +470,7 @@ Proton::~Proton()
_diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get());
}
_sessionPruneHandle.reset();
+ _diskMemUsageSampler->close();
_scheduler.reset();
_executor.shutdown();
_executor.sync();