summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-08-05 00:05:57 +0200
committerGitHub <noreply@github.com>2017-08-05 00:05:57 +0200
commit3055b8c99bfdaf304648b0156ede5c797c881080 (patch)
treec4a5fabb422957d60794c6e7fcb7210f92f5e6ee
parent07eb55cc83561ffaebead2415729a37b374a1097 (diff)
parent2dc446532299fc32b92c045a3fbba53f7625b50b (diff)
Merge pull request #3052 from yahoo/balder/delay-ack
Balder/delay ack
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp25
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h4
-rw-r--r--searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h11
-rw-r--r--searchlib/src/vespa/searchlib/common/lambdatask.h5
4 files changed, 26 insertions, 19 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 1998fe806cc..5601a0b5911 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -288,11 +288,11 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId);
if (putOp.getValidDbdId(_params._subDbId)) {
- putSummary(serialNum, putOp.getLid(), doc);
bool immediateCommit = _commitTimeTracker.needCommit();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(token, putOp.getType(), _params._metrics,
immediateCommit && putOp.getLid() >= oldDocIdLimit);
+ putSummary(serialNum, putOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone);
putIndexedFields(serialNum, putOp.getLid(), doc, immediateCommit, onWriteDone);
}
@@ -350,10 +350,13 @@ StoreOnlyFeedView::handleUpdate(FeedToken *token, const UpdateOperation &updOp)
internalUpdate(dupFeedToken(token), updOp);
}
-void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc futureDoc) {
+void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid,
+ const FutureDoc & futureDoc, OnOperationDoneType onDone)
+{
_pendingLidTracker.produce(lid);
summaryExecutor().execute(
- makeLambdaTask([serialNum, futureDoc, lid, this] {
+ makeLambdaTask([serialNum, lid, futureDoc, onDone, this] {
+ (void) onDone;
const Document::UP & doc = futureDoc.get();
if (doc) {
_summaryAdapter->put(serialNum, *futureDoc.get(), lid);
@@ -361,10 +364,13 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid
_pendingLidTracker.consume(lid);
}));
}
-void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc) {
+void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid,
+ Document::SP doc, OnOperationDoneType onDone)
+{
_pendingLidTracker.produce(lid);
summaryExecutor().execute(
- makeLambdaTask([serialNum, doc = std::move(doc), lid, this] {
+ makeLambdaTask([serialNum, doc = std::move(doc), onDone, lid, this] {
+ (void) onDone;
_summaryAdapter->put(serialNum, *doc, lid);
_pendingLidTracker.consume(lid);
}));
@@ -427,14 +433,13 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up
updateIndexedFields(serialNum, lid, futureDoc, immediateCommit, onWriteDone);
}
if (useDocumentStore(serialNum)) {
- putSummary(serialNum, lid, futureDoc);
+ putSummary(serialNum, lid, futureDoc, onWriteDone);
}
- Document::UP prevDoc(_summaryAdapter->get(lid, *_repo));
_writeService
.attributeFieldWriter()
.execute(serialNum,
- [upd = updOp.getUpdate(), serialNum, prevDoc = std::move(prevDoc), onWriteDone,
+ [upd = updOp.getUpdate(), serialNum, prevDoc = _summaryAdapter->get(lid, *_repo), onWriteDone,
promisedDoc = std::move(promisedDoc), this]() mutable
{
makeUpdatedDocument(serialNum, std::move(prevDoc), upd,
@@ -549,7 +554,7 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm
Document::UP clearDoc(new Document(*_docType, docId));
clearDoc->setRepo(*_repo);
- putSummary(serialNum, rmOp.getLid(), std::move(clearDoc));
+ putSummary(serialNum, rmOp.getLid(), std::move(clearDoc), std::shared_ptr<OperationDoneContext>());
}
if (rmOp.getValidPrevDbdId(_params._subDbId)) {
if (rmOp.changedDbdId()) {
@@ -722,12 +727,12 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
adjustMetaStore(moveOp, docId);
bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId);
if (moveOp.getValidDbdId(_params._subDbId)) {
- putSummary(serialNum, moveOp.getLid(), doc);
bool immediateCommit = _commitTimeTracker.needCommit();
FeedToken::UP token;
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(token, moveOp.getType(), _params._metrics,
immediateCommit && (moveOp.getLid() >= oldDocIdLimit), doneCtx);
+ putSummary(serialNum, moveOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone);
putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index e12a13d3f8a..2cec0aba4f9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -145,8 +145,8 @@ private:
searchcorespi::index::IThreadService & summaryExecutor() {
return _writeService.summary();
}
- void putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc doc);
- void putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc);
+ void putSummary(SerialNum serialNum, search::DocumentIdT lid, const FutureDoc & doc, OnOperationDoneType onDone);
+ void putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc, OnOperationDoneType onDone);
void removeSummary(SerialNum serialNum, search::DocumentIdT lid);
void heartBeatSummary(SerialNum serialNum);
diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
index 765f73e67e5..6984f696117 100644
--- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
@@ -26,7 +26,7 @@ public:
*/
virtual uint32_t getExecutorId(uint64_t componentId) = 0;
- inline uint32_t getExecutorId(vespalib::stringref componentId) {
+ uint32_t getExecutorId(vespalib::stringref componentId) {
vespalib::hash<vespalib::stringref> hashfun;
return getExecutorId(hashfun(componentId));
}
@@ -49,7 +49,7 @@ public:
* @param function function to be wrapped in a task and later executed
*/
template <class FunctionType>
- inline void executeLambda(uint32_t executorId, FunctionType &&function) {
+ void executeLambda(uint32_t executorId, FunctionType &&function) {
executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function)));
}
/**
@@ -67,7 +67,7 @@ public:
* @param function function to be wrapped in a task and later executed
*/
template <class FunctionType>
- inline void execute(uint64_t componentId, FunctionType &&function) {
+ void execute(uint64_t componentId, FunctionType &&function) {
uint32_t executorId = getExecutorId(componentId);
executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function)));
}
@@ -82,10 +82,9 @@ public:
* @param function function to be wrapped in a task and later executed
*/
template <class FunctionType>
- inline void execute(vespalib::stringref componentId, FunctionType &&function) {
+ void execute(vespalib::stringref componentId, FunctionType &&function) {
uint32_t executorId = getExecutorId(componentId);
- executeTask(executorId,
- makeLambdaTask(std::forward<FunctionType>(function)));
+ executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function)));
}
};
diff --git a/searchlib/src/vespa/searchlib/common/lambdatask.h b/searchlib/src/vespa/searchlib/common/lambdatask.h
index 7df22813010..01b57694d11 100644
--- a/searchlib/src/vespa/searchlib/common/lambdatask.h
+++ b/searchlib/src/vespa/searchlib/common/lambdatask.h
@@ -12,11 +12,14 @@ class LambdaTask : public vespalib::Executor::Task {
public:
LambdaTask(const FunctionType &func) : _func(func) {}
LambdaTask(FunctionType &&func) : _func(std::move(func)) {}
+ LambdaTask(const LambdaTask &) = delete;
+ LambdaTask & operator = (const LambdaTask &) = delete;
+ ~LambdaTask() {}
void run() override { _func(); }
};
template <class FunctionType>
-inline vespalib::Executor::Task::UP
+vespalib::Executor::Task::UP
makeLambdaTask(FunctionType &&function)
{
return std::make_unique<LambdaTask<std::decay_t<FunctionType>>>