aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp1
-rw-r--r--searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp37
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp67
-rw-r--r--searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp40
-rw-r--r--searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp12
-rw-r--r--searchcore/src/tests/proton/server/feedstates_test.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/commit_time_tracker.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/commit_time_tracker.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp44
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp21
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h11
29 files changed, 138 insertions, 206 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
index 54fc072c2b1..2f34292ad52 100644
--- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
+++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
@@ -204,6 +204,7 @@ public:
document::DocumentTypeRepo &getDeserializeRepo() override {
return _repo;
}
+ void optionalCommit(search::SerialNum) override { }
};
diff --git a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
index 15f7d5798ea..42dc804523e 100644
--- a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
@@ -57,9 +57,9 @@ const vespalib::string DOC_TYPE("invalid");
class IndexManagerDummyReconfigurer : public searchcorespi::IIndexManager::Reconfigurer
{
- virtual bool reconfigure(vespalib::Closure0<bool>::UP closure) override {
+ bool reconfigure(vespalib::Closure0<bool>::UP closure) override {
bool ret = true;
- if (closure.get() != NULL)
+ if (closure)
ret = closure->call(); // Perform index manager reconfiguration now
return ret;
}
@@ -96,14 +96,13 @@ struct ViewSet
proton::IDocumentMetaStoreContext::SP _dmsc;
std::shared_ptr<IGidToLidChangeHandler> _gidToLidChangeHandler;
std::unique_ptr<documentmetastore::ILidReuseDelayer> _lidReuseDelayer;
- CommitTimeTracker _commitTimeTracker;
VarHolder<SearchView::SP> searchView;
VarHolder<SearchableFeedView::SP> feedView;
HwInfo _hwInfo;
ViewSet();
~ViewSet();
- ViewPtrs getViewPtrs() {
+ ViewPtrs getViewPtrs() const {
ViewPtrs ptrs;
ptrs.sv = searchView.get();
ptrs.fv = feedView.get();
@@ -126,7 +125,6 @@ ViewSet::ViewSet()
_dmsc(),
_gidToLidChangeHandler(),
_lidReuseDelayer(),
- _commitTimeTracker(vespalib::duration::zero()),
searchView(),
feedView(),
_hwInfo()
@@ -177,8 +175,8 @@ Fixture::Fixture()
vespalib::rmdir(BASE_DIR, true);
vespalib::mkdir(BASE_DIR);
initViewSet(_views);
- _configurer.reset(new Configurer(_views._summaryMgr, _views.searchView, _views.feedView, _queryLimiter,
- _constantValueRepo, _clock, "test", 0));
+ _configurer = std::make_unique<Configurer>(_views._summaryMgr, _views.searchView, _views.feedView, _queryLimiter,
+ _constantValueRepo, _clock, "test", 0);
}
Fixture::~Fixture() = default;
@@ -207,7 +205,7 @@ Fixture::initViewSet(ViewSet &views)
views._dmsc = metaStore;
views._lidReuseDelayer = std::make_unique<documentmetastore::LidReuseDelayer>(views._writeService, metaStore->get());
IndexSearchable::SP indexSearchable;
- MatchView::SP matchView(new MatchView(matchers, indexSearchable, attrMgr, sesMgr, metaStore, views._docIdLimit));
+ auto matchView = std::make_shared<MatchView>(matchers, indexSearchable, attrMgr, sesMgr, metaStore, views._docIdLimit);
views.searchView.set(SearchView::create
(summaryMgr->createSummarySetup(SummaryConfig(), SummarymapConfig(),
JuniperrcConfig(), views.repo, attrMgr),
@@ -219,8 +217,7 @@ Fixture::initViewSet(ViewSet &views)
*views._gidToLidChangeHandler,
views.repo,
views._writeService,
- *views._lidReuseDelayer,
- views._commitTimeTracker),
+ *views._lidReuseDelayer),
SearchableFeedView::PersistentParams(
views.serialNum,
views.serialNum,
@@ -244,10 +241,9 @@ struct MyFastAccessFeedView
proton::IDocumentMetaStoreContext::SP _dmsc;
std::shared_ptr<IGidToLidChangeHandler> _gidToLidChangeHandler;
std::unique_ptr<documentmetastore::ILidReuseDelayer> _lidReuseDelayer;
- CommitTimeTracker _commitTimeTracker;
VarHolder<FastAccessFeedView::SP> _feedView;
- MyFastAccessFeedView(IThreadingService &writeService)
+ explicit MyFastAccessFeedView(IThreadingService &writeService)
: _fileHeaderContext(),
_docIdLimit(0),
_writeService(writeService),
@@ -255,7 +251,6 @@ struct MyFastAccessFeedView
_dmsc(),
_gidToLidChangeHandler(make_shared<DummyGidToLidChangeHandler>()),
_lidReuseDelayer(),
- _commitTimeTracker(vespalib::duration::zero()),
_feedView()
{
init();
@@ -267,16 +262,16 @@ struct MyFastAccessFeedView
ISummaryAdapter::SP summaryAdapter(new MySummaryAdapter());
Schema::SP schema(new Schema());
_dmsc = make_shared<DocumentMetaStoreContext>(std::make_shared<BucketDBOwner>());
- _lidReuseDelayer.reset(new documentmetastore::LidReuseDelayer(_writeService, _dmsc->get()));
+ _lidReuseDelayer = std::make_unique<documentmetastore::LidReuseDelayer>(_writeService, _dmsc->get());
std::shared_ptr<const DocumentTypeRepo> repo = createRepo();
StoreOnlyFeedView::Context storeOnlyCtx(summaryAdapter, schema, _dmsc, *_gidToLidChangeHandler, repo,
- _writeService, *_lidReuseDelayer, _commitTimeTracker);
+ _writeService, *_lidReuseDelayer);
StoreOnlyFeedView::PersistentParams params(1, 1, DocTypeName(DOC_TYPE), 0, SubDbType::NOTREADY);
auto mgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(), _fileHeaderContext,
_writeService.attributeFieldWriter(), _writeService.shared(), _hwInfo);
- IAttributeWriter::SP writer(new AttributeWriter(mgr));
+ auto writer = std::make_shared<AttributeWriter>(mgr);
FastAccessFeedView::Context fastUpdateCtx(writer, _docIdLimit);
- _feedView.set(FastAccessFeedView::SP(new FastAccessFeedView(storeOnlyCtx, params, fastUpdateCtx)));;
+ _feedView.set(std::make_shared<FastAccessFeedView>(storeOnlyCtx, params, fastUpdateCtx));
}
};
@@ -370,7 +365,7 @@ SearchViewComparer::SearchViewComparer(SearchView::SP old, SearchView::SP new_)
: _old(std::move(old)),
_new(std::move(new_))
{}
-SearchViewComparer::~SearchViewComparer() {}
+SearchViewComparer::~SearchViewComparer() = default;
struct FeedViewComparer
@@ -409,7 +404,7 @@ FeedViewComparer::FeedViewComparer(SearchableFeedView::SP old, SearchableFeedVie
: _old(std::move(old)),
_new(std::move(new_))
{}
-FeedViewComparer::~FeedViewComparer() {}
+FeedViewComparer::~FeedViewComparer() = default;
struct FastAccessFeedViewComparer
{
@@ -435,7 +430,7 @@ FastAccessFeedViewComparer::FastAccessFeedViewComparer(FastAccessFeedView::SP ol
: _old(std::move(old)),
_new(std::move(new_))
{}
-FastAccessFeedViewComparer::~FastAccessFeedViewComparer() {}
+FastAccessFeedViewComparer::~FastAccessFeedViewComparer() = default;
TEST_F("require that we can reconfigure index searchable", Fixture)
{
@@ -467,7 +462,7 @@ TEST_F("require that we can reconfigure index searchable", Fixture)
const AttributeManager *
asAttributeManager(const proton::IAttributeManager::SP &attrMgr)
{
- const AttributeManager *result = dynamic_cast<const AttributeManager *>(attrMgr.get());
+ auto result = dynamic_cast<const AttributeManager *>(attrMgr.get());
ASSERT_TRUE(result != nullptr);
return result;
}
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index d1d08a332e3..4a01d7ae3e8 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -107,17 +107,13 @@ struct MyOwner : public IFeedHandlerOwner
_allowPrune(false)
{
}
- virtual void onTransactionLogReplayDone() override {
+ void onTransactionLogReplayDone() override {
LOG(info, "MyOwner::onTransactionLogReplayDone()");
}
- virtual void enterRedoReprocessState() override {}
- virtual void onPerformPrune(SerialNum) override {}
+ void enterRedoReprocessState() override {}
+ void onPerformPrune(SerialNum) override {}
- virtual bool
- getAllowPrune() const override
- {
- return _allowPrune;
- }
+ bool getAllowPrune() const override { return _allowPrune; }
};
@@ -130,15 +126,15 @@ struct MyResourceWriteFilter : public IResourceWriteFilter
_message()
{}
- virtual bool acceptWriteOperation() const override { return _acceptWriteOperation; }
- virtual State getAcceptState() const override {
+ bool acceptWriteOperation() const override { return _acceptWriteOperation; }
+ State getAcceptState() const override {
return IResourceWriteFilter::State(acceptWriteOperation(), _message);
}
};
struct MyReplayConfig : public IReplayConfig {
- virtual void replayConfig(SerialNum) override {}
+ void replayConfig(SerialNum) override {}
};
struct MyDocumentMetaStore {
@@ -172,7 +168,7 @@ struct MyDocumentMetaStore {
if (itr != _allocated.end()) {
return &itr->second;
}
- return NULL;
+ return nullptr;
}
};
@@ -197,9 +193,9 @@ struct MyFeedView : public test::DummyFeedView {
void preparePut(PutOperation &op) override {
prepareDocumentOperation(op, op.getDocument()->getId().getGlobalId());
}
- void prepareDocumentOperation(DocumentOperation &op, const GlobalId &gid) {
+ void prepareDocumentOperation(DocumentOperation &op, const GlobalId &gid) const {
const MyDocumentMetaStore::Entry *entry = metaStore.get(gid);
- if (entry != NULL) {
+ if (entry != nullptr) {
op.setDbDocumentId(entry->_id);
op.setPrevDbDocumentId(entry->_prevId);
op.setPrevTimestamp(entry->_prevTimestamp);
@@ -209,7 +205,7 @@ struct MyFeedView : public test::DummyFeedView {
(void) token;
LOG(info, "MyFeedView::handlePut(): docId(%s), putCount(%u), putLatchCount(%u)",
putOp.getDocument()->getId().toString().c_str(), put_count,
- (putLatch.get() != NULL ? putLatch->getCount() : 0u));
+ (putLatch ? putLatch->getCount() : 0u));
if (usePutRdz) {
putRdz.run();
}
@@ -218,7 +214,7 @@ struct MyFeedView : public test::DummyFeedView {
++put_count;
put_serial = putOp.getSerialNum();
metaStore.allocate(putOp.getDocument()->getId().getGlobalId());
- if (putLatch.get() != NULL) {
+ if (putLatch) {
putLatch->countDown();
}
}
@@ -240,9 +236,9 @@ struct MyFeedView : public test::DummyFeedView {
void heartBeat(SerialNum) override { ++heartbeat_count; }
void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++prune_removed_count; }
const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override {
- return NULL;
+ return nullptr;
}
- void checkCounts(int exp_update_count, SerialNum exp_update_serial, int exp_put_count, SerialNum exp_put_serial) {
+ void checkCounts(int exp_update_count, SerialNum exp_update_serial, int exp_put_count, SerialNum exp_put_serial) const {
EXPECT_EQUAL(exp_update_count, update_count);
EXPECT_EQUAL(exp_update_serial, update_serial);
EXPECT_EQUAL(exp_put_count, put_count);
@@ -266,7 +262,7 @@ MyFeedView::MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr, const
update_serial(0),
documentType(dtr->getDocumentType(docTypeName.getName()))
{}
-MyFeedView::~MyFeedView() {}
+MyFeedView::~MyFeedView() = default;
struct SchemaContext {
@@ -358,7 +354,7 @@ struct MyTransport : public feedtoken::ITransport {
ResultUP result;
bool documentWasFound;
MyTransport();
- ~MyTransport();
+ ~MyTransport() override;
void send(ResultUP res, bool documentWasFound_) override {
result = std::move(res);
documentWasFound = documentWasFound_;
@@ -421,8 +417,8 @@ struct PutHandler {
puts.push_back(pc);
}
bool await(uint32_t timeout = 80000) {
- for (size_t i = 0; i < puts.size(); ++i) {
- if (!puts[i]->tokenCtx.await(timeout)) {
+ for (const auto & put : puts) {
+ if (!put->tokenCtx.await(timeout)) {
return false;
}
}
@@ -475,7 +471,7 @@ struct FeedHandlerFixture
feedView(schema.getRepo(), schema.getDocType()),
_bucketDB(),
_bucketDBHandler(_bucketDB),
- handler(writeService, tlsSpec, schema.getDocType(), _state, owner,
+ handler(writeService, tlsSpec, schema.getDocType(), owner,
writeFilter, replayConfig, tls, &tls_writer)
{
_state.enterLoadState();
@@ -500,18 +496,17 @@ struct FeedHandlerFixture
struct MyConfigStore : ConfigStore {
- virtual SerialNum getBestSerialNum() const override { return 1; }
- virtual SerialNum getOldestSerialNum() const override { return 1; }
- virtual void saveConfig(const DocumentDBConfig &, SerialNum) override {}
- virtual void loadConfig(const DocumentDBConfig &, SerialNum,
- DocumentDBConfig::SP &) override {}
- virtual void removeInvalid() override {}
+ SerialNum getBestSerialNum() const override { return 1; }
+ SerialNum getOldestSerialNum() const override { return 1; }
+ void saveConfig(const DocumentDBConfig &, SerialNum) override {}
+ void loadConfig(const DocumentDBConfig &, SerialNum, DocumentDBConfig::SP &) override {}
+ void removeInvalid() override {}
void prune(SerialNum) override {}
- virtual bool hasValidSerial(SerialNum) const override { return true; }
- virtual SerialNum getPrevValidSerial(SerialNum) const override { return 1; }
- virtual void serializeConfig(SerialNum, vespalib::nbostream &) override {}
- virtual void deserializeConfig(SerialNum, vespalib::nbostream &) override {}
- virtual void setProtonConfig(const ProtonConfigSP &) override { }
+ bool hasValidSerial(SerialNum) const override { return true; }
+ SerialNum getPrevValidSerial(SerialNum) const override { return 1; }
+ void serializeConfig(SerialNum, vespalib::nbostream &) override {}
+ void deserializeConfig(SerialNum, vespalib::nbostream &) override {}
+ void setProtonConfig(const ProtonConfigSP &) override { }
};
@@ -637,7 +632,7 @@ TEST_F("require that partial update for non-existing document is tagged as such"
auto op = std::make_unique<UpdateOperation>(upCtx.bucketId, Timestamp(10), upCtx.update);
FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token), std::move(op));
- const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult());
+ const auto *result = dynamic_cast<const UpdateResult *>(token_context.getResult());
EXPECT_FALSE(token_context.transport.documentWasFound);
EXPECT_EQUAL(0u, result->getExistingTimestamp());
@@ -655,7 +650,7 @@ TEST_F("require that partial update for non-existing document is created if spec
auto op = std::make_unique<UpdateOperation>(upCtx.bucketId, Timestamp(10), upCtx.update);
FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token), std::move(op));
- const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult());
+ const auto * result = dynamic_cast<const UpdateResult *>(token_context.getResult());
EXPECT_TRUE(token_context.transport.documentWasFound);
EXPECT_EQUAL(10u, result->getExistingTimestamp());
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
index a538d36ddf5..9690133d247 100644
--- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
@@ -3,7 +3,6 @@
#include <vespa/searchcore/proton/attribute/i_attribute_writer.h>
#include <vespa/searchcore/proton/attribute/ifieldupdatecallback.h>
#include <vespa/searchcore/proton/test/bucketfactory.h>
-#include <vespa/searchcore/proton/common/commit_time_tracker.h>
#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h>
#include <vespa/searchcore/proton/index/i_index_writer.h>
@@ -521,7 +520,6 @@ struct FixtureBase
ExecutorThreadingService _writeServiceReal;
test::ThreadingServiceObserver _writeService;
documentmetastore::LidReuseDelayer _lidReuseDelayer;
- CommitTimeTracker _commitTimeTracker;
SerialNum serial;
std::shared_ptr<MyGidToLidChangeHandler> _gidToLidChangeHandler;
FixtureBase(vespalib::duration visibilityDelay);
@@ -706,7 +704,6 @@ FixtureBase::FixtureBase(vespalib::duration visibilityDelay)
_writeServiceReal(_sharedExecutor),
_writeService(_writeServiceReal),
_lidReuseDelayer(_writeService, _dmsc->get()),
- _commitTimeTracker(visibilityDelay),
serial(0),
_gidToLidChangeHandler(std::make_shared<MyGidToLidChangeHandler>())
{
@@ -736,8 +733,7 @@ struct SearchableFeedViewFixture : public FixtureBase
*_gidToLidChangeHandler,
sc.getRepo(),
_writeService,
- _lidReuseDelayer,
- _commitTimeTracker),
+ _lidReuseDelayer),
pc.getParams(),
FastAccessFeedView::Context(aw, _docIdLimit),
SearchableFeedView::Context(iw))
@@ -758,13 +754,12 @@ struct FastAccessFeedViewFixture : public FixtureBase
*_gidToLidChangeHandler,
sc.getRepo(),
_writeService,
- _lidReuseDelayer,
- _commitTimeTracker),
+ _lidReuseDelayer),
pc.getParams(),
FastAccessFeedView::Context(aw, _docIdLimit))
{
}
- virtual IFeedView &getFeedView() override { return fv; }
+ IFeedView &getFeedView() override { return fv; }
};
void
@@ -1229,34 +1224,9 @@ TEST_F("require that commit is not called when inside a commit interval",
"ack(Result(0, ))");
}
-TEST_F("require that commit is called when crossing a commit interval",
- SearchableFeedViewFixture(SHORT_DELAY))
-{
- std::this_thread::sleep_for(SHORT_DELAY + 100ms);
- DocumentContext dc = f.doc1();
- f.putAndWait(dc);
- EXPECT_EQUAL(1u, f.miw._commitCount);
- EXPECT_EQUAL(1u, f.maw._commitCount);
- EXPECT_EQUAL(2u, f._docIdLimit.get());
- std::this_thread::sleep_for(SHORT_DELAY + 100ms);
- f.removeAndWait(dc);
- EXPECT_EQUAL(2u, f.miw._commitCount);
- EXPECT_EQUAL(2u, f.maw._commitCount);
- f.assertTrace("put(adapter=attribute,serialNum=1,lid=1,commit=1),"
- "put(adapter=index,serialNum=1,lid=1,commit=0),"
- "commit(adapter=index,serialNum=1),"
- "ack(Result(0, )),"
- "remove(adapter=attribute,serialNum=2,lid=1,commit=1),"
- "remove(adapter=index,serialNum=2,lid=1,commit=0),"
- "commit(adapter=index,serialNum=2),"
- "ack(Result(0, ))");
-}
-
-
-TEST_F("require that commit is not implicitly called after handover to maintenance job",
+TEST_F("require that commit is not implicitly called",
SearchableFeedViewFixture(SHORT_DELAY))
{
- f._commitTimeTracker.setReplayDone();
std::this_thread::sleep_for(SHORT_DELAY + 100ms);
DocumentContext dc = f.doc1();
f.putAndWait(dc);
@@ -1279,7 +1249,6 @@ TEST_F("require that commit is not implicitly called after handover to maintenan
TEST_F("require that forceCommit updates docid limit",
SearchableFeedViewFixture(LONG_DELAY))
{
- f._commitTimeTracker.setReplayDone();
DocumentContext dc = f.doc1();
f.putAndWait(dc);
EXPECT_EQUAL(0u, f.miw._commitCount);
@@ -1298,7 +1267,6 @@ TEST_F("require that forceCommit updates docid limit",
TEST_F("require that forceCommit updates docid limit during shrink", SearchableFeedViewFixture(LONG_DELAY))
{
- f._commitTimeTracker.setReplayDone();
f.putAndWait(f.makeDummyDocs(0, 3, 1000));
EXPECT_EQUAL(0u, f._docIdLimit.get());
f.forceCommitAndWait();
diff --git a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
index 7c9df8d3e47..eae7425148c 100644
--- a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
@@ -3,7 +3,6 @@
#include <vespa/document/base/documentid.h>
#include <vespa/document/datatype/datatype.h>
#include <vespa/searchcommon/common/schema.h>
-#include <vespa/searchcore/proton/common/commit_time_tracker.h>
#include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h>
#include <vespa/searchcore/proton/server/executorthreadingservice.h>
#include <vespa/searchcore/proton/server/putdonecontext.h>
@@ -88,7 +87,6 @@ struct MyMinimalFeedView : public MyMinimalFeedViewBase, public StoreOnlyFeedVie
const DocumentMetaStore::SP &metaStore,
searchcorespi::index::IThreadingService &writeService,
documentmetastore::ILidReuseDelayer &lidReuseDelayer,
- CommitTimeTracker &commitTimeTracker,
const PersistentParams &params,
int &outstandingMoveOps_) :
MyMinimalFeedViewBase(),
@@ -98,8 +96,7 @@ struct MyMinimalFeedView : public MyMinimalFeedViewBase, public StoreOnlyFeedVie
*gidToLidChangeHandler,
myGetDocumentTypeRepo(),
writeService,
- lidReuseDelayer,
- commitTimeTracker),
+ lidReuseDelayer),
params),
removeMultiAttributesCount(0),
removeMultiIndexFieldsCount(0),
@@ -138,11 +135,10 @@ struct MoveOperationFeedView : public MyMinimalFeedView {
const DocumentMetaStore::SP &metaStore,
searchcorespi::index::IThreadingService &writeService,
documentmetastore::ILidReuseDelayer &lidReuseDelayer,
- CommitTimeTracker &commitTimeTracker,
const PersistentParams &params,
int &outstandingMoveOps_) :
MyMinimalFeedView(summaryAdapter, metaStore, writeService, lidReuseDelayer,
- commitTimeTracker, params, outstandingMoveOps_),
+ params, outstandingMoveOps_),
putAttributesCount(0),
putIndexFieldsCount(0),
removeAttributesCount(0),
@@ -196,7 +192,6 @@ struct FixtureBase {
vespalib::ThreadStackExecutor sharedExecutor;
ExecutorThreadingService writeService;
documentmetastore::LidReuseDelayer lidReuseDelayer;
- CommitTimeTracker commitTimeTracker;
typename FeedViewType::UP feedview;
FixtureBase(SubDbType subDbType = SubDbType::READY)
@@ -212,14 +207,13 @@ struct FixtureBase {
sharedExecutor(1, 0x10000),
writeService(sharedExecutor),
lidReuseDelayer(writeService, *metaStore),
- commitTimeTracker(vespalib::duration::zero()),
feedview()
{
StoreOnlyFeedView::PersistentParams params(0, 0, DocTypeName("foo"), subdb_id, subDbType);
metaStore->constructFreeList();
ISummaryAdapter::SP adapter = std::make_unique<MySummaryAdapter>(removeCount, putCount, heartbeatCount);
feedview = std::make_unique<FeedViewType>(adapter, metaStore, writeService, lidReuseDelayer,
- commitTimeTracker, params, outstandingMoveOps);
+ params, outstandingMoveOps);
}
~FixtureBase() {
diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp
index 6643eb925b9..fd1e24c1f17 100644
--- a/searchcore/src/tests/proton/server/feedstates_test.cpp
+++ b/searchcore/src/tests/proton/server/feedstates_test.cpp
@@ -39,21 +39,21 @@ struct MyFeedView : public test::DummyFeedView {
int remove_handled;
MyFeedView();
- ~MyFeedView();
+ ~MyFeedView() override;
const std::shared_ptr<const DocumentTypeRepo> &getDocumentTypeRepo() const override { return repo_sp; }
void handleRemove(FeedToken , const RemoveOperation &) override { ++remove_handled; }
};
MyFeedView::MyFeedView() : repo_sp(repo.getTypeRepoSp()), remove_handled(0) {}
-MyFeedView::~MyFeedView() {}
+MyFeedView::~MyFeedView() = default;
struct MyReplayConfig : IReplayConfig {
- virtual void replayConfig(SerialNum) override {}
+ void replayConfig(SerialNum) override {}
};
struct InstantExecutor : vespalib::Executor {
- virtual Task::UP execute(Task::UP task) override {
+ Task::UP execute(Task::UP task) override {
task->run();
return Task::UP();
}
@@ -95,7 +95,7 @@ struct RemoveOperationContext
nbostream str;
std::unique_ptr<Packet> packet;
- RemoveOperationContext(search::SerialNum serial);
+ explicit RemoveOperationContext(search::SerialNum serial);
~RemoveOperationContext();
};
@@ -106,14 +106,14 @@ RemoveOperationContext::RemoveOperationContext(search::SerialNum serial)
{
op.serialize(str);
ConstBufferRef buf(str.data(), str.wp());
- packet.reset(new Packet());
+ packet = std::make_unique<Packet>();
packet->add(Packet::Entry(serial, FeedOperation::REMOVE, buf));
}
RemoveOperationContext::~RemoveOperationContext() = default;
TEST_F("require that active FeedView can change during replay", Fixture)
{
RemoveOperationContext opCtx(10);
- PacketWrapper::SP wrap(new PacketWrapper(*opCtx.packet, NULL));
+ auto wrap = std::make_shared<PacketWrapper>(*opCtx.packet, nullptr);
InstantExecutor executor;
EXPECT_EQUAL(0, f.feed_view1.remove_handled);
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
index eaf8abe4872..9e9e8910669 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
@@ -3,7 +3,6 @@
#include "i_attribute_manager.h"
#include "i_attribute_writer.h"
-#include <vespa/searchcore/proton/common/commit_time_tracker.h>
#include <vespa/document/base/fieldpath.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/stllike/hash_map.h>
diff --git a/searchcore/src/vespa/searchcore/proton/common/commit_time_tracker.cpp b/searchcore/src/vespa/searchcore/proton/common/commit_time_tracker.cpp
index eaf515d4fa5..f6bac549691 100644
--- a/searchcore/src/vespa/searchcore/proton/common/commit_time_tracker.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/commit_time_tracker.cpp
@@ -6,8 +6,7 @@ namespace proton {
CommitTimeTracker::CommitTimeTracker(vespalib::duration visibilityDelay)
: _visibilityDelay(visibilityDelay),
- _nextCommit(vespalib::steady_clock::now()),
- _replayDone(false)
+ _nextCommit(vespalib::steady_clock::now())
{
_nextCommit = _nextCommit + visibilityDelay;
}
@@ -16,9 +15,6 @@ bool
CommitTimeTracker::needCommit() const
{
if (hasVisibilityDelay()) {
- if (_replayDone) {
- return false; // maintenance job will do forced commits now
- }
vespalib::steady_time now(vespalib::steady_clock::now());
if (now > _nextCommit) {
_nextCommit = now + _visibilityDelay;
@@ -26,17 +22,7 @@ CommitTimeTracker::needCommit() const
}
return false;
}
- return true;
-}
-
-void
-CommitTimeTracker::setVisibilityDelay(vespalib::duration visibilityDelay)
-{
- vespalib::steady_time nextCommit = vespalib::steady_clock::now() + visibilityDelay;
- if (nextCommit < _nextCommit) {
- _nextCommit = nextCommit;
- }
- _visibilityDelay = visibilityDelay;
+ return false;
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/commit_time_tracker.h b/searchcore/src/vespa/searchcore/proton/common/commit_time_tracker.h
index dda9d43ce67..c5fdbc7bda2 100644
--- a/searchcore/src/vespa/searchcore/proton/common/commit_time_tracker.h
+++ b/searchcore/src/vespa/searchcore/proton/common/commit_time_tracker.h
@@ -13,14 +13,11 @@ class CommitTimeTracker
private:
vespalib::duration _visibilityDelay;
mutable vespalib::steady_time _nextCommit;
- bool _replayDone;
+ bool hasVisibilityDelay() const { return _visibilityDelay != vespalib::duration::zero(); }
public:
CommitTimeTracker(vespalib::duration visibilityDelay);
bool needCommit() const;
- void setVisibilityDelay(vespalib::duration visibilityDelay);
- bool hasVisibilityDelay() const { return _visibilityDelay != vespalib::duration::zero(); }
- void setReplayDone() { _replayDone = true; }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 6093ef06c5b..df01d2cc06f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -166,7 +166,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_dmUsageForwarder(_writeService.master()),
_writeFilter(),
_transient_memory_usage_provider(std::make_shared<TransientMemoryUsageProvider>()),
- _feedHandler(_writeService, tlsSpec, docTypeName, _state, *this, _writeFilter, *this, tlsDirectWriter),
+ _feedHandler(_writeService, tlsSpec, docTypeName, *this, _writeFilter, *this, tlsDirectWriter),
_subDBs(*this, *this, _feedHandler, _docTypeName, _writeService, warmupExecutor, fileHeaderContext,
metricsWireService, getMetrics(), queryLimiter, clock, _configMutex, _baseDir,
makeSubDBConfig(protonCfg.distribution,
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index 33dfa85b628..7db4e908878 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -202,7 +202,6 @@ private:
* Implements IFeedHandlerOwner
**/
bool getAllowPrune() const override;
-
void startTransactionLogReplay();
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp
index 063561347b4..a3f4ef2e05e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp
@@ -110,13 +110,13 @@ FastAccessDocSubDB::createAttributeSpec(const AttributesConfig &attrCfg, SerialN
}
void
-FastAccessDocSubDB::initFeedView(const IAttributeWriter::SP &writer, const DocumentDBConfig &configSnapshot)
+FastAccessDocSubDB::initFeedView(IAttributeWriter::SP writer, const DocumentDBConfig &configSnapshot)
{
// Called by executor thread
auto feedView = std::make_shared<FastAccessFeedView>(
getStoreOnlyFeedViewContext(configSnapshot),
getFeedViewPersistentParams(),
- FastAccessFeedView::Context(writer, _docIdLimit));
+ FastAccessFeedView::Context(std::move(writer), _docIdLimit));
_fastAccessFeedView.set(feedView);
_iFeedView.set(_fastAccessFeedView.get());
@@ -236,7 +236,7 @@ FastAccessDocSubDB::initViews(const DocumentDBConfig &configSnapshot,
auto writer = std::make_shared<AttributeWriter>(getAndResetInitAttributeManager());
{
std::lock_guard<std::mutex> guard(_configMutex);
- initFeedView(writer, configSnapshot);
+ initFeedView(std::move(writer), configSnapshot);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.h b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.h
index 89961d7f1ef..2bf574aba10 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.h
@@ -74,7 +74,7 @@ private:
std::shared_ptr<AttributeManager::SP> attrMgrResult) const;
void setupAttributeManager(AttributeManager::SP attrMgrResult);
- void initFeedView(const IAttributeWriter::SP &writer, const DocumentDBConfig &configSnapshot);
+ void initFeedView(IAttributeWriter::SP writer, const DocumentDBConfig &configSnapshot);
protected:
using Parent = StoreOnlyDocSubDB;
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp
index 697219db563..10c9565f0fe 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp
@@ -18,7 +18,7 @@ void
FastAccessDocSubDBConfigurer::reconfigureFeedView(const FastAccessFeedView::SP &curr,
const Schema::SP &schema,
const std::shared_ptr<const DocumentTypeRepo> &repo,
- const IAttributeWriter::SP &writer)
+ IAttributeWriter::SP writer)
{
_feedView.set(std::make_shared<FastAccessFeedView>(
StoreOnlyFeedView::Context(curr->getSummaryAdapter(),
@@ -27,10 +27,9 @@ FastAccessDocSubDBConfigurer::reconfigureFeedView(const FastAccessFeedView::SP &
curr->getGidToLidChangeHandler(),
repo,
curr->getWriteService(),
- curr->getLidReuseDelayer(),
- curr->getCommitTimeTracker()),
+ curr->getLidReuseDelayer()),
curr->getPersistentParams(),
- FastAccessFeedView::Context(writer,curr->getDocIdLimit())));
+ FastAccessFeedView::Context(std::move(writer),curr->getDocIdLimit())));
}
FastAccessDocSubDBConfigurer::FastAccessDocSubDBConfigurer(FeedViewVarHolder &feedView,
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.h b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.h
index e1744ff2d28..132f6e61d3a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.h
@@ -25,7 +25,7 @@ private:
void reconfigureFeedView(const FastAccessFeedView::SP &curr,
const search::index::Schema::SP &schema,
const std::shared_ptr<const document::DocumentTypeRepo> &repo,
- const IAttributeWriter::SP &attrWriter);
+ IAttributeWriter::SP attrWriter);
public:
FastAccessDocSubDBConfigurer(FeedViewVarHolder &feedView,
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h
index ed36266d74f..8745bb2ad35 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h
@@ -22,11 +22,10 @@ public:
struct Context
{
- const IAttributeWriter::SP &_attrWriter;
+ const IAttributeWriter::SP _attrWriter;
DocIdLimit &_docIdLimit;
- Context(const IAttributeWriter::SP &attrWriter,
- DocIdLimit &docIdLimit)
- : _attrWriter(attrWriter),
+ Context(IAttributeWriter::SP attrWriter, DocIdLimit &docIdLimit)
+ : _attrWriter(std::move(attrWriter)),
_docIdLimit(docIdLimit)
{ }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 3d21a080ae1..684bf0125ac 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -320,14 +320,13 @@ void
FeedHandler::changeFeedState(FeedState::SP newState, const std::lock_guard<std::mutex> &)
{
LOG(debug, "Change feed state from '%s' -> '%s'", _feedState->getName().c_str(), newState->getName().c_str());
- _feedState = newState;
+ _feedState = std::move(newState);
}
FeedHandler::FeedHandler(IThreadingService &writeService,
const vespalib::string &tlsSpec,
const DocTypeName &docTypeName,
- [[maybe_unused]] DDBState &state,
IFeedHandlerOwner &owner,
const IResourceWriteFilter &writeFilter,
IReplayConfig &replayConfig,
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index af06f898716..2e6b2616118 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -142,7 +142,6 @@ public:
* @param writeService The thread service used for all write tasks.
* @param tlsSpec The spec to connect to the transaction log server.
* @param docTypeName The name and version of the document type we are feed handler for.
- * @param state Document db state
* @param owner Reference to the owner of this feed handler.
* @param replayConfig Reference to interface used for replaying config changes.
* @param writer Inject writer for tls, or nullptr to use internal.
@@ -150,7 +149,6 @@ public:
FeedHandler(IThreadingService &writeService,
const vespalib::string &tlsSpec,
const DocTypeName &docTypeName,
- DDBState &state,
IFeedHandlerOwner &owner,
const IResourceWriteFilter &writerFilter,
IReplayConfig &replayConfig,
@@ -229,7 +227,7 @@ public:
void handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCallback> moveDoneCtx) override;
void heartBeat() override;
- virtual void sync();
+ void sync();
RPC::Result receive(const Packet &packet) override;
void eof() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index faee5914a97..c14ed3bb1d9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -10,7 +10,7 @@
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/searchlib/common/idestructorcallback.h>
#include <vespa/vespalib/util/closuretask.h>
-
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.feedstates");
@@ -20,6 +20,7 @@ using search::transactionlog::RPC;
using search::SerialNum;
using vespalib::Executor;
using vespalib::makeClosure;
+using vespalib::makeLambdaTask;
using vespalib::makeTask;
using vespalib::make_string;
using proton::bucketdb::IBucketDBHandler;
@@ -47,19 +48,19 @@ handleProgress(TlsReplayProgress &progress, SerialNum currentSerial)
}
void
-handlePacket(PacketWrapper::SP wrap, EntryHandler entryHandler)
+handlePacket(PacketWrapper & wrap, EntryHandler entryHandler)
{
- vespalib::nbostream_longlivedbuf handle(wrap->packet.getHandle().data(), wrap->packet.getHandle().size());
- while (handle.size() > 0) {
+ vespalib::nbostream_longlivedbuf handle(wrap.packet.getHandle().data(), wrap.packet.getHandle().size());
+ while ( !handle.empty() ) {
Packet::Entry entry;
entry.deserialize(handle);
entryHandler->call(entry);
- if (wrap->progress != nullptr) {
- handleProgress(*wrap->progress, entry.serial());
+ if (wrap.progress != nullptr) {
+ handleProgress(*wrap.progress, entry.serial());
}
}
- wrap->result = RPC::OK;
- wrap->gate.countDown();
+ wrap.result = RPC::OK;
+ wrap.gate.countDown();
}
class TransactionLogReplayPacketHandler : public IReplayPacketHandler {
@@ -67,6 +68,7 @@ class TransactionLogReplayPacketHandler : public IReplayPacketHandler {
IBucketDBHandler &_bucketDBHandler;
IReplayConfig &_replay_config;
FeedConfigStore &_config_store;
+ CommitTimeTracker _commitTimeTracker;
public:
TransactionLogReplayPacketHandler(IFeedView *& feed_view_ptr,
@@ -76,8 +78,9 @@ public:
: _feed_view_ptr(feed_view_ptr),
_bucketDBHandler(bucketDBHandler),
_replay_config(replay_config),
- _config_store(config_store) {
- }
+ _config_store(config_store),
+ _commitTimeTracker(100ms)
+ { }
void replay(const PutOperation &op) override {
_feed_view_ptr->handlePut(FeedToken(), op);
@@ -121,16 +124,20 @@ public:
const document::DocumentTypeRepo &getDeserializeRepo() override {
return *_feed_view_ptr->getDocumentTypeRepo();
}
+ void optionalCommit(search::SerialNum serialNum) override {
+ if (_commitTimeTracker.needCommit()) {
+ _feed_view_ptr->forceCommit(serialNum);
+ }
+ }
};
void startDispatch(IReplayPacketHandler *packet_handler, const Packet::Entry &entry) {
// Called by handlePacket() in executor thread.
- LOG(spam,
- "replay packet entry: entrySerial(%" PRIu64 "), entryType(%u)",
- entry.serial(), entry.type());
+ LOG(spam, "replay packet entry: entrySerial(%" PRIu64 "), entryType(%u)", entry.serial(), entry.type());
ReplayPacketDispatcher dispatcher(*packet_handler);
dispatcher.replayEntry(entry);
+ packet_handler->optionalCommit(entry.serial());
}
} // namespace
@@ -143,14 +150,13 @@ ReplayTransactionLogState::ReplayTransactionLogState(
FeedConfigStore &config_store)
: FeedState(REPLAY_TRANSACTION_LOG),
_doc_type_name(name),
- _packet_handler(new TransactionLogReplayPacketHandler(
- feed_view_ptr, bucketDBHandler,
- replay_config, config_store)) {
-}
+ _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store))
+{ }
-void ReplayTransactionLogState::receive(const PacketWrapper::SP &wrap, Executor &executor) {
+void
+ReplayTransactionLogState::receive(const PacketWrapper::SP &wrap, Executor &executor) {
EntryHandler closure = makeClosure(&startDispatch, _packet_handler.get());
- executor.execute(makeTask(makeClosure(&handlePacket, wrap, std::move(closure))));
+ executor.execute(makeLambdaTask([wrap = wrap, dispatch = std::move(closure)] () mutable { handlePacket(*wrap, std::move(dispatch)); }));
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
index a9224669d87..bf376bb8065 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
@@ -2,9 +2,10 @@
#pragma once
-#include <vespa/searchcore/proton/server/feedhandler.h>
-#include <vespa/searchcore/proton/server/feedstate.h>
-#include <vespa/searchcore/proton/server/ireplaypackethandler.h>
+#include "feedhandler.h"
+#include "feedstate.h"
+#include "ireplaypackethandler.h"
+#include <vespa/searchcore/proton/common/commit_time_tracker.h>
namespace proton {
diff --git a/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h b/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h
index 6de61e5eeb7..77caa52b5d5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h
@@ -2,6 +2,8 @@
#pragma once
+#include <vespa/searchlib/common/serialnum.h>
+
namespace document { class DocumentTypeRepo; }
namespace proton {
@@ -40,6 +42,7 @@ struct IReplayPacketHandler
virtual void replay(const MoveOperation &op) = 0;
virtual void replay(const CreateBucketOperation &op) = 0;
virtual void replay(const CompactLidSpaceOperation &op) = 0;
+ virtual void optionalCommit(search::SerialNum) = 0;
virtual feedoperation::IStreamHandler &getNewConfigStreamHandler() = 0;
virtual const document::DocumentTypeRepo &getDeserializeRepo() = 0;
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp
index 46d5a3282be..71b76c64ad5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp
@@ -39,7 +39,7 @@ SearchableDocSubDBConfigurer::reconfigureFeedView(const SearchView::SP &searchVi
void
SearchableDocSubDBConfigurer::reconfigureFeedView(const IIndexWriter::SP &indexWriter,
const ISummaryAdapter::SP &summaryAdapter,
- const IAttributeWriter::SP &attrWriter,
+ IAttributeWriter::SP attrWriter,
const Schema::SP &schema,
const std::shared_ptr<const DocumentTypeRepo> &repo,
const SearchView::SP &searchView)
@@ -52,9 +52,9 @@ SearchableDocSubDBConfigurer::reconfigureFeedView(const IIndexWriter::SP &indexW
curr->getGidToLidChangeHandler(),
repo,
curr->getWriteService(),
- curr->getLidReuseDelayer(), curr->getCommitTimeTracker()),
+ curr->getLidReuseDelayer()),
curr->getPersistentParams(),
- FastAccessFeedView::Context(attrWriter, curr->getDocIdLimit()),
+ FastAccessFeedView::Context(std::move(attrWriter), curr->getDocIdLimit()),
SearchableFeedView::Context(indexWriter)));
}
@@ -258,7 +258,7 @@ SearchableDocSubDBConfigurer::reconfigure(const DocumentDBConfig &newConfig,
SearchableFeedView::SP curr = _feedView.get();
reconfigureFeedView(curr->getIndexWriter(),
curr->getSummaryAdapter(),
- attrWriter,
+ std::move(attrWriter),
newConfig.getSchemaSP(),
newConfig.getDocumentTypeRepoSP(),
searchView);
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.h b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.h
index 459c4651e67..806588e9f70 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.h
@@ -51,7 +51,7 @@ private:
void
reconfigureFeedView(const IIndexWriter::SP &indexWriter,
const ISummaryAdapter::SP &summaryAdapter,
- const IAttributeWriter::SP &attrWriter,
+ IAttributeWriter::SP attrWriter,
const search::index::Schema::SP &schema,
const std::shared_ptr<const document::DocumentTypeRepo> &repo,
const SearchView::SP &searchView);
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp
index 3ca8a4cff49..71241d98a5a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp
@@ -217,7 +217,7 @@ SearchableDocSubDB::initViews(const DocumentDBConfig &configSnapshot, const Sess
auto attrWriter = std::make_shared<AttributeWriter>(attrMgr);
{
std::lock_guard<std::mutex> guard(_configMutex);
- initFeedView(attrWriter, configSnapshot);
+ initFeedView(std::move(attrWriter), configSnapshot);
}
if (_addMetrics) {
reconfigureMatchingMetrics(configSnapshot.getRankProfilesConfig());
@@ -225,13 +225,13 @@ SearchableDocSubDB::initViews(const DocumentDBConfig &configSnapshot, const Sess
}
void
-SearchableDocSubDB::initFeedView(const IAttributeWriter::SP &attrWriter,
+SearchableDocSubDB::initFeedView(IAttributeWriter::SP attrWriter,
const DocumentDBConfig &configSnapshot)
{
assert(_writeService.master().isCurrentThread());
auto feedView = std::make_shared<SearchableFeedView>(getStoreOnlyFeedViewContext(configSnapshot),
getFeedViewPersistentParams(),
- FastAccessFeedView::Context(attrWriter, _docIdLimit),
+ FastAccessFeedView::Context(std::move(attrWriter), _docIdLimit),
SearchableFeedView::Context(getIndexWriter()));
// XXX: Not exception safe.
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.h b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.h
index f3cf95ebfa0..70bb905cb55 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.h
@@ -92,7 +92,7 @@ private:
std::shared_ptr<searchcorespi::IIndexManager::SP> indexManager) const;
void setupIndexManager(searchcorespi::IIndexManager::SP indexManager);
- void initFeedView(const IAttributeWriter::SP &attrWriter, const DocumentDBConfig &configSnapshot);
+ void initFeedView(IAttributeWriter::SP attrWriter, const DocumentDBConfig &configSnapshot);
void reconfigureMatchingMetrics(const vespa::config::search::RankProfilesConfig &config);
bool reconfigure(vespalib::Closure0<bool>::UP closure) override;
@@ -108,7 +108,7 @@ protected:
void updateLidReuseDelayer(const LidReuseDelayerConfig &config) override;
public:
SearchableDocSubDB(const Config &cfg, const Context &ctx);
- ~SearchableDocSubDB();
+ ~SearchableDocSubDB() override;
std::unique_ptr<DocumentSubDbInitializer>
createInitializer(const DocumentDBConfig &configSnapshot, SerialNum configSerialNum,
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
index 59a115ce5d1..1e5bbe023c2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
@@ -123,7 +123,6 @@ StoreOnlyDocSubDB::StoreOnlyDocSubDB(const Config &cfg, const Context &ctx)
_subDbType(cfg._subDbType),
_fileHeaderContext(*this, ctx._fileHeaderContext, _docTypeName, _baseDir),
_lidReuseDelayer(),
- _commitTimeTracker(3600s),
_gidToLidChangeHandler(std::make_shared<DummyGidToLidChangeHandler>())
{
vespalib::mkdir(_baseDir, false); // Assume parent is created.
@@ -195,7 +194,6 @@ StoreOnlyDocSubDB::onReplayDone()
void
StoreOnlyDocSubDB::onReprocessDone(SerialNum)
{
- _commitTimeTracker.setReplayDone();
}
@@ -343,7 +341,7 @@ StoreOnlyDocSubDB::getStoreOnlyFeedViewContext(const DocumentDBConfig &configSna
{
return StoreOnlyFeedView::Context(getSummaryAdapter(), configSnapshot.getSchemaSP(), _metaStoreCtx,
*_gidToLidChangeHandler, configSnapshot.getDocumentTypeRepoSP(), _writeService,
- *_lidReuseDelayer, _commitTimeTracker);
+ *_lidReuseDelayer);
}
StoreOnlyFeedView::PersistentParams
@@ -423,7 +421,6 @@ StoreOnlyDocSubDB::updateLidReuseDelayer(const LidReuseDelayerConfig &config)
* feed view before applying the new config to the sub dbs.
*/
_lidReuseDelayer->setImmediateCommit(immediateCommit);
- _commitTimeTracker.setVisibilityDelay(config.visibilityDelay());
}
IReprocessingTask::List
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.h b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.h
index 700a6d29460..633201cd5d3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.h
@@ -14,7 +14,6 @@
#include <vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h>
#include <vespa/searchcore/proton/documentmetastore/ilidreusedelayer.h>
#include <vespa/searchcore/proton/summaryengine/isearchhandler.h>
-#include <vespa/searchcore/proton/common/commit_time_tracker.h>
#include <vespa/searchcore/proton/persistenceengine/i_document_retriever.h>
#include <vespa/searchlib/common/fileheadercontext.h>
#include <vespa/vespalib/util/varholder.h>
@@ -166,7 +165,6 @@ protected:
const SubDbType _subDbType;
StoreOnlySubDBFileHeaderContext _fileHeaderContext;
std::unique_ptr<documentmetastore::ILidReuseDelayer> _lidReuseDelayer;
- CommitTimeTracker _commitTimeTracker;
std::shared_ptr<IGidToLidChangeHandler> _gidToLidChangeHandler;
std::shared_ptr<initializer::InitializerTask>
@@ -195,7 +193,7 @@ protected:
void reconfigure(const search::LogDocumentStore::Config & protonConfig);
public:
StoreOnlyDocSubDB(const Config &cfg, const Context &ctx);
- ~StoreOnlyDocSubDB();
+ ~StoreOnlyDocSubDB() override;
uint32_t getSubDbId() const override { return _subDbId; }
vespalib::string getName() const override { return _subName; }
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index bd5f2775735..c07d30b7f3a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -1,19 +1,18 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "storeonlyfeedview.h"
#include "forcecommitcontext.h"
#include "ireplayconfig.h"
#include "operationdonecontext.h"
#include "putdonecontext.h"
#include "remove_batch_done_context.h"
#include "removedonecontext.h"
-#include "storeonlyfeedview.h"
#include "updatedonecontext.h"
#include <vespa/document/datatype/documenttype.h>
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/searchcore/proton/attribute/attribute_utils.h>
#include <vespa/searchcore/proton/attribute/ifieldupdatecallback.h>
-#include <vespa/searchcore/proton/common/commit_time_tracker.h>
#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchcore/proton/documentmetastore/ilidreusedelayer.h>
#include <vespa/searchcore/proton/feedoperation/operations.h>
@@ -198,7 +197,6 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams
_repo(ctx._repo),
_docType(nullptr),
_lidReuseDelayer(ctx._lidReuseDelayer),
- _commitTimeTracker(ctx._commitTimeTracker),
_pendingLidTracker(),
_schema(ctx._schema),
_writeService(ctx._writeService),
@@ -237,7 +235,7 @@ StoreOnlyFeedView::forceCommit(SerialNum serialNum, OnForceCommitDoneType onComm
void
StoreOnlyFeedView::considerEarlyAck(FeedToken & token)
{
- if (_commitTimeTracker.hasVisibilityDelay() && token) {
+ if ( ! needCommit() && token) {
token.reset();
}
}
@@ -288,7 +286,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId);
if (putOp.getValidDbdId(_params._subDbId)) {
- bool immediateCommit = _commitTimeTracker.needCommit();
+ bool immediateCommit = needCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(std::move(token), _gidToLidChangeHandler, doc, gid, putOp.getLid(), serialNum,
@@ -304,6 +302,11 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
}
}
+bool
+StoreOnlyFeedView::needCommit() const {
+ return _lidReuseDelayer.getImmediateCommit();
+}
+
void
StoreOnlyFeedView::heartBeatIndexedFields(SerialNum ) {}
@@ -440,7 +443,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp)
}
considerEarlyAck(token);
- bool immediateCommit = _commitTimeTracker.needCommit();
+ bool immediateCommit = needCommit();
auto onWriteDone = createUpdateDoneContext(std::move(token), updOp.getUpdate());
UpdateScope updateScope(*_schema, upd);
updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone, updateScope);
@@ -612,7 +615,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, SerialNum serialNum,
std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u),
std::move(moveDoneCtx));
removeSummary(serialNum, lid, onWriteDone);
- bool immediateCommit = _commitTimeTracker.needCommit();
+ bool immediateCommit = needCommit();
removeAttributes(serialNum, lid, immediateCommit, onWriteDone);
removeIndexedFields(serialNum, lid, immediateCommit, onWriteDone);
}
@@ -725,7 +728,7 @@ StoreOnlyFeedView::handleDeleteBucket(const DeleteBucketOperation &delOp)
void
StoreOnlyFeedView::internalDeleteBucket(const DeleteBucketOperation &delOp)
{
- bool immediateCommit = _commitTimeTracker.needCommit();
+ bool immediateCommit = needCommit();
size_t rm_count = removeDocuments(delOp, true, immediateCommit);
LOG(debug, "internalDeleteBucket(): docType(%s), bucket(%s), lidsToRemove(%zu)",
_params._docTypeName.toString().c_str(), delOp.getBucketId().toString().c_str(), rm_count);
@@ -764,7 +767,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(moveOp, docId.getGlobalId(), docId);
bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId);
if (moveOp.getValidDbdId(_params._subDbId)) {
- bool immediateCommit = _commitTimeTracker.needCommit();
+ bool immediateCommit = needCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(FeedToken(), _gidToLidChangeHandler, doc, gid, moveOp.getLid(), serialNum,
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index 1d48785b8a9..78b0d0a72d2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -32,7 +32,6 @@ class ForceCommitContext;
class OperationDoneContext;
class PutDoneContext;
class RemoveDoneContext;
-class CommitTimeTracker;
class IGidToLidChangeHandler;
struct IFieldUpdateCallback;
class RemoveDocumentsOperation;
@@ -81,7 +80,6 @@ public:
const std::shared_ptr<const document::DocumentTypeRepo> &_repo;
searchcorespi::index::IThreadingService &_writeService;
documentmetastore::ILidReuseDelayer &_lidReuseDelayer;
- CommitTimeTracker &_commitTimeTracker;
Context(const ISummaryAdapter::SP &summaryAdapter,
const search::index::Schema::SP &schema,
@@ -89,16 +87,14 @@ public:
IGidToLidChangeHandler &gidToLidChangeHandler,
const std::shared_ptr<const document::DocumentTypeRepo> &repo,
searchcorespi::index::IThreadingService &writeService,
- documentmetastore::ILidReuseDelayer &lidReuseDelayer,
- CommitTimeTracker &commitTimeTracker)
+ documentmetastore::ILidReuseDelayer &lidReuseDelayer)
: _summaryAdapter(summaryAdapter),
_schema(schema),
_documentMetaStoreContext(documentMetaStoreContext),
_gidToLidChangeHandler(gidToLidChangeHandler),
_repo(repo),
_writeService(writeService),
- _lidReuseDelayer(lidReuseDelayer),
- _commitTimeTracker(commitTimeTracker)
+ _lidReuseDelayer(lidReuseDelayer)
{}
};
@@ -145,7 +141,6 @@ private:
const std::shared_ptr<const document::DocumentTypeRepo> _repo;
const document::DocumentType *_docType;
documentmetastore::ILidReuseDelayer &_lidReuseDelayer;
- CommitTimeTracker &_commitTimeTracker;
PendingLidTracker _pendingLidTracker;
protected:
@@ -163,6 +158,7 @@ private:
void putSummary(SerialNum serialNum, Lid lid, DocumentSP doc, OnOperationDoneType onDone);
void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone);
void heartBeatSummary(SerialNum serialNum);
+ bool needCommit() const;
bool useDocumentStore(SerialNum replaySerialNum) const {
@@ -236,7 +232,6 @@ public:
const IDocumentMetaStoreContext::SP &getDocumentMetaStore() const { return _documentMetaStoreContext; }
searchcorespi::index::IThreadingService &getWriteService() { return _writeService; }
documentmetastore::ILidReuseDelayer &getLidReuseDelayer() { return _lidReuseDelayer; }
- CommitTimeTracker &getCommitTimeTracker() { return _commitTimeTracker; }
IGidToLidChangeHandler &getGidToLidChangeHandler() const { return _gidToLidChangeHandler; }
const std::shared_ptr<const document::DocumentTypeRepo> &getDocumentTypeRepo() const override { return _repo; }