summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-08-10 18:25:01 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-08-10 18:25:40 +0200
commit56a8d3e424f8d1ddb09045fd5686a1c608d6bd42 (patch)
tree78d116c3903040a9379766a646f13f8709ca0d28 /searchcore
parenta35807d95255625553daa0db745df0e422172fd8 (diff)
Also run the serialization in different threadpool.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/docsummary/docsummary.cpp32
-rw-r--r--searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp26
-rw-r--r--searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/isummaryadapter.h41
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h69
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp38
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/summaryadapter.cpp50
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/summaryadapter.h46
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/dummy_document_store.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/mock_summary_adapter.h22
14 files changed, 185 insertions, 187 deletions
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp
index b8367330fbd..f492dc44fc6 100644
--- a/searchcore/src/tests/proton/docsummary/docsummary.cpp
+++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp
@@ -114,7 +114,7 @@ public:
endDocument(uint32_t docId)
{
Document::UP doc = _bld.endDocument();
- _str.write(_serialNum++, *doc, docId);
+ _str.write(_serialNum++, docId, *doc);
}
FieldCacheRepo::UP createFieldCacheRepo(const ResultConfig &resConfig) const {
@@ -259,9 +259,8 @@ public:
LOG_ASSERT(putRes.ok());
uint64_t serialNum = _ddb->getFeedHandler().incSerialNum();
_aw->put(serialNum, doc, lid, true, std::shared_ptr<IDestructorCallback>());
- _ddb->getReadySubDB()->
- getAttributeManager()->getAttributeFieldWriter().sync();
- _sa->put(serialNum, doc, lid);
+ _ddb->getReadySubDB()->getAttributeManager()->getAttributeFieldWriter().sync();
+ _sa->put(serialNum, lid, doc);
const GlobalId &gid = docId.getGlobalId();
BucketId bucketId(gid.convertToBucketId());
bucketId.setUsedBits(8);
@@ -269,9 +268,7 @@ public:
DbDocumentId dbdId(lid);
DbDocumentId prevDbdId(0);
document::Document::SP xdoc(new document::Document(doc));
- PutOperation op(bucketId,
- ts,
- xdoc);
+ PutOperation op(bucketId, ts, xdoc);
op.setSerialNum(serialNum);
op.setDbDocumentId(dbdId);
op.setPrevDbDocumentId(prevDbdId);
@@ -601,7 +598,6 @@ GlobalId gid3 = DocumentId("doc::3").getGlobalId(); // lid 3
GlobalId gid4 = DocumentId("doc::4").getGlobalId(); // lid 4
GlobalId gid9 = DocumentId("doc::9").getGlobalId(); // not existing
-
void
Test::requireThatDocsumRequestIsProcessed()
{
@@ -861,9 +857,8 @@ Test::requireThatSummaryAdapterHandlesPutAndRemove()
addStr("foo").
endField().
endDocument();
- dc._sa->put(1, *exp, 1);
- IDocumentStore & store =
- dc._ddb->getReadySubDB()->getSummaryManager()->getBackingStore();
+ dc._sa->put(1, 1, *exp);
+ IDocumentStore & store = dc._ddb->getReadySubDB()->getSummaryManager()->getBackingStore();
Document::UP act = store.read(1, *bc._repo);
EXPECT_TRUE(act.get() != NULL);
EXPECT_EQUAL(exp->getType(), act->getType());
@@ -915,10 +910,9 @@ Test::requireThatAnnotationsAreUsed()
setAutoAnnotate(true).
endField().
endDocument();
- dc._sa->put(1, *exp, 1);
+ dc._sa->put(1, 1, *exp);
- IDocumentStore & store =
- dc._ddb->getReadySubDB()->getSummaryManager()->getBackingStore();
+ IDocumentStore & store = dc._ddb->getReadySubDB()->getSummaryManager()->getBackingStore();
Document::UP act = store.read(1, *bc._repo);
EXPECT_TRUE(act.get() != NULL);
EXPECT_EQUAL(exp->getType(), act->getType());
@@ -1074,7 +1068,7 @@ Test::requireThatUrisAreUsed()
endElement().
endField().
endDocument();
- dc._sa->put(1, *exp, 1);
+ dc._sa->put(1, 1, *exp);
IDocumentStore & store =
dc._ddb->getReadySubDB()->getSummaryManager()->getBackingStore();
@@ -1086,8 +1080,7 @@ Test::requireThatUrisAreUsed()
bc.createFieldCacheRepo(getResultConfig())->getFieldCache("class0"),
getMarkupFields());
- EXPECT_TRUE(assertString("http://www.example.com:81/fluke?ab=2#4",
- "urisingle", dsa, 1));
+ EXPECT_TRUE(assertString("http://www.example.com:81/fluke?ab=2#4", "urisingle", dsa, 1));
GeneralResultPtr res = getResult(dsa, 1);
{
vespalib::Slime slime;
@@ -1213,10 +1206,9 @@ Test::requireThatRawFieldsWorks()
endElement().
endField().
endDocument();
- dc._sa->put(1, *exp, 1);
+ dc._sa->put(1, 1, *exp);
- IDocumentStore & store =
- dc._ddb->getReadySubDB()->getSummaryManager()->getBackingStore();
+ IDocumentStore & store = dc._ddb->getReadySubDB()->getSummaryManager()->getBackingStore();
Document::UP act = store.read(1, *bc._repo);
EXPECT_TRUE(act.get() != NULL);
EXPECT_EQUAL(exp->getType(), act->getType());
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
index ed0e5091aec..6667709db47 100644
--- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
@@ -222,11 +222,13 @@ public:
struct MyDocumentStore : public test::DummyDocumentStore
{
typedef std::map<DocumentIdT, document::Document::SP> DocMap;
+ const document::DocumentTypeRepo & _repo;
DocMap _docs;
uint64_t _lastSyncToken;
uint32_t _compactLidSpaceLidLimit;
- MyDocumentStore()
+ MyDocumentStore(const document::DocumentTypeRepo & repo)
: test::DummyDocumentStore("."),
+ _repo(repo),
_docs(),
_lastSyncToken(0),
_compactLidSpaceLidLimit(0)
@@ -239,10 +241,14 @@ struct MyDocumentStore : public test::DummyDocumentStore
}
return Document::UP();
}
- virtual void write(uint64_t syncToken, const document::Document& doc, DocumentIdT lid) override {
+ virtual void write(uint64_t syncToken, DocumentIdT lid, const document::Document& doc) override {
_lastSyncToken = syncToken;
_docs[lid] = Document::SP(doc.clone());
}
+ virtual void write(uint64_t syncToken, DocumentIdT lid, const vespalib::nbostream & os) override {
+ _lastSyncToken = syncToken;
+ _docs[lid] = std::make_shared<Document>(_repo, const_cast<vespalib::nbostream &>(os));
+ }
virtual void remove(uint64_t syncToken, DocumentIdT lid) override {
_lastSyncToken = syncToken;
_docs.erase(lid);
@@ -259,7 +265,7 @@ struct MyDocumentStore : public test::DummyDocumentStore
struct MySummaryManager : public test::DummySummaryManager
{
MyDocumentStore _store;
- MySummaryManager() : _store() {}
+ MySummaryManager(const document::DocumentTypeRepo & repo) : _store(repo) {}
virtual search::IDocumentStore &getBackingStore() override { return _store; }
};
@@ -269,14 +275,14 @@ struct MySummaryAdapter : public test::MockSummaryAdapter
MyDocumentStore &_store;
MyLidVector _removes;
- MySummaryAdapter()
- : _sumMgr(new MySummaryManager()),
+ MySummaryAdapter(const document::DocumentTypeRepo & repo)
+ : _sumMgr(new MySummaryManager(repo)),
_store(static_cast<MyDocumentStore &>(_sumMgr->getBackingStore())),
_removes()
{}
- virtual void put(SerialNum serialNum, const document::Document &doc, const DocumentIdT lid) override {
+ virtual void put(SerialNum serialNum, DocumentIdT lid, const document::Document &doc) override {
(void) serialNum;
- _store.write(serialNum, doc, lid);
+ _store.write(serialNum, lid, doc);
}
virtual void remove(SerialNum serialNum, const DocumentIdT lid) override {
LOG(info,
@@ -511,13 +517,13 @@ FeedTokenContext::~FeedTokenContext() {}
struct FixtureBase
{
MyTracer _tracer;
+ SchemaContext sc;
IIndexWriter::SP iw;
ISummaryAdapter::SP sa;
IAttributeWriter::SP aw;
MyIndexWriter &miw;
MySummaryAdapter &msa;
MyAttributeWriter &maw;
- SchemaContext sc;
DocIdLimit _docIdLimit;
DocumentMetaStoreContext::SP _dmscReal;
test::DocumentMetaStoreContextObserver::SP _dmsc;
@@ -699,13 +705,13 @@ struct FixtureBase
FixtureBase::FixtureBase(TimeStamp visibilityDelay)
: _tracer(),
+ sc(),
iw(new MyIndexWriter(_tracer)),
- sa(new MySummaryAdapter),
+ sa(new MySummaryAdapter(*sc._builder->getDocumentTypeRepo())),
aw(new MyAttributeWriter(_tracer)),
miw(static_cast<MyIndexWriter&>(*iw)),
msa(static_cast<MySummaryAdapter&>(*sa)),
maw(static_cast<MyAttributeWriter&>(*aw)),
- sc(),
_docIdLimit(0u),
_dmscReal(new DocumentMetaStoreContext(std::make_shared<BucketDBOwner>())),
_dmsc(new test::DocumentMetaStoreContextObserver(*_dmscReal)),
diff --git a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
index ed265bca79d..15bd55441d4 100644
--- a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
@@ -52,9 +52,11 @@ public:
_putCount(putCount),
_heartbeatCount(heartbeatCount) {
}
- virtual void put(SerialNum, const Document &, DocumentIdT) override { ++ _putCount; }
- virtual void remove(SerialNum, DocumentIdT) override { ++_rmCount; }
- virtual void heartBeat(SerialNum) override { ++_heartbeatCount; }
+ void put(SerialNum, DocumentIdT, const Document &) override { ++ _putCount; }
+ void put(SerialNum, DocumentIdT, const vespalib::nbostream &) override { ++ _putCount; }
+
+ void remove(SerialNum, DocumentIdT) override { ++_rmCount; }
+ void heartBeat(SerialNum) override { ++_heartbeatCount; }
};
DocumentTypeRepo::SP myGetDocumentTypeRepo() {
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
index 7cb5e55ad89..e65209bf526 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
@@ -213,9 +213,16 @@ SummaryManager::SummaryManager(vespalib::ThreadExecutor & executor,
SummaryManager::~SummaryManager() {}
void
-SummaryManager::putDocument(uint64_t syncToken, const Document & doc, search::DocumentIdT lid)
+SummaryManager::putDocument(uint64_t syncToken, search::DocumentIdT lid, const Document & doc)
{
- _docStore->write(syncToken, doc, lid);
+ _docStore->write(syncToken, lid, doc);
+ _currentSerial = syncToken;
+}
+
+void
+SummaryManager::putDocument(uint64_t syncToken, search::DocumentIdT lid, const vespalib::nbostream & doc)
+{
+ _docStore->write(syncToken, lid, doc);
_currentSerial = syncToken;
}
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
index 07804f86764..af82f25ba51 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
@@ -85,8 +85,9 @@ public:
const std::shared_ptr<search::IBucketizer> & bucketizer);
~SummaryManager();
- void putDocument(uint64_t syncToken, const document::Document & doc,
- search::DocumentIdT lid);
+ void putDocument(uint64_t syncToken, search::DocumentIdT lid, const document::Document & doc);
+ void putDocument(uint64_t syncToken, search::DocumentIdT lid, const vespalib::nbostream & doc);
+
void removeDocument(uint64_t syncToken, search::DocumentIdT lid);
searchcorespi::IFlushTarget::List getFlushTargets(searchcorespi::index::IThreadService & summaryService);
diff --git a/searchcore/src/vespa/searchcore/proton/server/isummaryadapter.h b/searchcore/src/vespa/searchcore/proton/server/isummaryadapter.h
index ffdc59456ac..5c4eb75ecaf 100644
--- a/searchcore/src/vespa/searchcore/proton/server/isummaryadapter.h
+++ b/searchcore/src/vespa/searchcore/proton/server/isummaryadapter.h
@@ -1,12 +1,16 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/searchcore/proton/docsummary/isummarymanager.h>
#include <vespa/searchlib/query/base.h>
#include <vespa/searchlib/common/serialnum.h>
+namespace document {
+ class Document;
+ class DocumentTypeRepo;
+}
+namespace search { class IDocumentStore; }
+namespace vespalib { class nbostream; }
+
namespace proton {
/**
@@ -14,30 +18,23 @@ namespace proton {
**/
class ISummaryAdapter {
public:
- typedef std::unique_ptr<ISummaryAdapter> UP;
- typedef std::shared_ptr<ISummaryAdapter> SP;
+ using UP = std::unique_ptr<ISummaryAdapter>;
+ using SP = std::shared_ptr<ISummaryAdapter>;
+ using SerialNum = search::SerialNum;
+ using Document = document::Document;
+ using DocumentIdT = search::DocumentIdT;
+ using DocumentTypeRepo = document::DocumentTypeRepo;
virtual ~ISummaryAdapter() {}
// feed interface
- virtual void put(search::SerialNum serialNum,
- const document::Document &doc,
- const search::DocumentIdT lid) = 0;
- virtual void remove(search::SerialNum serialNum,
- const search::DocumentIdT lid) = 0;
-
- virtual void
- heartBeat(search::SerialNum serialNum) = 0;
-
- virtual const search::IDocumentStore &
- getDocumentStore() const = 0;
-
- virtual std::unique_ptr<document::Document>
- get(const search::DocumentIdT lid,
- const document::DocumentTypeRepo &repo) = 0;
-
+ virtual void put(SerialNum serialNum, const DocumentIdT lid, const Document &doc) = 0;
+ virtual void put(SerialNum serialNum, const DocumentIdT lid, const vespalib::nbostream & os) = 0;
+ virtual void remove(SerialNum serialNum, const DocumentIdT lid) = 0;
+ virtual void heartBeat(SerialNum serialNum) = 0;
+ virtual const search::IDocumentStore &getDocumentStore() const = 0;
+ virtual std::unique_ptr<Document> get(const DocumentIdT lid, const DocumentTypeRepo &repo) = 0;
virtual void compactLidSpace(uint32_t wantedDocIdLimit) = 0;
};
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
index 1987ba81cad..3a3264336c8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
@@ -116,10 +116,10 @@ SearchableFeedView::performIndexPut(SerialNum serialNum, search::DocumentIdT lid
}
void
-SearchableFeedView::performIndexPut(SerialNum serialNum, search::DocumentIdT lid, const FutureDoc & futureDoc,
+SearchableFeedView::performIndexPut(SerialNum serialNum, search::DocumentIdT lid, FutureDoc futureDoc,
bool immediateCommit, OnOperationDoneType onWriteDone)
{
- const Document::UP & doc = futureDoc.get();
+ Document::UP doc = std::move(futureDoc.get());
if (doc) {
performIndexPut(serialNum, lid, *doc, immediateCommit, onWriteDone);
}
@@ -161,15 +161,16 @@ SearchableFeedView::getUpdateScope(const DocumentUpdate &upd)
}
void
-SearchableFeedView::updateIndexedFields(SerialNum serialNum, search::DocumentIdT lid, const FutureDoc & futureDoc,
+SearchableFeedView::updateIndexedFields(SerialNum serialNum, search::DocumentIdT lid, FutureDoc futureDoc,
bool immediateCommit, OnOperationDoneType onWriteDone)
{
if (shouldTrace(onWriteDone, 1)) {
onWriteDone->getToken()->trace(1, "Then we can update the index.");
}
_writeService.index().execute(
- makeLambdaTask([=]() {
- performIndexPut(serialNum, lid, futureDoc, immediateCommit, onWriteDone);
+ makeLambdaTask([serialNum, lid, futureDoc = std::move(futureDoc),
+ immediateCommit, onWriteDone = std::move(onWriteDone), this]() mutable {
+ performIndexPut(serialNum, lid, std::move(futureDoc), immediateCommit, std::move(onWriteDone));
}));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h
index 88ee03b3f93..c0d9bfcfbc6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h
@@ -6,8 +6,7 @@
#include <vespa/searchcore/proton/attribute/i_attribute_writer.h>
#include <vespa/searchcore/proton/index/i_index_writer.h>
-namespace proton
-{
+namespace proton {
class IGidToLidChangeHandler;
@@ -41,36 +40,23 @@ private:
bool hasIndexedFields() const { return _hasIndexedFields; }
void
- performIndexPut(SerialNum serialNum,
- search::DocumentIdT lid,
- const document::Document &doc,
- bool immediateCommit,
- OnOperationDoneType onWriteDone);
+ performIndexPut(SerialNum serialNum, search::DocumentIdT lid, const document::Document &doc,
+ bool immediateCommit, OnOperationDoneType onWriteDone);
void
- performIndexPut(SerialNum serialNum,
- search::DocumentIdT lid,
- const document::Document::SP &doc,
- bool immediateCommit,
- OnOperationDoneType onWriteDone);
+ performIndexPut(SerialNum serialNum, search::DocumentIdT lid, const document::Document::SP &doc,
+ bool immediateCommit, OnOperationDoneType onWriteDone);
void
- performIndexPut(SerialNum serialNum,
- search::DocumentIdT lid,
- const FutureDoc & doc,
- bool immediateCommit,
- OnOperationDoneType onWriteDone);
+ performIndexPut(SerialNum serialNum, search::DocumentIdT lid, FutureDoc doc,
+ bool immediateCommit, OnOperationDoneType onWriteDone);
void
- performIndexRemove(SerialNum serialNum,
- search::DocumentIdT lid,
- bool immediateCommit,
- OnRemoveDoneType onWriteDone);
+ performIndexRemove(SerialNum serialNum, search::DocumentIdT lid,
+ bool immediateCommit, OnRemoveDoneType onWriteDone);
void
- performIndexRemove(SerialNum serialNum,
- const LidVector &lidsToRemove,
- bool immediateCommit,
- OnWriteDoneType onWriteDone);
+ performIndexRemove(SerialNum serialNum, const LidVector &lidsToRemove,
+ bool immediateCommit, OnWriteDoneType onWriteDone);
void performIndexHeartBeat(SerialNum serialNum);
@@ -80,32 +66,22 @@ private:
void heartBeatIndexedFields(SerialNum serialNum) override;
virtual void
- putIndexedFields(SerialNum serialNum,
- search::DocumentIdT lid,
- const document::Document::SP &newDoc,
- bool immediateCommit,
- OnOperationDoneType onWriteDone) override;
+ putIndexedFields(SerialNum serialNum, search::DocumentIdT lid, const document::Document::SP &newDoc,
+ bool immediateCommit, OnOperationDoneType onWriteDone) override;
UpdateScope getUpdateScope(const document::DocumentUpdate &upd) override;
virtual void
- updateIndexedFields(SerialNum serialNum,
- search::DocumentIdT lid,
- const FutureDoc & newDoc,
- bool immediateCommit,
- OnOperationDoneType onWriteDone) override;
+ updateIndexedFields(SerialNum serialNum, search::DocumentIdT lid, FutureDoc newDoc,
+ bool immediateCommit, OnOperationDoneType onWriteDone) override;
virtual void
- removeIndexedFields(SerialNum serialNum,
- search::DocumentIdT lid,
- bool immediateCommit,
- OnRemoveDoneType onWriteDone) override;
+ removeIndexedFields(SerialNum serialNum, search::DocumentIdT lid,
+ bool immediateCommit, OnRemoveDoneType onWriteDone) override;
virtual void
- removeIndexedFields(SerialNum serialNum,
- const LidVector &lidsToRemove,
- bool immediateCommit,
- OnWriteDoneType onWriteDone) override;
+ removeIndexedFields(SerialNum serialNum, const LidVector &lidsToRemove,
+ bool immediateCommit, OnWriteDoneType onWriteDone) override;
void performIndexForceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone);
void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) override;
@@ -113,10 +89,8 @@ private:
virtual void notifyGidToLidChange(const document::GlobalId &gid, uint32_t lid) override;
public:
- SearchableFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx,
- const PersistentParams &params,
- const FastAccessFeedView::Context &fastUpdateCtx,
- Context ctx);
+ SearchableFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, const PersistentParams &params,
+ const FastAccessFeedView::Context &fastUpdateCtx, Context ctx);
virtual ~SearchableFeedView();
const IIndexWriter::SP &getIndexWriter() const { return _indexWriter; }
@@ -125,4 +99,3 @@ public:
};
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 5601a0b5911..84e2910d059 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -328,7 +328,7 @@ void
StoreOnlyFeedView::updateAttributes(SerialNum, search::DocumentIdT, const DocumentUpdate &, bool, OnOperationDoneType) {}
void
-StoreOnlyFeedView::updateIndexedFields(SerialNum, search::DocumentIdT, const FutureDoc &, bool, OnOperationDoneType)
+StoreOnlyFeedView::updateIndexedFields(SerialNum, search::DocumentIdT, FutureDoc, bool, OnOperationDoneType)
{
abort(); // Should never be called.
}
@@ -351,19 +351,20 @@ StoreOnlyFeedView::handleUpdate(FeedToken *token, const UpdateOperation &updOp)
}
void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid,
- const FutureDoc & futureDoc, OnOperationDoneType onDone)
+ FutureStream futureStream, OnOperationDoneType onDone)
{
_pendingLidTracker.produce(lid);
summaryExecutor().execute(
- makeLambdaTask([serialNum, lid, futureDoc, onDone, this] {
+ makeLambdaTask([serialNum, lid, futureStream = std::move(futureStream), onDone, this] () mutable {
(void) onDone;
- const Document::UP & doc = futureDoc.get();
- if (doc) {
- _summaryAdapter->put(serialNum, *futureDoc.get(), lid);
+ vespalib::nbostream os = std::move(futureStream.get());
+ if (!os.empty()) {
+ _summaryAdapter->put(serialNum, lid, os);
}
_pendingLidTracker.consume(lid);
}));
}
+
void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid,
Document::SP doc, OnOperationDoneType onDone)
{
@@ -371,7 +372,7 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid
summaryExecutor().execute(
makeLambdaTask([serialNum, doc = std::move(doc), onDone, lid, this] {
(void) onDone;
- _summaryAdapter->put(serialNum, *doc, lid);
+ _summaryAdapter->put(serialNum, lid, *doc);
_pendingLidTracker.consume(lid);
}));
}
@@ -424,26 +425,30 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up
auto onWriteDone = createUpdateDoneContext(token, updOp.getType(), _params._metrics, updOp.getUpdate());
updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone);
- PromisedDoc promisedDoc;
- FutureDoc futureDoc = promisedDoc.get_future().share();
+
UpdateScope updateScope(getUpdateScope(upd));
if (updateScope.hasIndexOrNonAttributeFields()) {
+ PromisedDoc promisedDoc;
+ FutureDoc futureDoc = promisedDoc.get_future();
_pendingLidTracker.waitForConsumedLid(lid);
if (updateScope._indexedFields) {
- updateIndexedFields(serialNum, lid, futureDoc, immediateCommit, onWriteDone);
+ updateIndexedFields(serialNum, lid, std::move(futureDoc), immediateCommit, onWriteDone);
}
+ PromisedStream promisedStream;
+ FutureStream futureStream = promisedStream.get_future();
if (useDocumentStore(serialNum)) {
- putSummary(serialNum, lid, futureDoc, onWriteDone);
+ putSummary(serialNum, lid, std::move(futureStream), onWriteDone);
}
_writeService
.attributeFieldWriter()
.execute(serialNum,
[upd = updOp.getUpdate(), serialNum, prevDoc = _summaryAdapter->get(lid, *_repo), onWriteDone,
- promisedDoc = std::move(promisedDoc), this]() mutable
+ promisedDoc = std::move(promisedDoc), promisedStream = std::move(promisedStream),
+ this]() mutable
{
makeUpdatedDocument(serialNum, std::move(prevDoc), upd,
- onWriteDone, std::move(promisedDoc));
+ onWriteDone, std::move(promisedDoc), std::move(promisedStream));
});
}
if (!updateScope._indexedFields && onWriteDone) {
@@ -455,10 +460,12 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up
void
StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Document::UP prevDoc, DocumentUpdate::SP update,
- OnOperationDoneType onWriteDone, PromisedDoc promisedDoc)
+ OnOperationDoneType onWriteDone, PromisedDoc promisedDoc,
+ PromisedStream promisedStream)
{
const DocumentUpdate & upd = *update;
Document::UP newDoc;
+ vespalib::nbostream newStream(12345);
assert(onWriteDone->getToken() == NULL || useDocumentStore(serialNum));
if (useDocumentStore(serialNum)) {
assert(prevDoc.get() != NULL);
@@ -483,6 +490,8 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Document::UP prevDoc
LOG(spam, "Update\n%s", upd.toXml().c_str());
upd.applyTo(*newDoc);
LOG(spam, "Updated document :\n%s", newDoc->toXml(" ").c_str());
+ newDoc->serialize(newStream);
+ LOG(spam, "Serialized new document to a buffer of %zd bytes", newStream.size());
if (shouldTrace(onWriteDone, 1)) {
onWriteDone->getToken()->trace(1, "Then we update summary.");
}
@@ -494,6 +503,7 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Document::UP prevDoc
}
}
promisedDoc.set_value(std::move(newDoc));
+ promisedStream.set_value(std::move(newStream));
}
bool
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index 2cec0aba4f9..a9d56b8f98d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -57,8 +57,10 @@ public:
using OnPutDoneType = const std::shared_ptr<PutDoneContext> &;
using OnRemoveDoneType = const std::shared_ptr<RemoveDoneContext> &;
using FeedTokenUP = std::unique_ptr<FeedToken>;
- using FutureDoc = std::shared_future<Document::UP>;
+ using FutureDoc = std::future<Document::UP>;
using PromisedDoc = std::promise<Document::UP>;
+ using FutureStream = std::future<vespalib::nbostream>;
+ using PromisedStream = std::promise<vespalib::nbostream>;
struct Context
{
@@ -145,7 +147,7 @@ private:
searchcorespi::index::IThreadService & summaryExecutor() {
return _writeService.summary();
}
- void putSummary(SerialNum serialNum, search::DocumentIdT lid, const FutureDoc & doc, OnOperationDoneType onDone);
+ void putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureStream doc, OnOperationDoneType onDone);
void putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc, OnOperationDoneType onDone);
void removeSummary(SerialNum serialNum, search::DocumentIdT lid);
void heartBeatSummary(SerialNum serialNum);
@@ -179,7 +181,7 @@ private:
virtual void notifyGidToLidChange(const document::GlobalId &gid, uint32_t lid);
void makeUpdatedDocument(SerialNum serialNum, Document::UP prevDoc, DocumentUpdate::SP upd,
- OnOperationDoneType onWriteDone, PromisedDoc promisedDoc);
+ OnOperationDoneType onWriteDone, PromisedDoc promisedDoc, PromisedStream promisedStream);
protected:
virtual void internalDeleteBucket(const DeleteBucketOperation &delOp);
@@ -198,7 +200,7 @@ private:
virtual void updateAttributes(SerialNum serialNum, search::DocumentIdT lid, const DocumentUpdate &upd,
bool immediateCommit, OnOperationDoneType onWriteDone);
- virtual void updateIndexedFields(SerialNum serialNum, search::DocumentIdT lid, const FutureDoc & doc,
+ virtual void updateIndexedFields(SerialNum serialNum, search::DocumentIdT lid, FutureDoc doc,
bool immediateCommit, OnOperationDoneType onWriteDone);
virtual void removeAttributes(SerialNum serialNum, search::DocumentIdT lid,
diff --git a/searchcore/src/vespa/searchcore/proton/server/summaryadapter.cpp b/searchcore/src/vespa/searchcore/proton/server/summaryadapter.cpp
index a79f72ed7b2..3740b2f1f70 100644
--- a/searchcore/src/vespa/searchcore/proton/server/summaryadapter.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/summaryadapter.cpp
@@ -1,7 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "summaryadapter.h"
-#include <vespa/document/fieldvalue/stringfieldvalue.h>
+#include <vespa/searchcore/proton/docsummary/summarymanager.h>
+#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.summaryadapter");
@@ -12,34 +13,43 @@ namespace proton {
SummaryAdapter::SummaryAdapter(const SummaryManager::SP &mgr)
: _mgr(mgr),
- _imgr(mgr),
_lastSerial(_mgr->getBackingStore().lastSyncToken())
-{
- // empty
-}
+{}
+
+SummaryAdapter::~SummaryAdapter() {}
-bool SummaryAdapter::ignore(search::SerialNum serialNum) const
+bool SummaryAdapter::ignore(SerialNum serialNum) const
{
assert(serialNum != 0);
return serialNum <= _lastSerial;
}
+ISummaryManager & SummaryAdapter::imgr() const { return *_mgr; }
+
void
-SummaryAdapter::put(search::SerialNum serialNum,
- const document::Document &doc,
- const search::DocumentIdT lid)
+SummaryAdapter::put(SerialNum serialNum, const DocumentIdT lid, const Document &doc)
{
if ( ! ignore(serialNum) ) {
LOG(spam, "SummaryAdapter::put(docId = '%s', lid = %u, document = '%s')",
doc.getId().toString().c_str(), lid, doc.toString(true).c_str());
- _mgr->putDocument(serialNum, doc, lid);
+ _mgr->putDocument(serialNum, lid, doc);
_lastSerial = serialNum;
}
}
void
-SummaryAdapter::remove(search::SerialNum serialNum,
- const search::DocumentIdT lid)
+SummaryAdapter::put(SerialNum serialNum, const DocumentIdT lid, const vespalib::nbostream &os)
+{
+ if ( ! ignore(serialNum) ) {
+ LOG(spam, "SummaryAdapter::put(serialnum = '%zd', lid = %u, stream size = '%zd')",
+ serialNum, lid, os.size());
+ _mgr->putDocument(serialNum, lid, os);
+ _lastSerial = serialNum;
+ }
+}
+
+void
+SummaryAdapter::remove(SerialNum serialNum, const DocumentIdT lid)
{
if ( ! ignore(serialNum + 1) ) {
_mgr->removeDocument(serialNum, lid);
@@ -48,12 +58,26 @@ SummaryAdapter::remove(search::SerialNum serialNum,
}
void
-SummaryAdapter::heartBeat(search::SerialNum serialNum)
+SummaryAdapter::heartBeat(SerialNum serialNum)
{
if (serialNum > _lastSerial) {
remove(serialNum, 0u); // XXX: Misused lid 0
}
}
+const search::IDocumentStore &
+SummaryAdapter::getDocumentStore() const {
+ return imgr().getBackingStore();
+}
+
+std::unique_ptr<Document>
+SummaryAdapter::get(const DocumentIdT lid, const DocumentTypeRepo &repo) {
+ return imgr().getBackingStore().read(lid, repo);
+}
+
+void
+SummaryAdapter::compactLidSpace(uint32_t wantedDocIdLimit) {
+ _mgr->getBackingStore().compactLidSpace(wantedDocIdLimit);
+}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/summaryadapter.h b/searchcore/src/vespa/searchcore/proton/server/summaryadapter.h
index 4bd422bb643..dfcbc5c05a5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/summaryadapter.h
+++ b/searchcore/src/vespa/searchcore/proton/server/summaryadapter.h
@@ -1,46 +1,32 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/searchcore/proton/docsummary/summarymanager.h>
#include "isummaryadapter.h"
namespace proton {
+class SummaryManager;
+class ISummaryManager;
+
class SummaryAdapter : public ISummaryAdapter {
private:
- SummaryManager::SP _mgr;
- ISummaryManager::SP _imgr;
- search::SerialNum _lastSerial;
+ std::shared_ptr<SummaryManager> _mgr;
+ SerialNum _lastSerial;
bool ignore(search::SerialNum serialNum) const;
+ ISummaryManager & imgr() const;
public:
- SummaryAdapter(const SummaryManager::SP &mgr);
-
- /**
- * Implements ISummaryAdapter.
- */
- virtual void put(search::SerialNum serialNum,
- const document::Document &doc,
- const search::DocumentIdT lid) override;
- virtual void remove(search::SerialNum serialNum,
- const search::DocumentIdT lid) override;
-
- virtual void heartBeat(search::SerialNum serialNum) override;
-
- virtual const search::IDocumentStore &getDocumentStore() const override {
- return _imgr->getBackingStore();
- }
-
- virtual std::unique_ptr<document::Document> get(const search::DocumentIdT lid,
- const document::DocumentTypeRepo &repo) override {
- return _imgr->getBackingStore().read(lid, repo);
- }
-
- virtual void compactLidSpace(uint32_t wantedDocIdLimit) override {
- _mgr->getBackingStore().compactLidSpace(wantedDocIdLimit);
- }
+ SummaryAdapter(const std::shared_ptr<SummaryManager> &mgr);
+ ~SummaryAdapter();
+
+ void put(SerialNum serialNum, const DocumentIdT lid, const Document &doc) override;
+ void put(SerialNum serialNum, const DocumentIdT lid, const vespalib::nbostream &doc) override;
+ void remove(SerialNum serialNum, const DocumentIdT lid) override;
+ void heartBeat(SerialNum serialNum) override;
+ const search::IDocumentStore &getDocumentStore() const override;
+ std::unique_ptr<document::Document> get(const DocumentIdT lid, const DocumentTypeRepo &repo) override;
+ void compactLidSpace(uint32_t wantedDocIdLimit) override;
};
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_document_store.h b/searchcore/src/vespa/searchcore/proton/test/dummy_document_store.h
index 1809f673951..258be5e3936 100644
--- a/searchcore/src/vespa/searchcore/proton/test/dummy_document_store.h
+++ b/searchcore/src/vespa/searchcore/proton/test/dummy_document_store.h
@@ -21,7 +21,8 @@ struct DummyDocumentStore : public search::IDocumentStore
const document::DocumentTypeRepo &) const override {
return document::Document::UP();
}
- virtual void write(uint64_t, const document::Document &, search::DocumentIdT) override {}
+ virtual void write(uint64_t, search::DocumentIdT, const document::Document &) override {}
+ virtual void write(uint64_t, search::DocumentIdT, const vespalib::nbostream &) override {}
virtual void remove(uint64_t, search::DocumentIdT) override {}
virtual void flush(uint64_t) override {}
virtual uint64_t initFlush(uint64_t) override { return 0; }
diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_summary_adapter.h b/searchcore/src/vespa/searchcore/proton/test/mock_summary_adapter.h
index 4e6568206bf..7eec47b4929 100644
--- a/searchcore/src/vespa/searchcore/proton/test/mock_summary_adapter.h
+++ b/searchcore/src/vespa/searchcore/proton/test/mock_summary_adapter.h
@@ -3,31 +3,27 @@
#include <vespa/searchcore/proton/server/isummaryadapter.h>
-namespace proton {
-
-namespace test {
+namespace proton::test {
/**
* Mock of the ISummaryAdapter interface used for unit testing.
*/
struct MockSummaryAdapter : public ISummaryAdapter
{
- virtual void put(search::SerialNum, const document::Document &, const search::DocumentIdT) override {}
- virtual void remove(search::SerialNum, const search::DocumentIdT) override {}
- virtual void heartBeat(search::SerialNum) override {}
- virtual const search::IDocumentStore &getDocumentStore() const override {
+ void put(SerialNum, DocumentIdT, const Document &) override {}
+ void put(SerialNum, DocumentIdT, const vespalib::nbostream &) override {}
+ void remove(SerialNum, DocumentIdT) override {}
+ void heartBeat(SerialNum) override {}
+ const search::IDocumentStore &getDocumentStore() const override {
const search::IDocumentStore *store = NULL;
return *store;
}
- virtual std::unique_ptr<document::Document> get(const search::DocumentIdT,
- const document::DocumentTypeRepo &) override {
+ std::unique_ptr<document::Document> get(DocumentIdT, const DocumentTypeRepo &) override {
return std::unique_ptr<document::Document>();
}
- virtual void compactLidSpace(uint32_t wantedDocIdLimit) override {
+ void compactLidSpace(uint32_t wantedDocIdLimit) override {
(void) wantedDocIdLimit;
}
};
-} // namespace test
-
-} // namespace proton
+}