summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-12-13 10:31:35 +0100
committerGitHub <noreply@github.com>2022-12-13 10:31:35 +0100
commit759d5ab5a2b3925fce2ca260952c1997d6988bd9 (patch)
tree0e7873d6a0df1140a79198cbac4228b673680f0a
parent000fa3620f53736b7729dfd3fbdc468d00bbf355 (diff)
Revert "Revert "Use the forward scheduler in proton also for disk-mem-util-sampler""
-rw-r--r--searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp22
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h19
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h24
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp7
5 files changed, 37 insertions, 47 deletions
diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp
index 0c06d27c916..5879eafd0d0 100644
--- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp
+++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/searchcore/proton/common/hw_info.h>
+#include <vespa/searchcore/proton/common/scheduledexecutor.h>
#include <vespa/searchcore/proton/common/i_transient_resource_usage_provider.h>
#include <vespa/searchcore/proton/server/disk_mem_usage_sampler.h>
#include <vespa/searchcore/proton/test/transport_helper.h>
@@ -39,20 +40,23 @@ public:
struct DiskMemUsageSamplerTest : public ::testing::Test {
Transport transport;
+ ScheduledExecutor executor;
std::unique_ptr<DiskMemUsageSampler> sampler;
DiskMemUsageSamplerTest()
: transport(),
- sampler(std::make_unique<DiskMemUsageSampler>(transport.transport(), ".", DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info())))
+ executor(transport.transport()),
+ sampler(std::make_unique<DiskMemUsageSampler>(".", make_hw_info()))
{
+ sampler->setConfig(DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()), executor);
sampler->add_transient_usage_provider(std::make_shared<MyProvider>(50, 200));
sampler->add_transient_usage_provider(std::make_shared<MyProvider>(100, 150));
}
- ~DiskMemUsageSamplerTest() {
- sampler.reset();
- }
+ ~DiskMemUsageSamplerTest();
const DiskMemUsageFilter& filter() const { return sampler->writeFilter(); }
};
+DiskMemUsageSamplerTest::~DiskMemUsageSamplerTest() = default;
+
TEST_F(DiskMemUsageSamplerTest, resource_usage_is_sampled)
{
// Poll for up to 20 seconds to get a sample.
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
index 40cd6238393..70718b54e50 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
@@ -1,7 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "disk_mem_usage_sampler.h"
-#include <vespa/searchcore/proton/common/scheduledexecutor.h>
+#include <vespa/searchcore/proton/common/i_scheduled_executor.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/searchcore/proton/common/i_transient_resource_usage_provider.h>
#include <filesystem>
@@ -10,26 +10,21 @@ using vespalib::makeLambdaTask;
namespace proton {
-DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::string &path_in, const Config &config)
- : _filter(config.hwInfo),
+DiskMemUsageSampler::DiskMemUsageSampler(const std::string &path_in, const HwInfo &hwInfo)
+ : _filter(hwInfo),
_path(path_in),
_sampleInterval(60s),
- _lastSampleTime(vespalib::steady_clock::now()),
- _periodicTimer(std::make_unique<ScheduledExecutor>(transport)),
+ _lastSampleTime(),
_lock(),
_periodicHandle(),
_transient_usage_providers()
{
- setConfig(config);
}
-DiskMemUsageSampler::~DiskMemUsageSampler()
-{
- _periodicHandle.reset();
-}
+DiskMemUsageSampler::~DiskMemUsageSampler() = default;
void
-DiskMemUsageSampler::setConfig(const Config &config)
+DiskMemUsageSampler::setConfig(const Config &config, IScheduledExecutor & executor)
{
_periodicHandle.reset();
_filter.setConfig(config.filterConfig);
@@ -37,7 +32,7 @@ DiskMemUsageSampler::setConfig(const Config &config)
sampleAndReportUsage();
_lastSampleTime = vespalib::steady_clock::now();
vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval);
- _periodicHandle = _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() {
+ _periodicHandle = executor.scheduleAtFixedRate(makeLambdaTask([this]() {
if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) {
return;
}
@@ -81,8 +76,7 @@ uint64_t
attemptSampleDirectoryDiskUsageOnce(const fs::path &path)
{
uint64_t result = 0;
- for (const auto &elem : fs::recursive_directory_iterator(path,
- fs::directory_options::skip_permission_denied)) {
+ for (const auto &elem : fs::recursive_directory_iterator(path, fs::directory_options::skip_permission_denied)) {
if (fs::is_regular_file(elem.path()) && !fs::is_symlink(elem.path())) {
std::error_code fsize_err;
const auto size = fs::file_size(elem.path(), fsize_err);
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
index 2b7d3ab759f..c2e8f055d09 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
@@ -2,17 +2,14 @@
#pragma once
-#include <vespa/vespalib/util/time.h>
#include "disk_mem_usage_filter.h"
-
-class FNET_Transport;
+#include <vespa/searchcore/proton/common/i_scheduled_executor.h>
namespace vespalib { class IDestructorCallback; }
namespace proton {
class ITransientResourceUsageProvider;
-class ScheduledExecutor;
/*
* Class to sample disk and memory usage used for filtering write operations.
@@ -22,7 +19,6 @@ class DiskMemUsageSampler {
std::filesystem::path _path;
vespalib::duration _sampleInterval;
vespalib::steady_time _lastSampleTime;
- std::unique_ptr<ScheduledExecutor> _periodicTimer;
std::mutex _lock;
std::unique_ptr<vespalib::IDestructorCallback> _periodicHandle;
std::vector<std::shared_ptr<const ITransientResourceUsageProvider>> _transient_usage_providers;
@@ -41,8 +37,7 @@ public:
: filterConfig(),
sampleInterval(60s),
hwInfo()
- {
- }
+ { }
Config(double memoryLimit_in,
double diskLimit_in,
@@ -51,17 +46,14 @@ public:
: filterConfig(memoryLimit_in, diskLimit_in),
sampleInterval(sampleInterval_in),
hwInfo(hwInfo_in)
- {
- }
+ { }
};
- DiskMemUsageSampler(FNET_Transport & transport,
- const std::string &path_in,
- const Config &config);
+ DiskMemUsageSampler(const std::string &path_in, const HwInfo &config);
~DiskMemUsageSampler();
- void setConfig(const Config &config);
+ void setConfig(const Config &config, IScheduledExecutor & executor);
const DiskMemUsageFilter &writeFilter() const { return _filter; }
IDiskMemUsageNotifier &notifier() { return _filter; }
@@ -69,5 +61,4 @@ public:
void remove_transient_usage_provider(std::shared_ptr<const ITransientResourceUsageProvider> provider);
};
-
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
index 0717a4ddc87..af2f7e8a0fc 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
@@ -25,7 +25,7 @@ namespace searchcorespi::index {
namespace proton {
class MaintenanceJobRunner;
-class ScheduledExecutor;
+class IScheduledExecutor;
/**
* Class that controls the bucket moving between ready and notready sub databases
@@ -81,17 +81,17 @@ private:
using Guard = std::lock_guard<Mutex>;
using TaskHandle = std::unique_ptr<vespalib::IDestructorCallback>;
- ISyncableThreadService &_masterThread;
- vespalib::MonitoredRefCount &_refCount;
- MaintenanceDocumentSubDB _readySubDB;
- MaintenanceDocumentSubDB _remSubDB;
- MaintenanceDocumentSubDB _notReadySubDB;
- std::unique_ptr<ScheduledExecutor> _periodicTimer;
- std::vector<TaskHandle> _periodicTaskHandles;
- State _state;
- const DocTypeName &_docTypeName;
- JobList _jobs;
- mutable Mutex _jobsLock;
+ ISyncableThreadService &_masterThread;
+ vespalib::MonitoredRefCount &_refCount;
+ MaintenanceDocumentSubDB _readySubDB;
+ MaintenanceDocumentSubDB _remSubDB;
+ MaintenanceDocumentSubDB _notReadySubDB;
+ std::unique_ptr<IScheduledExecutor> _periodicTimer;
+ std::vector<TaskHandle> _periodicTaskHandles;
+ State _state;
+ const DocTypeName &_docTypeName;
+ JobList _jobs;
+ mutable Mutex _jobsLock;
void addJobsToPeriodicTimer();
void restart();
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 44dfbbfba98..acda83de1dc 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -297,8 +297,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
setBucketCheckSumType(protonConfig);
setFS4Compression(protonConfig);
- _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(_transport, protonConfig.basedir,
- diskMemUsageSamplerConfig(protonConfig, hwInfo));
+ _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(protonConfig.basedir, hwInfo);
_tls = std::make_unique<TLS>(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext);
_metricsEngine->addMetricsHook(*_metricsHook);
@@ -348,6 +347,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_shared_service = std::make_unique<SharedThreadingService>(
SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport, *_persistenceEngine);
_scheduler = std::make_unique<ScheduledForwardExecutor>(_transport, _shared_service->shared());
+ _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, hwInfo), *_scheduler);
vespalib::string fileConfigId;
_compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw());
@@ -406,7 +406,7 @@ Proton::applyConfig(const BootstrapConfig::SP & configSnapshot)
protonConfig.search.memory.limiter.minhits);
const std::shared_ptr<const DocumentTypeRepo> repo = configSnapshot->getDocumentTypeRepoSP();
- _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo()));
+ _diskMemUsageSampler->setConfig(diskMemUsageSamplerConfig(protonConfig, configSnapshot->getHwInfo()), *_scheduler);
if (_memoryFlushConfigUpdater) {
_memoryFlushConfigUpdater->setConfig(protonConfig.flush.memory);
_flushEngine->kick();
@@ -470,6 +470,7 @@ Proton::~Proton()
_diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get());
}
_sessionPruneHandle.reset();
+ _diskMemUsageSampler.reset();
_scheduler.reset();
_executor.shutdown();
_executor.sync();