diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-05 16:42:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-05 16:42:46 +0100 |
commit | fcce4873d66e5e5140fa470a22cbb3e752159ea2 (patch) | |
tree | bc15881aaca6dda9915682ad2797efe05cc68cdc | |
parent | f979a03306ad9d80263684d71dc25a6c66188249 (diff) | |
parent | 1e0c4b8c3b239c904f7ff5c1303b61123d3d8c06 (diff) |
Merge pull request #20664 from vespa-engine/balder/drop-uncompressed-buffer-immediately-after-compression
Balder/drop uncompressed buffer immediately after compression
10 files changed, 40 insertions, 24 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/metrics_engine.cpp b/searchcore/src/vespa/searchcore/proton/metrics/metrics_engine.cpp index d087f603fe3..bb8fde53357 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/metrics_engine.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/metrics_engine.cpp @@ -3,6 +3,7 @@ #include "attribute_metrics.h" #include "documentdb_tagged_metrics.h" #include "metrics_engine.h" +#include "content_proton_metrics.h" #include <vespa/metrics/jsonwriter.h> #include <vespa/metrics/metricmanager.h> @@ -12,7 +13,7 @@ LOG_SETUP(".proton.server.metricsengine"); namespace proton { MetricsEngine::MetricsEngine() - : _root(), + : _root(std::make_unique<ContentProtonMetrics>()), _manager(std::make_unique<metrics::MetricManager>()), _metrics_producer(*_manager) { } @@ -24,7 +25,7 @@ MetricsEngine::start(const config::ConfigUri &) { { metrics::MetricLockGuard guard(_manager->getMetricLock()); - _manager->registerMetric(guard, _root); + _manager->registerMetric(guard, *_root); } // Storage doesnt snapshot unset metrics to save memory. Currently @@ -54,28 +55,28 @@ void MetricsEngine::addExternalMetrics(metrics::Metric &child) { metrics::MetricLockGuard guard(_manager->getMetricLock()); - _root.registerMetric(child); + _root->registerMetric(child); } void MetricsEngine::removeExternalMetrics(metrics::Metric &child) { metrics::MetricLockGuard guard(_manager->getMetricLock()); - _root.unregisterMetric(child); + _root->unregisterMetric(child); } void MetricsEngine::addDocumentDBMetrics(DocumentDBTaggedMetrics &child) { metrics::MetricLockGuard guard(_manager->getMetricLock()); - _root.registerMetric(child); + _root->registerMetric(child); } void MetricsEngine::removeDocumentDBMetrics(DocumentDBTaggedMetrics &child) { metrics::MetricLockGuard guard(_manager->getMetricLock()); - _root.unregisterMetric(child); + _root->unregisterMetric(child); } namespace { diff --git a/searchcore/src/vespa/searchcore/proton/metrics/metrics_engine.h b/searchcore/src/vespa/searchcore/proton/metrics/metrics_engine.h index b4cc1bc1c7e..f90f8205817 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/metrics_engine.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/metrics_engine.h @@ -2,7 +2,6 @@ #pragma once -#include "content_proton_metrics.h" #include "metricswireservice.h" #include <vespa/metrics/state_api_adapter.h> @@ -18,11 +17,12 @@ namespace proton { class AttributeMetrics; struct DocumentDBTaggedMetrics; +struct ContentProtonMetrics; class MetricsEngine : public MetricsWireService { private: - ContentProtonMetrics _root; + std::unique_ptr<ContentProtonMetrics> _root; std::unique_ptr<metrics::MetricManager> _manager; metrics::StateApiAdapter _metrics_producer; @@ -30,8 +30,8 @@ public: typedef std::unique_ptr<MetricsEngine> UP; MetricsEngine(); - virtual ~MetricsEngine(); - ContentProtonMetrics &root() { return _root; } + ~MetricsEngine() override; + ContentProtonMetrics &root() { return *_root; } void start(const config::ConfigUri & configUri); void addMetricsHook(metrics::UpdateHook &hook); void removeMetricsHook(metrics::UpdateHook &hook); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 0bcbbc14650..f697c4d4672 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -26,6 +26,8 @@ #include <vespa/searchcore/proton/matchengine/matchengine.h> #include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> #include <vespa/searchcore/proton/reference/document_db_reference_registry.h> +#include <vespa/searchcore/proton/metrics/metrics_engine.h> +#include <vespa/searchcore/proton/metrics/content_proton_metrics.h> #include <vespa/searchcore/proton/summaryengine/summaryengine.h> #include <vespa/searchlib/common/packets.h> #include <vespa/searchlib/transactionlog/trans_log_server_explorer.h> @@ -966,4 +968,9 @@ Proton::getPersistence() return *_persistenceEngine; } +metrics::MetricManager & +Proton::getMetricManager() { + return _metricsEngine->getManager(); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index c18737f22b5..573f215c722 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -15,7 +15,6 @@ #include "shared_threading_service.h" #include <vespa/eval/eval/llvm/compile_cache.h> #include <vespa/searchcore/proton/matching/querylimiter.h> -#include <vespa/searchcore/proton/metrics/metrics_engine.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> #include <vespa/searchlib/common/fileheadercontext.h> @@ -29,11 +28,12 @@ #include <mutex> #include <shared_mutex> -namespace vespalib { - class StateServer; -} +namespace vespalib { class StateServer; } namespace search::transactionlog { class TransLogServerApp; } -namespace metrics { class MetricLockGuard; } +namespace metrics { + class MetricLockGuard; + class MetricManager; +} namespace storage::spi { struct PersistenceProvider; } namespace proton { @@ -46,6 +46,7 @@ class SummaryEngine; class FlushEngine; class MatchEngine; class PersistenceEngine; +class MetricsEngine; class Proton : public IProtonConfigurerOwner, public search::engine::MonitorServer, @@ -179,7 +180,7 @@ public: addDocumentDB(const document::DocumentType &docType, BucketSpace bucketSpace, const BootstrapConfig::SP &configSnapshot, const std::shared_ptr<DocumentDBConfig> &documentDBConfig, InitializeThreads initializeThreads); - metrics::MetricManager & getMetricManager() { return _metricsEngine->getManager(); } + metrics::MetricManager & getMetricManager(); FastOS_ThreadPool & getThreadPool() { return _threadPool; } bool triggerFlush(); diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp index 1e9923793bd..967f8748174 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp @@ -65,6 +65,8 @@ Packet::Packet(const void * buf, size_t sz) : } } +Packet::~Packet() = default; + void Packet::merge(const Packet & packet) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 98601525fdb..135681037ef 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -70,6 +70,11 @@ public: public: explicit Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { } Packet(const void * buf, size_t sz); + Packet(const Packet &) = delete; + Packet & operator =(const Packet &) = delete; + Packet(Packet &&) noexcept = default; + Packet & operator =(Packet &&) noexcept = default; + ~Packet(); void add(const Entry & data); void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); } const SerialNumRange & range() const { return _range; } @@ -133,6 +138,7 @@ public: void add(const Packet & packet, Writer::DoneCallback onDone); size_t sizeBytes() const { return _data.sizeBytes(); } const Packet & getPacket() const { return _data; } + Packet stealPacket() { return std::move(_data); } size_t getNumCallBacks() const { return _callBacks->size(); } Writer::CommitResult createCommitResult() const; void setCommitDoneCallback(Writer::DoneCallback onDone) { _onCommitDone = std::move(onDone); } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 96b94955570..bbde39a42f6 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -35,7 +35,7 @@ createCommitChunk(const DomainConfig &cfg) { } -Domain::Domain(const string &domainName, const string & baseDir, Executor & executor, +Domain::Domain(const string &domainName, const string & baseDir, vespalib::Executor & executor, const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext) : _config(cfg), _currentChunk(createCommitChunk(cfg)), @@ -420,7 +420,7 @@ Domain::doCommit(const SerializedChunk & serialized) { } cleanSessions(); LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.", - serialized.commitChunk().getNumCallBacks(), serialized.getNumEntries(), serialized.getData().size()); + serialized.getNumCallBacks(), serialized.getNumEntries(), serialized.getData().size()); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index eb3d0b6b10b..8dd5164689e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -17,10 +17,9 @@ class Domain : public Writer { public: using SP = std::shared_ptr<Domain>; - using Executor = vespalib::SyncableThreadExecutor; using DomainPartSP = std::shared_ptr<DomainPart>; using FileHeaderContext = common::FileHeaderContext; - Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & executor, + Domain(const vespalib::string &name, const vespalib::string &baseDir, vespalib::Executor & executor, const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext); ~Domain() override; @@ -85,8 +84,8 @@ private: DomainConfig _config; std::unique_ptr<CommitChunk> _currentChunk; SerialNum _lastSerial; - std::unique_ptr<Executor> _singleCommitter; - Executor &_executor; + std::unique_ptr<vespalib::SyncableThreadExecutor> _singleCommitter; + vespalib::Executor &_executor; std::atomic<int> _sessionId; std::mutex _syncMonitor; std::condition_variable _syncCond; diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp index d86d9dc763c..2a4a042d967 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp @@ -141,7 +141,7 @@ SerializedChunk::SerializedChunk(std::unique_ptr<CommitChunk> commitChunk, Encod _range(_commitChunk->getPacket().range()), _numEntries(_commitChunk->getPacket().size()) { - const Packet & packet = _commitChunk->getPacket(); + Packet packet = _commitChunk->stealPacket(); nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size()); IChunk::UP chunk = IChunk::create(encoding, compressionLevel); diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index cf8d12c1feb..b8a1f519c50 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -47,7 +47,7 @@ public: vespalib::ConstBufferRef getData() const; SerialNumRange range() const { return _range; } size_t getNumEntries() const { return _numEntries; } - const CommitChunk & commitChunk() const { return *_commitChunk; } + size_t getNumCallBacks() const { return _commitChunk->getNumCallBacks(); } private: // CommitChunk is required to ensure we do not reply until committed to the TLS. std::unique_ptr<CommitChunk> _commitChunk; |