aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-12-10 18:12:15 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-12-10 18:20:03 +0000
commitaf2b95592c411646b68399cf8ba01af72c2d177b (patch)
tree591d354962814bb67e7beb6a35acf1ff3ac6918e /searchcore
parent9ed631e3af71e1e91743faa1745f93b45b3c5cec (diff)
Move operation counting out to separate class
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h37
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp34
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h8
4 files changed, 63 insertions, 30 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp
index 0db388d2644..f5665c47529 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.cpp
@@ -1,6 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "feed_handler_stats.h"
+#include <cassert>
+#include <vespa/log/log.h>
+
+LOG_SETUP(".proton.server.feed_handler_stats");
namespace proton {
@@ -66,4 +70,14 @@ FeedHandlerStats::reset_min_max() noexcept
_max_latency.reset();
}
+void
+FeedOperationCounter::commitCompleted(size_t numOperations) {
+ assert(_commitsStarted > _commitsCompleted);
+ assert(_operationsStarted >= _operationsCompleted + numOperations);
+ _operationsCompleted += numOperations;
+ _commitsCompleted++;
+ LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu",
+ _commitsCompleted, numOperations, _operationsCompleted, operationsInFlight());
+}
+
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h
index 9c8d1b9190b..db93c157046 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feed_handler_stats.h
@@ -36,4 +36,41 @@ public:
const std::optional<double>& get_max_latency() noexcept { return _max_latency; }
};
+/**
+ * Keeps track of feed operations started, completed and being committed.
+ * Also tracks started and completed commit operations.
+ */
+class FeedOperationCounter {
+public:
+ FeedOperationCounter()
+ : _operationsStarted(0),
+ _operationsCompleted(0),
+ _operationsStartedAtLastCommitStart(0),
+ _commitsStarted(0),
+ _commitsCompleted(0)
+ {}
+ void startOperation() { ++_operationsStarted; }
+ void startCommit() {
+ _commitsStarted++;
+ _operationsStartedAtLastCommitStart = _operationsStarted;
+ }
+
+ void commitCompleted(size_t numOperations);
+
+ size_t operationsSinceLastCommitStart() const {
+ return _operationsStarted - _operationsStartedAtLastCommitStart;
+ }
+ size_t operationsInFlight() const { return _operationsStarted - _operationsCompleted; }
+ size_t commitsInFlight() const { return _commitsStarted - _commitsCompleted; }
+ bool shouldScheduleCommit() const {
+ return (operationsInFlight() > 0) && (commitsInFlight() == 0);
+ }
+private:
+ size_t _operationsStarted;
+ size_t _operationsCompleted;
+ size_t _operationsStartedAtLastCommitStart;
+ size_t _commitsStarted;
+ size_t _commitsCompleted;
+};
+
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 0ec7f1c606a..8b99c39dd65 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -23,7 +23,6 @@
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <cassert>
-#include <unistd.h>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.feedhandler");
@@ -99,7 +98,7 @@ TlsMgrWriter::sync(SerialNum syncTo)
bool res = _tls_mgr.getSession()->sync(syncTo, syncedTo);
if (!res) {
LOG(debug, "Tls sync failed, retrying");
- sleep(1);
+ std::this_thread::sleep_for(100ms);
continue;
}
if (syncedTo >= syncTo) {
@@ -418,11 +417,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_prunedSerialNum(0),
_replay_end_serial_num(0),
_prepare_serial_num(0),
- _numOperationsStarted(0),
- _numOperationsCompleted(0),
- _numOperationsStartedAtLastCommitStart(0),
- _numCommitsStarted(0),
- _numCommitsCompleted(0),
+ _numOperations(),
_delayedPrune(false),
_feedLock(),
_feedState(make_shared<InitState>(getDocTypeName())),
@@ -521,20 +516,15 @@ FeedHandler::getTransactionLogReplayDone() const {
}
void
-FeedHandler::onCommitDone(size_t numOperationsInCommit, vespalib::steady_time start_time) {
- assert(_numCommitsStarted > _numCommitsCompleted);
- assert(_numOperationsStarted >= _numOperationsCompleted + numOperationsInCommit);
- _numOperationsCompleted += numOperationsInCommit;
- _numCommitsCompleted++;
- if ((numOperationsInFlight() > 0) && numCommitsInFlight() == 0) {
+FeedHandler::onCommitDone(size_t numOperations, vespalib::steady_time start_time) {
+ _numOperations.commitCompleted(numOperations);
+ if (_numOperations.shouldScheduleCommit()) {
enqueCommitTask();
}
- LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu",
- _numCommitsCompleted, numOperationsInCommit, _numOperationsCompleted, numOperationsInFlight());
vespalib::steady_time now = vespalib::steady_clock::now();
auto latency = vespalib::to_s(now - start_time);
std::lock_guard guard(_stats_lock);
- _stats.add_commit(numOperationsInCommit, latency);
+ _stats.add_commit(numOperations, latency);
}
void FeedHandler::enqueCommitTask() {
@@ -545,15 +535,13 @@ void FeedHandler::enqueCommitTask() {
void
FeedHandler::initiateCommit(vespalib::steady_time start_time) {
- size_t numOperationsSinceLastInitiatedCommit = _numOperationsStarted - _numOperationsStartedAtLastCommitStart;
auto onCommitDoneContext = std::make_shared<OnCommitDone>(
_writeService.master(),
- makeLambdaTask([this, numOperationsSinceLastInitiatedCommit, start_time]() {
- onCommitDone(numOperationsSinceLastInitiatedCommit, start_time);
+ makeLambdaTask([this, operations=_numOperations.operationsSinceLastCommitStart(), start_time]() {
+ onCommitDone(operations, start_time);
}));
auto commitResult = _tlsWriter->startCommit(onCommitDoneContext);
- _numCommitsStarted++;
- _numOperationsStartedAtLastCommitStart = _numOperationsStarted;
+ _numOperations.startCommit();
if (_activeFeedView) {
using KeepAlivePair = vespalib::KeepAlive<std::pair<CommitResult, DoneCallback>>;
auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext));
@@ -567,8 +555,8 @@ FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback on
const_cast<FeedOperation &>(op).setSerialNum(inc_serial_num());
}
_tlsWriter->appendOperation(op, std::move(onDone));
- ++_numOperationsStarted;
- if (numOperationsInFlight() == 1) {
+ _numOperations.startOperation();
+ if (_numOperations.operationsInFlight() == 1) {
enqueCommitTask();
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 7b319779620..417d9c21548 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -83,11 +83,7 @@ private:
// the serial num of the last feed operation in the transaction log at startup before replay
SerialNum _replay_end_serial_num;
uint64_t _prepare_serial_num;
- size_t _numOperationsStarted;
- size_t _numOperationsCompleted;
- size_t _numOperationsStartedAtLastCommitStart;
- size_t _numCommitsStarted;
- size_t _numCommitsCompleted;
+ FeedOperationCounter _numOperations;
bool _delayedPrune;
mutable std::shared_mutex _feedLock;
FeedStateSP _feedState;
@@ -142,8 +138,6 @@ private:
void onCommitDone(size_t numPendingAtStart, vespalib::steady_time start_time);
void initiateCommit(vespalib::steady_time start_time);
void enqueCommitTask();
- size_t numOperationsInFlight() const { return _numOperationsStarted - _numOperationsCompleted; }
- size_t numCommitsInFlight() const { return _numCommitsStarted - _numCommitsCompleted; }
public:
FeedHandler(const FeedHandler &) = delete;
FeedHandler & operator = (const FeedHandler &) = delete;