diff options
author | Tor Brede Vekterli <vekterli@vespa.ai> | 2024-04-05 14:59:51 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@vespa.ai> | 2024-04-09 12:31:46 +0000 |
commit | a8dd709dceca4c53096be285f35686439a7902eb (patch) | |
tree | 5754badd60b8654426694ec83d9d6dac110eb364 /metrics | |
parent | 7a5047b9cb7c1ad40bc69dbacfbbbeafbe15b83a (diff) |
Support pipelining (batching) of mutating ops to same bucket
Bucket operations require either exclusive (single writer) or
shared (multiple readers) access. Prior to this commit, this
means that many enqueued feed operations to the same bucket
introduce pipeline stalls due to each operation having to wait
for all prior operations to the bucket to complete entirely
(including fsync of WAL append). This is a likely scenario when
feeding a document set that was previously acquired through
visiting, as such documents will inherently be output in
bucket-order.
With this commit, a configurable number of feed operations
(put, remove and update) bound for the exact same bucket may
be sent asynchronously to the persistence provider in the
context of the _same_ write lock. This mirrors how merge
operations work for puts and removes.
Batching is fairly conservative, and will _not_ batch across
further messages when any of the following holds:
* A non-feed operation is encountered
* More than one mutating operation is encountered for the
same document ID
* No more persistence throttler tokens can be acquired
* Max batch size has been reached
Updating the bucket DB, assigning bucket info and sending
replies is deferred until _all_ batched operations complete.
Max batch size is (re-)configurable live and defaults to a
batch size of 1, which shall have the exact same semantics as
the legacy behavior.
Additionally, clock sampling for persistence threads have been
abstracted away to allow for mocking in tests (no need for sleep!).
Diffstat (limited to 'metrics')
-rw-r--r-- | metrics/src/vespa/metrics/metrictimer.cpp | 9 | ||||
-rw-r--r-- | metrics/src/vespa/metrics/metrictimer.h | 24 |
2 files changed, 25 insertions, 8 deletions
diff --git a/metrics/src/vespa/metrics/metrictimer.cpp b/metrics/src/vespa/metrics/metrictimer.cpp index 84d4844104d..a3b0f215d58 100644 --- a/metrics/src/vespa/metrics/metrictimer.cpp +++ b/metrics/src/vespa/metrics/metrictimer.cpp @@ -3,13 +3,18 @@ namespace metrics { -MetricTimer::MetricTimer() +MetricTimer::MetricTimer() noexcept + : _startTime(std::chrono::steady_clock::now()) { // Amusingly enough, steady_clock was not actually steady by default on // GCC < 4.8.1, so add a bit of compile-time paranoia just to make sure. static_assert(std::chrono::steady_clock::is_steady, "Old/broken STL implementation; steady_clock not steady"); - _startTime = std::chrono::steady_clock::now(); +} + +MetricTimer::MetricTimer(std::chrono::steady_clock::time_point start_time) noexcept + : _startTime(start_time) +{ } } // metrics diff --git a/metrics/src/vespa/metrics/metrictimer.h b/metrics/src/vespa/metrics/metrictimer.h index 8a338432362..133cd819489 100644 --- a/metrics/src/vespa/metrics/metrictimer.h +++ b/metrics/src/vespa/metrics/metrictimer.h @@ -15,7 +15,19 @@ namespace metrics { class MetricTimer { public: - MetricTimer(); + // Start time point set by system steady clock + MetricTimer() noexcept; + // Start time point explicitly given + explicit MetricTimer(std::chrono::steady_clock::time_point start_time) noexcept; + + template<typename AvgVal, typename TotVal, bool SumOnAdd> + AvgVal stop(std::chrono::steady_clock::time_point now, ValueMetric<AvgVal, TotVal, SumOnAdd>& metric) const { + const auto delta = now - _startTime; + using ToDuration = std::chrono::duration<AvgVal, std::milli>; + const auto deltaMs(std::chrono::duration_cast<ToDuration>(delta).count()); + metric.addValue(deltaMs); + return deltaMs; + } /** * Adds ms passed since this timer was constructed to given value metric. @@ -26,11 +38,11 @@ public: */ template<typename AvgVal, typename TotVal, bool SumOnAdd> AvgVal stop(ValueMetric<AvgVal, TotVal, SumOnAdd>& metric) const { - const auto delta = std::chrono::steady_clock::now() - _startTime; - using ToDuration = std::chrono::duration<AvgVal, std::milli>; - const auto deltaMs(std::chrono::duration_cast<ToDuration>(delta).count()); - metric.addValue(deltaMs); - return deltaMs; + return stop(std::chrono::steady_clock::now(), metric); + } + + [[nodiscard]] std::chrono::steady_clock::time_point start_time() const noexcept { + return _startTime; } private: |