diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-13 11:01:39 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-15 15:17:53 +0100 |
commit | e6f30bdbe67a994339d201ce57b2942e3bd5eb78 (patch) | |
tree | 8ca1df13c26267038af447fd4f0ff1328e0a31b0 | |
parent | 1463162fc4b86877df3314c82e76f03a769d2532 (diff) |
Add interface for resource usage listener to spi.
Propagate resource usage to service layer.
26 files changed, 420 insertions, 11 deletions
diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp index 7da88e34ee9..2fbe6c8b5a6 100644 --- a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp +++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp @@ -3,6 +3,7 @@ #include <vespa/document/base/testdocman.h> #include <vespa/persistence/conformancetest/conformancetest.h> #include <vespa/persistence/spi/test.h> +#include <vespa/persistence/spi/resource_usage_listener.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/document/update/documentupdate.h> #include <vespa/document/update/assignvalueupdate.h> @@ -14,6 +15,7 @@ #include <vespa/vdslib/state/nodestate.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/config-stor-distribution.h> #include <algorithm> #include <limits> @@ -2255,6 +2257,18 @@ TEST_F(ConformanceTest, testBucketSpaces) assertBucketInfo(*spi, bucket12, 1); } +TEST_F(ConformanceTest, resource_usage) +{ + ResourceUsageListener resource_usage_listener; + document::TestDocMan testDocMan; + PersistenceProviderUP spi(getSpi(*_factory, testDocMan)); + EXPECT_EQ(0.0, resource_usage_listener.get_usage().get_disk_usage()); + EXPECT_EQ(0.0, resource_usage_listener.get_usage().get_memory_usage()); + auto register_guard = spi->register_resource_usage_listener(resource_usage_listener); + EXPECT_EQ(0.5, resource_usage_listener.get_usage().get_disk_usage()); + EXPECT_EQ(0.4, resource_usage_listener.get_usage().get_memory_usage()); +} + TEST_F(ConformanceTest, detectAndTestOptionalBehavior) { // Report if implementation supports setting bucket size info. diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index bc7310f4806..0c1c5db69d6 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -6,10 +6,13 @@ #include <vespa/document/fieldvalue/document.h> #include <vespa/document/update/documentupdate.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/persistence/spi/i_resource_usage_listener.h> +#include <vespa/persistence/spi/resource_usage.h> #include <vespa/vespalib/util/crc.h> #include <vespa/document/fieldset/fieldsetrepo.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <algorithm> #include <cassert> @@ -852,6 +855,14 @@ DummyPersistence::join(const Bucket& source1, const Bucket& source2, return Result(); } +std::unique_ptr<vespalib::IDestructorCallback> +DummyPersistence::register_resource_usage_listener(IResourceUsageListener &listener) +{ + ResourceUsage usage(0.5, 0.4); + listener.update_resource_usage(usage); + return {}; +} + std::string DummyPersistence::dumpBucket(const Bucket& b) const { diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 6b80d6bab0f..c37af0d33eb 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -175,6 +175,8 @@ public: Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; + std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override; + /** * The following methods are used only for unit testing. * DummyPersistence is used many places to test the framework around it. diff --git a/persistence/src/vespa/persistence/spi/CMakeLists.txt b/persistence/src/vespa/persistence/spi/CMakeLists.txt index efc744e0792..aad99e12a69 100644 --- a/persistence/src/vespa/persistence/spi/CMakeLists.txt +++ b/persistence/src/vespa/persistence/spi/CMakeLists.txt @@ -10,6 +10,7 @@ vespa_add_library(persistence_spi OBJECT exceptions.cpp persistenceprovider.cpp read_consistency.cpp + resource_usage_listener.cpp result.cpp selection.cpp test.cpp diff --git a/persistence/src/vespa/persistence/spi/i_resource_usage_listener.h b/persistence/src/vespa/persistence/spi/i_resource_usage_listener.h new file mode 100644 index 00000000000..72f6f8fbc59 --- /dev/null +++ b/persistence/src/vespa/persistence/spi/i_resource_usage_listener.h @@ -0,0 +1,21 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/stllike/string.h> + +namespace storage::spi { + +class ResourceUsage; + +/* + * Interface class for listening to resource usage updates. + */ +class IResourceUsageListener +{ +public: + virtual ~IResourceUsageListener() = default; + virtual void update_resource_usage(const ResourceUsage& resource_usage) = 0; +}; + +} diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index d00862ed5fa..2d2864bd1fa 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -12,9 +12,12 @@ #include "operationcomplete.h" namespace document { class FieldSet; } +namespace vespalib { class IDestructorCallback; } namespace storage::spi { +class IResourceUsageListener; + /** * This interface is the basis for a persistence provider in Vespa. A * persistence provider is used by Vespa Storage to provide an elastic stateful @@ -374,6 +377,12 @@ struct PersistenceProvider * source1 and source2 should be stored in the target bucket. */ virtual Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) = 0; + + /* + * Register a listener for updates to resource usage. + */ + virtual std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) = 0; + }; } diff --git a/persistence/src/vespa/persistence/spi/resource_usage.h b/persistence/src/vespa/persistence/spi/resource_usage.h new file mode 100644 index 00000000000..1b2504b0f13 --- /dev/null +++ b/persistence/src/vespa/persistence/spi/resource_usage.h @@ -0,0 +1,33 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace storage::spi { + +/* + * Class representing resource usage for persistence provider. + * Numbers are normalized to be between 0.0 and 1.0 + */ +class ResourceUsage +{ + double _disk_usage; + double _memory_usage; +public: + + ResourceUsage(double disk_usage, double memory_usage) + : _disk_usage(disk_usage), + _memory_usage(memory_usage) + { + } + + ResourceUsage() + : ResourceUsage(0.0, 0.0) + { + } + + double get_disk_usage() const noexcept { return _disk_usage; } + double get_memory_usage() const noexcept { return _memory_usage; } +}; + +} + diff --git a/persistence/src/vespa/persistence/spi/resource_usage_listener.cpp b/persistence/src/vespa/persistence/spi/resource_usage_listener.cpp new file mode 100644 index 00000000000..123aac2a62b --- /dev/null +++ b/persistence/src/vespa/persistence/spi/resource_usage_listener.cpp @@ -0,0 +1,32 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "resource_usage_listener.h" +#include <vespa/vespalib/util/idestructorcallback.h> + +namespace storage::spi { + +ResourceUsageListener::ResourceUsageListener() + : IResourceUsageListener(), + _usage(), + _register_guard() +{ +} + +ResourceUsageListener::~ResourceUsageListener() +{ + _register_guard.reset(); +} + +void +ResourceUsageListener::update_resource_usage(const ResourceUsage& resource_usage) +{ + _usage = resource_usage; +} + +void +ResourceUsageListener::set_register_guard(std::unique_ptr<vespalib::IDestructorCallback> register_guard) +{ + _register_guard = std::move(register_guard); +} + +} diff --git a/persistence/src/vespa/persistence/spi/resource_usage_listener.h b/persistence/src/vespa/persistence/spi/resource_usage_listener.h new file mode 100644 index 00000000000..ff16c4a011a --- /dev/null +++ b/persistence/src/vespa/persistence/spi/resource_usage_listener.h @@ -0,0 +1,27 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_resource_usage_listener.h" +#include "resource_usage.h" + +namespace vespalib { class IDestructorCallback; } + +namespace storage::spi { + +/* + * Class for listening to resource usage updates. + */ +class ResourceUsageListener : public IResourceUsageListener +{ + ResourceUsage _usage; + std::unique_ptr<vespalib::IDestructorCallback> _register_guard; +public: + ResourceUsageListener(); + ~ResourceUsageListener() override; + void update_resource_usage(const ResourceUsage& resource_usage) override; + const ResourceUsage& get_usage() const noexcept { return _usage; } + void set_register_guard(std::unique_ptr<vespalib::IDestructorCallback> register_guard); +}; + +} diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt index 3e95c60f21b..04cface5d40 100644 --- a/searchcore/CMakeLists.txt +++ b/searchcore/CMakeLists.txt @@ -124,6 +124,7 @@ vespa_define_module( src/tests/proton/metrics/metrics_engine src/tests/proton/persistenceconformance src/tests/proton/persistenceengine + src/tests/proton/persistenceengine/resource_usage_tracker src/tests/proton/persistenceengine/persistence_handler_map src/tests/proton/proton src/tests/proton/proton_config_fetcher diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index 417e47f6bda..da194d1ff7b 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -25,6 +25,7 @@ #include <vespa/searchcore/proton/server/memoryconfigstore.h> #include <vespa/searchcore/proton/server/persistencehandlerproxy.h> #include <vespa/searchcore/proton/server/threading_service_config.h> +#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/transactionlog/translogserver.h> #include <vespa/searchsummary/config/config-juniperrc.h> @@ -295,10 +296,11 @@ class MyPersistenceEngine : public DocDBRepoHolder, public: MyPersistenceEngine(MyPersistenceEngineOwner &owner, MyResourceWriteFilter &writeFilter, + IDiskMemUsageNotifier& disk_mem_usage_notifier, DocumentDBRepo::UP docDbRepo, const vespalib::string &docType = "") : DocDBRepoHolder(std::move(docDbRepo)), - PersistenceEngine(owner, writeFilter, -1, false) + PersistenceEngine(owner, writeFilter, disk_mem_usage_notifier, -1, false) { addHandlers(docType); } @@ -348,6 +350,7 @@ private: vespalib::string _docType; MyPersistenceEngineOwner _engineOwner; MyResourceWriteFilter _writeFilter; + test::DiskMemUsageNotifier _disk_mem_usage_notifier; public: MyPersistenceFactory(const vespalib::string &baseDir, int tlsListenPort, SchemaConfigFactory::SP schemaFactory, @@ -358,7 +361,8 @@ public: _docDbRepo(), _docType(docType), _engineOwner(), - _writeFilter() + _writeFilter(), + _disk_mem_usage_notifier(DiskMemUsageState({ 0.8, 0.5 }, { 0.8, 0.4 })) { clear(); } @@ -369,7 +373,7 @@ public: const DocumenttypesConfig &typesCfg) override { ConfigFactory cfgFactory(repo, std::make_shared<DocumenttypesConfig>(typesCfg), _schemaFactory); _docDbRepo = std::make_unique<DocumentDBRepo>(cfgFactory, _docDbFactory); - auto engine = std::make_unique<MyPersistenceEngine>(_engineOwner,_writeFilter,std::move(_docDbRepo), _docType); + auto engine = std::make_unique<MyPersistenceEngine>(_engineOwner,_writeFilter, _disk_mem_usage_notifier, std::move(_docDbRepo), _docType); assert( ! _docDbRepo); // Repo should be handed over return engine; } diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 170885d1d99..a8c5912aadd 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -51,6 +51,7 @@ #include <vespa/searchcore/proton/server/memoryconfigstore.h> #include <vespa/searchcore/proton/server/persistencehandlerproxy.h> #include <vespa/searchcore/proton/server/threading_service_config.h> +#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/transactionlog/translogserver.h> #include <vespa/searchsummary/config/config-juniperrc.h> @@ -655,6 +656,7 @@ struct PersistenceProviderFixture { std::shared_ptr<DocumentDB> _document_db; MyPersistenceEngineOwner _persistence_owner; MyResourceWriteFilter _write_filter; + test::DiskMemUsageNotifier _disk_mem_usage_notifier; std::shared_ptr<PersistenceEngine> _persistence_engine; std::unique_ptr<const FieldSetRepo> _field_set_repo; uint32_t _bucket_bits; @@ -724,6 +726,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _document_db(), _persistence_owner(), _write_filter(), + _disk_mem_usage_notifier(), _persistence_engine(), _field_set_repo(std::make_unique<const FieldSetRepo>(*_repo)), _bucket_bits(16), @@ -743,7 +746,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _message_bus() { create_document_db(params); - _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false); + _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, _disk_mem_usage_notifier, -1, false); auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db); _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy); _service_layer_config.add_builders(_config_set); diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index 36c7d3588c7..41ccafc9838 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -14,6 +14,7 @@ #include <vespa/searchcore/proton/persistenceengine/bucket_guard.h> #include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> #include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> +#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/testkit/testapp.h> @@ -395,11 +396,12 @@ struct SimpleResourceWriteFilter : public IResourceWriteFilter struct SimpleFixture { SimplePersistenceEngineOwner _owner; SimpleResourceWriteFilter _writeFilter; + test::DiskMemUsageNotifier _disk_mem_usage_notifier; PersistenceEngine engine; HandlerSet hset; explicit SimpleFixture(BucketSpace bucketSpace2) : _owner(), - engine(_owner, _writeFilter, -1, false), + engine(_owner, _writeFilter, _disk_mem_usage_notifier, -1, false), hset() { engine.putHandler(engine.getWLock(), makeBucketSpace(), DocTypeName(doc1->getType()), hset.phandler1); diff --git a/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/CMakeLists.txt b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/CMakeLists.txt new file mode 100644 index 00000000000..d1a70c5be14 --- /dev/null +++ b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchcore_resource_usage_tracker_test_app TEST + SOURCES + resource_usage_tracker_test.cpp + DEPENDS + searchcore_persistenceengine + GTest::GTest +) +vespa_add_test(NAME searchcore_resource_usage_tracker_test_app COMMAND searchcore_resource_usage_tracker_test_app) diff --git a/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp new file mode 100644 index 00000000000..7391f9fb3a2 --- /dev/null +++ b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp @@ -0,0 +1,85 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/persistence/spi/resource_usage_listener.h> +#include <vespa/persistence/spi/resource_usage.h> +#include <vespa/searchcore/proton/persistenceengine/resource_usage_tracker.h> +#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/idestructorcallback.h> + +using storage::spi::ResourceUsage; +using proton::test::DiskMemUsageNotifier; +using proton::DiskMemUsageState; +using proton::ResourceUsageTracker; + +namespace { + +struct MyResourceUsageListener : public storage::spi::ResourceUsageListener +{ + using storage::spi::ResourceUsageListener::ResourceUsageListener; + + std::vector<double> get_usage_vector() const { return { get_usage().get_disk_usage(), get_usage().get_memory_usage() }; } +}; + +} + +class ResourceUsageTrackerTest : public ::testing::Test +{ +protected: + DiskMemUsageNotifier _notifier; + std::shared_ptr<ResourceUsageTracker> _tracker; + std::unique_ptr<MyResourceUsageListener> _listener; + +public: + ResourceUsageTrackerTest() + : testing::Test(), + _notifier(DiskMemUsageState({ 0.8, 0.5 }, { 0.8, 0.4 })), + _tracker(std::make_shared<ResourceUsageTracker>(_notifier)), + _listener(std::make_unique<MyResourceUsageListener>()) + { + } + + ~ResourceUsageTrackerTest(); + + void notify(double disk_usage, double memory_usage) + { + _notifier.notify(DiskMemUsageState({ 0.8, disk_usage }, { 0.8, memory_usage })); + } + +}; + +ResourceUsageTrackerTest::~ResourceUsageTrackerTest() = default; + +TEST_F(ResourceUsageTrackerTest, resource_usage_is_forwarded_to_listener) +{ + EXPECT_EQ((std::vector<double>{ 0.0, 0.0 }), _listener->get_usage_vector()); + auto register_guard = _tracker->set_listener(*_listener); + EXPECT_EQ((std::vector<double>{ 0.5, 0.4 }), _listener->get_usage_vector()); + notify(0.75, 0.25); + EXPECT_EQ((std::vector<double>{ 0.75, 0.25 }), _listener->get_usage_vector()); +} + +TEST_F(ResourceUsageTrackerTest, forwarding_depends_on_register_guard) +{ + auto register_guard = _tracker->set_listener(*_listener); + register_guard.reset(); + notify(0.75, 0.25); + EXPECT_EQ((std::vector<double>{ 0.5, 0.4 }), _listener->get_usage_vector()); +} + +TEST_F(ResourceUsageTrackerTest, no_forwarding_to_deleted_listener) +{ + _listener->set_register_guard(_tracker->set_listener(*_listener)); + notify(0.75, 0.25); + EXPECT_EQ((std::vector<double>{ 0.75, 0.25 }), _listener->get_usage_vector()); + _listener.reset(); + notify(0.2, 0.1); +} + +TEST_F(ResourceUsageTrackerTest, register_guard_handles_deleted_tracker) +{ + auto register_guard = _tracker->set_listener(*_listener); + _tracker.reset(); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/persistenceengine/CMakeLists.txt index 989f6f16b1b..6441212f247 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/CMakeLists.txt @@ -6,6 +6,7 @@ vespa_add_library(searchcore_persistenceengine STATIC i_document_retriever.cpp persistenceengine.cpp persistence_handler_map.cpp + resource_usage_tracker.cpp transport_latch.cpp DEPENDS ) diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 7a2d9403c29..67d96226f76 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -180,7 +180,7 @@ PersistenceEngine::getHandlerSnapshot(const WriteGuard &, document::BucketSpace return _handlers.getHandlerSnapshot(bucketSpace); } -PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter, +PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter, IDiskMemUsageNotifier& disk_mem_usage_notifier, ssize_t defaultSerializedSize, bool ignoreMaxBytes) : AbstractPersistenceProvider(), _defaultSerializedSize(defaultSerializedSize), @@ -193,7 +193,8 @@ PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, const IReso _writeFilter(writeFilter), _clusterStates(), _extraModifiedBuckets(), - _rwMutex() + _rwMutex(), + _resource_usage_tracker(std::make_shared<ResourceUsageTracker>(disk_mem_usage_notifier)) { } @@ -608,6 +609,11 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck return latch.getResult(); } +std::unique_ptr<vespalib::IDestructorCallback> +PersistenceEngine::register_resource_usage_listener(IResourceUsageListener& listener) +{ + return _resource_usage_tracker->set_listener(listener); +} void PersistenceEngine::destroyIterators() diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index ee0a09cd240..659156fdea0 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -5,6 +5,7 @@ #include "i_resource_write_filter.h" #include "persistence_handler_map.h" #include "ipersistencehandler.h" +#include "resource_usage_tracker.h" #include <vespa/persistence/spi/abstractpersistenceprovider.h> #include <mutex> #include <shared_mutex> @@ -12,6 +13,7 @@ namespace proton { class IPersistenceEngineOwner; +class IDiskMemUsageNotifier; class PersistenceEngine : public storage::spi::AbstractPersistenceProvider { private: @@ -27,6 +29,7 @@ private: using CreateIteratorResult = storage::spi::CreateIteratorResult; using GetResult = storage::spi::GetResult; using IncludedVersions = storage::spi::IncludedVersions; + using IResourceUsageListener = storage::spi::IResourceUsageListener; using IterateResult = storage::spi::IterateResult; using IteratorId = storage::spi::IteratorId; using RemoveResult = storage::spi::RemoveResult; @@ -68,7 +71,8 @@ private: const IResourceWriteFilter &_writeFilter; std::unordered_map<BucketSpace, ClusterState::SP, BucketSpace::hash> _clusterStates; mutable ExtraModifiedBuckets _extraModifiedBuckets; - mutable std::shared_mutex _rwMutex; + mutable std::shared_mutex _rwMutex; + std::shared_ptr<ResourceUsageTracker> _resource_usage_tracker; using ReadGuard = std::shared_lock<std::shared_mutex>; using WriteGuard = std::unique_lock<std::shared_mutex>; @@ -84,7 +88,7 @@ private: public: typedef std::unique_ptr<PersistenceEngine> UP; - PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter, + PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter, IDiskMemUsageNotifier &disk_mem_usage_notifier, ssize_t defaultSerializedSize, bool ignoreMaxBytes); ~PersistenceEngine() override; @@ -111,12 +115,13 @@ public: BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; - + std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override; void destroyIterators(); void propagateSavedClusterState(BucketSpace bucketSpace, IPersistenceHandler &handler); void grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenceHandler &handler); void populateInitialBucketDB(const WriteGuard & guard, BucketSpace bucketSpace, IPersistenceHandler &targetHandler); WriteGuard getWLock() const; + ResourceUsageTracker &get_resource_usage_tracker() noexcept { return *_resource_usage_tracker; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp new file mode 100644 index 00000000000..8830f462178 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp @@ -0,0 +1,80 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "resource_usage_tracker.h" +#include <vespa/searchcore/proton/server/disk_mem_usage_state.h> +#include <vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h> +#include <vespa/persistence/spi/i_resource_usage_listener.h> +#include <vespa/vespalib/util/idestructorcallback.h> +#include <cassert> + +using storage::spi::ResourceUsage; + +namespace proton { + +class ResourceUsageTracker::ListenerGuard : public vespalib::IDestructorCallback +{ + std::weak_ptr<ResourceUsageTracker> _tracker; +public: + ListenerGuard(std::shared_ptr<ResourceUsageTracker> tracker); + ~ListenerGuard() override; +}; + +ResourceUsageTracker::ListenerGuard::ListenerGuard(std::shared_ptr<ResourceUsageTracker> tracker) + : _tracker(tracker) +{ +} + +ResourceUsageTracker::ListenerGuard::~ListenerGuard() +{ + auto tracker = _tracker.lock(); + if (tracker) { + tracker->remove_listener(); + } +} + +ResourceUsageTracker::ResourceUsageTracker(IDiskMemUsageNotifier& disk_mem_usage_notifier) + : std::enable_shared_from_this<ResourceUsageTracker>(), + IDiskMemUsageListener(), + _lock(), + _resource_usage(), + _listener(nullptr), + _disk_mem_usage_notifier(disk_mem_usage_notifier) +{ + _disk_mem_usage_notifier.addDiskMemUsageListener(this); +} + +ResourceUsageTracker::~ResourceUsageTracker() +{ + _disk_mem_usage_notifier.removeDiskMemUsageListener(this); + std::lock_guard guard(_lock); +} + +void +ResourceUsageTracker::notifyDiskMemUsage(DiskMemUsageState state) +{ + std::lock_guard guard(_lock); + _resource_usage = ResourceUsage(state.diskState().usage(), state.memoryState().usage()); + if (_listener != nullptr) { + _listener->update_resource_usage(_resource_usage); + } +} + +std::unique_ptr<vespalib::IDestructorCallback> +ResourceUsageTracker::set_listener(storage::spi::IResourceUsageListener& listener) +{ + std::lock_guard guard(_lock); + assert(_listener == nullptr); + _listener = &listener; + listener.update_resource_usage(_resource_usage); + return std::make_unique<ListenerGuard>(shared_from_this()); +} + +void +ResourceUsageTracker::remove_listener() +{ + std::lock_guard guard(_lock); + _listener = nullptr; +} + +}; + diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.h new file mode 100644 index 00000000000..e41435a1a83 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.h @@ -0,0 +1,37 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchcore/proton/server/i_disk_mem_usage_listener.h> +#include <vespa/persistence/spi/resource_usage.h> +#include <mutex> +#include <memory> +#include <vector> + +namespace storage::spi { class IResourceUsageListener; } +namespace vespalib { class IDestructorCallback; } + +namespace proton { + +class DiskMemUsageState; +class IDiskMemUsageNotifier; + +/* + * Class tracking resource usage for persistence provider. + */ +class ResourceUsageTracker : public std::enable_shared_from_this<ResourceUsageTracker>, public IDiskMemUsageListener +{ + class ListenerGuard; + std::mutex _lock; + storage::spi::ResourceUsage _resource_usage; + storage::spi::IResourceUsageListener* _listener; + IDiskMemUsageNotifier& _disk_mem_usage_notifier; + void remove_listener(); +public: + ResourceUsageTracker(IDiskMemUsageNotifier& notifier); + ~ResourceUsageTracker() override; + void notifyDiskMemUsage(DiskMemUsageState state) override; + std::unique_ptr<vespalib::IDestructorCallback> set_listener(storage::spi::IResourceUsageListener& listener); +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 36d630bc519..3f7cf14afbe 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -310,6 +310,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) protonConfig.basedir.c_str(), getcwd(tmp, sizeof(tmp))); _persistenceEngine = std::make_unique<PersistenceEngine>(*this, _diskMemUsageSampler->writeFilter(), + _diskMemUsageSampler->notifier(), protonConfig.visit.defaultserializedsize, protonConfig.visit.ignoremaxbytes); diff --git a/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h b/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h index 55b6554ab2d..9a9b9b0ea14 100644 --- a/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h +++ b/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h @@ -19,7 +19,16 @@ class DiskMemUsageNotifier : public IDiskMemUsageNotifier std::vector<IDiskMemUsageListener *> _listeners; DiskMemUsageState _state; public: - DiskMemUsageNotifier() : IDiskMemUsageNotifier(), _listeners(), _state() { } + DiskMemUsageNotifier(DiskMemUsageState state) + : IDiskMemUsageNotifier(), + _listeners(), + _state(state) + { + } + DiskMemUsageNotifier() + : DiskMemUsageNotifier(DiskMemUsageState()) + { + } virtual ~DiskMemUsageNotifier() { } virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) override { _listeners.push_back(listener); diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index 02c92fc1650..bbde377fdec 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -3,6 +3,7 @@ #include "persistenceproviderwrapper.h" #include <vespa/document/fieldvalue/document.h> #include <vespa/document/update/documentupdate.h> +#include <vespa/vespalib/util/idestructorcallback.h> #include <sstream> #define LOG_SPI(ops) \ @@ -202,6 +203,12 @@ PersistenceProviderWrapper::join(const spi::Bucket& source1, return _spi.join(source1, source2, target, context); } +std::unique_ptr<vespalib::IDestructorCallback> +PersistenceProviderWrapper::register_resource_usage_listener(spi::IResourceUsageListener& listener) +{ + return _spi.register_resource_usage_listener(listener); +} + spi::Result PersistenceProviderWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp timestamp, diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index 4061343c8da..cc26b251e67 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -111,6 +111,7 @@ public: spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; spi::Result removeEntry(const spi::Bucket&, spi::Timestamp, spi::Context&) override; + std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(spi::IResourceUsageListener& listener) override; }; } // storage diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 26cfe845eef..cf83741f5bb 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -2,6 +2,7 @@ #include "provider_error_wrapper.h" #include "persistenceutil.h" +#include <vespa/vespalib/util/idestructorcallback.h> namespace storage { @@ -155,6 +156,12 @@ ProviderErrorWrapper::join(const spi::Bucket& source1, const spi::Bucket& source return checkResult(_impl.join(source1, source2, target, context)); } +std::unique_ptr<vespalib::IDestructorCallback> +ProviderErrorWrapper::register_resource_usage_listener(spi::IResourceUsageListener& listener) +{ + return _impl.register_resource_usage_listener(listener); +} + spi::Result ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts, spi::Context& context) { diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 5e5682a8bb4..fa24dbeea45 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -58,6 +58,7 @@ public: spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; + std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(spi::IResourceUsageListener& listener) override; spi::Result removeEntry(const spi::Bucket&, spi::Timestamp, spi::Context&) override; spi::PersistenceProvider& getProviderImplementation() { |