aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-12-10 12:18:17 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-12-10 12:52:10 +0000
commit48e6d31dbe64356cfb31e8dea7f36330fa479341 (patch)
tree042e83ec0a923bc47288ce3affd62aba0492e924 /searchcore
parent96bfcd5a402a778922938ab14c4071369eba4cff (diff)
Let accounting handle multiple commits in flight.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp36
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h6
2 files changed, 27 insertions, 15 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 0ac91870eab..209fe39b877 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -416,10 +416,12 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_tlsReplayProgress(),
_serialNum(0),
_prunedSerialNum(0),
- _replay_end_serial_num(0u),
- _prepare_serial_num(0u),
- _numOperationsPendingCommit(0),
+ _replay_end_serial_num(0),
+ _prepare_serial_num(0),
+ _numOperationsStarted(0),
_numOperationsCompleted(0),
+ _numOperationsStartedAtLastCommitStart(0),
+ _numCommitsStarted(0),
_numCommitsCompleted(0),
_delayedPrune(false),
_feedLock(),
@@ -519,34 +521,39 @@ FeedHandler::getTransactionLogReplayDone() const {
}
void
-FeedHandler::onCommitDone(size_t numPendingAtStart, vespalib::steady_time start_time) {
- assert(numPendingAtStart <= _numOperationsPendingCommit);
- _numOperationsPendingCommit -= numPendingAtStart;
- _numOperationsCompleted += numPendingAtStart;
+FeedHandler::onCommitDone(size_t numOperationsInCommit, vespalib::steady_time start_time) {
+ assert(_numCommitsStarted > _numCommitsCompleted);
+ assert(_numOperationsStarted >= _numOperationsCompleted + numOperationsInCommit);
+ _numOperationsCompleted += numOperationsInCommit;
_numCommitsCompleted++;
- if (_numOperationsPendingCommit > 0) {
+ if ((numOperationsInFlight() > 0) && numCommitsInFlight() == 0) {
enqueCommitTask();
}
LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu",
- _numCommitsCompleted, numPendingAtStart, _numOperationsCompleted, _numOperationsPendingCommit);
+ _numCommitsCompleted, numOperationsInCommit, _numOperationsCompleted, numOperationsInFlight());
vespalib::steady_time now = vespalib::steady_clock::now();
auto latency = vespalib::to_s(now - start_time);
std::lock_guard guard(_stats_lock);
- _stats.add_commit(numPendingAtStart, latency);
+ _stats.add_commit(numOperationsInCommit, latency);
}
void FeedHandler::enqueCommitTask() {
- _writeService.master().execute(makeLambdaTask([this, start_time(vespalib::steady_clock::now())]() { initiateCommit(start_time); }));
+ _writeService.master().execute(makeLambdaTask([this, start_time(vespalib::steady_clock::now())]() {
+ initiateCommit(start_time);
+ }));
}
void
FeedHandler::initiateCommit(vespalib::steady_time start_time) {
+ size_t numOperationsSinceLastInitiatedCommit = _numOperationsStarted - _numOperationsStartedAtLastCommitStart;
auto onCommitDoneContext = std::make_shared<OnCommitDone>(
_writeService.master(),
- makeLambdaTask([this, numPendingAtStart=_numOperationsPendingCommit, start_time]() {
- onCommitDone(numPendingAtStart, start_time);
+ makeLambdaTask([this, numOperationsSinceLastInitiatedCommit, start_time]() {
+ onCommitDone(numOperationsSinceLastInitiatedCommit, start_time);
}));
auto commitResult = _tlsWriter->startCommit(onCommitDoneContext);
+ _numCommitsStarted++;
+ _numOperationsStartedAtLastCommitStart = _numOperationsStarted;
if (_activeFeedView) {
using KeepAlivePair = vespalib::KeepAlive<std::pair<CommitResult, DoneCallback>>;
auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext));
@@ -560,7 +567,8 @@ FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback on
const_cast<FeedOperation &>(op).setSerialNum(inc_serial_num());
}
_tlsWriter->appendOperation(op, std::move(onDone));
- if (++_numOperationsPendingCommit == 1) {
+ ++_numOperationsStarted;
+ if (numOperationsInFlight() == 1) {
enqueCommitTask();
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 39d1f0f47fb..7b319779620 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -83,8 +83,10 @@ private:
// the serial num of the last feed operation in the transaction log at startup before replay
SerialNum _replay_end_serial_num;
uint64_t _prepare_serial_num;
- size_t _numOperationsPendingCommit;
+ size_t _numOperationsStarted;
size_t _numOperationsCompleted;
+ size_t _numOperationsStartedAtLastCommitStart;
+ size_t _numCommitsStarted;
size_t _numCommitsCompleted;
bool _delayedPrune;
mutable std::shared_mutex _feedLock;
@@ -140,6 +142,8 @@ private:
void onCommitDone(size_t numPendingAtStart, vespalib::steady_time start_time);
void initiateCommit(vespalib::steady_time start_time);
void enqueCommitTask();
+ size_t numOperationsInFlight() const { return _numOperationsStarted - _numOperationsCompleted; }
+ size_t numCommitsInFlight() const { return _numCommitsStarted - _numCommitsCompleted; }
public:
FeedHandler(const FeedHandler &) = delete;
FeedHandler & operator = (const FeedHandler &) = delete;