diff options
Diffstat (limited to 'searchcore')
33 files changed, 413 insertions, 720 deletions
diff --git a/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp index d4af7b214b6..a997b3cc3db 100644 --- a/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp @@ -72,22 +72,20 @@ struct MyFeedView : public test::DummyFeedView _metaStore.constructFreeList(); } - // Implements IFeedView - virtual const DocumentMetaStore *getDocumentMetaStorePtr() const override { return &_metaStore; } - virtual void preparePut(PutOperation &) override { ++_preparePut; } - virtual void handlePut(FeedToken *, const PutOperation &) override { ++_handlePut; } - virtual void prepareUpdate(UpdateOperation &) override { ++_prepareUpdate; } - virtual void handleUpdate(FeedToken *, const UpdateOperation &) override { ++_handleUpdate; } - virtual void prepareRemove(RemoveOperation &) override { ++_prepareRemove; } - virtual void handleRemove(FeedToken *, const RemoveOperation &) override { ++_handleRemove; } - virtual void prepareDeleteBucket(DeleteBucketOperation &) override { ++_prepareDeleteBucket; } - virtual void handleDeleteBucket(const DeleteBucketOperation &) override - { ++_handleDeleteBucket; } - virtual void prepareMove(MoveOperation &) override { ++_prepareMove; } - virtual void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++_handleMove; } - virtual void heartBeat(SerialNum) override { ++_heartBeat; } - virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++_handlePrune; } - virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override { + const DocumentMetaStore *getDocumentMetaStorePtr() const override { return &_metaStore; } + void preparePut(PutOperation &) override { ++_preparePut; } + void handlePut(FeedToken, const PutOperation &) override { ++_handlePut; } + void prepareUpdate(UpdateOperation &) override { ++_prepareUpdate; } + void handleUpdate(FeedToken, const UpdateOperation &) override { ++_handleUpdate; } + void prepareRemove(RemoveOperation &) override { ++_prepareRemove; } + void handleRemove(FeedToken, const RemoveOperation &) override { ++_handleRemove; } + void prepareDeleteBucket(DeleteBucketOperation &) override { ++_prepareDeleteBucket; } + void handleDeleteBucket(const DeleteBucketOperation &) override { ++_handleDeleteBucket; } + void prepareMove(MoveOperation &) override { ++_prepareMove; } + void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++_handleMove; } + void heartBeat(SerialNum) override { ++_heartBeat; } + void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++_handlePrune; } + void handleCompactLidSpace(const CompactLidSpaceOperation &op) override { _wantedLidLimit = op.getLidLimit(); } }; @@ -213,7 +211,7 @@ TEST_F("require that handlePut() sends to 1 feed view", Fixture) { PutOperation op = f.put(2); op.setDbDocumentId(DbDocumentId(READY, 2)); - f._view.handlePut(NULL, op); + f._view.handlePut(FeedToken(), op); EXPECT_EQUAL(1u, f._ready._view->_handlePut); EXPECT_EQUAL(0u, f._removed._view->_handlePut); EXPECT_EQUAL(0u, f._notReady._view->_handlePut); @@ -225,7 +223,7 @@ TEST_F("require that handlePut() sends to 2 feed views", Fixture) PutOperation op = f.put(2); op.setDbDocumentId(DbDocumentId(NOT_READY, 2)); op.setPrevDbDocumentId(DbDocumentId(REMOVED, 2)); - f._view.handlePut(NULL, op); + f._view.handlePut(FeedToken(), op); EXPECT_EQUAL(0u, f._ready._view->_handlePut); EXPECT_EQUAL(1u, f._removed._view->_handlePut); EXPECT_EQUAL(1u, f._notReady._view->_handlePut); @@ -259,7 +257,7 @@ TEST_F("require that handleRemove() sends op with valid dbdId to 1 feed view", F { RemoveOperation op = f.remove(1); op.setDbDocumentId(DbDocumentId(REMOVED, 1)); - f._view.handleRemove(NULL, op); + f._view.handleRemove(FeedToken(), op); EXPECT_EQUAL(0u, f._ready._view->_handleRemove); EXPECT_EQUAL(1u, f._removed._view->_handleRemove); EXPECT_EQUAL(0u, f._notReady._view->_handleRemove); @@ -271,7 +269,7 @@ TEST_F("require that handleRemove() sends op with valid dbdId to 2 feed views", RemoveOperation op = f.remove(1); op.setDbDocumentId(DbDocumentId(REMOVED, 1)); op.setPrevDbDocumentId(DbDocumentId(READY, 1)); - f._view.handleRemove(NULL, op); + f._view.handleRemove(FeedToken(), op); EXPECT_EQUAL(1u, f._ready._view->_handleRemove); EXPECT_EQUAL(1u, f._removed._view->_handleRemove); EXPECT_EQUAL(0u, f._notReady._view->_handleRemove); @@ -283,7 +281,7 @@ TEST_F("require that handleRemove() sends op with invalid dbdId to prev view", F RemoveOperation op = f.remove(1); // can be used in the case where removed feed view does not remember removes. op.setPrevDbDocumentId(DbDocumentId(READY, 1)); - f._view.handleRemove(NULL, op); + f._view.handleRemove(FeedToken(), op); EXPECT_EQUAL(1u, f._ready._view->_handleRemove); EXPECT_EQUAL(0u, f._removed._view->_handleRemove); EXPECT_EQUAL(0u, f._notReady._view->_handleRemove); @@ -317,7 +315,7 @@ TEST_F("require that handleUpdate() sends op to correct view", Fixture) UpdateOperation op = f.update(1); op.setDbDocumentId(DbDocumentId(READY, 1)); op.setPrevDbDocumentId(DbDocumentId(READY, 1)); - f._view.handleUpdate(NULL, op); + f._view.handleUpdate(FeedToken(), op); EXPECT_EQUAL(1u, f._ready._view->_handleUpdate); EXPECT_EQUAL(0u, f._removed._view->_handleUpdate); EXPECT_EQUAL(0u, f._notReady._view->_handleUpdate); diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp index 547e400cd76..8369ec0630d 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp @@ -745,18 +745,15 @@ struct DocumentHandler op.setSerialNum(serialNum); return op; } - MoveOperation createMove(Document::UP doc, Timestamp timestamp, - DbDocumentId sourceDbdId, - uint32_t targetSubDbId, - SerialNum serialNum) + MoveOperation createMove(Document::UP doc, Timestamp timestamp, DbDocumentId sourceDbdId, + uint32_t targetSubDbId, SerialNum serialNum) { proton::test::Document testDoc(Document::SP(doc.release()), 0, timestamp); MoveOperation op(testDoc.getBucket(), testDoc.getTimestamp(), testDoc.getDoc(), sourceDbdId, targetSubDbId); op.setSerialNum(serialNum); return op; } - RemoveOperation createRemove(const DocumentId &docId, Timestamp timestamp, - SerialNum serialNum) + RemoveOperation createRemove(const DocumentId &docId, Timestamp timestamp, SerialNum serialNum) { const document::GlobalId &gid = docId.getGlobalId(); BucketId bucket = gid.convertToBucketId(); @@ -769,7 +766,7 @@ struct DocumentHandler void putDoc(PutOperation &op) { IFeedView::SP feedView = _f._subDb.getFeedView(); _f.runInMaster([&]() { feedView->preparePut(op); - feedView->handlePut(NULL, op); } ); + feedView->handlePut(FeedToken(), op); } ); } void moveDoc(MoveOperation &op) { IFeedView::SP feedView = _f._subDb.getFeedView(); @@ -779,11 +776,10 @@ struct DocumentHandler { IFeedView::SP feedView = _f._subDb.getFeedView(); _f.runInMaster([&]() { feedView->prepareRemove(op); - feedView->handleRemove(NULL, op); } ); + feedView->handleRemove(FeedToken(), op); } ); } void putDocs() { - PutOperation putOp = createPut(std::move(createDoc(1, 22, 33)), - Timestamp(10), 10); + PutOperation putOp = createPut(std::move(createDoc(1, 22, 33)), Timestamp(10), 10); putDoc(putOp); putOp = createPut(std::move(createDoc(2, 44, 55)), Timestamp(20), 20); putDoc(putOp); @@ -791,13 +787,8 @@ struct DocumentHandler }; void -assertAttribute(const AttributeGuard &attr, - const vespalib::string &name, - uint32_t numDocs, - int64_t doc1Value, - int64_t doc2Value, - SerialNum createSerialNum, - SerialNum lastSerialNum) +assertAttribute(const AttributeGuard &attr, const vespalib::string &name, uint32_t numDocs, + int64_t doc1Value, int64_t doc2Value, SerialNum createSerialNum, SerialNum lastSerialNum) { EXPECT_EQUAL(name, attr->getName()); EXPECT_EQUAL(numDocs, attr->getNumDocs()); @@ -808,17 +799,13 @@ assertAttribute(const AttributeGuard &attr, } void -assertAttribute1(const AttributeGuard &attr, - SerialNum createSerialNum, - SerialNum lastSerialNum) +assertAttribute1(const AttributeGuard &attr, SerialNum createSerialNum, SerialNum lastSerialNum) { assertAttribute(attr, "attr1", 3, 22, 44, createSerialNum, lastSerialNum); } void -assertAttribute2(const AttributeGuard &attr, - SerialNum createSerialNum, - SerialNum lastSerialNum) +assertAttribute2(const AttributeGuard &attr, SerialNum createSerialNum, SerialNum lastSerialNum) { assertAttribute(attr, "attr2", 3, 33, 55, createSerialNum, lastSerialNum); } @@ -877,12 +864,10 @@ TEST_F("require that regular attributes are populated during reprocessing", requireThatAttributesArePopulatedDuringReprocessing<SearchableFixtureTwoField, ConfigDir2>(f); } -namespace -{ +namespace { bool -assertOperation(DocumentOperation &op, - uint32_t expPrevSubDbId, uint32_t expPrevLid, +assertOperation(DocumentOperation &op, uint32_t expPrevSubDbId, uint32_t expPrevLid, uint32_t expSubDbId, uint32_t expLid) { if (!EXPECT_EQUAL(expPrevSubDbId, op.getPrevSubDbId())) { diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp index 5e3e5cd78be..f7e09eeec3f 100644 --- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp @@ -3,8 +3,6 @@ #include <tests/proton/common/dummydbowner.h> #include <vespa/document/datatype/documenttype.h> #include <vespa/fastos/file.h> -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/testlib/receptor.h> #include <vespa/persistence/spi/test.h> #include <vespa/searchcore/proton/attribute/flushableattribute.h> #include <vespa/searchcore/proton/common/feedtoken.h> @@ -46,25 +44,11 @@ using vespalib::Slime; namespace { -class LocalTransport : public FeedToken::ITransport { - mbus::Receptor _receptor; - -public: - void send(mbus::Reply::UP reply) { - fprintf(stderr, "in local transport."); - _receptor.handleReply(std::move(reply)); - } - - mbus::Reply::UP getReply() { - return _receptor.getReply(10000); - } -}; - struct MyDBOwner : public DummyDBOwner { std::shared_ptr<DocumentDBReferenceRegistry> _registry; MyDBOwner(); - ~MyDBOwner(); + ~MyDBOwner() override; std::shared_ptr<IDocumentDBReferenceRegistry> getDocumentDBReferenceRegistry() const override { return _registry; } @@ -74,7 +58,7 @@ MyDBOwner::MyDBOwner() : DummyDBOwner(), _registry(std::make_shared<DocumentDBReferenceRegistry>()) {} -MyDBOwner::~MyDBOwner() {} +MyDBOwner::~MyDBOwner() = default; struct Fixture { DummyWireService _dummy; @@ -124,16 +108,14 @@ Fixture::Fixture() _db->waitForOnlineState(); } -Fixture::~Fixture() {} +Fixture::~Fixture() = default; const IFlushTarget * extractRealFlushTarget(const IFlushTarget *target) { - const JobTrackedFlushTarget *tracked = - dynamic_cast<const JobTrackedFlushTarget*>(target); + const auto tracked = dynamic_cast<const JobTrackedFlushTarget*>(target); if (tracked != nullptr) { - const ThreadedFlushTarget *threaded = - dynamic_cast<const ThreadedFlushTarget*>(&tracked->getTarget()); + const auto threaded = dynamic_cast<const ThreadedFlushTarget*>(&tracked->getTarget()); if (threaded != nullptr) { return threaded->getFlushTarget().get(); } @@ -144,10 +126,10 @@ extractRealFlushTarget(const IFlushTarget *target) TEST_F("requireThatIndexFlushTargetIsUsed", Fixture) { auto targets = f._db->getFlushTargets(); ASSERT_TRUE(!targets.empty()); - const IndexFlushTarget *index = 0; + const IndexFlushTarget *index = nullptr; for (size_t i = 0; i < targets.size(); ++i) { const IFlushTarget *target = extractRealFlushTarget(targets[i].get()); - if (target != NULL) { + if (target != nullptr) { index = dynamic_cast<const IndexFlushTarget *>(target); } if (index) { @@ -161,9 +143,9 @@ template <typename Target> size_t getNumTargets(const std::vector<IFlushTarget::SP> & targets) { size_t retval = 0; - for (size_t i = 0; i < targets.size(); ++i) { - const IFlushTarget *target = extractRealFlushTarget(targets[i].get()); - if (dynamic_cast<const Target*>(target) == NULL) { + for (const auto & candidate : targets) { + const IFlushTarget *target = extractRealFlushTarget(candidate.get()); + if (dynamic_cast<const Target*>(target) == nullptr) { continue; } retval++; @@ -244,16 +226,16 @@ TEST_F("requireThatStateIsReported", Fixture) TEST_F("require that session manager can be explored", Fixture) { - EXPECT_TRUE(DocumentDBExplorer(f._db).get_child("session").get() != nullptr); + EXPECT_TRUE(DocumentDBExplorer(f._db).get_child("session")); } TEST_F("require that document db registers reference", Fixture) { auto ®istry = f._myDBOwner._registry; auto reference = registry->get("typea"); - EXPECT_TRUE(reference.get() != nullptr); + EXPECT_TRUE(reference); auto attr = reference->getAttribute("attr1"); - EXPECT_TRUE(attr.get() != nullptr); + EXPECT_TRUE(attr); EXPECT_EQUAL(search::attribute::BasicType::INT32, attr->getBasicType()); } diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index b0b06a238c9..823c31dd1c2 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -1,9 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/documentapi/messagebus/documentprotocol.h> -#include <vespa/documentapi/messagebus/messages/documentreply.h> -#include <vespa/documentapi/messagebus/messages/removedocumentreply.h> -#include <vespa/documentapi/messagebus/messages/updatedocumentreply.h> #include <vespa/persistence/spi/result.h> #include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h> #include <vespa/searchcore/proton/test/bucketfactory.h> @@ -40,11 +36,6 @@ using document::DocumentId; using document::DocumentTypeRepo; using document::DocumentUpdate; using document::GlobalId; -using documentapi::DocumentProtocol; -using documentapi::DocumentReply; -using documentapi::RemoveDocumentReply; -using documentapi::UpdateDocumentReply; -using mbus::Reply; using search::IDestructorCallback; using search::SerialNum; using search::index::schema::CollectionType; @@ -140,12 +131,6 @@ struct MyReplayConfig : public IReplayConfig { virtual void replayConfig(SerialNum) override {} }; -void ackToken(FeedToken *token) { - if (token != NULL) { - token->ack(); - } -} - struct MyDocumentMetaStore { struct Entry { DbDocumentId _id; @@ -195,9 +180,9 @@ struct MyFeedView : public test::DummyFeedView { int update_count; SerialNum update_serial; MyFeedView(const DocumentTypeRepo::SP &dtr); - ~MyFeedView(); + ~MyFeedView() override; void resetPutLatch(uint32_t count) { putLatch.reset(new vespalib::CountDownLatch(count)); } - virtual void preparePut(PutOperation &op) override { + void preparePut(PutOperation &op) override { prepareDocumentOperation(op, op.getDocument()->getId().getGlobalId()); } void prepareDocumentOperation(DocumentOperation &op, const GlobalId &gid) { @@ -208,7 +193,8 @@ struct MyFeedView : public test::DummyFeedView { op.setPrevTimestamp(entry->_prevTimestamp); } } - virtual void handlePut(FeedToken *token, const PutOperation &putOp) override { + void handlePut(FeedToken token, const PutOperation &putOp) override { + (void) token; LOG(info, "MyFeedView::handlePut(): docId(%s), putCount(%u), putLatchCount(%u)", putOp.getDocument()->getId().toString().c_str(), put_count, (putLatch.get() != NULL ? putLatch->getCount() : 0u)); @@ -221,23 +207,24 @@ struct MyFeedView : public test::DummyFeedView { if (putLatch.get() != NULL) { putLatch->countDown(); } - ackToken(token); } - virtual void prepareUpdate(UpdateOperation &op) override { + void prepareUpdate(UpdateOperation &op) override { prepareDocumentOperation(op, op.getUpdate()->getId().getGlobalId()); } - virtual void handleUpdate(FeedToken *token, const UpdateOperation &op) override { + void handleUpdate(FeedToken token, const UpdateOperation &op) override { + (void) token; + ++update_count; update_serial = op.getSerialNum(); - ackToken(token); } - virtual void handleRemove(FeedToken *token, const RemoveOperation &) override - { ++remove_count; ackToken(token); } - virtual void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++move_count; } - virtual void heartBeat(SerialNum) override { ++heartbeat_count; } - virtual void handlePruneRemovedDocuments( - const PruneRemovedDocumentsOperation &) override { ++prune_removed_count; } - virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override { + void handleRemove(FeedToken token, const RemoveOperation &) override { + (void) token; + ++remove_count; + } + void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++move_count; } + void heartBeat(SerialNum) override { ++heartbeat_count; } + void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++prune_removed_count; } + const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override { return NULL; } }; @@ -299,7 +286,7 @@ struct UpdateContext { }; -struct MyTransport : public FeedToken::ITransport { +struct MyTransport : public feedtoken::ITransport { vespalib::Gate gate; ResultUP result; bool documentWasFound; @@ -313,12 +300,11 @@ struct MyTransport : public FeedToken::ITransport { }; MyTransport::MyTransport() : gate(), result(), documentWasFound(false) {} -MyTransport::~MyTransport() {} +MyTransport::~MyTransport() = default; struct FeedTokenContext { MyTransport transport; - FeedToken::UP token_ap; - FeedToken &token; + FeedToken token; FeedTokenContext(); ~FeedTokenContext(); @@ -327,16 +313,14 @@ struct FeedTokenContext { if (transport.result.get()) { return transport.result.get(); } - return &token.getResult(); + return &token->getResult(); } }; FeedTokenContext::FeedTokenContext() : transport(), - token_ap(new FeedToken(transport)), - token(*token_ap) -{ -} + token(feedtoken::make(transport)) +{} FeedTokenContext::~FeedTokenContext() = default; @@ -432,8 +416,7 @@ struct FeedHandlerFixture handler.init(1); } - ~FeedHandlerFixture() - { + ~FeedHandlerFixture() { writeService.sync(); } template <class FunctionType> @@ -484,7 +467,7 @@ TEST_F("require that outdated remove is ignored", FeedHandlerFixture) static_cast<DocumentOperation &>(*op).setPrevDbDocumentId(DbDocumentId(4)); static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000)); FeedTokenContext token_context; - f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); + f.handler.performOperation(std::move(token_context.token), std::move(op)); EXPECT_EQUAL(0, f.feedView.remove_count); EXPECT_EQUAL(0, f.tls_writer.store_count); } @@ -496,7 +479,7 @@ TEST_F("require that outdated put is ignored", FeedHandlerFixture) Timestamp(10), doc_context.doc)); static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000)); FeedTokenContext token_context; - f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); + f.handler.performOperation(std::move(token_context.token), std::move(op)); EXPECT_EQUAL(0, f.feedView.put_count); EXPECT_EQUAL(0, f.tls_writer.store_count); } @@ -575,7 +558,7 @@ TEST_F("require that remove of unknown document with known data type stores remo DocumentContext doc_context("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId())); FeedTokenContext token_context; - f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); + f.handler.performOperation(std::move(token_context.token), std::move(op)); EXPECT_EQUAL(1, f.feedView.remove_count); EXPECT_EQUAL(1, f.tls_writer.store_count); } @@ -585,7 +568,7 @@ TEST_F("require that partial update for non-existing document is tagged as such" UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update)); FeedTokenContext token_context; - f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); + f.handler.performOperation(std::move(token_context.token), std::move(op)); const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult()); EXPECT_FALSE(token_context.transport.documentWasFound); @@ -603,7 +586,7 @@ TEST_F("require that partial update for non-existing document is created if spec f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(), MyDocumentMetaStore::Entry(5, 5, Timestamp(10))); FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update)); FeedTokenContext token_context; - f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); + f.handler.performOperation(std::move(token_context.token), std::move(op)); const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult()); EXPECT_TRUE(token_context.transport.documentWasFound); @@ -624,7 +607,7 @@ TEST_F("require that put is rejected if resource limit is reached", FeedHandlerF DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc); FeedTokenContext token; - f.handler.performOperation(std::move(token.token_ap), std::move(op)); + f.handler.performOperation(std::move(token.token), std::move(op)); EXPECT_EQUAL(0, f.feedView.put_count); EXPECT_EQUAL(Result::RESOURCE_EXHAUSTED, token.getResult()->getErrorCode()); EXPECT_EQUAL("Put operation rejected for document 'id:test:searchdocument::foo' of type 'searchdocument': 'Attribute resource limit reached'", @@ -639,7 +622,7 @@ TEST_F("require that update is rejected if resource limit is reached", FeedHandl UpdateContext updCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op = std::make_unique<UpdateOperation>(updCtx.bucketId, Timestamp(10), updCtx.update); FeedTokenContext token; - f.handler.performOperation(std::move(token.token_ap), std::move(op)); + f.handler.performOperation(std::move(token.token), std::move(op)); EXPECT_EQUAL(0, f.feedView.update_count); EXPECT_TRUE(dynamic_cast<const UpdateResult *>(token.getResult())); EXPECT_EQUAL(Result::RESOURCE_EXHAUSTED, token.getResult()->getErrorCode()); @@ -655,7 +638,7 @@ TEST_F("require that remove is NOT rejected if resource limit is reached", FeedH DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op = std::make_unique<RemoveOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc->getId()); FeedTokenContext token; - f.handler.performOperation(std::move(token.token_ap), std::move(op)); + f.handler.performOperation(std::move(token.token), std::move(op)); EXPECT_EQUAL(1, f.feedView.remove_count); EXPECT_EQUAL(Result::NONE, token.getResult()->getErrorCode()); EXPECT_EQUAL("", token.getResult()->getErrorMessage()); diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index 4eefbed0a53..54aefef4463 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -1,9 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/documentapi/messagebus/documentprotocol.h> -#include <vespa/documentapi/messagebus/messages/documentreply.h> -#include <vespa/documentapi/messagebus/messages/removedocumentreply.h> -#include <vespa/documentapi/messagebus/messages/updatedocumentreply.h> #include <vespa/searchcore/proton/attribute/i_attribute_writer.h> #include <vespa/searchcore/proton/test/bucketfactory.h> #include <vespa/searchcore/proton/common/commit_time_tracker.h> @@ -11,7 +7,6 @@ #include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h> #include <vespa/searchcore/proton/index/i_index_writer.h> #include <vespa/searchcore/proton/server/executorthreadingservice.h> -#include <vespa/searchcore/proton/server/ifrozenbuckethandler.h> #include <vespa/searchcore/proton/server/isummaryadapter.h> #include <vespa/searchcore/proton/server/matchview.h> #include <vespa/searchcore/proton/server/searchable_feed_view.h> @@ -25,9 +20,7 @@ #include <vespa/searchcore/proton/test/thread_utils.h> #include <vespa/searchcore/proton/test/threading_service_observer.h> #include <vespa/searchlib/attribute/attributefactory.h> -#include <vespa/searchlib/common/idestructorcallback.h> -#include <vespa/searchlib/docstore/cachestats.h> -#include <vespa/searchlib/docstore/idocumentstore.h> + #include <vespa/searchlib/index/docbuilder.h> #include <vespa/log/log.h> @@ -37,8 +30,6 @@ using document::BucketId; using document::Document; using document::DocumentId; using document::DocumentUpdate; -using documentapi::DocumentProtocol; -using documentapi::RemoveDocumentReply; using fastos::TimeStamp; using proton::matching::SessionManager; using proton::test::MockGidToLidChangeHandler; @@ -62,7 +53,6 @@ using namespace search::index; typedef SearchableFeedView::SerialNum SerialNum; typedef search::DocumentIdT DocumentIdT; -typedef DocumentProtocol::MessageType MessageType; struct MyLidVector : public std::vector<DocumentIdT> { @@ -235,7 +225,8 @@ struct MyDocumentStore : public test::DummyDocumentStore _lastSyncToken(0), _compactLidSpaceLidLimit(0) {} - virtual Document::UP read(DocumentIdT lid, const document::DocumentTypeRepo &) const override { + ~MyDocumentStore() override; + Document::UP read(DocumentIdT lid, const document::DocumentTypeRepo &) const override { DocMap::const_iterator itr = _docs.find(lid); if (itr != _docs.end()) { Document::UP retval(itr->second->clone()); @@ -243,27 +234,29 @@ struct MyDocumentStore : public test::DummyDocumentStore } return Document::UP(); } - virtual void write(uint64_t syncToken, DocumentIdT lid, const document::Document& doc) override { + void write(uint64_t syncToken, DocumentIdT lid, const document::Document& doc) override { _lastSyncToken = syncToken; _docs[lid] = Document::SP(doc.clone()); } - virtual void write(uint64_t syncToken, DocumentIdT lid, const vespalib::nbostream & os) override { + void write(uint64_t syncToken, DocumentIdT lid, const vespalib::nbostream & os) override { _lastSyncToken = syncToken; _docs[lid] = std::make_shared<Document>(_repo, const_cast<vespalib::nbostream &>(os)); } - virtual void remove(uint64_t syncToken, DocumentIdT lid) override { + void remove(uint64_t syncToken, DocumentIdT lid) override { _lastSyncToken = syncToken; _docs.erase(lid); } - virtual uint64_t initFlush(uint64_t syncToken) override { + uint64_t initFlush(uint64_t syncToken) override { return syncToken; } - virtual uint64_t lastSyncToken() const override { return _lastSyncToken; } - virtual void compactLidSpace(uint32_t wantedDocLidLimit) override { + uint64_t lastSyncToken() const override { return _lastSyncToken; } + void compactLidSpace(uint32_t wantedDocLidLimit) override { _compactLidSpaceLidLimit = wantedDocLidLimit; } }; +MyDocumentStore::~MyDocumentStore() = default; + struct MySummaryManager : public test::DummySummaryManager { MyDocumentStore _store; @@ -405,7 +398,7 @@ MyAttributeWriter::MyAttributeWriter(MyTracer &tracer) } MyAttributeWriter::~MyAttributeWriter() {} -struct MyTransport : public FeedToken::ITransport +struct MyTransport : public feedtoken::ITransport { ResultUP lastResult; vespalib::Gate _gate; @@ -421,7 +414,7 @@ struct MyTransport : public FeedToken::ITransport }; MyTransport::MyTransport(MyTracer &tracer) : lastResult(), _gate(), _tracer(tracer) {} -MyTransport::~MyTransport() {} +MyTransport::~MyTransport() = default; struct MyResultHandler : public IGenericResultHandler { @@ -491,7 +484,7 @@ struct FeedTokenContext }; FeedTokenContext::FeedTokenContext(MyTracer &tracer) - : mt(tracer), ft(mt) + : mt(tracer), ft(feedtoken::make(mt)) {} FeedTokenContext::~FeedTokenContext() = default; @@ -571,7 +564,7 @@ struct FixtureBase return doc("doc:test:1", timestamp); } - void performPut(FeedToken *token, PutOperation &op) { + void performPut(FeedToken token, PutOperation &op) { getFeedView().preparePut(op); op.setSerialNum(++serial); getFeedView().handlePut(token, op); @@ -586,10 +579,10 @@ struct FixtureBase void putAndWait(const DocumentContext &docCtx) { FeedTokenContext token(_tracer); PutOperation op(docCtx.bid, docCtx.ts, docCtx.doc); - runInMaster([&] () { performPut(&token.ft, op); }); + runInMaster([&] () { performPut(token.ft, op); }); } - void performUpdate(FeedToken *token, UpdateOperation &op) { + void performUpdate(FeedToken token, UpdateOperation &op) { getFeedView().prepareUpdate(op); op.setSerialNum(++serial); getFeedView().handleUpdate(token, op); @@ -598,25 +591,21 @@ struct FixtureBase void updateAndWait(const DocumentContext &docCtx) { FeedTokenContext token(_tracer); UpdateOperation op(docCtx.bid, docCtx.ts, docCtx.upd); - runInMaster([&] () { performUpdate(&token.ft, op); }); + runInMaster([&] () { performUpdate(token.ft, op); }); } - void performRemove(FeedToken *token, RemoveOperation &op) { + void performRemove(FeedToken token, RemoveOperation &op) { getFeedView().prepareRemove(op); if (op.getValidNewOrPrevDbdId()) { op.setSerialNum(++serial); - getFeedView().handleRemove(token, op); - } else { - if (token != NULL) { - token->ack(); - } + getFeedView().handleRemove(std::move(token), op); } } void removeAndWait(const DocumentContext &docCtx) { FeedTokenContext token(_tracer); RemoveOperation op(docCtx.bid, docCtx.ts, docCtx.doc->getId()); - runInMaster([&] () { performRemove(&token.ft, op); }); + runInMaster([&] () { performRemove(token.ft, op); }); } void removeAndWait(const DocumentContext::List &docs) { @@ -1211,12 +1200,12 @@ TEST_F("require that commit is not called when inside a commit interval", EXPECT_EQUAL(0u, f.miw._commitCount); EXPECT_EQUAL(0u, f.maw._commitCount); EXPECT_EQUAL(0u, f._docIdLimit.get()); - f.assertTrace("ack(Result(0, ))," - "put(adapter=attribute,serialNum=1,lid=1,commit=0)," + f.assertTrace("put(adapter=attribute,serialNum=1,lid=1,commit=0)," "put(adapter=index,serialNum=1,lid=1,commit=0)," "ack(Result(0, ))," "remove(adapter=attribute,serialNum=2,lid=1,commit=0)," - "remove(adapter=index,serialNum=2,lid=1,commit=0)"); + "remove(adapter=index,serialNum=2,lid=1,commit=0)," + "ack(Result(0, ))"); } TEST_F("require that commit is called when crossing a commit interval", @@ -1232,19 +1221,18 @@ TEST_F("require that commit is called when crossing a commit interval", f.removeAndWait(dc); EXPECT_EQUAL(2u, f.miw._commitCount); EXPECT_EQUAL(2u, f.maw._commitCount); - f.assertTrace("ack(Result(0, ))," - "put(adapter=attribute,serialNum=1,lid=1,commit=1)," + f.assertTrace("put(adapter=attribute,serialNum=1,lid=1,commit=1)," "put(adapter=index,serialNum=1,lid=1,commit=0)," "commit(adapter=index,serialNum=1)," "ack(Result(0, ))," "remove(adapter=attribute,serialNum=2,lid=1,commit=1)," "remove(adapter=index,serialNum=2,lid=1,commit=0)," - "commit(adapter=index,serialNum=2)"); + "commit(adapter=index,serialNum=2)," + "ack(Result(0, ))"); } -TEST_F("require that commit is not implicitly called after " - "handover to maintenance job", +TEST_F("require that commit is not implicitly called after handover to maintenance job", SearchableFeedViewFixture(SHORT_DELAY)) { f._commitTimeTracker.setReplayDone(); @@ -1259,12 +1247,12 @@ TEST_F("require that commit is not implicitly called after " EXPECT_EQUAL(0u, f.miw._commitCount); EXPECT_EQUAL(0u, f.maw._commitCount); EXPECT_EQUAL(0u, f._docIdLimit.get()); - f.assertTrace("ack(Result(0, ))," - "put(adapter=attribute,serialNum=1,lid=1,commit=0)," + f.assertTrace("put(adapter=attribute,serialNum=1,lid=1,commit=0)," "put(adapter=index,serialNum=1,lid=1,commit=0)," "ack(Result(0, ))," "remove(adapter=attribute,serialNum=2,lid=1,commit=0)," - "remove(adapter=index,serialNum=2,lid=1,commit=0)"); + "remove(adapter=index,serialNum=2,lid=1,commit=0)," + "ack(Result(0, ))"); } TEST_F("require that forceCommit updates docid limit", @@ -1280,15 +1268,14 @@ TEST_F("require that forceCommit updates docid limit", EXPECT_EQUAL(1u, f.miw._commitCount); EXPECT_EQUAL(1u, f.maw._commitCount); EXPECT_EQUAL(2u, f._docIdLimit.get()); - f.assertTrace("ack(Result(0, ))," - "put(adapter=attribute,serialNum=1,lid=1,commit=0)," + f.assertTrace("put(adapter=attribute,serialNum=1,lid=1,commit=0)," "put(adapter=index,serialNum=1,lid=1,commit=0)," + "ack(Result(0, ))," "commit(adapter=attribute,serialNum=1)," "commit(adapter=index,serialNum=1)"); } -TEST_F("require that forceCommit updates docid limit during shrink", - SearchableFeedViewFixture(LONG_DELAY)) +TEST_F("require that forceCommit updates docid limit during shrink", SearchableFeedViewFixture(LONG_DELAY)) { f._commitTimeTracker.setReplayDone(); f.putAndWait(f.makeDummyDocs(0, 3, 1000)); diff --git a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp index 530c9ebef39..8bd1d92657f 100644 --- a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp +++ b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp @@ -5,7 +5,7 @@ using namespace proton; -class LocalTransport : public FeedToken::ITransport { +class LocalTransport : public feedtoken::ITransport { private: size_t _receivedCount; @@ -45,8 +45,9 @@ void Test::testAck() { LocalTransport transport; - FeedToken token(transport); - token.ack(); + { + FeedToken token = feedtoken::make(transport); + } EXPECT_EQUAL(1u, transport.getReceivedCount()); } @@ -54,8 +55,8 @@ void Test::testFail() { LocalTransport transport; - FeedToken token(transport); - token.fail(); + FeedToken token = feedtoken::make(transport); + token->fail(); EXPECT_EQUAL(1u, transport.getReceivedCount()); } @@ -70,9 +71,10 @@ Test::testHandover() LocalTransport transport; - FeedToken token(transport); - token = MyHandover::handover(token); - token.ack(); + { + FeedToken token = feedtoken::make(transport); + token = MyHandover::handover(token); + } EXPECT_EQUAL(1u, transport.getReceivedCount()); } diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index f8c0e0ac2f6..f7b35851696 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -127,33 +127,29 @@ struct MyDocumentRetriever : DocumentRetrieverBaseForTest { MyDocumentRetriever(const Document *d, Timestamp ts, DocumentId &last_id) : repo(), document(d), timestamp(ts), last_doc_id(last_id) {} - virtual const document::DocumentTypeRepo &getDocumentTypeRepo() const override { + const document::DocumentTypeRepo &getDocumentTypeRepo() const override { return repo; } - virtual void getBucketMetaData(const storage::spi::Bucket &, - search::DocumentMetaData::Vector &v) const override { + void getBucketMetaData(const storage::spi::Bucket &, search::DocumentMetaData::Vector &v) const override { if (document != 0) { v.push_back(getDocumentMetaData(document->getId())); } } - virtual DocumentMetaData getDocumentMetaData(const DocumentId &id) const override { + DocumentMetaData getDocumentMetaData(const DocumentId &id) const override { last_doc_id = id; if (document != 0) { - return DocumentMetaData(1, timestamp, document::BucketId(1), - document->getId().getGlobalId()); + return DocumentMetaData(1, timestamp, document::BucketId(1), document->getId().getGlobalId()); } return DocumentMetaData(); } - virtual document::Document::UP getDocument(search::DocumentIdT) const override { + document::Document::UP getDocument(search::DocumentIdT) const override { if (document != 0) { return Document::UP(document->clone()); } return Document::UP(); } - virtual CachedSelect::SP - parseSelect(const vespalib::string &) const override - { + CachedSelect::SP parseSelect(const vespalib::string &) const override { return CachedSelect::SP(); } }; @@ -208,134 +204,104 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer { setExistingTimestamp(ts); } void handle(FeedToken token, const Bucket &bucket, Timestamp timestamp, const DocumentId &docId) { + (void) token; lastBucket = bucket; lastTimestamp = timestamp; lastDocId = docId; - token.ack(); } - virtual void initialize() override { initialized = true; } + void initialize() override { initialized = true; } - virtual void handlePut(FeedToken token, const Bucket& bucket, - Timestamp timestamp, const document::Document::SP& doc) override { - token.setResult(ResultUP(new storage::spi::Result()), false); + void handlePut(FeedToken token, const Bucket& bucket, + Timestamp timestamp, const document::Document::SP& doc) override { + token->setResult(ResultUP(new storage::spi::Result()), false); handle(token, bucket, timestamp, doc->getId()); } - virtual void handleUpdate(FeedToken token, const Bucket& bucket, - Timestamp timestamp, const document::DocumentUpdate::SP& upd) override { - token.setResult(ResultUP(new storage::spi::UpdateResult(existingTimestamp)), + void handleUpdate(FeedToken token, const Bucket& bucket, + Timestamp timestamp, const document::DocumentUpdate::SP& upd) override { + token->setResult(ResultUP(new storage::spi::UpdateResult(existingTimestamp)), existingTimestamp > 0); handle(token, bucket, timestamp, upd->getId()); } - virtual void handleRemove(FeedToken token, const Bucket& bucket, - Timestamp timestamp, const DocumentId& id) override { + void handleRemove(FeedToken token, const Bucket& bucket, + Timestamp timestamp, const DocumentId& id) override { bool wasFound = existingTimestamp > 0; - token.setResult(ResultUP(new storage::spi::RemoveResult(wasFound)), wasFound); + token->setResult(ResultUP(new storage::spi::RemoveResult(wasFound)), wasFound); handle(token, bucket, timestamp, id); } - virtual void handleListBuckets(IBucketIdListResultHandler &resultHandler) override { + void handleListBuckets(IBucketIdListResultHandler &resultHandler) override { resultHandler.handle(BucketIdListResult(bucketList)); } - virtual void handleSetClusterState(const ClusterState &calc, - IGenericResultHandler &resultHandler) override { + void handleSetClusterState(const ClusterState &calc, IGenericResultHandler &resultHandler) override { lastCalc = &calc; resultHandler.handle(Result()); } - virtual void handleSetActiveState(const Bucket &bucket, - storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) override { + void handleSetActiveState(const Bucket &bucket, storage::spi::BucketInfo::ActiveState newState, + IGenericResultHandler &resultHandler) override { lastBucket = bucket; lastBucketState = newState; resultHandler.handle(bucketStateResult); } - virtual void handleGetBucketInfo(const Bucket &, - IBucketInfoResultHandler &resultHandler) override { + void handleGetBucketInfo(const Bucket &, IBucketInfoResultHandler &resultHandler) override { resultHandler.handle(BucketInfoResult(bucketInfo)); } - virtual void - handleCreateBucket(FeedToken token, - const storage::spi::Bucket &) override - { - token.setResult(ResultUP(new Result(_createBucketResult)), true); - token.ack(); + void handleCreateBucket(FeedToken token, const storage::spi::Bucket &) override { + token->setResult(ResultUP(new Result(_createBucketResult)), true); } - virtual void handleDeleteBucket(FeedToken token, - const storage::spi::Bucket &) override { - token.setResult(ResultUP(new Result(deleteBucketResult)), true); - token.ack(); + void handleDeleteBucket(FeedToken token, const storage::spi::Bucket &) override { + token->setResult(ResultUP(new Result(deleteBucketResult)), true); } - virtual void handleGetModifiedBuckets(IBucketIdListResultHandler &resultHandler) override { + void handleGetModifiedBuckets(IBucketIdListResultHandler &resultHandler) override { resultHandler.handle(BucketIdListResult(modBucketList)); } - virtual void - handleSplit(FeedToken token, - const storage::spi::Bucket &source, - const storage::spi::Bucket &target1, - const storage::spi::Bucket &target2) override + void handleSplit(FeedToken token, const storage::spi::Bucket &, const storage::spi::Bucket &, + const storage::spi::Bucket &) override { - (void) source; - (void) target1; - (void) target2; - token.setResult(ResultUP(new Result(_splitResult)), true); - token.ack(); - } - - virtual void - handleJoin(FeedToken token, - const storage::spi::Bucket &source1, - const storage::spi::Bucket &source2, - const storage::spi::Bucket &target) override + token->setResult(ResultUP(new Result(_splitResult)), true); + } + + void handleJoin(FeedToken token, const storage::spi::Bucket &, const storage::spi::Bucket &, + const storage::spi::Bucket &) override { - (void) source1; - (void) source2; - (void) target; - token.setResult(ResultUP(new Result(_joinResult)), true); - token.ack(); + token->setResult(ResultUP(new Result(_joinResult)), true); } - virtual RetrieversSP getDocumentRetrievers(storage::spi::ReadConsistency) override { + RetrieversSP getDocumentRetrievers(storage::spi::ReadConsistency) override { RetrieversSP ret(new std::vector<IDocumentRetriever::SP>); - ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever( - 0, Timestamp(), lastDocId))); - ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever( - document, existingTimestamp, lastDocId))); + ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(0, Timestamp(), lastDocId))); + ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(document, existingTimestamp, lastDocId))); return ret; } - virtual BucketGuard::UP lockBucket(const storage::spi::Bucket &b) override { + BucketGuard::UP lockBucket(const storage::spi::Bucket &b) override { return BucketGuard::UP(new BucketGuard(b.getBucketId(), *this)); } - virtual void - handleListActiveBuckets(IBucketIdListResultHandler &resultHandler) override - { + void handleListActiveBuckets(IBucketIdListResultHandler &resultHandler) override { BucketIdListResult::List list; resultHandler.handle(BucketIdListResult(list)); } - virtual void - handlePopulateActiveBuckets(document::BucketId::List &buckets, - IGenericResultHandler &resultHandler) override - { + void handlePopulateActiveBuckets(document::BucketId::List &buckets, IGenericResultHandler &resultHandler) override { (void) buckets; resultHandler.handle(Result()); } - virtual void freezeBucket(BucketId bucket) override { + void freezeBucket(BucketId bucket) override { frozen.insert(bucket.getId()); was_frozen.insert(bucket.getId()); } - virtual void thawBucket(BucketId bucket) override { + void thawBucket(BucketId bucket) override { std::multiset<uint64_t>::iterator it = frozen.find(bucket.getId()); ASSERT_TRUE(it != frozen.end()); frozen.erase(it); @@ -425,11 +391,7 @@ HandlerSet::prepareGetModifiedBuckets() class SimplePersistenceEngineOwner : public IPersistenceEngineOwner { - virtual void - setClusterState(const storage::spi::ClusterState &calc) override - { - (void) calc; - } + void setClusterState(const storage::spi::ClusterState &calc) override { (void) calc; } }; struct SimpleResourceWriteFilter : public IResourceWriteFilter diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp index 15c2fbe5a84..a6f3496e1ed 100644 --- a/searchcore/src/tests/proton/server/feedstates_test.cpp +++ b/searchcore/src/tests/proton/server/feedstates_test.cpp @@ -40,10 +40,8 @@ struct MyFeedView : public test::DummyFeedView { MyFeedView(); ~MyFeedView(); - virtual const DocumentTypeRepo::SP &getDocumentTypeRepo() const override - { return repo_sp; } - virtual void handleRemove(FeedToken *, const RemoveOperation &) override - { ++remove_handled; } + const DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return repo_sp; } + void handleRemove(FeedToken , const RemoveOperation &) override { ++remove_handled; } }; MyFeedView::MyFeedView() : repo_sp(repo.getTypeRepoSp()), remove_handled(0) {} diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index ddb2711fac8..e31e578e1e0 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -256,7 +256,7 @@ summary.log.maxbucketspread double default=2.5 summary.log.minfilesizefactor double default=0.2 ## Number of threads used for compressing incomming documents/compacting. -## Deprecated. Use background.threads instead. +## Deprecated. Use feeding.concurrency instead. ## TODO Remove summary.log.numthreads int default=8 restart @@ -378,11 +378,6 @@ visit.ignoremaxbytes bool default=true ## When set to 0 (default) we use 1 separate thread per document database. initialize.threads int default = 0 -## Number of worker threads doing background compaction/compression tasks. -## They all live i a shared thread pool. -## When set to 0 (default), it will have enough threads to saturate half of the cores. -background.threads int default=0 - ## Portion of enumstore address space that can be used before put and update ## portion of feed is blocked. writefilter.attribute.enumstorelimit double default = 0.9 diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp index 008fafa332b..b6223dd41ab 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp @@ -2,50 +2,35 @@ #include "feedtoken.h" -namespace proton { +namespace proton::feedtoken { -FeedToken::FeedToken(ITransport &transport) : - _state(new State(transport, 1)) -{ -} - -FeedToken::State::State(ITransport & transport, uint32_t numAcksRequired) : +State::State(ITransport & transport) : _transport(transport), _result(new storage::spi::Result()), _documentWasFound(false), - _unAckedCount(numAcksRequired) + _alreadySent(false) { - assert(_unAckedCount > 0); } -FeedToken::State::~State() +State::~State() { - assert(_unAckedCount == 0); + ack(); } void -FeedToken::State::ack() +State::ack() { - uint32_t prev(_unAckedCount--); - if (prev == 1) { + bool alreadySent = _alreadySent.exchange(true); + if ( !alreadySent ) { _transport.send(std::move(_result), _documentWasFound); } - assert(prev >= 1); -} - -void -FeedToken::State::incNeededAcks() -{ - uint32_t prev(_unAckedCount++); - assert(prev >= 1); - (void) prev; } void -FeedToken::State::fail() +State::fail() { - uint32_t prev = _unAckedCount.exchange(0); - if (prev > 0) { + bool alreadySent = _alreadySent.exchange(true); + if ( !alreadySent ) { _transport.send(std::move(_result), _documentWasFound); } } diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index 856c8a22652..bc7e249ca3b 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -2,7 +2,7 @@ #pragma once #include <vespa/persistence/spi/persistenceprovider.h> -#include <vespa/vespalib/util/exception.h> +#include <vespa/searchlib/common/idestructorcallback.h> #include <vespa/vespalib/util/sync.h> #include <atomic> @@ -15,23 +15,19 @@ typedef std::unique_ptr<storage::spi::Result> ResultUP; * for an IFeedHandler to perform an async reply to an operation. A unique * instance of this class is passed to every invokation of the IFeedHandler. */ -class FeedToken { -public: +namespace feedtoken { class ITransport { public: virtual ~ITransport() { } virtual void send(ResultUP result, bool documentWasFound) = 0; }; -private: - class State { + class State : public search::IDestructorCallback { public: State(const State &) = delete; State & operator = (const State &) = delete; - State(ITransport & transport, uint32_t numAcksRequired); - ~State(); - void incNeededAcks(); - void ack(); + State(ITransport & transport); + ~State() override; void fail(); void setResult(ResultUP result, bool documentWasFound) { _documentWasFound = documentWasFound; @@ -39,68 +35,20 @@ private: } const storage::spi::Result &getResult() { return *_result; } private: + void ack(); ITransport &_transport; ResultUP _result; bool _documentWasFound; - std::atomic<uint32_t> _unAckedCount; + std::atomic<bool> _alreadySent; }; - std::shared_ptr<State> _state; - -public: - typedef std::unique_ptr<FeedToken> UP; - typedef std::shared_ptr<FeedToken> SP; - - /** - * Constructs a unique FeedToken. This is the constructor used by the - * FeedEngine when processing input. If the given message is empty, or it - * does not belong to the document protocol, this method throws a - * vespalib::IllegalArgumentException. - * - * @param transport The transport to pass the reply to. - */ - FeedToken(ITransport &transport); - - FeedToken(FeedToken &&) = default; - FeedToken & operator =(FeedToken &&) = default; - FeedToken(const FeedToken &) = default; - FeedToken & operator =(const FeedToken &) = default; - ~FeedToken() = default; - /** - * Passes a receipt back to the originating FeedEngine, declaring that this - * operation succeeded. If an error occured while processing the operation, - * use fail() instead. Invoking this and/or fail() more than once is void. - */ - void ack() const { _state->ack(); } - - void incNeededAcks() const { - _state->incNeededAcks(); + inline std::shared_ptr<State> + make(ITransport & latch) { + return std::make_shared<State>(latch); } +} - /** - * Passes a receipt back to the originating FeedEngine, declaring that this - * operation failed for some reason. Invoking this and/or fail() more than - * once is void. - * - * @param errNum A numerical representation of the error. - * @param errMsg A readable string detailing the error. - */ - void fail() const { _state->fail(); } - - /** - * Gives you access to the underlying result. - * - * @return The result - */ - const storage::spi::Result &getResult() const { return _state->getResult(); } - - /** - * Sets the underlying result. - */ - void setResult(ResultUP result, bool documentWasFound) { - _state->setResult(std::move(result), documentWasFound); - } -}; +using FeedToken = std::shared_ptr<feedtoken::State>; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index feebbf4cf2a..6cdec1ec0f9 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -3,10 +3,6 @@ #include "persistenceengine.h" #include "ipersistenceengineowner.h" #include "transport_latch.h" -#include <vespa/documentapi/messagebus/documentprotocol.h> -#include <vespa/documentapi/messagebus/messages/feedreply.h> -#include <vespa/documentapi/messagebus/messages/removedocumentreply.h> -#include <vespa/documentapi/messagebus/messages/updatedocumentreply.h> #include <vespa/vespalib/stllike/hash_set.h> #include <vespa/fastos/thread.h> @@ -15,9 +11,6 @@ LOG_SETUP(".proton.persistenceengine.persistenceengine"); using document::Document; using document::DocumentId; -using documentapi::DocumentReply; -using documentapi::RemoveDocumentReply; -using mbus::Reply; using storage::spi::BucketChecksum; using storage::spi::BucketIdListResult; using storage::spi::BucketInfo; @@ -49,7 +42,7 @@ ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt) : _lock(), _latch(waitCnt) {} -ResultHandlerBase::~ResultHandlerBase() { } +ResultHandlerBase::~ResultHandlerBase() = default; class GenericResultHandler : public ResultHandlerBase, public IGenericResultHandler { private: @@ -74,7 +67,7 @@ public: const Result &getResult() const { return _result; } }; -GenericResultHandler::~GenericResultHandler() {} +GenericResultHandler::~GenericResultHandler() = default; class BucketIdListResultHandler : public IBucketIdListResultHandler { @@ -85,8 +78,8 @@ public: BucketIdListResultHandler() : _bucketSet() { } - ~BucketIdListResultHandler(); - virtual void handle(const BucketIdListResult &result) override { + ~BucketIdListResultHandler() override; + void handle(const BucketIdListResult &result) override { const BucketIdListResult::List &buckets = result.getList(); for (size_t i = 0; i < buckets.size(); ++i) { _bucketSet.insert(buckets[i]); @@ -103,7 +96,7 @@ public: }; -BucketIdListResultHandler::~BucketIdListResultHandler() {} +BucketIdListResultHandler::~BucketIdListResultHandler() = default; class SynchronizedBucketIdListResultHandler : public ResultHandlerBase, public BucketIdListResultHandler @@ -113,8 +106,8 @@ public: : ResultHandlerBase(waitCnt), BucketIdListResultHandler() { } - ~SynchronizedBucketIdListResultHandler(); - virtual void handle(const BucketIdListResult &result) override { + ~SynchronizedBucketIdListResultHandler() override; + void handle(const BucketIdListResult &result) override { { vespalib::LockGuard guard(_lock); BucketIdListResultHandler::handle(result); @@ -123,7 +116,7 @@ public: } }; -SynchronizedBucketIdListResultHandler::~SynchronizedBucketIdListResultHandler() {} +SynchronizedBucketIdListResultHandler::~SynchronizedBucketIdListResultHandler() = default; class BucketInfoResultHandler : public IBucketInfoResultHandler { private: @@ -135,8 +128,8 @@ public: _first(true) { } - ~BucketInfoResultHandler(); - virtual void handle(const BucketInfoResult &result) override { + ~BucketInfoResultHandler() override; + void handle(const BucketInfoResult &result) override { if (_first) { _result = result; _first = false; @@ -161,12 +154,10 @@ public: const BucketInfoResult &getResult() const { return _result; } }; -BucketInfoResultHandler::~BucketInfoResultHandler() {} +BucketInfoResultHandler::~BucketInfoResultHandler() = default; } -#define NOT_YET throw vespalib::IllegalArgumentException("Not implemented yet") - PersistenceEngine::HandlerSnapshot::UP PersistenceEngine::getHandlerSnapshot() const { @@ -343,26 +334,19 @@ PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::S } std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); DocTypeName docType(doc->getType()); - LOG(spam, - "put(%s, %" PRIu64 ", (\"%s\", \"%s\"))", - b.toString().c_str(), - static_cast<uint64_t>(t.getValue()), - docType.toString().c_str(), - doc->getId().toString().c_str()); + LOG(spam, "put(%s, %" PRIu64 ", (\"%s\", \"%s\"))", b.toString().c_str(), static_cast<uint64_t>(t.getValue()), + docType.toString().c_str(), doc->getId().toString().c_str()); if (!doc->getId().hasDocType()) { - return Result(Result::PERMANENT_ERROR, make_string( - "Old id scheme not supported in elastic mode (%s)", - doc->getId().toString().c_str())); + return Result(Result::PERMANENT_ERROR, + make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str())); } IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType); - if (handler.get() == NULL) { + if (!handler) { return Result(Result::PERMANENT_ERROR, - make_string("No handler for document type '%s'", - docType.toString().c_str())); + make_string("No handler for document type '%s'", docType.toString().c_str())); } TransportLatch latch(1); - FeedToken token(latch); - handler->handlePut(token, b, t, doc); + handler->handlePut(feedtoken::make(latch), b, t, doc); latch.await(); return latch.getResult(); } @@ -371,20 +355,16 @@ PersistenceEngine::RemoveResult PersistenceEngine::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); - LOG(spam, - "remove(%s, %" PRIu64 ", \"%s\")", - b.toString().c_str(), - static_cast<uint64_t>(t.getValue()), - did.toString().c_str()); + LOG(spam, "remove(%s, %" PRIu64 ", \"%s\")", b.toString().c_str(), + static_cast<uint64_t>(t.getValue()), did.toString().c_str()); HandlerSnapshot::UP snap = getHandlerSnapshot(b.getBucketSpace(), did); - if (!snap.get()) { + if (!snap) { return RemoveResult(false); } TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch); - handler->handleRemove(token, b, t, did); + handler->handleRemove(feedtoken::make(latch), b, t, did); } latch.await(); return latch.getRemoveResult(); @@ -404,24 +384,20 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP } std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); DocTypeName docType(upd->getType()); - LOG(spam, - "update(%s, %" PRIu64 ", (\"%s\", \"%s\"), createIfNonExistent='%s')", - b.toString().c_str(), - static_cast<uint64_t>(t.getValue()), - docType.toString().c_str(), - upd->getId().toString().c_str(), - (upd->getCreateIfNonExistent() ? "true" : "false")); + LOG(spam, "update(%s, %" PRIu64 ", (\"%s\", \"%s\"), createIfNonExistent='%s')", + b.toString().c_str(), static_cast<uint64_t>(t.getValue()), docType.toString().c_str(), + upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false")); IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType); - TransportLatch latch(1); - if (handler.get() != NULL) { - FeedToken token(latch); + + if (handler) { + TransportLatch latch(1); LOG(debug, "update = %s", upd->toXml().c_str()); - handler->handleUpdate(token, b, t, upd); + handler->handleUpdate(feedtoken::make(latch), b, t, upd); latch.await(); + return latch.getUpdateResult(); } else { return UpdateResult(Result::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str())); } - return latch.getUpdateResult(); } @@ -486,7 +462,7 @@ PersistenceEngine::iterate(IteratorId id, uint64_t maxByteSize, Context&) const { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); LockGuard guard(_iterators_lock); - Iterators::const_iterator it = _iterators.find(id); + auto it = _iterators.find(id); if (it == _iterators.end()) { return IterateResult(Result::PERMANENT_ERROR, make_string("Unknown iterator with id %" PRIu64, id.getValue())); } @@ -517,7 +493,7 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&) { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); LockGuard guard(_iterators_lock); - Iterators::iterator it = _iterators.find(id); + auto it = _iterators.find(id); if (it == _iterators.end()) { return Result(); } @@ -539,8 +515,7 @@ PersistenceEngine::createBucket(const Bucket &b, Context &) TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch); - handler->handleCreateBucket(token, b); + handler->handleCreateBucket(feedtoken::make(latch), b); } latch.await(); return latch.getResult(); @@ -556,8 +531,7 @@ PersistenceEngine::deleteBucket(const Bucket& b, Context&) TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch); - handler->handleDeleteBucket(token, b); + handler->handleDeleteBucket(feedtoken::make(latch), b); } latch.await(); return latch.getResult(); @@ -599,8 +573,7 @@ PersistenceEngine::split(const Bucket& source, const Bucket& target1, const Buck TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch); - handler->handleSplit(token, source, target1, target2); + handler->handleSplit(feedtoken::make(latch), source, target1, target2); } latch.await(); return latch.getResult(); @@ -618,8 +591,7 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch); - handler->handleJoin(token, source1, source2, target); + handler->handleJoin(feedtoken::make(latch), source1, source2, target); } latch.await(); return latch.getResult(); @@ -676,7 +648,7 @@ void PersistenceEngine::propagateSavedClusterState(IPersistenceHandler &handler) { ClusterState::SP clusterState(savedClusterState()); - if (clusterState.get() == NULL) + if (!clusterState) return; // Propagate saved cluster state. // TODO: Fix race with new cluster state setting. @@ -699,13 +671,13 @@ PersistenceEngine::grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenc class ActiveBucketIdListResultHandler : public IBucketIdListResultHandler { private: - typedef std::map<document::BucketId, size_t> BucketIdMap; - typedef std::pair<BucketIdMap::iterator, bool> IR; + using BucketIdMap = std::map<document::BucketId, size_t>; + using IR = std::pair<BucketIdMap::iterator, bool>; BucketIdMap _bucketMap; public: ActiveBucketIdListResultHandler() : _bucketMap() { } - virtual void handle(const BucketIdListResult &result) override { + void handle(const BucketIdListResult &result) override { const BucketIdListResult::List &buckets = result.getList(); for (size_t i = 0; i < buckets.size(); ++i) { IR ir(_bucketMap.insert(std::make_pair(buckets[i], 1u))); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp index e0d512ae6e0..12ba5e7ab29 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp @@ -3,6 +3,7 @@ #include "transport_latch.h" #include <vespa/vespalib/util/stringfmt.h> +using vespalib::make_string; using storage::spi::Result; namespace proton { @@ -13,7 +14,7 @@ TransportLatch::TransportLatch(uint32_t cnt) _result() {} -TransportLatch::~TransportLatch() {} +TransportLatch::~TransportLatch() = default; void TransportLatch::send(ResultUP result, bool documentWasFound) @@ -35,7 +36,7 @@ Result TransportLatch::mergeErrorResults(const Result &lhs, const Result &rhs) { Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? lhs : rhs).getErrorCode(); - return Result(error, vespalib::make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str())); + return Result(error, make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str())); } } // proton diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h index 12f92722dfa..10dae553a80 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h @@ -11,8 +11,11 @@ namespace proton { * Implementation of FeedToken::ITransport for handling the async reply for an operation. * Uses an internal count down latch to keep track the number of outstanding replies. */ -class TransportLatch : public FeedToken::ITransport { +class TransportLatch : public feedtoken::ITransport { private: + using Result = storage::spi::Result; + using UpdateResult = storage::spi::UpdateResult; + using RemoveResult = storage::spi::RemoveResult; vespalib::CountDownLatch _latch; vespalib::Lock _lock; ResultUP _result; @@ -24,17 +27,16 @@ public: void await() { _latch.await(); } - const storage::spi::UpdateResult &getUpdateResult() const { - return dynamic_cast<const storage::spi::UpdateResult &>(*_result); + const UpdateResult &getUpdateResult() const { + return dynamic_cast<const UpdateResult &>(*_result); } - const storage::spi::Result &getResult() const { + const Result &getResult() const { return *_result; } - const storage::spi::RemoveResult &getRemoveResult() const { - return dynamic_cast<const storage::spi::RemoveResult &>(*_result); + const RemoveResult &getRemoveResult() const { + return dynamic_cast<const RemoveResult &>(*_result); } - static storage::spi::Result mergeErrorResults(const storage::spi::Result &lhs, - const storage::spi::Result &rhs); + static Result mergeErrorResults(const Result &lhs, const Result &rhs); }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp index 30ee6e1ba75..106ce846d99 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp @@ -117,19 +117,16 @@ CombiningFeedView::preparePut(PutOperation &putOp) } void -CombiningFeedView::handlePut(FeedToken *token, - const PutOperation &putOp) +CombiningFeedView::handlePut(FeedToken token, const PutOperation &putOp) { assert(putOp.getValidDbdId()); uint32_t subDbId = putOp.getSubDbId(); uint32_t prevSubDbId = putOp.getPrevSubDbId(); if (putOp.getValidPrevDbdId() && prevSubDbId != subDbId) { - if (token != NULL) - token->incNeededAcks(); _views[subDbId]->handlePut(token, putOp); - _views[prevSubDbId]->handlePut(token, putOp); + _views[prevSubDbId]->handlePut(std::move(token), putOp); } else { - _views[subDbId]->handlePut(token, putOp); + _views[subDbId]->handlePut(std::move(token), putOp); } } @@ -143,14 +140,13 @@ CombiningFeedView::prepareUpdate(UpdateOperation &updOp) } void -CombiningFeedView::handleUpdate(FeedToken *token, - const UpdateOperation &updOp) +CombiningFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp) { assert(updOp.getValidDbdId()); assert(updOp.getValidPrevDbdId()); assert(!updOp.changedDbdId()); uint32_t subDbId(updOp.getSubDbId()); - _views[subDbId]->handleUpdate(token, updOp); + _views[subDbId]->handleUpdate(std::move(token), updOp); } void @@ -165,19 +161,16 @@ CombiningFeedView::prepareRemove(RemoveOperation &rmOp) } void -CombiningFeedView::handleRemove(FeedToken *token, - const RemoveOperation &rmOp) +CombiningFeedView::handleRemove(FeedToken token, const RemoveOperation &rmOp) { if (rmOp.getValidDbdId()) { uint32_t subDbId = rmOp.getSubDbId(); uint32_t prevSubDbId = rmOp.getPrevSubDbId(); if (rmOp.getValidPrevDbdId() && prevSubDbId != subDbId) { - if (token != NULL) - token->incNeededAcks(); _views[subDbId]->handleRemove(token, rmOp); - _views[prevSubDbId]->handleRemove(token, rmOp); + _views[prevSubDbId]->handleRemove(std::move(token), rmOp); } else { - _views[subDbId]->handleRemove(token, rmOp); + _views[subDbId]->handleRemove(std::move(token), rmOp); } } else { assert(rmOp.getValidPrevDbdId()); diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h index ea4ac64176a..284004dfa81 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h @@ -16,8 +16,7 @@ #include "replaypacketdispatcher.h" #include "ibucketstatecalculator.h" -namespace proton -{ +namespace proton { class CombiningFeedView : public IFeedView @@ -63,7 +62,7 @@ public: document::BucketSpace bucketSpace, const IBucketStateCalculator::SP &calc); - virtual ~CombiningFeedView(); + ~CombiningFeedView() override; const document::DocumentTypeRepo::SP & getDocumentTypeRepo() const override; @@ -72,11 +71,11 @@ public: */ void preparePut(PutOperation &putOp) override; - void handlePut(FeedToken *token, const PutOperation &putOp) override; + void handlePut(FeedToken token, const PutOperation &putOp) override; void prepareUpdate(UpdateOperation &updOp) override; - void handleUpdate(FeedToken *token, const UpdateOperation &updOp) override; + void handleUpdate(FeedToken token, const UpdateOperation &updOp) override; void prepareRemove(RemoveOperation &rmOp) override; - void handleRemove(FeedToken *token, const RemoveOperation &rmOp) override; + void handleRemove(FeedToken token, const RemoveOperation &rmOp) override; void prepareDeleteBucket(DeleteBucketOperation &delOp) override; void handleDeleteBucket(const DeleteBucketOperation &delOp) override; void prepareMove(MoveOperation &putOp) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 9300af5c3f0..b01ba43cb49 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -8,11 +8,6 @@ #include "tlcproxy.h" #include "configstore.h" #include <vespa/document/datatype/documenttype.h> -#include <vespa/documentapi/messagebus/documentprotocol.h> -#include <vespa/documentapi/messagebus/messages/documentreply.h> -#include <vespa/documentapi/messagebus/messages/feedreply.h> -#include <vespa/documentapi/messagebus/messages/removedocumentreply.h> -#include <vespa/documentapi/messagebus/messages/updatedocumentreply.h> #include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/persistenceengine/transport_latch.h> @@ -27,11 +22,6 @@ LOG_SETUP(".proton.server.feedhandler"); using document::BucketId; using document::Document; using document::DocumentTypeRepo; -using documentapi::DocumentProtocol; -using documentapi::DocumentReply; -using documentapi::FeedReply; -using documentapi::RemoveDocumentReply; -using documentapi::UpdateDocumentReply; using storage::spi::PartitionId; using storage::spi::RemoveResult; using storage::spi::Result; @@ -90,31 +80,30 @@ FeedHandler::doHandleOperation(FeedToken token, FeedOperation::UP op) { assert(_writeService.master().isCurrentThread()); LockGuard guard(_feedLock); - _feedState->handleOperation(token, std::move(op)); + _feedState->handleOperation(std::move(token), std::move(op)); } -void FeedHandler::performPut(FeedToken::UP token, PutOperation &op) { +void FeedHandler::performPut(FeedToken token, PutOperation &op) { op.assertValid(); _activeFeedView->preparePut(op); if (ignoreOperation(op)) { LOG(debug, "performPut(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); if (token) { - token->setResult(ResultUP(new Result), false); - token->ack(); + token->setResult(std::make_unique<Result>(), false); } return; } storeOperation(op); if (token) { - token->setResult(ResultUP(new Result), false); + token->setResult(std::make_unique<Result>(), false); } - _activeFeedView->handlePut(token.get(), op); + _activeFeedView->handlePut(std::move(token), op); } void -FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op) +FeedHandler::performUpdate(FeedToken token, UpdateOperation &op) { _activeFeedView->prepareUpdate(op); if (op.getPrevDbDocumentId().valid() && !op.getPrevMarkedAsRemoved()) { @@ -123,26 +112,25 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op) createNonExistingDocument(std::move(token), op); } else { if (token) { - token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false); - token->ack(); + token->setResult(std::make_unique<UpdateResult>(Timestamp(0)), false); } } } void -FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op) +FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op) { storeOperation(op); if (token) { token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true); } - _activeFeedView->handleUpdate(token.get(), op); + _activeFeedView->handleUpdate(std::move(token), op); } void -FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperation &op) +FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &op) { Document::SP doc(new Document(op.getUpdate()->getType(), op.getUpdate()->getId())); doc->setRepo(*_activeFeedView->getDocumentTypeRepo()); @@ -154,23 +142,18 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true); } TransportLatch latch(1); - FeedToken putToken(latch); - _activeFeedView->handlePut(&putToken, putOp); + _activeFeedView->handlePut(feedtoken::make(latch), putOp); latch.await(); - if (token) { - token->ack(); - } } -void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { +void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) { _activeFeedView->prepareRemove(op); if (ignoreOperation(op)) { LOG(debug, "performRemove(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); - token->ack(); } return; } @@ -182,70 +165,60 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { bool documentWasFound = !op.getPrevMarkedAsRemoved(); token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound); } - _activeFeedView->handleRemove(token.get(), op); + _activeFeedView->handleRemove(std::move(token), op); } else if (op.hasDocType()) { assert(op.getDocType() == _docTypeName.getName()); storeOperation(op); if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); } - _activeFeedView->handleRemove(token.get(), op); + _activeFeedView->handleRemove(std::move(token), op); } else { if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); - token->ack(); } } } void -FeedHandler::performGarbageCollect(FeedToken::UP token) +FeedHandler::performGarbageCollect(FeedToken token) { - if (token) { - token->ack(); - } + (void) token; } void -FeedHandler::performCreateBucket(FeedToken::UP token, CreateBucketOperation &op) +FeedHandler::performCreateBucket(FeedToken token, CreateBucketOperation &op) { + (void) token; storeOperation(op); _bucketDBHandler->handleCreateBucket(op.getBucketId()); - if (token) { - token->ack(); - } } -void FeedHandler::performDeleteBucket(FeedToken::UP token, DeleteBucketOperation &op) { +void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op) { + (void) token; _activeFeedView->prepareDeleteBucket(op); storeOperation(op); // Delete documents in bucket _activeFeedView->handleDeleteBucket(op); // Delete bucket itself, should no longer have documents. _bucketDBHandler->handleDeleteBucket(op.getBucketId()); - if (token) { - token->ack(); - } + } -void FeedHandler::performSplit(FeedToken::UP token, SplitBucketOperation &op) { +void FeedHandler::performSplit(FeedToken token, SplitBucketOperation &op) { + (void) token; storeOperation(op); _bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2()); - if (token) { - token->ack(); - } } -void FeedHandler::performJoin(FeedToken::UP token, JoinBucketsOperation &op) { +void FeedHandler::performJoin(FeedToken token, JoinBucketsOperation &op) { + (void) token; storeOperation(op); _bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget()); - if (token) { - token->ack(); - } } @@ -328,7 +301,7 @@ void FeedHandler::changeFeedState(FeedState::SP newState) { LockGuard guard(_feedLock); - changeFeedState(newState, guard); + changeFeedState(std::move(newState), guard); } @@ -466,8 +439,8 @@ isRejectableFeedOperation(FeedOperation::Type type) } template <typename ResultType> -void feedOperationRejected(FeedToken *token, const vespalib::string &opType, const vespalib::string &docId, - DocTypeName docTypeName, const vespalib::string &rejectMessage) +void feedOperationRejected(FeedToken & token, const vespalib::string &opType, const vespalib::string &docId, + const DocTypeName & docTypeName, const vespalib::string &rejectMessage) { if (token) { auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'", @@ -478,8 +451,8 @@ void feedOperationRejected(FeedToken *token, const vespalib::string &opType, con } void -notifyFeedOperationRejected(FeedToken *token, const FeedOperation &op, - DocTypeName docTypeName, const vespalib::string &rejectMessage) +notifyFeedOperationRejected(FeedToken & token, const FeedOperation &op, + const DocTypeName & docTypeName, const vespalib::string &rejectMessage) { if ((op.getType() == FeedOperation::UPDATE_42) || (op.getType() == FeedOperation::UPDATE)) { vespalib::string docId = (static_cast<const UpdateOperation &>(op)).getUpdate()->getId().toString(); @@ -495,7 +468,7 @@ notifyFeedOperationRejected(FeedToken *token, const FeedOperation &op, } bool -FeedHandler::considerWriteOperationForRejection(FeedToken *token, const FeedOperation &op) +FeedHandler::considerWriteOperationForRejection(FeedToken & token, const FeedOperation &op) { if (!_writeFilter.acceptWriteOperation() && isRejectableFeedOperation(op.getType())) { IResourceWriteFilter::State state = _writeFilter.getAcceptState(); @@ -508,9 +481,9 @@ FeedHandler::considerWriteOperationForRejection(FeedToken *token, const FeedOper } void -FeedHandler::performOperation(FeedToken::UP token, FeedOperation::UP op) +FeedHandler::performOperation(FeedToken token, FeedOperation::UP op) { - if (considerWriteOperationForRejection(token.get(), *op)) { + if (considerWriteOperationForRejection(token, *op)) { return; } switch(op->getType()) { diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index dc955cfeb79..d717346883a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -11,6 +11,7 @@ #include "transactionlogmanager.h" #include <persistence/spi/types.h> #include <vespa/searchcore/proton/common/doctypename.h> +#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchlib/transactionlog/translogclient.h> namespace searchcorespi { namespace index { class IThreadingService; } } @@ -22,7 +23,6 @@ class DDBState; class DeleteBucketOperation; class FeedConfigStore; class FeedState; -class FeedToken; class IDocumentDBOwner; class IFeedHandlerOwner; class IFeedView; @@ -54,7 +54,6 @@ private: typedef storage::spi::Timestamp Timestamp; typedef document::BucketId BucketId; using FeedStateSP = std::shared_ptr<FeedState>; - using FeedTokenUP = std::unique_ptr<FeedToken>; using FeedOperationUP = std::unique_ptr<FeedOperation>; class TlsMgrWriter : public TlsWriter { @@ -101,24 +100,24 @@ private: */ void doHandleOperation(FeedToken token, FeedOperationUP op); - bool considerWriteOperationForRejection(FeedToken *token, const FeedOperation &op); + bool considerWriteOperationForRejection(FeedToken & token, const FeedOperation &op); /** * Delayed execution of feed operations against feed view, in * master write thread. */ - void performPut(FeedTokenUP token, PutOperation &op); - - void performUpdate(FeedTokenUP token, UpdateOperation &op); - void performInternalUpdate(FeedTokenUP token, UpdateOperation &op); - void createNonExistingDocument(FeedTokenUP, const UpdateOperation &op); - - void performRemove(FeedTokenUP token, RemoveOperation &op); - void performGarbageCollect(FeedTokenUP token); - void performCreateBucket(FeedTokenUP token, CreateBucketOperation &op); - void performDeleteBucket(FeedTokenUP token, DeleteBucketOperation &op); - void performSplit(FeedTokenUP token, SplitBucketOperation &op); - void performJoin(FeedTokenUP token, JoinBucketsOperation &op); + void performPut(FeedToken token, PutOperation &op); + + void performUpdate(FeedToken token, UpdateOperation &op); + void performInternalUpdate(FeedToken token, UpdateOperation &op); + void createNonExistingDocument(FeedToken, const UpdateOperation &op); + + void performRemove(FeedToken token, RemoveOperation &op); + void performGarbageCollect(FeedToken token); + void performCreateBucket(FeedToken token, CreateBucketOperation &op); + void performDeleteBucket(FeedToken token, DeleteBucketOperation &op); + void performSplit(FeedToken token, SplitBucketOperation &op); + void performJoin(FeedToken token, JoinBucketsOperation &op); void performSync(); void performEof(); @@ -223,7 +222,7 @@ public: vespalib::string getDocTypeName() const { return _docTypeName.getName(); } void tlsPrune(SerialNum oldest_to_keep); - void performOperation(FeedTokenUP token, FeedOperationUP op); + void performOperation(FeedToken token, FeedOperationUP op); void handleOperation(FeedToken token, FeedOperationUP op); void handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCallback> moveDoneCtx) override; @@ -240,4 +239,3 @@ public: }; } // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp index 6219cadd24f..91b310e3925 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp @@ -82,13 +82,13 @@ public: } virtual void replay(const PutOperation &op) override { - _feed_view_ptr->handlePut(NULL, op); + _feed_view_ptr->handlePut(FeedToken(), op); } virtual void replay(const RemoveOperation &op) override { - _feed_view_ptr->handleRemove(NULL, op); + _feed_view_ptr->handleRemove(FeedToken(), op); } virtual void replay(const UpdateOperation &op) override { - _feed_view_ptr->handleUpdate(NULL, op); + _feed_view_ptr->handleUpdate(FeedToken(), op); } virtual void replay(const NoopOperation &) override {} // ignored virtual void replay(const NewConfigOperation &op) override { @@ -100,16 +100,12 @@ public: _feed_view_ptr->handleDeleteBucket(op); } virtual void replay(const SplitBucketOperation &op) override { - _bucketDBHandler.handleSplit(op.getSerialNum(), - op.getSource(), - op.getTarget1(), - op.getTarget2()); + _bucketDBHandler.handleSplit(op.getSerialNum(), op.getSource(), + op.getTarget1(), op.getTarget2()); } virtual void replay(const JoinBucketsOperation &op) override { - _bucketDBHandler.handleJoin(op.getSerialNum(), - op.getSource1(), - op.getSource2(), - op.getTarget()); + _bucketDBHandler.handleJoin(op.getSerialNum(), op.getSource1(), + op.getSource2(), op.getTarget()); } virtual void replay(const PruneRemovedDocumentsOperation &op) override { _feed_view_ptr->handlePruneRemovedDocuments(op); diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h index 61e80d1c7cc..963a78d0d6b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h @@ -79,13 +79,11 @@ public: _handler(handler) { } - virtual void handleOperation(FeedToken token, FeedOperation::UP op) override { - _handler.performOperation(FeedToken::UP(new FeedToken(token)), std::move(op)); + void handleOperation(FeedToken token, FeedOperation::UP op) override { + _handler.performOperation(std::move(token), std::move(op)); } - virtual void - receive(const PacketWrapper::SP &wrap, vespalib::Executor &) override - { + void receive(const PacketWrapper::SP &wrap, vespalib::Executor &) override { throwExceptionInReceive(_handler.getDocTypeName().c_str(), wrap->packet.range().from(), wrap->packet.range().to(), diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h index c8222902c5d..3e1635d0f33 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h @@ -2,19 +2,17 @@ #pragma once +#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchlib/common/serialnum.h> -#include <memory> namespace document { class DocumentTypeRepo; } namespace search { class IDestructorCallback; } -namespace proton -{ +namespace proton { class CompactLidSpaceOperation; class DeleteBucketOperation; -class FeedToken; class ISimpleDocumentMetaStore; class MoveOperation; class PruneRemovedDocumentsOperation; @@ -49,11 +47,11 @@ public: */ virtual void preparePut(PutOperation &putOp) = 0; - virtual void handlePut(FeedToken *token, const PutOperation &putOp) = 0; + virtual void handlePut(FeedToken token, const PutOperation &putOp) = 0; virtual void prepareUpdate(UpdateOperation &updOp) = 0; - virtual void handleUpdate(FeedToken *token, const UpdateOperation &updOp) = 0; + virtual void handleUpdate(FeedToken token, const UpdateOperation &updOp) = 0; virtual void prepareRemove(RemoveOperation &rmOp) = 0; - virtual void handleRemove(FeedToken *token, const RemoveOperation &rmOp) = 0; + virtual void handleRemove(FeedToken token, const RemoveOperation &rmOp) = 0; virtual void prepareDeleteBucket(DeleteBucketOperation &delOp) = 0; virtual void handleDeleteBucket(const DeleteBucketOperation &delOp) = 0; virtual void prepareMove(MoveOperation &putOp) = 0; diff --git a/searchcore/src/vespa/searchcore/proton/server/iheartbeathandler.h b/searchcore/src/vespa/searchcore/proton/server/iheartbeathandler.h index 7919d2f383b..3e8267f5aa5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/iheartbeathandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/iheartbeathandler.h @@ -2,22 +2,14 @@ #pragma once -namespace proton -{ - -class FeedToken; +namespace proton { class IHeartBeatHandler { public: - virtual void - heartBeat() = 0; + virtual void heartBeat() = 0; - virtual - ~IHeartBeatHandler() - { - } + virtual ~IHeartBeatHandler() = default; }; } // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp index 9c0115f0084..31e4c87b352 100644 --- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp @@ -5,7 +5,7 @@ namespace proton { -OperationDoneContext::OperationDoneContext(std::unique_ptr<FeedToken> token) +OperationDoneContext::OperationDoneContext(FeedToken token) : _token(std::move(token)) { } @@ -18,10 +18,7 @@ OperationDoneContext::~OperationDoneContext() void OperationDoneContext::ack() { - if (_token) { - std::unique_ptr<FeedToken> token(std::move(_token)); - token->ack(); - } + _token.reset(); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h index b801987844b..4c310abf871 100644 --- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h @@ -3,12 +3,10 @@ #pragma once #include <vespa/searchlib/common/idestructorcallback.h> -#include <vespa/searchcore/proton/feedoperation/feedoperation.h> +#include <vespa/searchcore/proton/common/feedtoken.h> namespace proton { -class FeedToken; - /** * Context class for document operations that acks operation when * instance is destroyed. Typically a shared pointer to an instance is @@ -18,15 +16,15 @@ class FeedToken; */ class OperationDoneContext : public search::IDestructorCallback { - std::unique_ptr<FeedToken> _token; + FeedToken _token; protected: void ack(); public: - OperationDoneContext(std::unique_ptr<FeedToken> token); + OperationDoneContext(FeedToken token); ~OperationDoneContext() override; - FeedToken *getToken() { return _token.get(); } + bool hasToken() const { return static_cast<bool>(_token); } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp index 649eebb26f5..fd7871773c8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp @@ -1,19 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "putdonecontext.h" -#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/common/docid_limit.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> namespace proton { -PutDoneContext::PutDoneContext(std::unique_ptr<FeedToken> token, - - IGidToLidChangeHandler &gidToLidChangeHandler, - const document::GlobalId &gid, - uint32_t lid, - search::SerialNum serialNum, - bool enableNotifyPut) +PutDoneContext::PutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, uint32_t lid, + search::SerialNum serialNum, bool enableNotifyPut) : OperationDoneContext(std::move(token)), _lid(lid), _docIdLimit(nullptr), diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h index 3e98b02dda6..587c8f9054d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h @@ -28,7 +28,7 @@ class PutDoneContext : public OperationDoneContext bool _enableNotifyPut; public: - PutDoneContext(std::unique_ptr<FeedToken> token, IGidToLidChangeHandler &gidToLidChangeHandler, + PutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler, const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut); ~PutDoneContext() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp index bd9a8240d73..194e190bb7b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp @@ -2,16 +2,13 @@ #include "removedonecontext.h" #include "removedonetask.h" -#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> namespace proton { -RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token, - vespalib::Executor &executor, +RemoveDoneContext::RemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - PendingNotifyRemoveDone &&pendingNotifyRemoveDone, - uint32_t lid) + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid) : OperationDoneContext(std::move(token)), _executor(executor), _task(), diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index 83f6013dd85..912d31dde22 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -28,10 +28,8 @@ class RemoveDoneContext : public OperationDoneContext PendingNotifyRemoveDone _pendingNotifyRemoveDone; public: - RemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, - uint32_t lid); - + RemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid); ~RemoveDoneContext() override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 8c8559115bd..33e8708799f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -36,26 +36,14 @@ namespace proton { namespace { -FeedToken::UP dupFeedToken(FeedToken *token) -{ - // If token is not nullptr then a new feed token is created, referencing - // same shared state as old token. - if (token != nullptr) { - return std::make_unique<FeedToken>(*token); - } else { - return FeedToken::UP(); - } -} - class PutDoneContextForMove : public PutDoneContext { private: IDestructorCallback::SP _moveDoneCtx; public: - PutDoneContextForMove(std::unique_ptr<FeedToken> token, - IGidToLidChangeHandler &gidToLidChangeHandler, - const document::GlobalId &gid, - uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx) + PutDoneContextForMove(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, + bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx) : PutDoneContext(std::move(token), gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut), _moveDoneCtx(std::move(moveDoneCtx)) {} @@ -63,7 +51,7 @@ public: }; std::shared_ptr<PutDoneContext> -createPutDoneContext(FeedToken::UP &token, +createPutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler, const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut, @@ -79,14 +67,14 @@ createPutDoneContext(FeedToken::UP &token, } std::shared_ptr<PutDoneContext> -createPutDoneContext(FeedToken::UP &token, IGidToLidChangeHandler &gidToLidChangeHandler, +createPutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler, const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut) { return createPutDoneContext(token, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, IDestructorCallback::SP()); } std::shared_ptr<UpdateDoneContext> -createUpdateDoneContext(FeedToken::UP &token, const DocumentUpdate::SP &upd) +createUpdateDoneContext(FeedToken token, const DocumentUpdate::SP &upd) { return std::make_shared<UpdateDoneContext>(std::move(token), upd); } @@ -106,11 +94,9 @@ private: IDestructorCallback::SP _moveDoneCtx; public: - RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, + RemoveDoneContextForMove(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, - uint32_t lid, - IDestructorCallback::SP moveDoneCtx) + uint32_t lid, IDestructorCallback::SP moveDoneCtx) : RemoveDoneContext(std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid), _moveDoneCtx(std::move(moveDoneCtx)) {} @@ -118,13 +104,14 @@ public: }; std::shared_ptr<RemoveDoneContext> -createRemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, +createRemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid, IDestructorCallback::SP moveDoneCtx) { if (moveDoneCtx) { return std::make_shared<RemoveDoneContextForMove> - (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid, std::move(moveDoneCtx)); + (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), + lid, std::move(moveDoneCtx)); } else { return std::make_shared<RemoveDoneContext> (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid); @@ -132,7 +119,8 @@ createRemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &ex } std::vector<document::GlobalId> getGidsToRemove(const IDocumentMetaStore &metaStore, - const LidVectorContext::LidVector &lidsToRemove) { + const LidVectorContext::LidVector &lidsToRemove) +{ std::vector<document::GlobalId> gids; gids.reserve(lidsToRemove.size()); for (const auto &lid : lidsToRemove) { @@ -145,7 +133,8 @@ std::vector<document::GlobalId> getGidsToRemove(const IDocumentMetaStore &metaSt } void putMetaData(documentmetastore::IStore &meta_store, const DocumentId &doc_id, - const DocumentOperation &op, bool is_removed_doc) { + const DocumentOperation &op, bool is_removed_doc) +{ documentmetastore::IStore::Result putRes( meta_store.put(doc_id.getGlobalId(), op.getBucketId(), op.getTimestamp(), op.getSerializedDocSize(), op.getLid())); @@ -234,10 +223,9 @@ StoreOnlyFeedView::forceCommit(SerialNum serialNum, OnForceCommitDoneType onComm } void -StoreOnlyFeedView::considerEarlyAck(FeedToken::UP &token) +StoreOnlyFeedView::considerEarlyAck(FeedToken & token) { if (_commitTimeTracker.hasVisibilityDelay() && token) { - token->ack(); token.reset(); } } @@ -260,13 +248,13 @@ StoreOnlyFeedView::preparePut(PutOperation &putOp) } void -StoreOnlyFeedView::handlePut(FeedToken *token, const PutOperation &putOp) +StoreOnlyFeedView::handlePut(FeedToken token, const PutOperation &putOp) { - internalPut(dupFeedToken(token), putOp); + internalPut(std::move(token), putOp); } void -StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) +StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) { assert(putOp.getValidDbdId()); assert(putOp.notMovingLidInSameSubDb()); @@ -291,7 +279,7 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) bool immediateCommit = _commitTimeTracker.needCommit(); const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = - createPutDoneContext(token, _gidToLidChangeHandler, gid, putOp.getLid(), serialNum, + createPutDoneContext(std::move(token), _gidToLidChangeHandler, gid, putOp.getLid(), serialNum, putOp.changedDbdId() && useDocumentMetaStore(serialNum)); putSummary(serialNum, putOp.getLid(), doc, onWriteDone); putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone); @@ -302,9 +290,6 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), putOp.getPrevLid(), IDestructorCallback::SP()); } - if (token) { - token->ack(); - } } void @@ -347,9 +332,9 @@ StoreOnlyFeedView::prepareUpdate(UpdateOperation &updOp) } void -StoreOnlyFeedView::handleUpdate(FeedToken *token, const UpdateOperation &updOp) +StoreOnlyFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp) { - internalUpdate(dupFeedToken(token), updOp); + internalUpdate(std::move(token), updOp); } void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, @@ -400,7 +385,7 @@ void StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) { } void -StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &updOp) { +StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) { if ( ! updOp.getUpdate()) { LOG(warning, "database(%s): ignoring invalid update operation", _params._docTypeName.toString().c_str()); @@ -430,7 +415,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up considerEarlyAck(token); bool immediateCommit = _commitTimeTracker.needCommit(); - auto onWriteDone = createUpdateDoneContext(token, updOp.getUpdate()); + auto onWriteDone = createUpdateDoneContext(std::move(token), updOp.getUpdate()); updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone); UpdateScope updateScope(getUpdateScope(upd)); @@ -470,19 +455,18 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpd const DocumentUpdate & upd = *update; Document::UP newDoc; vespalib::nbostream newStream(12345); - assert(onWriteDone->getToken() == nullptr || useDocumentStore(serialNum)); + assert(!onWriteDone->hasToken() || useDocumentStore(serialNum)); if (useDocumentStore(serialNum)) { assert(prevDoc); } if (!prevDoc) { // Replaying, document removed later before summary was flushed. - assert(onWriteDone->getToken() == nullptr); + assert(!onWriteDone->hasToken()); // If we've passed serial number for flushed index then we could // also check that this operation is marked for ignore by index // proxy. } else { if (upd.getId() == prevDoc->getId()) { - newDoc = std::move(prevDoc); if (useDocumentStore(serialNum)) { upd.applyTo(*newDoc); @@ -491,7 +475,7 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpd } else { // Replaying, document removed and lid reused before summary // was flushed. - assert(onWriteDone->getToken() == nullptr && !useDocumentStore(serialNum)); + assert(!onWriteDone->hasToken() && !useDocumentStore(serialNum)); } } promisedDoc.set_value(std::move(newDoc)); @@ -531,12 +515,12 @@ StoreOnlyFeedView::prepareRemove(RemoveOperation &rmOp) } void -StoreOnlyFeedView::handleRemove(FeedToken *token, const RemoveOperation &rmOp) { - internalRemove(dupFeedToken(token), rmOp); +StoreOnlyFeedView::handleRemove(FeedToken token, const RemoveOperation &rmOp) { + internalRemove(std::move(token), rmOp); } void -StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rmOp) +StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperation &rmOp) { assert(rmOp.getValidNewOrPrevDbdId()); assert(rmOp.notMovingLidInSameSubDb()); @@ -564,13 +548,10 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm rmOp.getPrevLid(), IDestructorCallback::SP()); } } - if (token) { - token->ack(); - } } void -StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, +StoreOnlyFeedView::internalRemove(FeedToken token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, IDestructorCallback::SP moveDoneCtx) { @@ -727,16 +708,15 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: if (moveOp.getValidDbdId(_params._subDbId)) { bool immediateCommit = _commitTimeTracker.needCommit(); const document::GlobalId &gid = docId.getGlobalId(); - FeedToken::UP token; std::shared_ptr<PutDoneContext> onWriteDone = - createPutDoneContext(token, _gidToLidChangeHandler, gid, moveOp.getLid(), serialNum, + createPutDoneContext(FeedToken(), _gidToLidChangeHandler, gid, moveOp.getLid(), serialNum, moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx); putSummary(serialNum, moveOp.getLid(), doc, onWriteDone); putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone); } if (docAlreadyExists && moveOp.changedDbdId()) { - internalRemove(FeedToken::UP(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx); + internalRemove(FeedToken(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index baaf77bbe59..a8119224ba8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -168,22 +168,22 @@ private: } PendingNotifyRemoveDone adjustMetaStore(const DocumentOperation &op, const document::DocumentId &docId); - void internalPut(FeedTokenUP token, const PutOperation &putOp); - void internalUpdate(FeedTokenUP token, const UpdateOperation &updOp); + void internalPut(FeedToken token, const PutOperation &putOp); + void internalUpdate(FeedToken token, const UpdateOperation &updOp); bool lookupDocId(const document::DocumentId &docId, Lid & lid) const; - void internalRemove(FeedTokenUP token, const RemoveOperation &rmOp); + void internalRemove(FeedToken token, const RemoveOperation &rmOp); // Removes documents from meta store and document store. // returns the number of documents removed. size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields, bool immediateCommit); - void internalRemove(FeedTokenUP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + void internalRemove(FeedToken token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, std::shared_ptr<search::IDestructorCallback> moveDoneCtx); // Ack token early if visibility delay is nonzero - void considerEarlyAck(FeedTokenUP &token); + void considerEarlyAck(FeedToken &token); void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdateSP upd, OnOperationDoneType onWriteDone, PromisedDoc promisedDoc, PromisedStream promisedStream); @@ -220,7 +220,6 @@ protected: public: StoreOnlyFeedView(const Context &ctx, const PersistentParams ¶ms); - ~StoreOnlyFeedView() override; const ISummaryAdapter::SP &getSummaryAdapter() const { return _summaryAdapter; } @@ -233,31 +232,22 @@ public: CommitTimeTracker &getCommitTimeTracker() { return _commitTimeTracker; } IGidToLidChangeHandler &getGidToLidChangeHandler() const { return _gidToLidChangeHandler; } - /** - * Implements IFeedView. - */ - virtual const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return _repo; } - virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override; - - /** - * Similar to IPersistenceHandler functions. - * Takes pointer to feed token instead of instance because - * when replaying the spooler we don't have a feed token. - */ - - virtual void preparePut(PutOperation &putOp) override; - virtual void handlePut(FeedToken *token, const PutOperation &putOp) override; - virtual void prepareUpdate(UpdateOperation &updOp) override; - virtual void handleUpdate(FeedToken *token, const UpdateOperation &updOp) override; - virtual void prepareRemove(RemoveOperation &rmOp) override; - virtual void handleRemove(FeedToken *token, const RemoveOperation &rmOp) override; - virtual void prepareDeleteBucket(DeleteBucketOperation &delOp) override; - virtual void handleDeleteBucket(const DeleteBucketOperation &delOp) override; - virtual void prepareMove(MoveOperation &putOp) override; - virtual void handleMove(const MoveOperation &putOp, std::shared_ptr<search::IDestructorCallback> doneCtx) override; - virtual void heartBeat(search::SerialNum serialNum) override; - virtual void sync() override; - virtual void forceCommit(SerialNum serialNum) override; + const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return _repo; } + const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override; + + void preparePut(PutOperation &putOp) override; + void handlePut(FeedToken token, const PutOperation &putOp) override; + void prepareUpdate(UpdateOperation &updOp) override; + void handleUpdate(FeedToken token, const UpdateOperation &updOp) override; + void prepareRemove(RemoveOperation &rmOp) override; + void handleRemove(FeedToken token, const RemoveOperation &rmOp) override; + void prepareDeleteBucket(DeleteBucketOperation &delOp) override; + void handleDeleteBucket(const DeleteBucketOperation &delOp) override; + void prepareMove(MoveOperation &putOp) override; + void handleMove(const MoveOperation &putOp, std::shared_ptr<search::IDestructorCallback> doneCtx) override; + void heartBeat(search::SerialNum serialNum) override; + void sync() override; + void forceCommit(SerialNum serialNum) override; virtual void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone); /** @@ -266,8 +256,8 @@ public: * * Called by writer thread. */ - virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; - virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; + void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; + void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp index 171990c32d3..6fc1a788531 100644 --- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp @@ -1,11 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "updatedonecontext.h" -#include <vespa/searchcore/proton/common/feedtoken.h> namespace proton { -UpdateDoneContext::UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd) +UpdateDoneContext::UpdateDoneContext(FeedToken token, const document::DocumentUpdate::SP &upd) : OperationDoneContext(std::move(token)), _upd(upd) { diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h index 4701db300de..2b8a018af87 100644 --- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h @@ -18,7 +18,7 @@ class UpdateDoneContext : public OperationDoneContext { document::DocumentUpdate::SP _upd; public: - UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd); + UpdateDoneContext(FeedToken token, const document::DocumentUpdate::SP &upd); ~UpdateDoneContext() override; const document::DocumentUpdate &getUpdate() { return *_upd; } diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h index cafcbb64410..163168a1a09 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h @@ -4,9 +4,7 @@ #include <vespa/searchcore/proton/server/ifeedview.h> #include <vespa/document/repo/documenttyperepo.h> -namespace proton { - -namespace test { +namespace proton::test { struct DummyFeedView : public IFeedView { @@ -18,33 +16,27 @@ struct DummyFeedView : public IFeedView DummyFeedView(const document::DocumentTypeRepo::SP &docTypeRepo) : _docTypeRepo(docTypeRepo) {} - virtual const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { + const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return _docTypeRepo; } - virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override { + const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override { return std::nullptr_t(); } - virtual void preparePut(PutOperation &) override {} - virtual void handlePut(FeedToken *, - const PutOperation &) override {} - virtual void prepareUpdate(UpdateOperation &) override {} - virtual void handleUpdate(FeedToken *, - const UpdateOperation &) override {} - virtual void prepareRemove(RemoveOperation &) override {} - virtual void handleRemove(FeedToken *, - const RemoveOperation &) override {} - virtual void prepareDeleteBucket(DeleteBucketOperation &) override {} - virtual void handleDeleteBucket(const DeleteBucketOperation &) override {} - virtual void prepareMove(MoveOperation &) override {} - virtual void handleMove(const MoveOperation &, std::shared_ptr<search::IDestructorCallback>) override {} - virtual void heartBeat(search::SerialNum) override {} - virtual void sync() override {} - virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {} - virtual void handleCompactLidSpace(const CompactLidSpaceOperation &) override {} + void preparePut(PutOperation &) override {} + void handlePut(FeedToken, const PutOperation &) override {} + void prepareUpdate(UpdateOperation &) override {} + void handleUpdate(FeedToken, const UpdateOperation &) override {} + void prepareRemove(RemoveOperation &) override {} + void handleRemove(FeedToken, const RemoveOperation &) override {} + void prepareDeleteBucket(DeleteBucketOperation &) override {} + void handleDeleteBucket(const DeleteBucketOperation &) override {} + void prepareMove(MoveOperation &) override {} + void handleMove(const MoveOperation &, std::shared_ptr<search::IDestructorCallback>) override {} + void heartBeat(search::SerialNum) override {} + void sync() override {} + void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {} + void handleCompactLidSpace(const CompactLidSpaceOperation &) override {} void forceCommit(search::SerialNum) override { } }; -} // namespace test - -} // namespace proton - +} |