summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-13 15:21:23 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-10-13 15:26:42 +0200
commit10ff6f37abf65c3e6b0fd18911fe5c2815b4b06c (patch)
tree319584e48be066f487da2d1b11543b64b732749d /searchcore
parent9fc068e627ae71c5478a91fc06fff4d62933efa1 (diff)
Remove explicit ack and use feedtoken as a smartptr
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp42
-rw-r--r--searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp39
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp64
-rw-r--r--searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp18
-rw-r--r--searchcore/src/tests/proton/feedtoken/feedtoken.cpp12
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp100
-rw-r--r--searchcore/src/tests/proton/server/feedstates_test.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp28
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.h23
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp23
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h11
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp82
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h30
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ifeedview.h12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp88
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h56
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h44
27 files changed, 295 insertions, 457 deletions
diff --git a/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp
index d4af7b214b6..a997b3cc3db 100644
--- a/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp
@@ -72,22 +72,20 @@ struct MyFeedView : public test::DummyFeedView
_metaStore.constructFreeList();
}
- // Implements IFeedView
- virtual const DocumentMetaStore *getDocumentMetaStorePtr() const override { return &_metaStore; }
- virtual void preparePut(PutOperation &) override { ++_preparePut; }
- virtual void handlePut(FeedToken *, const PutOperation &) override { ++_handlePut; }
- virtual void prepareUpdate(UpdateOperation &) override { ++_prepareUpdate; }
- virtual void handleUpdate(FeedToken *, const UpdateOperation &) override { ++_handleUpdate; }
- virtual void prepareRemove(RemoveOperation &) override { ++_prepareRemove; }
- virtual void handleRemove(FeedToken *, const RemoveOperation &) override { ++_handleRemove; }
- virtual void prepareDeleteBucket(DeleteBucketOperation &) override { ++_prepareDeleteBucket; }
- virtual void handleDeleteBucket(const DeleteBucketOperation &) override
- { ++_handleDeleteBucket; }
- virtual void prepareMove(MoveOperation &) override { ++_prepareMove; }
- virtual void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++_handleMove; }
- virtual void heartBeat(SerialNum) override { ++_heartBeat; }
- virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++_handlePrune; }
- virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override {
+ const DocumentMetaStore *getDocumentMetaStorePtr() const override { return &_metaStore; }
+ void preparePut(PutOperation &) override { ++_preparePut; }
+ void handlePut(FeedToken, const PutOperation &) override { ++_handlePut; }
+ void prepareUpdate(UpdateOperation &) override { ++_prepareUpdate; }
+ void handleUpdate(FeedToken, const UpdateOperation &) override { ++_handleUpdate; }
+ void prepareRemove(RemoveOperation &) override { ++_prepareRemove; }
+ void handleRemove(FeedToken, const RemoveOperation &) override { ++_handleRemove; }
+ void prepareDeleteBucket(DeleteBucketOperation &) override { ++_prepareDeleteBucket; }
+ void handleDeleteBucket(const DeleteBucketOperation &) override { ++_handleDeleteBucket; }
+ void prepareMove(MoveOperation &) override { ++_prepareMove; }
+ void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++_handleMove; }
+ void heartBeat(SerialNum) override { ++_heartBeat; }
+ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++_handlePrune; }
+ void handleCompactLidSpace(const CompactLidSpaceOperation &op) override {
_wantedLidLimit = op.getLidLimit();
}
};
@@ -213,7 +211,7 @@ TEST_F("require that handlePut() sends to 1 feed view", Fixture)
{
PutOperation op = f.put(2);
op.setDbDocumentId(DbDocumentId(READY, 2));
- f._view.handlePut(NULL, op);
+ f._view.handlePut(FeedToken(), op);
EXPECT_EQUAL(1u, f._ready._view->_handlePut);
EXPECT_EQUAL(0u, f._removed._view->_handlePut);
EXPECT_EQUAL(0u, f._notReady._view->_handlePut);
@@ -225,7 +223,7 @@ TEST_F("require that handlePut() sends to 2 feed views", Fixture)
PutOperation op = f.put(2);
op.setDbDocumentId(DbDocumentId(NOT_READY, 2));
op.setPrevDbDocumentId(DbDocumentId(REMOVED, 2));
- f._view.handlePut(NULL, op);
+ f._view.handlePut(FeedToken(), op);
EXPECT_EQUAL(0u, f._ready._view->_handlePut);
EXPECT_EQUAL(1u, f._removed._view->_handlePut);
EXPECT_EQUAL(1u, f._notReady._view->_handlePut);
@@ -259,7 +257,7 @@ TEST_F("require that handleRemove() sends op with valid dbdId to 1 feed view", F
{
RemoveOperation op = f.remove(1);
op.setDbDocumentId(DbDocumentId(REMOVED, 1));
- f._view.handleRemove(NULL, op);
+ f._view.handleRemove(FeedToken(), op);
EXPECT_EQUAL(0u, f._ready._view->_handleRemove);
EXPECT_EQUAL(1u, f._removed._view->_handleRemove);
EXPECT_EQUAL(0u, f._notReady._view->_handleRemove);
@@ -271,7 +269,7 @@ TEST_F("require that handleRemove() sends op with valid dbdId to 2 feed views",
RemoveOperation op = f.remove(1);
op.setDbDocumentId(DbDocumentId(REMOVED, 1));
op.setPrevDbDocumentId(DbDocumentId(READY, 1));
- f._view.handleRemove(NULL, op);
+ f._view.handleRemove(FeedToken(), op);
EXPECT_EQUAL(1u, f._ready._view->_handleRemove);
EXPECT_EQUAL(1u, f._removed._view->_handleRemove);
EXPECT_EQUAL(0u, f._notReady._view->_handleRemove);
@@ -283,7 +281,7 @@ TEST_F("require that handleRemove() sends op with invalid dbdId to prev view", F
RemoveOperation op = f.remove(1);
// can be used in the case where removed feed view does not remember removes.
op.setPrevDbDocumentId(DbDocumentId(READY, 1));
- f._view.handleRemove(NULL, op);
+ f._view.handleRemove(FeedToken(), op);
EXPECT_EQUAL(1u, f._ready._view->_handleRemove);
EXPECT_EQUAL(0u, f._removed._view->_handleRemove);
EXPECT_EQUAL(0u, f._notReady._view->_handleRemove);
@@ -317,7 +315,7 @@ TEST_F("require that handleUpdate() sends op to correct view", Fixture)
UpdateOperation op = f.update(1);
op.setDbDocumentId(DbDocumentId(READY, 1));
op.setPrevDbDocumentId(DbDocumentId(READY, 1));
- f._view.handleUpdate(NULL, op);
+ f._view.handleUpdate(FeedToken(), op);
EXPECT_EQUAL(1u, f._ready._view->_handleUpdate);
EXPECT_EQUAL(0u, f._removed._view->_handleUpdate);
EXPECT_EQUAL(0u, f._notReady._view->_handleUpdate);
diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
index 547e400cd76..8369ec0630d 100644
--- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
@@ -745,18 +745,15 @@ struct DocumentHandler
op.setSerialNum(serialNum);
return op;
}
- MoveOperation createMove(Document::UP doc, Timestamp timestamp,
- DbDocumentId sourceDbdId,
- uint32_t targetSubDbId,
- SerialNum serialNum)
+ MoveOperation createMove(Document::UP doc, Timestamp timestamp, DbDocumentId sourceDbdId,
+ uint32_t targetSubDbId, SerialNum serialNum)
{
proton::test::Document testDoc(Document::SP(doc.release()), 0, timestamp);
MoveOperation op(testDoc.getBucket(), testDoc.getTimestamp(), testDoc.getDoc(), sourceDbdId, targetSubDbId);
op.setSerialNum(serialNum);
return op;
}
- RemoveOperation createRemove(const DocumentId &docId, Timestamp timestamp,
- SerialNum serialNum)
+ RemoveOperation createRemove(const DocumentId &docId, Timestamp timestamp, SerialNum serialNum)
{
const document::GlobalId &gid = docId.getGlobalId();
BucketId bucket = gid.convertToBucketId();
@@ -769,7 +766,7 @@ struct DocumentHandler
void putDoc(PutOperation &op) {
IFeedView::SP feedView = _f._subDb.getFeedView();
_f.runInMaster([&]() { feedView->preparePut(op);
- feedView->handlePut(NULL, op); } );
+ feedView->handlePut(FeedToken(), op); } );
}
void moveDoc(MoveOperation &op) {
IFeedView::SP feedView = _f._subDb.getFeedView();
@@ -779,11 +776,10 @@ struct DocumentHandler
{
IFeedView::SP feedView = _f._subDb.getFeedView();
_f.runInMaster([&]() { feedView->prepareRemove(op);
- feedView->handleRemove(NULL, op); } );
+ feedView->handleRemove(FeedToken(), op); } );
}
void putDocs() {
- PutOperation putOp = createPut(std::move(createDoc(1, 22, 33)),
- Timestamp(10), 10);
+ PutOperation putOp = createPut(std::move(createDoc(1, 22, 33)), Timestamp(10), 10);
putDoc(putOp);
putOp = createPut(std::move(createDoc(2, 44, 55)), Timestamp(20), 20);
putDoc(putOp);
@@ -791,13 +787,8 @@ struct DocumentHandler
};
void
-assertAttribute(const AttributeGuard &attr,
- const vespalib::string &name,
- uint32_t numDocs,
- int64_t doc1Value,
- int64_t doc2Value,
- SerialNum createSerialNum,
- SerialNum lastSerialNum)
+assertAttribute(const AttributeGuard &attr, const vespalib::string &name, uint32_t numDocs,
+ int64_t doc1Value, int64_t doc2Value, SerialNum createSerialNum, SerialNum lastSerialNum)
{
EXPECT_EQUAL(name, attr->getName());
EXPECT_EQUAL(numDocs, attr->getNumDocs());
@@ -808,17 +799,13 @@ assertAttribute(const AttributeGuard &attr,
}
void
-assertAttribute1(const AttributeGuard &attr,
- SerialNum createSerialNum,
- SerialNum lastSerialNum)
+assertAttribute1(const AttributeGuard &attr, SerialNum createSerialNum, SerialNum lastSerialNum)
{
assertAttribute(attr, "attr1", 3, 22, 44, createSerialNum, lastSerialNum);
}
void
-assertAttribute2(const AttributeGuard &attr,
- SerialNum createSerialNum,
- SerialNum lastSerialNum)
+assertAttribute2(const AttributeGuard &attr, SerialNum createSerialNum, SerialNum lastSerialNum)
{
assertAttribute(attr, "attr2", 3, 33, 55, createSerialNum, lastSerialNum);
}
@@ -877,12 +864,10 @@ TEST_F("require that regular attributes are populated during reprocessing",
requireThatAttributesArePopulatedDuringReprocessing<SearchableFixtureTwoField, ConfigDir2>(f);
}
-namespace
-{
+namespace {
bool
-assertOperation(DocumentOperation &op,
- uint32_t expPrevSubDbId, uint32_t expPrevLid,
+assertOperation(DocumentOperation &op, uint32_t expPrevSubDbId, uint32_t expPrevLid,
uint32_t expSubDbId, uint32_t expLid)
{
if (!EXPECT_EQUAL(expPrevSubDbId, op.getPrevSubDbId())) {
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index b0b06a238c9..e70e83fd61e 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -140,12 +140,6 @@ struct MyReplayConfig : public IReplayConfig {
virtual void replayConfig(SerialNum) override {}
};
-void ackToken(FeedToken *token) {
- if (token != NULL) {
- token->ack();
- }
-}
-
struct MyDocumentMetaStore {
struct Entry {
DbDocumentId _id;
@@ -195,9 +189,9 @@ struct MyFeedView : public test::DummyFeedView {
int update_count;
SerialNum update_serial;
MyFeedView(const DocumentTypeRepo::SP &dtr);
- ~MyFeedView();
+ ~MyFeedView() override;
void resetPutLatch(uint32_t count) { putLatch.reset(new vespalib::CountDownLatch(count)); }
- virtual void preparePut(PutOperation &op) override {
+ void preparePut(PutOperation &op) override {
prepareDocumentOperation(op, op.getDocument()->getId().getGlobalId());
}
void prepareDocumentOperation(DocumentOperation &op, const GlobalId &gid) {
@@ -208,7 +202,8 @@ struct MyFeedView : public test::DummyFeedView {
op.setPrevTimestamp(entry->_prevTimestamp);
}
}
- virtual void handlePut(FeedToken *token, const PutOperation &putOp) override {
+ void handlePut(FeedToken token, const PutOperation &putOp) override {
+ (void) token;
LOG(info, "MyFeedView::handlePut(): docId(%s), putCount(%u), putLatchCount(%u)",
putOp.getDocument()->getId().toString().c_str(), put_count,
(putLatch.get() != NULL ? putLatch->getCount() : 0u));
@@ -221,23 +216,24 @@ struct MyFeedView : public test::DummyFeedView {
if (putLatch.get() != NULL) {
putLatch->countDown();
}
- ackToken(token);
}
- virtual void prepareUpdate(UpdateOperation &op) override {
+ void prepareUpdate(UpdateOperation &op) override {
prepareDocumentOperation(op, op.getUpdate()->getId().getGlobalId());
}
- virtual void handleUpdate(FeedToken *token, const UpdateOperation &op) override {
+ void handleUpdate(FeedToken token, const UpdateOperation &op) override {
+ (void) token;
+
++update_count;
update_serial = op.getSerialNum();
- ackToken(token);
}
- virtual void handleRemove(FeedToken *token, const RemoveOperation &) override
- { ++remove_count; ackToken(token); }
- virtual void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++move_count; }
- virtual void heartBeat(SerialNum) override { ++heartbeat_count; }
- virtual void handlePruneRemovedDocuments(
- const PruneRemovedDocumentsOperation &) override { ++prune_removed_count; }
- virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override {
+ void handleRemove(FeedToken token, const RemoveOperation &) override {
+ (void) token;
+ ++remove_count;
+ }
+ void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++move_count; }
+ void heartBeat(SerialNum) override { ++heartbeat_count; }
+ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++prune_removed_count; }
+ const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override {
return NULL;
}
};
@@ -317,8 +313,7 @@ MyTransport::~MyTransport() {}
struct FeedTokenContext {
MyTransport transport;
- FeedToken::UP token_ap;
- FeedToken &token;
+ FeedToken token;
FeedTokenContext();
~FeedTokenContext();
@@ -333,10 +328,8 @@ struct FeedTokenContext {
FeedTokenContext::FeedTokenContext()
: transport(),
- token_ap(new FeedToken(transport)),
- token(*token_ap)
-{
-}
+ token(transport)
+{}
FeedTokenContext::~FeedTokenContext() = default;
@@ -432,8 +425,7 @@ struct FeedHandlerFixture
handler.init(1);
}
- ~FeedHandlerFixture()
- {
+ ~FeedHandlerFixture() {
writeService.sync();
}
template <class FunctionType>
@@ -484,7 +476,7 @@ TEST_F("require that outdated remove is ignored", FeedHandlerFixture)
static_cast<DocumentOperation &>(*op).setPrevDbDocumentId(DbDocumentId(4));
static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000));
FeedTokenContext token_context;
- f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
EXPECT_EQUAL(0, f.feedView.remove_count);
EXPECT_EQUAL(0, f.tls_writer.store_count);
}
@@ -496,7 +488,7 @@ TEST_F("require that outdated put is ignored", FeedHandlerFixture)
Timestamp(10), doc_context.doc));
static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000));
FeedTokenContext token_context;
- f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
EXPECT_EQUAL(0, f.feedView.put_count);
EXPECT_EQUAL(0, f.tls_writer.store_count);
}
@@ -575,7 +567,7 @@ TEST_F("require that remove of unknown document with known data type stores remo
DocumentContext doc_context("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId()));
FeedTokenContext token_context;
- f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
EXPECT_EQUAL(1, f.feedView.remove_count);
EXPECT_EQUAL(1, f.tls_writer.store_count);
}
@@ -585,7 +577,7 @@ TEST_F("require that partial update for non-existing document is tagged as such"
UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update));
FeedTokenContext token_context;
- f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult());
EXPECT_FALSE(token_context.transport.documentWasFound);
@@ -603,7 +595,7 @@ TEST_F("require that partial update for non-existing document is created if spec
f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(), MyDocumentMetaStore::Entry(5, 5, Timestamp(10)));
FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update));
FeedTokenContext token_context;
- f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult());
EXPECT_TRUE(token_context.transport.documentWasFound);
@@ -624,7 +616,7 @@ TEST_F("require that put is rejected if resource limit is reached", FeedHandlerF
DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc);
FeedTokenContext token;
- f.handler.performOperation(std::move(token.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token.token), std::move(op));
EXPECT_EQUAL(0, f.feedView.put_count);
EXPECT_EQUAL(Result::RESOURCE_EXHAUSTED, token.getResult()->getErrorCode());
EXPECT_EQUAL("Put operation rejected for document 'id:test:searchdocument::foo' of type 'searchdocument': 'Attribute resource limit reached'",
@@ -639,7 +631,7 @@ TEST_F("require that update is rejected if resource limit is reached", FeedHandl
UpdateContext updCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op = std::make_unique<UpdateOperation>(updCtx.bucketId, Timestamp(10), updCtx.update);
FeedTokenContext token;
- f.handler.performOperation(std::move(token.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token.token), std::move(op));
EXPECT_EQUAL(0, f.feedView.update_count);
EXPECT_TRUE(dynamic_cast<const UpdateResult *>(token.getResult()));
EXPECT_EQUAL(Result::RESOURCE_EXHAUSTED, token.getResult()->getErrorCode());
@@ -655,7 +647,7 @@ TEST_F("require that remove is NOT rejected if resource limit is reached", FeedH
DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op = std::make_unique<RemoveOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc->getId());
FeedTokenContext token;
- f.handler.performOperation(std::move(token.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token.token), std::move(op));
EXPECT_EQUAL(1, f.feedView.remove_count);
EXPECT_EQUAL(Result::NONE, token.getResult()->getErrorCode());
EXPECT_EQUAL("", token.getResult()->getErrorMessage());
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
index 4eefbed0a53..548ce8ba20d 100644
--- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
@@ -571,7 +571,7 @@ struct FixtureBase
return doc("doc:test:1", timestamp);
}
- void performPut(FeedToken *token, PutOperation &op) {
+ void performPut(FeedToken token, PutOperation &op) {
getFeedView().preparePut(op);
op.setSerialNum(++serial);
getFeedView().handlePut(token, op);
@@ -586,10 +586,10 @@ struct FixtureBase
void putAndWait(const DocumentContext &docCtx) {
FeedTokenContext token(_tracer);
PutOperation op(docCtx.bid, docCtx.ts, docCtx.doc);
- runInMaster([&] () { performPut(&token.ft, op); });
+ runInMaster([&] () { performPut(token.ft, op); });
}
- void performUpdate(FeedToken *token, UpdateOperation &op) {
+ void performUpdate(FeedToken token, UpdateOperation &op) {
getFeedView().prepareUpdate(op);
op.setSerialNum(++serial);
getFeedView().handleUpdate(token, op);
@@ -598,25 +598,21 @@ struct FixtureBase
void updateAndWait(const DocumentContext &docCtx) {
FeedTokenContext token(_tracer);
UpdateOperation op(docCtx.bid, docCtx.ts, docCtx.upd);
- runInMaster([&] () { performUpdate(&token.ft, op); });
+ runInMaster([&] () { performUpdate(token.ft, op); });
}
- void performRemove(FeedToken *token, RemoveOperation &op) {
+ void performRemove(FeedToken token, RemoveOperation &op) {
getFeedView().prepareRemove(op);
if (op.getValidNewOrPrevDbdId()) {
op.setSerialNum(++serial);
- getFeedView().handleRemove(token, op);
- } else {
- if (token != NULL) {
- token->ack();
- }
+ getFeedView().handleRemove(std::move(token), op);
}
}
void removeAndWait(const DocumentContext &docCtx) {
FeedTokenContext token(_tracer);
RemoveOperation op(docCtx.bid, docCtx.ts, docCtx.doc->getId());
- runInMaster([&] () { performRemove(&token.ft, op); });
+ runInMaster([&] () { performRemove(token.ft, op); });
}
void removeAndWait(const DocumentContext::List &docs) {
diff --git a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
index 530c9ebef39..0fea9501d93 100644
--- a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
+++ b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
@@ -45,8 +45,9 @@ void
Test::testAck()
{
LocalTransport transport;
- FeedToken token(transport);
- token.ack();
+ {
+ FeedToken token(transport);
+ }
EXPECT_EQUAL(1u, transport.getReceivedCount());
}
@@ -70,9 +71,10 @@ Test::testHandover()
LocalTransport transport;
- FeedToken token(transport);
- token = MyHandover::handover(token);
- token.ack();
+ {
+ FeedToken token(transport);
+ token = MyHandover::handover(token);
+ }
EXPECT_EQUAL(1u, transport.getReceivedCount());
}
diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
index f8c0e0ac2f6..38d4d809d60 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
@@ -127,33 +127,29 @@ struct MyDocumentRetriever : DocumentRetrieverBaseForTest {
MyDocumentRetriever(const Document *d, Timestamp ts, DocumentId &last_id)
: repo(), document(d), timestamp(ts), last_doc_id(last_id) {}
- virtual const document::DocumentTypeRepo &getDocumentTypeRepo() const override {
+ const document::DocumentTypeRepo &getDocumentTypeRepo() const override {
return repo;
}
- virtual void getBucketMetaData(const storage::spi::Bucket &,
- search::DocumentMetaData::Vector &v) const override {
+ void getBucketMetaData(const storage::spi::Bucket &, search::DocumentMetaData::Vector &v) const override {
if (document != 0) {
v.push_back(getDocumentMetaData(document->getId()));
}
}
- virtual DocumentMetaData getDocumentMetaData(const DocumentId &id) const override {
+ DocumentMetaData getDocumentMetaData(const DocumentId &id) const override {
last_doc_id = id;
if (document != 0) {
- return DocumentMetaData(1, timestamp, document::BucketId(1),
- document->getId().getGlobalId());
+ return DocumentMetaData(1, timestamp, document::BucketId(1), document->getId().getGlobalId());
}
return DocumentMetaData();
}
- virtual document::Document::UP getDocument(search::DocumentIdT) const override {
+ document::Document::UP getDocument(search::DocumentIdT) const override {
if (document != 0) {
return Document::UP(document->clone());
}
return Document::UP();
}
- virtual CachedSelect::SP
- parseSelect(const vespalib::string &) const override
- {
+ CachedSelect::SP parseSelect(const vespalib::string &) const override {
return CachedSelect::SP();
}
};
@@ -208,134 +204,110 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer {
setExistingTimestamp(ts);
}
void handle(FeedToken token, const Bucket &bucket, Timestamp timestamp, const DocumentId &docId) {
+ (void) token;
lastBucket = bucket;
lastTimestamp = timestamp;
lastDocId = docId;
- token.ack();
}
- virtual void initialize() override { initialized = true; }
+ void initialize() override { initialized = true; }
- virtual void handlePut(FeedToken token, const Bucket& bucket,
- Timestamp timestamp, const document::Document::SP& doc) override {
+ void handlePut(FeedToken token, const Bucket& bucket,
+ Timestamp timestamp, const document::Document::SP& doc) override {
token.setResult(ResultUP(new storage::spi::Result()), false);
handle(token, bucket, timestamp, doc->getId());
}
- virtual void handleUpdate(FeedToken token, const Bucket& bucket,
- Timestamp timestamp, const document::DocumentUpdate::SP& upd) override {
+ void handleUpdate(FeedToken token, const Bucket& bucket,
+ Timestamp timestamp, const document::DocumentUpdate::SP& upd) override {
token.setResult(ResultUP(new storage::spi::UpdateResult(existingTimestamp)),
existingTimestamp > 0);
handle(token, bucket, timestamp, upd->getId());
}
- virtual void handleRemove(FeedToken token, const Bucket& bucket,
- Timestamp timestamp, const DocumentId& id) override {
+ void handleRemove(FeedToken token, const Bucket& bucket,
+ Timestamp timestamp, const DocumentId& id) override {
bool wasFound = existingTimestamp > 0;
token.setResult(ResultUP(new storage::spi::RemoveResult(wasFound)), wasFound);
handle(token, bucket, timestamp, id);
}
- virtual void handleListBuckets(IBucketIdListResultHandler &resultHandler) override {
+ void handleListBuckets(IBucketIdListResultHandler &resultHandler) override {
resultHandler.handle(BucketIdListResult(bucketList));
}
- virtual void handleSetClusterState(const ClusterState &calc,
- IGenericResultHandler &resultHandler) override {
+ void handleSetClusterState(const ClusterState &calc, IGenericResultHandler &resultHandler) override {
lastCalc = &calc;
resultHandler.handle(Result());
}
- virtual void handleSetActiveState(const Bucket &bucket,
- storage::spi::BucketInfo::ActiveState newState,
- IGenericResultHandler &resultHandler) override {
+ void handleSetActiveState(const Bucket &bucket, storage::spi::BucketInfo::ActiveState newState,
+ IGenericResultHandler &resultHandler) override {
lastBucket = bucket;
lastBucketState = newState;
resultHandler.handle(bucketStateResult);
}
- virtual void handleGetBucketInfo(const Bucket &,
- IBucketInfoResultHandler &resultHandler) override {
+ void handleGetBucketInfo(const Bucket &, IBucketInfoResultHandler &resultHandler) override {
resultHandler.handle(BucketInfoResult(bucketInfo));
}
- virtual void
- handleCreateBucket(FeedToken token,
- const storage::spi::Bucket &) override
- {
+ void handleCreateBucket(FeedToken token, const storage::spi::Bucket &) override {
token.setResult(ResultUP(new Result(_createBucketResult)), true);
- token.ack();
}
- virtual void handleDeleteBucket(FeedToken token,
- const storage::spi::Bucket &) override {
+ void handleDeleteBucket(FeedToken token, const storage::spi::Bucket &) override {
token.setResult(ResultUP(new Result(deleteBucketResult)), true);
- token.ack();
}
- virtual void handleGetModifiedBuckets(IBucketIdListResultHandler &resultHandler) override {
+ void handleGetModifiedBuckets(IBucketIdListResultHandler &resultHandler) override {
resultHandler.handle(BucketIdListResult(modBucketList));
}
- virtual void
- handleSplit(FeedToken token,
- const storage::spi::Bucket &source,
- const storage::spi::Bucket &target1,
- const storage::spi::Bucket &target2) override
+ void handleSplit(FeedToken token, const storage::spi::Bucket &source, const storage::spi::Bucket &target1,
+ const storage::spi::Bucket &target2) override
{
(void) source;
(void) target1;
(void) target2;
token.setResult(ResultUP(new Result(_splitResult)), true);
- token.ack();
}
- virtual void
- handleJoin(FeedToken token,
- const storage::spi::Bucket &source1,
- const storage::spi::Bucket &source2,
+ void handleJoin(FeedToken token, const storage::spi::Bucket &source1, const storage::spi::Bucket &source2,
const storage::spi::Bucket &target) override
{
(void) source1;
(void) source2;
(void) target;
token.setResult(ResultUP(new Result(_joinResult)), true);
- token.ack();
}
- virtual RetrieversSP getDocumentRetrievers(storage::spi::ReadConsistency) override {
+ RetrieversSP getDocumentRetrievers(storage::spi::ReadConsistency) override {
RetrieversSP ret(new std::vector<IDocumentRetriever::SP>);
- ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(
- 0, Timestamp(), lastDocId)));
- ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(
- document, existingTimestamp, lastDocId)));
+ ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(0, Timestamp(), lastDocId)));
+ ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(document, existingTimestamp, lastDocId)));
return ret;
}
- virtual BucketGuard::UP lockBucket(const storage::spi::Bucket &b) override {
+ BucketGuard::UP lockBucket(const storage::spi::Bucket &b) override {
return BucketGuard::UP(new BucketGuard(b.getBucketId(), *this));
}
- virtual void
- handleListActiveBuckets(IBucketIdListResultHandler &resultHandler) override
- {
+ void handleListActiveBuckets(IBucketIdListResultHandler &resultHandler) override {
BucketIdListResult::List list;
resultHandler.handle(BucketIdListResult(list));
}
- virtual void
- handlePopulateActiveBuckets(document::BucketId::List &buckets,
- IGenericResultHandler &resultHandler) override
- {
+ void handlePopulateActiveBuckets(document::BucketId::List &buckets, IGenericResultHandler &resultHandler) override {
(void) buckets;
resultHandler.handle(Result());
}
- virtual void freezeBucket(BucketId bucket) override {
+ void freezeBucket(BucketId bucket) override {
frozen.insert(bucket.getId());
was_frozen.insert(bucket.getId());
}
- virtual void thawBucket(BucketId bucket) override {
+ void thawBucket(BucketId bucket) override {
std::multiset<uint64_t>::iterator it = frozen.find(bucket.getId());
ASSERT_TRUE(it != frozen.end());
frozen.erase(it);
@@ -425,11 +397,7 @@ HandlerSet::prepareGetModifiedBuckets()
class SimplePersistenceEngineOwner : public IPersistenceEngineOwner
{
- virtual void
- setClusterState(const storage::spi::ClusterState &calc) override
- {
- (void) calc;
- }
+ void setClusterState(const storage::spi::ClusterState &calc) override { (void) calc; }
};
struct SimpleResourceWriteFilter : public IResourceWriteFilter
diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp
index 15c2fbe5a84..a6f3496e1ed 100644
--- a/searchcore/src/tests/proton/server/feedstates_test.cpp
+++ b/searchcore/src/tests/proton/server/feedstates_test.cpp
@@ -40,10 +40,8 @@ struct MyFeedView : public test::DummyFeedView {
MyFeedView();
~MyFeedView();
- virtual const DocumentTypeRepo::SP &getDocumentTypeRepo() const override
- { return repo_sp; }
- virtual void handleRemove(FeedToken *, const RemoveOperation &) override
- { ++remove_handled; }
+ const DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return repo_sp; }
+ void handleRemove(FeedToken , const RemoveOperation &) override { ++remove_handled; }
};
MyFeedView::MyFeedView() : repo_sp(repo.getTypeRepoSp()), remove_handled(0) {}
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
index 008fafa332b..6fcd6bae4ec 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
@@ -4,48 +4,40 @@
namespace proton {
+FeedToken::FeedToken() = default;
+
FeedToken::FeedToken(ITransport &transport) :
- _state(new State(transport, 1))
+ _state(new State(transport))
{
}
-FeedToken::State::State(ITransport & transport, uint32_t numAcksRequired) :
+FeedToken::State::State(ITransport & transport) :
_transport(transport),
_result(new storage::spi::Result()),
_documentWasFound(false),
- _unAckedCount(numAcksRequired)
+ _alreadySent(false)
{
- assert(_unAckedCount > 0);
}
FeedToken::State::~State()
{
- assert(_unAckedCount == 0);
+ ack();
}
void
FeedToken::State::ack()
{
- uint32_t prev(_unAckedCount--);
- if (prev == 1) {
+ bool alreadySent = _alreadySent.exchange(true);
+ if ( !alreadySent ) {
_transport.send(std::move(_result), _documentWasFound);
}
- assert(prev >= 1);
-}
-
-void
-FeedToken::State::incNeededAcks()
-{
- uint32_t prev(_unAckedCount++);
- assert(prev >= 1);
- (void) prev;
}
void
FeedToken::State::fail()
{
- uint32_t prev = _unAckedCount.exchange(0);
- if (prev > 0) {
+ bool alreadySent = _alreadySent.exchange(true);
+ if ( !alreadySent ) {
_transport.send(std::move(_result), _documentWasFound);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
index 856c8a22652..d43a80b25ed 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
@@ -23,15 +23,12 @@ public:
virtual void send(ResultUP result, bool documentWasFound) = 0;
};
-private:
class State {
public:
State(const State &) = delete;
State & operator = (const State &) = delete;
- State(ITransport & transport, uint32_t numAcksRequired);
+ State(ITransport & transport);
~State();
- void incNeededAcks();
- void ack();
void fail();
void setResult(ResultUP result, bool documentWasFound) {
_documentWasFound = documentWasFound;
@@ -39,10 +36,11 @@ private:
}
const storage::spi::Result &getResult() { return *_result; }
private:
+ void ack();
ITransport &_transport;
ResultUP _result;
bool _documentWasFound;
- std::atomic<uint32_t> _unAckedCount;
+ std::atomic<bool> _alreadySent;
};
std::shared_ptr<State> _state;
@@ -59,6 +57,7 @@ public:
* @param transport The transport to pass the reply to.
*/
FeedToken(ITransport &transport);
+ FeedToken();
FeedToken(FeedToken &&) = default;
FeedToken & operator =(FeedToken &&) = default;
@@ -66,16 +65,10 @@ public:
FeedToken & operator =(const FeedToken &) = default;
~FeedToken() = default;
- /**
- * Passes a receipt back to the originating FeedEngine, declaring that this
- * operation succeeded. If an error occured while processing the operation,
- * use fail() instead. Invoking this and/or fail() more than once is void.
- */
- void ack() const { _state->ack(); }
-
- void incNeededAcks() const {
- _state->incNeededAcks();
- }
+ explicit operator bool() const { return static_cast<bool>(_state); }
+ State * operator ->() { return _state.get(); }
+ const State * operator -> () const { return _state.get(); }
+ void reset() { _state.reset(); }
/**
* Passes a receipt back to the originating FeedEngine, declaring that this
diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp
index 30ee6e1ba75..106ce846d99 100644
--- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp
@@ -117,19 +117,16 @@ CombiningFeedView::preparePut(PutOperation &putOp)
}
void
-CombiningFeedView::handlePut(FeedToken *token,
- const PutOperation &putOp)
+CombiningFeedView::handlePut(FeedToken token, const PutOperation &putOp)
{
assert(putOp.getValidDbdId());
uint32_t subDbId = putOp.getSubDbId();
uint32_t prevSubDbId = putOp.getPrevSubDbId();
if (putOp.getValidPrevDbdId() && prevSubDbId != subDbId) {
- if (token != NULL)
- token->incNeededAcks();
_views[subDbId]->handlePut(token, putOp);
- _views[prevSubDbId]->handlePut(token, putOp);
+ _views[prevSubDbId]->handlePut(std::move(token), putOp);
} else {
- _views[subDbId]->handlePut(token, putOp);
+ _views[subDbId]->handlePut(std::move(token), putOp);
}
}
@@ -143,14 +140,13 @@ CombiningFeedView::prepareUpdate(UpdateOperation &updOp)
}
void
-CombiningFeedView::handleUpdate(FeedToken *token,
- const UpdateOperation &updOp)
+CombiningFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp)
{
assert(updOp.getValidDbdId());
assert(updOp.getValidPrevDbdId());
assert(!updOp.changedDbdId());
uint32_t subDbId(updOp.getSubDbId());
- _views[subDbId]->handleUpdate(token, updOp);
+ _views[subDbId]->handleUpdate(std::move(token), updOp);
}
void
@@ -165,19 +161,16 @@ CombiningFeedView::prepareRemove(RemoveOperation &rmOp)
}
void
-CombiningFeedView::handleRemove(FeedToken *token,
- const RemoveOperation &rmOp)
+CombiningFeedView::handleRemove(FeedToken token, const RemoveOperation &rmOp)
{
if (rmOp.getValidDbdId()) {
uint32_t subDbId = rmOp.getSubDbId();
uint32_t prevSubDbId = rmOp.getPrevSubDbId();
if (rmOp.getValidPrevDbdId() && prevSubDbId != subDbId) {
- if (token != NULL)
- token->incNeededAcks();
_views[subDbId]->handleRemove(token, rmOp);
- _views[prevSubDbId]->handleRemove(token, rmOp);
+ _views[prevSubDbId]->handleRemove(std::move(token), rmOp);
} else {
- _views[subDbId]->handleRemove(token, rmOp);
+ _views[subDbId]->handleRemove(std::move(token), rmOp);
}
} else {
assert(rmOp.getValidPrevDbdId());
diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h
index ea4ac64176a..284004dfa81 100644
--- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h
@@ -16,8 +16,7 @@
#include "replaypacketdispatcher.h"
#include "ibucketstatecalculator.h"
-namespace proton
-{
+namespace proton {
class CombiningFeedView : public IFeedView
@@ -63,7 +62,7 @@ public:
document::BucketSpace bucketSpace,
const IBucketStateCalculator::SP &calc);
- virtual ~CombiningFeedView();
+ ~CombiningFeedView() override;
const document::DocumentTypeRepo::SP & getDocumentTypeRepo() const override;
@@ -72,11 +71,11 @@ public:
*/
void preparePut(PutOperation &putOp) override;
- void handlePut(FeedToken *token, const PutOperation &putOp) override;
+ void handlePut(FeedToken token, const PutOperation &putOp) override;
void prepareUpdate(UpdateOperation &updOp) override;
- void handleUpdate(FeedToken *token, const UpdateOperation &updOp) override;
+ void handleUpdate(FeedToken token, const UpdateOperation &updOp) override;
void prepareRemove(RemoveOperation &rmOp) override;
- void handleRemove(FeedToken *token, const RemoveOperation &rmOp) override;
+ void handleRemove(FeedToken token, const RemoveOperation &rmOp) override;
void prepareDeleteBucket(DeleteBucketOperation &delOp) override;
void handleDeleteBucket(const DeleteBucketOperation &delOp) override;
void prepareMove(MoveOperation &putOp) override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 9300af5c3f0..ea81b64ff22 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -90,31 +90,30 @@ FeedHandler::doHandleOperation(FeedToken token, FeedOperation::UP op)
{
assert(_writeService.master().isCurrentThread());
LockGuard guard(_feedLock);
- _feedState->handleOperation(token, std::move(op));
+ _feedState->handleOperation(std::move(token), std::move(op));
}
-void FeedHandler::performPut(FeedToken::UP token, PutOperation &op) {
+void FeedHandler::performPut(FeedToken token, PutOperation &op) {
op.assertValid();
_activeFeedView->preparePut(op);
if (ignoreOperation(op)) {
LOG(debug, "performPut(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")",
op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp());
if (token) {
- token->setResult(ResultUP(new Result), false);
- token->ack();
+ token->setResult(std::make_unique<Result>(), false);
}
return;
}
storeOperation(op);
if (token) {
- token->setResult(ResultUP(new Result), false);
+ token->setResult(std::make_unique<Result>(), false);
}
- _activeFeedView->handlePut(token.get(), op);
+ _activeFeedView->handlePut(std::move(token), op);
}
void
-FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op)
+FeedHandler::performUpdate(FeedToken token, UpdateOperation &op)
{
_activeFeedView->prepareUpdate(op);
if (op.getPrevDbDocumentId().valid() && !op.getPrevMarkedAsRemoved()) {
@@ -123,26 +122,25 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op)
createNonExistingDocument(std::move(token), op);
} else {
if (token) {
- token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false);
- token->ack();
+ token->setResult(std::make_unique<UpdateResult>(Timestamp(0)), false);
}
}
}
void
-FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op)
+FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op)
{
storeOperation(op);
if (token) {
token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true);
}
- _activeFeedView->handleUpdate(token.get(), op);
+ _activeFeedView->handleUpdate(std::move(token), op);
}
void
-FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperation &op)
+FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &op)
{
Document::SP doc(new Document(op.getUpdate()->getType(), op.getUpdate()->getId()));
doc->setRepo(*_activeFeedView->getDocumentTypeRepo());
@@ -155,22 +153,18 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio
}
TransportLatch latch(1);
FeedToken putToken(latch);
- _activeFeedView->handlePut(&putToken, putOp);
+ _activeFeedView->handlePut(std::move(putToken), putOp);
latch.await();
- if (token) {
- token->ack();
- }
}
-void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
+void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) {
_activeFeedView->prepareRemove(op);
if (ignoreOperation(op)) {
LOG(debug, "performRemove(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")",
op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp());
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
- token->ack();
}
return;
}
@@ -182,70 +176,60 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
bool documentWasFound = !op.getPrevMarkedAsRemoved();
token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound);
}
- _activeFeedView->handleRemove(token.get(), op);
+ _activeFeedView->handleRemove(std::move(token), op);
} else if (op.hasDocType()) {
assert(op.getDocType() == _docTypeName.getName());
storeOperation(op);
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
}
- _activeFeedView->handleRemove(token.get(), op);
+ _activeFeedView->handleRemove(std::move(token), op);
} else {
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
- token->ack();
}
}
}
void
-FeedHandler::performGarbageCollect(FeedToken::UP token)
+FeedHandler::performGarbageCollect(FeedToken token)
{
- if (token) {
- token->ack();
- }
+ (void) token;
}
void
-FeedHandler::performCreateBucket(FeedToken::UP token, CreateBucketOperation &op)
+FeedHandler::performCreateBucket(FeedToken token, CreateBucketOperation &op)
{
+ (void) token;
storeOperation(op);
_bucketDBHandler->handleCreateBucket(op.getBucketId());
- if (token) {
- token->ack();
- }
}
-void FeedHandler::performDeleteBucket(FeedToken::UP token, DeleteBucketOperation &op) {
+void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op) {
+ (void) token;
_activeFeedView->prepareDeleteBucket(op);
storeOperation(op);
// Delete documents in bucket
_activeFeedView->handleDeleteBucket(op);
// Delete bucket itself, should no longer have documents.
_bucketDBHandler->handleDeleteBucket(op.getBucketId());
- if (token) {
- token->ack();
- }
+
}
-void FeedHandler::performSplit(FeedToken::UP token, SplitBucketOperation &op) {
+void FeedHandler::performSplit(FeedToken token, SplitBucketOperation &op) {
+ (void) token;
storeOperation(op);
_bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2());
- if (token) {
- token->ack();
- }
}
-void FeedHandler::performJoin(FeedToken::UP token, JoinBucketsOperation &op) {
+void FeedHandler::performJoin(FeedToken token, JoinBucketsOperation &op) {
+ (void) token;
storeOperation(op);
_bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget());
- if (token) {
- token->ack();
- }
}
@@ -328,7 +312,7 @@ void
FeedHandler::changeFeedState(FeedState::SP newState)
{
LockGuard guard(_feedLock);
- changeFeedState(newState, guard);
+ changeFeedState(std::move(newState), guard);
}
@@ -466,8 +450,8 @@ isRejectableFeedOperation(FeedOperation::Type type)
}
template <typename ResultType>
-void feedOperationRejected(FeedToken *token, const vespalib::string &opType, const vespalib::string &docId,
- DocTypeName docTypeName, const vespalib::string &rejectMessage)
+void feedOperationRejected(FeedToken & token, const vespalib::string &opType, const vespalib::string &docId,
+ const DocTypeName & docTypeName, const vespalib::string &rejectMessage)
{
if (token) {
auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'",
@@ -478,8 +462,8 @@ void feedOperationRejected(FeedToken *token, const vespalib::string &opType, con
}
void
-notifyFeedOperationRejected(FeedToken *token, const FeedOperation &op,
- DocTypeName docTypeName, const vespalib::string &rejectMessage)
+notifyFeedOperationRejected(FeedToken & token, const FeedOperation &op,
+ const DocTypeName & docTypeName, const vespalib::string &rejectMessage)
{
if ((op.getType() == FeedOperation::UPDATE_42) || (op.getType() == FeedOperation::UPDATE)) {
vespalib::string docId = (static_cast<const UpdateOperation &>(op)).getUpdate()->getId().toString();
@@ -495,7 +479,7 @@ notifyFeedOperationRejected(FeedToken *token, const FeedOperation &op,
}
bool
-FeedHandler::considerWriteOperationForRejection(FeedToken *token, const FeedOperation &op)
+FeedHandler::considerWriteOperationForRejection(FeedToken & token, const FeedOperation &op)
{
if (!_writeFilter.acceptWriteOperation() && isRejectableFeedOperation(op.getType())) {
IResourceWriteFilter::State state = _writeFilter.getAcceptState();
@@ -508,9 +492,9 @@ FeedHandler::considerWriteOperationForRejection(FeedToken *token, const FeedOper
}
void
-FeedHandler::performOperation(FeedToken::UP token, FeedOperation::UP op)
+FeedHandler::performOperation(FeedToken token, FeedOperation::UP op)
{
- if (considerWriteOperationForRejection(token.get(), *op)) {
+ if (considerWriteOperationForRejection(token, *op)) {
return;
}
switch(op->getType()) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index dc955cfeb79..4aec9196491 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -11,6 +11,7 @@
#include "transactionlogmanager.h"
#include <persistence/spi/types.h>
#include <vespa/searchcore/proton/common/doctypename.h>
+#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchlib/transactionlog/translogclient.h>
namespace searchcorespi { namespace index { class IThreadingService; } }
@@ -101,24 +102,24 @@ private:
*/
void doHandleOperation(FeedToken token, FeedOperationUP op);
- bool considerWriteOperationForRejection(FeedToken *token, const FeedOperation &op);
+ bool considerWriteOperationForRejection(FeedToken & token, const FeedOperation &op);
/**
* Delayed execution of feed operations against feed view, in
* master write thread.
*/
- void performPut(FeedTokenUP token, PutOperation &op);
-
- void performUpdate(FeedTokenUP token, UpdateOperation &op);
- void performInternalUpdate(FeedTokenUP token, UpdateOperation &op);
- void createNonExistingDocument(FeedTokenUP, const UpdateOperation &op);
-
- void performRemove(FeedTokenUP token, RemoveOperation &op);
- void performGarbageCollect(FeedTokenUP token);
- void performCreateBucket(FeedTokenUP token, CreateBucketOperation &op);
- void performDeleteBucket(FeedTokenUP token, DeleteBucketOperation &op);
- void performSplit(FeedTokenUP token, SplitBucketOperation &op);
- void performJoin(FeedTokenUP token, JoinBucketsOperation &op);
+ void performPut(FeedToken token, PutOperation &op);
+
+ void performUpdate(FeedToken token, UpdateOperation &op);
+ void performInternalUpdate(FeedToken token, UpdateOperation &op);
+ void createNonExistingDocument(FeedToken, const UpdateOperation &op);
+
+ void performRemove(FeedToken token, RemoveOperation &op);
+ void performGarbageCollect(FeedToken token);
+ void performCreateBucket(FeedToken token, CreateBucketOperation &op);
+ void performDeleteBucket(FeedToken token, DeleteBucketOperation &op);
+ void performSplit(FeedToken token, SplitBucketOperation &op);
+ void performJoin(FeedToken token, JoinBucketsOperation &op);
void performSync();
void performEof();
@@ -223,7 +224,7 @@ public:
vespalib::string getDocTypeName() const { return _docTypeName.getName(); }
void tlsPrune(SerialNum oldest_to_keep);
- void performOperation(FeedTokenUP token, FeedOperationUP op);
+ void performOperation(FeedToken token, FeedOperationUP op);
void handleOperation(FeedToken token, FeedOperationUP op);
void handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCallback> moveDoneCtx) override;
@@ -240,4 +241,3 @@ public:
};
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index 6219cadd24f..91b310e3925 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -82,13 +82,13 @@ public:
}
virtual void replay(const PutOperation &op) override {
- _feed_view_ptr->handlePut(NULL, op);
+ _feed_view_ptr->handlePut(FeedToken(), op);
}
virtual void replay(const RemoveOperation &op) override {
- _feed_view_ptr->handleRemove(NULL, op);
+ _feed_view_ptr->handleRemove(FeedToken(), op);
}
virtual void replay(const UpdateOperation &op) override {
- _feed_view_ptr->handleUpdate(NULL, op);
+ _feed_view_ptr->handleUpdate(FeedToken(), op);
}
virtual void replay(const NoopOperation &) override {} // ignored
virtual void replay(const NewConfigOperation &op) override {
@@ -100,16 +100,12 @@ public:
_feed_view_ptr->handleDeleteBucket(op);
}
virtual void replay(const SplitBucketOperation &op) override {
- _bucketDBHandler.handleSplit(op.getSerialNum(),
- op.getSource(),
- op.getTarget1(),
- op.getTarget2());
+ _bucketDBHandler.handleSplit(op.getSerialNum(), op.getSource(),
+ op.getTarget1(), op.getTarget2());
}
virtual void replay(const JoinBucketsOperation &op) override {
- _bucketDBHandler.handleJoin(op.getSerialNum(),
- op.getSource1(),
- op.getSource2(),
- op.getTarget());
+ _bucketDBHandler.handleJoin(op.getSerialNum(), op.getSource1(),
+ op.getSource2(), op.getTarget());
}
virtual void replay(const PruneRemovedDocumentsOperation &op) override {
_feed_view_ptr->handlePruneRemovedDocuments(op);
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
index 61e80d1c7cc..963a78d0d6b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
@@ -79,13 +79,11 @@ public:
_handler(handler) {
}
- virtual void handleOperation(FeedToken token, FeedOperation::UP op) override {
- _handler.performOperation(FeedToken::UP(new FeedToken(token)), std::move(op));
+ void handleOperation(FeedToken token, FeedOperation::UP op) override {
+ _handler.performOperation(std::move(token), std::move(op));
}
- virtual void
- receive(const PacketWrapper::SP &wrap, vespalib::Executor &) override
- {
+ void receive(const PacketWrapper::SP &wrap, vespalib::Executor &) override {
throwExceptionInReceive(_handler.getDocTypeName().c_str(),
wrap->packet.range().from(),
wrap->packet.range().to(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
index c8222902c5d..3e1635d0f33 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
@@ -2,19 +2,17 @@
#pragma once
+#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchlib/common/serialnum.h>
-#include <memory>
namespace document { class DocumentTypeRepo; }
namespace search { class IDestructorCallback; }
-namespace proton
-{
+namespace proton {
class CompactLidSpaceOperation;
class DeleteBucketOperation;
-class FeedToken;
class ISimpleDocumentMetaStore;
class MoveOperation;
class PruneRemovedDocumentsOperation;
@@ -49,11 +47,11 @@ public:
*/
virtual void preparePut(PutOperation &putOp) = 0;
- virtual void handlePut(FeedToken *token, const PutOperation &putOp) = 0;
+ virtual void handlePut(FeedToken token, const PutOperation &putOp) = 0;
virtual void prepareUpdate(UpdateOperation &updOp) = 0;
- virtual void handleUpdate(FeedToken *token, const UpdateOperation &updOp) = 0;
+ virtual void handleUpdate(FeedToken token, const UpdateOperation &updOp) = 0;
virtual void prepareRemove(RemoveOperation &rmOp) = 0;
- virtual void handleRemove(FeedToken *token, const RemoveOperation &rmOp) = 0;
+ virtual void handleRemove(FeedToken token, const RemoveOperation &rmOp) = 0;
virtual void prepareDeleteBucket(DeleteBucketOperation &delOp) = 0;
virtual void handleDeleteBucket(const DeleteBucketOperation &delOp) = 0;
virtual void prepareMove(MoveOperation &putOp) = 0;
diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
index 9c0115f0084..31e4c87b352 100644
--- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
@@ -5,7 +5,7 @@
namespace proton {
-OperationDoneContext::OperationDoneContext(std::unique_ptr<FeedToken> token)
+OperationDoneContext::OperationDoneContext(FeedToken token)
: _token(std::move(token))
{
}
@@ -18,10 +18,7 @@ OperationDoneContext::~OperationDoneContext()
void
OperationDoneContext::ack()
{
- if (_token) {
- std::unique_ptr<FeedToken> token(std::move(_token));
- token->ack();
- }
+ _token.reset();
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
index b801987844b..4c310abf871 100644
--- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
@@ -3,12 +3,10 @@
#pragma once
#include <vespa/searchlib/common/idestructorcallback.h>
-#include <vespa/searchcore/proton/feedoperation/feedoperation.h>
+#include <vespa/searchcore/proton/common/feedtoken.h>
namespace proton {
-class FeedToken;
-
/**
* Context class for document operations that acks operation when
* instance is destroyed. Typically a shared pointer to an instance is
@@ -18,15 +16,15 @@ class FeedToken;
*/
class OperationDoneContext : public search::IDestructorCallback
{
- std::unique_ptr<FeedToken> _token;
+ FeedToken _token;
protected:
void ack();
public:
- OperationDoneContext(std::unique_ptr<FeedToken> token);
+ OperationDoneContext(FeedToken token);
~OperationDoneContext() override;
- FeedToken *getToken() { return _token.get(); }
+ bool hasToken() const { return static_cast<bool>(_token); }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
index 649eebb26f5..fd7871773c8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
@@ -1,19 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "putdonecontext.h"
-#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchcore/proton/common/docid_limit.h>
#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h>
namespace proton {
-PutDoneContext::PutDoneContext(std::unique_ptr<FeedToken> token,
-
- IGidToLidChangeHandler &gidToLidChangeHandler,
- const document::GlobalId &gid,
- uint32_t lid,
- search::SerialNum serialNum,
- bool enableNotifyPut)
+PutDoneContext::PutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid, uint32_t lid,
+ search::SerialNum serialNum, bool enableNotifyPut)
: OperationDoneContext(std::move(token)),
_lid(lid),
_docIdLimit(nullptr),
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
index 3e98b02dda6..587c8f9054d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
@@ -28,7 +28,7 @@ class PutDoneContext : public OperationDoneContext
bool _enableNotifyPut;
public:
- PutDoneContext(std::unique_ptr<FeedToken> token, IGidToLidChangeHandler &gidToLidChangeHandler,
+ PutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler,
const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut);
~PutDoneContext() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
index bd9a8240d73..194e190bb7b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
@@ -2,16 +2,13 @@
#include "removedonecontext.h"
#include "removedonetask.h"
-#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h>
namespace proton {
-RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token,
- vespalib::Executor &executor,
+RemoveDoneContext::RemoveDoneContext(FeedToken token, vespalib::Executor &executor,
IDocumentMetaStore &documentMetaStore,
- PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
- uint32_t lid)
+ PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid)
: OperationDoneContext(std::move(token)),
_executor(executor),
_task(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
index 83f6013dd85..912d31dde22 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
@@ -28,10 +28,8 @@ class RemoveDoneContext : public OperationDoneContext
PendingNotifyRemoveDone _pendingNotifyRemoveDone;
public:
- RemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
- uint32_t lid);
-
+ RemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore,
+ PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid);
~RemoveDoneContext() override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 8c8559115bd..33e8708799f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -36,26 +36,14 @@ namespace proton {
namespace {
-FeedToken::UP dupFeedToken(FeedToken *token)
-{
- // If token is not nullptr then a new feed token is created, referencing
- // same shared state as old token.
- if (token != nullptr) {
- return std::make_unique<FeedToken>(*token);
- } else {
- return FeedToken::UP();
- }
-}
-
class PutDoneContextForMove : public PutDoneContext {
private:
IDestructorCallback::SP _moveDoneCtx;
public:
- PutDoneContextForMove(std::unique_ptr<FeedToken> token,
- IGidToLidChangeHandler &gidToLidChangeHandler,
- const document::GlobalId &gid,
- uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx)
+ PutDoneContextForMove(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum,
+ bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx)
: PutDoneContext(std::move(token), gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut),
_moveDoneCtx(std::move(moveDoneCtx))
{}
@@ -63,7 +51,7 @@ public:
};
std::shared_ptr<PutDoneContext>
-createPutDoneContext(FeedToken::UP &token,
+createPutDoneContext(FeedToken token,
IGidToLidChangeHandler &gidToLidChangeHandler,
const document::GlobalId &gid, uint32_t lid,
SerialNum serialNum, bool enableNotifyPut,
@@ -79,14 +67,14 @@ createPutDoneContext(FeedToken::UP &token,
}
std::shared_ptr<PutDoneContext>
-createPutDoneContext(FeedToken::UP &token, IGidToLidChangeHandler &gidToLidChangeHandler,
+createPutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler,
const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut)
{
return createPutDoneContext(token, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, IDestructorCallback::SP());
}
std::shared_ptr<UpdateDoneContext>
-createUpdateDoneContext(FeedToken::UP &token, const DocumentUpdate::SP &upd)
+createUpdateDoneContext(FeedToken token, const DocumentUpdate::SP &upd)
{
return std::make_shared<UpdateDoneContext>(std::move(token), upd);
}
@@ -106,11 +94,9 @@ private:
IDestructorCallback::SP _moveDoneCtx;
public:
- RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore,
+ RemoveDoneContextForMove(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore,
PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
- uint32_t lid,
- IDestructorCallback::SP moveDoneCtx)
+ uint32_t lid, IDestructorCallback::SP moveDoneCtx)
: RemoveDoneContext(std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid),
_moveDoneCtx(std::move(moveDoneCtx))
{}
@@ -118,13 +104,14 @@ public:
};
std::shared_ptr<RemoveDoneContext>
-createRemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
+createRemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore,
+ PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
uint32_t lid, IDestructorCallback::SP moveDoneCtx)
{
if (moveDoneCtx) {
return std::make_shared<RemoveDoneContextForMove>
- (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid, std::move(moveDoneCtx));
+ (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone),
+ lid, std::move(moveDoneCtx));
} else {
return std::make_shared<RemoveDoneContext>
(std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid);
@@ -132,7 +119,8 @@ createRemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &ex
}
std::vector<document::GlobalId> getGidsToRemove(const IDocumentMetaStore &metaStore,
- const LidVectorContext::LidVector &lidsToRemove) {
+ const LidVectorContext::LidVector &lidsToRemove)
+{
std::vector<document::GlobalId> gids;
gids.reserve(lidsToRemove.size());
for (const auto &lid : lidsToRemove) {
@@ -145,7 +133,8 @@ std::vector<document::GlobalId> getGidsToRemove(const IDocumentMetaStore &metaSt
}
void putMetaData(documentmetastore::IStore &meta_store, const DocumentId &doc_id,
- const DocumentOperation &op, bool is_removed_doc) {
+ const DocumentOperation &op, bool is_removed_doc)
+{
documentmetastore::IStore::Result putRes(
meta_store.put(doc_id.getGlobalId(),
op.getBucketId(), op.getTimestamp(), op.getSerializedDocSize(), op.getLid()));
@@ -234,10 +223,9 @@ StoreOnlyFeedView::forceCommit(SerialNum serialNum, OnForceCommitDoneType onComm
}
void
-StoreOnlyFeedView::considerEarlyAck(FeedToken::UP &token)
+StoreOnlyFeedView::considerEarlyAck(FeedToken & token)
{
if (_commitTimeTracker.hasVisibilityDelay() && token) {
- token->ack();
token.reset();
}
}
@@ -260,13 +248,13 @@ StoreOnlyFeedView::preparePut(PutOperation &putOp)
}
void
-StoreOnlyFeedView::handlePut(FeedToken *token, const PutOperation &putOp)
+StoreOnlyFeedView::handlePut(FeedToken token, const PutOperation &putOp)
{
- internalPut(dupFeedToken(token), putOp);
+ internalPut(std::move(token), putOp);
}
void
-StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
+StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
{
assert(putOp.getValidDbdId());
assert(putOp.notMovingLidInSameSubDb());
@@ -291,7 +279,7 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
bool immediateCommit = _commitTimeTracker.needCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
- createPutDoneContext(token, _gidToLidChangeHandler, gid, putOp.getLid(), serialNum,
+ createPutDoneContext(std::move(token), _gidToLidChangeHandler, gid, putOp.getLid(), serialNum,
putOp.changedDbdId() && useDocumentMetaStore(serialNum));
putSummary(serialNum, putOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone);
@@ -302,9 +290,6 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone),
putOp.getPrevLid(), IDestructorCallback::SP());
}
- if (token) {
- token->ack();
- }
}
void
@@ -347,9 +332,9 @@ StoreOnlyFeedView::prepareUpdate(UpdateOperation &updOp)
}
void
-StoreOnlyFeedView::handleUpdate(FeedToken *token, const UpdateOperation &updOp)
+StoreOnlyFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp)
{
- internalUpdate(dupFeedToken(token), updOp);
+ internalUpdate(std::move(token), updOp);
}
void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid,
@@ -400,7 +385,7 @@ void StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) {
}
void
-StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &updOp) {
+StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) {
if ( ! updOp.getUpdate()) {
LOG(warning, "database(%s): ignoring invalid update operation",
_params._docTypeName.toString().c_str());
@@ -430,7 +415,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up
considerEarlyAck(token);
bool immediateCommit = _commitTimeTracker.needCommit();
- auto onWriteDone = createUpdateDoneContext(token, updOp.getUpdate());
+ auto onWriteDone = createUpdateDoneContext(std::move(token), updOp.getUpdate());
updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone);
UpdateScope updateScope(getUpdateScope(upd));
@@ -470,19 +455,18 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpd
const DocumentUpdate & upd = *update;
Document::UP newDoc;
vespalib::nbostream newStream(12345);
- assert(onWriteDone->getToken() == nullptr || useDocumentStore(serialNum));
+ assert(!onWriteDone->hasToken() || useDocumentStore(serialNum));
if (useDocumentStore(serialNum)) {
assert(prevDoc);
}
if (!prevDoc) {
// Replaying, document removed later before summary was flushed.
- assert(onWriteDone->getToken() == nullptr);
+ assert(!onWriteDone->hasToken());
// If we've passed serial number for flushed index then we could
// also check that this operation is marked for ignore by index
// proxy.
} else {
if (upd.getId() == prevDoc->getId()) {
-
newDoc = std::move(prevDoc);
if (useDocumentStore(serialNum)) {
upd.applyTo(*newDoc);
@@ -491,7 +475,7 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpd
} else {
// Replaying, document removed and lid reused before summary
// was flushed.
- assert(onWriteDone->getToken() == nullptr && !useDocumentStore(serialNum));
+ assert(!onWriteDone->hasToken() && !useDocumentStore(serialNum));
}
}
promisedDoc.set_value(std::move(newDoc));
@@ -531,12 +515,12 @@ StoreOnlyFeedView::prepareRemove(RemoveOperation &rmOp)
}
void
-StoreOnlyFeedView::handleRemove(FeedToken *token, const RemoveOperation &rmOp) {
- internalRemove(dupFeedToken(token), rmOp);
+StoreOnlyFeedView::handleRemove(FeedToken token, const RemoveOperation &rmOp) {
+ internalRemove(std::move(token), rmOp);
}
void
-StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rmOp)
+StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperation &rmOp)
{
assert(rmOp.getValidNewOrPrevDbdId());
assert(rmOp.notMovingLidInSameSubDb());
@@ -564,13 +548,10 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm
rmOp.getPrevLid(), IDestructorCallback::SP());
}
}
- if (token) {
- token->ack();
- }
}
void
-StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum,
+StoreOnlyFeedView::internalRemove(FeedToken token, SerialNum serialNum,
PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid,
IDestructorCallback::SP moveDoneCtx)
{
@@ -727,16 +708,15 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
if (moveOp.getValidDbdId(_params._subDbId)) {
bool immediateCommit = _commitTimeTracker.needCommit();
const document::GlobalId &gid = docId.getGlobalId();
- FeedToken::UP token;
std::shared_ptr<PutDoneContext> onWriteDone =
- createPutDoneContext(token, _gidToLidChangeHandler, gid, moveOp.getLid(), serialNum,
+ createPutDoneContext(FeedToken(), _gidToLidChangeHandler, gid, moveOp.getLid(), serialNum,
moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx);
putSummary(serialNum, moveOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone);
putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone);
}
if (docAlreadyExists && moveOp.changedDbdId()) {
- internalRemove(FeedToken::UP(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx);
+ internalRemove(FeedToken(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index baaf77bbe59..a8119224ba8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -168,22 +168,22 @@ private:
}
PendingNotifyRemoveDone adjustMetaStore(const DocumentOperation &op, const document::DocumentId &docId);
- void internalPut(FeedTokenUP token, const PutOperation &putOp);
- void internalUpdate(FeedTokenUP token, const UpdateOperation &updOp);
+ void internalPut(FeedToken token, const PutOperation &putOp);
+ void internalUpdate(FeedToken token, const UpdateOperation &updOp);
bool lookupDocId(const document::DocumentId &docId, Lid & lid) const;
- void internalRemove(FeedTokenUP token, const RemoveOperation &rmOp);
+ void internalRemove(FeedToken token, const RemoveOperation &rmOp);
// Removes documents from meta store and document store.
// returns the number of documents removed.
size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields,
bool immediateCommit);
- void internalRemove(FeedTokenUP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
+ void internalRemove(FeedToken token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
Lid lid, std::shared_ptr<search::IDestructorCallback> moveDoneCtx);
// Ack token early if visibility delay is nonzero
- void considerEarlyAck(FeedTokenUP &token);
+ void considerEarlyAck(FeedToken &token);
void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdateSP upd, OnOperationDoneType onWriteDone,
PromisedDoc promisedDoc, PromisedStream promisedStream);
@@ -220,7 +220,6 @@ protected:
public:
StoreOnlyFeedView(const Context &ctx, const PersistentParams &params);
-
~StoreOnlyFeedView() override;
const ISummaryAdapter::SP &getSummaryAdapter() const { return _summaryAdapter; }
@@ -233,31 +232,22 @@ public:
CommitTimeTracker &getCommitTimeTracker() { return _commitTimeTracker; }
IGidToLidChangeHandler &getGidToLidChangeHandler() const { return _gidToLidChangeHandler; }
- /**
- * Implements IFeedView.
- */
- virtual const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return _repo; }
- virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override;
-
- /**
- * Similar to IPersistenceHandler functions.
- * Takes pointer to feed token instead of instance because
- * when replaying the spooler we don't have a feed token.
- */
-
- virtual void preparePut(PutOperation &putOp) override;
- virtual void handlePut(FeedToken *token, const PutOperation &putOp) override;
- virtual void prepareUpdate(UpdateOperation &updOp) override;
- virtual void handleUpdate(FeedToken *token, const UpdateOperation &updOp) override;
- virtual void prepareRemove(RemoveOperation &rmOp) override;
- virtual void handleRemove(FeedToken *token, const RemoveOperation &rmOp) override;
- virtual void prepareDeleteBucket(DeleteBucketOperation &delOp) override;
- virtual void handleDeleteBucket(const DeleteBucketOperation &delOp) override;
- virtual void prepareMove(MoveOperation &putOp) override;
- virtual void handleMove(const MoveOperation &putOp, std::shared_ptr<search::IDestructorCallback> doneCtx) override;
- virtual void heartBeat(search::SerialNum serialNum) override;
- virtual void sync() override;
- virtual void forceCommit(SerialNum serialNum) override;
+ const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return _repo; }
+ const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override;
+
+ void preparePut(PutOperation &putOp) override;
+ void handlePut(FeedToken token, const PutOperation &putOp) override;
+ void prepareUpdate(UpdateOperation &updOp) override;
+ void handleUpdate(FeedToken token, const UpdateOperation &updOp) override;
+ void prepareRemove(RemoveOperation &rmOp) override;
+ void handleRemove(FeedToken token, const RemoveOperation &rmOp) override;
+ void prepareDeleteBucket(DeleteBucketOperation &delOp) override;
+ void handleDeleteBucket(const DeleteBucketOperation &delOp) override;
+ void prepareMove(MoveOperation &putOp) override;
+ void handleMove(const MoveOperation &putOp, std::shared_ptr<search::IDestructorCallback> doneCtx) override;
+ void heartBeat(search::SerialNum serialNum) override;
+ void sync() override;
+ void forceCommit(SerialNum serialNum) override;
virtual void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone);
/**
@@ -266,8 +256,8 @@ public:
*
* Called by writer thread.
*/
- virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override;
- virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override;
+ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override;
+ void handleCompactLidSpace(const CompactLidSpaceOperation &op) override;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
index 171990c32d3..6fc1a788531 100644
--- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
@@ -1,11 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "updatedonecontext.h"
-#include <vespa/searchcore/proton/common/feedtoken.h>
namespace proton {
-UpdateDoneContext::UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd)
+UpdateDoneContext::UpdateDoneContext(FeedToken token, const document::DocumentUpdate::SP &upd)
: OperationDoneContext(std::move(token)),
_upd(upd)
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
index 4701db300de..2b8a018af87 100644
--- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
@@ -18,7 +18,7 @@ class UpdateDoneContext : public OperationDoneContext
{
document::DocumentUpdate::SP _upd;
public:
- UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd);
+ UpdateDoneContext(FeedToken token, const document::DocumentUpdate::SP &upd);
~UpdateDoneContext() override;
const document::DocumentUpdate &getUpdate() { return *_upd; }
diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h
index cafcbb64410..163168a1a09 100644
--- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h
+++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h
@@ -4,9 +4,7 @@
#include <vespa/searchcore/proton/server/ifeedview.h>
#include <vespa/document/repo/documenttyperepo.h>
-namespace proton {
-
-namespace test {
+namespace proton::test {
struct DummyFeedView : public IFeedView
{
@@ -18,33 +16,27 @@ struct DummyFeedView : public IFeedView
DummyFeedView(const document::DocumentTypeRepo::SP &docTypeRepo)
: _docTypeRepo(docTypeRepo)
{}
- virtual const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override {
+ const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override {
return _docTypeRepo;
}
- virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override {
+ const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override {
return std::nullptr_t();
}
- virtual void preparePut(PutOperation &) override {}
- virtual void handlePut(FeedToken *,
- const PutOperation &) override {}
- virtual void prepareUpdate(UpdateOperation &) override {}
- virtual void handleUpdate(FeedToken *,
- const UpdateOperation &) override {}
- virtual void prepareRemove(RemoveOperation &) override {}
- virtual void handleRemove(FeedToken *,
- const RemoveOperation &) override {}
- virtual void prepareDeleteBucket(DeleteBucketOperation &) override {}
- virtual void handleDeleteBucket(const DeleteBucketOperation &) override {}
- virtual void prepareMove(MoveOperation &) override {}
- virtual void handleMove(const MoveOperation &, std::shared_ptr<search::IDestructorCallback>) override {}
- virtual void heartBeat(search::SerialNum) override {}
- virtual void sync() override {}
- virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {}
- virtual void handleCompactLidSpace(const CompactLidSpaceOperation &) override {}
+ void preparePut(PutOperation &) override {}
+ void handlePut(FeedToken, const PutOperation &) override {}
+ void prepareUpdate(UpdateOperation &) override {}
+ void handleUpdate(FeedToken, const UpdateOperation &) override {}
+ void prepareRemove(RemoveOperation &) override {}
+ void handleRemove(FeedToken, const RemoveOperation &) override {}
+ void prepareDeleteBucket(DeleteBucketOperation &) override {}
+ void handleDeleteBucket(const DeleteBucketOperation &) override {}
+ void prepareMove(MoveOperation &) override {}
+ void handleMove(const MoveOperation &, std::shared_ptr<search::IDestructorCallback>) override {}
+ void heartBeat(search::SerialNum) override {}
+ void sync() override {}
+ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {}
+ void handleCompactLidSpace(const CompactLidSpaceOperation &) override {}
void forceCommit(search::SerialNum) override { }
};
-} // namespace test
-
-} // namespace proton
-
+}