summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-02-23 11:19:30 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-02-23 11:19:30 +0000
commit31006e5945dd26d28d0157829862f61c405435ca (patch)
tree39dfe6a95b4489c1d61f0fad2023d2cb2ebf2de7 /searchcore
parent9190653f24b07563ca70d1fb0263b8f3e9c8440f (diff)
Add metirc for buckets pending move.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h1
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp17
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp16
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_move_operation_limiter.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h5
14 files changed, 82 insertions, 22 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
index e2cd5e268d7..b74a2410fa0 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
@@ -27,6 +27,7 @@ struct MyMoveOperationLimiter : public IMoveOperationLimiter {
++beginOpCount;
return {};
}
+ size_t numPending() const override { return beginOpCount; }
};
struct MyMoveHandler : public IDocumentMoveHandler {
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
index 9db18091268..78232663eae 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
@@ -6,6 +6,8 @@
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/log/log.h>
+#include <vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h>
+
LOG_SETUP("document_bucket_mover_test");
using namespace proton;
@@ -35,6 +37,7 @@ struct ControllerFixtureBase : public ::testing::Test
ExecutorThreadService _master;
DummyBucketExecutor _bucketExecutor;
MyMoveHandler _moveHandler;
+ DocumentDBTaggedMetrics _metrics;
BucketMoveJobV2 _bmj;
MyCountJobRunner _runner;
ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts);
@@ -73,6 +76,10 @@ struct ControllerFixtureBase : public ::testing::Test
const BucketId::List &calcAsked() const {
return _calc->asked();
}
+ size_t numPending() {
+ _bmj.updateMetrics(_metrics);
+ return _metrics.bucketMove.bucketsPending.getValue();
+ }
void runLoop() {
while (!_bmj.isBlocked() && !_bmj.run()) {
}
@@ -98,6 +105,7 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig
_master(_singleExecutor),
_bucketExecutor(4),
_moveHandler(*_bucketDB, storeMoveDoneContexts),
+ _metrics("test", 1),
_bmj(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb,
_notReady._subDb, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler,
_diskMemUsageNotifier, blockableConfig,
@@ -156,11 +164,15 @@ TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_buc
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
addReady(_notReady.bucket(4));
+
+ EXPECT_EQ(0, numPending());
_bmj.recompute();
+ EXPECT_EQ(1, numPending());
EXPECT_FALSE(_bmj.done());
EXPECT_TRUE(_bmj.scanAndMove(4, 3));
EXPECT_TRUE(_bmj.done());
sync();
+ EXPECT_EQ(0, numPending());
EXPECT_EQ(3u, docsMoved().size());
assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]);
assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]);
@@ -194,27 +206,32 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
addReady(_notReady.bucket(4));
_bmj.recompute();
+ EXPECT_EQ(3, numPending());
EXPECT_FALSE(_bmj.done());
EXPECT_FALSE(_bmj.scanAndMove(1, 2));
EXPECT_FALSE(_bmj.done());
sync();
+ EXPECT_EQ(2, numPending());
EXPECT_EQ(2u, docsMoved().size());
EXPECT_FALSE(_bmj.scanAndMove(1, 2));
EXPECT_FALSE(_bmj.done());
sync();
+ EXPECT_EQ(2, numPending());
EXPECT_EQ(4u, docsMoved().size());
EXPECT_FALSE(_bmj.scanAndMove(1, 2));
EXPECT_FALSE(_bmj.done());
sync();
+ EXPECT_EQ(1, numPending());
EXPECT_EQ(6u, docsMoved().size());
// move bucket 4, docs 3
EXPECT_TRUE(_bmj.scanAndMove(1,2));
EXPECT_TRUE(_bmj.done());
sync();
+ EXPECT_EQ(0, numPending());
EXPECT_EQ(7u, docsMoved().size());
EXPECT_EQ(3u, bucketsModified().size());
EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.cpp
index 7b7fcc9e45d..e032cc1cef6 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.cpp
@@ -250,6 +250,13 @@ DocumentDBTaggedMetrics::DocumentsMetrics::DocumentsMetrics(metrics::MetricSet *
DocumentDBTaggedMetrics::DocumentsMetrics::~DocumentsMetrics() = default;
+DocumentDBTaggedMetrics::BucketMoveMetrics::BucketMoveMetrics(metrics::MetricSet *parent)
+ : metrics::MetricSet("bucket_move", {}, "Metrics for bucket move job in this document db", parent),
+ bucketsPending("buckets_pending", {}, "The number of buckets left to move", this)
+{ }
+
+DocumentDBTaggedMetrics::BucketMoveMetrics::~BucketMoveMetrics() = default;
+
DocumentDBTaggedMetrics::DocumentDBTaggedMetrics(const vespalib::string &docTypeName, size_t maxNumThreads_)
: MetricSet("documentdb", {{"documenttype", docTypeName}}, "Document DB metrics", nullptr),
job(this),
@@ -262,6 +269,7 @@ DocumentDBTaggedMetrics::DocumentDBTaggedMetrics(const vespalib::string &docType
matching(this),
sessionCache(this),
documents(this),
+ bucketMove(this),
totalMemoryUsage(this),
totalDiskUsage("disk_usage", {}, "The total disk usage (in bytes) for this document db", this),
maxNumThreads(maxNumThreads_)
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h
index 133df81a9e6..63d712b75ca 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h
@@ -185,6 +185,13 @@ struct DocumentDBTaggedMetrics : metrics::MetricSet
~DocumentsMetrics() override;
};
+ struct BucketMoveMetrics : metrics::MetricSet {
+ metrics::LongCountMetric bucketsPending;
+
+ BucketMoveMetrics(metrics::MetricSet *parent);
+ ~BucketMoveMetrics() override;
+ };
+
JobMetrics job;
AttributeMetrics attribute;
IndexMetrics index;
@@ -195,6 +202,7 @@ struct DocumentDBTaggedMetrics : metrics::MetricSet
MatchingMetrics matching;
SessionCacheMetrics sessionCache;
DocumentsMetrics documents;
+ BucketMoveMetrics bucketMove;
MemoryUsageMetrics totalMemoryUsage;
metrics::LongValueMetric totalDiskUsage;
size_t maxNumThreads;
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index 62de8a83173..becaa0c9e31 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -9,6 +9,7 @@
#include "ibucketmodifiedhandler.h"
#include "move_operation_limiter.h"
#include "document_db_maintenance_config.h"
+#include "document_db_explorer.h"
#include <vespa/searchcore/proton/bucketdb/i_bucket_create_notifier.h>
#include <vespa/searchcore/proton/feedoperation/moveoperation.h>
#include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h>
@@ -87,6 +88,7 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &
_stopped(false),
_startedCount(0),
_executedCount(0),
+ _bucketsPending(0),
_bucketCreateNotifier(bucketCreateNotifier),
_clusterStateChangedNotifier(clusterStateChangedNotifier),
_bucketStateChangedNotifier(bucketStateChangedNotifier),
@@ -289,6 +291,7 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) {
} else {
_movers.erase(_movers.begin() + index);
}
+ _bucketsPending.store(_movers.size() + _buckets2Move.size(), std::memory_order_relaxed);
}
}
return done();
@@ -338,6 +341,7 @@ BucketMoveJobV2::backFillMovers() {
while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) {
_movers.push_back(greedyCreateMover());
}
+ _bucketsPending.store(_movers.size() + _buckets2Move.size(), std::memory_order_relaxed);
}
void
BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc)
@@ -380,4 +384,11 @@ BucketMoveJobV2::onStop() {
}
}
+void
+BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) {
+ // This is an over estimate to ensure we do not count down to zero until everything has been and completed and acked.
+ metrics.bucketMove.bucketsPending.set(_bucketsPending.load(std::memory_order_relaxed) +
+ getLimiter().numPending());
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
index e20e6eeaf42..7e0c45ba8fa 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -68,6 +68,7 @@ private:
std::atomic<bool> _stopped;
std::atomic<size_t> _startedCount;
std::atomic<size_t> _executedCount;
+ std::atomic<size_t> _bucketsPending;
bucketdb::IBucketCreateNotifier &_bucketCreateNotifier;
IClusterStateChangedNotifier &_clusterStateChangedNotifier;
@@ -114,6 +115,7 @@ public:
void notifyDiskMemUsage(DiskMemUsageState state) override;
void notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override;
void onStop() override;
+ void updateMetrics(DocumentDBTaggedMetrics & metrics) override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 55ce842e0cf..7f51227103f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -191,7 +191,8 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_writeFilter.setConfig(loaded_config->getMaintenanceConfigSP()->getAttributeUsageFilterConfig());
}
-void DocumentDB::registerReference()
+void
+DocumentDB::registerReference()
{
if (_state.getAllowReconfig()) {
auto registry = _owner.getDocumentDBReferenceRegistry();
@@ -204,7 +205,8 @@ void DocumentDB::registerReference()
}
}
-void DocumentDB::setActiveConfig(const DocumentDBConfig::SP &config, int64_t generation) {
+void
+DocumentDB::setActiveConfig(const DocumentDBConfig::SP &config, int64_t generation) {
lock_guard guard(_configMutex);
registerReference();
_activeConfigSnapshot = config;
@@ -215,7 +217,8 @@ void DocumentDB::setActiveConfig(const DocumentDBConfig::SP &config, int64_t gen
_configCV.notify_all();
}
-DocumentDBConfig::SP DocumentDB::getActiveConfig() const {
+DocumentDBConfig::SP
+DocumentDB::getActiveConfig() const {
lock_guard guard(_configMutex);
return _activeConfigSnapshot;
}
@@ -868,7 +871,8 @@ DocumentDB::replayConfig(search::SerialNum serialNum)
_docTypeName.toString().c_str(), serialNum);
}
-int64_t DocumentDB::getActiveGeneration() const {
+int64_t
+DocumentDB::getActiveGeneration() const {
lock_guard guard(_configMutex);
return _activeConfigSnapshotGeneration;
}
@@ -1004,7 +1008,8 @@ DocumentDB::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculat
namespace {
-void notifyBucketsChanged(const documentmetastore::IBucketHandler &metaStore,
+void
+notifyBucketsChanged(const documentmetastore::IBucketHandler &metaStore,
IBucketModifiedHandler &handler,
const vespalib::string &name)
{
@@ -1037,6 +1042,7 @@ DocumentDB::updateMetrics(const metrics::MetricLockGuard & guard)
return;
}
_metricsUpdater.updateMetrics(guard, _metrics);
+ _maintenanceController.updateMetrics(_metrics);
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h b/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h
index 475da7f4e4c..4a70e756a86 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h
@@ -30,12 +30,12 @@ public:
private:
const DocumentSubDBCollection &_subDBs;
- ExecutorThreadingService &_writeService;
- DocumentDBJobTrackers &_jobTrackers;
- matching::SessionManager &_sessionManager;
- const AttributeUsageFilter &_writeFilter;
+ ExecutorThreadingService &_writeService;
+ DocumentDBJobTrackers &_jobTrackers;
+ matching::SessionManager &_sessionManager;
+ const AttributeUsageFilter &_writeFilter;
// Last updated document store cache statistics. Necessary due to metrics implementation is upside down.
- DocumentStoreCacheStats _lastDocStoreCacheStats;
+ DocumentStoreCacheStats _lastDocStoreCacheStats;
void updateMiscMetrics(DocumentDBTaggedMetrics &metrics, const ExecutorThreadingServiceStats &threadingServiceStats);
void updateAttributeResourceUsageMetrics(DocumentDBTaggedMetrics::AttributeMetrics &metrics);
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h
index 869dd2dfdb5..c7fcfc5c508 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h
@@ -9,6 +9,7 @@ namespace proton {
class IBlockableMaintenanceJob;
class IMaintenanceJobRunner;
+class DocumentDBTaggedMetrics;
/**
* Interface for a maintenance job that is executed after "delay" seconds and
@@ -40,6 +41,7 @@ public:
virtual bool isBlocked() const { return false; }
virtual IBlockableMaintenanceJob *asBlockable() { return nullptr; }
virtual void onStop() {}
+ virtual void updateMetrics(DocumentDBTaggedMetrics &) {}
/**
* Register maintenance job runner, in case event passed to the
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_move_operation_limiter.h b/searchcore/src/vespa/searchcore/proton/server/i_move_operation_limiter.h
index cc91413826c..0eae0c5924f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_move_operation_limiter.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_move_operation_limiter.h
@@ -13,6 +13,7 @@ namespace proton {
struct IMoveOperationLimiter {
virtual ~IMoveOperationLimiter() = default;
virtual std::shared_ptr<vespalib::IDestructorCallback> beginOperation() = 0;
+ virtual size_t numPending() const = 0;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
index 5cedcaf1c3d..c71e9d832f3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
@@ -107,6 +107,15 @@ MaintenanceController::killJobs()
}
void
+MaintenanceController::updateMetrics(DocumentDBTaggedMetrics & metrics)
+{
+ Guard guard(_jobsLock);
+ for (auto &job : _jobs) {
+ job->getJob().updateMetrics(metrics); // Make sure no more tasks are added to the executor
+ }
+}
+
+void
MaintenanceController::performHoldJobs(JobList jobs)
{
// Called by master write thread
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
index 3c8510e6c66..fdb6f4fa880 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
@@ -52,6 +52,7 @@ public:
void stop();
void start(const DocumentDBMaintenanceConfigSP &config);
void newConfig(const DocumentDBMaintenanceConfigSP &config);
+ void updateMetrics(DocumentDBTaggedMetrics & metrics);
void
syncSubDBs(const MaintenanceDocumentSubDB &readySubDB,
diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp
index e3d565afb17..d8c3cbafc88 100644
--- a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp
@@ -50,18 +50,11 @@ MoveOperationLimiter::clearJob()
_job = nullptr;
}
-bool
-MoveOperationLimiter::isAboveLimit() const
-{
- LockGuard guard(_mutex);
- return (_outstandingOps >= _maxOutstandingOps);
-}
-
-bool
-MoveOperationLimiter::hasPending() const
+size_t
+MoveOperationLimiter::numPending() const
{
LockGuard guard(_mutex);
- return (_outstandingOps > 0);
+ return _outstandingOps;
}
std::shared_ptr<vespalib::IDestructorCallback>
diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h
index b5c8b1b9998..096e45d3cc6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h
+++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h
@@ -37,9 +37,10 @@ public:
MoveOperationLimiter(IBlockableMaintenanceJob *job, uint32_t maxOutstandingOps);
~MoveOperationLimiter() override;
void clearJob();
- bool isAboveLimit() const;
- bool hasPending() const;
+ bool isAboveLimit() const { return numPending() >= _maxOutstandingOps; }
+ bool hasPending() const { return numPending() > 0;}
std::shared_ptr<vespalib::IDestructorCallback> beginOperation() override;
+ size_t numPending() const override;
};
}