aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-08-04 22:06:33 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-08-04 22:06:33 +0200
commit9c0310165ac26d8151a0a58d32bcc92260c8b422 (patch)
tree787d4b4febd6a6310832cba5d1f16d06270eb6b4 /searchcore
parent89b3dac6831e9c48d031029b34d1c83831c96239 (diff)
Ensure ack is delayed until visibility is achieved.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp22
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h4
2 files changed, 16 insertions, 10 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 1998fe806cc..b0fbee46320 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -288,11 +288,11 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId);
if (putOp.getValidDbdId(_params._subDbId)) {
- putSummary(serialNum, putOp.getLid(), doc);
bool immediateCommit = _commitTimeTracker.needCommit();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(token, putOp.getType(), _params._metrics,
immediateCommit && putOp.getLid() >= oldDocIdLimit);
+ putSummary(serialNum, putOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone);
putIndexedFields(serialNum, putOp.getLid(), doc, immediateCommit, onWriteDone);
}
@@ -350,10 +350,13 @@ StoreOnlyFeedView::handleUpdate(FeedToken *token, const UpdateOperation &updOp)
internalUpdate(dupFeedToken(token), updOp);
}
-void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc futureDoc) {
+void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid,
+ FutureDoc futureDoc, OnOperationDoneType onDone)
+{
_pendingLidTracker.produce(lid);
summaryExecutor().execute(
- makeLambdaTask([serialNum, futureDoc, lid, this] {
+ makeLambdaTask([serialNum, futureDoc = std::move(futureDoc), lid, onDone, this] {
+ (void) onDone;
const Document::UP & doc = futureDoc.get();
if (doc) {
_summaryAdapter->put(serialNum, *futureDoc.get(), lid);
@@ -361,10 +364,13 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid
_pendingLidTracker.consume(lid);
}));
}
-void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc) {
+void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid,
+ Document::SP doc, OnOperationDoneType onDone)
+{
_pendingLidTracker.produce(lid);
summaryExecutor().execute(
- makeLambdaTask([serialNum, doc = std::move(doc), lid, this] {
+ makeLambdaTask([serialNum, doc = std::move(doc), onDone, lid, this] {
+ (void) onDone;
_summaryAdapter->put(serialNum, *doc, lid);
_pendingLidTracker.consume(lid);
}));
@@ -427,7 +433,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up
updateIndexedFields(serialNum, lid, futureDoc, immediateCommit, onWriteDone);
}
if (useDocumentStore(serialNum)) {
- putSummary(serialNum, lid, futureDoc);
+ putSummary(serialNum, lid, futureDoc, onWriteDone);
}
Document::UP prevDoc(_summaryAdapter->get(lid, *_repo));
@@ -549,7 +555,7 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm
Document::UP clearDoc(new Document(*_docType, docId));
clearDoc->setRepo(*_repo);
- putSummary(serialNum, rmOp.getLid(), std::move(clearDoc));
+ putSummary(serialNum, rmOp.getLid(), std::move(clearDoc), std::shared_ptr<OperationDoneContext>());
}
if (rmOp.getValidPrevDbdId(_params._subDbId)) {
if (rmOp.changedDbdId()) {
@@ -722,12 +728,12 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
adjustMetaStore(moveOp, docId);
bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId);
if (moveOp.getValidDbdId(_params._subDbId)) {
- putSummary(serialNum, moveOp.getLid(), doc);
bool immediateCommit = _commitTimeTracker.needCommit();
FeedToken::UP token;
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(token, moveOp.getType(), _params._metrics,
immediateCommit && (moveOp.getLid() >= oldDocIdLimit), doneCtx);
+ putSummary(serialNum, moveOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone);
putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index e12a13d3f8a..e0a871c9cc6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -145,8 +145,8 @@ private:
searchcorespi::index::IThreadService & summaryExecutor() {
return _writeService.summary();
}
- void putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc doc);
- void putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc);
+ void putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc doc, OnOperationDoneType onDone);
+ void putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc, OnOperationDoneType onDone);
void removeSummary(SerialNum serialNum, search::DocumentIdT lid);
void heartBeatSummary(SerialNum serialNum);