summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp16
-rw-r--r--searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp27
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp66
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h20
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp159
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h64
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h20
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h11
-rw-r--r--searchlib/src/vespa/searchlib/common/serialnum.h2
22 files changed, 278 insertions, 164 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
index 310458a12f9..ed0e5091aec 100644
--- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
@@ -913,24 +913,26 @@ TEST_F("require that remove() notifies gid to lid change handler", SearchableFee
bool
assertThreadObserver(uint32_t masterExecuteCnt,
uint32_t indexExecuteCnt,
+ uint32_t summaryExecuteCnt,
const test::ThreadingServiceObserver &observer)
{
if (!EXPECT_EQUAL(masterExecuteCnt, observer.masterObserver().getExecuteCnt())) return false;
if (!EXPECT_EQUAL(indexExecuteCnt, observer.indexObserver().getExecuteCnt())) return false;
+ if (!EXPECT_EQUAL(summaryExecuteCnt, observer.summaryObserver().getExecuteCnt())) return false;
return true;
}
TEST_F("require that remove() calls removeComplete() via delayed thread service",
SearchableFeedViewFixture)
{
- EXPECT_TRUE(assertThreadObserver(1, 0, f.writeServiceObserver()));
+ EXPECT_TRUE(assertThreadObserver(1, 0, 0, f.writeServiceObserver()));
f.putAndWait(f.doc1(10));
// put index fields handled in index thread
- EXPECT_TRUE(assertThreadObserver(2, 1, f.writeServiceObserver()));
+ EXPECT_TRUE(assertThreadObserver(2, 1, 1, f.writeServiceObserver()));
f.removeAndWait(f.doc1(20));
// remove index fields handled in index thread
// delayed remove complete handled in same index thread, then master thread
- EXPECT_TRUE(assertThreadObserver(4, 2, f.writeServiceObserver()));
+ EXPECT_TRUE(assertThreadObserver(4, 2, 2, f.writeServiceObserver()));
EXPECT_EQUAL(1u, f.metaStoreObserver()._removeCompleteCnt);
EXPECT_EQUAL(1u, f.metaStoreObserver()._removeCompleteLid);
}
@@ -1148,11 +1150,11 @@ TEST_F("require that compactLidSpace() propagates to document meta store and doc
SearchableFeedViewFixture)
{
f.populateBeforeCompactLidSpace();
- EXPECT_TRUE(assertThreadObserver(5, 3, f.writeServiceObserver()));
+ EXPECT_TRUE(assertThreadObserver(5, 3, 3, f.writeServiceObserver()));
f.compactLidSpaceAndWait(2);
// performIndexForceCommit in index thread, then completion callback
// in master thread.
- EXPECT_TRUE(assertThreadObserver(7, 4, f.writeServiceObserver()));
+ EXPECT_TRUE(assertThreadObserver(7, 4, 4, f.writeServiceObserver()));
EXPECT_EQUAL(2u, f.metaStoreObserver()._compactLidSpaceLidLimit);
EXPECT_EQUAL(2u, f.getDocumentStore()._compactLidSpaceLidLimit);
EXPECT_EQUAL(1u, f.metaStoreObserver()._holdUnblockShrinkLidSpaceCnt);
@@ -1165,12 +1167,12 @@ TEST_F("require that compactLidSpace() doesn't propagate to "
SearchableFeedViewFixture)
{
f.populateBeforeCompactLidSpace();
- EXPECT_TRUE(assertThreadObserver(5, 3, f.writeServiceObserver()));
+ EXPECT_TRUE(assertThreadObserver(5, 3, 3, f.writeServiceObserver()));
CompactLidSpaceOperation op(0, 2);
op.setSerialNum(0);
f.runInMaster([&] () { f.fv.handleCompactLidSpace(op); });
// Delayed holdUnblockShrinkLidSpace() in index thread, then master thread
- EXPECT_TRUE(assertThreadObserver(6, 3, f.writeServiceObserver()));
+ EXPECT_TRUE(assertThreadObserver(6, 3, 3, f.writeServiceObserver()));
EXPECT_EQUAL(0u, f.metaStoreObserver()._compactLidSpaceLidLimit);
EXPECT_EQUAL(0u, f.getDocumentStore()._compactLidSpaceLidLimit);
EXPECT_EQUAL(0u, f.metaStoreObserver()._holdUnblockShrinkLidSpaceCnt);
diff --git a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp
index 455a47644b5..5109d994f61 100644
--- a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp
+++ b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp
@@ -19,12 +19,17 @@ namespace
bool
assertThreadObserver(uint32_t masterExecuteCnt,
uint32_t indexExecuteCnt,
+ uint32_t summaryExecuteCnt,
const test::ThreadingServiceObserver &observer)
{
if (!EXPECT_EQUAL(masterExecuteCnt,
observer.masterObserver().getExecuteCnt())) {
return false;
}
+ if (!EXPECT_EQUAL(summaryExecuteCnt,
+ observer.summaryObserver().getExecuteCnt())) {
+ return false;
+ }
if (!EXPECT_EQUAL(indexExecuteCnt,
observer.indexObserver().getExecuteCnt())) {
return false;
@@ -238,7 +243,7 @@ TEST_F("require that nothing happens before free list is active", Fixture)
EXPECT_FALSE(f.delayReuse(4));
EXPECT_FALSE(f.delayReuse({ 5, 6}));
EXPECT_TRUE(f._store.assertWork(0, 0, 0));
- EXPECT_TRUE(assertThreadObserver(3, 0, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(3, 0, 0, f._writeService));
}
@@ -249,7 +254,7 @@ TEST_F("require that single lid is delayed", Fixture)
EXPECT_TRUE(f.delayReuse(4));
f.scheduleDelayReuseLid(4);
EXPECT_TRUE(f._store.assertWork(1, 0, 1));
- EXPECT_TRUE(assertThreadObserver(4, 1, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(4, 1, 1, f._writeService));
}
@@ -260,7 +265,7 @@ TEST_F("require that lid vector is delayed", Fixture)
EXPECT_TRUE(f.delayReuse({ 5, 6, 7}));
f.scheduleDelayReuseLids({ 5, 6, 7});
EXPECT_TRUE(f._store.assertWork(0, 1, 3));
- EXPECT_TRUE(assertThreadObserver(4, 1, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(4, 1, 1, f._writeService));
}
@@ -272,14 +277,14 @@ TEST_F("require that reuse can be batched", Fixture)
EXPECT_FALSE(f.delayReuse(4));
EXPECT_FALSE(f.delayReuse({ 5, 6, 7}));
EXPECT_TRUE(f._store.assertWork(0, 0, 0));
- EXPECT_TRUE(assertThreadObserver(4, 0, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(4, 0, 0, f._writeService));
f.commit();
EXPECT_TRUE(f._store.assertWork(0, 1, 4));
- EXPECT_TRUE(assertThreadObserver(6, 1, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(6, 1, 1, f._writeService));
EXPECT_FALSE(f.delayReuse(8));
EXPECT_FALSE(f.delayReuse({ 9, 10}));
EXPECT_TRUE(f._store.assertWork(0, 1, 4));
- EXPECT_TRUE(assertThreadObserver(8, 1, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(8, 1, 1, f._writeService));
}
@@ -290,15 +295,15 @@ TEST_F("require that single element array is optimized", Fixture)
f.setImmediateCommit(false);
EXPECT_FALSE(f.delayReuse({ 4}));
EXPECT_TRUE(f._store.assertWork(0, 0, 0));
- EXPECT_TRUE(assertThreadObserver(3, 0, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(3, 0, 0, f._writeService));
f.commit();
f.setImmediateCommit(true);
EXPECT_TRUE(f._store.assertWork(1, 0, 1));
- EXPECT_TRUE(assertThreadObserver(6, 1, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(6, 1, 1, f._writeService));
EXPECT_TRUE(f.delayReuse({ 8}));
f.scheduleDelayReuseLids({ 8});
EXPECT_TRUE(f._store.assertWork(2, 0, 2));
- EXPECT_TRUE(assertThreadObserver(9, 2, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(9, 2, 2, f._writeService));
}
@@ -308,10 +313,10 @@ TEST_F("require that lids are reused faster with no indexed fields", Fixture)
f.setHasIndexedOrAttributeFields(false);
EXPECT_FALSE(f.delayReuse(4));
EXPECT_TRUE(f._store.assertWork(1, 0, 1));
- EXPECT_TRUE(assertThreadObserver(2, 0, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(2, 0, 0, f._writeService));
EXPECT_FALSE(f.delayReuse({ 5, 6, 7}));
EXPECT_TRUE(f._store.assertWork(1, 1, 4));
- EXPECT_TRUE(assertThreadObserver(3, 0, f._writeService));
+ EXPECT_TRUE(assertThreadObserver(3, 0, 0, f._writeService));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp
index 2d448ca8c7e..de8eaacf184 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp
@@ -1,6 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "summaryflushtarget.h"
+#include <future>
+#include <vespa/searchcorespi/index/i_thread_service.h>
+#include <vespa/searchlib/common/lambdatask.h>
using search::IDocumentStore;
using search::SerialNum;
@@ -44,9 +47,11 @@ public:
}
-SummaryFlushTarget::SummaryFlushTarget(IDocumentStore & docStore)
+SummaryFlushTarget::SummaryFlushTarget(IDocumentStore & docStore,
+ searchcorespi::index::IThreadService & summaryService)
: IFlushTarget("summary.flush", Type::SYNC, Component::DOCUMENT_STORE),
_docStore(docStore),
+ _summaryService(summaryService),
_lastStats()
{
_lastStats.setPathElementsToLog(6);
@@ -77,9 +82,20 @@ SummaryFlushTarget::getFlushedSerialNum() const
}
IFlushTarget::Task::UP
+SummaryFlushTarget::internalInitFlush(SerialNum currentSerial) {
+ return Task::UP(new Flusher(_docStore, _lastStats, currentSerial));
+}
+IFlushTarget::Task::UP
SummaryFlushTarget::initFlush(SerialNum currentSerial)
{
- return Task::UP(new Flusher(_docStore, _lastStats, currentSerial));
+ // Called by document db executor
+ std::promise<Task::UP> promise;
+ std::future<Task::UP> future = promise.get_future();
+ _summaryService.execute(search::makeLambdaTask(
+ [&]() { promise.set_value(
+ internalInitFlush(currentSerial));
+ }));
+ return future.get();
}
uint64_t
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h
index ebf483e4a53..06b0940a8f0 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h
@@ -4,6 +4,7 @@
#include <vespa/searchlib/docstore/idocumentstore.h>
#include <vespa/searchcorespi/flush/iflushtarget.h>
+namespace searchcorespi::index { class IThreadService; }
namespace proton {
/**
@@ -13,10 +14,14 @@ class SummaryFlushTarget : public searchcorespi::IFlushTarget {
private:
using FlushStats = searchcorespi::FlushStats;
search::IDocumentStore & _docStore;
+ searchcorespi::index::IThreadService & _summaryService;
FlushStats _lastStats;
+ Task::UP internalInitFlush(SerialNum currentSerial);
+
public:
- SummaryFlushTarget(search::IDocumentStore & docStore);
+ SummaryFlushTarget(search::IDocumentStore & docStore,
+ searchcorespi::index::IThreadService & summaryService);
// Implements IFlushTarget
virtual MemoryGain getApproxMemoryGain() const override;
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
index 014e861f4fa..ecc43c029be 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
@@ -193,10 +193,10 @@ createShrinkLidSpaceFlushTarget(IDocumentStore::SP docStore)
}
-IFlushTarget::List SummaryManager::getFlushTargets()
+IFlushTarget::List SummaryManager::getFlushTargets(searchcorespi::index::IThreadService & summaryService)
{
IFlushTarget::List ret;
- ret.push_back(std::make_shared<SummaryFlushTarget>(getBackingStore()));
+ ret.push_back(std::make_shared<SummaryFlushTarget>(getBackingStore(), summaryService));
if (dynamic_cast<LogDocumentStore *>(_docStore.get()) != NULL) {
ret.push_back(std::make_shared<SummaryCompactTarget>(getBackingStore()));
}
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
index bb5f4354845..07804f86764 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
@@ -15,6 +15,7 @@
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fastlib/text/normwordfolder.h>
+namespace searchcorespi::index { class IThreadService; }
namespace search {
class IBucketizer;
@@ -87,7 +88,7 @@ public:
void putDocument(uint64_t syncToken, const document::Document & doc,
search::DocumentIdT lid);
void removeDocument(uint64_t syncToken, search::DocumentIdT lid);
- searchcorespi::IFlushTarget::List getFlushTargets();
+ searchcorespi::IFlushTarget::List getFlushTargets(searchcorespi::index::IThreadService & summaryService);
/**
* Implements ISummaryManager.
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp
index dd4239f55ab..d324c4a7373 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp
@@ -151,6 +151,7 @@ LegacyDocumentDBMetrics::LegacyDocumentDBMetrics(const std::string &docTypeName,
matching(this),
executor("executor", this),
indexExecutor("indexexecutor", this),
+ summaryExecutor("summaryexecutor", this),
feed(this),
sessionManager(this),
ready("ready", this),
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h
index 164dfccfc86..78a13002d7a 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h
@@ -116,6 +116,7 @@ struct LegacyDocumentDBMetrics : metrics::MetricSet
MatchingMetrics matching;
ExecutorMetrics executor;
ExecutorMetrics indexExecutor;
+ ExecutorMetrics summaryExecutor;
PerDocTypeFeedMetrics feed;
search::grouping::SessionManagerMetrics sessionManager;
SubDBMetrics ready;
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index c70907d60ac..6dd9a3e3ee2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -505,6 +505,7 @@ DocumentDB::performDropFeedView(IFeedView::SP feedView)
// Also called by DocumentDB::receive() method to keep feed view alive
_writeService.attributeFieldWriter().sync();
+ _writeService.summary().sync();
// Feed view is kept alive in the closure's shared ptr.
_writeService.index().execute(makeTask(makeClosure(this,
@@ -1278,6 +1279,7 @@ DocumentDB::updateLegacyMetrics(LegacyDocumentDBMetrics &metrics)
{
updateMatchingMetrics(metrics.matching, *_subDBs.getReadySubDB());
metrics.executor.update(_writeService.getMasterExecutor().getStats());
+ metrics.summaryExecutor.update(_writeService.getSummaryExecutor().getStats());
metrics.indexExecutor.update(_writeService.getIndexExecutor().getStats());
metrics.sessionManager.update(_sessionManager->getGroupingStats());
updateDocstoreMetrics(metrics.docstore, _subDBs, _lastDocStoreCacheStats);
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 98b42b197ac..8091196cc4b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -14,8 +14,10 @@ ExecutorThreadingService::ExecutorThreadingService(uint32_t threads,
: _masterExecutor(1, stackSize),
_indexExecutor(1, stackSize, taskLimit),
+ _summaryExecutor(1, stackSize),
_masterService(_masterExecutor),
_indexService(_indexExecutor),
+ _summaryService(_summaryExecutor),
_indexFieldInverter(threads, taskLimit),
_indexFieldWriter(threads, taskLimit),
_attributeFieldWriter(threads, taskLimit)
@@ -34,6 +36,7 @@ ExecutorThreadingService::sync()
}
_attributeFieldWriter.sync();
_indexExecutor.sync();
+ _summaryExecutor.sync();
_indexFieldInverter.sync();
_indexFieldWriter.sync();
if (!isMasterThread) {
@@ -48,6 +51,8 @@ ExecutorThreadingService::shutdown()
_masterExecutor.shutdown();
_masterExecutor.sync();
_attributeFieldWriter.sync();
+ _summaryExecutor.shutdown();
+ _summaryExecutor.sync();
_indexExecutor.shutdown();
_indexExecutor.sync();
_indexFieldInverter.sync();
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index d82119fd3d1..2328702e147 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -18,8 +18,10 @@ class ExecutorThreadingService : public searchcorespi::index::IThreadingService
private:
vespalib::ThreadStackExecutor _masterExecutor;
vespalib::BlockingThreadStackExecutor _indexExecutor;
+ vespalib::ThreadStackExecutor _summaryExecutor;
ExecutorThreadService _masterService;
ExecutorThreadService _indexService;
+ ExecutorThreadService _summaryService;
search::SequencedTaskExecutor _indexFieldInverter;
search::SequencedTaskExecutor _indexFieldWriter;
search::SequencedTaskExecutor _attributeFieldWriter;
@@ -54,6 +56,9 @@ public:
vespalib::ThreadStackExecutorBase &getIndexExecutor() {
return _indexExecutor;
}
+ vespalib::ThreadStackExecutorBase &getSummaryExecutor() {
+ return _summaryExecutor;
+ }
/**
* Implements IThreadingService
@@ -65,6 +70,10 @@ public:
return _indexService;
}
+ virtual searchcorespi::index::IThreadService &summary() override {
+ return _summaryService;
+ }
+
virtual search::ISequencedTaskExecutor &indexFieldInverter() override {
return _indexFieldInverter;
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp
index 48863cf9369..8f2e6081b95 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp
@@ -331,6 +331,7 @@ FastAccessDocSubDB::onReprocessDone(SerialNum serialNum)
attrWriter->commit(serialNum,
std::shared_ptr<search::IDestructorCallback>());
_writeService.attributeFieldWriter().sync();
+ _writeService.summary().sync();
Parent::onReprocessDone(serialNum);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp
index 07f5d2ac3ce..485a03276a5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp
@@ -116,6 +116,7 @@ void
FastAccessFeedView::sync()
{
_writeService.attributeFieldWriter().sync();
+ _writeService.summary().sync();
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
index 874e3594dbe..6b82e4ef7a3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
@@ -20,8 +20,6 @@ using document::Document;
using document::DocumentId;
using document::DocumentTypeRepo;
using document::DocumentUpdate;
-using proton::matching::MatchContext;
-using proton::matching::Matcher;
using search::index::Schema;
using storage::spi::BucketInfoResult;
using storage::spi::Timestamp;
@@ -100,38 +98,56 @@ SearchableFeedView::putIndexedFields(SerialNum serialNum,
return;
}
_writeService.index().execute(
- makeLambdaTask([=]
- { performIndexPut(serialNum, lid, newDoc,
- immediateCommit, onWriteDone); }));
+ makeLambdaTask([=] {
+ performIndexPut(serialNum, lid, newDoc, immediateCommit, onWriteDone);
+ }));
}
-
void
SearchableFeedView::performIndexPut(SerialNum serialNum,
search::DocumentIdT lid,
- const Document::SP &doc,
+ const Document &doc,
bool immediateCommit,
OnOperationDoneType onWriteDone)
{
assert(_writeService.index().isCurrentThread());
- VLOG(getDebugLevel(lid, doc->getId()),
- "database(%s): performIndexPut: serialNum(%" PRIu64 "), "
- "docId(%s), lid(%d)",
- _params._docTypeName.toString().c_str(),
- serialNum,
- doc->getId().toString().c_str(), lid);
-
- _indexWriter->put(serialNum, *doc, lid);
+ VLOG(getDebugLevel(lid, doc.getId()),
+ "database(%s): performIndexPut: serialNum(%" PRIu64 "), "
+ "docId(%s), lid(%d)",
+ _params._docTypeName.toString().c_str(),
+ serialNum,
+ doc.getId().toString().c_str(), lid);
+
+ _indexWriter->put(serialNum, doc, lid);
if (immediateCommit) {
_indexWriter->commit(serialNum, onWriteDone);
}
if (shouldTrace(onWriteDone, 1)) {
FeedToken *token = onWriteDone->getToken();
token->trace(1, "Document indexed = . New Value : " +
- doc->toString(token->shouldTrace(2)));
+ doc.toString(token->shouldTrace(2)));
}
}
+void
+SearchableFeedView::performIndexPut(SerialNum serialNum,
+ search::DocumentIdT lid,
+ const Document::SP &doc,
+ bool immediateCommit,
+ OnOperationDoneType onWriteDone)
+{
+ performIndexPut(serialNum, lid, *doc, immediateCommit, onWriteDone);
+}
+
+void
+SearchableFeedView::performIndexPut(SerialNum serialNum,
+ search::DocumentIdT lid,
+ FutureDoc doc,
+ bool immediateCommit,
+ OnOperationDoneType onWriteDone)
+{
+ performIndexPut(serialNum, lid, *doc.get(), immediateCommit, onWriteDone);
+}
void
SearchableFeedView::heartBeatIndexedFields(SerialNum serialNum)
@@ -184,11 +200,23 @@ SearchableFeedView::updateIndexedFields(SerialNum serialNum,
onWriteDone->getToken()->trace(1, "Then we can update the index.");
}
_writeService.index().execute(
- makeLambdaTask([=]()
- { performIndexPut(serialNum, lid, newDoc,
- immediateCommit, onWriteDone); }));
+ makeLambdaTask([=]() {
+ performIndexPut(serialNum, lid, newDoc, immediateCommit, onWriteDone);
+ }));
}
+void
+SearchableFeedView::updateIndexedFields(SerialNum serialNum, search::DocumentIdT lid, FutureDoc futureDoc,
+ bool immediateCommit, OnOperationDoneType onWriteDone)
+{
+ if (shouldTrace(onWriteDone, 1)) {
+ onWriteDone->getToken()->trace(1, "Then we can update the index.");
+ }
+ _writeService.index().execute(
+ makeLambdaTask([=]() {
+ performIndexPut(serialNum, lid, futureDoc, immediateCommit, onWriteDone);
+ }));
+}
void
SearchableFeedView::removeIndexedFields(SerialNum serialNum,
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h
index 290b335efaa..87a2c7e746d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h
@@ -3,7 +3,6 @@
#pragma once
#include "fast_access_feed_view.h"
-#include "matchview.h"
#include <vespa/searchcore/proton/attribute/i_attribute_writer.h>
#include <vespa/searchcore/proton/index/i_index_writer.h>
@@ -45,9 +44,22 @@ private:
void
performIndexPut(SerialNum serialNum,
search::DocumentIdT lid,
+ const document::Document &doc,
+ bool immediateCommit,
+ OnOperationDoneType onWriteDone);
+
+ void
+ performIndexPut(SerialNum serialNum,
+ search::DocumentIdT lid,
const document::Document::SP &doc,
bool immediateCommit,
OnOperationDoneType onWriteDone);
+ void
+ performIndexPut(SerialNum serialNum,
+ search::DocumentIdT lid,
+ FutureDoc doc,
+ bool immediateCommit,
+ OnOperationDoneType onWriteDone);
void
performIndexRemove(SerialNum serialNum,
@@ -83,6 +95,12 @@ private:
const document::Document::SP &newDoc,
bool immediateCommit,
OnOperationDoneType onWriteDone) override;
+ virtual void
+ updateIndexedFields(SerialNum serialNum,
+ search::DocumentIdT lid,
+ FutureDoc newDoc,
+ bool immediateCommit,
+ OnOperationDoneType onWriteDone) override;
virtual void
removeIndexedFields(SerialNum serialNum,
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
index d9383fdbbb2..b5045771147 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
@@ -362,7 +362,7 @@ StoreOnlyDocSubDB::getFlushTargets()
IFlushTarget::List
StoreOnlyDocSubDB::getFlushTargetsInternal()
{
- IFlushTarget::List ret(_rSummaryMgr->getFlushTargets());
+ IFlushTarget::List ret(_rSummaryMgr->getFlushTargets(_writeService.summary()));
ret.push_back(_dmsFlushTarget);
ret.push_back(_dmsShrinkTarget);
return ret;
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 4c4a2f698b0..5222c0d1dbb 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -102,15 +102,15 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams
: IFeedView(),
FeedDebugger(),
_summaryAdapter(ctx._summaryAdapter),
- _schema(ctx._schema),
_documentMetaStoreContext(ctx._documentMetaStoreContext),
_repo(ctx._repo),
- _writeService(ctx._writeService),
- _params(params),
- _metaStore(_documentMetaStoreContext->get()),
_docType(NULL),
_lidReuseDelayer(ctx._lidReuseDelayer),
- _commitTimeTracker(ctx._commitTimeTracker)
+ _commitTimeTracker(ctx._commitTimeTracker),
+ _schema(ctx._schema),
+ _writeService(ctx._writeService),
+ _params(params),
+ _metaStore(_documentMetaStoreContext->get())
{
_docType = _repo->getDocumentType(_params._docTypeName.getName());
}
@@ -227,8 +227,9 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
considerEarlyAck(token, putOp.getType());
bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId);
+
if (putOp.getValidDbdId(_params._subDbId)) {
- _summaryAdapter->put(serialNum, *doc, putOp.getLid());
+ putSummary(serialNum, putOp.getLid(), doc);
bool immediateCommit = _commitTimeTracker.needCommit();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(token, putOp.getType(), _params._metrics,
@@ -245,7 +246,6 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
}
}
-
void
StoreOnlyFeedView::heartBeatIndexedFields(SerialNum serialNum)
{
@@ -284,14 +284,14 @@ StoreOnlyFeedView::updateAttributes(SerialNum serialNum, search::DocumentIdT lid
void
-StoreOnlyFeedView::updateIndexedFields(SerialNum serialNum, search::DocumentIdT lid, const Document::SP &newDoc,
- bool immediateCommit, OnOperationDoneType onWriteDone)
+StoreOnlyFeedView::updateIndexedFields(SerialNum, search::DocumentIdT, const Document::SP &, bool, OnOperationDoneType)
+{
+ abort(); // Should never be called.
+}
+
+void
+StoreOnlyFeedView::updateIndexedFields(SerialNum, search::DocumentIdT, FutureDoc, bool, OnOperationDoneType)
{
- (void) serialNum;
- (void) lid;
- (void) newDoc;
- (void) immediateCommit;
- (void) onWriteDone;
abort(); // Should never be called.
}
@@ -313,6 +313,33 @@ StoreOnlyFeedView::handleUpdate(FeedToken *token, const UpdateOperation &updOp)
internalUpdate(dupFeedToken(token), updOp);
}
+void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc futureDoc) {
+ summaryExecutor().execute(
+ makeLambdaTask([serialNum, futureDoc, lid, this] {
+ const Document::UP & doc = futureDoc.get();
+ if (doc) {
+ _summaryAdapter->put(serialNum, *futureDoc.get(), lid);
+ }
+ }));
+}
+void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc) {
+ summaryExecutor().execute(
+ makeLambdaTask([serialNum, doc = std::move(doc), lid, this] {
+ _summaryAdapter->put(serialNum, *doc, lid);
+ }));
+}
+void StoreOnlyFeedView::removeSummary(SerialNum serialNum, search::DocumentIdT lid) {
+ summaryExecutor().execute(
+ makeLambdaTask([serialNum, lid, this] {
+ _summaryAdapter->remove(serialNum, lid);
+ }));
+}
+void StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) {
+ summaryExecutor().execute(
+ makeLambdaTask([serialNum, this] {
+ _summaryAdapter->heartBeat(serialNum);
+ }));
+}
void
StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &updOp) {
@@ -347,22 +374,26 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up
bool immediateCommit = _commitTimeTracker.needCommit();
auto onWriteDone = createUpdateDoneContext(token, updOp.getType(), _params._metrics, updOp.getUpdate());
updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone);
+
+ PromisedDoc promisedDoc;
+ FutureDoc futureDoc = promisedDoc.get_future().share();
UpdateScope updateScope(getUpdateScope(upd));
- /*
- * XXX: Flushing issue
- *
- * If subclass has indexed fields, but no partial updates updates
- * affect indexed fields then serial number is not updated, for very
- * specific feed patterns, this can cause flush engine issues.
- *
- * Similarly, if subclass has attributes that are not updated.
- *
- */
- if (updateScope._indexedFields || updateScope._nonAttributeFields) {
- updateIndexAndDocumentStore(updateScope._indexedFields,
- serialNum, lid, upd,
- immediateCommit,
- onWriteDone);
+ if (updateScope.hasIndexOrNonAttributeFields()) {
+ if (updateScope._indexedFields) {
+ updateIndexedFields(serialNum, lid, futureDoc, immediateCommit, onWriteDone);
+ }
+
+ if (useDocumentStore(serialNum)) {
+ putSummary(serialNum, lid, futureDoc);
+ }
+
+ _writeService
+ .attributeFieldWriter()
+ .execute(serialNum,
+ [upd = updOp.getUpdate(), serialNum, lid, onWriteDone,
+ promisedDoc = std::move(promisedDoc), this]() mutable {
+ updateIndexAndDocumentStore(serialNum, lid, upd, onWriteDone, std::move(promisedDoc));
+ });
}
if (!updateScope._indexedFields && onWriteDone) {
if (onWriteDone->shouldTrace(1)) {
@@ -371,16 +402,13 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up
}
}
-
void
-StoreOnlyFeedView::updateIndexAndDocumentStore(bool indexedFieldsInScope,
- SerialNum serialNum,
- search::DocumentIdT lid,
- const DocumentUpdate &upd,
- bool immediateCommit,
- OnOperationDoneType onWriteDone)
+StoreOnlyFeedView::updateIndexAndDocumentStore(SerialNum serialNum, search::DocumentIdT lid, DocumentUpdate::SP update,
+ OnOperationDoneType onWriteDone, PromisedDoc promisedDoc)
{
+ const DocumentUpdate & upd = *update;
Document::UP prevDoc(_summaryAdapter->get(lid, *_repo));
+ Document::UP newDoc;
assert(onWriteDone->getToken() == NULL || useDocumentStore(serialNum));
if (useDocumentStore(serialNum)) {
assert(prevDoc.get() != NULL);
@@ -391,34 +419,31 @@ StoreOnlyFeedView::updateIndexAndDocumentStore(bool indexedFieldsInScope,
// If we've passed serial number for flushed index then we could
// also check that this operation is marked for ignore by index
// proxy.
- return;
- }
- if (upd.getId() == prevDoc->getId()) {
- if (shouldTrace(onWriteDone, 1)) {
- FeedToken *token = onWriteDone->getToken();
- token->trace(1, "The update looks like : " + upd.toString(token->shouldTrace(2)));
- }
- vespalib::nbostream os;
- prevDoc->serialize(os);
- Document::SP newDoc(new Document(*_repo, os));
- if (useDocumentStore(serialNum)) {
- LOG(spam, "Original document :\n%s", newDoc->toXml(" ").c_str());
- LOG(spam, "Update\n%s", upd.toXml().c_str());
- upd.applyTo(*newDoc);
- LOG(spam, "Updated document :\n%s", newDoc->toXml(" ").c_str());
+ } else {
+ if (upd.getId() == prevDoc->getId()) {
if (shouldTrace(onWriteDone, 1)) {
- onWriteDone->getToken()->trace(1, "Then we update summary.");
+ FeedToken *token = onWriteDone->getToken();
+ token->trace(1, "The update looks like : " + upd.toString(token->shouldTrace(2)));
}
- _summaryAdapter->put(serialNum, *newDoc, lid);
- }
- if (indexedFieldsInScope) {
- updateIndexedFields(serialNum, lid, newDoc, immediateCommit, onWriteDone);
+ vespalib::nbostream os;
+ prevDoc->serialize(os);
+ newDoc = std::make_unique<Document>(*_repo, os);
+ if (useDocumentStore(serialNum)) {
+ LOG(spam, "Original document :\n%s", newDoc->toXml(" ").c_str());
+ LOG(spam, "Update\n%s", upd.toXml().c_str());
+ upd.applyTo(*newDoc);
+ LOG(spam, "Updated document :\n%s", newDoc->toXml(" ").c_str());
+ if (shouldTrace(onWriteDone, 1)) {
+ onWriteDone->getToken()->trace(1, "Then we update summary.");
+ }
+ }
+ } else {
+ // Replaying, document removed and lid reused before summary
+ // was flushed.
+ assert(onWriteDone->getToken() == NULL && !useDocumentStore(serialNum));
}
- } else {
- // Replaying, document removed and lid reused before summary
- // was flushed.
- assert(onWriteDone->getToken() == NULL && !useDocumentStore(serialNum));
}
+ promisedDoc.set_value(std::move(newDoc));
}
@@ -484,7 +509,6 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm
{
assert(rmOp.getValidNewOrPrevDbdId());
assert(rmOp.notMovingLidInSameSubDb());
-
const SerialNum serialNum = rmOp.getSerialNum();
const DocumentId &docId = rmOp.getDocumentId();
VLOG(getDebugLevel(rmOp.getNewOrPrevLid(_params._subDbId), docId),
@@ -495,10 +519,12 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm
adjustMetaStore(rmOp, docId);
considerEarlyAck(token, rmOp.getType());
+
if (rmOp.getValidDbdId(_params._subDbId)) {
Document::UP clearDoc(new Document(*_docType, docId));
clearDoc->setRepo(*_repo);
- _summaryAdapter->put(serialNum, *clearDoc, rmOp.getLid());
+
+ putSummary(serialNum, rmOp.getLid(), std::move(clearDoc));
}
if (rmOp.getValidPrevDbdId(_params._subDbId)) {
if (rmOp.changedDbdId()) {
@@ -549,7 +575,7 @@ void
StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, search::DocumentIdT lid,
FeedOperation::Type opType, IDestructorCallback::SP moveDoneCtx)
{
- _summaryAdapter->remove(serialNum, lid);
+ removeSummary(serialNum, lid);
bool explicitReuseLid = _lidReuseDelayer.delayReuse(lid);
std::shared_ptr<RemoveDoneContext> onWriteDone;
if (explicitReuseLid || token) {
@@ -712,7 +738,7 @@ StoreOnlyFeedView::removeDocuments(const RemoveDocumentsOperation &op, bool remo
}
if (useDocumentStore(serialNum + 1)) {
for (const auto &lid : lidsToRemove) {
- _summaryAdapter->remove(serialNum, lid);
+ removeSummary(serialNum, lid);
}
}
if (explicitReuseLids && !onWriteDone) {
@@ -775,6 +801,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
assert(moveOp.movingLidIfInSameSubDb());
const SerialNum serialNum = moveOp.getSerialNum();
+
const Document::SP &doc = moveOp.getDocument();
const DocumentId &docId = doc->getId();
VLOG(getDebugLevel(moveOp.getNewOrPrevLid(_params._subDbId), doc->getId()),
@@ -788,7 +815,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
adjustMetaStore(moveOp, docId);
bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId);
if (moveOp.getValidDbdId(_params._subDbId)) {
- _summaryAdapter->put(serialNum, *doc, moveOp.getLid());
+ putSummary(serialNum, moveOp.getLid(), doc);
bool immediateCommit = _commitTimeTracker.needCommit();
FeedToken::UP token;
std::shared_ptr<PutDoneContext> onWriteDone =
@@ -811,7 +838,7 @@ StoreOnlyFeedView::heartBeat(search::SerialNum serialNum)
if (serialNum > _metaStore.getLastSerialNum()) {
_metaStore.commit(serialNum, serialNum);
}
- _summaryAdapter->heartBeat(serialNum);
+ heartBeatSummary(serialNum);
heartBeatIndexedFields(serialNum);
heartBeatAttributes(serialNum);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index 73811d1cedf..5e4cefe4fe9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -17,6 +17,7 @@
#include <vespa/searchcorespi/index/ithreadingservice.h>
#include <vespa/searchlib/query/base.h>
#include <vespa/vespalib/util/threadstackexecutorbase.h>
+#include <future>
namespace search { class IDestructorCallback; }
@@ -54,6 +55,8 @@ public:
using OnPutDoneType = const std::shared_ptr<PutDoneContext> &;
using OnRemoveDoneType = const std::shared_ptr<RemoveDoneContext> &;
using FeedTokenUP = std::unique_ptr<FeedToken>;
+ using FutureDoc = std::shared_future<document::Document::UP>;
+ using PromisedDoc = std::promise<document::Document::UP>;
struct Context
{
@@ -121,26 +124,34 @@ protected:
}
};
-protected:
+private:
const ISummaryAdapter::SP _summaryAdapter;
- const search::index::Schema::SP _schema;
const IDocumentMetaStoreContext::SP _documentMetaStoreContext;
const document::DocumentTypeRepo::SP _repo;
- searchcorespi::index::IThreadingService &_writeService;
- PersistentParams _params;
- IDocumentMetaStore &_metaStore;
const document::DocumentType *_docType;
documentmetastore::ILidReuseDelayer &_lidReuseDelayer;
CommitTimeTracker &_commitTimeTracker;
+protected:
+ const search::index::Schema::SP _schema;
+ searchcorespi::index::IThreadingService &_writeService;
+ PersistentParams _params;
+ IDocumentMetaStore &_metaStore;
+
+private:
+ searchcorespi::index::IThreadService & summaryExecutor() {
+ return _writeService.summary();
+ }
+ void putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc doc);
+ void putSummary(SerialNum serialNum, search::DocumentIdT lid, document::Document::SP doc);
+ void removeSummary(SerialNum serialNum, search::DocumentIdT lid);
+ void heartBeatSummary(SerialNum serialNum);
+
+
bool useDocumentStore(SerialNum replaySerialNum) const {
return replaySerialNum > _params._flushedDocumentStoreSerialNum;
}
-
-private:
- bool
- useDocumentMetaStore(SerialNum replaySerialNum) const
- {
+ bool useDocumentMetaStore(SerialNum replaySerialNum) const {
return replaySerialNum > _params._flushedDocumentMetaStoreSerialNum;
}
@@ -148,21 +159,8 @@ private:
void internalPut(FeedTokenUP token, const PutOperation &putOp);
void internalUpdate(FeedTokenUP token, const UpdateOperation &updOp);
- void
- updateIndexAndDocumentStore(bool indexedFieldsInScope,
- SerialNum serialNum,
- search::DocumentIdT lid,
- const document::DocumentUpdate &upd,
- bool immediateCommit,
- OnOperationDoneType onWriteDone);
-
- bool
- lookupDocId(const document::DocumentId &gid,
- search::DocumentIdT & lid) const;
-
- void
- internalRemove(FeedTokenUP token,
- const RemoveOperation &rmOp);
+ bool lookupDocId(const document::DocumentId &gid, search::DocumentIdT & lid) const;
+ void internalRemove(FeedTokenUP token, const RemoveOperation &rmOp);
// Removes documents from meta store and document store.
// returns the number of documents removed.
@@ -177,15 +175,13 @@ private:
virtual void notifyGidToLidChange(const document::GlobalId &gid, uint32_t lid);
-protected:
- virtual void
- internalDeleteBucket(const DeleteBucketOperation &delOp);
-
- virtual void
- heartBeatIndexedFields(SerialNum serialNum);
+ void updateIndexAndDocumentStore(SerialNum serialNum, search::DocumentIdT lid, document::DocumentUpdate::SP upd,
+ OnOperationDoneType onWriteDone, PromisedDoc promisedDoc);
- virtual void
- heartBeatAttributes(SerialNum serialNum);
+protected:
+ virtual void internalDeleteBucket(const DeleteBucketOperation &delOp);
+ virtual void heartBeatIndexedFields(SerialNum serialNum);
+ virtual void heartBeatAttributes(SerialNum serialNum);
private:
virtual void putAttributes(SerialNum serialNum, search::DocumentIdT lid, const document::Document &doc,
@@ -201,6 +197,8 @@ private:
virtual void updateIndexedFields(SerialNum serialNum, search::DocumentIdT lid, const document::Document::SP &newDoc,
bool immediateCommit, OnOperationDoneType onWriteDone);
+ virtual void updateIndexedFields(SerialNum serialNum, search::DocumentIdT lid, FutureDoc doc,
+ bool immediateCommit, OnOperationDoneType onWriteDone);
virtual void removeAttributes(SerialNum serialNum, search::DocumentIdT lid,
bool immediateCommit, OnRemoveDoneType onWriteDone);
diff --git a/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp
index 6edf91eed97..517f2c90514 100644
--- a/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp
@@ -47,6 +47,7 @@ void VisibilityHandler::commitAndWait()
// Always sync attribute writer threads so attribute vectors are
// properly updated when document retriver rebuilds document
_writeService.attributeFieldWriter().sync();
+ _writeService.summary().sync();
}
bool VisibilityHandler::startCommit(const LockGuard & unused)
diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
index ac003045b85..44dc405bf93 100644
--- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
@@ -15,6 +15,7 @@ private:
searchcorespi::index::IThreadingService &_service;
ThreadServiceObserver _master;
ThreadServiceObserver _index;
+ ThreadServiceObserver _summary;
search::SequencedTaskExecutorObserver _indexFieldInverter;
search::SequencedTaskExecutorObserver _indexFieldWriter;
search::SequencedTaskExecutorObserver _attributeFieldWriter;
@@ -24,6 +25,7 @@ public:
: _service(service),
_master(_service.master()),
_index(service.index()),
+ _summary(service.summary()),
_indexFieldInverter(_service.indexFieldInverter()),
_indexFieldWriter(_service.indexFieldWriter()),
_attributeFieldWriter(_service.attributeFieldWriter())
@@ -36,20 +38,17 @@ public:
const ThreadServiceObserver &indexObserver() const {
return _index;
}
- const search::SequencedTaskExecutorObserver &indexFieldInverterObserver()
- const
- {
+ const ThreadServiceObserver &summaryObserver() const {
+ return _index;
+ }
+ const search::SequencedTaskExecutorObserver &indexFieldInverterObserver() const {
return _indexFieldInverter;
}
- const search::SequencedTaskExecutorObserver &indexFieldWriterObserver()
- const
- {
+ const search::SequencedTaskExecutorObserver &indexFieldWriterObserver() const {
return _indexFieldWriter;
}
- const search::SequencedTaskExecutorObserver &attributeFieldWriterObserver()
- const
- {
+ const search::SequencedTaskExecutorObserver &attributeFieldWriterObserver() const {
return _attributeFieldWriter;
}
@@ -69,6 +68,9 @@ public:
virtual searchcorespi::index::IThreadService &index() override {
return _index;
}
+ virtual searchcorespi::index::IThreadService &summary() override {
+ return _summary;
+ }
virtual search::ISequencedTaskExecutor &indexFieldInverter() override {
return _indexFieldInverter;
}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
index dfd0b208178..65dd8cc1f3f 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
@@ -54,20 +54,11 @@ struct IThreadingService : public vespalib::Syncable
IThreadingService() = default;
virtual ~IThreadingService() {}
- /**
- * Returns a reference to the master write thread.
- */
virtual IThreadService &master() = 0;
-
- /**
- * Returns a reference to the index write thread.
- */
virtual IThreadService &index() = 0;
-
+ virtual IThreadService &summary() = 0;
virtual search::ISequencedTaskExecutor &indexFieldInverter() = 0;
-
virtual search::ISequencedTaskExecutor &indexFieldWriter() = 0;
-
virtual search::ISequencedTaskExecutor &attributeFieldWriter() = 0;
};
diff --git a/searchlib/src/vespa/searchlib/common/serialnum.h b/searchlib/src/vespa/searchlib/common/serialnum.h
index cb704ed105d..601a34eff2e 100644
--- a/searchlib/src/vespa/searchlib/common/serialnum.h
+++ b/searchlib/src/vespa/searchlib/common/serialnum.h
@@ -2,7 +2,7 @@
#pragma once
-#include <stdint.h>
+#include <cstdint>
namespace search {