diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-13 15:21:23 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-13 15:26:42 +0200 |
commit | 10ff6f37abf65c3e6b0fd18911fe5c2815b4b06c (patch) | |
tree | 319584e48be066f487da2d1b11543b64b732749d | |
parent | 9fc068e627ae71c5478a91fc06fff4d62933efa1 (diff) |
Remove explicit ack and use feedtoken as a smartptr
27 files changed, 295 insertions, 457 deletions
diff --git a/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp index d4af7b214b6..a997b3cc3db 100644 --- a/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp @@ -72,22 +72,20 @@ struct MyFeedView : public test::DummyFeedView _metaStore.constructFreeList(); } - // Implements IFeedView - virtual const DocumentMetaStore *getDocumentMetaStorePtr() const override { return &_metaStore; } - virtual void preparePut(PutOperation &) override { ++_preparePut; } - virtual void handlePut(FeedToken *, const PutOperation &) override { ++_handlePut; } - virtual void prepareUpdate(UpdateOperation &) override { ++_prepareUpdate; } - virtual void handleUpdate(FeedToken *, const UpdateOperation &) override { ++_handleUpdate; } - virtual void prepareRemove(RemoveOperation &) override { ++_prepareRemove; } - virtual void handleRemove(FeedToken *, const RemoveOperation &) override { ++_handleRemove; } - virtual void prepareDeleteBucket(DeleteBucketOperation &) override { ++_prepareDeleteBucket; } - virtual void handleDeleteBucket(const DeleteBucketOperation &) override - { ++_handleDeleteBucket; } - virtual void prepareMove(MoveOperation &) override { ++_prepareMove; } - virtual void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++_handleMove; } - virtual void heartBeat(SerialNum) override { ++_heartBeat; } - virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++_handlePrune; } - virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override { + const DocumentMetaStore *getDocumentMetaStorePtr() const override { return &_metaStore; } + void preparePut(PutOperation &) override { ++_preparePut; } + void handlePut(FeedToken, const PutOperation &) override { ++_handlePut; } + void prepareUpdate(UpdateOperation &) override { ++_prepareUpdate; } + void handleUpdate(FeedToken, const UpdateOperation &) override { ++_handleUpdate; } + void prepareRemove(RemoveOperation &) override { ++_prepareRemove; } + void handleRemove(FeedToken, const RemoveOperation &) override { ++_handleRemove; } + void prepareDeleteBucket(DeleteBucketOperation &) override { ++_prepareDeleteBucket; } + void handleDeleteBucket(const DeleteBucketOperation &) override { ++_handleDeleteBucket; } + void prepareMove(MoveOperation &) override { ++_prepareMove; } + void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++_handleMove; } + void heartBeat(SerialNum) override { ++_heartBeat; } + void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++_handlePrune; } + void handleCompactLidSpace(const CompactLidSpaceOperation &op) override { _wantedLidLimit = op.getLidLimit(); } }; @@ -213,7 +211,7 @@ TEST_F("require that handlePut() sends to 1 feed view", Fixture) { PutOperation op = f.put(2); op.setDbDocumentId(DbDocumentId(READY, 2)); - f._view.handlePut(NULL, op); + f._view.handlePut(FeedToken(), op); EXPECT_EQUAL(1u, f._ready._view->_handlePut); EXPECT_EQUAL(0u, f._removed._view->_handlePut); EXPECT_EQUAL(0u, f._notReady._view->_handlePut); @@ -225,7 +223,7 @@ TEST_F("require that handlePut() sends to 2 feed views", Fixture) PutOperation op = f.put(2); op.setDbDocumentId(DbDocumentId(NOT_READY, 2)); op.setPrevDbDocumentId(DbDocumentId(REMOVED, 2)); - f._view.handlePut(NULL, op); + f._view.handlePut(FeedToken(), op); EXPECT_EQUAL(0u, f._ready._view->_handlePut); EXPECT_EQUAL(1u, f._removed._view->_handlePut); EXPECT_EQUAL(1u, f._notReady._view->_handlePut); @@ -259,7 +257,7 @@ TEST_F("require that handleRemove() sends op with valid dbdId to 1 feed view", F { RemoveOperation op = f.remove(1); op.setDbDocumentId(DbDocumentId(REMOVED, 1)); - f._view.handleRemove(NULL, op); + f._view.handleRemove(FeedToken(), op); EXPECT_EQUAL(0u, f._ready._view->_handleRemove); EXPECT_EQUAL(1u, f._removed._view->_handleRemove); EXPECT_EQUAL(0u, f._notReady._view->_handleRemove); @@ -271,7 +269,7 @@ TEST_F("require that handleRemove() sends op with valid dbdId to 2 feed views", RemoveOperation op = f.remove(1); op.setDbDocumentId(DbDocumentId(REMOVED, 1)); op.setPrevDbDocumentId(DbDocumentId(READY, 1)); - f._view.handleRemove(NULL, op); + f._view.handleRemove(FeedToken(), op); EXPECT_EQUAL(1u, f._ready._view->_handleRemove); EXPECT_EQUAL(1u, f._removed._view->_handleRemove); EXPECT_EQUAL(0u, f._notReady._view->_handleRemove); @@ -283,7 +281,7 @@ TEST_F("require that handleRemove() sends op with invalid dbdId to prev view", F RemoveOperation op = f.remove(1); // can be used in the case where removed feed view does not remember removes. op.setPrevDbDocumentId(DbDocumentId(READY, 1)); - f._view.handleRemove(NULL, op); + f._view.handleRemove(FeedToken(), op); EXPECT_EQUAL(1u, f._ready._view->_handleRemove); EXPECT_EQUAL(0u, f._removed._view->_handleRemove); EXPECT_EQUAL(0u, f._notReady._view->_handleRemove); @@ -317,7 +315,7 @@ TEST_F("require that handleUpdate() sends op to correct view", Fixture) UpdateOperation op = f.update(1); op.setDbDocumentId(DbDocumentId(READY, 1)); op.setPrevDbDocumentId(DbDocumentId(READY, 1)); - f._view.handleUpdate(NULL, op); + f._view.handleUpdate(FeedToken(), op); EXPECT_EQUAL(1u, f._ready._view->_handleUpdate); EXPECT_EQUAL(0u, f._removed._view->_handleUpdate); EXPECT_EQUAL(0u, f._notReady._view->_handleUpdate); diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp index 547e400cd76..8369ec0630d 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp @@ -745,18 +745,15 @@ struct DocumentHandler op.setSerialNum(serialNum); return op; } - MoveOperation createMove(Document::UP doc, Timestamp timestamp, - DbDocumentId sourceDbdId, - uint32_t targetSubDbId, - SerialNum serialNum) + MoveOperation createMove(Document::UP doc, Timestamp timestamp, DbDocumentId sourceDbdId, + uint32_t targetSubDbId, SerialNum serialNum) { proton::test::Document testDoc(Document::SP(doc.release()), 0, timestamp); MoveOperation op(testDoc.getBucket(), testDoc.getTimestamp(), testDoc.getDoc(), sourceDbdId, targetSubDbId); op.setSerialNum(serialNum); return op; } - RemoveOperation createRemove(const DocumentId &docId, Timestamp timestamp, - SerialNum serialNum) + RemoveOperation createRemove(const DocumentId &docId, Timestamp timestamp, SerialNum serialNum) { const document::GlobalId &gid = docId.getGlobalId(); BucketId bucket = gid.convertToBucketId(); @@ -769,7 +766,7 @@ struct DocumentHandler void putDoc(PutOperation &op) { IFeedView::SP feedView = _f._subDb.getFeedView(); _f.runInMaster([&]() { feedView->preparePut(op); - feedView->handlePut(NULL, op); } ); + feedView->handlePut(FeedToken(), op); } ); } void moveDoc(MoveOperation &op) { IFeedView::SP feedView = _f._subDb.getFeedView(); @@ -779,11 +776,10 @@ struct DocumentHandler { IFeedView::SP feedView = _f._subDb.getFeedView(); _f.runInMaster([&]() { feedView->prepareRemove(op); - feedView->handleRemove(NULL, op); } ); + feedView->handleRemove(FeedToken(), op); } ); } void putDocs() { - PutOperation putOp = createPut(std::move(createDoc(1, 22, 33)), - Timestamp(10), 10); + PutOperation putOp = createPut(std::move(createDoc(1, 22, 33)), Timestamp(10), 10); putDoc(putOp); putOp = createPut(std::move(createDoc(2, 44, 55)), Timestamp(20), 20); putDoc(putOp); @@ -791,13 +787,8 @@ struct DocumentHandler }; void -assertAttribute(const AttributeGuard &attr, - const vespalib::string &name, - uint32_t numDocs, - int64_t doc1Value, - int64_t doc2Value, - SerialNum createSerialNum, - SerialNum lastSerialNum) +assertAttribute(const AttributeGuard &attr, const vespalib::string &name, uint32_t numDocs, + int64_t doc1Value, int64_t doc2Value, SerialNum createSerialNum, SerialNum lastSerialNum) { EXPECT_EQUAL(name, attr->getName()); EXPECT_EQUAL(numDocs, attr->getNumDocs()); @@ -808,17 +799,13 @@ assertAttribute(const AttributeGuard &attr, } void -assertAttribute1(const AttributeGuard &attr, - SerialNum createSerialNum, - SerialNum lastSerialNum) +assertAttribute1(const AttributeGuard &attr, SerialNum createSerialNum, SerialNum lastSerialNum) { assertAttribute(attr, "attr1", 3, 22, 44, createSerialNum, lastSerialNum); } void -assertAttribute2(const AttributeGuard &attr, - SerialNum createSerialNum, - SerialNum lastSerialNum) +assertAttribute2(const AttributeGuard &attr, SerialNum createSerialNum, SerialNum lastSerialNum) { assertAttribute(attr, "attr2", 3, 33, 55, createSerialNum, lastSerialNum); } @@ -877,12 +864,10 @@ TEST_F("require that regular attributes are populated during reprocessing", requireThatAttributesArePopulatedDuringReprocessing<SearchableFixtureTwoField, ConfigDir2>(f); } -namespace -{ +namespace { bool -assertOperation(DocumentOperation &op, - uint32_t expPrevSubDbId, uint32_t expPrevLid, +assertOperation(DocumentOperation &op, uint32_t expPrevSubDbId, uint32_t expPrevLid, uint32_t expSubDbId, uint32_t expLid) { if (!EXPECT_EQUAL(expPrevSubDbId, op.getPrevSubDbId())) { diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index b0b06a238c9..e70e83fd61e 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -140,12 +140,6 @@ struct MyReplayConfig : public IReplayConfig { virtual void replayConfig(SerialNum) override {} }; -void ackToken(FeedToken *token) { - if (token != NULL) { - token->ack(); - } -} - struct MyDocumentMetaStore { struct Entry { DbDocumentId _id; @@ -195,9 +189,9 @@ struct MyFeedView : public test::DummyFeedView { int update_count; SerialNum update_serial; MyFeedView(const DocumentTypeRepo::SP &dtr); - ~MyFeedView(); + ~MyFeedView() override; void resetPutLatch(uint32_t count) { putLatch.reset(new vespalib::CountDownLatch(count)); } - virtual void preparePut(PutOperation &op) override { + void preparePut(PutOperation &op) override { prepareDocumentOperation(op, op.getDocument()->getId().getGlobalId()); } void prepareDocumentOperation(DocumentOperation &op, const GlobalId &gid) { @@ -208,7 +202,8 @@ struct MyFeedView : public test::DummyFeedView { op.setPrevTimestamp(entry->_prevTimestamp); } } - virtual void handlePut(FeedToken *token, const PutOperation &putOp) override { + void handlePut(FeedToken token, const PutOperation &putOp) override { + (void) token; LOG(info, "MyFeedView::handlePut(): docId(%s), putCount(%u), putLatchCount(%u)", putOp.getDocument()->getId().toString().c_str(), put_count, (putLatch.get() != NULL ? putLatch->getCount() : 0u)); @@ -221,23 +216,24 @@ struct MyFeedView : public test::DummyFeedView { if (putLatch.get() != NULL) { putLatch->countDown(); } - ackToken(token); } - virtual void prepareUpdate(UpdateOperation &op) override { + void prepareUpdate(UpdateOperation &op) override { prepareDocumentOperation(op, op.getUpdate()->getId().getGlobalId()); } - virtual void handleUpdate(FeedToken *token, const UpdateOperation &op) override { + void handleUpdate(FeedToken token, const UpdateOperation &op) override { + (void) token; + ++update_count; update_serial = op.getSerialNum(); - ackToken(token); } - virtual void handleRemove(FeedToken *token, const RemoveOperation &) override - { ++remove_count; ackToken(token); } - virtual void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++move_count; } - virtual void heartBeat(SerialNum) override { ++heartbeat_count; } - virtual void handlePruneRemovedDocuments( - const PruneRemovedDocumentsOperation &) override { ++prune_removed_count; } - virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override { + void handleRemove(FeedToken token, const RemoveOperation &) override { + (void) token; + ++remove_count; + } + void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++move_count; } + void heartBeat(SerialNum) override { ++heartbeat_count; } + void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++prune_removed_count; } + const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override { return NULL; } }; @@ -317,8 +313,7 @@ MyTransport::~MyTransport() {} struct FeedTokenContext { MyTransport transport; - FeedToken::UP token_ap; - FeedToken &token; + FeedToken token; FeedTokenContext(); ~FeedTokenContext(); @@ -333,10 +328,8 @@ struct FeedTokenContext { FeedTokenContext::FeedTokenContext() : transport(), - token_ap(new FeedToken(transport)), - token(*token_ap) -{ -} + token(transport) +{} FeedTokenContext::~FeedTokenContext() = default; @@ -432,8 +425,7 @@ struct FeedHandlerFixture handler.init(1); } - ~FeedHandlerFixture() - { + ~FeedHandlerFixture() { writeService.sync(); } template <class FunctionType> @@ -484,7 +476,7 @@ TEST_F("require that outdated remove is ignored", FeedHandlerFixture) static_cast<DocumentOperation &>(*op).setPrevDbDocumentId(DbDocumentId(4)); static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000)); FeedTokenContext token_context; - f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); + f.handler.performOperation(std::move(token_context.token), std::move(op)); EXPECT_EQUAL(0, f.feedView.remove_count); EXPECT_EQUAL(0, f.tls_writer.store_count); } @@ -496,7 +488,7 @@ TEST_F("require that outdated put is ignored", FeedHandlerFixture) Timestamp(10), doc_context.doc)); static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000)); FeedTokenContext token_context; - f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); + f.handler.performOperation(std::move(token_context.token), std::move(op)); EXPECT_EQUAL(0, f.feedView.put_count); EXPECT_EQUAL(0, f.tls_writer.store_count); } @@ -575,7 +567,7 @@ TEST_F("require that remove of unknown document with known data type stores remo DocumentContext doc_context("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId())); FeedTokenContext token_context; - f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); + f.handler.performOperation(std::move(token_context.token), std::move(op)); EXPECT_EQUAL(1, f.feedView.remove_count); EXPECT_EQUAL(1, f.tls_writer.store_count); } @@ -585,7 +577,7 @@ TEST_F("require that partial update for non-existing document is tagged as such" UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update)); FeedTokenContext token_context; - f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); + f.handler.performOperation(std::move(token_context.token), std::move(op)); const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult()); EXPECT_FALSE(token_context.transport.documentWasFound); @@ -603,7 +595,7 @@ TEST_F("require that partial update for non-existing document is created if spec f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(), MyDocumentMetaStore::Entry(5, 5, Timestamp(10))); FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update)); FeedTokenContext token_context; - f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); + f.handler.performOperation(std::move(token_context.token), std::move(op)); const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult()); EXPECT_TRUE(token_context.transport.documentWasFound); @@ -624,7 +616,7 @@ TEST_F("require that put is rejected if resource limit is reached", FeedHandlerF DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc); FeedTokenContext token; - f.handler.performOperation(std::move(token.token_ap), std::move(op)); + f.handler.performOperation(std::move(token.token), std::move(op)); EXPECT_EQUAL(0, f.feedView.put_count); EXPECT_EQUAL(Result::RESOURCE_EXHAUSTED, token.getResult()->getErrorCode()); EXPECT_EQUAL("Put operation rejected for document 'id:test:searchdocument::foo' of type 'searchdocument': 'Attribute resource limit reached'", @@ -639,7 +631,7 @@ TEST_F("require that update is rejected if resource limit is reached", FeedHandl UpdateContext updCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op = std::make_unique<UpdateOperation>(updCtx.bucketId, Timestamp(10), updCtx.update); FeedTokenContext token; - f.handler.performOperation(std::move(token.token_ap), std::move(op)); + f.handler.performOperation(std::move(token.token), std::move(op)); EXPECT_EQUAL(0, f.feedView.update_count); EXPECT_TRUE(dynamic_cast<const UpdateResult *>(token.getResult())); EXPECT_EQUAL(Result::RESOURCE_EXHAUSTED, token.getResult()->getErrorCode()); @@ -655,7 +647,7 @@ TEST_F("require that remove is NOT rejected if resource limit is reached", FeedH DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op = std::make_unique<RemoveOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc->getId()); FeedTokenContext token; - f.handler.performOperation(std::move(token.token_ap), std::move(op)); + f.handler.performOperation(std::move(token.token), std::move(op)); EXPECT_EQUAL(1, f.feedView.remove_count); EXPECT_EQUAL(Result::NONE, token.getResult()->getErrorCode()); EXPECT_EQUAL("", token.getResult()->getErrorMessage()); diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index 4eefbed0a53..548ce8ba20d 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -571,7 +571,7 @@ struct FixtureBase return doc("doc:test:1", timestamp); } - void performPut(FeedToken *token, PutOperation &op) { + void performPut(FeedToken token, PutOperation &op) { getFeedView().preparePut(op); op.setSerialNum(++serial); getFeedView().handlePut(token, op); @@ -586,10 +586,10 @@ struct FixtureBase void putAndWait(const DocumentContext &docCtx) { FeedTokenContext token(_tracer); PutOperation op(docCtx.bid, docCtx.ts, docCtx.doc); - runInMaster([&] () { performPut(&token.ft, op); }); + runInMaster([&] () { performPut(token.ft, op); }); } - void performUpdate(FeedToken *token, UpdateOperation &op) { + void performUpdate(FeedToken token, UpdateOperation &op) { getFeedView().prepareUpdate(op); op.setSerialNum(++serial); getFeedView().handleUpdate(token, op); @@ -598,25 +598,21 @@ struct FixtureBase void updateAndWait(const DocumentContext &docCtx) { FeedTokenContext token(_tracer); UpdateOperation op(docCtx.bid, docCtx.ts, docCtx.upd); - runInMaster([&] () { performUpdate(&token.ft, op); }); + runInMaster([&] () { performUpdate(token.ft, op); }); } - void performRemove(FeedToken *token, RemoveOperation &op) { + void performRemove(FeedToken token, RemoveOperation &op) { getFeedView().prepareRemove(op); if (op.getValidNewOrPrevDbdId()) { op.setSerialNum(++serial); - getFeedView().handleRemove(token, op); - } else { - if (token != NULL) { - token->ack(); - } + getFeedView().handleRemove(std::move(token), op); } } void removeAndWait(const DocumentContext &docCtx) { FeedTokenContext token(_tracer); RemoveOperation op(docCtx.bid, docCtx.ts, docCtx.doc->getId()); - runInMaster([&] () { performRemove(&token.ft, op); }); + runInMaster([&] () { performRemove(token.ft, op); }); } void removeAndWait(const DocumentContext::List &docs) { diff --git a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp index 530c9ebef39..0fea9501d93 100644 --- a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp +++ b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp @@ -45,8 +45,9 @@ void Test::testAck() { LocalTransport transport; - FeedToken token(transport); - token.ack(); + { + FeedToken token(transport); + } EXPECT_EQUAL(1u, transport.getReceivedCount()); } @@ -70,9 +71,10 @@ Test::testHandover() LocalTransport transport; - FeedToken token(transport); - token = MyHandover::handover(token); - token.ack(); + { + FeedToken token(transport); + token = MyHandover::handover(token); + } EXPECT_EQUAL(1u, transport.getReceivedCount()); } diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index f8c0e0ac2f6..38d4d809d60 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -127,33 +127,29 @@ struct MyDocumentRetriever : DocumentRetrieverBaseForTest { MyDocumentRetriever(const Document *d, Timestamp ts, DocumentId &last_id) : repo(), document(d), timestamp(ts), last_doc_id(last_id) {} - virtual const document::DocumentTypeRepo &getDocumentTypeRepo() const override { + const document::DocumentTypeRepo &getDocumentTypeRepo() const override { return repo; } - virtual void getBucketMetaData(const storage::spi::Bucket &, - search::DocumentMetaData::Vector &v) const override { + void getBucketMetaData(const storage::spi::Bucket &, search::DocumentMetaData::Vector &v) const override { if (document != 0) { v.push_back(getDocumentMetaData(document->getId())); } } - virtual DocumentMetaData getDocumentMetaData(const DocumentId &id) const override { + DocumentMetaData getDocumentMetaData(const DocumentId &id) const override { last_doc_id = id; if (document != 0) { - return DocumentMetaData(1, timestamp, document::BucketId(1), - document->getId().getGlobalId()); + return DocumentMetaData(1, timestamp, document::BucketId(1), document->getId().getGlobalId()); } return DocumentMetaData(); } - virtual document::Document::UP getDocument(search::DocumentIdT) const override { + document::Document::UP getDocument(search::DocumentIdT) const override { if (document != 0) { return Document::UP(document->clone()); } return Document::UP(); } - virtual CachedSelect::SP - parseSelect(const vespalib::string &) const override - { + CachedSelect::SP parseSelect(const vespalib::string &) const override { return CachedSelect::SP(); } }; @@ -208,134 +204,110 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer { setExistingTimestamp(ts); } void handle(FeedToken token, const Bucket &bucket, Timestamp timestamp, const DocumentId &docId) { + (void) token; lastBucket = bucket; lastTimestamp = timestamp; lastDocId = docId; - token.ack(); } - virtual void initialize() override { initialized = true; } + void initialize() override { initialized = true; } - virtual void handlePut(FeedToken token, const Bucket& bucket, - Timestamp timestamp, const document::Document::SP& doc) override { + void handlePut(FeedToken token, const Bucket& bucket, + Timestamp timestamp, const document::Document::SP& doc) override { token.setResult(ResultUP(new storage::spi::Result()), false); handle(token, bucket, timestamp, doc->getId()); } - virtual void handleUpdate(FeedToken token, const Bucket& bucket, - Timestamp timestamp, const document::DocumentUpdate::SP& upd) override { + void handleUpdate(FeedToken token, const Bucket& bucket, + Timestamp timestamp, const document::DocumentUpdate::SP& upd) override { token.setResult(ResultUP(new storage::spi::UpdateResult(existingTimestamp)), existingTimestamp > 0); handle(token, bucket, timestamp, upd->getId()); } - virtual void handleRemove(FeedToken token, const Bucket& bucket, - Timestamp timestamp, const DocumentId& id) override { + void handleRemove(FeedToken token, const Bucket& bucket, + Timestamp timestamp, const DocumentId& id) override { bool wasFound = existingTimestamp > 0; token.setResult(ResultUP(new storage::spi::RemoveResult(wasFound)), wasFound); handle(token, bucket, timestamp, id); } - virtual void handleListBuckets(IBucketIdListResultHandler &resultHandler) override { + void handleListBuckets(IBucketIdListResultHandler &resultHandler) override { resultHandler.handle(BucketIdListResult(bucketList)); } - virtual void handleSetClusterState(const ClusterState &calc, - IGenericResultHandler &resultHandler) override { + void handleSetClusterState(const ClusterState &calc, IGenericResultHandler &resultHandler) override { lastCalc = &calc; resultHandler.handle(Result()); } - virtual void handleSetActiveState(const Bucket &bucket, - storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) override { + void handleSetActiveState(const Bucket &bucket, storage::spi::BucketInfo::ActiveState newState, + IGenericResultHandler &resultHandler) override { lastBucket = bucket; lastBucketState = newState; resultHandler.handle(bucketStateResult); } - virtual void handleGetBucketInfo(const Bucket &, - IBucketInfoResultHandler &resultHandler) override { + void handleGetBucketInfo(const Bucket &, IBucketInfoResultHandler &resultHandler) override { resultHandler.handle(BucketInfoResult(bucketInfo)); } - virtual void - handleCreateBucket(FeedToken token, - const storage::spi::Bucket &) override - { + void handleCreateBucket(FeedToken token, const storage::spi::Bucket &) override { token.setResult(ResultUP(new Result(_createBucketResult)), true); - token.ack(); } - virtual void handleDeleteBucket(FeedToken token, - const storage::spi::Bucket &) override { + void handleDeleteBucket(FeedToken token, const storage::spi::Bucket &) override { token.setResult(ResultUP(new Result(deleteBucketResult)), true); - token.ack(); } - virtual void handleGetModifiedBuckets(IBucketIdListResultHandler &resultHandler) override { + void handleGetModifiedBuckets(IBucketIdListResultHandler &resultHandler) override { resultHandler.handle(BucketIdListResult(modBucketList)); } - virtual void - handleSplit(FeedToken token, - const storage::spi::Bucket &source, - const storage::spi::Bucket &target1, - const storage::spi::Bucket &target2) override + void handleSplit(FeedToken token, const storage::spi::Bucket &source, const storage::spi::Bucket &target1, + const storage::spi::Bucket &target2) override { (void) source; (void) target1; (void) target2; token.setResult(ResultUP(new Result(_splitResult)), true); - token.ack(); } - virtual void - handleJoin(FeedToken token, - const storage::spi::Bucket &source1, - const storage::spi::Bucket &source2, + void handleJoin(FeedToken token, const storage::spi::Bucket &source1, const storage::spi::Bucket &source2, const storage::spi::Bucket &target) override { (void) source1; (void) source2; (void) target; token.setResult(ResultUP(new Result(_joinResult)), true); - token.ack(); } - virtual RetrieversSP getDocumentRetrievers(storage::spi::ReadConsistency) override { + RetrieversSP getDocumentRetrievers(storage::spi::ReadConsistency) override { RetrieversSP ret(new std::vector<IDocumentRetriever::SP>); - ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever( - 0, Timestamp(), lastDocId))); - ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever( - document, existingTimestamp, lastDocId))); + ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(0, Timestamp(), lastDocId))); + ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(document, existingTimestamp, lastDocId))); return ret; } - virtual BucketGuard::UP lockBucket(const storage::spi::Bucket &b) override { + BucketGuard::UP lockBucket(const storage::spi::Bucket &b) override { return BucketGuard::UP(new BucketGuard(b.getBucketId(), *this)); } - virtual void - handleListActiveBuckets(IBucketIdListResultHandler &resultHandler) override - { + void handleListActiveBuckets(IBucketIdListResultHandler &resultHandler) override { BucketIdListResult::List list; resultHandler.handle(BucketIdListResult(list)); } - virtual void - handlePopulateActiveBuckets(document::BucketId::List &buckets, - IGenericResultHandler &resultHandler) override - { + void handlePopulateActiveBuckets(document::BucketId::List &buckets, IGenericResultHandler &resultHandler) override { (void) buckets; resultHandler.handle(Result()); } - virtual void freezeBucket(BucketId bucket) override { + void freezeBucket(BucketId bucket) override { frozen.insert(bucket.getId()); was_frozen.insert(bucket.getId()); } - virtual void thawBucket(BucketId bucket) override { + void thawBucket(BucketId bucket) override { std::multiset<uint64_t>::iterator it = frozen.find(bucket.getId()); ASSERT_TRUE(it != frozen.end()); frozen.erase(it); @@ -425,11 +397,7 @@ HandlerSet::prepareGetModifiedBuckets() class SimplePersistenceEngineOwner : public IPersistenceEngineOwner { - virtual void - setClusterState(const storage::spi::ClusterState &calc) override - { - (void) calc; - } + void setClusterState(const storage::spi::ClusterState &calc) override { (void) calc; } }; struct SimpleResourceWriteFilter : public IResourceWriteFilter diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp index 15c2fbe5a84..a6f3496e1ed 100644 --- a/searchcore/src/tests/proton/server/feedstates_test.cpp +++ b/searchcore/src/tests/proton/server/feedstates_test.cpp @@ -40,10 +40,8 @@ struct MyFeedView : public test::DummyFeedView { MyFeedView(); ~MyFeedView(); - virtual const DocumentTypeRepo::SP &getDocumentTypeRepo() const override - { return repo_sp; } - virtual void handleRemove(FeedToken *, const RemoveOperation &) override - { ++remove_handled; } + const DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return repo_sp; } + void handleRemove(FeedToken , const RemoveOperation &) override { ++remove_handled; } }; MyFeedView::MyFeedView() : repo_sp(repo.getTypeRepoSp()), remove_handled(0) {} diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp index 008fafa332b..6fcd6bae4ec 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp @@ -4,48 +4,40 @@ namespace proton { +FeedToken::FeedToken() = default; + FeedToken::FeedToken(ITransport &transport) : - _state(new State(transport, 1)) + _state(new State(transport)) { } -FeedToken::State::State(ITransport & transport, uint32_t numAcksRequired) : +FeedToken::State::State(ITransport & transport) : _transport(transport), _result(new storage::spi::Result()), _documentWasFound(false), - _unAckedCount(numAcksRequired) + _alreadySent(false) { - assert(_unAckedCount > 0); } FeedToken::State::~State() { - assert(_unAckedCount == 0); + ack(); } void FeedToken::State::ack() { - uint32_t prev(_unAckedCount--); - if (prev == 1) { + bool alreadySent = _alreadySent.exchange(true); + if ( !alreadySent ) { _transport.send(std::move(_result), _documentWasFound); } - assert(prev >= 1); -} - -void -FeedToken::State::incNeededAcks() -{ - uint32_t prev(_unAckedCount++); - assert(prev >= 1); - (void) prev; } void FeedToken::State::fail() { - uint32_t prev = _unAckedCount.exchange(0); - if (prev > 0) { + bool alreadySent = _alreadySent.exchange(true); + if ( !alreadySent ) { _transport.send(std::move(_result), _documentWasFound); } } diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index 856c8a22652..d43a80b25ed 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -23,15 +23,12 @@ public: virtual void send(ResultUP result, bool documentWasFound) = 0; }; -private: class State { public: State(const State &) = delete; State & operator = (const State &) = delete; - State(ITransport & transport, uint32_t numAcksRequired); + State(ITransport & transport); ~State(); - void incNeededAcks(); - void ack(); void fail(); void setResult(ResultUP result, bool documentWasFound) { _documentWasFound = documentWasFound; @@ -39,10 +36,11 @@ private: } const storage::spi::Result &getResult() { return *_result; } private: + void ack(); ITransport &_transport; ResultUP _result; bool _documentWasFound; - std::atomic<uint32_t> _unAckedCount; + std::atomic<bool> _alreadySent; }; std::shared_ptr<State> _state; @@ -59,6 +57,7 @@ public: * @param transport The transport to pass the reply to. */ FeedToken(ITransport &transport); + FeedToken(); FeedToken(FeedToken &&) = default; FeedToken & operator =(FeedToken &&) = default; @@ -66,16 +65,10 @@ public: FeedToken & operator =(const FeedToken &) = default; ~FeedToken() = default; - /** - * Passes a receipt back to the originating FeedEngine, declaring that this - * operation succeeded. If an error occured while processing the operation, - * use fail() instead. Invoking this and/or fail() more than once is void. - */ - void ack() const { _state->ack(); } - - void incNeededAcks() const { - _state->incNeededAcks(); - } + explicit operator bool() const { return static_cast<bool>(_state); } + State * operator ->() { return _state.get(); } + const State * operator -> () const { return _state.get(); } + void reset() { _state.reset(); } /** * Passes a receipt back to the originating FeedEngine, declaring that this diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp index 30ee6e1ba75..106ce846d99 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp @@ -117,19 +117,16 @@ CombiningFeedView::preparePut(PutOperation &putOp) } void -CombiningFeedView::handlePut(FeedToken *token, - const PutOperation &putOp) +CombiningFeedView::handlePut(FeedToken token, const PutOperation &putOp) { assert(putOp.getValidDbdId()); uint32_t subDbId = putOp.getSubDbId(); uint32_t prevSubDbId = putOp.getPrevSubDbId(); if (putOp.getValidPrevDbdId() && prevSubDbId != subDbId) { - if (token != NULL) - token->incNeededAcks(); _views[subDbId]->handlePut(token, putOp); - _views[prevSubDbId]->handlePut(token, putOp); + _views[prevSubDbId]->handlePut(std::move(token), putOp); } else { - _views[subDbId]->handlePut(token, putOp); + _views[subDbId]->handlePut(std::move(token), putOp); } } @@ -143,14 +140,13 @@ CombiningFeedView::prepareUpdate(UpdateOperation &updOp) } void -CombiningFeedView::handleUpdate(FeedToken *token, - const UpdateOperation &updOp) +CombiningFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp) { assert(updOp.getValidDbdId()); assert(updOp.getValidPrevDbdId()); assert(!updOp.changedDbdId()); uint32_t subDbId(updOp.getSubDbId()); - _views[subDbId]->handleUpdate(token, updOp); + _views[subDbId]->handleUpdate(std::move(token), updOp); } void @@ -165,19 +161,16 @@ CombiningFeedView::prepareRemove(RemoveOperation &rmOp) } void -CombiningFeedView::handleRemove(FeedToken *token, - const RemoveOperation &rmOp) +CombiningFeedView::handleRemove(FeedToken token, const RemoveOperation &rmOp) { if (rmOp.getValidDbdId()) { uint32_t subDbId = rmOp.getSubDbId(); uint32_t prevSubDbId = rmOp.getPrevSubDbId(); if (rmOp.getValidPrevDbdId() && prevSubDbId != subDbId) { - if (token != NULL) - token->incNeededAcks(); _views[subDbId]->handleRemove(token, rmOp); - _views[prevSubDbId]->handleRemove(token, rmOp); + _views[prevSubDbId]->handleRemove(std::move(token), rmOp); } else { - _views[subDbId]->handleRemove(token, rmOp); + _views[subDbId]->handleRemove(std::move(token), rmOp); } } else { assert(rmOp.getValidPrevDbdId()); diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h index ea4ac64176a..284004dfa81 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h @@ -16,8 +16,7 @@ #include "replaypacketdispatcher.h" #include "ibucketstatecalculator.h" -namespace proton -{ +namespace proton { class CombiningFeedView : public IFeedView @@ -63,7 +62,7 @@ public: document::BucketSpace bucketSpace, const IBucketStateCalculator::SP &calc); - virtual ~CombiningFeedView(); + ~CombiningFeedView() override; const document::DocumentTypeRepo::SP & getDocumentTypeRepo() const override; @@ -72,11 +71,11 @@ public: */ void preparePut(PutOperation &putOp) override; - void handlePut(FeedToken *token, const PutOperation &putOp) override; + void handlePut(FeedToken token, const PutOperation &putOp) override; void prepareUpdate(UpdateOperation &updOp) override; - void handleUpdate(FeedToken *token, const UpdateOperation &updOp) override; + void handleUpdate(FeedToken token, const UpdateOperation &updOp) override; void prepareRemove(RemoveOperation &rmOp) override; - void handleRemove(FeedToken *token, const RemoveOperation &rmOp) override; + void handleRemove(FeedToken token, const RemoveOperation &rmOp) override; void prepareDeleteBucket(DeleteBucketOperation &delOp) override; void handleDeleteBucket(const DeleteBucketOperation &delOp) override; void prepareMove(MoveOperation &putOp) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 9300af5c3f0..ea81b64ff22 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -90,31 +90,30 @@ FeedHandler::doHandleOperation(FeedToken token, FeedOperation::UP op) { assert(_writeService.master().isCurrentThread()); LockGuard guard(_feedLock); - _feedState->handleOperation(token, std::move(op)); + _feedState->handleOperation(std::move(token), std::move(op)); } -void FeedHandler::performPut(FeedToken::UP token, PutOperation &op) { +void FeedHandler::performPut(FeedToken token, PutOperation &op) { op.assertValid(); _activeFeedView->preparePut(op); if (ignoreOperation(op)) { LOG(debug, "performPut(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); if (token) { - token->setResult(ResultUP(new Result), false); - token->ack(); + token->setResult(std::make_unique<Result>(), false); } return; } storeOperation(op); if (token) { - token->setResult(ResultUP(new Result), false); + token->setResult(std::make_unique<Result>(), false); } - _activeFeedView->handlePut(token.get(), op); + _activeFeedView->handlePut(std::move(token), op); } void -FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op) +FeedHandler::performUpdate(FeedToken token, UpdateOperation &op) { _activeFeedView->prepareUpdate(op); if (op.getPrevDbDocumentId().valid() && !op.getPrevMarkedAsRemoved()) { @@ -123,26 +122,25 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op) createNonExistingDocument(std::move(token), op); } else { if (token) { - token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false); - token->ack(); + token->setResult(std::make_unique<UpdateResult>(Timestamp(0)), false); } } } void -FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op) +FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op) { storeOperation(op); if (token) { token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true); } - _activeFeedView->handleUpdate(token.get(), op); + _activeFeedView->handleUpdate(std::move(token), op); } void -FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperation &op) +FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &op) { Document::SP doc(new Document(op.getUpdate()->getType(), op.getUpdate()->getId())); doc->setRepo(*_activeFeedView->getDocumentTypeRepo()); @@ -155,22 +153,18 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio } TransportLatch latch(1); FeedToken putToken(latch); - _activeFeedView->handlePut(&putToken, putOp); + _activeFeedView->handlePut(std::move(putToken), putOp); latch.await(); - if (token) { - token->ack(); - } } -void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { +void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) { _activeFeedView->prepareRemove(op); if (ignoreOperation(op)) { LOG(debug, "performRemove(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); - token->ack(); } return; } @@ -182,70 +176,60 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { bool documentWasFound = !op.getPrevMarkedAsRemoved(); token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound); } - _activeFeedView->handleRemove(token.get(), op); + _activeFeedView->handleRemove(std::move(token), op); } else if (op.hasDocType()) { assert(op.getDocType() == _docTypeName.getName()); storeOperation(op); if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); } - _activeFeedView->handleRemove(token.get(), op); + _activeFeedView->handleRemove(std::move(token), op); } else { if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); - token->ack(); } } } void -FeedHandler::performGarbageCollect(FeedToken::UP token) +FeedHandler::performGarbageCollect(FeedToken token) { - if (token) { - token->ack(); - } + (void) token; } void -FeedHandler::performCreateBucket(FeedToken::UP token, CreateBucketOperation &op) +FeedHandler::performCreateBucket(FeedToken token, CreateBucketOperation &op) { + (void) token; storeOperation(op); _bucketDBHandler->handleCreateBucket(op.getBucketId()); - if (token) { - token->ack(); - } } -void FeedHandler::performDeleteBucket(FeedToken::UP token, DeleteBucketOperation &op) { +void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op) { + (void) token; _activeFeedView->prepareDeleteBucket(op); storeOperation(op); // Delete documents in bucket _activeFeedView->handleDeleteBucket(op); // Delete bucket itself, should no longer have documents. _bucketDBHandler->handleDeleteBucket(op.getBucketId()); - if (token) { - token->ack(); - } + } -void FeedHandler::performSplit(FeedToken::UP token, SplitBucketOperation &op) { +void FeedHandler::performSplit(FeedToken token, SplitBucketOperation &op) { + (void) token; storeOperation(op); _bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2()); - if (token) { - token->ack(); - } } -void FeedHandler::performJoin(FeedToken::UP token, JoinBucketsOperation &op) { +void FeedHandler::performJoin(FeedToken token, JoinBucketsOperation &op) { + (void) token; storeOperation(op); _bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget()); - if (token) { - token->ack(); - } } @@ -328,7 +312,7 @@ void FeedHandler::changeFeedState(FeedState::SP newState) { LockGuard guard(_feedLock); - changeFeedState(newState, guard); + changeFeedState(std::move(newState), guard); } @@ -466,8 +450,8 @@ isRejectableFeedOperation(FeedOperation::Type type) } template <typename ResultType> -void feedOperationRejected(FeedToken *token, const vespalib::string &opType, const vespalib::string &docId, - DocTypeName docTypeName, const vespalib::string &rejectMessage) +void feedOperationRejected(FeedToken & token, const vespalib::string &opType, const vespalib::string &docId, + const DocTypeName & docTypeName, const vespalib::string &rejectMessage) { if (token) { auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'", @@ -478,8 +462,8 @@ void feedOperationRejected(FeedToken *token, const vespalib::string &opType, con } void -notifyFeedOperationRejected(FeedToken *token, const FeedOperation &op, - DocTypeName docTypeName, const vespalib::string &rejectMessage) +notifyFeedOperationRejected(FeedToken & token, const FeedOperation &op, + const DocTypeName & docTypeName, const vespalib::string &rejectMessage) { if ((op.getType() == FeedOperation::UPDATE_42) || (op.getType() == FeedOperation::UPDATE)) { vespalib::string docId = (static_cast<const UpdateOperation &>(op)).getUpdate()->getId().toString(); @@ -495,7 +479,7 @@ notifyFeedOperationRejected(FeedToken *token, const FeedOperation &op, } bool -FeedHandler::considerWriteOperationForRejection(FeedToken *token, const FeedOperation &op) +FeedHandler::considerWriteOperationForRejection(FeedToken & token, const FeedOperation &op) { if (!_writeFilter.acceptWriteOperation() && isRejectableFeedOperation(op.getType())) { IResourceWriteFilter::State state = _writeFilter.getAcceptState(); @@ -508,9 +492,9 @@ FeedHandler::considerWriteOperationForRejection(FeedToken *token, const FeedOper } void -FeedHandler::performOperation(FeedToken::UP token, FeedOperation::UP op) +FeedHandler::performOperation(FeedToken token, FeedOperation::UP op) { - if (considerWriteOperationForRejection(token.get(), *op)) { + if (considerWriteOperationForRejection(token, *op)) { return; } switch(op->getType()) { diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index dc955cfeb79..4aec9196491 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -11,6 +11,7 @@ #include "transactionlogmanager.h" #include <persistence/spi/types.h> #include <vespa/searchcore/proton/common/doctypename.h> +#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchlib/transactionlog/translogclient.h> namespace searchcorespi { namespace index { class IThreadingService; } } @@ -101,24 +102,24 @@ private: */ void doHandleOperation(FeedToken token, FeedOperationUP op); - bool considerWriteOperationForRejection(FeedToken *token, const FeedOperation &op); + bool considerWriteOperationForRejection(FeedToken & token, const FeedOperation &op); /** * Delayed execution of feed operations against feed view, in * master write thread. */ - void performPut(FeedTokenUP token, PutOperation &op); - - void performUpdate(FeedTokenUP token, UpdateOperation &op); - void performInternalUpdate(FeedTokenUP token, UpdateOperation &op); - void createNonExistingDocument(FeedTokenUP, const UpdateOperation &op); - - void performRemove(FeedTokenUP token, RemoveOperation &op); - void performGarbageCollect(FeedTokenUP token); - void performCreateBucket(FeedTokenUP token, CreateBucketOperation &op); - void performDeleteBucket(FeedTokenUP token, DeleteBucketOperation &op); - void performSplit(FeedTokenUP token, SplitBucketOperation &op); - void performJoin(FeedTokenUP token, JoinBucketsOperation &op); + void performPut(FeedToken token, PutOperation &op); + + void performUpdate(FeedToken token, UpdateOperation &op); + void performInternalUpdate(FeedToken token, UpdateOperation &op); + void createNonExistingDocument(FeedToken, const UpdateOperation &op); + + void performRemove(FeedToken token, RemoveOperation &op); + void performGarbageCollect(FeedToken token); + void performCreateBucket(FeedToken token, CreateBucketOperation &op); + void performDeleteBucket(FeedToken token, DeleteBucketOperation &op); + void performSplit(FeedToken token, SplitBucketOperation &op); + void performJoin(FeedToken token, JoinBucketsOperation &op); void performSync(); void performEof(); @@ -223,7 +224,7 @@ public: vespalib::string getDocTypeName() const { return _docTypeName.getName(); } void tlsPrune(SerialNum oldest_to_keep); - void performOperation(FeedTokenUP token, FeedOperationUP op); + void performOperation(FeedToken token, FeedOperationUP op); void handleOperation(FeedToken token, FeedOperationUP op); void handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCallback> moveDoneCtx) override; @@ -240,4 +241,3 @@ public: }; } // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp index 6219cadd24f..91b310e3925 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp @@ -82,13 +82,13 @@ public: } virtual void replay(const PutOperation &op) override { - _feed_view_ptr->handlePut(NULL, op); + _feed_view_ptr->handlePut(FeedToken(), op); } virtual void replay(const RemoveOperation &op) override { - _feed_view_ptr->handleRemove(NULL, op); + _feed_view_ptr->handleRemove(FeedToken(), op); } virtual void replay(const UpdateOperation &op) override { - _feed_view_ptr->handleUpdate(NULL, op); + _feed_view_ptr->handleUpdate(FeedToken(), op); } virtual void replay(const NoopOperation &) override {} // ignored virtual void replay(const NewConfigOperation &op) override { @@ -100,16 +100,12 @@ public: _feed_view_ptr->handleDeleteBucket(op); } virtual void replay(const SplitBucketOperation &op) override { - _bucketDBHandler.handleSplit(op.getSerialNum(), - op.getSource(), - op.getTarget1(), - op.getTarget2()); + _bucketDBHandler.handleSplit(op.getSerialNum(), op.getSource(), + op.getTarget1(), op.getTarget2()); } virtual void replay(const JoinBucketsOperation &op) override { - _bucketDBHandler.handleJoin(op.getSerialNum(), - op.getSource1(), - op.getSource2(), - op.getTarget()); + _bucketDBHandler.handleJoin(op.getSerialNum(), op.getSource1(), + op.getSource2(), op.getTarget()); } virtual void replay(const PruneRemovedDocumentsOperation &op) override { _feed_view_ptr->handlePruneRemovedDocuments(op); diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h index 61e80d1c7cc..963a78d0d6b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h @@ -79,13 +79,11 @@ public: _handler(handler) { } - virtual void handleOperation(FeedToken token, FeedOperation::UP op) override { - _handler.performOperation(FeedToken::UP(new FeedToken(token)), std::move(op)); + void handleOperation(FeedToken token, FeedOperation::UP op) override { + _handler.performOperation(std::move(token), std::move(op)); } - virtual void - receive(const PacketWrapper::SP &wrap, vespalib::Executor &) override - { + void receive(const PacketWrapper::SP &wrap, vespalib::Executor &) override { throwExceptionInReceive(_handler.getDocTypeName().c_str(), wrap->packet.range().from(), wrap->packet.range().to(), diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h index c8222902c5d..3e1635d0f33 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h @@ -2,19 +2,17 @@ #pragma once +#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchlib/common/serialnum.h> -#include <memory> namespace document { class DocumentTypeRepo; } namespace search { class IDestructorCallback; } -namespace proton -{ +namespace proton { class CompactLidSpaceOperation; class DeleteBucketOperation; -class FeedToken; class ISimpleDocumentMetaStore; class MoveOperation; class PruneRemovedDocumentsOperation; @@ -49,11 +47,11 @@ public: */ virtual void preparePut(PutOperation &putOp) = 0; - virtual void handlePut(FeedToken *token, const PutOperation &putOp) = 0; + virtual void handlePut(FeedToken token, const PutOperation &putOp) = 0; virtual void prepareUpdate(UpdateOperation &updOp) = 0; - virtual void handleUpdate(FeedToken *token, const UpdateOperation &updOp) = 0; + virtual void handleUpdate(FeedToken token, const UpdateOperation &updOp) = 0; virtual void prepareRemove(RemoveOperation &rmOp) = 0; - virtual void handleRemove(FeedToken *token, const RemoveOperation &rmOp) = 0; + virtual void handleRemove(FeedToken token, const RemoveOperation &rmOp) = 0; virtual void prepareDeleteBucket(DeleteBucketOperation &delOp) = 0; virtual void handleDeleteBucket(const DeleteBucketOperation &delOp) = 0; virtual void prepareMove(MoveOperation &putOp) = 0; diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp index 9c0115f0084..31e4c87b352 100644 --- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp @@ -5,7 +5,7 @@ namespace proton { -OperationDoneContext::OperationDoneContext(std::unique_ptr<FeedToken> token) +OperationDoneContext::OperationDoneContext(FeedToken token) : _token(std::move(token)) { } @@ -18,10 +18,7 @@ OperationDoneContext::~OperationDoneContext() void OperationDoneContext::ack() { - if (_token) { - std::unique_ptr<FeedToken> token(std::move(_token)); - token->ack(); - } + _token.reset(); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h index b801987844b..4c310abf871 100644 --- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h @@ -3,12 +3,10 @@ #pragma once #include <vespa/searchlib/common/idestructorcallback.h> -#include <vespa/searchcore/proton/feedoperation/feedoperation.h> +#include <vespa/searchcore/proton/common/feedtoken.h> namespace proton { -class FeedToken; - /** * Context class for document operations that acks operation when * instance is destroyed. Typically a shared pointer to an instance is @@ -18,15 +16,15 @@ class FeedToken; */ class OperationDoneContext : public search::IDestructorCallback { - std::unique_ptr<FeedToken> _token; + FeedToken _token; protected: void ack(); public: - OperationDoneContext(std::unique_ptr<FeedToken> token); + OperationDoneContext(FeedToken token); ~OperationDoneContext() override; - FeedToken *getToken() { return _token.get(); } + bool hasToken() const { return static_cast<bool>(_token); } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp index 649eebb26f5..fd7871773c8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp @@ -1,19 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "putdonecontext.h" -#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/common/docid_limit.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> namespace proton { -PutDoneContext::PutDoneContext(std::unique_ptr<FeedToken> token, - - IGidToLidChangeHandler &gidToLidChangeHandler, - const document::GlobalId &gid, - uint32_t lid, - search::SerialNum serialNum, - bool enableNotifyPut) +PutDoneContext::PutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, uint32_t lid, + search::SerialNum serialNum, bool enableNotifyPut) : OperationDoneContext(std::move(token)), _lid(lid), _docIdLimit(nullptr), diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h index 3e98b02dda6..587c8f9054d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h @@ -28,7 +28,7 @@ class PutDoneContext : public OperationDoneContext bool _enableNotifyPut; public: - PutDoneContext(std::unique_ptr<FeedToken> token, IGidToLidChangeHandler &gidToLidChangeHandler, + PutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler, const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut); ~PutDoneContext() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp index bd9a8240d73..194e190bb7b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp @@ -2,16 +2,13 @@ #include "removedonecontext.h" #include "removedonetask.h" -#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> namespace proton { -RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token, - vespalib::Executor &executor, +RemoveDoneContext::RemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - PendingNotifyRemoveDone &&pendingNotifyRemoveDone, - uint32_t lid) + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid) : OperationDoneContext(std::move(token)), _executor(executor), _task(), diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index 83f6013dd85..912d31dde22 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -28,10 +28,8 @@ class RemoveDoneContext : public OperationDoneContext PendingNotifyRemoveDone _pendingNotifyRemoveDone; public: - RemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, - uint32_t lid); - + RemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid); ~RemoveDoneContext() override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 8c8559115bd..33e8708799f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -36,26 +36,14 @@ namespace proton { namespace { -FeedToken::UP dupFeedToken(FeedToken *token) -{ - // If token is not nullptr then a new feed token is created, referencing - // same shared state as old token. - if (token != nullptr) { - return std::make_unique<FeedToken>(*token); - } else { - return FeedToken::UP(); - } -} - class PutDoneContextForMove : public PutDoneContext { private: IDestructorCallback::SP _moveDoneCtx; public: - PutDoneContextForMove(std::unique_ptr<FeedToken> token, - IGidToLidChangeHandler &gidToLidChangeHandler, - const document::GlobalId &gid, - uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx) + PutDoneContextForMove(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, + bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx) : PutDoneContext(std::move(token), gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut), _moveDoneCtx(std::move(moveDoneCtx)) {} @@ -63,7 +51,7 @@ public: }; std::shared_ptr<PutDoneContext> -createPutDoneContext(FeedToken::UP &token, +createPutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler, const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut, @@ -79,14 +67,14 @@ createPutDoneContext(FeedToken::UP &token, } std::shared_ptr<PutDoneContext> -createPutDoneContext(FeedToken::UP &token, IGidToLidChangeHandler &gidToLidChangeHandler, +createPutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler, const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut) { return createPutDoneContext(token, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, IDestructorCallback::SP()); } std::shared_ptr<UpdateDoneContext> -createUpdateDoneContext(FeedToken::UP &token, const DocumentUpdate::SP &upd) +createUpdateDoneContext(FeedToken token, const DocumentUpdate::SP &upd) { return std::make_shared<UpdateDoneContext>(std::move(token), upd); } @@ -106,11 +94,9 @@ private: IDestructorCallback::SP _moveDoneCtx; public: - RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, + RemoveDoneContextForMove(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, - uint32_t lid, - IDestructorCallback::SP moveDoneCtx) + uint32_t lid, IDestructorCallback::SP moveDoneCtx) : RemoveDoneContext(std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid), _moveDoneCtx(std::move(moveDoneCtx)) {} @@ -118,13 +104,14 @@ public: }; std::shared_ptr<RemoveDoneContext> -createRemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, +createRemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid, IDestructorCallback::SP moveDoneCtx) { if (moveDoneCtx) { return std::make_shared<RemoveDoneContextForMove> - (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid, std::move(moveDoneCtx)); + (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), + lid, std::move(moveDoneCtx)); } else { return std::make_shared<RemoveDoneContext> (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid); @@ -132,7 +119,8 @@ createRemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &ex } std::vector<document::GlobalId> getGidsToRemove(const IDocumentMetaStore &metaStore, - const LidVectorContext::LidVector &lidsToRemove) { + const LidVectorContext::LidVector &lidsToRemove) +{ std::vector<document::GlobalId> gids; gids.reserve(lidsToRemove.size()); for (const auto &lid : lidsToRemove) { @@ -145,7 +133,8 @@ std::vector<document::GlobalId> getGidsToRemove(const IDocumentMetaStore &metaSt } void putMetaData(documentmetastore::IStore &meta_store, const DocumentId &doc_id, - const DocumentOperation &op, bool is_removed_doc) { + const DocumentOperation &op, bool is_removed_doc) +{ documentmetastore::IStore::Result putRes( meta_store.put(doc_id.getGlobalId(), op.getBucketId(), op.getTimestamp(), op.getSerializedDocSize(), op.getLid())); @@ -234,10 +223,9 @@ StoreOnlyFeedView::forceCommit(SerialNum serialNum, OnForceCommitDoneType onComm } void -StoreOnlyFeedView::considerEarlyAck(FeedToken::UP &token) +StoreOnlyFeedView::considerEarlyAck(FeedToken & token) { if (_commitTimeTracker.hasVisibilityDelay() && token) { - token->ack(); token.reset(); } } @@ -260,13 +248,13 @@ StoreOnlyFeedView::preparePut(PutOperation &putOp) } void -StoreOnlyFeedView::handlePut(FeedToken *token, const PutOperation &putOp) +StoreOnlyFeedView::handlePut(FeedToken token, const PutOperation &putOp) { - internalPut(dupFeedToken(token), putOp); + internalPut(std::move(token), putOp); } void -StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) +StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) { assert(putOp.getValidDbdId()); assert(putOp.notMovingLidInSameSubDb()); @@ -291,7 +279,7 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) bool immediateCommit = _commitTimeTracker.needCommit(); const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = - createPutDoneContext(token, _gidToLidChangeHandler, gid, putOp.getLid(), serialNum, + createPutDoneContext(std::move(token), _gidToLidChangeHandler, gid, putOp.getLid(), serialNum, putOp.changedDbdId() && useDocumentMetaStore(serialNum)); putSummary(serialNum, putOp.getLid(), doc, onWriteDone); putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone); @@ -302,9 +290,6 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), putOp.getPrevLid(), IDestructorCallback::SP()); } - if (token) { - token->ack(); - } } void @@ -347,9 +332,9 @@ StoreOnlyFeedView::prepareUpdate(UpdateOperation &updOp) } void -StoreOnlyFeedView::handleUpdate(FeedToken *token, const UpdateOperation &updOp) +StoreOnlyFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp) { - internalUpdate(dupFeedToken(token), updOp); + internalUpdate(std::move(token), updOp); } void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, @@ -400,7 +385,7 @@ void StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) { } void -StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &updOp) { +StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) { if ( ! updOp.getUpdate()) { LOG(warning, "database(%s): ignoring invalid update operation", _params._docTypeName.toString().c_str()); @@ -430,7 +415,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up considerEarlyAck(token); bool immediateCommit = _commitTimeTracker.needCommit(); - auto onWriteDone = createUpdateDoneContext(token, updOp.getUpdate()); + auto onWriteDone = createUpdateDoneContext(std::move(token), updOp.getUpdate()); updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone); UpdateScope updateScope(getUpdateScope(upd)); @@ -470,19 +455,18 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpd const DocumentUpdate & upd = *update; Document::UP newDoc; vespalib::nbostream newStream(12345); - assert(onWriteDone->getToken() == nullptr || useDocumentStore(serialNum)); + assert(!onWriteDone->hasToken() || useDocumentStore(serialNum)); if (useDocumentStore(serialNum)) { assert(prevDoc); } if (!prevDoc) { // Replaying, document removed later before summary was flushed. - assert(onWriteDone->getToken() == nullptr); + assert(!onWriteDone->hasToken()); // If we've passed serial number for flushed index then we could // also check that this operation is marked for ignore by index // proxy. } else { if (upd.getId() == prevDoc->getId()) { - newDoc = std::move(prevDoc); if (useDocumentStore(serialNum)) { upd.applyTo(*newDoc); @@ -491,7 +475,7 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpd } else { // Replaying, document removed and lid reused before summary // was flushed. - assert(onWriteDone->getToken() == nullptr && !useDocumentStore(serialNum)); + assert(!onWriteDone->hasToken() && !useDocumentStore(serialNum)); } } promisedDoc.set_value(std::move(newDoc)); @@ -531,12 +515,12 @@ StoreOnlyFeedView::prepareRemove(RemoveOperation &rmOp) } void -StoreOnlyFeedView::handleRemove(FeedToken *token, const RemoveOperation &rmOp) { - internalRemove(dupFeedToken(token), rmOp); +StoreOnlyFeedView::handleRemove(FeedToken token, const RemoveOperation &rmOp) { + internalRemove(std::move(token), rmOp); } void -StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rmOp) +StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperation &rmOp) { assert(rmOp.getValidNewOrPrevDbdId()); assert(rmOp.notMovingLidInSameSubDb()); @@ -564,13 +548,10 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm rmOp.getPrevLid(), IDestructorCallback::SP()); } } - if (token) { - token->ack(); - } } void -StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, +StoreOnlyFeedView::internalRemove(FeedToken token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, IDestructorCallback::SP moveDoneCtx) { @@ -727,16 +708,15 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: if (moveOp.getValidDbdId(_params._subDbId)) { bool immediateCommit = _commitTimeTracker.needCommit(); const document::GlobalId &gid = docId.getGlobalId(); - FeedToken::UP token; std::shared_ptr<PutDoneContext> onWriteDone = - createPutDoneContext(token, _gidToLidChangeHandler, gid, moveOp.getLid(), serialNum, + createPutDoneContext(FeedToken(), _gidToLidChangeHandler, gid, moveOp.getLid(), serialNum, moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx); putSummary(serialNum, moveOp.getLid(), doc, onWriteDone); putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone); } if (docAlreadyExists && moveOp.changedDbdId()) { - internalRemove(FeedToken::UP(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx); + internalRemove(FeedToken(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index baaf77bbe59..a8119224ba8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -168,22 +168,22 @@ private: } PendingNotifyRemoveDone adjustMetaStore(const DocumentOperation &op, const document::DocumentId &docId); - void internalPut(FeedTokenUP token, const PutOperation &putOp); - void internalUpdate(FeedTokenUP token, const UpdateOperation &updOp); + void internalPut(FeedToken token, const PutOperation &putOp); + void internalUpdate(FeedToken token, const UpdateOperation &updOp); bool lookupDocId(const document::DocumentId &docId, Lid & lid) const; - void internalRemove(FeedTokenUP token, const RemoveOperation &rmOp); + void internalRemove(FeedToken token, const RemoveOperation &rmOp); // Removes documents from meta store and document store. // returns the number of documents removed. size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields, bool immediateCommit); - void internalRemove(FeedTokenUP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + void internalRemove(FeedToken token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, std::shared_ptr<search::IDestructorCallback> moveDoneCtx); // Ack token early if visibility delay is nonzero - void considerEarlyAck(FeedTokenUP &token); + void considerEarlyAck(FeedToken &token); void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdateSP upd, OnOperationDoneType onWriteDone, PromisedDoc promisedDoc, PromisedStream promisedStream); @@ -220,7 +220,6 @@ protected: public: StoreOnlyFeedView(const Context &ctx, const PersistentParams ¶ms); - ~StoreOnlyFeedView() override; const ISummaryAdapter::SP &getSummaryAdapter() const { return _summaryAdapter; } @@ -233,31 +232,22 @@ public: CommitTimeTracker &getCommitTimeTracker() { return _commitTimeTracker; } IGidToLidChangeHandler &getGidToLidChangeHandler() const { return _gidToLidChangeHandler; } - /** - * Implements IFeedView. - */ - virtual const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return _repo; } - virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override; - - /** - * Similar to IPersistenceHandler functions. - * Takes pointer to feed token instead of instance because - * when replaying the spooler we don't have a feed token. - */ - - virtual void preparePut(PutOperation &putOp) override; - virtual void handlePut(FeedToken *token, const PutOperation &putOp) override; - virtual void prepareUpdate(UpdateOperation &updOp) override; - virtual void handleUpdate(FeedToken *token, const UpdateOperation &updOp) override; - virtual void prepareRemove(RemoveOperation &rmOp) override; - virtual void handleRemove(FeedToken *token, const RemoveOperation &rmOp) override; - virtual void prepareDeleteBucket(DeleteBucketOperation &delOp) override; - virtual void handleDeleteBucket(const DeleteBucketOperation &delOp) override; - virtual void prepareMove(MoveOperation &putOp) override; - virtual void handleMove(const MoveOperation &putOp, std::shared_ptr<search::IDestructorCallback> doneCtx) override; - virtual void heartBeat(search::SerialNum serialNum) override; - virtual void sync() override; - virtual void forceCommit(SerialNum serialNum) override; + const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return _repo; } + const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override; + + void preparePut(PutOperation &putOp) override; + void handlePut(FeedToken token, const PutOperation &putOp) override; + void prepareUpdate(UpdateOperation &updOp) override; + void handleUpdate(FeedToken token, const UpdateOperation &updOp) override; + void prepareRemove(RemoveOperation &rmOp) override; + void handleRemove(FeedToken token, const RemoveOperation &rmOp) override; + void prepareDeleteBucket(DeleteBucketOperation &delOp) override; + void handleDeleteBucket(const DeleteBucketOperation &delOp) override; + void prepareMove(MoveOperation &putOp) override; + void handleMove(const MoveOperation &putOp, std::shared_ptr<search::IDestructorCallback> doneCtx) override; + void heartBeat(search::SerialNum serialNum) override; + void sync() override; + void forceCommit(SerialNum serialNum) override; virtual void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone); /** @@ -266,8 +256,8 @@ public: * * Called by writer thread. */ - virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; - virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; + void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; + void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp index 171990c32d3..6fc1a788531 100644 --- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp @@ -1,11 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "updatedonecontext.h" -#include <vespa/searchcore/proton/common/feedtoken.h> namespace proton { -UpdateDoneContext::UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd) +UpdateDoneContext::UpdateDoneContext(FeedToken token, const document::DocumentUpdate::SP &upd) : OperationDoneContext(std::move(token)), _upd(upd) { diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h index 4701db300de..2b8a018af87 100644 --- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h @@ -18,7 +18,7 @@ class UpdateDoneContext : public OperationDoneContext { document::DocumentUpdate::SP _upd; public: - UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd); + UpdateDoneContext(FeedToken token, const document::DocumentUpdate::SP &upd); ~UpdateDoneContext() override; const document::DocumentUpdate &getUpdate() { return *_upd; } diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h index cafcbb64410..163168a1a09 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h @@ -4,9 +4,7 @@ #include <vespa/searchcore/proton/server/ifeedview.h> #include <vespa/document/repo/documenttyperepo.h> -namespace proton { - -namespace test { +namespace proton::test { struct DummyFeedView : public IFeedView { @@ -18,33 +16,27 @@ struct DummyFeedView : public IFeedView DummyFeedView(const document::DocumentTypeRepo::SP &docTypeRepo) : _docTypeRepo(docTypeRepo) {} - virtual const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { + const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return _docTypeRepo; } - virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override { + const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override { return std::nullptr_t(); } - virtual void preparePut(PutOperation &) override {} - virtual void handlePut(FeedToken *, - const PutOperation &) override {} - virtual void prepareUpdate(UpdateOperation &) override {} - virtual void handleUpdate(FeedToken *, - const UpdateOperation &) override {} - virtual void prepareRemove(RemoveOperation &) override {} - virtual void handleRemove(FeedToken *, - const RemoveOperation &) override {} - virtual void prepareDeleteBucket(DeleteBucketOperation &) override {} - virtual void handleDeleteBucket(const DeleteBucketOperation &) override {} - virtual void prepareMove(MoveOperation &) override {} - virtual void handleMove(const MoveOperation &, std::shared_ptr<search::IDestructorCallback>) override {} - virtual void heartBeat(search::SerialNum) override {} - virtual void sync() override {} - virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {} - virtual void handleCompactLidSpace(const CompactLidSpaceOperation &) override {} + void preparePut(PutOperation &) override {} + void handlePut(FeedToken, const PutOperation &) override {} + void prepareUpdate(UpdateOperation &) override {} + void handleUpdate(FeedToken, const UpdateOperation &) override {} + void prepareRemove(RemoveOperation &) override {} + void handleRemove(FeedToken, const RemoveOperation &) override {} + void prepareDeleteBucket(DeleteBucketOperation &) override {} + void handleDeleteBucket(const DeleteBucketOperation &) override {} + void prepareMove(MoveOperation &) override {} + void handleMove(const MoveOperation &, std::shared_ptr<search::IDestructorCallback>) override {} + void heartBeat(search::SerialNum) override {} + void sync() override {} + void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {} + void handleCompactLidSpace(const CompactLidSpaceOperation &) override {} void forceCommit(search::SerialNum) override { } }; -} // namespace test - -} // namespace proton - +} |