diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-08-05 00:05:57 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-08-05 00:05:57 +0200 |
commit | 3055b8c99bfdaf304648b0156ede5c797c881080 (patch) | |
tree | c4a5fabb422957d60794c6e7fcb7210f92f5e6ee | |
parent | 07eb55cc83561ffaebead2415729a37b374a1097 (diff) | |
parent | 2dc446532299fc32b92c045a3fbba53f7625b50b (diff) |
Merge pull request #3052 from yahoo/balder/delay-ack
Balder/delay ack
4 files changed, 26 insertions, 19 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 1998fe806cc..5601a0b5911 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -288,11 +288,11 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId); if (putOp.getValidDbdId(_params._subDbId)) { - putSummary(serialNum, putOp.getLid(), doc); bool immediateCommit = _commitTimeTracker.needCommit(); std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(token, putOp.getType(), _params._metrics, immediateCommit && putOp.getLid() >= oldDocIdLimit); + putSummary(serialNum, putOp.getLid(), doc, onWriteDone); putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, putOp.getLid(), doc, immediateCommit, onWriteDone); } @@ -350,10 +350,13 @@ StoreOnlyFeedView::handleUpdate(FeedToken *token, const UpdateOperation &updOp) internalUpdate(dupFeedToken(token), updOp); } -void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc futureDoc) { +void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, + const FutureDoc & futureDoc, OnOperationDoneType onDone) +{ _pendingLidTracker.produce(lid); summaryExecutor().execute( - makeLambdaTask([serialNum, futureDoc, lid, this] { + makeLambdaTask([serialNum, lid, futureDoc, onDone, this] { + (void) onDone; const Document::UP & doc = futureDoc.get(); if (doc) { _summaryAdapter->put(serialNum, *futureDoc.get(), lid); @@ -361,10 +364,13 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid _pendingLidTracker.consume(lid); })); } -void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc) { +void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, + Document::SP doc, OnOperationDoneType onDone) +{ _pendingLidTracker.produce(lid); summaryExecutor().execute( - makeLambdaTask([serialNum, doc = std::move(doc), lid, this] { + makeLambdaTask([serialNum, doc = std::move(doc), onDone, lid, this] { + (void) onDone; _summaryAdapter->put(serialNum, *doc, lid); _pendingLidTracker.consume(lid); })); @@ -427,14 +433,13 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up updateIndexedFields(serialNum, lid, futureDoc, immediateCommit, onWriteDone); } if (useDocumentStore(serialNum)) { - putSummary(serialNum, lid, futureDoc); + putSummary(serialNum, lid, futureDoc, onWriteDone); } - Document::UP prevDoc(_summaryAdapter->get(lid, *_repo)); _writeService .attributeFieldWriter() .execute(serialNum, - [upd = updOp.getUpdate(), serialNum, prevDoc = std::move(prevDoc), onWriteDone, + [upd = updOp.getUpdate(), serialNum, prevDoc = _summaryAdapter->get(lid, *_repo), onWriteDone, promisedDoc = std::move(promisedDoc), this]() mutable { makeUpdatedDocument(serialNum, std::move(prevDoc), upd, @@ -549,7 +554,7 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm Document::UP clearDoc(new Document(*_docType, docId)); clearDoc->setRepo(*_repo); - putSummary(serialNum, rmOp.getLid(), std::move(clearDoc)); + putSummary(serialNum, rmOp.getLid(), std::move(clearDoc), std::shared_ptr<OperationDoneContext>()); } if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { @@ -722,12 +727,12 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: adjustMetaStore(moveOp, docId); bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId); if (moveOp.getValidDbdId(_params._subDbId)) { - putSummary(serialNum, moveOp.getLid(), doc); bool immediateCommit = _commitTimeTracker.needCommit(); FeedToken::UP token; std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(token, moveOp.getType(), _params._metrics, immediateCommit && (moveOp.getLid() >= oldDocIdLimit), doneCtx); + putSummary(serialNum, moveOp.getLid(), doc, onWriteDone); putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone); } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index e12a13d3f8a..2cec0aba4f9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -145,8 +145,8 @@ private: searchcorespi::index::IThreadService & summaryExecutor() { return _writeService.summary(); } - void putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc doc); - void putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc); + void putSummary(SerialNum serialNum, search::DocumentIdT lid, const FutureDoc & doc, OnOperationDoneType onDone); + void putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc, OnOperationDoneType onDone); void removeSummary(SerialNum serialNum, search::DocumentIdT lid); void heartBeatSummary(SerialNum serialNum); diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h index 765f73e67e5..6984f696117 100644 --- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h @@ -26,7 +26,7 @@ public: */ virtual uint32_t getExecutorId(uint64_t componentId) = 0; - inline uint32_t getExecutorId(vespalib::stringref componentId) { + uint32_t getExecutorId(vespalib::stringref componentId) { vespalib::hash<vespalib::stringref> hashfun; return getExecutorId(hashfun(componentId)); } @@ -49,7 +49,7 @@ public: * @param function function to be wrapped in a task and later executed */ template <class FunctionType> - inline void executeLambda(uint32_t executorId, FunctionType &&function) { + void executeLambda(uint32_t executorId, FunctionType &&function) { executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function))); } /** @@ -67,7 +67,7 @@ public: * @param function function to be wrapped in a task and later executed */ template <class FunctionType> - inline void execute(uint64_t componentId, FunctionType &&function) { + void execute(uint64_t componentId, FunctionType &&function) { uint32_t executorId = getExecutorId(componentId); executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function))); } @@ -82,10 +82,9 @@ public: * @param function function to be wrapped in a task and later executed */ template <class FunctionType> - inline void execute(vespalib::stringref componentId, FunctionType &&function) { + void execute(vespalib::stringref componentId, FunctionType &&function) { uint32_t executorId = getExecutorId(componentId); - executeTask(executorId, - makeLambdaTask(std::forward<FunctionType>(function))); + executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function))); } }; diff --git a/searchlib/src/vespa/searchlib/common/lambdatask.h b/searchlib/src/vespa/searchlib/common/lambdatask.h index 7df22813010..01b57694d11 100644 --- a/searchlib/src/vespa/searchlib/common/lambdatask.h +++ b/searchlib/src/vespa/searchlib/common/lambdatask.h @@ -12,11 +12,14 @@ class LambdaTask : public vespalib::Executor::Task { public: LambdaTask(const FunctionType &func) : _func(func) {} LambdaTask(FunctionType &&func) : _func(std::move(func)) {} + LambdaTask(const LambdaTask &) = delete; + LambdaTask & operator = (const LambdaTask &) = delete; + ~LambdaTask() {} void run() override { _func(); } }; template <class FunctionType> -inline vespalib::Executor::Task::UP +vespalib::Executor::Task::UP makeLambdaTask(FunctionType &&function) { return std::make_unique<LambdaTask<std::decay_t<FunctionType>>> |