summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-26 21:52:19 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2020-09-28 12:05:18 +0000
commit24b812dc562a2727dda0b0f20759d153f0283ee9 (patch)
tree2d00589419c5491294fa53c69dc9f8343dab8ce0 /searchcore
parentcef539adea8b1ad6ba2314722c699a0a412082b7 (diff)
- Amortise write cost by grouping mulptiple operations together when writing to TLS.
- Commit memorystructures only then persisting to disk. - Ack operations back to user when both are completed. - Do not schedule a new commit task until both the tls and the memory structures have been comitted.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp46
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h6
4 files changed, 50 insertions, 6 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
index 9db1fee58e1..1f979d1566c 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
@@ -19,7 +19,7 @@ LidReuseDelayerConfig::LidReuseDelayerConfig()
LidReuseDelayerConfig::LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool hasIndexedOrAttributeFields_in)
: _visibilityDelay(visibilityDelay),
- _allowEarlyAck(visibilityDelay > vespalib::duration::zero()),
+ _allowEarlyAck(visibilityDelay > 1ms),
_hasIndexedOrAttributeFields(hasIndexedOrAttributeFields_in)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
index 388a1101dcf..5ed0ad7492c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
@@ -135,7 +135,7 @@ public:
}
vespalib::duration getVisibilityDelay() const { return _visibilityDelay; }
bool hasVisibilityDelay() const { return _visibilityDelay > vespalib::duration::zero(); }
- bool allowEarlyAck() const { return hasVisibilityDelay(); }
+ bool allowEarlyAck() const { return _visibilityDelay > 1ms; }
const DocumentDBLidSpaceCompactionConfig &getLidSpaceCompactionConfig() const {
return _lidSpaceCompaction;
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 282ec3c9ace..75414052ef5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -93,18 +93,18 @@ TlsMgrWriter::sync(SerialNum syncTo)
{
for (int retryCount = 0; retryCount < 10; ++retryCount) {
SerialNum syncedTo(0);
- LOG(spam, "Trying tls sync(%" PRIu64 ")", syncTo);
+ LOG(debug, "Trying tls sync(%" PRIu64 ")", syncTo);
bool res = _tls_mgr.getSession()->sync(syncTo, syncedTo);
if (!res) {
- LOG(spam, "Tls sync failed, retrying");
+ LOG(debug, "Tls sync failed, retrying");
sleep(1);
continue;
}
if (syncedTo >= syncTo) {
- LOG(spam, "Tls sync complete, reached %" PRIu64", returning", syncedTo);
+ LOG(debug, "Tls sync complete, reached %" PRIu64", returning", syncedTo);
return syncedTo;
}
- LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo);
+ LOG(debug, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo);
}
throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo));
}
@@ -402,6 +402,9 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_tlsReplayProgress(),
_serialNum(0),
_prunedSerialNum(0),
+ _numOperationsPendingCommit(0),
+ _numOperationsCompleted(0),
+ _numCommitsCompleted(0),
_delayedPrune(false),
_feedLock(),
_feedState(make_shared<InitState>(getDocTypeName())),
@@ -495,11 +498,46 @@ FeedHandler::getTransactionLogReplayDone() const {
}
void
+FeedHandler::onCommitDone(size_t numPendingAtStart) {
+ assert(numPendingAtStart <= _numOperationsPendingCommit);
+ _numOperationsPendingCommit -= numPendingAtStart;
+ _numOperationsCompleted += numPendingAtStart;
+ _numCommitsCompleted++;
+ if (_numOperationsPendingCommit > 0) {
+ enqueCommitTask();
+ }
+ LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu",
+ _numCommitsCompleted, numPendingAtStart, _numOperationsCompleted, _numOperationsPendingCommit);
+}
+
+void FeedHandler::enqueCommitTask() {
+ _writeService.master().execute(makeLambdaTask([this]() { initiateCommit(); }));
+}
+
+void
+FeedHandler::initiateCommit() {
+ auto onCommitDoneContext = std::make_shared<OnCommitDone>(
+ _writeService.master(),
+ makeLambdaTask([this, numPendingAtStart=_numOperationsPendingCommit]() {
+ onCommitDone(numPendingAtStart);
+ }));
+ auto commitResult = _tlsWriter->startCommit(onCommitDoneContext);
+ if (_activeFeedView) {
+ using KeepAlivePair = KeepAlive<std::pair<CommitResult, DoneCallback>>;
+ auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext));
+ _activeFeedView->forceCommit(_serialNum, std::make_shared<KeepAlivePair>(std::move(pair)));
+ }
+}
+
+void
FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) {
if (!op.getSerialNum()) {
const_cast<FeedOperation &>(op).setSerialNum(incSerialNum());
}
_tlsWriter->appendOperation(op, std::move(onDone));
+ if (++_numOperationsPendingCommit == 1) {
+ enqueCommitTask();
+ }
}
FeedHandler::CommitResult
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 4807c596130..c295b26a759 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -76,6 +76,9 @@ private:
// the serial num of the last message in the transaction log
SerialNum _serialNum;
SerialNum _prunedSerialNum;
+ size_t _numOperationsPendingCommit;
+ size_t _numOperationsCompleted;
+ size_t _numCommitsCompleted;
bool _delayedPrune;
mutable std::shared_mutex _feedLock;
FeedStateSP _feedState;
@@ -125,6 +128,9 @@ private:
FeedStateSP getFeedState() const;
void changeFeedState(FeedStateSP newState);
void doChangeFeedState(FeedStateSP newState);
+ void onCommitDone(size_t numPendingAtStart);
+ void initiateCommit();
+ void enqueCommitTask();
public:
FeedHandler(const FeedHandler &) = delete;
FeedHandler & operator = (const FeedHandler &) = delete;