diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-10 18:12:15 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-10 18:20:03 +0000 |
commit | af2b95592c411646b68399cf8ba01af72c2d177b (patch) | |
tree | 591d354962814bb67e7beb6a35acf1ff3ac6918e /searchcore | |
parent | 9ed631e3af71e1e91743faa1745f93b45b3c5cec (diff) |
Move operation counting out to separate class
Diffstat (limited to 'searchcore')
4 files changed, 63 insertions, 30 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp index 0db388d2644..f5665c47529 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp @@ -1,6 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "feed_handler_stats.h" +#include <cassert> +#include <vespa/log/log.h> + +LOG_SETUP(".proton.server.feed_handler_stats"); namespace proton { @@ -66,4 +70,14 @@ FeedHandlerStats::reset_min_max() noexcept _max_latency.reset(); } +void +FeedOperationCounter::commitCompleted(size_t numOperations) { + assert(_commitsStarted > _commitsCompleted); + assert(_operationsStarted >= _operationsCompleted + numOperations); + _operationsCompleted += numOperations; + _commitsCompleted++; + LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu", + _commitsCompleted, numOperations, _operationsCompleted, operationsInFlight()); +} + } diff --git a/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h index 9c8d1b9190b..db93c157046 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h +++ b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h @@ -36,4 +36,41 @@ public: const std::optional<double>& get_max_latency() noexcept { return _max_latency; } }; +/** + * Keeps track of feed operations started, completed and being committed. + * Also tracks started and completed commit operations. + */ +class FeedOperationCounter { +public: + FeedOperationCounter() + : _operationsStarted(0), + _operationsCompleted(0), + _operationsStartedAtLastCommitStart(0), + _commitsStarted(0), + _commitsCompleted(0) + {} + void startOperation() { ++_operationsStarted; } + void startCommit() { + _commitsStarted++; + _operationsStartedAtLastCommitStart = _operationsStarted; + } + + void commitCompleted(size_t numOperations); + + size_t operationsSinceLastCommitStart() const { + return _operationsStarted - _operationsStartedAtLastCommitStart; + } + size_t operationsInFlight() const { return _operationsStarted - _operationsCompleted; } + size_t commitsInFlight() const { return _commitsStarted - _commitsCompleted; } + bool shouldScheduleCommit() const { + return (operationsInFlight() > 0) && (commitsInFlight() == 0); + } +private: + size_t _operationsStarted; + size_t _operationsCompleted; + size_t _operationsStartedAtLastCommitStart; + size_t _commitsStarted; + size_t _commitsCompleted; +}; + } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 0ec7f1c606a..8b99c39dd65 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -23,7 +23,6 @@ #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/lambdatask.h> #include <cassert> -#include <unistd.h> #include <vespa/log/log.h> LOG_SETUP(".proton.server.feedhandler"); @@ -99,7 +98,7 @@ TlsMgrWriter::sync(SerialNum syncTo) bool res = _tls_mgr.getSession()->sync(syncTo, syncedTo); if (!res) { LOG(debug, "Tls sync failed, retrying"); - sleep(1); + std::this_thread::sleep_for(100ms); continue; } if (syncedTo >= syncTo) { @@ -418,11 +417,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _prunedSerialNum(0), _replay_end_serial_num(0), _prepare_serial_num(0), - _numOperationsStarted(0), - _numOperationsCompleted(0), - _numOperationsStartedAtLastCommitStart(0), - _numCommitsStarted(0), - _numCommitsCompleted(0), + _numOperations(), _delayedPrune(false), _feedLock(), _feedState(make_shared<InitState>(getDocTypeName())), @@ -521,20 +516,15 @@ FeedHandler::getTransactionLogReplayDone() const { } void -FeedHandler::onCommitDone(size_t numOperationsInCommit, vespalib::steady_time start_time) { - assert(_numCommitsStarted > _numCommitsCompleted); - assert(_numOperationsStarted >= _numOperationsCompleted + numOperationsInCommit); - _numOperationsCompleted += numOperationsInCommit; - _numCommitsCompleted++; - if ((numOperationsInFlight() > 0) && numCommitsInFlight() == 0) { +FeedHandler::onCommitDone(size_t numOperations, vespalib::steady_time start_time) { + _numOperations.commitCompleted(numOperations); + if (_numOperations.shouldScheduleCommit()) { enqueCommitTask(); } - LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu", - _numCommitsCompleted, numOperationsInCommit, _numOperationsCompleted, numOperationsInFlight()); vespalib::steady_time now = vespalib::steady_clock::now(); auto latency = vespalib::to_s(now - start_time); std::lock_guard guard(_stats_lock); - _stats.add_commit(numOperationsInCommit, latency); + _stats.add_commit(numOperations, latency); } void FeedHandler::enqueCommitTask() { @@ -545,15 +535,13 @@ void FeedHandler::enqueCommitTask() { void FeedHandler::initiateCommit(vespalib::steady_time start_time) { - size_t numOperationsSinceLastInitiatedCommit = _numOperationsStarted - _numOperationsStartedAtLastCommitStart; auto onCommitDoneContext = std::make_shared<OnCommitDone>( _writeService.master(), - makeLambdaTask([this, numOperationsSinceLastInitiatedCommit, start_time]() { - onCommitDone(numOperationsSinceLastInitiatedCommit, start_time); + makeLambdaTask([this, operations=_numOperations.operationsSinceLastCommitStart(), start_time]() { + onCommitDone(operations, start_time); })); auto commitResult = _tlsWriter->startCommit(onCommitDoneContext); - _numCommitsStarted++; - _numOperationsStartedAtLastCommitStart = _numOperationsStarted; + _numOperations.startCommit(); if (_activeFeedView) { using KeepAlivePair = vespalib::KeepAlive<std::pair<CommitResult, DoneCallback>>; auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext)); @@ -567,8 +555,8 @@ FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback on const_cast<FeedOperation &>(op).setSerialNum(inc_serial_num()); } _tlsWriter->appendOperation(op, std::move(onDone)); - ++_numOperationsStarted; - if (numOperationsInFlight() == 1) { + _numOperations.startOperation(); + if (_numOperations.operationsInFlight() == 1) { enqueCommitTask(); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 7b319779620..417d9c21548 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -83,11 +83,7 @@ private: // the serial num of the last feed operation in the transaction log at startup before replay SerialNum _replay_end_serial_num; uint64_t _prepare_serial_num; - size_t _numOperationsStarted; - size_t _numOperationsCompleted; - size_t _numOperationsStartedAtLastCommitStart; - size_t _numCommitsStarted; - size_t _numCommitsCompleted; + FeedOperationCounter _numOperations; bool _delayedPrune; mutable std::shared_mutex _feedLock; FeedStateSP _feedState; @@ -142,8 +138,6 @@ private: void onCommitDone(size_t numPendingAtStart, vespalib::steady_time start_time); void initiateCommit(vespalib::steady_time start_time); void enqueCommitTask(); - size_t numOperationsInFlight() const { return _numOperationsStarted - _numOperationsCompleted; } - size_t numCommitsInFlight() const { return _numCommitsStarted - _numCommitsCompleted; } public: FeedHandler(const FeedHandler &) = delete; FeedHandler & operator = (const FeedHandler &) = delete; |