summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2021-01-13 11:01:39 +0100
committerTor Egge <Tor.Egge@broadpark.no>2021-01-15 15:17:53 +0100
commite6f30bdbe67a994339d201ce57b2942e3bd5eb78 (patch)
tree8ca1df13c26267038af447fd4f0ff1328e0a31b0
parent1463162fc4b86877df3314c82e76f03a769d2532 (diff)
Add interface for resource usage listener to spi.
Propagate resource usage to service layer.
-rw-r--r--persistence/src/vespa/persistence/conformancetest/conformancetest.cpp14
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp11
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h2
-rw-r--r--persistence/src/vespa/persistence/spi/CMakeLists.txt1
-rw-r--r--persistence/src/vespa/persistence/spi/i_resource_usage_listener.h21
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.h9
-rw-r--r--persistence/src/vespa/persistence/spi/resource_usage.h33
-rw-r--r--persistence/src/vespa/persistence/spi/resource_usage_listener.cpp32
-rw-r--r--persistence/src/vespa/persistence/spi/resource_usage_listener.h27
-rw-r--r--searchcore/CMakeLists.txt1
-rw-r--r--searchcore/src/apps/tests/persistenceconformance_test.cpp10
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp5
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp4
-rw-r--r--searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/CMakeLists.txt9
-rw-r--r--searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp85
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h11
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp80
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.h37
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h11
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp7
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h1
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h1
26 files changed, 420 insertions, 11 deletions
diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
index 7da88e34ee9..2fbe6c8b5a6 100644
--- a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
+++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
@@ -3,6 +3,7 @@
#include <vespa/document/base/testdocman.h>
#include <vespa/persistence/conformancetest/conformancetest.h>
#include <vespa/persistence/spi/test.h>
+#include <vespa/persistence/spi/resource_usage_listener.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/document/update/assignvalueupdate.h>
@@ -14,6 +15,7 @@
#include <vespa/vdslib/state/nodestate.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/config-stor-distribution.h>
#include <algorithm>
#include <limits>
@@ -2255,6 +2257,18 @@ TEST_F(ConformanceTest, testBucketSpaces)
assertBucketInfo(*spi, bucket12, 1);
}
+TEST_F(ConformanceTest, resource_usage)
+{
+ ResourceUsageListener resource_usage_listener;
+ document::TestDocMan testDocMan;
+ PersistenceProviderUP spi(getSpi(*_factory, testDocMan));
+ EXPECT_EQ(0.0, resource_usage_listener.get_usage().get_disk_usage());
+ EXPECT_EQ(0.0, resource_usage_listener.get_usage().get_memory_usage());
+ auto register_guard = spi->register_resource_usage_listener(resource_usage_listener);
+ EXPECT_EQ(0.5, resource_usage_listener.get_usage().get_disk_usage());
+ EXPECT_EQ(0.4, resource_usage_listener.get_usage().get_memory_usage());
+}
+
TEST_F(ConformanceTest, detectAndTestOptionalBehavior)
{
// Report if implementation supports setting bucket size info.
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index bc7310f4806..0c1c5db69d6 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -6,10 +6,13 @@
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/persistence/spi/i_resource_usage_listener.h>
+#include <vespa/persistence/spi/resource_usage.h>
#include <vespa/vespalib/util/crc.h>
#include <vespa/document/fieldset/fieldsetrepo.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <algorithm>
#include <cassert>
@@ -852,6 +855,14 @@ DummyPersistence::join(const Bucket& source1, const Bucket& source2,
return Result();
}
+std::unique_ptr<vespalib::IDestructorCallback>
+DummyPersistence::register_resource_usage_listener(IResourceUsageListener &listener)
+{
+ ResourceUsage usage(0.5, 0.4);
+ listener.update_resource_usage(usage);
+ return {};
+}
+
std::string
DummyPersistence::dumpBucket(const Bucket& b) const
{
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 6b80d6bab0f..c37af0d33eb 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -175,6 +175,8 @@ public:
Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
+ std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override;
+
/**
* The following methods are used only for unit testing.
* DummyPersistence is used many places to test the framework around it.
diff --git a/persistence/src/vespa/persistence/spi/CMakeLists.txt b/persistence/src/vespa/persistence/spi/CMakeLists.txt
index efc744e0792..aad99e12a69 100644
--- a/persistence/src/vespa/persistence/spi/CMakeLists.txt
+++ b/persistence/src/vespa/persistence/spi/CMakeLists.txt
@@ -10,6 +10,7 @@ vespa_add_library(persistence_spi OBJECT
exceptions.cpp
persistenceprovider.cpp
read_consistency.cpp
+ resource_usage_listener.cpp
result.cpp
selection.cpp
test.cpp
diff --git a/persistence/src/vespa/persistence/spi/i_resource_usage_listener.h b/persistence/src/vespa/persistence/spi/i_resource_usage_listener.h
new file mode 100644
index 00000000000..72f6f8fbc59
--- /dev/null
+++ b/persistence/src/vespa/persistence/spi/i_resource_usage_listener.h
@@ -0,0 +1,21 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+
+namespace storage::spi {
+
+class ResourceUsage;
+
+/*
+ * Interface class for listening to resource usage updates.
+ */
+class IResourceUsageListener
+{
+public:
+ virtual ~IResourceUsageListener() = default;
+ virtual void update_resource_usage(const ResourceUsage& resource_usage) = 0;
+};
+
+}
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index d00862ed5fa..2d2864bd1fa 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -12,9 +12,12 @@
#include "operationcomplete.h"
namespace document { class FieldSet; }
+namespace vespalib { class IDestructorCallback; }
namespace storage::spi {
+class IResourceUsageListener;
+
/**
* This interface is the basis for a persistence provider in Vespa. A
* persistence provider is used by Vespa Storage to provide an elastic stateful
@@ -374,6 +377,12 @@ struct PersistenceProvider
* source1 and source2 should be stored in the target bucket.
*/
virtual Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) = 0;
+
+ /*
+ * Register a listener for updates to resource usage.
+ */
+ virtual std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) = 0;
+
};
}
diff --git a/persistence/src/vespa/persistence/spi/resource_usage.h b/persistence/src/vespa/persistence/spi/resource_usage.h
new file mode 100644
index 00000000000..1b2504b0f13
--- /dev/null
+++ b/persistence/src/vespa/persistence/spi/resource_usage.h
@@ -0,0 +1,33 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace storage::spi {
+
+/*
+ * Class representing resource usage for persistence provider.
+ * Numbers are normalized to be between 0.0 and 1.0
+ */
+class ResourceUsage
+{
+ double _disk_usage;
+ double _memory_usage;
+public:
+
+ ResourceUsage(double disk_usage, double memory_usage)
+ : _disk_usage(disk_usage),
+ _memory_usage(memory_usage)
+ {
+ }
+
+ ResourceUsage()
+ : ResourceUsage(0.0, 0.0)
+ {
+ }
+
+ double get_disk_usage() const noexcept { return _disk_usage; }
+ double get_memory_usage() const noexcept { return _memory_usage; }
+};
+
+}
+
diff --git a/persistence/src/vespa/persistence/spi/resource_usage_listener.cpp b/persistence/src/vespa/persistence/spi/resource_usage_listener.cpp
new file mode 100644
index 00000000000..123aac2a62b
--- /dev/null
+++ b/persistence/src/vespa/persistence/spi/resource_usage_listener.cpp
@@ -0,0 +1,32 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "resource_usage_listener.h"
+#include <vespa/vespalib/util/idestructorcallback.h>
+
+namespace storage::spi {
+
+ResourceUsageListener::ResourceUsageListener()
+ : IResourceUsageListener(),
+ _usage(),
+ _register_guard()
+{
+}
+
+ResourceUsageListener::~ResourceUsageListener()
+{
+ _register_guard.reset();
+}
+
+void
+ResourceUsageListener::update_resource_usage(const ResourceUsage& resource_usage)
+{
+ _usage = resource_usage;
+}
+
+void
+ResourceUsageListener::set_register_guard(std::unique_ptr<vespalib::IDestructorCallback> register_guard)
+{
+ _register_guard = std::move(register_guard);
+}
+
+}
diff --git a/persistence/src/vespa/persistence/spi/resource_usage_listener.h b/persistence/src/vespa/persistence/spi/resource_usage_listener.h
new file mode 100644
index 00000000000..ff16c4a011a
--- /dev/null
+++ b/persistence/src/vespa/persistence/spi/resource_usage_listener.h
@@ -0,0 +1,27 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_resource_usage_listener.h"
+#include "resource_usage.h"
+
+namespace vespalib { class IDestructorCallback; }
+
+namespace storage::spi {
+
+/*
+ * Class for listening to resource usage updates.
+ */
+class ResourceUsageListener : public IResourceUsageListener
+{
+ ResourceUsage _usage;
+ std::unique_ptr<vespalib::IDestructorCallback> _register_guard;
+public:
+ ResourceUsageListener();
+ ~ResourceUsageListener() override;
+ void update_resource_usage(const ResourceUsage& resource_usage) override;
+ const ResourceUsage& get_usage() const noexcept { return _usage; }
+ void set_register_guard(std::unique_ptr<vespalib::IDestructorCallback> register_guard);
+};
+
+}
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index 3e95c60f21b..04cface5d40 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -124,6 +124,7 @@ vespa_define_module(
src/tests/proton/metrics/metrics_engine
src/tests/proton/persistenceconformance
src/tests/proton/persistenceengine
+ src/tests/proton/persistenceengine/resource_usage_tracker
src/tests/proton/persistenceengine/persistence_handler_map
src/tests/proton/proton
src/tests/proton/proton_config_fetcher
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp
index 417e47f6bda..da194d1ff7b 100644
--- a/searchcore/src/apps/tests/persistenceconformance_test.cpp
+++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp
@@ -25,6 +25,7 @@
#include <vespa/searchcore/proton/server/memoryconfigstore.h>
#include <vespa/searchcore/proton/server/persistencehandlerproxy.h>
#include <vespa/searchcore/proton/server/threading_service_config.h>
+#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/searchlib/transactionlog/translogserver.h>
#include <vespa/searchsummary/config/config-juniperrc.h>
@@ -295,10 +296,11 @@ class MyPersistenceEngine : public DocDBRepoHolder,
public:
MyPersistenceEngine(MyPersistenceEngineOwner &owner,
MyResourceWriteFilter &writeFilter,
+ IDiskMemUsageNotifier& disk_mem_usage_notifier,
DocumentDBRepo::UP docDbRepo,
const vespalib::string &docType = "")
: DocDBRepoHolder(std::move(docDbRepo)),
- PersistenceEngine(owner, writeFilter, -1, false)
+ PersistenceEngine(owner, writeFilter, disk_mem_usage_notifier, -1, false)
{
addHandlers(docType);
}
@@ -348,6 +350,7 @@ private:
vespalib::string _docType;
MyPersistenceEngineOwner _engineOwner;
MyResourceWriteFilter _writeFilter;
+ test::DiskMemUsageNotifier _disk_mem_usage_notifier;
public:
MyPersistenceFactory(const vespalib::string &baseDir, int tlsListenPort,
SchemaConfigFactory::SP schemaFactory,
@@ -358,7 +361,8 @@ public:
_docDbRepo(),
_docType(docType),
_engineOwner(),
- _writeFilter()
+ _writeFilter(),
+ _disk_mem_usage_notifier(DiskMemUsageState({ 0.8, 0.5 }, { 0.8, 0.4 }))
{
clear();
}
@@ -369,7 +373,7 @@ public:
const DocumenttypesConfig &typesCfg) override {
ConfigFactory cfgFactory(repo, std::make_shared<DocumenttypesConfig>(typesCfg), _schemaFactory);
_docDbRepo = std::make_unique<DocumentDBRepo>(cfgFactory, _docDbFactory);
- auto engine = std::make_unique<MyPersistenceEngine>(_engineOwner,_writeFilter,std::move(_docDbRepo), _docType);
+ auto engine = std::make_unique<MyPersistenceEngine>(_engineOwner,_writeFilter, _disk_mem_usage_notifier, std::move(_docDbRepo), _docType);
assert( ! _docDbRepo); // Repo should be handed over
return engine;
}
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 170885d1d99..a8c5912aadd 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -51,6 +51,7 @@
#include <vespa/searchcore/proton/server/memoryconfigstore.h>
#include <vespa/searchcore/proton/server/persistencehandlerproxy.h>
#include <vespa/searchcore/proton/server/threading_service_config.h>
+#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/searchlib/transactionlog/translogserver.h>
#include <vespa/searchsummary/config/config-juniperrc.h>
@@ -655,6 +656,7 @@ struct PersistenceProviderFixture {
std::shared_ptr<DocumentDB> _document_db;
MyPersistenceEngineOwner _persistence_owner;
MyResourceWriteFilter _write_filter;
+ test::DiskMemUsageNotifier _disk_mem_usage_notifier;
std::shared_ptr<PersistenceEngine> _persistence_engine;
std::unique_ptr<const FieldSetRepo> _field_set_repo;
uint32_t _bucket_bits;
@@ -724,6 +726,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_document_db(),
_persistence_owner(),
_write_filter(),
+ _disk_mem_usage_notifier(),
_persistence_engine(),
_field_set_repo(std::make_unique<const FieldSetRepo>(*_repo)),
_bucket_bits(16),
@@ -743,7 +746,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_message_bus()
{
create_document_db(params);
- _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false);
+ _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, _disk_mem_usage_notifier, -1, false);
auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db);
_persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy);
_service_layer_config.add_builders(_config_set);
diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
index 36c7d3588c7..41ccafc9838 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
@@ -14,6 +14,7 @@
#include <vespa/searchcore/proton/persistenceengine/bucket_guard.h>
#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h>
#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
+#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/testkit/testapp.h>
@@ -395,11 +396,12 @@ struct SimpleResourceWriteFilter : public IResourceWriteFilter
struct SimpleFixture {
SimplePersistenceEngineOwner _owner;
SimpleResourceWriteFilter _writeFilter;
+ test::DiskMemUsageNotifier _disk_mem_usage_notifier;
PersistenceEngine engine;
HandlerSet hset;
explicit SimpleFixture(BucketSpace bucketSpace2)
: _owner(),
- engine(_owner, _writeFilter, -1, false),
+ engine(_owner, _writeFilter, _disk_mem_usage_notifier, -1, false),
hset()
{
engine.putHandler(engine.getWLock(), makeBucketSpace(), DocTypeName(doc1->getType()), hset.phandler1);
diff --git a/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/CMakeLists.txt b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/CMakeLists.txt
new file mode 100644
index 00000000000..d1a70c5be14
--- /dev/null
+++ b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcore_resource_usage_tracker_test_app TEST
+ SOURCES
+ resource_usage_tracker_test.cpp
+ DEPENDS
+ searchcore_persistenceengine
+ GTest::GTest
+)
+vespa_add_test(NAME searchcore_resource_usage_tracker_test_app COMMAND searchcore_resource_usage_tracker_test_app)
diff --git a/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp
new file mode 100644
index 00000000000..7391f9fb3a2
--- /dev/null
+++ b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp
@@ -0,0 +1,85 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/persistence/spi/resource_usage_listener.h>
+#include <vespa/persistence/spi/resource_usage.h>
+#include <vespa/searchcore/proton/persistenceengine/resource_usage_tracker.h>
+#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/idestructorcallback.h>
+
+using storage::spi::ResourceUsage;
+using proton::test::DiskMemUsageNotifier;
+using proton::DiskMemUsageState;
+using proton::ResourceUsageTracker;
+
+namespace {
+
+struct MyResourceUsageListener : public storage::spi::ResourceUsageListener
+{
+ using storage::spi::ResourceUsageListener::ResourceUsageListener;
+
+ std::vector<double> get_usage_vector() const { return { get_usage().get_disk_usage(), get_usage().get_memory_usage() }; }
+};
+
+}
+
+class ResourceUsageTrackerTest : public ::testing::Test
+{
+protected:
+ DiskMemUsageNotifier _notifier;
+ std::shared_ptr<ResourceUsageTracker> _tracker;
+ std::unique_ptr<MyResourceUsageListener> _listener;
+
+public:
+ ResourceUsageTrackerTest()
+ : testing::Test(),
+ _notifier(DiskMemUsageState({ 0.8, 0.5 }, { 0.8, 0.4 })),
+ _tracker(std::make_shared<ResourceUsageTracker>(_notifier)),
+ _listener(std::make_unique<MyResourceUsageListener>())
+ {
+ }
+
+ ~ResourceUsageTrackerTest();
+
+ void notify(double disk_usage, double memory_usage)
+ {
+ _notifier.notify(DiskMemUsageState({ 0.8, disk_usage }, { 0.8, memory_usage }));
+ }
+
+};
+
+ResourceUsageTrackerTest::~ResourceUsageTrackerTest() = default;
+
+TEST_F(ResourceUsageTrackerTest, resource_usage_is_forwarded_to_listener)
+{
+ EXPECT_EQ((std::vector<double>{ 0.0, 0.0 }), _listener->get_usage_vector());
+ auto register_guard = _tracker->set_listener(*_listener);
+ EXPECT_EQ((std::vector<double>{ 0.5, 0.4 }), _listener->get_usage_vector());
+ notify(0.75, 0.25);
+ EXPECT_EQ((std::vector<double>{ 0.75, 0.25 }), _listener->get_usage_vector());
+}
+
+TEST_F(ResourceUsageTrackerTest, forwarding_depends_on_register_guard)
+{
+ auto register_guard = _tracker->set_listener(*_listener);
+ register_guard.reset();
+ notify(0.75, 0.25);
+ EXPECT_EQ((std::vector<double>{ 0.5, 0.4 }), _listener->get_usage_vector());
+}
+
+TEST_F(ResourceUsageTrackerTest, no_forwarding_to_deleted_listener)
+{
+ _listener->set_register_guard(_tracker->set_listener(*_listener));
+ notify(0.75, 0.25);
+ EXPECT_EQ((std::vector<double>{ 0.75, 0.25 }), _listener->get_usage_vector());
+ _listener.reset();
+ notify(0.2, 0.1);
+}
+
+TEST_F(ResourceUsageTrackerTest, register_guard_handles_deleted_tracker)
+{
+ auto register_guard = _tracker->set_listener(*_listener);
+ _tracker.reset();
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/persistenceengine/CMakeLists.txt
index 989f6f16b1b..6441212f247 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/CMakeLists.txt
@@ -6,6 +6,7 @@ vespa_add_library(searchcore_persistenceengine STATIC
i_document_retriever.cpp
persistenceengine.cpp
persistence_handler_map.cpp
+ resource_usage_tracker.cpp
transport_latch.cpp
DEPENDS
)
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 7a2d9403c29..67d96226f76 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -180,7 +180,7 @@ PersistenceEngine::getHandlerSnapshot(const WriteGuard &, document::BucketSpace
return _handlers.getHandlerSnapshot(bucketSpace);
}
-PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter,
+PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter, IDiskMemUsageNotifier& disk_mem_usage_notifier,
ssize_t defaultSerializedSize, bool ignoreMaxBytes)
: AbstractPersistenceProvider(),
_defaultSerializedSize(defaultSerializedSize),
@@ -193,7 +193,8 @@ PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, const IReso
_writeFilter(writeFilter),
_clusterStates(),
_extraModifiedBuckets(),
- _rwMutex()
+ _rwMutex(),
+ _resource_usage_tracker(std::make_shared<ResourceUsageTracker>(disk_mem_usage_notifier))
{
}
@@ -608,6 +609,11 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck
return latch.getResult();
}
+std::unique_ptr<vespalib::IDestructorCallback>
+PersistenceEngine::register_resource_usage_listener(IResourceUsageListener& listener)
+{
+ return _resource_usage_tracker->set_listener(listener);
+}
void
PersistenceEngine::destroyIterators()
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index ee0a09cd240..659156fdea0 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -5,6 +5,7 @@
#include "i_resource_write_filter.h"
#include "persistence_handler_map.h"
#include "ipersistencehandler.h"
+#include "resource_usage_tracker.h"
#include <vespa/persistence/spi/abstractpersistenceprovider.h>
#include <mutex>
#include <shared_mutex>
@@ -12,6 +13,7 @@
namespace proton {
class IPersistenceEngineOwner;
+class IDiskMemUsageNotifier;
class PersistenceEngine : public storage::spi::AbstractPersistenceProvider {
private:
@@ -27,6 +29,7 @@ private:
using CreateIteratorResult = storage::spi::CreateIteratorResult;
using GetResult = storage::spi::GetResult;
using IncludedVersions = storage::spi::IncludedVersions;
+ using IResourceUsageListener = storage::spi::IResourceUsageListener;
using IterateResult = storage::spi::IterateResult;
using IteratorId = storage::spi::IteratorId;
using RemoveResult = storage::spi::RemoveResult;
@@ -68,7 +71,8 @@ private:
const IResourceWriteFilter &_writeFilter;
std::unordered_map<BucketSpace, ClusterState::SP, BucketSpace::hash> _clusterStates;
mutable ExtraModifiedBuckets _extraModifiedBuckets;
- mutable std::shared_mutex _rwMutex;
+ mutable std::shared_mutex _rwMutex;
+ std::shared_ptr<ResourceUsageTracker> _resource_usage_tracker;
using ReadGuard = std::shared_lock<std::shared_mutex>;
using WriteGuard = std::unique_lock<std::shared_mutex>;
@@ -84,7 +88,7 @@ private:
public:
typedef std::unique_ptr<PersistenceEngine> UP;
- PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter,
+ PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter, IDiskMemUsageNotifier &disk_mem_usage_notifier,
ssize_t defaultSerializedSize, bool ignoreMaxBytes);
~PersistenceEngine() override;
@@ -111,12 +115,13 @@ public:
BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
-
+ std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override;
void destroyIterators();
void propagateSavedClusterState(BucketSpace bucketSpace, IPersistenceHandler &handler);
void grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenceHandler &handler);
void populateInitialBucketDB(const WriteGuard & guard, BucketSpace bucketSpace, IPersistenceHandler &targetHandler);
WriteGuard getWLock() const;
+ ResourceUsageTracker &get_resource_usage_tracker() noexcept { return *_resource_usage_tracker; }
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp
new file mode 100644
index 00000000000..8830f462178
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp
@@ -0,0 +1,80 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "resource_usage_tracker.h"
+#include <vespa/searchcore/proton/server/disk_mem_usage_state.h>
+#include <vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h>
+#include <vespa/persistence/spi/i_resource_usage_listener.h>
+#include <vespa/vespalib/util/idestructorcallback.h>
+#include <cassert>
+
+using storage::spi::ResourceUsage;
+
+namespace proton {
+
+class ResourceUsageTracker::ListenerGuard : public vespalib::IDestructorCallback
+{
+ std::weak_ptr<ResourceUsageTracker> _tracker;
+public:
+ ListenerGuard(std::shared_ptr<ResourceUsageTracker> tracker);
+ ~ListenerGuard() override;
+};
+
+ResourceUsageTracker::ListenerGuard::ListenerGuard(std::shared_ptr<ResourceUsageTracker> tracker)
+ : _tracker(tracker)
+{
+}
+
+ResourceUsageTracker::ListenerGuard::~ListenerGuard()
+{
+ auto tracker = _tracker.lock();
+ if (tracker) {
+ tracker->remove_listener();
+ }
+}
+
+ResourceUsageTracker::ResourceUsageTracker(IDiskMemUsageNotifier& disk_mem_usage_notifier)
+ : std::enable_shared_from_this<ResourceUsageTracker>(),
+ IDiskMemUsageListener(),
+ _lock(),
+ _resource_usage(),
+ _listener(nullptr),
+ _disk_mem_usage_notifier(disk_mem_usage_notifier)
+{
+ _disk_mem_usage_notifier.addDiskMemUsageListener(this);
+}
+
+ResourceUsageTracker::~ResourceUsageTracker()
+{
+ _disk_mem_usage_notifier.removeDiskMemUsageListener(this);
+ std::lock_guard guard(_lock);
+}
+
+void
+ResourceUsageTracker::notifyDiskMemUsage(DiskMemUsageState state)
+{
+ std::lock_guard guard(_lock);
+ _resource_usage = ResourceUsage(state.diskState().usage(), state.memoryState().usage());
+ if (_listener != nullptr) {
+ _listener->update_resource_usage(_resource_usage);
+ }
+}
+
+std::unique_ptr<vespalib::IDestructorCallback>
+ResourceUsageTracker::set_listener(storage::spi::IResourceUsageListener& listener)
+{
+ std::lock_guard guard(_lock);
+ assert(_listener == nullptr);
+ _listener = &listener;
+ listener.update_resource_usage(_resource_usage);
+ return std::make_unique<ListenerGuard>(shared_from_this());
+}
+
+void
+ResourceUsageTracker::remove_listener()
+{
+ std::lock_guard guard(_lock);
+ _listener = nullptr;
+}
+
+};
+
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.h
new file mode 100644
index 00000000000..e41435a1a83
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.h
@@ -0,0 +1,37 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchcore/proton/server/i_disk_mem_usage_listener.h>
+#include <vespa/persistence/spi/resource_usage.h>
+#include <mutex>
+#include <memory>
+#include <vector>
+
+namespace storage::spi { class IResourceUsageListener; }
+namespace vespalib { class IDestructorCallback; }
+
+namespace proton {
+
+class DiskMemUsageState;
+class IDiskMemUsageNotifier;
+
+/*
+ * Class tracking resource usage for persistence provider.
+ */
+class ResourceUsageTracker : public std::enable_shared_from_this<ResourceUsageTracker>, public IDiskMemUsageListener
+{
+ class ListenerGuard;
+ std::mutex _lock;
+ storage::spi::ResourceUsage _resource_usage;
+ storage::spi::IResourceUsageListener* _listener;
+ IDiskMemUsageNotifier& _disk_mem_usage_notifier;
+ void remove_listener();
+public:
+ ResourceUsageTracker(IDiskMemUsageNotifier& notifier);
+ ~ResourceUsageTracker() override;
+ void notifyDiskMemUsage(DiskMemUsageState state) override;
+ std::unique_ptr<vespalib::IDestructorCallback> set_listener(storage::spi::IResourceUsageListener& listener);
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 36d630bc519..3f7cf14afbe 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -310,6 +310,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
protonConfig.basedir.c_str(), getcwd(tmp, sizeof(tmp)));
_persistenceEngine = std::make_unique<PersistenceEngine>(*this, _diskMemUsageSampler->writeFilter(),
+ _diskMemUsageSampler->notifier(),
protonConfig.visit.defaultserializedsize,
protonConfig.visit.ignoremaxbytes);
diff --git a/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h b/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h
index 55b6554ab2d..9a9b9b0ea14 100644
--- a/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h
+++ b/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h
@@ -19,7 +19,16 @@ class DiskMemUsageNotifier : public IDiskMemUsageNotifier
std::vector<IDiskMemUsageListener *> _listeners;
DiskMemUsageState _state;
public:
- DiskMemUsageNotifier() : IDiskMemUsageNotifier(), _listeners(), _state() { }
+ DiskMemUsageNotifier(DiskMemUsageState state)
+ : IDiskMemUsageNotifier(),
+ _listeners(),
+ _state(state)
+ {
+ }
+ DiskMemUsageNotifier()
+ : DiskMemUsageNotifier(DiskMemUsageState())
+ {
+ }
virtual ~DiskMemUsageNotifier() { }
virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) override {
_listeners.push_back(listener);
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index 02c92fc1650..bbde377fdec 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -3,6 +3,7 @@
#include "persistenceproviderwrapper.h"
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/update/documentupdate.h>
+#include <vespa/vespalib/util/idestructorcallback.h>
#include <sstream>
#define LOG_SPI(ops) \
@@ -202,6 +203,12 @@ PersistenceProviderWrapper::join(const spi::Bucket& source1,
return _spi.join(source1, source2, target, context);
}
+std::unique_ptr<vespalib::IDestructorCallback>
+PersistenceProviderWrapper::register_resource_usage_listener(spi::IResourceUsageListener& listener)
+{
+ return _spi.register_resource_usage_listener(listener);
+}
+
spi::Result
PersistenceProviderWrapper::removeEntry(const spi::Bucket& bucket,
spi::Timestamp timestamp,
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index 4061343c8da..cc26b251e67 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -111,6 +111,7 @@ public:
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2,
const spi::Bucket& target, spi::Context&) override;
spi::Result removeEntry(const spi::Bucket&, spi::Timestamp, spi::Context&) override;
+ std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(spi::IResourceUsageListener& listener) override;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 26cfe845eef..cf83741f5bb 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -2,6 +2,7 @@
#include "provider_error_wrapper.h"
#include "persistenceutil.h"
+#include <vespa/vespalib/util/idestructorcallback.h>
namespace storage {
@@ -155,6 +156,12 @@ ProviderErrorWrapper::join(const spi::Bucket& source1, const spi::Bucket& source
return checkResult(_impl.join(source1, source2, target, context));
}
+std::unique_ptr<vespalib::IDestructorCallback>
+ProviderErrorWrapper::register_resource_usage_listener(spi::IResourceUsageListener& listener)
+{
+ return _impl.register_resource_usage_listener(listener);
+}
+
spi::Result
ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts, spi::Context& context)
{
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 5e5682a8bb4..fa24dbeea45 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -58,6 +58,7 @@ public:
spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override;
+ std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(spi::IResourceUsageListener& listener) override;
spi::Result removeEntry(const spi::Bucket&, spi::Timestamp, spi::Context&) override;
spi::PersistenceProvider& getProviderImplementation() {