diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-23 11:19:30 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-23 11:19:30 +0000 |
commit | 31006e5945dd26d28d0157829862f61c405435ca (patch) | |
tree | 39dfe6a95b4489c1d61f0fad2023d2cb2ebf2de7 /searchcore | |
parent | 9190653f24b07563ca70d1fb0263b8f3e9c8440f (diff) |
Add metric for buckets pending move.
Diffstat (limited to 'searchcore')
14 files changed, 82 insertions, 22 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h index e2cd5e268d7..b74a2410fa0 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h @@ -27,6 +27,7 @@ struct MyMoveOperationLimiter : public IMoveOperationLimiter { ++beginOpCount; return {}; } + size_t numPending() const override { return beginOpCount; } }; struct MyMoveHandler : public IDocumentMoveHandler { diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp index 9db18091268..78232663eae 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp @@ -6,6 +6,8 @@ #include <vespa/vespalib/gtest/gtest.h> #include <vespa/log/log.h> +#include <vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h> + LOG_SETUP("document_bucket_mover_test"); using namespace proton; @@ -35,6 +37,7 @@ struct ControllerFixtureBase : public ::testing::Test ExecutorThreadService _master; DummyBucketExecutor _bucketExecutor; MyMoveHandler _moveHandler; + DocumentDBTaggedMetrics _metrics; BucketMoveJobV2 _bmj; MyCountJobRunner _runner; ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts); @@ -73,6 +76,10 @@ struct ControllerFixtureBase : public ::testing::Test const BucketId::List &calcAsked() const { return _calc->asked(); } + size_t numPending() { + _bmj.updateMetrics(_metrics); + return _metrics.bucketMove.bucketsPending.getValue(); + } void runLoop() { while (!_bmj.isBlocked() && !_bmj.run()) { } @@ -98,6 +105,7 @@ 
ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig _master(_singleExecutor), _bucketExecutor(4), _moveHandler(*_bucketDB, storeMoveDoneContexts), + _metrics("test", 1), _bmj(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb, _notReady._subDb, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler, _diskMemUsageNotifier, blockableConfig, @@ -156,11 +164,15 @@ TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_buc addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); addReady(_notReady.bucket(4)); + + EXPECT_EQ(0, numPending()); _bmj.recompute(); + EXPECT_EQ(1, numPending()); EXPECT_FALSE(_bmj.done()); EXPECT_TRUE(_bmj.scanAndMove(4, 3)); EXPECT_TRUE(_bmj.done()); sync(); + EXPECT_EQ(0, numPending()); EXPECT_EQ(3u, docsMoved().size()); assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]); assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]); @@ -194,27 +206,32 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) addReady(_notReady.bucket(4)); _bmj.recompute(); + EXPECT_EQ(3, numPending()); EXPECT_FALSE(_bmj.done()); EXPECT_FALSE(_bmj.scanAndMove(1, 2)); EXPECT_FALSE(_bmj.done()); sync(); + EXPECT_EQ(2, numPending()); EXPECT_EQ(2u, docsMoved().size()); EXPECT_FALSE(_bmj.scanAndMove(1, 2)); EXPECT_FALSE(_bmj.done()); sync(); + EXPECT_EQ(2, numPending()); EXPECT_EQ(4u, docsMoved().size()); EXPECT_FALSE(_bmj.scanAndMove(1, 2)); EXPECT_FALSE(_bmj.done()); sync(); + EXPECT_EQ(1, numPending()); EXPECT_EQ(6u, docsMoved().size()); // move bucket 4, docs 3 EXPECT_TRUE(_bmj.scanAndMove(1,2)); EXPECT_TRUE(_bmj.done()); sync(); + EXPECT_EQ(0, numPending()); EXPECT_EQ(7u, docsMoved().size()); EXPECT_EQ(3u, bucketsModified().size()); EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); diff --git a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.cpp 
b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.cpp index 7b7fcc9e45d..e032cc1cef6 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.cpp @@ -250,6 +250,13 @@ DocumentDBTaggedMetrics::DocumentsMetrics::DocumentsMetrics(metrics::MetricSet * DocumentDBTaggedMetrics::DocumentsMetrics::~DocumentsMetrics() = default; +DocumentDBTaggedMetrics::BucketMoveMetrics::BucketMoveMetrics(metrics::MetricSet *parent) + : metrics::MetricSet("bucket_move", {}, "Metrics for bucket move job in this document db", parent), + bucketsPending("buckets_pending", {}, "The number of buckets left to move", this) +{ } + +DocumentDBTaggedMetrics::BucketMoveMetrics::~BucketMoveMetrics() = default; + DocumentDBTaggedMetrics::DocumentDBTaggedMetrics(const vespalib::string &docTypeName, size_t maxNumThreads_) : MetricSet("documentdb", {{"documenttype", docTypeName}}, "Document DB metrics", nullptr), job(this), @@ -262,6 +269,7 @@ DocumentDBTaggedMetrics::DocumentDBTaggedMetrics(const vespalib::string &docType matching(this), sessionCache(this), documents(this), + bucketMove(this), totalMemoryUsage(this), totalDiskUsage("disk_usage", {}, "The total disk usage (in bytes) for this document db", this), maxNumThreads(maxNumThreads_) diff --git a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h index 133df81a9e6..63d712b75ca 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h @@ -185,6 +185,13 @@ struct DocumentDBTaggedMetrics : metrics::MetricSet ~DocumentsMetrics() override; }; + struct BucketMoveMetrics : metrics::MetricSet { + metrics::LongCountMetric bucketsPending; + + BucketMoveMetrics(metrics::MetricSet *parent); + ~BucketMoveMetrics() 
override; + }; + JobMetrics job; AttributeMetrics attribute; IndexMetrics index; @@ -195,6 +202,7 @@ struct DocumentDBTaggedMetrics : metrics::MetricSet MatchingMetrics matching; SessionCacheMetrics sessionCache; DocumentsMetrics documents; + BucketMoveMetrics bucketMove; MemoryUsageMetrics totalMemoryUsage; metrics::LongValueMetric totalDiskUsage; size_t maxNumThreads; diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp index 62de8a83173..becaa0c9e31 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -9,6 +9,7 @@ #include "ibucketmodifiedhandler.h" #include "move_operation_limiter.h" #include "document_db_maintenance_config.h" +#include "document_db_explorer.h" #include <vespa/searchcore/proton/bucketdb/i_bucket_create_notifier.h> #include <vespa/searchcore/proton/feedoperation/moveoperation.h> #include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h> @@ -87,6 +88,7 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> & _stopped(false), _startedCount(0), _executedCount(0), + _bucketsPending(0), _bucketCreateNotifier(bucketCreateNotifier), _clusterStateChangedNotifier(clusterStateChangedNotifier), _bucketStateChangedNotifier(bucketStateChangedNotifier), @@ -289,6 +291,7 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { } else { _movers.erase(_movers.begin() + index); } + _bucketsPending.store(_movers.size() + _buckets2Move.size(), std::memory_order_relaxed); } } return done(); @@ -338,6 +341,7 @@ BucketMoveJobV2::backFillMovers() { while ( ! 
_buckets2Move.empty() && (_movers.size() < _movers.capacity())) { _movers.push_back(greedyCreateMover()); } + _bucketsPending.store(_movers.size() + _buckets2Move.size(), std::memory_order_relaxed); } void BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) @@ -380,4 +384,11 @@ BucketMoveJobV2::onStop() { } } +void +BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) { + // This is an over estimate to ensure we do not count down to zero until everything has been and completed and acked. + metrics.bucketMove.bucketsPending.set(_bucketsPending.load(std::memory_order_relaxed) + + getLimiter().numPending()); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h index e20e6eeaf42..7e0c45ba8fa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h @@ -68,6 +68,7 @@ private: std::atomic<bool> _stopped; std::atomic<size_t> _startedCount; std::atomic<size_t> _executedCount; + std::atomic<size_t> _bucketsPending; bucketdb::IBucketCreateNotifier &_bucketCreateNotifier; IClusterStateChangedNotifier &_clusterStateChangedNotifier; @@ -114,6 +115,7 @@ public: void notifyDiskMemUsage(DiskMemUsageState state) override; void notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override; void onStop() override; + void updateMetrics(DocumentDBTaggedMetrics & metrics) override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 55ce842e0cf..7f51227103f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -191,7 +191,8 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, 
_writeFilter.setConfig(loaded_config->getMaintenanceConfigSP()->getAttributeUsageFilterConfig()); } -void DocumentDB::registerReference() +void +DocumentDB::registerReference() { if (_state.getAllowReconfig()) { auto registry = _owner.getDocumentDBReferenceRegistry(); @@ -204,7 +205,8 @@ void DocumentDB::registerReference() } } -void DocumentDB::setActiveConfig(const DocumentDBConfig::SP &config, int64_t generation) { +void +DocumentDB::setActiveConfig(const DocumentDBConfig::SP &config, int64_t generation) { lock_guard guard(_configMutex); registerReference(); _activeConfigSnapshot = config; @@ -215,7 +217,8 @@ void DocumentDB::setActiveConfig(const DocumentDBConfig::SP &config, int64_t gen _configCV.notify_all(); } -DocumentDBConfig::SP DocumentDB::getActiveConfig() const { +DocumentDBConfig::SP +DocumentDB::getActiveConfig() const { lock_guard guard(_configMutex); return _activeConfigSnapshot; } @@ -868,7 +871,8 @@ DocumentDB::replayConfig(search::SerialNum serialNum) _docTypeName.toString().c_str(), serialNum); } -int64_t DocumentDB::getActiveGeneration() const { +int64_t +DocumentDB::getActiveGeneration() const { lock_guard guard(_configMutex); return _activeConfigSnapshotGeneration; } @@ -1004,7 +1008,8 @@ DocumentDB::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculat namespace { -void notifyBucketsChanged(const documentmetastore::IBucketHandler &metaStore, +void +notifyBucketsChanged(const documentmetastore::IBucketHandler &metaStore, IBucketModifiedHandler &handler, const vespalib::string &name) { @@ -1037,6 +1042,7 @@ DocumentDB::updateMetrics(const metrics::MetricLockGuard & guard) return; } _metricsUpdater.updateMetrics(guard, _metrics); + _maintenanceController.updateMetrics(_metrics); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h b/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h index 475da7f4e4c..4a70e756a86 100644 --- 
a/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h @@ -30,12 +30,12 @@ public: private: const DocumentSubDBCollection &_subDBs; - ExecutorThreadingService &_writeService; - DocumentDBJobTrackers &_jobTrackers; - matching::SessionManager &_sessionManager; - const AttributeUsageFilter &_writeFilter; + ExecutorThreadingService &_writeService; + DocumentDBJobTrackers &_jobTrackers; + matching::SessionManager &_sessionManager; + const AttributeUsageFilter &_writeFilter; // Last updated document store cache statistics. Necessary due to metrics implementation is upside down. - DocumentStoreCacheStats _lastDocStoreCacheStats; + DocumentStoreCacheStats _lastDocStoreCacheStats; void updateMiscMetrics(DocumentDBTaggedMetrics &metrics, const ExecutorThreadingServiceStats &threadingServiceStats); void updateAttributeResourceUsageMetrics(DocumentDBTaggedMetrics::AttributeMetrics &metrics); diff --git a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h index 869dd2dfdb5..c7fcfc5c508 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h @@ -9,6 +9,7 @@ namespace proton { class IBlockableMaintenanceJob; class IMaintenanceJobRunner; +class DocumentDBTaggedMetrics; /** * Interface for a maintenance job that is executed after "delay" seconds and @@ -40,6 +41,7 @@ public: virtual bool isBlocked() const { return false; } virtual IBlockableMaintenanceJob *asBlockable() { return nullptr; } virtual void onStop() {} + virtual void updateMetrics(DocumentDBTaggedMetrics &) {} /** * Register maintenance job runner, in case event passed to the diff --git a/searchcore/src/vespa/searchcore/proton/server/i_move_operation_limiter.h b/searchcore/src/vespa/searchcore/proton/server/i_move_operation_limiter.h index 
cc91413826c..0eae0c5924f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_move_operation_limiter.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_move_operation_limiter.h @@ -13,6 +13,7 @@ namespace proton { struct IMoveOperationLimiter { virtual ~IMoveOperationLimiter() = default; virtual std::shared_ptr<vespalib::IDestructorCallback> beginOperation() = 0; + virtual size_t numPending() const = 0; }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 5cedcaf1c3d..c71e9d832f3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -107,6 +107,15 @@ MaintenanceController::killJobs() } void +MaintenanceController::updateMetrics(DocumentDBTaggedMetrics & metrics) +{ + Guard guard(_jobsLock); + for (auto &job : _jobs) { + job->getJob().updateMetrics(metrics); // Make sure no more tasks are added to the executor + } +} + +void MaintenanceController::performHoldJobs(JobList jobs) { // Called by master write thread diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index 3c8510e6c66..fdb6f4fa880 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -52,6 +52,7 @@ public: void stop(); void start(const DocumentDBMaintenanceConfigSP &config); void newConfig(const DocumentDBMaintenanceConfigSP &config); + void updateMetrics(DocumentDBTaggedMetrics & metrics); void syncSubDBs(const MaintenanceDocumentSubDB &readySubDB, diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp index e3d565afb17..d8c3cbafc88 100644 --- 
a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp @@ -50,18 +50,11 @@ MoveOperationLimiter::clearJob() _job = nullptr; } -bool -MoveOperationLimiter::isAboveLimit() const -{ - LockGuard guard(_mutex); - return (_outstandingOps >= _maxOutstandingOps); -} - -bool -MoveOperationLimiter::hasPending() const +size_t +MoveOperationLimiter::numPending() const { LockGuard guard(_mutex); - return (_outstandingOps > 0); + return _outstandingOps; } std::shared_ptr<vespalib::IDestructorCallback> diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h index b5c8b1b9998..096e45d3cc6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h +++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h @@ -37,9 +37,10 @@ public: MoveOperationLimiter(IBlockableMaintenanceJob *job, uint32_t maxOutstandingOps); ~MoveOperationLimiter() override; void clearJob(); - bool isAboveLimit() const; - bool hasPending() const; + bool isAboveLimit() const { return numPending() >= _maxOutstandingOps; } + bool hasPending() const { return numPending() > 0;} std::shared_ptr<vespalib::IDestructorCallback> beginOperation() override; + size_t numPending() const override; }; } |