diff options
Diffstat (limited to 'searchcore/src')
36 files changed, 275 insertions, 768 deletions
diff --git a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp index 315fb7e86eb..87efd74659d 100644 --- a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp +++ b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp @@ -9,7 +9,6 @@ #include <vespa/searchcore/proton/documentmetastore/documentmetastore.h> #include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h> #include <vespa/searchcore/proton/matching/error_constant_value.h> -#include <vespa/searchcore/proton/metrics/feed_metrics.h> #include <vespa/searchcore/proton/index/index_writer.h> #include <vespa/searchcore/proton/index/indexmanager.h> #include <vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h> @@ -47,7 +46,7 @@ using proton::matching::SessionManager; using searchcorespi::IndexSearchable; using searchcorespi::index::IThreadingService; using proton::test::MockGidToLidChangeHandler; - +using std::make_shared; using CCR = DocumentDBConfig::ComparisonResult; using Configurer = SearchableDocSubDBConfigurer; @@ -178,14 +177,8 @@ Fixture::Fixture() vespalib::rmdir(BASE_DIR, true); vespalib::mkdir(BASE_DIR); initViewSet(_views); - _configurer.reset(new Configurer(_views._summaryMgr, - _views.searchView, - _views.feedView, - _queryLimiter, - _constantValueRepo, - _clock, - "test", - 0)); + _configurer.reset(new Configurer(_views._summaryMgr, _views.searchView, _views.feedView, _queryLimiter, + _constantValueRepo, _clock, "test", 0)); } Fixture::~Fixture() {} @@ -193,53 +186,33 @@ void Fixture::initViewSet(ViewSet &views) { Matchers::SP matchers(new Matchers(_clock, _queryLimiter, _constantValueRepo)); - IndexManager::SP indexMgr(new IndexManager(BASE_DIR, searchcorespi::index::WarmupConfig(), - 2, 0, Schema(), 1, views._reconfigurer, - views._writeService, _summaryExecutor, TuneFileIndexManager(), - TuneFileAttributes(), views._fileHeaderContext)); - AttributeManager::SP attrMgr(new AttributeManager(BASE_DIR, - "test.subdb", - TuneFileAttributes(), - views._fileHeaderContext, - views._writeService. - attributeFieldWriter(), - views._hwInfo)); + auto indexMgr = make_shared<IndexManager>(BASE_DIR, searchcorespi::index::WarmupConfig(), 2, 0, Schema(), 1, + views._reconfigurer, views._writeService, _summaryExecutor, + TuneFileIndexManager(), TuneFileAttributes(), views._fileHeaderContext); + auto attrMgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(), views._fileHeaderContext, + views._writeService.attributeFieldWriter(),views._hwInfo); ProtonConfig protonCfg; - SummaryManager::SP summaryMgr( - new SummaryManager(_summaryExecutor, search::LogDocumentStore::Config(), - GrowStrategy(), BASE_DIR, views._docTypeName, - TuneFileSummary(), views._fileHeaderContext, - views._noTlSyncer, search::IBucketizer::SP())); - SessionManager::SP sesMgr( - new SessionManager(protonCfg.grouping.sessionmanager.maxentries)); - DocumentMetaStoreContext::SP metaStore( - new DocumentMetaStoreContext(std::make_shared<BucketDBOwner>())); + auto summaryMgr = make_shared<SummaryManager> + (_summaryExecutor, search::LogDocumentStore::Config(), GrowStrategy(), BASE_DIR, views._docTypeName, + TuneFileSummary(), views._fileHeaderContext,views._noTlSyncer, search::IBucketizer::SP()); + auto sesMgr = make_shared<SessionManager>(protonCfg.grouping.sessionmanager.maxentries); + auto metaStore = make_shared<DocumentMetaStoreContext>(make_shared<BucketDBOwner>()); IIndexWriter::SP indexWriter(new IndexWriter(indexMgr)); AttributeWriter::SP attrWriter(new AttributeWriter(attrMgr)); ISummaryAdapter::SP summaryAdapter(new SummaryAdapter(summaryMgr)); - views._gidToLidChangeHandler = std::make_shared<MockGidToLidChangeHandler>(); + views._gidToLidChangeHandler = make_shared<MockGidToLidChangeHandler>(); Schema::SP schema(new Schema()); views._summaryMgr = summaryMgr; views._dmsc = metaStore; - views._lidReuseDelayer.reset( - new documentmetastore::LidReuseDelayer(views._writeService, - metaStore->get())); + views._lidReuseDelayer.reset(new documentmetastore::LidReuseDelayer(views._writeService, metaStore->get())); IndexSearchable::SP indexSearchable; - MatchView::SP matchView(new MatchView(matchers, indexSearchable, attrMgr, - sesMgr, metaStore, views._docIdLimit)); - views.searchView.set( - SearchView::SP( - new SearchView( - summaryMgr->createSummarySetup(SummaryConfig(), - SummarymapConfig(), - JuniperrcConfig(), - views.repo, - attrMgr), - matchView))); - PerDocTypeFeedMetrics metrics(0); + MatchView::SP matchView(new MatchView(matchers, indexSearchable, attrMgr, sesMgr, metaStore, views._docIdLimit)); + views.searchView.set(make_shared<SearchView> + (summaryMgr->createSummarySetup(SummaryConfig(), SummarymapConfig(), + JuniperrcConfig(), views.repo, attrMgr), + matchView)); views.feedView.set( - SearchableFeedView::SP( - new SearchableFeedView(StoreOnlyFeedView::Context(summaryAdapter, + make_shared<SearchableFeedView>(StoreOnlyFeedView::Context(summaryAdapter, schema, views.searchView.get()->getDocumentMetaStore(), *views._gidToLidChangeHandler, @@ -251,11 +224,10 @@ Fixture::initViewSet(ViewSet &views) views.serialNum, views.serialNum, views._docTypeName, - metrics, 0u /* subDbId */, SubDbType::READY), FastAccessFeedView::Context(attrWriter, views._docIdLimit), - SearchableFeedView::Context(indexWriter)))); + SearchableFeedView::Context(indexWriter))); } @@ -263,7 +235,6 @@ using MySummaryAdapter = test::MockSummaryAdapter; struct MyFastAccessFeedView { - PerDocTypeFeedMetrics _metrics; DummyFileHeaderContext _fileHeaderContext; DocIdLimit _docIdLimit; IThreadingService &_writeService; @@ -276,13 +247,12 @@ struct MyFastAccessFeedView VarHolder<FastAccessFeedView::SP> _feedView; MyFastAccessFeedView(IThreadingService &writeService) - : _metrics(0), - _fileHeaderContext(), + : _fileHeaderContext(), _docIdLimit(0), _writeService(writeService), _hwInfo(), _dmsc(), - _gidToLidChangeHandler(std::make_shared<DummyGidToLidChangeHandler>()), + _gidToLidChangeHandler(make_shared<DummyGidToLidChangeHandler>()), _lidReuseDelayer(), _commitTimeTracker(TimeStamp()), _feedView() @@ -295,30 +265,21 @@ struct MyFastAccessFeedView void init() { ISummaryAdapter::SP summaryAdapter(new MySummaryAdapter()); Schema::SP schema(new Schema()); - DocumentMetaStoreContext::SP docMetaCtx( - new DocumentMetaStoreContext(std::make_shared<BucketDBOwner>())); - _dmsc = docMetaCtx; - _lidReuseDelayer.reset( - new documentmetastore::LidReuseDelayer(_writeService, - docMetaCtx->get())); + _dmsc = make_shared<DocumentMetaStoreContext>(std::make_shared<BucketDBOwner>()); + _lidReuseDelayer.reset(new documentmetastore::LidReuseDelayer(_writeService, _dmsc->get())); DocumentTypeRepo::SP repo = createRepo(); - StoreOnlyFeedView::Context storeOnlyCtx(summaryAdapter, schema, docMetaCtx, *_gidToLidChangeHandler, repo, _writeService, *_lidReuseDelayer, _commitTimeTracker); - StoreOnlyFeedView::PersistentParams params(1, 1, DocTypeName(DOC_TYPE), _metrics, 0, SubDbType::NOTREADY); - AttributeManager::SP mgr(new AttributeManager(BASE_DIR, "test.subdb", - TuneFileAttributes(), - _fileHeaderContext, - _writeService. - attributeFieldWriter(), - _hwInfo)); + StoreOnlyFeedView::Context storeOnlyCtx(summaryAdapter, schema, _dmsc, *_gidToLidChangeHandler, repo, + _writeService, *_lidReuseDelayer, _commitTimeTracker); + StoreOnlyFeedView::PersistentParams params(1, 1, DocTypeName(DOC_TYPE), 0, SubDbType::NOTREADY); + auto mgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(), _fileHeaderContext, + _writeService.attributeFieldWriter(), _hwInfo); IAttributeWriter::SP writer(new AttributeWriter(mgr)); FastAccessFeedView::Context fastUpdateCtx(writer, _docIdLimit); - _feedView.set(FastAccessFeedView::SP(new FastAccessFeedView(storeOnlyCtx, - params, fastUpdateCtx)));; + _feedView.set(FastAccessFeedView::SP(new FastAccessFeedView(storeOnlyCtx, params, fastUpdateCtx)));; } }; -MyFastAccessFeedView::~MyFastAccessFeedView() { -} +MyFastAccessFeedView::~MyFastAccessFeedView() = default; struct FastAccessFixture { @@ -328,8 +289,7 @@ struct FastAccessFixture FastAccessFixture() : _writeService(), _view(_writeService), - _configurer(_view._feedView, - IAttributeWriterFactory::UP(new AttributeWriterFactory), "test") + _configurer(_view._feedView, IAttributeWriterFactory::UP(new AttributeWriterFactory), "test") { vespalib::rmdir(BASE_DIR, true); vespalib::mkdir(BASE_DIR); @@ -339,11 +299,10 @@ struct FastAccessFixture } }; - DocumentDBConfig::SP createConfig() { - return test::DocumentDBConfigBuilder(0, std::make_shared<Schema>(), "client", DOC_TYPE). + return test::DocumentDBConfigBuilder(0, make_shared<Schema>(), "client", DOC_TYPE). repo(createRepo()).build(); } diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index cccbbededd1..b0b06a238c9 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -14,7 +14,6 @@ #include <vespa/searchcore/proton/feedoperation/removeoperation.h> #include <vespa/searchcore/proton/feedoperation/updateoperation.h> #include <vespa/searchcore/proton/feedoperation/wipehistoryoperation.h> -#include <vespa/searchcore/proton/metrics/feed_metrics.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/server/configstore.h> #include <vespa/searchcore/proton/server/ddbstate.h> @@ -306,7 +305,7 @@ struct MyTransport : public FeedToken::ITransport { bool documentWasFound; MyTransport(); ~MyTransport(); - virtual void send(Reply::UP, ResultUP res, bool documentWasFound_, double) override { + void send(ResultUP res, bool documentWasFound_) override { result = std::move(res); documentWasFound = documentWasFound_; gate.countDown(); @@ -316,21 +315,12 @@ struct MyTransport : public FeedToken::ITransport { MyTransport::MyTransport() : gate(), result(), documentWasFound(false) {} MyTransport::~MyTransport() {} -Reply::UP getReply(uint32_t type) { - if (type == DocumentProtocol::REPLY_REMOVEDOCUMENT) { - return Reply::UP(new RemoveDocumentReply); - } else if (type == DocumentProtocol::REPLY_UPDATEDOCUMENT) { - return Reply::UP(new UpdateDocumentReply); - } - return Reply::UP(new DocumentReply(type)); -} - struct FeedTokenContext { MyTransport transport; FeedToken::UP token_ap; FeedToken &token; - FeedTokenContext(uint32_t type = 0); + FeedTokenContext(); ~FeedTokenContext(); bool await(uint32_t timeout = 80000) { return transport.gate.await(timeout); } const Result *getResult() { @@ -341,24 +331,23 @@ struct FeedTokenContext { } }; -FeedTokenContext::FeedTokenContext(uint32_t type) +FeedTokenContext::FeedTokenContext() : transport(), - token_ap(new FeedToken(transport, getReply(type))), + token_ap(new FeedToken(transport)), token(*token_ap) { - token.getReply().getTrace().setLevel(9); } -FeedTokenContext::~FeedTokenContext() {} + +FeedTokenContext::~FeedTokenContext() = default; struct PutContext { FeedTokenContext tokenCtx; DocumentContext docCtx; typedef std::shared_ptr<PutContext> SP; PutContext(const vespalib::string &docId, DocBuilder &builder) : - tokenCtx(DocumentProtocol::REPLY_PUTDOCUMENT), + tokenCtx(), docCtx(docId, builder) - { - } + {} }; @@ -372,12 +361,10 @@ struct PutHandler { builder(db), timestamp(0), puts() - { - } + {} void put(const vespalib::string &docId) { PutContext::SP pc(new PutContext(docId, builder)); - FeedOperation::UP op(new PutOperation(pc->docCtx.bucketId, - timestamp, pc->docCtx.doc)); + FeedOperation::UP op(new PutOperation(pc->docCtx.bucketId, timestamp, pc->docCtx.doc)); handler.handleOperation(pc->tokenCtx.token, std::move(op)); timestamp = Timestamp(timestamp + 1); puts.push_back(pc); @@ -393,18 +380,6 @@ struct PutHandler { }; -struct MyFeedMetrics : public metrics::MetricSet -{ - PerDocTypeFeedMetrics _feed; - - MyFeedMetrics() - : metrics::MetricSet("myfeedmetrics", "", "My feed metrics", NULL), - _feed(this) - { - } -}; - - struct MyTlsWriter : TlsWriter { int store_count; int erase_count; @@ -419,7 +394,6 @@ struct MyTlsWriter : TlsWriter { } }; - struct FeedHandlerFixture { DummyFileHeaderContext _fileHeaderContext; @@ -432,7 +406,6 @@ struct FeedHandlerFixture DDBState _state; MyReplayConfig replayConfig; MyFeedView feedView; - MyFeedMetrics feedMetrics; MyTlsWriter tls_writer; BucketDBOwner _bucketDB; bucketdb::BucketDBHandler _bucketDBHandler; @@ -449,8 +422,8 @@ struct FeedHandlerFixture feedView(schema.getRepo()), _bucketDB(), _bucketDBHandler(_bucketDB), - handler(writeService, tlsSpec, schema.getDocType(), - feedMetrics._feed, _state, owner, writeFilter, replayConfig, tls, &tls_writer) + handler(writeService, tlsSpec, schema.getDocType(), _state, owner, + writeFilter, replayConfig, tls, &tls_writer) { _state.enterLoadState(); _state.enterReplayTransactionLogState(); @@ -507,12 +480,10 @@ TEST_F("require that heartBeat calls FeedView's heartBeat", TEST_F("require that outdated remove is ignored", FeedHandlerFixture) { DocumentContext doc_context("doc:test:foo", *f.schema.builder); - FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, - Timestamp(10), - doc_context.doc->getId())); + FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId())); static_cast<DocumentOperation &>(*op).setPrevDbDocumentId(DbDocumentId(4)); static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000)); - FeedTokenContext token_context(DocumentProtocol::REPLY_REMOVEDOCUMENT); + FeedTokenContext token_context; f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); EXPECT_EQUAL(0, f.feedView.remove_count); EXPECT_EQUAL(0, f.tls_writer.store_count); @@ -599,28 +570,21 @@ TEST_F("require that flush cannot unprune", FeedHandlerFixture) EXPECT_EQUAL(10u, f.handler.getPrunedSerialNum()); } -TEST_F("require that remove of unknown document with known data type " - "stores remove", FeedHandlerFixture) +TEST_F("require that remove of unknown document with known data type stores remove", FeedHandlerFixture) { - DocumentContext doc_context("id:test:searchdocument::foo", - *f.schema.builder); - FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, - Timestamp(10), - doc_context.doc->getId())); - FeedTokenContext token_context(DocumentProtocol::REPLY_REMOVEDOCUMENT); + DocumentContext doc_context("id:test:searchdocument::foo", *f.schema.builder); + FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId())); + FeedTokenContext token_context; f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); EXPECT_EQUAL(1, f.feedView.remove_count); EXPECT_EQUAL(1, f.tls_writer.store_count); } -TEST_F("require that partial update for non-existing document is tagged as such", - FeedHandlerFixture) +TEST_F("require that partial update for non-existing document is tagged as such", FeedHandlerFixture) { UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder); - FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, - Timestamp(10), - upCtx.update)); - FeedTokenContext token_context(DocumentProtocol::REPLY_UPDATEDOCUMENT); + FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update)); + FeedTokenContext token_context; f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult()); @@ -631,18 +595,14 @@ TEST_F("require that partial update for non-existing document is tagged as such" EXPECT_EQUAL(0, f.tls_writer.store_count); } -TEST_F("require that partial update for non-existing document is created if specified", - FeedHandlerFixture) +TEST_F("require that partial update for non-existing document is created if specified", FeedHandlerFixture) { f.handler.setSerialNum(15); UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder); upCtx.update->setCreateIfNonExistent(true); - f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(), - MyDocumentMetaStore::Entry(5, 5, Timestamp(10))); - FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, - Timestamp(10), - upCtx.update)); - FeedTokenContext token_context(DocumentProtocol::REPLY_UPDATEDOCUMENT); + f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(), MyDocumentMetaStore::Entry(5, 5, Timestamp(10))); + FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update)); + FeedTokenContext token_context; f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult()); @@ -678,7 +638,7 @@ TEST_F("require that update is rejected if resource limit is reached", FeedHandl UpdateContext updCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op = std::make_unique<UpdateOperation>(updCtx.bucketId, Timestamp(10), updCtx.update); - FeedTokenContext token(DocumentProtocol::REPLY_UPDATEDOCUMENT); + FeedTokenContext token; f.handler.performOperation(std::move(token.token_ap), std::move(op)); EXPECT_EQUAL(0, f.feedView.update_count); EXPECT_TRUE(dynamic_cast<const UpdateResult *>(token.getResult())); @@ -694,7 +654,7 @@ TEST_F("require that remove is NOT rejected if resource limit is reached", FeedH DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op = std::make_unique<RemoveOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc->getId()); - FeedTokenContext token(DocumentProtocol::REPLY_REMOVEDOCUMENT); + FeedTokenContext token; f.handler.performOperation(std::move(token.token_ap), std::move(op)); EXPECT_EQUAL(1, f.feedView.remove_count); EXPECT_EQUAL(Result::NONE, token.getResult()->getErrorCode()); diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index c820a9f392c..4eefbed0a53 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -10,7 +10,6 @@ #include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h> #include <vespa/searchcore/proton/index/i_index_writer.h> -#include <vespa/searchcore/proton/metrics/feed_metrics.h> #include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/server/ifrozenbuckethandler.h> #include <vespa/searchcore/proton/server/isummaryadapter.h> @@ -30,9 +29,6 @@ #include <vespa/searchlib/docstore/cachestats.h> #include <vespa/searchlib/docstore/idocumentstore.h> #include <vespa/searchlib/index/docbuilder.h> -#include <vespa/vespalib/testkit/testapp.h> -#include <vespa/vespalib/util/blockingthreadstackexecutor.h> -#include <mutex> #include <vespa/log/log.h> LOG_SETUP("feedview_test"); @@ -132,8 +128,6 @@ struct MyTracer struct ParamsContext { DocTypeName _docTypeName; - FeedMetrics _feedMetrics; - PerDocTypeFeedMetrics _metrics; SearchableFeedView::PersistentParams _params; ParamsContext(const vespalib::string &docType, const vespalib::string &baseDir); @@ -143,9 +137,7 @@ struct ParamsContext ParamsContext::ParamsContext(const vespalib::string &docType, const vespalib::string &baseDir) : _docTypeName(docType), - _feedMetrics(), - _metrics(&_feedMetrics), - _params(0, 0, _docTypeName, _metrics, subdb_id, SubDbType::READY) + _params(0, 0, _docTypeName, subdb_id, SubDbType::READY) { (void) baseDir; } @@ -420,11 +412,7 @@ struct MyTransport : public FeedToken::ITransport MyTracer &_tracer; MyTransport(MyTracer &tracer); ~MyTransport(); - virtual void send(mbus::Reply::UP reply, - ResultUP result, - bool documentWasFound, - double latency_ms) override { - (void) reply; (void) documentWasFound, (void) latency_ms; + void send(ResultUP result, bool ) override { lastResult = std::move(result); _tracer.traceAck(lastResult); _gate.countDown(); @@ -492,36 +480,20 @@ DocumentContext::DocumentContext(const vespalib::string &docId, uint64_t timesta {} DocumentContext::~DocumentContext() {} -namespace { - -mbus::Reply::UP -createReply(MessageType mtype) -{ - if (mtype == DocumentProtocol::REPLY_UPDATEDOCUMENT) { - return mbus::Reply::UP(new documentapi::UpdateDocumentReply); - } else if (mtype == DocumentProtocol::REPLY_REMOVEDOCUMENT) { - return mbus::Reply::UP(new documentapi::RemoveDocumentReply); - } else { - return mbus::Reply::UP(new documentapi::DocumentReply(mtype)); - } -} - -} // namespace - struct FeedTokenContext { MyTransport mt; FeedToken ft; typedef std::shared_ptr<FeedTokenContext> SP; typedef std::vector<SP> List; - FeedTokenContext(MyTracer &tracer, MessageType mtype); + FeedTokenContext(MyTracer &tracer); ~FeedTokenContext(); }; -FeedTokenContext::FeedTokenContext(MyTracer &tracer, MessageType mtype) - : mt(tracer), ft(mt, createReply(mtype)) +FeedTokenContext::FeedTokenContext(MyTracer &tracer) + : mt(tracer), ft(mt) {} -FeedTokenContext::~FeedTokenContext() {} +FeedTokenContext::~FeedTokenContext() = default; struct FixtureBase { @@ -612,7 +584,7 @@ struct FixtureBase } void putAndWait(const DocumentContext &docCtx) { - FeedTokenContext token(_tracer, DocumentProtocol::REPLY_PUTDOCUMENT); + FeedTokenContext token(_tracer); PutOperation op(docCtx.bid, docCtx.ts, docCtx.doc); runInMaster([&] () { performPut(&token.ft, op); }); } @@ -624,7 +596,7 @@ struct FixtureBase } void updateAndWait(const DocumentContext &docCtx) { - FeedTokenContext token(_tracer, DocumentProtocol::REPLY_UPDATEDOCUMENT); + FeedTokenContext token(_tracer); UpdateOperation op(docCtx.bid, docCtx.ts, docCtx.upd); runInMaster([&] () { performUpdate(&token.ft, op); }); } @@ -636,13 +608,13 @@ struct FixtureBase getFeedView().handleRemove(token, op); } else { if (token != NULL) { - token->ack(op.getType(), pc._metrics); + token->ack(); } } } void removeAndWait(const DocumentContext &docCtx) { - FeedTokenContext token(_tracer, DocumentProtocol::REPLY_REMOVEDOCUMENT); + FeedTokenContext token(_tracer); RemoveOperation op(docCtx.bid, docCtx.ts, docCtx.doc->getId()); runInMaster([&] () { performRemove(&token.ft, op); }); } diff --git a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp index 1ec201fdbd2..bbdabe2d0ef 100644 --- a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp @@ -1,13 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/document/base/documentid.h> -#include <vespa/document/base/globalid.h> -#include <vespa/document/bucket/bucketid.h> #include <vespa/document/datatype/datatype.h> #include <vespa/searchcommon/common/schema.h> #include <vespa/searchcore/proton/common/commit_time_tracker.h> #include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h> -#include <vespa/searchcore/proton/metrics/feed_metrics.h> #include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/server/putdonecontext.h> #include <vespa/searchcore/proton/server/removedonecontext.h> @@ -15,8 +12,6 @@ #include <vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h> #include <vespa/searchcore/proton/test/mock_summary_adapter.h> #include <vespa/searchcore/proton/test/thread_utils.h> -#include <vespa/searchlib/common/idestructorcallback.h> -#include <vespa/searchlib/common/serialnum.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/vespalib/testkit/testapp.h> @@ -225,8 +220,7 @@ struct FixtureBase { commitTimeTracker(fastos::TimeStamp()), feedview() { - PerDocTypeFeedMetrics metrics(0); - StoreOnlyFeedView::PersistentParams params(0, 0, DocTypeName("foo"), metrics, subdb_id, subDbType); + StoreOnlyFeedView::PersistentParams params(0, 0, DocTypeName("foo"), subdb_id, subDbType); metaStore->constructFreeList(); ISummaryAdapter::SP adapter = std::make_unique<MySummaryAdapter>(removeCount, putCount, heartbeatCount); feedview = std::make_unique<FeedViewType>(adapter, metaStore, writeService, lidReuseDelayer, diff --git a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp index 9df65ae3437..530c9ebef39 100644 --- a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp +++ b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp @@ -1,57 +1,39 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/testlib/receptor.h> -#include <vespa/documentapi/messagebus/messages/removedocumentreply.h> -#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/vespalib/testkit/testapp.h> +#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/vespalib/util/exceptions.h> using namespace proton; class LocalTransport : public FeedToken::ITransport { private: - mbus::Receptor _receptor; - double _latency_ms; + size_t _receivedCount; public: LocalTransport() - : _receptor(), - _latency_ms(0.0) - { - // empty - } - - void send(mbus::Reply::UP reply, ResultUP, bool, double latency_ms) override { - _receptor.handleReply(std::move(reply)); - _latency_ms = latency_ms; - } + : _receivedCount(0) + { } - mbus::Reply::UP getReply() { - return _receptor.getReply(); + void send(ResultUP, bool) override { + _receivedCount++; } - double getLatencyMs() const { - return _latency_ms; - } + size_t getReceivedCount() const { return _receivedCount; } }; class Test : public vespalib::TestApp { private: void testAck(); - void testAutoReply(); void testFail(); void testHandover(); - void testIntegrity(); public: int Main() override { TEST_INIT("feedtoken_test"); testAck(); TEST_FLUSH(); -// testAutoReply(); TEST_FLUSH(); testFail(); TEST_FLUSH(); testHandover(); TEST_FLUSH(); -// testIntegrity(); TEST_FLUSH(); TEST_DONE(); } @@ -63,41 +45,18 @@ void Test::testAck() { LocalTransport transport; - mbus::Reply::UP msg(new documentapi::RemoveDocumentReply()); - FeedToken token(transport, std::move(msg)); + FeedToken token(transport); token.ack(); - mbus::Reply::UP reply = transport.getReply(); - ASSERT_TRUE(reply.get() != NULL); - EXPECT_TRUE(!reply->hasErrors()); -} - -void -Test::testAutoReply() -{ - mbus::Receptor receptor; - mbus::Reply::UP reply(new documentapi::RemoveDocumentReply()); - reply->pushHandler(receptor); - { - LocalTransport transport; - FeedToken token(transport, std::move(reply)); - } - reply = receptor.getReply(0); - ASSERT_TRUE(reply.get() != NULL); - EXPECT_TRUE(reply->hasErrors()); + EXPECT_EQUAL(1u, transport.getReceivedCount()); } void Test::testFail() { LocalTransport transport; - mbus::Reply::UP reply(new documentapi::RemoveDocumentReply()); - FeedToken token(transport, std::move(reply)); - token.fail(69, "6699"); - reply = transport.getReply(); - ASSERT_TRUE(reply.get() != NULL); - EXPECT_EQUAL(1u, reply->getNumErrors()); - EXPECT_EQUAL(69u, reply->getError(0).getCode()); - EXPECT_EQUAL("6699", reply->getError(0).getMessage()); + FeedToken token(transport); + token.fail(); + EXPECT_EQUAL(1u, transport.getReceivedCount()); } void @@ -110,25 +69,11 @@ Test::testHandover() }; LocalTransport transport; - mbus::Reply::UP reply(new documentapi::RemoveDocumentReply()); - FeedToken token(transport, std::move(reply)); + FeedToken token(transport); token = MyHandover::handover(token); token.ack(); - reply = transport.getReply(); - ASSERT_TRUE(reply.get() != NULL); - EXPECT_TRUE(!reply->hasErrors()); + EXPECT_EQUAL(1u, transport.getReceivedCount()); } -void -Test::testIntegrity() -{ - LocalTransport transport; - try { - FeedToken token(transport, mbus::Reply::UP()); - EXPECT_TRUE(false); // should throw an exception - } catch (vespalib::IllegalArgumentException &e) { - (void)e; // expected - } -} diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp index fd60b16ee70..10f70b2e518 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp @@ -239,10 +239,11 @@ FastS_NodeManager::SetCollDesc(FastS_DataSetCollDesc *configDesc, break; FastOS_Thread::Sleep(100); }; - if (allup) + if (allup) { LOG(debug, "All new engines up after %d ms", rwait); - else + } else { LOG(debug, "Some new engines still down after %d ms", rwait); + } } gencnt = SetDataSetCollection(newCollection); diff --git a/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_writer.h index 7455ccda5a3..abcf132d537 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_writer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_writer.h @@ -9,11 +9,7 @@ #include <vespa/searchcore/proton/attribute/i_attribute_manager.h> #include <vespa/searchcore/proton/feedoperation/lidvectorcontext.h> -namespace search { - -class IDestructorCallback; - -} +namespace search { class IDestructorCallback; } namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp index 545a303cb16..008fafa332b 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp @@ -1,93 +1,53 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "feedtoken.h" -#include <vespa/searchcore/proton/metrics/feed_metrics.h> namespace proton { -FeedToken::FeedToken(ITransport &transport, mbus::Reply::UP reply) : - _state(new State(transport, std::move(reply), 1)) +FeedToken::FeedToken(ITransport &transport) : + _state(new State(transport, 1)) { } -FeedToken::State::State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired) : +FeedToken::State::State(ITransport & transport, uint32_t numAcksRequired) : _transport(transport), - _reply(std::move(reply)), _result(new storage::spi::Result()), _documentWasFound(false), - _unAckedCount(numAcksRequired), - _lock(), - _startTime() + _unAckedCount(numAcksRequired) { - assert(_reply); - _startTime.SetNow(); + assert(_unAckedCount > 0); } FeedToken::State::~State() { - assert(!_reply); + assert(_unAckedCount == 0); } void FeedToken::State::ack() { - assert(_reply); uint32_t prev(_unAckedCount--); if (prev == 1) { - _transport.send(std::move(_reply), std::move(_result), _documentWasFound, _startTime.MilliSecsToNow()); + _transport.send(std::move(_result), _documentWasFound); } assert(prev >= 1); } - -void -FeedToken::State::ack(const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics) -{ - assert(_reply); - uint32_t prev(_unAckedCount--); - if (prev == 1) { - _transport.send(std::move(_reply), std::move(_result), _documentWasFound, _startTime.MilliSecsToNow()); - switch (opType) { - case FeedOperation::PUT: - metrics.RegisterPut(_startTime); - break; - case FeedOperation::REMOVE: - case FeedOperation::REMOVE_BATCH: - metrics.RegisterRemove(_startTime); - break; - case FeedOperation::UPDATE_42: - case FeedOperation::UPDATE: - metrics.RegisterUpdate(_startTime); - break; - case FeedOperation::MOVE: - metrics.RegisterMove(_startTime); - break; - default: - ; - } - } - assert(prev >= 1); -} - - void FeedToken::State::incNeededAcks() { - assert(_reply); uint32_t prev(_unAckedCount++); assert(prev >= 1); (void) prev; } - void -FeedToken::State::fail(uint32_t errNum, const vespalib::string &errMsg) +FeedToken::State::fail() { - assert(_reply); - vespalib::LockGuard guard(_lock); - _reply->addError(mbus::Error(errNum, errMsg)); - _transport.send(std::move(_reply), std::move(_result), _documentWasFound, _startTime.MilliSecsToNow()); + uint32_t prev = _unAckedCount.exchange(0); + if (prev > 0) { + _transport.send(std::move(_result), _documentWasFound); + } } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index 722827ded87..856c8a22652 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -1,16 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/messagebus/reply.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/vespalib/util/exception.h> #include <vespa/vespalib/util/sync.h> -#include <vespa/searchcore/proton/feedoperation/feedoperation.h> #include <atomic> namespace proton { -class PerDocTypeFeedMetrics; typedef std::unique_ptr<storage::spi::Result> ResultUP; /** @@ -23,10 +20,7 @@ public: class ITransport { public: virtual ~ITransport() { } - virtual void send(mbus::Reply::UP reply, - ResultUP result, - bool documentWasFound, - double latency_ms) = 0; + virtual void send(ResultUP result, bool documentWasFound) = 0; }; private: @@ -34,30 +28,21 @@ private: public: State(const State &) = delete; State & operator = (const State &) = delete; - State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired); + State(ITransport & transport, uint32_t numAcksRequired); ~State(); - void ack(); - - void ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics); - void incNeededAcks(); - - void fail(uint32_t errNum, const vespalib::string &errMsg); - mbus::Reply & getReply() { return *_reply; } + void ack(); + void fail(); void setResult(ResultUP result, bool documentWasFound) { _documentWasFound = documentWasFound; _result = std::move(result); } const storage::spi::Result &getResult() { return *_result; } - FastOS_Time getStartTime() const { return _startTime; } private: - ITransport &_transport; - mbus::Reply::UP _reply; - ResultUP _result; - bool _documentWasFound; + ITransport &_transport; + ResultUP _result; + bool _documentWasFound; std::atomic<uint32_t> _unAckedCount; - vespalib::Lock _lock; - FastOS_Time _startTime; }; std::shared_ptr<State> _state; @@ -72,9 +57,8 @@ public: * vespalib::IllegalArgumentException. * * @param transport The transport to pass the reply to. - * @param reply The mbus::Reply corresponding to this operation. */ - FeedToken(ITransport &transport, mbus::Reply::UP reply); + FeedToken(ITransport &transport); FeedToken(FeedToken &&) = default; FeedToken & operator =(FeedToken &&) = default; @@ -89,10 +73,6 @@ public: */ void ack() const { _state->ack(); } - void ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics) const { - _state->ack(opType, metrics); - } - void incNeededAcks() const { _state->incNeededAcks(); } @@ -105,14 +85,7 @@ public: * @param errNum A numerical representation of the error. * @param errMsg A readable string detailing the error. */ - void fail(uint32_t errNum, const vespalib::string &errMsg) const { _state->fail(errNum, errMsg); } - - /** - * Gives you access to the underlying reply message. - * - * @return The reply - */ - mbus::Reply & getReply() const { return _state->getReply(); } + void fail() const { _state->fail(); } /** * Gives you access to the underlying result. @@ -127,8 +100,6 @@ public: void setResult(ResultUP result, bool documentWasFound) { _state->setResult(std::move(result), documentWasFound); } - - FastOS_Time getStartTime() const { return _state->getStartTime(); } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt index 9f599208aa2..95d5f153f35 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt @@ -7,7 +7,6 @@ vespa_add_library(searchcore_proton_metrics STATIC documentdb_metrics_collection.cpp documentdb_tagged_metrics.cpp executor_metrics.cpp - feed_metrics.cpp job_load_sampler.cpp job_tracker.cpp job_tracked_flush_target.cpp diff --git a/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.cpp deleted file mode 100644 index 06b2f8a5fca..00000000000 --- a/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.cpp +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "feed_metrics.h" - -using vespalib::LockGuard; - -namespace proton { - -FeedMetrics::FeedMetrics() - : metrics::MetricSet("feed", "", "Feed metrics", 0), - count("count", "logdefault", "Feed messages handled", this), - latency("latency", "logdefault", "Feed message latency", this) -{ -} - -FeedMetrics::~FeedMetrics() {} - -PerDocTypeFeedMetrics::PerDocTypeFeedMetrics(MetricSet *parent) - : MetricSet("feedmetrics", "", "Feed metrics", parent), - _update_lock(), - _puts("puts", "", "Number of feed put operations", this), - _updates("updates", "", "Number of feed update operations", this), - _removes("removes", "", "Number of feed remove operations", this), - _moves("moves", "", "Number of feed move operations", this), - _put_latency("put_latency", "", "Latency for feed puts", this), - _update_latency("update_latency", "", "Latency for feed updates", this), - _remove_latency("remove_latency", "", "Latency for feed removes", this), - _move_latency("move_latency", "", "Latency for feed moves", this) -{ -} - -PerDocTypeFeedMetrics::~PerDocTypeFeedMetrics() {} - -void PerDocTypeFeedMetrics::RegisterPut(const FastOS_Time &start_time) { - LockGuard lock(_update_lock); - _puts.inc(1); - _put_latency.addValue(start_time.MilliSecsToNow() / 1000.0); -} - -void PerDocTypeFeedMetrics::RegisterUpdate(const FastOS_Time &start_time) { - LockGuard lock(_update_lock); - _updates.inc(1); - _update_latency.addValue(start_time.MilliSecsToNow() / 1000.0); -} - -void PerDocTypeFeedMetrics::RegisterRemove(const FastOS_Time &start_time) { - LockGuard lock(_update_lock); - _removes.inc(1); - _remove_latency.addValue(start_time.MilliSecsToNow() / 1000.0); -} - -void -PerDocTypeFeedMetrics::RegisterMove(const FastOS_Time &start_time) -{ - LockGuard lock(_update_lock); - _moves.inc(1); - _move_latency.addValue(start_time.MilliSecsToNow() / 1000.0); -} - -} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.h deleted file mode 100644 index 8d8a8bcbb88..00000000000 --- a/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.h +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/metrics/metrics.h> -#include <vespa/vespalib/util/sync.h> - -namespace proton { - -struct FeedMetrics : metrics::MetricSet -{ - vespalib::Lock updateLock; - metrics::LongCountMetric count; - metrics::DoubleAverageMetric latency; - - FeedMetrics(); - ~FeedMetrics(); -}; - -class PerDocTypeFeedMetrics : metrics::MetricSet { - vespalib::Lock _update_lock; - metrics::LongCountMetric _puts; - metrics::LongCountMetric _updates; - metrics::LongCountMetric _removes; - metrics::LongCountMetric _moves; - metrics::DoubleAverageMetric _put_latency; - metrics::DoubleAverageMetric _update_latency; - metrics::DoubleAverageMetric _remove_latency; - metrics::DoubleAverageMetric _move_latency; - -public: - PerDocTypeFeedMetrics(metrics::MetricSet *parent); - ~PerDocTypeFeedMetrics(); - void RegisterPut(const FastOS_Time &start_time); - void RegisterUpdate(const FastOS_Time &start_time); - void RegisterRemove(const FastOS_Time &start_time); - void RegisterMove(const FastOS_Time &start_time); -}; - -} // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp index d324c4a7373..2c753d24c69 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp @@ -152,7 +152,6 @@ LegacyDocumentDBMetrics::LegacyDocumentDBMetrics(const std::string &docTypeName, executor("executor", this), indexExecutor("indexexecutor", this), summaryExecutor("summaryexecutor", this), - feed(this), sessionManager(this), ready("ready", this), notReady("notready", this), diff --git a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h index 78a13002d7a..0abad83a3a6 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h @@ -6,7 +6,6 @@ #include "executor_metrics.h" #include <vespa/metrics/metrics.h> #include "sessionmanager_metrics.h" -#include "feed_metrics.h" #include <vespa/searchcore/proton/matching/matching_stats.h> namespace proton { @@ -117,7 +116,6 @@ struct LegacyDocumentDBMetrics : metrics::MetricSet ExecutorMetrics executor; ExecutorMetrics indexExecutor; ExecutorMetrics summaryExecutor; - PerDocTypeFeedMetrics feed; search::grouping::SessionManagerMetrics sessionManager; SubDBMetrics ready; SubDBMetrics notReady; diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 69a3a902af8..feebbf4cf2a 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -182,17 +182,14 @@ PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace) const } PersistenceEngine::HandlerSnapshot::UP -PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace, - const DocumentId &id) const +PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace, const DocumentId &id) const { LockGuard guard(_lock); return _handlers.getHandlerSnapshot(bucketSpace, id); } -PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, - const IResourceWriteFilter &writeFilter, - ssize_t defaultSerializedSize, - bool ignoreMaxBytes) +PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter, + ssize_t defaultSerializedSize, bool ignoreMaxBytes) : AbstractPersistenceProvider(), _defaultSerializedSize(defaultSerializedSize), _ignoreMaxBytes(ignoreMaxBytes), @@ -216,8 +213,7 @@ PersistenceEngine::~PersistenceEngine() IPersistenceHandler::SP -PersistenceEngine::putHandler(document::BucketSpace bucketSpace, - const DocTypeName &docType, +PersistenceEngine::putHandler(document::BucketSpace bucketSpace, const DocTypeName &docType, const IPersistenceHandler::SP &handler) { LockGuard guard(_lock); @@ -226,8 +222,7 @@ PersistenceEngine::putHandler(document::BucketSpace bucketSpace, IPersistenceHandler::SP -PersistenceEngine::getHandler(document::BucketSpace bucketSpace, - const DocTypeName &docType) const +PersistenceEngine::getHandler(document::BucketSpace bucketSpace, const DocTypeName &docType) const { LockGuard guard(_lock); return _handlers.getHandler(bucketSpace, docType); @@ -235,8 +230,7 @@ PersistenceEngine::getHandler(document::BucketSpace bucketSpace, IPersistenceHandler::SP -PersistenceEngine::removeHandler(document::BucketSpace bucketSpace, - const DocTypeName &docType) +PersistenceEngine::removeHandler(document::BucketSpace bucketSpace, const DocTypeName &docType) { // TODO: Grab bucket list and treat them as modified LockGuard guard(_lock); @@ -367,8 +361,7 @@ PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::S docType.toString().c_str())); } TransportLatch latch(1); - FeedToken token(latch, mbus::Reply::UP(new documentapi::FeedReply( - documentapi::DocumentProtocol::REPLY_PUTDOCUMENT))); + FeedToken token(latch); handler->handlePut(token, b, t, doc); latch.await(); return latch.getResult(); @@ -390,7 +383,7 @@ PersistenceEngine::remove(const Bucket& b, Timestamp t, const DocumentId& did, C TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch, Reply::UP(new RemoveDocumentReply)); + FeedToken token(latch); handler->handleRemove(token, b, t, did); } latch.await(); @@ -421,7 +414,7 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType); TransportLatch latch(1); if (handler.get() != NULL) { - FeedToken token(latch, mbus::Reply::UP(new documentapi::UpdateDocumentReply())); + FeedToken token(latch); LOG(debug, "update = %s", upd->toXml().c_str()); handler->handleUpdate(token, b, t, upd); latch.await(); @@ -433,10 +426,7 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP PersistenceEngine::GetResult -PersistenceEngine::get(const Bucket& b, - const document::FieldSet& fields, - const DocumentId& did, - Context& context) const +PersistenceEngine::get(const Bucket& b, const document::FieldSet& fields, const DocumentId& did, Context& context) const { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); HandlerSnapshot::UP snapshot = getHandlerSnapshot(b.getBucketSpace()); @@ -465,11 +455,8 @@ PersistenceEngine::get(const Bucket& b, PersistenceEngine::CreateIteratorResult -PersistenceEngine::createIterator(const Bucket &bucket, - const document::FieldSet& fields, - const Selection &selection, - IncludedVersions versions, - Context & context) +PersistenceEngine::createIterator(const Bucket &bucket, const document::FieldSet& fields, const Selection &selection, + IncludedVersions versions, Context & context) { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); HandlerSnapshot::UP snapshot = getHandlerSnapshot(bucket.getBucketSpace()); @@ -552,7 +539,7 @@ PersistenceEngine::createBucket(const Bucket &b, Context &) TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch, Reply::UP(new DocumentReply(0))); + FeedToken token(latch); handler->handleCreateBucket(token, b); } latch.await(); @@ -569,7 +556,7 @@ PersistenceEngine::deleteBucket(const Bucket& b, Context&) TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch, Reply::UP(new DocumentReply(0))); + FeedToken token(latch); handler->handleDeleteBucket(token, b); } latch.await(); @@ -612,7 +599,7 @@ PersistenceEngine::split(const Bucket& source, const Bucket& target1, const Buck TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch, Reply::UP(new DocumentReply(0))); + FeedToken token(latch); handler->handleSplit(token, source, target1, target2); } latch.await(); @@ -631,7 +618,7 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch, Reply::UP(new DocumentReply(0))); + FeedToken token(latch); handler->handleJoin(token, source1, source2, target); } latch.await(); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp index e719a0aa962..e0d512ae6e0 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp @@ -16,16 +16,11 @@ TransportLatch::TransportLatch(uint32_t cnt) TransportLatch::~TransportLatch() {} void -TransportLatch::send(mbus::Reply::UP reply, - ResultUP result, - bool documentWasFound, - double latency_ms) +TransportLatch::send(ResultUP result, bool documentWasFound) { - (void) reply; - (void) latency_ms; { vespalib::LockGuard guard(_lock); - if (!_result.get()) { + if (!_result) { _result = std::move(result); } else if (result->hasError()) { _result.reset(new Result(mergeErrorResults(*_result, *result))); @@ -40,9 +35,7 @@ Result TransportLatch::mergeErrorResults(const Result &lhs, const Result &rhs) { Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? lhs : rhs).getErrorCode(); - return Result(error, vespalib::make_string("%s, %s", - lhs.getErrorMessage().c_str(), - rhs.getErrorMessage().c_str())); + return Result(error, vespalib::make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str())); } } // proton diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h index 747c95358b4..12f92722dfa 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h @@ -20,10 +20,7 @@ private: public: TransportLatch(uint32_t cnt); ~TransportLatch(); - virtual void send(mbus::Reply::UP reply, - ResultUP result, - bool documentWasFound, - double latency_ms) override; + void send(ResultUP result, bool documentWasFound) override; void await() { _latch.await(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index cffa014534e..4198803d1fe 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -125,9 +125,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _state(), _dmUsageForwarder(_writeService.master()), _writeFilter(), - _feedHandler(_writeService, tlsSpec, docTypeName, - getMetricsCollection().getLegacyMetrics().feed, - _state, *this, _writeFilter, *this, tlsDirectWriter), + _feedHandler(_writeService, tlsSpec, docTypeName, _state, *this, _writeFilter, *this, tlsDirectWriter), _subDBs(*this, *this, _feedHandler, _docTypeName, _writeService, warmupExecutor, summaryExecutor, fileHeaderContext, metricsWireService, getMetricsCollection(), queryLimiter, clock, _configMutex, _baseDir, protonCfg, hwInfo), @@ -138,7 +136,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _lastDocStoreCacheStats(), _calc() { - assert(configSnapshot.get() != NULL); + assert(configSnapshot); LOG(debug, "DocumentDB(%s): Creating database in directory '%s'", _docTypeName.toString().c_str(), _baseDir.c_str()); @@ -277,7 +275,7 @@ DocumentDB::newConfigSnapshot(DocumentDBConfig::SP snapshot) _pendingConfigSnapshot.set(snapshot); { lock_guard guard(_configMutex); - if (_activeConfigSnapshot.get() == NULL) { + if ( ! _activeConfigSnapshot) { LOG(debug, "DocumentDB(%s): Ignoring new available config snapshot. " "The document database does not have" diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp index e46caca4fba..2b2849d025c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp @@ -36,11 +36,8 @@ FastAccessFeedView::getUpdateScope(const DocumentUpdate &upd) * Otherwise we can drop it and ack the operation right away. */ void -FastAccessFeedView::putAttributes(SerialNum serialNum, - search::DocumentIdT lid, - const Document &doc, - bool immediateCommit, - OnPutDoneType onWriteDone) +FastAccessFeedView::putAttributes(SerialNum serialNum, search::DocumentIdT lid, const Document &doc, + bool immediateCommit, OnPutDoneType onWriteDone) { _attributeWriter->put(serialNum, doc, lid, immediateCommit, onWriteDone); if (immediateCommit && onWriteDone) { @@ -49,29 +46,22 @@ FastAccessFeedView::putAttributes(SerialNum serialNum, } void -FastAccessFeedView::updateAttributes(SerialNum serialNum, - search::DocumentIdT lid, - const DocumentUpdate &upd, - bool immediateCommit, - OnOperationDoneType onWriteDone) +FastAccessFeedView::updateAttributes(SerialNum serialNum, search::DocumentIdT lid, const DocumentUpdate &upd, + bool immediateCommit, OnOperationDoneType onWriteDone) { _attributeWriter->update(serialNum, upd, lid, immediateCommit, onWriteDone); } void -FastAccessFeedView::removeAttributes(SerialNum serialNum, - search::DocumentIdT lid, - bool immediateCommit, - OnRemoveDoneType onWriteDone) +FastAccessFeedView::removeAttributes(SerialNum serialNum, search::DocumentIdT lid, + bool immediateCommit, OnRemoveDoneType onWriteDone) { _attributeWriter->remove(serialNum, lid, immediateCommit, onWriteDone); } void -FastAccessFeedView::removeAttributes(SerialNum serialNum, - const LidVector &lidsToRemove, - bool immediateCommit, - OnWriteDoneType onWriteDone) +FastAccessFeedView::removeAttributes(SerialNum serialNum, const LidVector &lidsToRemove, + bool immediateCommit, OnWriteDoneType onWriteDone) { _attributeWriter->remove(lidsToRemove, serialNum, immediateCommit, onWriteDone); } @@ -83,8 +73,7 @@ FastAccessFeedView::heartBeatAttributes(SerialNum serialNum) } FastAccessFeedView::FastAccessFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, - const PersistentParams ¶ms, - const Context &ctx) + const PersistentParams ¶ms, const Context &ctx) : Parent(storeOnlyCtx, params), _attributeWriter(ctx._attrWriter), _docIdLimit(ctx._docIdLimit) @@ -103,8 +92,7 @@ FastAccessFeedView::handleCompactLidSpace(const CompactLidSpaceOperation &op) } void -FastAccessFeedView::forceCommit(SerialNum serialNum, - OnForceCommitDoneType onCommitDone) +FastAccessFeedView::forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) { _attributeWriter->forceCommit(serialNum, onCommitDone); onCommitDone->registerCommittedDocIdLimit(_metaStore.getCommittedDocIdLimit(), &_docIdLimit); @@ -119,5 +107,19 @@ FastAccessFeedView::sync() _writeService.attributeFieldWriter().sync(); } +bool +FastAccessFeedView::fastPartialUpdateAttribute(const vespalib::string &fieldName) const { + search::AttributeVector *attribute = _attributeWriter->getWritableAttribute(fieldName); + if (attribute == nullptr) { + // Partial update to non-attribute field must update document + return false; + } + search::attribute::BasicType::Type attrType = attribute->getBasicType(); + // Partial update to tensor, predicate or reference attribute + // must update document + return ((attrType != search::attribute::BasicType::Type::PREDICATE) && + (attrType != search::attribute::BasicType::Type::TENSOR) && + (attrType != search::attribute::BasicType::Type::REFERENCE)); +} } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h index 4056aecd24c..e1b0cf83f64 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h @@ -39,39 +39,28 @@ private: const IAttributeWriter::SP _attributeWriter; DocIdLimit &_docIdLimit; - virtual UpdateScope getUpdateScope(const document::DocumentUpdate &upd) override; + UpdateScope getUpdateScope(const document::DocumentUpdate &upd) override; - virtual void putAttributes(SerialNum serialNum, - search::DocumentIdT lid, - const document::Document &doc, - bool immediateCommit, - OnPutDoneType onWriteDone) override; + void putAttributes(SerialNum serialNum, search::DocumentIdT lid, const document::Document &doc, + bool immediateCommit, OnPutDoneType onWriteDone) override; - virtual void updateAttributes(SerialNum serialNum, - search::DocumentIdT lid, - const document::DocumentUpdate &upd, - bool immediateCommit, - OnOperationDoneType onWriteDone) override; + void updateAttributes(SerialNum serialNum, search::DocumentIdT lid, const document::DocumentUpdate &upd, + bool immediateCommit, OnOperationDoneType onWriteDone) override; - virtual void removeAttributes(SerialNum serialNum, - search::DocumentIdT lid, - bool immediateCommit, - OnRemoveDoneType onWriteDone) override; + void removeAttributes(SerialNum serialNum, search::DocumentIdT lid, + bool immediateCommit, OnRemoveDoneType onWriteDone) override; - virtual void removeAttributes(SerialNum serialNum, - const LidVector &lidsToRemove, - bool immediateCommit, - OnWriteDoneType onWriteDone) override; + void removeAttributes(SerialNum serialNum, const LidVector &lidsToRemove, + bool immediateCommit, OnWriteDoneType onWriteDone) override; - virtual void heartBeatAttributes(SerialNum serialNum) override; + void heartBeatAttributes(SerialNum serialNum) override; protected: - virtual void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) override; + void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) override; public: FastAccessFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, - const PersistentParams ¶ms, - const Context &ctx); + const PersistentParams ¶ms, const Context &ctx); ~FastAccessFeedView(); virtual const IAttributeWriter::SP &getAttributeWriter() const { @@ -82,24 +71,10 @@ public: return _docIdLimit; } - virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; - - virtual void sync() override; - - bool fastPartialUpdateAttribute(const vespalib::string &fieldName) { - search::AttributeVector *attribute = - _attributeWriter->getWritableAttribute(fieldName); - if (attribute == nullptr) { - // Partial update to non-attribute field must update document - return false; - } - search::attribute::BasicType::Type attrType = attribute->getBasicType(); - // Partial update to tensor, predicate or reference attribute - // must update document - return ((attrType != search::attribute::BasicType::Type::PREDICATE) && - (attrType != search::attribute::BasicType::Type::TENSOR) && - (attrType != search::attribute::BasicType::Type::REFERENCE)); - } + void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; + void sync() override; + + bool fastPartialUpdateAttribute(const vespalib::string &fieldName) const; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index bb5f058429a..9300af5c3f0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -49,27 +49,9 @@ namespace proton { namespace { -void -setUpdateWasFound(mbus::Reply &reply, bool was_found) -{ - assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_UPDATEDOCUMENT); - UpdateDocumentReply &update_rep = static_cast<UpdateDocumentReply&>(reply); - update_rep.setWasFound(was_found); -} - -void -setRemoveWasFound(mbus::Reply &reply, bool was_found) -{ - assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_REMOVEDOCUMENT); - RemoveDocumentReply &remove_rep = static_cast<RemoveDocumentReply&>(reply); - remove_rep.setWasFound(was_found); -} - bool -ignoreOperation(const DocumentOperation &op) -{ - return (op.getPrevTimestamp() != 0) - && (op.getTimestamp() < op.getPrevTimestamp()); +ignoreOperation(const DocumentOperation &op) { + return (op.getPrevTimestamp() != 0) && (op.getTimestamp() < op.getPrevTimestamp()); } } // namespace @@ -119,7 +101,7 @@ void FeedHandler::performPut(FeedToken::UP token, PutOperation &op) { op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); if (token) { token->setResult(ResultUP(new Result), false); - token->ack(op.getType(), _metrics); + token->ack(); } return; } @@ -142,8 +124,7 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op) } else { if (token) { token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false); - setUpdateWasFound(token->getReply(), false); - token->ack(op.getType(), _metrics); + token->ack(); } } } @@ -155,7 +136,6 @@ FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op) storeOperation(op); if (token) { token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true); - setUpdateWasFound(token->getReply(), true); } _activeFeedView->handleUpdate(token.get(), op); } @@ -172,10 +152,9 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio storeOperation(putOp); if (token) { token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true); - setUpdateWasFound(token->getReply(), true); } TransportLatch latch(1); - FeedToken putToken(latch, mbus::Reply::UP(new FeedReply(DocumentProtocol::REPLY_PUTDOCUMENT))); + FeedToken putToken(latch); _activeFeedView->handlePut(&putToken, putOp); latch.await(); if (token) { @@ -191,7 +170,7 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); - token->ack(op.getType(), _metrics); + token->ack(); } return; } @@ -202,7 +181,6 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { if (token) { bool documentWasFound = !op.getPrevMarkedAsRemoved(); token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound); - setRemoveWasFound(token->getReply(), documentWasFound); } _activeFeedView->handleRemove(token.get(), op); } else if (op.hasDocType()) { @@ -210,14 +188,12 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { storeOperation(op); if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); - setRemoveWasFound(token->getReply(), false); } _activeFeedView->handleRemove(token.get(), op); } else { if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); - setRemoveWasFound(token->getReply(), false); - token->ack(op.getType(), _metrics); + token->ack(); } } } @@ -367,7 +343,6 @@ FeedHandler::changeFeedState(FeedState::SP newState, const LockGuard &) FeedHandler::FeedHandler(IThreadingService &writeService, const vespalib::string &tlsSpec, const DocTypeName &docTypeName, - PerDocTypeFeedMetrics &metrics, DDBState &state, IFeedHandlerOwner &owner, const IResourceWriteFilter &writeFilter, @@ -395,9 +370,8 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _delayedPrune(false), _feedLock(), _feedState(std::make_shared<InitState>(getDocTypeName())), - _activeFeedView(NULL), + _activeFeedView(nullptr), _bucketDBHandler(nullptr), - _metrics(metrics), _syncLock(), _syncedSerialNum(0), _allowSync(false) @@ -499,7 +473,7 @@ void feedOperationRejected(FeedToken *token, const vespalib::string &opType, con auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'", opType.c_str(), docId.c_str(), docTypeName.toString().c_str(), rejectMessage.c_str()); token->setResult(ResultUP(new ResultType(Result::RESOURCE_EXHAUSTED, message)), false); - token->fail(documentapi::DocumentProtocol::ERROR_REJECTED, message); + token->fail(); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 335a86e0279..dc955cfeb79 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -29,7 +29,6 @@ class IFeedView; class IResourceWriteFilter; class IReplayConfig; class JoinBucketsOperation; -class PerDocTypeFeedMetrics; class PutOperation; class RemoveOperation; class SplitBucketOperation; @@ -92,8 +91,6 @@ private: // used by master write thread tasks IFeedView *_activeFeedView; bucketdb::IBucketDBHandler *_bucketDBHandler; - PerDocTypeFeedMetrics &_metrics; - vespalib::Lock _syncLock; SerialNum _syncedSerialNum; bool _allowSync; // Sanity check @@ -143,16 +140,14 @@ public: * @param writeService The thread service used for all write tasks. * @param tlsSpec The spec to connect to the transaction log server. * @param docTypeName The name and version of the document type we are feed handler for. - * @param metrics Feeding metrics. * @param state Document db state * @param owner Reference to the owner of this feed handler. * @param replayConfig Reference to interface used for replaying config changes. - * @param writer Inject writer for tls, or NULL to use internal. + * @param writer Inject writer for tls, or nullptr to use internal. */ FeedHandler(IThreadingService &writeService, const vespalib::string &tlsSpec, const DocTypeName &docTypeName, - PerDocTypeFeedMetrics &metrics, DDBState &state, IFeedHandlerOwner &owner, const IResourceWriteFilter &writerFilter, @@ -160,7 +155,7 @@ public: search::transactionlog::Writer & writer, TlsWriter * tlsWriter = nullptr); - virtual ~FeedHandler(); + ~FeedHandler() override; /** * Init this feed handler. diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp index 2ec3de61dea..3628505ed66 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "feedstate.h" +#include <vespa/searchcore/proton/feedoperation/feedoperation.h> #include <vespa/vespalib/util/exceptions.h> using document::BucketId; @@ -26,8 +27,7 @@ void FeedState::throwExceptionInReceive(const vespalib::string &docType, } void -FeedState::throwExceptionInHandleOperation(const vespalib::string &docType, - const FeedOperation &op) +FeedState::throwExceptionInHandleOperation(const vespalib::string &docType, const FeedOperation &op) { throw IllegalStateException (make_string("We should not receive any feed operations" diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstate.h b/searchcore/src/vespa/searchcore/proton/server/feedstate.h index 13dd6ea9dc8..472f5cb224f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstate.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedstate.h @@ -12,27 +12,22 @@ namespace proton { +class FeedOperation; /** * Class representing the current state of a feed handler. */ class FeedState { public: - enum Type { NORMAL, - REPLAY_TRANSACTION_LOG, - INIT }; + enum Type { NORMAL, REPLAY_TRANSACTION_LOG, INIT }; private: Type _type; protected: - void throwExceptionInReceive(const vespalib::string &docType, - uint64_t serialRangeFrom, - uint64_t serialRangeTo, - size_t packetSize); - - void throwExceptionInHandleOperation(const vespalib::string &docType, - const FeedOperation &op); + void throwExceptionInReceive(const vespalib::string &docType, uint64_t serialRangeFrom, + uint64_t serialRangeTo, size_t packetSize); + void throwExceptionInHandleOperation(const vespalib::string &docType, const FeedOperation &op); public: typedef std::shared_ptr<FeedState> SP; @@ -43,11 +38,8 @@ public: Type getType() const { return _type; } vespalib::string getName() const; - virtual void handleOperation(FeedToken token, FeedOperation::UP op) = 0; - - virtual void receive(const PacketWrapper::SP &wrap, - vespalib::Executor &executor) = 0; + virtual void handleOperation(FeedToken token, std::unique_ptr<FeedOperation> op) = 0; + virtual void receive(const PacketWrapper::SP &wrap, vespalib::Executor &executor) = 0; }; } // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp index 19235bb1e23..9c0115f0084 100644 --- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp @@ -5,12 +5,8 @@ namespace proton { -OperationDoneContext::OperationDoneContext(std::unique_ptr<FeedToken> token, - const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics) - : _token(std::move(token)), - _opType(opType), - _metrics(metrics) +OperationDoneContext::OperationDoneContext(std::unique_ptr<FeedToken> token) + : _token(std::move(token)) { } @@ -24,7 +20,7 @@ OperationDoneContext::ack() { if (_token) { std::unique_ptr<FeedToken> token(std::move(_token)); - token->ack(_opType, _metrics); + token->ack(); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h index 3f7a6436604..b801987844b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h @@ -7,7 +7,6 @@ namespace proton { -class PerDocTypeFeedMetrics; class FeedToken; /** @@ -20,18 +19,13 @@ class FeedToken; class OperationDoneContext : public search::IDestructorCallback { std::unique_ptr<FeedToken> _token; - const FeedOperation::Type _opType; - PerDocTypeFeedMetrics &_metrics; - protected: void ack(); public: - OperationDoneContext(std::unique_ptr<FeedToken> token, - const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics); + OperationDoneContext(std::unique_ptr<FeedToken> token); - virtual ~OperationDoneContext(); + ~OperationDoneContext() override; FeedToken *getToken() { return _token.get(); } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp index efb5a58dd2e..649eebb26f5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp @@ -8,14 +8,13 @@ namespace proton { PutDoneContext::PutDoneContext(std::unique_ptr<FeedToken> token, - const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, + IGidToLidChangeHandler &gidToLidChangeHandler, const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut) - : OperationDoneContext(std::move(token), opType, metrics), + : OperationDoneContext(std::move(token)), _lid(lid), _docIdLimit(nullptr), _gidToLidChangeHandler(gidToLidChangeHandler), diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h index bddf9dabd90..3e98b02dda6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h @@ -6,9 +6,7 @@ #include <vespa/document/base/globalid.h> #include <vespa/searchlib/common/serialnum.h> -namespace proton -{ - +namespace proton { class DocIdLimit; class IGidToLidChangeHandler; @@ -30,21 +28,11 @@ class PutDoneContext : public OperationDoneContext bool _enableNotifyPut; public: - PutDoneContext(std::unique_ptr<FeedToken> token, - const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, - IGidToLidChangeHandler &gidToLidChangeHandler, - const document::GlobalId &gid, - uint32_t lid, - search::SerialNum serialNum, - bool enableNotifyPut); - - virtual ~PutDoneContext(); - - void registerPutLid(DocIdLimit *docIdLimit) - { - _docIdLimit = docIdLimit; - } + PutDoneContext(std::unique_ptr<FeedToken> token, IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut); + ~PutDoneContext() override; + + void registerPutLid(DocIdLimit *docIdLimit) { _docIdLimit = docIdLimit; } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp index 627e8d9f627..bd9a8240d73 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp @@ -8,13 +8,11 @@ namespace proton { RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token, - const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid) - : OperationDoneContext(std::move(token), opType, metrics), + : OperationDoneContext(std::move(token)), _executor(executor), _task(), _pendingNotifyRemoveDone(std::move(pendingNotifyRemoveDone)) diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index c4fafb4e886..83f6013dd85 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -8,8 +8,7 @@ #include <vespa/searchlib/common/serialnum.h> #include <vespa/searchcore/proton/reference/pending_notify_remove_done.h> -namespace proton -{ +namespace proton { class IDocumentMetaStore; @@ -29,15 +28,11 @@ class RemoveDoneContext : public OperationDoneContext PendingNotifyRemoveDone _pendingNotifyRemoveDone; public: - RemoveDoneContext(std::unique_ptr<FeedToken> token, - const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, - vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, - PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + RemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor, + IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid); - virtual ~RemoveDoneContext(); + ~RemoveDoneContext() override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp index d3d73b42fbe..0c87d24899d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp @@ -5,7 +5,6 @@ #include "operationdonecontext.h" #include "removedonecontext.h" #include <vespa/searchcore/proton/common/feedtoken.h> -#include <vespa/searchcore/proton/metrics/feed_metrics.h> #include <vespa/searchcore/proton/documentmetastore/ilidreusedelayer.h> #include <vespa/vespalib/text/stringtokenizer.h> #include <vespa/vespalib/util/closuretask.h> @@ -35,10 +34,8 @@ SearchableFeedView::Context::Context(const IIndexWriter::SP &indexWriter) SearchableFeedView::Context::~Context() = default; -SearchableFeedView::SearchableFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, - const PersistentParams ¶ms, - const FastAccessFeedView::Context &fastUpdateCtx, - Context ctx) +SearchableFeedView::SearchableFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, const PersistentParams ¶ms, + const FastAccessFeedView::Context &fastUpdateCtx, Context ctx) : Parent(storeOnlyCtx, params, fastUpdateCtx), _indexWriter(ctx._indexWriter), _hasIndexedFields(_schema->getNumIndexFields() > 0) @@ -191,11 +188,8 @@ SearchableFeedView::performIndexRemove(SerialNum serialNum, const LidVector &lid assert(_writeService.index().isCurrentThread()); for (const auto lid : lidsToRemove) { VLOG(getDebugLevel(lid, nullptr), - "database(%s): performIndexRemove: serialNum(%" PRIu64 "), " - "lid(%d)", - _params._docTypeName.toString().c_str(), - serialNum, - lid); + "database(%s): performIndexRemove: serialNum(%" PRIu64 "), lid(%d)", + _params._docTypeName.toString().c_str(), serialNum, lid); _indexWriter->remove(serialNum, lid); } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp index e931a28c6a5..bba03621f8a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp @@ -160,7 +160,7 @@ StoreOnlyDocSubDB::clearViews() { size_t StoreOnlyDocSubDB::getNumDocs() const { - if (_metaStoreCtx.get() != NULL) { + if (_metaStoreCtx) { return _metaStoreCtx->get().getNumUsedLids(); } else { return 0u; @@ -378,8 +378,7 @@ StoreOnlyDocSubDB::getFeedViewPersistentParams() { SerialNum flushedDMSSN(_flushedDocumentMetaStoreSerialNum); SerialNum flushedDSSN(_flushedDocumentStoreSerialNum); - return StoreOnlyFeedView::PersistentParams(flushedDMSSN, flushedDSSN, _docTypeName, - _metrics.feed, _subDbId, _subDbType); + return StoreOnlyFeedView::PersistentParams(flushedDMSSN, flushedDSSN, _docTypeName, _subDbId, _subDbType); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index de3e1648085..8c8559115bd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -11,10 +11,8 @@ #include <vespa/searchcore/proton/common/commit_time_tracker.h> #include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/documentmetastore/ilidreusedelayer.h> -#include <vespa/searchcore/proton/metrics/feed_metrics.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> #include <vespa/document/datatype/documenttype.h> -#include <vespa/document/fieldvalue/document.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/log/log.h> @@ -54,19 +52,18 @@ private: IDestructorCallback::SP _moveDoneCtx; public: - PutDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, + PutDoneContextForMove(std::unique_ptr<FeedToken> token, IGidToLidChangeHandler &gidToLidChangeHandler, const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx) - : PutDoneContext(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut), + : PutDoneContext(std::move(token), gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut), _moveDoneCtx(std::move(moveDoneCtx)) {} - virtual ~PutDoneContextForMove() {} + ~PutDoneContextForMove() = default; }; std::shared_ptr<PutDoneContext> -createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, +createPutDoneContext(FeedToken::UP &token, IGidToLidChangeHandler &gidToLidChangeHandler, const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut, @@ -74,25 +71,24 @@ createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTyp { std::shared_ptr<PutDoneContext> result; if (moveDoneCtx) { - result = std::make_shared<PutDoneContextForMove>(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, std::move(moveDoneCtx)); + result = std::make_shared<PutDoneContextForMove>(std::move(token), gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, std::move(moveDoneCtx)); } else { - result = std::make_shared<PutDoneContext>(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut); + result = std::make_shared<PutDoneContext>(std::move(token), gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut); } return result; } std::shared_ptr<PutDoneContext> -createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, IGidToLidChangeHandler &gidToLidChangeHandler, +createPutDoneContext(FeedToken::UP &token, IGidToLidChangeHandler &gidToLidChangeHandler, const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut) { - return createPutDoneContext(token, opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, IDestructorCallback::SP()); + return createPutDoneContext(token, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, IDestructorCallback::SP()); } std::shared_ptr<UpdateDoneContext> -createUpdateDoneContext(FeedToken::UP &token, FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, const DocumentUpdate::SP &upd) +createUpdateDoneContext(FeedToken::UP &token, const DocumentUpdate::SP &upd) { - return std::make_shared<UpdateDoneContext>(std::move(token), opType, metrics, upd); + return std::make_shared<UpdateDoneContext>(std::move(token), upd); } void setPrev(DocumentOperation &op, const documentmetastore::IStore::Result &result, @@ -110,32 +106,28 @@ private: IDestructorCallback::SP _moveDoneCtx; public: - RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, + RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid, IDestructorCallback::SP moveDoneCtx) - : RemoveDoneContext(std::move(token), opType, metrics, executor, documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid), + : RemoveDoneContext(std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid), _moveDoneCtx(std::move(moveDoneCtx)) {} - virtual ~RemoveDoneContextForMove() {} + ~RemoveDoneContextForMove() = default; }; std::shared_ptr<RemoveDoneContext> -createRemoveDoneContext(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, - PendingNotifyRemoveDone &&pendingNotifyRemoveDone, - uint32_t lid, - IDestructorCallback::SP moveDoneCtx) +createRemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor, + IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + uint32_t lid, IDestructorCallback::SP moveDoneCtx) { if (moveDoneCtx) { return std::make_shared<RemoveDoneContextForMove> - (std::move(token), opType, metrics, executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid, std::move(moveDoneCtx)); + (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid, std::move(moveDoneCtx)); } else { return std::make_shared<RemoveDoneContext> - (std::move(token), opType, metrics, executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid); + (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid); } } @@ -216,6 +208,8 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams _docType = _repo->getDocumentType(_params._docTypeName.getName()); } +StoreOnlyFeedView::~StoreOnlyFeedView() = default; + void StoreOnlyFeedView::sync() { @@ -240,10 +234,10 @@ StoreOnlyFeedView::forceCommit(SerialNum serialNum, OnForceCommitDoneType onComm } void -StoreOnlyFeedView::considerEarlyAck(FeedToken::UP &token, FeedOperation::Type opType) +StoreOnlyFeedView::considerEarlyAck(FeedToken::UP &token) { if (_commitTimeTracker.hasVisibilityDelay() && token) { - token->ack(opType, _params._metrics); + token->ack(); token.reset(); } } @@ -289,7 +283,7 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) _params._subDbId, doc->toString(true).size(), doc->toString(true).c_str()); PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(putOp, docId); - considerEarlyAck(token, putOp.getType()); + considerEarlyAck(token); bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId); @@ -297,18 +291,19 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) bool immediateCommit = _commitTimeTracker.needCommit(); const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = - createPutDoneContext(token, putOp.getType(), _params._metrics, - _gidToLidChangeHandler, gid, putOp.getLid(), serialNum, putOp.changedDbdId() && useDocumentMetaStore(serialNum)); + createPutDoneContext(token, _gidToLidChangeHandler, gid, putOp.getLid(), serialNum, + putOp.changedDbdId() && useDocumentMetaStore(serialNum)); putSummary(serialNum, putOp.getLid(), doc, onWriteDone); putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, putOp.getLid(), doc, immediateCommit, onWriteDone); } if (docAlreadyExists && putOp.changedDbdId()) { assert(!putOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), putOp.getPrevLid(), putOp.getType(), IDestructorCallback::SP()); + internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), + putOp.getPrevLid(), IDestructorCallback::SP()); } if (token) { - token->ack(putOp.getType(), _params._metrics); + token->ack(); } } @@ -432,13 +427,12 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up (void) updateOk; _metaStore.commit(serialNum, serialNum); } - considerEarlyAck(token, updOp.getType()); + considerEarlyAck(token); bool immediateCommit = _commitTimeTracker.needCommit(); - auto onWriteDone = createUpdateDoneContext(token, updOp.getType(), _params._metrics, updOp.getUpdate()); + auto onWriteDone = createUpdateDoneContext(token, updOp.getUpdate()); updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone); - UpdateScope updateScope(getUpdateScope(upd)); if (updateScope.hasIndexOrNonAttributeFields()) { PromisedDoc promisedDoc; @@ -555,7 +549,7 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm rmOp.getSubDbId(), rmOp.getLid(), rmOp.getPrevSubDbId(), rmOp.getPrevLid(), _params._subDbId); PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(rmOp, docId); - considerEarlyAck(token, rmOp.getType()); + considerEarlyAck(token); if (rmOp.getValidDbdId(_params._subDbId)) { Document::UP clearDoc(new Document(*_docType, docId)); @@ -566,22 +560,25 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { assert(!rmOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), rmOp.getPrevLid(), rmOp.getType(), IDestructorCallback::SP()); + internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), + rmOp.getPrevLid(), IDestructorCallback::SP()); } } if (token) { - token->ack(rmOp.getType(), _params._metrics); + token->ack(); } } void -StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, - FeedOperation::Type opType, IDestructorCallback::SP moveDoneCtx) +StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, + IDestructorCallback::SP moveDoneCtx) { bool explicitReuseLid = _lidReuseDelayer.delayReuse(lid); std::shared_ptr<RemoveDoneContext> onWriteDone; - onWriteDone = createRemoveDoneContext(std::move(token), opType, _params._metrics, _writeService.master(), - _metaStore, std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u), moveDoneCtx); + onWriteDone = createRemoveDoneContext(std::move(token), _writeService.master(), _metaStore, + std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u), + std::move(moveDoneCtx)); removeSummary(serialNum, lid, onWriteDone); bool immediateCommit = _commitTimeTracker.needCommit(); removeAttributes(serialNum, lid, immediateCommit, onWriteDone); @@ -732,15 +729,14 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: const document::GlobalId &gid = docId.getGlobalId(); FeedToken::UP token; std::shared_ptr<PutDoneContext> onWriteDone = - createPutDoneContext(token, moveOp.getType(), _params._metrics, - _gidToLidChangeHandler, gid, moveOp.getLid(), - serialNum, moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx); + createPutDoneContext(token, _gidToLidChangeHandler, gid, moveOp.getLid(), serialNum, + moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx); putSummary(serialNum, moveOp.getLid(), doc, onWriteDone); putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone); } if (docAlreadyExists && moveOp.changedDbdId()) { - internalRemove(FeedToken::UP(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), moveOp.getType(), doneCtx); + internalRemove(FeedToken::UP(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 021c2b2f8f7..baaf77bbe59 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -27,7 +27,6 @@ namespace document { class GLobalId; } namespace proton { class IReplayConfig; -class PerDocTypeFeedMetrics; class ForceCommitContext; class OperationDoneContext; class PutDoneContext; @@ -104,20 +103,17 @@ public: const SerialNum _flushedDocumentMetaStoreSerialNum; const SerialNum _flushedDocumentStoreSerialNum; const DocTypeName _docTypeName; - PerDocTypeFeedMetrics &_metrics; const uint32_t _subDbId; const SubDbType _subDbType; PersistentParams(SerialNum flushedDocumentMetaStoreSerialNum, SerialNum flushedDocumentStoreSerialNum, const DocTypeName &docTypeName, - PerDocTypeFeedMetrics &metrics, uint32_t subDbId, SubDbType subDbType) : _flushedDocumentMetaStoreSerialNum(flushedDocumentMetaStoreSerialNum), _flushedDocumentStoreSerialNum(flushedDocumentStoreSerialNum), _docTypeName(docTypeName), - _metrics(metrics), _subDbId(subDbId), _subDbType(subDbType) {} @@ -183,14 +179,14 @@ private: size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields, bool immediateCommit); - void internalRemove(FeedTokenUP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, - FeedOperation::Type opType, std::shared_ptr<search::IDestructorCallback> moveDoneCtx); + void internalRemove(FeedTokenUP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + Lid lid, std::shared_ptr<search::IDestructorCallback> moveDoneCtx); // Ack token early if visibility delay is nonzero - void considerEarlyAck(FeedTokenUP &token, FeedOperation::Type opType); + void considerEarlyAck(FeedTokenUP &token); - void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdateSP upd, - OnOperationDoneType onWriteDone,PromisedDoc promisedDoc, PromisedStream promisedStream); + void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdateSP upd, OnOperationDoneType onWriteDone, + PromisedDoc promisedDoc, PromisedStream promisedStream); protected: virtual void internalDeleteBucket(const DeleteBucketOperation &delOp); @@ -225,7 +221,7 @@ protected: public: StoreOnlyFeedView(const Context &ctx, const PersistentParams ¶ms); - virtual ~StoreOnlyFeedView() {} + ~StoreOnlyFeedView() override; const ISummaryAdapter::SP &getSummaryAdapter() const { return _summaryAdapter; } const search::index::Schema::SP &getSchema() const { return _schema; } diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp index 5eb7e2ed7f9..171990c32d3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp @@ -3,23 +3,14 @@ #include "updatedonecontext.h" #include <vespa/searchcore/proton/common/feedtoken.h> -namespace proton -{ - +namespace proton { -UpdateDoneContext::UpdateDoneContext(std::unique_ptr<FeedToken> token, - const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, - const document::DocumentUpdate::SP &upd) - : OperationDoneContext(std::move(token), opType, metrics), +UpdateDoneContext::UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd) + : OperationDoneContext(std::move(token)), _upd(upd) { } - -UpdateDoneContext::~UpdateDoneContext() -{ -} - +UpdateDoneContext::~UpdateDoneContext() = default; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h index df47538d6dd..4701db300de 100644 --- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h @@ -5,8 +5,7 @@ #include "operationdonecontext.h" #include <vespa/document/update/documentupdate.h> -namespace proton -{ +namespace proton { /** * Context class for document update operations that acks operation when @@ -19,12 +18,8 @@ class UpdateDoneContext : public OperationDoneContext { document::DocumentUpdate::SP _upd; public: - UpdateDoneContext(std::unique_ptr<FeedToken> token, - const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, - const document::DocumentUpdate::SP &upd); - - virtual ~UpdateDoneContext(); + UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd); + ~UpdateDoneContext() override; const document::DocumentUpdate &getUpdate() { return *_upd; } }; |