aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-11-26 15:23:39 +0100
committerTor Egge <Tor.Egge@online.no>2021-11-26 15:26:52 +0100
commitd5d8301d1c3a73d9471829fc335cb4f270812f99 (patch)
treedf3bce488b3d9d25b7456b6d4b8b9b02e057cd04
parent1142475e6db77b0e073b0d2b9774bb672cd8feaf (diff)
Update document db feeding metrics.
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.cpp31
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp69
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h39
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp30
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h8
9 files changed, 189 insertions, 13 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index bffedfd8dab..f76b7d03d08 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -747,6 +747,20 @@ TEST_F("require that put with different document type repo is ok", FeedHandlerFi
EXPECT_EQUAL(1, f.tls_writer.store_count);
}
+TEST_F("require that feed stats are updated", FeedHandlerFixture)
+{
+ DocumentContext doc_context("id:ns:searchdocument::foo", *f.schema.builder);
+ auto op =std::make_unique<PutOperation>(doc_context.bucketId, Timestamp(10), std::move(doc_context.doc));
+ FeedTokenContext token_context;
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
+ f.syncMaster(); // wait for initateCommit
+ f.syncMaster(); // wait for onCommitDone
+ auto stats = f.handler.get_stats(false);
+ EXPECT_EQUAL(1u, stats.get_commits());
+ EXPECT_EQUAL(1u, stats.get_operations());
+ EXPECT_LESS(0.0, stats.get_total_latency());
+}
+
using namespace document;
TEST_F("require that update with a fieldpath update will be rejected", SchemaContext) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index efa22be6533..1daacc29fcb 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -47,6 +47,7 @@ vespa_add_library(searchcore_server STATIC
feedhandler.cpp
feedstate.cpp
feedstates.cpp
+ feed_handler_stats.cpp
fileconfigmanager.cpp
flushhandlerproxy.cpp
forcecommitcontext.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index d491f4ab364..5d7f841a909 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -208,7 +208,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_maintenanceController(_writeService.master(), shared_service.shared(), _refCount, _docTypeName),
_jobTrackers(),
_calc(),
- _metricsUpdater(_subDBs, _writeService, _jobTrackers, *_sessionManager, _writeFilter)
+ _metricsUpdater(_subDBs, _writeService, _jobTrackers, *_sessionManager, _writeFilter, *_feedHandler)
{
assert(configSnapshot);
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.cpp
index d0bd7d4ee69..4e156539441 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.cpp
@@ -5,6 +5,7 @@
#include "documentdb_metrics_updater.h"
#include "documentsubdbcollection.h"
#include "executorthreadingservice.h"
+#include "feedhandler.h"
#include "idocumentsubdb.h"
#include <vespa/searchcommon/attribute/status.h>
#include <vespa/searchcore/proton/attribute/attribute_usage_filter.h>
@@ -34,12 +35,16 @@ DocumentDBMetricsUpdater::DocumentDBMetricsUpdater(const DocumentSubDBCollection
ExecutorThreadingService &writeService,
DocumentDBJobTrackers &jobTrackers,
matching::SessionManager &sessionManager,
- const AttributeUsageFilter &writeFilter)
+ const AttributeUsageFilter &writeFilter,
+ FeedHandler& feed_handler)
: _subDBs(subDBs),
_writeService(writeService),
_jobTrackers(jobTrackers),
_sessionManager(sessionManager),
- _writeFilter(writeFilter)
+ _writeFilter(writeFilter),
+ _feed_handler(feed_handler),
+ _lastDocStoreCacheStats(),
+ _last_feed_handler_stats()
{
}
@@ -280,6 +285,27 @@ updateLidSpaceMetrics(MetricSetType &metrics, const search::IDocumentMetaStore &
metrics.lidFragmentationFactor.set(stats.getLidFragmentationFactor());
}
+void
+update_feeding_metrics(DocumentDBFeedingMetrics& metrics, FeedHandlerStats stats, std::optional<FeedHandlerStats>& last_stats)
+{
+ auto delta_stats = stats;
+ if (last_stats.has_value()) {
+ delta_stats -= last_stats.value();
+ }
+ last_stats = stats;
+ uint32_t commits = delta_stats.get_commits();
+ if (commits != 0) {
+ double min_operations = delta_stats.get_min_operations().value_or(0);
+ double max_operations = delta_stats.get_max_operations().value_or(0);
+ double avg_operations = ((double) delta_stats.get_operations()) / commits;
+ metrics.commit.operations.addValueBatch(avg_operations, commits, min_operations, max_operations);
+ double min_latency = delta_stats.get_min_latency().value_or(0.0);
+ double max_latency = delta_stats.get_max_latency().value_or(0.0);
+ double avg_latency = delta_stats.get_total_latency() / commits;
+ metrics.commit.latency.addValueBatch(avg_latency, commits, min_latency, max_latency);
+ }
+}
+
}
void
@@ -297,6 +323,7 @@ DocumentDBMetricsUpdater::updateMetrics(const metrics::MetricLockGuard & guard,
metrics.totalMemoryUsage.update(totalStats.memoryUsage);
metrics.totalDiskUsage.set(totalStats.diskUsage);
+ update_feeding_metrics(metrics.feeding, _feed_handler.get_stats(true), _last_feed_handler_stats);
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h b/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h
index b73fa3b4eb9..381d98b2199 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb_metrics_updater.h
@@ -1,8 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "feed_handler_stats.h"
#include <vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h>
#include <vespa/searchlib/docstore/cachestats.h>
+#include <optional>
namespace proton {
@@ -14,6 +16,7 @@ class DocumentDBJobTrackers;
class DocumentSubDBCollection;
class ExecutorThreadingService;
class ExecutorThreadingServiceStats;
+class FeedHandler;
/**
* Class used to update metrics for a document db.
@@ -34,8 +37,10 @@ private:
DocumentDBJobTrackers &_jobTrackers;
matching::SessionManager &_sessionManager;
const AttributeUsageFilter &_writeFilter;
+ FeedHandler &_feed_handler;
// Last updated document store cache statistics. Necessary due to metrics implementation is upside down.
DocumentStoreCacheStats _lastDocStoreCacheStats;
+ std::optional<FeedHandlerStats> _last_feed_handler_stats;
void updateMiscMetrics(DocumentDBTaggedMetrics &metrics, const ExecutorThreadingServiceStats &threadingServiceStats);
void updateAttributeResourceUsageMetrics(DocumentDBTaggedMetrics::AttributeMetrics &metrics);
@@ -45,7 +50,8 @@ public:
ExecutorThreadingService &writeService,
DocumentDBJobTrackers &jobTrackers,
matching::SessionManager &sessionManager,
- const AttributeUsageFilter &writeFilter);
+ const AttributeUsageFilter &writeFilter,
+ FeedHandler& feed_handler);
~DocumentDBMetricsUpdater();
void updateMetrics(const metrics::MetricLockGuard & guard, DocumentDBTaggedMetrics &metrics);
diff --git a/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp
new file mode 100644
index 00000000000..0db388d2644
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp
@@ -0,0 +1,69 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "feed_handler_stats.h"
+
+namespace proton {
+
+namespace {
+
+template <typename T>
+void update_min_max(T value, std::optional<T>& min, std::optional<T>& max)
+{
+ if (!min.has_value() || value < min.value()) {
+ min = value;
+ }
+ if (!max.has_value() || value > max.value()) {
+ max = value;
+ }
+}
+
+}
+
+FeedHandlerStats::FeedHandlerStats(uint64_t commits, uint64_t operations, double total_latency) noexcept
+ : _commits(commits),
+ _operations(operations),
+ _total_latency(total_latency),
+ _min_operations(),
+ _max_operations(),
+ _min_latency(),
+ _max_latency()
+{
+}
+
+FeedHandlerStats::FeedHandlerStats() noexcept
+ : FeedHandlerStats(0, 0, 0.0)
+{
+}
+
+FeedHandlerStats::~FeedHandlerStats() = default;
+
+
+FeedHandlerStats&
+FeedHandlerStats::operator-=(const FeedHandlerStats& rhs) noexcept
+{
+ _commits -= rhs._commits;
+ _operations -= rhs._operations;
+ _total_latency -= rhs._total_latency;
+ return *this;
+}
+
+void
+FeedHandlerStats::add_commit(uint32_t operations, double latency) noexcept
+{
+ ++_commits;
+ _operations += operations;
+ _total_latency += latency;
+ update_min_max(operations, _min_operations, _max_operations);
+ update_min_max(latency, _min_latency, _max_latency);
+}
+
+void
+FeedHandlerStats::reset_min_max() noexcept
+{
+ _min_operations.reset();
+ _max_operations.reset();
+ _min_latency.reset();
+ _max_latency.reset();
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h
new file mode 100644
index 00000000000..9c8d1b9190b
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h
@@ -0,0 +1,39 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+#include <optional>
+
+namespace proton {
+
+/*
+ * Stats for feed handler.
+ */
+class FeedHandlerStats
+{
+ uint64_t _commits;
+ uint64_t _operations;
+ double _total_latency;
+ std::optional<uint32_t> _min_operations;
+ std::optional<uint32_t> _max_operations;
+ std::optional<double> _min_latency;
+ std::optional<double> _max_latency;
+
+public:
+ FeedHandlerStats(uint64_t commits, uint64_t operations, double total_latency) noexcept;
+ FeedHandlerStats() noexcept;
+ ~FeedHandlerStats();
+ FeedHandlerStats& operator-=(const FeedHandlerStats& rhs) noexcept;
+ void add_commit(uint32_t operations, double latency) noexcept;
+ void reset_min_max() noexcept;
+ uint64_t get_commits() noexcept { return _commits; }
+ uint64_t get_operations() noexcept { return _operations; }
+ double get_total_latency() noexcept { return _total_latency; }
+ const std::optional<uint32_t>& get_min_operations() noexcept { return _min_operations; }
+ const std::optional<uint32_t>& get_max_operations() noexcept { return _max_operations; }
+ const std::optional<double>& get_min_latency() noexcept { return _min_latency; }
+ const std::optional<double>& get_max_latency() noexcept { return _max_latency; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index bb03f48882f..51ea6425622 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -1,7 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "ddbstate.h"
#include "feedhandler.h"
+#include "ddbstate.h"
#include "feedstates.h"
#include "i_feed_handler_owner.h"
#include "ifeedview.h"
@@ -429,7 +429,9 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_syncLock(),
_syncedSerialNum(0),
_allowSync(false),
- _heart_beat_time(vespalib::steady_time())
+ _heart_beat_time(vespalib::steady_time()),
+ _stats_lock(),
+ _stats()
{ }
@@ -515,7 +517,7 @@ FeedHandler::getTransactionLogReplayDone() const {
}
void
-FeedHandler::onCommitDone(size_t numPendingAtStart) {
+FeedHandler::onCommitDone(size_t numPendingAtStart, vespalib::steady_time start_time) {
assert(numPendingAtStart <= _numOperationsPendingCommit);
_numOperationsPendingCommit -= numPendingAtStart;
_numOperationsCompleted += numPendingAtStart;
@@ -525,18 +527,22 @@ FeedHandler::onCommitDone(size_t numPendingAtStart) {
}
LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu",
_numCommitsCompleted, numPendingAtStart, _numOperationsCompleted, _numOperationsPendingCommit);
+ vespalib::steady_time now = vespalib::steady_clock::now();
+ auto latency = vespalib::to_s(now - start_time);
+ std::lock_guard guard(_stats_lock);
+ _stats.add_commit(numPendingAtStart, latency);
}
void FeedHandler::enqueCommitTask() {
- _writeService.master().execute(makeLambdaTask([this]() { initiateCommit(); }));
+ _writeService.master().execute(makeLambdaTask([this, start_time(vespalib::steady_clock::now())]() { initiateCommit(start_time); }));
}
void
-FeedHandler::initiateCommit() {
+FeedHandler::initiateCommit(vespalib::steady_time start_time) {
auto onCommitDoneContext = std::make_shared<OnCommitDone>(
_writeService.master(),
- makeLambdaTask([this, numPendingAtStart=_numOperationsPendingCommit]() {
- onCommitDone(numPendingAtStart);
+ makeLambdaTask([this, numPendingAtStart=_numOperationsPendingCommit, start_time]() {
+ onCommitDone(numPendingAtStart, start_time);
}));
auto commitResult = _tlsWriter->startCommit(onCommitDoneContext);
if (_activeFeedView) {
@@ -822,4 +828,14 @@ FeedHandler::get_heart_beat_time() const
return _heart_beat_time.load(std::memory_order_relaxed);
}
+FeedHandlerStats
+FeedHandler::get_stats(bool reset_min_max) const {
+ std::lock_guard guard(_stats_lock);
+ auto result = _stats;
+ if (reset_min_max) {
+ _stats.reset_min_max();
+ }
+ return result;
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index ef15b268086..39d1f0f47fb 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -2,6 +2,7 @@
#pragma once
+#include "feed_handler_stats.h"
#include "i_inc_serial_num.h"
#include "i_operation_storer.h"
#include "idocumentmovehandler.h"
@@ -97,6 +98,8 @@ private:
SerialNum _syncedSerialNum;
bool _allowSync; // Sanity check
std::atomic<vespalib::steady_time> _heart_beat_time;
+ mutable std::mutex _stats_lock;
+ mutable FeedHandlerStats _stats;
/**
* Delayed handling of feed operations, in master write thread.
@@ -134,8 +137,8 @@ private:
FeedStateSP getFeedState() const;
void changeFeedState(FeedStateSP newState);
void doChangeFeedState(FeedStateSP newState);
- void onCommitDone(size_t numPendingAtStart);
- void initiateCommit();
+ void onCommitDone(size_t numPendingAtStart, vespalib::steady_time start_time);
+ void initiateCommit(vespalib::steady_time start_time);
void enqueCommitTask();
public:
FeedHandler(const FeedHandler &) = delete;
@@ -245,6 +248,7 @@ public:
[[nodiscard]] CommitResult storeOperationSync(const FeedOperation & op);
void considerDelayedPrune();
vespalib::steady_time get_heart_beat_time() const;
+ FeedHandlerStats get_stats(bool reset_min_max) const;
};
} // namespace proton