summaryrefslogtreecommitdiffstats
path: root/metrics
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2024-04-05 14:59:51 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2024-04-09 12:31:46 +0000
commita8dd709dceca4c53096be285f35686439a7902eb (patch)
tree5754badd60b8654426694ec83d9d6dac110eb364 /metrics
parent7a5047b9cb7c1ad40bc69dbacfbbbeafbe15b83a (diff)
Support pipelining (batching) of mutating ops to same bucket
Bucket operations require either exclusive (single writer) or shared (multiple readers) access. Prior to this commit, this means that many enqueued feed operations to the same bucket introduce pipeline stalls due to each operation having to wait for all prior operations to the bucket to complete entirely (including fsync of WAL append). This is a likely scenario when feeding a document set that was previously acquired through visiting, as such documents will inherently be output in bucket-order. With this commit, a configurable number of feed operations (put, remove and update) bound for the exact same bucket may be sent asynchronously to the persistence provider in the context of the _same_ write lock. This mirrors how merge operations work for puts and removes. Batching is fairly conservative, and will _not_ batch across further messages when any of the following holds: * A non-feed operation is encountered * More than one mutating operation is encountered for the same document ID * No more persistence throttler tokens can be acquired * Max batch size has been reached Updating the bucket DB, assigning bucket info and sending replies is deferred until _all_ batched operations complete. Max batch size is (re-)configurable live and defaults to a batch size of 1, which shall have the exact same semantics as the legacy behavior. Additionally, clock sampling for persistence threads have been abstracted away to allow for mocking in tests (no need for sleep!).
Diffstat (limited to 'metrics')
-rw-r--r--metrics/src/vespa/metrics/metrictimer.cpp9
-rw-r--r--metrics/src/vespa/metrics/metrictimer.h24
2 files changed, 25 insertions, 8 deletions
diff --git a/metrics/src/vespa/metrics/metrictimer.cpp b/metrics/src/vespa/metrics/metrictimer.cpp
index 84d4844104d..a3b0f215d58 100644
--- a/metrics/src/vespa/metrics/metrictimer.cpp
+++ b/metrics/src/vespa/metrics/metrictimer.cpp
@@ -3,13 +3,18 @@
namespace metrics {
-MetricTimer::MetricTimer()
+MetricTimer::MetricTimer() noexcept
+ : _startTime(std::chrono::steady_clock::now())
{
// Amusingly enough, steady_clock was not actually steady by default on
// GCC < 4.8.1, so add a bit of compile-time paranoia just to make sure.
static_assert(std::chrono::steady_clock::is_steady,
"Old/broken STL implementation; steady_clock not steady");
- _startTime = std::chrono::steady_clock::now();
+}
+
+MetricTimer::MetricTimer(std::chrono::steady_clock::time_point start_time) noexcept
+ : _startTime(start_time)
+{
}
} // metrics
diff --git a/metrics/src/vespa/metrics/metrictimer.h b/metrics/src/vespa/metrics/metrictimer.h
index 8a338432362..133cd819489 100644
--- a/metrics/src/vespa/metrics/metrictimer.h
+++ b/metrics/src/vespa/metrics/metrictimer.h
@@ -15,7 +15,19 @@ namespace metrics {
class MetricTimer {
public:
- MetricTimer();
+ // Start time point set by system steady clock
+ MetricTimer() noexcept;
+ // Start time point explicitly given
+ explicit MetricTimer(std::chrono::steady_clock::time_point start_time) noexcept;
+
+ template<typename AvgVal, typename TotVal, bool SumOnAdd>
+ AvgVal stop(std::chrono::steady_clock::time_point now, ValueMetric<AvgVal, TotVal, SumOnAdd>& metric) const {
+ const auto delta = now - _startTime;
+ using ToDuration = std::chrono::duration<AvgVal, std::milli>;
+ const auto deltaMs(std::chrono::duration_cast<ToDuration>(delta).count());
+ metric.addValue(deltaMs);
+ return deltaMs;
+ }
/**
* Adds ms passed since this timer was constructed to given value metric.
@@ -26,11 +38,11 @@ public:
*/
template<typename AvgVal, typename TotVal, bool SumOnAdd>
AvgVal stop(ValueMetric<AvgVal, TotVal, SumOnAdd>& metric) const {
- const auto delta = std::chrono::steady_clock::now() - _startTime;
- using ToDuration = std::chrono::duration<AvgVal, std::milli>;
- const auto deltaMs(std::chrono::duration_cast<ToDuration>(delta).count());
- metric.addValue(deltaMs);
- return deltaMs;
+ return stop(std::chrono::steady_clock::now(), metric);
+ }
+
+ [[nodiscard]] std::chrono::steady_clock::time_point start_time() const noexcept {
+ return _startTime;
}
private: