summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp42
-rw-r--r--searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp39
-rw-r--r--searchcore/src/tests/proton/documentdb/documentdb_test.cpp44
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp79
-rw-r--r--searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp81
-rw-r--r--searchcore/src/tests/proton/feedtoken/feedtoken.cpp18
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp126
-rw-r--r--searchcore/src/tests/proton/server/feedstates_test.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def7
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp37
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.h76
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp108
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h18
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp23
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h11
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp93
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h32
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ifeedview.h12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/iheartbeathandler.h14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp88
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h56
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h44
33 files changed, 413 insertions, 720 deletions
diff --git a/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp
index d4af7b214b6..a997b3cc3db 100644
--- a/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/combiningfeedview/combiningfeedview_test.cpp
@@ -72,22 +72,20 @@ struct MyFeedView : public test::DummyFeedView
_metaStore.constructFreeList();
}
- // Implements IFeedView
- virtual const DocumentMetaStore *getDocumentMetaStorePtr() const override { return &_metaStore; }
- virtual void preparePut(PutOperation &) override { ++_preparePut; }
- virtual void handlePut(FeedToken *, const PutOperation &) override { ++_handlePut; }
- virtual void prepareUpdate(UpdateOperation &) override { ++_prepareUpdate; }
- virtual void handleUpdate(FeedToken *, const UpdateOperation &) override { ++_handleUpdate; }
- virtual void prepareRemove(RemoveOperation &) override { ++_prepareRemove; }
- virtual void handleRemove(FeedToken *, const RemoveOperation &) override { ++_handleRemove; }
- virtual void prepareDeleteBucket(DeleteBucketOperation &) override { ++_prepareDeleteBucket; }
- virtual void handleDeleteBucket(const DeleteBucketOperation &) override
- { ++_handleDeleteBucket; }
- virtual void prepareMove(MoveOperation &) override { ++_prepareMove; }
- virtual void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++_handleMove; }
- virtual void heartBeat(SerialNum) override { ++_heartBeat; }
- virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++_handlePrune; }
- virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override {
+ const DocumentMetaStore *getDocumentMetaStorePtr() const override { return &_metaStore; }
+ void preparePut(PutOperation &) override { ++_preparePut; }
+ void handlePut(FeedToken, const PutOperation &) override { ++_handlePut; }
+ void prepareUpdate(UpdateOperation &) override { ++_prepareUpdate; }
+ void handleUpdate(FeedToken, const UpdateOperation &) override { ++_handleUpdate; }
+ void prepareRemove(RemoveOperation &) override { ++_prepareRemove; }
+ void handleRemove(FeedToken, const RemoveOperation &) override { ++_handleRemove; }
+ void prepareDeleteBucket(DeleteBucketOperation &) override { ++_prepareDeleteBucket; }
+ void handleDeleteBucket(const DeleteBucketOperation &) override { ++_handleDeleteBucket; }
+ void prepareMove(MoveOperation &) override { ++_prepareMove; }
+ void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++_handleMove; }
+ void heartBeat(SerialNum) override { ++_heartBeat; }
+ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++_handlePrune; }
+ void handleCompactLidSpace(const CompactLidSpaceOperation &op) override {
_wantedLidLimit = op.getLidLimit();
}
};
@@ -213,7 +211,7 @@ TEST_F("require that handlePut() sends to 1 feed view", Fixture)
{
PutOperation op = f.put(2);
op.setDbDocumentId(DbDocumentId(READY, 2));
- f._view.handlePut(NULL, op);
+ f._view.handlePut(FeedToken(), op);
EXPECT_EQUAL(1u, f._ready._view->_handlePut);
EXPECT_EQUAL(0u, f._removed._view->_handlePut);
EXPECT_EQUAL(0u, f._notReady._view->_handlePut);
@@ -225,7 +223,7 @@ TEST_F("require that handlePut() sends to 2 feed views", Fixture)
PutOperation op = f.put(2);
op.setDbDocumentId(DbDocumentId(NOT_READY, 2));
op.setPrevDbDocumentId(DbDocumentId(REMOVED, 2));
- f._view.handlePut(NULL, op);
+ f._view.handlePut(FeedToken(), op);
EXPECT_EQUAL(0u, f._ready._view->_handlePut);
EXPECT_EQUAL(1u, f._removed._view->_handlePut);
EXPECT_EQUAL(1u, f._notReady._view->_handlePut);
@@ -259,7 +257,7 @@ TEST_F("require that handleRemove() sends op with valid dbdId to 1 feed view", F
{
RemoveOperation op = f.remove(1);
op.setDbDocumentId(DbDocumentId(REMOVED, 1));
- f._view.handleRemove(NULL, op);
+ f._view.handleRemove(FeedToken(), op);
EXPECT_EQUAL(0u, f._ready._view->_handleRemove);
EXPECT_EQUAL(1u, f._removed._view->_handleRemove);
EXPECT_EQUAL(0u, f._notReady._view->_handleRemove);
@@ -271,7 +269,7 @@ TEST_F("require that handleRemove() sends op with valid dbdId to 2 feed views",
RemoveOperation op = f.remove(1);
op.setDbDocumentId(DbDocumentId(REMOVED, 1));
op.setPrevDbDocumentId(DbDocumentId(READY, 1));
- f._view.handleRemove(NULL, op);
+ f._view.handleRemove(FeedToken(), op);
EXPECT_EQUAL(1u, f._ready._view->_handleRemove);
EXPECT_EQUAL(1u, f._removed._view->_handleRemove);
EXPECT_EQUAL(0u, f._notReady._view->_handleRemove);
@@ -283,7 +281,7 @@ TEST_F("require that handleRemove() sends op with invalid dbdId to prev view", F
RemoveOperation op = f.remove(1);
// can be used in the case where removed feed view does not remember removes.
op.setPrevDbDocumentId(DbDocumentId(READY, 1));
- f._view.handleRemove(NULL, op);
+ f._view.handleRemove(FeedToken(), op);
EXPECT_EQUAL(1u, f._ready._view->_handleRemove);
EXPECT_EQUAL(0u, f._removed._view->_handleRemove);
EXPECT_EQUAL(0u, f._notReady._view->_handleRemove);
@@ -317,7 +315,7 @@ TEST_F("require that handleUpdate() sends op to correct view", Fixture)
UpdateOperation op = f.update(1);
op.setDbDocumentId(DbDocumentId(READY, 1));
op.setPrevDbDocumentId(DbDocumentId(READY, 1));
- f._view.handleUpdate(NULL, op);
+ f._view.handleUpdate(FeedToken(), op);
EXPECT_EQUAL(1u, f._ready._view->_handleUpdate);
EXPECT_EQUAL(0u, f._removed._view->_handleUpdate);
EXPECT_EQUAL(0u, f._notReady._view->_handleUpdate);
diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
index 547e400cd76..8369ec0630d 100644
--- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
@@ -745,18 +745,15 @@ struct DocumentHandler
op.setSerialNum(serialNum);
return op;
}
- MoveOperation createMove(Document::UP doc, Timestamp timestamp,
- DbDocumentId sourceDbdId,
- uint32_t targetSubDbId,
- SerialNum serialNum)
+ MoveOperation createMove(Document::UP doc, Timestamp timestamp, DbDocumentId sourceDbdId,
+ uint32_t targetSubDbId, SerialNum serialNum)
{
proton::test::Document testDoc(Document::SP(doc.release()), 0, timestamp);
MoveOperation op(testDoc.getBucket(), testDoc.getTimestamp(), testDoc.getDoc(), sourceDbdId, targetSubDbId);
op.setSerialNum(serialNum);
return op;
}
- RemoveOperation createRemove(const DocumentId &docId, Timestamp timestamp,
- SerialNum serialNum)
+ RemoveOperation createRemove(const DocumentId &docId, Timestamp timestamp, SerialNum serialNum)
{
const document::GlobalId &gid = docId.getGlobalId();
BucketId bucket = gid.convertToBucketId();
@@ -769,7 +766,7 @@ struct DocumentHandler
void putDoc(PutOperation &op) {
IFeedView::SP feedView = _f._subDb.getFeedView();
_f.runInMaster([&]() { feedView->preparePut(op);
- feedView->handlePut(NULL, op); } );
+ feedView->handlePut(FeedToken(), op); } );
}
void moveDoc(MoveOperation &op) {
IFeedView::SP feedView = _f._subDb.getFeedView();
@@ -779,11 +776,10 @@ struct DocumentHandler
{
IFeedView::SP feedView = _f._subDb.getFeedView();
_f.runInMaster([&]() { feedView->prepareRemove(op);
- feedView->handleRemove(NULL, op); } );
+ feedView->handleRemove(FeedToken(), op); } );
}
void putDocs() {
- PutOperation putOp = createPut(std::move(createDoc(1, 22, 33)),
- Timestamp(10), 10);
+ PutOperation putOp = createPut(std::move(createDoc(1, 22, 33)), Timestamp(10), 10);
putDoc(putOp);
putOp = createPut(std::move(createDoc(2, 44, 55)), Timestamp(20), 20);
putDoc(putOp);
@@ -791,13 +787,8 @@ struct DocumentHandler
};
void
-assertAttribute(const AttributeGuard &attr,
- const vespalib::string &name,
- uint32_t numDocs,
- int64_t doc1Value,
- int64_t doc2Value,
- SerialNum createSerialNum,
- SerialNum lastSerialNum)
+assertAttribute(const AttributeGuard &attr, const vespalib::string &name, uint32_t numDocs,
+ int64_t doc1Value, int64_t doc2Value, SerialNum createSerialNum, SerialNum lastSerialNum)
{
EXPECT_EQUAL(name, attr->getName());
EXPECT_EQUAL(numDocs, attr->getNumDocs());
@@ -808,17 +799,13 @@ assertAttribute(const AttributeGuard &attr,
}
void
-assertAttribute1(const AttributeGuard &attr,
- SerialNum createSerialNum,
- SerialNum lastSerialNum)
+assertAttribute1(const AttributeGuard &attr, SerialNum createSerialNum, SerialNum lastSerialNum)
{
assertAttribute(attr, "attr1", 3, 22, 44, createSerialNum, lastSerialNum);
}
void
-assertAttribute2(const AttributeGuard &attr,
- SerialNum createSerialNum,
- SerialNum lastSerialNum)
+assertAttribute2(const AttributeGuard &attr, SerialNum createSerialNum, SerialNum lastSerialNum)
{
assertAttribute(attr, "attr2", 3, 33, 55, createSerialNum, lastSerialNum);
}
@@ -877,12 +864,10 @@ TEST_F("require that regular attributes are populated during reprocessing",
requireThatAttributesArePopulatedDuringReprocessing<SearchableFixtureTwoField, ConfigDir2>(f);
}
-namespace
-{
+namespace {
bool
-assertOperation(DocumentOperation &op,
- uint32_t expPrevSubDbId, uint32_t expPrevLid,
+assertOperation(DocumentOperation &op, uint32_t expPrevSubDbId, uint32_t expPrevLid,
uint32_t expSubDbId, uint32_t expLid)
{
if (!EXPECT_EQUAL(expPrevSubDbId, op.getPrevSubDbId())) {
diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
index 5e3e5cd78be..f7e09eeec3f 100644
--- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
@@ -3,8 +3,6 @@
#include <tests/proton/common/dummydbowner.h>
#include <vespa/document/datatype/documenttype.h>
#include <vespa/fastos/file.h>
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/persistence/spi/test.h>
#include <vespa/searchcore/proton/attribute/flushableattribute.h>
#include <vespa/searchcore/proton/common/feedtoken.h>
@@ -46,25 +44,11 @@ using vespalib::Slime;
namespace {
-class LocalTransport : public FeedToken::ITransport {
- mbus::Receptor _receptor;
-
-public:
- void send(mbus::Reply::UP reply) {
- fprintf(stderr, "in local transport.");
- _receptor.handleReply(std::move(reply));
- }
-
- mbus::Reply::UP getReply() {
- return _receptor.getReply(10000);
- }
-};
-
struct MyDBOwner : public DummyDBOwner
{
std::shared_ptr<DocumentDBReferenceRegistry> _registry;
MyDBOwner();
- ~MyDBOwner();
+ ~MyDBOwner() override;
std::shared_ptr<IDocumentDBReferenceRegistry> getDocumentDBReferenceRegistry() const override {
return _registry;
}
@@ -74,7 +58,7 @@ MyDBOwner::MyDBOwner()
: DummyDBOwner(),
_registry(std::make_shared<DocumentDBReferenceRegistry>())
{}
-MyDBOwner::~MyDBOwner() {}
+MyDBOwner::~MyDBOwner() = default;
struct Fixture {
DummyWireService _dummy;
@@ -124,16 +108,14 @@ Fixture::Fixture()
_db->waitForOnlineState();
}
-Fixture::~Fixture() {}
+Fixture::~Fixture() = default;
const IFlushTarget *
extractRealFlushTarget(const IFlushTarget *target)
{
- const JobTrackedFlushTarget *tracked =
- dynamic_cast<const JobTrackedFlushTarget*>(target);
+ const auto tracked = dynamic_cast<const JobTrackedFlushTarget*>(target);
if (tracked != nullptr) {
- const ThreadedFlushTarget *threaded =
- dynamic_cast<const ThreadedFlushTarget*>(&tracked->getTarget());
+ const auto threaded = dynamic_cast<const ThreadedFlushTarget*>(&tracked->getTarget());
if (threaded != nullptr) {
return threaded->getFlushTarget().get();
}
@@ -144,10 +126,10 @@ extractRealFlushTarget(const IFlushTarget *target)
TEST_F("requireThatIndexFlushTargetIsUsed", Fixture) {
auto targets = f._db->getFlushTargets();
ASSERT_TRUE(!targets.empty());
- const IndexFlushTarget *index = 0;
+ const IndexFlushTarget *index = nullptr;
for (size_t i = 0; i < targets.size(); ++i) {
const IFlushTarget *target = extractRealFlushTarget(targets[i].get());
- if (target != NULL) {
+ if (target != nullptr) {
index = dynamic_cast<const IndexFlushTarget *>(target);
}
if (index) {
@@ -161,9 +143,9 @@ template <typename Target>
size_t getNumTargets(const std::vector<IFlushTarget::SP> & targets)
{
size_t retval = 0;
- for (size_t i = 0; i < targets.size(); ++i) {
- const IFlushTarget *target = extractRealFlushTarget(targets[i].get());
- if (dynamic_cast<const Target*>(target) == NULL) {
+ for (const auto & candidate : targets) {
+ const IFlushTarget *target = extractRealFlushTarget(candidate.get());
+ if (dynamic_cast<const Target*>(target) == nullptr) {
continue;
}
retval++;
@@ -244,16 +226,16 @@ TEST_F("requireThatStateIsReported", Fixture)
TEST_F("require that session manager can be explored", Fixture)
{
- EXPECT_TRUE(DocumentDBExplorer(f._db).get_child("session").get() != nullptr);
+ EXPECT_TRUE(DocumentDBExplorer(f._db).get_child("session"));
}
TEST_F("require that document db registers reference", Fixture)
{
auto &registry = f._myDBOwner._registry;
auto reference = registry->get("typea");
- EXPECT_TRUE(reference.get() != nullptr);
+ EXPECT_TRUE(reference);
auto attr = reference->getAttribute("attr1");
- EXPECT_TRUE(attr.get() != nullptr);
+ EXPECT_TRUE(attr);
EXPECT_EQUAL(search::attribute::BasicType::INT32, attr->getBasicType());
}
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index b0b06a238c9..823c31dd1c2 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -1,9 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/documentapi/messagebus/documentprotocol.h>
-#include <vespa/documentapi/messagebus/messages/documentreply.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentreply.h>
-#include <vespa/documentapi/messagebus/messages/updatedocumentreply.h>
#include <vespa/persistence/spi/result.h>
#include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h>
#include <vespa/searchcore/proton/test/bucketfactory.h>
@@ -40,11 +36,6 @@ using document::DocumentId;
using document::DocumentTypeRepo;
using document::DocumentUpdate;
using document::GlobalId;
-using documentapi::DocumentProtocol;
-using documentapi::DocumentReply;
-using documentapi::RemoveDocumentReply;
-using documentapi::UpdateDocumentReply;
-using mbus::Reply;
using search::IDestructorCallback;
using search::SerialNum;
using search::index::schema::CollectionType;
@@ -140,12 +131,6 @@ struct MyReplayConfig : public IReplayConfig {
virtual void replayConfig(SerialNum) override {}
};
-void ackToken(FeedToken *token) {
- if (token != NULL) {
- token->ack();
- }
-}
-
struct MyDocumentMetaStore {
struct Entry {
DbDocumentId _id;
@@ -195,9 +180,9 @@ struct MyFeedView : public test::DummyFeedView {
int update_count;
SerialNum update_serial;
MyFeedView(const DocumentTypeRepo::SP &dtr);
- ~MyFeedView();
+ ~MyFeedView() override;
void resetPutLatch(uint32_t count) { putLatch.reset(new vespalib::CountDownLatch(count)); }
- virtual void preparePut(PutOperation &op) override {
+ void preparePut(PutOperation &op) override {
prepareDocumentOperation(op, op.getDocument()->getId().getGlobalId());
}
void prepareDocumentOperation(DocumentOperation &op, const GlobalId &gid) {
@@ -208,7 +193,8 @@ struct MyFeedView : public test::DummyFeedView {
op.setPrevTimestamp(entry->_prevTimestamp);
}
}
- virtual void handlePut(FeedToken *token, const PutOperation &putOp) override {
+ void handlePut(FeedToken token, const PutOperation &putOp) override {
+ (void) token;
LOG(info, "MyFeedView::handlePut(): docId(%s), putCount(%u), putLatchCount(%u)",
putOp.getDocument()->getId().toString().c_str(), put_count,
(putLatch.get() != NULL ? putLatch->getCount() : 0u));
@@ -221,23 +207,24 @@ struct MyFeedView : public test::DummyFeedView {
if (putLatch.get() != NULL) {
putLatch->countDown();
}
- ackToken(token);
}
- virtual void prepareUpdate(UpdateOperation &op) override {
+ void prepareUpdate(UpdateOperation &op) override {
prepareDocumentOperation(op, op.getUpdate()->getId().getGlobalId());
}
- virtual void handleUpdate(FeedToken *token, const UpdateOperation &op) override {
+ void handleUpdate(FeedToken token, const UpdateOperation &op) override {
+ (void) token;
+
++update_count;
update_serial = op.getSerialNum();
- ackToken(token);
}
- virtual void handleRemove(FeedToken *token, const RemoveOperation &) override
- { ++remove_count; ackToken(token); }
- virtual void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++move_count; }
- virtual void heartBeat(SerialNum) override { ++heartbeat_count; }
- virtual void handlePruneRemovedDocuments(
- const PruneRemovedDocumentsOperation &) override { ++prune_removed_count; }
- virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override {
+ void handleRemove(FeedToken token, const RemoveOperation &) override {
+ (void) token;
+ ++remove_count;
+ }
+ void handleMove(const MoveOperation &, IDestructorCallback::SP) override { ++move_count; }
+ void heartBeat(SerialNum) override { ++heartbeat_count; }
+ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override { ++prune_removed_count; }
+ const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override {
return NULL;
}
};
@@ -299,7 +286,7 @@ struct UpdateContext {
};
-struct MyTransport : public FeedToken::ITransport {
+struct MyTransport : public feedtoken::ITransport {
vespalib::Gate gate;
ResultUP result;
bool documentWasFound;
@@ -313,12 +300,11 @@ struct MyTransport : public FeedToken::ITransport {
};
MyTransport::MyTransport() : gate(), result(), documentWasFound(false) {}
-MyTransport::~MyTransport() {}
+MyTransport::~MyTransport() = default;
struct FeedTokenContext {
MyTransport transport;
- FeedToken::UP token_ap;
- FeedToken &token;
+ FeedToken token;
FeedTokenContext();
~FeedTokenContext();
@@ -327,16 +313,14 @@ struct FeedTokenContext {
if (transport.result.get()) {
return transport.result.get();
}
- return &token.getResult();
+ return &token->getResult();
}
};
FeedTokenContext::FeedTokenContext()
: transport(),
- token_ap(new FeedToken(transport)),
- token(*token_ap)
-{
-}
+ token(feedtoken::make(transport))
+{}
FeedTokenContext::~FeedTokenContext() = default;
@@ -432,8 +416,7 @@ struct FeedHandlerFixture
handler.init(1);
}
- ~FeedHandlerFixture()
- {
+ ~FeedHandlerFixture() {
writeService.sync();
}
template <class FunctionType>
@@ -484,7 +467,7 @@ TEST_F("require that outdated remove is ignored", FeedHandlerFixture)
static_cast<DocumentOperation &>(*op).setPrevDbDocumentId(DbDocumentId(4));
static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000));
FeedTokenContext token_context;
- f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
EXPECT_EQUAL(0, f.feedView.remove_count);
EXPECT_EQUAL(0, f.tls_writer.store_count);
}
@@ -496,7 +479,7 @@ TEST_F("require that outdated put is ignored", FeedHandlerFixture)
Timestamp(10), doc_context.doc));
static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000));
FeedTokenContext token_context;
- f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
EXPECT_EQUAL(0, f.feedView.put_count);
EXPECT_EQUAL(0, f.tls_writer.store_count);
}
@@ -575,7 +558,7 @@ TEST_F("require that remove of unknown document with known data type stores remo
DocumentContext doc_context("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId()));
FeedTokenContext token_context;
- f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
EXPECT_EQUAL(1, f.feedView.remove_count);
EXPECT_EQUAL(1, f.tls_writer.store_count);
}
@@ -585,7 +568,7 @@ TEST_F("require that partial update for non-existing document is tagged as such"
UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update));
FeedTokenContext token_context;
- f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult());
EXPECT_FALSE(token_context.transport.documentWasFound);
@@ -603,7 +586,7 @@ TEST_F("require that partial update for non-existing document is created if spec
f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(), MyDocumentMetaStore::Entry(5, 5, Timestamp(10)));
FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update));
FeedTokenContext token_context;
- f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult());
EXPECT_TRUE(token_context.transport.documentWasFound);
@@ -624,7 +607,7 @@ TEST_F("require that put is rejected if resource limit is reached", FeedHandlerF
DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc);
FeedTokenContext token;
- f.handler.performOperation(std::move(token.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token.token), std::move(op));
EXPECT_EQUAL(0, f.feedView.put_count);
EXPECT_EQUAL(Result::RESOURCE_EXHAUSTED, token.getResult()->getErrorCode());
EXPECT_EQUAL("Put operation rejected for document 'id:test:searchdocument::foo' of type 'searchdocument': 'Attribute resource limit reached'",
@@ -639,7 +622,7 @@ TEST_F("require that update is rejected if resource limit is reached", FeedHandl
UpdateContext updCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op = std::make_unique<UpdateOperation>(updCtx.bucketId, Timestamp(10), updCtx.update);
FeedTokenContext token;
- f.handler.performOperation(std::move(token.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token.token), std::move(op));
EXPECT_EQUAL(0, f.feedView.update_count);
EXPECT_TRUE(dynamic_cast<const UpdateResult *>(token.getResult()));
EXPECT_EQUAL(Result::RESOURCE_EXHAUSTED, token.getResult()->getErrorCode());
@@ -655,7 +638,7 @@ TEST_F("require that remove is NOT rejected if resource limit is reached", FeedH
DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op = std::make_unique<RemoveOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc->getId());
FeedTokenContext token;
- f.handler.performOperation(std::move(token.token_ap), std::move(op));
+ f.handler.performOperation(std::move(token.token), std::move(op));
EXPECT_EQUAL(1, f.feedView.remove_count);
EXPECT_EQUAL(Result::NONE, token.getResult()->getErrorCode());
EXPECT_EQUAL("", token.getResult()->getErrorMessage());
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
index 4eefbed0a53..54aefef4463 100644
--- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
@@ -1,9 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/documentapi/messagebus/documentprotocol.h>
-#include <vespa/documentapi/messagebus/messages/documentreply.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentreply.h>
-#include <vespa/documentapi/messagebus/messages/updatedocumentreply.h>
#include <vespa/searchcore/proton/attribute/i_attribute_writer.h>
#include <vespa/searchcore/proton/test/bucketfactory.h>
#include <vespa/searchcore/proton/common/commit_time_tracker.h>
@@ -11,7 +7,6 @@
#include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h>
#include <vespa/searchcore/proton/index/i_index_writer.h>
#include <vespa/searchcore/proton/server/executorthreadingservice.h>
-#include <vespa/searchcore/proton/server/ifrozenbuckethandler.h>
#include <vespa/searchcore/proton/server/isummaryadapter.h>
#include <vespa/searchcore/proton/server/matchview.h>
#include <vespa/searchcore/proton/server/searchable_feed_view.h>
@@ -25,9 +20,7 @@
#include <vespa/searchcore/proton/test/thread_utils.h>
#include <vespa/searchcore/proton/test/threading_service_observer.h>
#include <vespa/searchlib/attribute/attributefactory.h>
-#include <vespa/searchlib/common/idestructorcallback.h>
-#include <vespa/searchlib/docstore/cachestats.h>
-#include <vespa/searchlib/docstore/idocumentstore.h>
+
#include <vespa/searchlib/index/docbuilder.h>
#include <vespa/log/log.h>
@@ -37,8 +30,6 @@ using document::BucketId;
using document::Document;
using document::DocumentId;
using document::DocumentUpdate;
-using documentapi::DocumentProtocol;
-using documentapi::RemoveDocumentReply;
using fastos::TimeStamp;
using proton::matching::SessionManager;
using proton::test::MockGidToLidChangeHandler;
@@ -62,7 +53,6 @@ using namespace search::index;
typedef SearchableFeedView::SerialNum SerialNum;
typedef search::DocumentIdT DocumentIdT;
-typedef DocumentProtocol::MessageType MessageType;
struct MyLidVector : public std::vector<DocumentIdT>
{
@@ -235,7 +225,8 @@ struct MyDocumentStore : public test::DummyDocumentStore
_lastSyncToken(0),
_compactLidSpaceLidLimit(0)
{}
- virtual Document::UP read(DocumentIdT lid, const document::DocumentTypeRepo &) const override {
+ ~MyDocumentStore() override;
+ Document::UP read(DocumentIdT lid, const document::DocumentTypeRepo &) const override {
DocMap::const_iterator itr = _docs.find(lid);
if (itr != _docs.end()) {
Document::UP retval(itr->second->clone());
@@ -243,27 +234,29 @@ struct MyDocumentStore : public test::DummyDocumentStore
}
return Document::UP();
}
- virtual void write(uint64_t syncToken, DocumentIdT lid, const document::Document& doc) override {
+ void write(uint64_t syncToken, DocumentIdT lid, const document::Document& doc) override {
_lastSyncToken = syncToken;
_docs[lid] = Document::SP(doc.clone());
}
- virtual void write(uint64_t syncToken, DocumentIdT lid, const vespalib::nbostream & os) override {
+ void write(uint64_t syncToken, DocumentIdT lid, const vespalib::nbostream & os) override {
_lastSyncToken = syncToken;
_docs[lid] = std::make_shared<Document>(_repo, const_cast<vespalib::nbostream &>(os));
}
- virtual void remove(uint64_t syncToken, DocumentIdT lid) override {
+ void remove(uint64_t syncToken, DocumentIdT lid) override {
_lastSyncToken = syncToken;
_docs.erase(lid);
}
- virtual uint64_t initFlush(uint64_t syncToken) override {
+ uint64_t initFlush(uint64_t syncToken) override {
return syncToken;
}
- virtual uint64_t lastSyncToken() const override { return _lastSyncToken; }
- virtual void compactLidSpace(uint32_t wantedDocLidLimit) override {
+ uint64_t lastSyncToken() const override { return _lastSyncToken; }
+ void compactLidSpace(uint32_t wantedDocLidLimit) override {
_compactLidSpaceLidLimit = wantedDocLidLimit;
}
};
+MyDocumentStore::~MyDocumentStore() = default;
+
struct MySummaryManager : public test::DummySummaryManager
{
MyDocumentStore _store;
@@ -405,7 +398,7 @@ MyAttributeWriter::MyAttributeWriter(MyTracer &tracer)
}
MyAttributeWriter::~MyAttributeWriter() {}
-struct MyTransport : public FeedToken::ITransport
+struct MyTransport : public feedtoken::ITransport
{
ResultUP lastResult;
vespalib::Gate _gate;
@@ -421,7 +414,7 @@ struct MyTransport : public FeedToken::ITransport
};
MyTransport::MyTransport(MyTracer &tracer) : lastResult(), _gate(), _tracer(tracer) {}
-MyTransport::~MyTransport() {}
+MyTransport::~MyTransport() = default;
struct MyResultHandler : public IGenericResultHandler
{
@@ -491,7 +484,7 @@ struct FeedTokenContext
};
FeedTokenContext::FeedTokenContext(MyTracer &tracer)
- : mt(tracer), ft(mt)
+ : mt(tracer), ft(feedtoken::make(mt))
{}
FeedTokenContext::~FeedTokenContext() = default;
@@ -571,7 +564,7 @@ struct FixtureBase
return doc("doc:test:1", timestamp);
}
- void performPut(FeedToken *token, PutOperation &op) {
+ void performPut(FeedToken token, PutOperation &op) {
getFeedView().preparePut(op);
op.setSerialNum(++serial);
getFeedView().handlePut(token, op);
@@ -586,10 +579,10 @@ struct FixtureBase
void putAndWait(const DocumentContext &docCtx) {
FeedTokenContext token(_tracer);
PutOperation op(docCtx.bid, docCtx.ts, docCtx.doc);
- runInMaster([&] () { performPut(&token.ft, op); });
+ runInMaster([&] () { performPut(token.ft, op); });
}
- void performUpdate(FeedToken *token, UpdateOperation &op) {
+ void performUpdate(FeedToken token, UpdateOperation &op) {
getFeedView().prepareUpdate(op);
op.setSerialNum(++serial);
getFeedView().handleUpdate(token, op);
@@ -598,25 +591,21 @@ struct FixtureBase
void updateAndWait(const DocumentContext &docCtx) {
FeedTokenContext token(_tracer);
UpdateOperation op(docCtx.bid, docCtx.ts, docCtx.upd);
- runInMaster([&] () { performUpdate(&token.ft, op); });
+ runInMaster([&] () { performUpdate(token.ft, op); });
}
- void performRemove(FeedToken *token, RemoveOperation &op) {
+ void performRemove(FeedToken token, RemoveOperation &op) {
getFeedView().prepareRemove(op);
if (op.getValidNewOrPrevDbdId()) {
op.setSerialNum(++serial);
- getFeedView().handleRemove(token, op);
- } else {
- if (token != NULL) {
- token->ack();
- }
+ getFeedView().handleRemove(std::move(token), op);
}
}
void removeAndWait(const DocumentContext &docCtx) {
FeedTokenContext token(_tracer);
RemoveOperation op(docCtx.bid, docCtx.ts, docCtx.doc->getId());
- runInMaster([&] () { performRemove(&token.ft, op); });
+ runInMaster([&] () { performRemove(token.ft, op); });
}
void removeAndWait(const DocumentContext::List &docs) {
@@ -1211,12 +1200,12 @@ TEST_F("require that commit is not called when inside a commit interval",
EXPECT_EQUAL(0u, f.miw._commitCount);
EXPECT_EQUAL(0u, f.maw._commitCount);
EXPECT_EQUAL(0u, f._docIdLimit.get());
- f.assertTrace("ack(Result(0, )),"
- "put(adapter=attribute,serialNum=1,lid=1,commit=0),"
+ f.assertTrace("put(adapter=attribute,serialNum=1,lid=1,commit=0),"
"put(adapter=index,serialNum=1,lid=1,commit=0),"
"ack(Result(0, )),"
"remove(adapter=attribute,serialNum=2,lid=1,commit=0),"
- "remove(adapter=index,serialNum=2,lid=1,commit=0)");
+ "remove(adapter=index,serialNum=2,lid=1,commit=0),"
+ "ack(Result(0, ))");
}
TEST_F("require that commit is called when crossing a commit interval",
@@ -1232,19 +1221,18 @@ TEST_F("require that commit is called when crossing a commit interval",
f.removeAndWait(dc);
EXPECT_EQUAL(2u, f.miw._commitCount);
EXPECT_EQUAL(2u, f.maw._commitCount);
- f.assertTrace("ack(Result(0, )),"
- "put(adapter=attribute,serialNum=1,lid=1,commit=1),"
+ f.assertTrace("put(adapter=attribute,serialNum=1,lid=1,commit=1),"
"put(adapter=index,serialNum=1,lid=1,commit=0),"
"commit(adapter=index,serialNum=1),"
"ack(Result(0, )),"
"remove(adapter=attribute,serialNum=2,lid=1,commit=1),"
"remove(adapter=index,serialNum=2,lid=1,commit=0),"
- "commit(adapter=index,serialNum=2)");
+ "commit(adapter=index,serialNum=2),"
+ "ack(Result(0, ))");
}
-TEST_F("require that commit is not implicitly called after "
- "handover to maintenance job",
+TEST_F("require that commit is not implicitly called after handover to maintenance job",
SearchableFeedViewFixture(SHORT_DELAY))
{
f._commitTimeTracker.setReplayDone();
@@ -1259,12 +1247,12 @@ TEST_F("require that commit is not implicitly called after "
EXPECT_EQUAL(0u, f.miw._commitCount);
EXPECT_EQUAL(0u, f.maw._commitCount);
EXPECT_EQUAL(0u, f._docIdLimit.get());
- f.assertTrace("ack(Result(0, )),"
- "put(adapter=attribute,serialNum=1,lid=1,commit=0),"
+ f.assertTrace("put(adapter=attribute,serialNum=1,lid=1,commit=0),"
"put(adapter=index,serialNum=1,lid=1,commit=0),"
"ack(Result(0, )),"
"remove(adapter=attribute,serialNum=2,lid=1,commit=0),"
- "remove(adapter=index,serialNum=2,lid=1,commit=0)");
+ "remove(adapter=index,serialNum=2,lid=1,commit=0),"
+ "ack(Result(0, ))");
}
TEST_F("require that forceCommit updates docid limit",
@@ -1280,15 +1268,14 @@ TEST_F("require that forceCommit updates docid limit",
EXPECT_EQUAL(1u, f.miw._commitCount);
EXPECT_EQUAL(1u, f.maw._commitCount);
EXPECT_EQUAL(2u, f._docIdLimit.get());
- f.assertTrace("ack(Result(0, )),"
- "put(adapter=attribute,serialNum=1,lid=1,commit=0),"
+ f.assertTrace("put(adapter=attribute,serialNum=1,lid=1,commit=0),"
"put(adapter=index,serialNum=1,lid=1,commit=0),"
+ "ack(Result(0, )),"
"commit(adapter=attribute,serialNum=1),"
"commit(adapter=index,serialNum=1)");
}
-TEST_F("require that forceCommit updates docid limit during shrink",
- SearchableFeedViewFixture(LONG_DELAY))
+TEST_F("require that forceCommit updates docid limit during shrink", SearchableFeedViewFixture(LONG_DELAY))
{
f._commitTimeTracker.setReplayDone();
f.putAndWait(f.makeDummyDocs(0, 3, 1000));
diff --git a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
index 530c9ebef39..8bd1d92657f 100644
--- a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
+++ b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
@@ -5,7 +5,7 @@
using namespace proton;
-class LocalTransport : public FeedToken::ITransport {
+class LocalTransport : public feedtoken::ITransport {
private:
size_t _receivedCount;
@@ -45,8 +45,9 @@ void
Test::testAck()
{
LocalTransport transport;
- FeedToken token(transport);
- token.ack();
+ {
+ FeedToken token = feedtoken::make(transport);
+ }
EXPECT_EQUAL(1u, transport.getReceivedCount());
}
@@ -54,8 +55,8 @@ void
Test::testFail()
{
LocalTransport transport;
- FeedToken token(transport);
- token.fail();
+ FeedToken token = feedtoken::make(transport);
+ token->fail();
EXPECT_EQUAL(1u, transport.getReceivedCount());
}
@@ -70,9 +71,10 @@ Test::testHandover()
LocalTransport transport;
- FeedToken token(transport);
- token = MyHandover::handover(token);
- token.ack();
+ {
+ FeedToken token = feedtoken::make(transport);
+ token = MyHandover::handover(token);
+ }
EXPECT_EQUAL(1u, transport.getReceivedCount());
}
diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
index f8c0e0ac2f6..f7b35851696 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
@@ -127,33 +127,29 @@ struct MyDocumentRetriever : DocumentRetrieverBaseForTest {
MyDocumentRetriever(const Document *d, Timestamp ts, DocumentId &last_id)
: repo(), document(d), timestamp(ts), last_doc_id(last_id) {}
- virtual const document::DocumentTypeRepo &getDocumentTypeRepo() const override {
+ const document::DocumentTypeRepo &getDocumentTypeRepo() const override {
return repo;
}
- virtual void getBucketMetaData(const storage::spi::Bucket &,
- search::DocumentMetaData::Vector &v) const override {
+ void getBucketMetaData(const storage::spi::Bucket &, search::DocumentMetaData::Vector &v) const override {
if (document != 0) {
v.push_back(getDocumentMetaData(document->getId()));
}
}
- virtual DocumentMetaData getDocumentMetaData(const DocumentId &id) const override {
+ DocumentMetaData getDocumentMetaData(const DocumentId &id) const override {
last_doc_id = id;
if (document != 0) {
- return DocumentMetaData(1, timestamp, document::BucketId(1),
- document->getId().getGlobalId());
+ return DocumentMetaData(1, timestamp, document::BucketId(1), document->getId().getGlobalId());
}
return DocumentMetaData();
}
- virtual document::Document::UP getDocument(search::DocumentIdT) const override {
+ document::Document::UP getDocument(search::DocumentIdT) const override {
if (document != 0) {
return Document::UP(document->clone());
}
return Document::UP();
}
- virtual CachedSelect::SP
- parseSelect(const vespalib::string &) const override
- {
+ CachedSelect::SP parseSelect(const vespalib::string &) const override {
return CachedSelect::SP();
}
};
@@ -208,134 +204,104 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer {
setExistingTimestamp(ts);
}
void handle(FeedToken token, const Bucket &bucket, Timestamp timestamp, const DocumentId &docId) {
+ (void) token;
lastBucket = bucket;
lastTimestamp = timestamp;
lastDocId = docId;
- token.ack();
}
- virtual void initialize() override { initialized = true; }
+ void initialize() override { initialized = true; }
- virtual void handlePut(FeedToken token, const Bucket& bucket,
- Timestamp timestamp, const document::Document::SP& doc) override {
- token.setResult(ResultUP(new storage::spi::Result()), false);
+ void handlePut(FeedToken token, const Bucket& bucket,
+ Timestamp timestamp, const document::Document::SP& doc) override {
+ token->setResult(ResultUP(new storage::spi::Result()), false);
handle(token, bucket, timestamp, doc->getId());
}
- virtual void handleUpdate(FeedToken token, const Bucket& bucket,
- Timestamp timestamp, const document::DocumentUpdate::SP& upd) override {
- token.setResult(ResultUP(new storage::spi::UpdateResult(existingTimestamp)),
+ void handleUpdate(FeedToken token, const Bucket& bucket,
+ Timestamp timestamp, const document::DocumentUpdate::SP& upd) override {
+ token->setResult(ResultUP(new storage::spi::UpdateResult(existingTimestamp)),
existingTimestamp > 0);
handle(token, bucket, timestamp, upd->getId());
}
- virtual void handleRemove(FeedToken token, const Bucket& bucket,
- Timestamp timestamp, const DocumentId& id) override {
+ void handleRemove(FeedToken token, const Bucket& bucket,
+ Timestamp timestamp, const DocumentId& id) override {
bool wasFound = existingTimestamp > 0;
- token.setResult(ResultUP(new storage::spi::RemoveResult(wasFound)), wasFound);
+ token->setResult(ResultUP(new storage::spi::RemoveResult(wasFound)), wasFound);
handle(token, bucket, timestamp, id);
}
- virtual void handleListBuckets(IBucketIdListResultHandler &resultHandler) override {
+ void handleListBuckets(IBucketIdListResultHandler &resultHandler) override {
resultHandler.handle(BucketIdListResult(bucketList));
}
- virtual void handleSetClusterState(const ClusterState &calc,
- IGenericResultHandler &resultHandler) override {
+ void handleSetClusterState(const ClusterState &calc, IGenericResultHandler &resultHandler) override {
lastCalc = &calc;
resultHandler.handle(Result());
}
- virtual void handleSetActiveState(const Bucket &bucket,
- storage::spi::BucketInfo::ActiveState newState,
- IGenericResultHandler &resultHandler) override {
+ void handleSetActiveState(const Bucket &bucket, storage::spi::BucketInfo::ActiveState newState,
+ IGenericResultHandler &resultHandler) override {
lastBucket = bucket;
lastBucketState = newState;
resultHandler.handle(bucketStateResult);
}
- virtual void handleGetBucketInfo(const Bucket &,
- IBucketInfoResultHandler &resultHandler) override {
+ void handleGetBucketInfo(const Bucket &, IBucketInfoResultHandler &resultHandler) override {
resultHandler.handle(BucketInfoResult(bucketInfo));
}
- virtual void
- handleCreateBucket(FeedToken token,
- const storage::spi::Bucket &) override
- {
- token.setResult(ResultUP(new Result(_createBucketResult)), true);
- token.ack();
+ void handleCreateBucket(FeedToken token, const storage::spi::Bucket &) override {
+ token->setResult(ResultUP(new Result(_createBucketResult)), true);
}
- virtual void handleDeleteBucket(FeedToken token,
- const storage::spi::Bucket &) override {
- token.setResult(ResultUP(new Result(deleteBucketResult)), true);
- token.ack();
+ void handleDeleteBucket(FeedToken token, const storage::spi::Bucket &) override {
+ token->setResult(ResultUP(new Result(deleteBucketResult)), true);
}
- virtual void handleGetModifiedBuckets(IBucketIdListResultHandler &resultHandler) override {
+ void handleGetModifiedBuckets(IBucketIdListResultHandler &resultHandler) override {
resultHandler.handle(BucketIdListResult(modBucketList));
}
- virtual void
- handleSplit(FeedToken token,
- const storage::spi::Bucket &source,
- const storage::spi::Bucket &target1,
- const storage::spi::Bucket &target2) override
+ void handleSplit(FeedToken token, const storage::spi::Bucket &, const storage::spi::Bucket &,
+ const storage::spi::Bucket &) override
{
- (void) source;
- (void) target1;
- (void) target2;
- token.setResult(ResultUP(new Result(_splitResult)), true);
- token.ack();
- }
-
- virtual void
- handleJoin(FeedToken token,
- const storage::spi::Bucket &source1,
- const storage::spi::Bucket &source2,
- const storage::spi::Bucket &target) override
+ token->setResult(ResultUP(new Result(_splitResult)), true);
+ }
+
+ void handleJoin(FeedToken token, const storage::spi::Bucket &, const storage::spi::Bucket &,
+ const storage::spi::Bucket &) override
{
- (void) source1;
- (void) source2;
- (void) target;
- token.setResult(ResultUP(new Result(_joinResult)), true);
- token.ack();
+ token->setResult(ResultUP(new Result(_joinResult)), true);
}
- virtual RetrieversSP getDocumentRetrievers(storage::spi::ReadConsistency) override {
+ RetrieversSP getDocumentRetrievers(storage::spi::ReadConsistency) override {
RetrieversSP ret(new std::vector<IDocumentRetriever::SP>);
- ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(
- 0, Timestamp(), lastDocId)));
- ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(
- document, existingTimestamp, lastDocId)));
+ ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(0, Timestamp(), lastDocId)));
+ ret->push_back(IDocumentRetriever::SP(new MyDocumentRetriever(document, existingTimestamp, lastDocId)));
return ret;
}
- virtual BucketGuard::UP lockBucket(const storage::spi::Bucket &b) override {
+ BucketGuard::UP lockBucket(const storage::spi::Bucket &b) override {
return BucketGuard::UP(new BucketGuard(b.getBucketId(), *this));
}
- virtual void
- handleListActiveBuckets(IBucketIdListResultHandler &resultHandler) override
- {
+ void handleListActiveBuckets(IBucketIdListResultHandler &resultHandler) override {
BucketIdListResult::List list;
resultHandler.handle(BucketIdListResult(list));
}
- virtual void
- handlePopulateActiveBuckets(document::BucketId::List &buckets,
- IGenericResultHandler &resultHandler) override
- {
+ void handlePopulateActiveBuckets(document::BucketId::List &buckets, IGenericResultHandler &resultHandler) override {
(void) buckets;
resultHandler.handle(Result());
}
- virtual void freezeBucket(BucketId bucket) override {
+ void freezeBucket(BucketId bucket) override {
frozen.insert(bucket.getId());
was_frozen.insert(bucket.getId());
}
- virtual void thawBucket(BucketId bucket) override {
+ void thawBucket(BucketId bucket) override {
std::multiset<uint64_t>::iterator it = frozen.find(bucket.getId());
ASSERT_TRUE(it != frozen.end());
frozen.erase(it);
@@ -425,11 +391,7 @@ HandlerSet::prepareGetModifiedBuckets()
class SimplePersistenceEngineOwner : public IPersistenceEngineOwner
{
- virtual void
- setClusterState(const storage::spi::ClusterState &calc) override
- {
- (void) calc;
- }
+ void setClusterState(const storage::spi::ClusterState &calc) override { (void) calc; }
};
struct SimpleResourceWriteFilter : public IResourceWriteFilter
diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp
index 15c2fbe5a84..a6f3496e1ed 100644
--- a/searchcore/src/tests/proton/server/feedstates_test.cpp
+++ b/searchcore/src/tests/proton/server/feedstates_test.cpp
@@ -40,10 +40,8 @@ struct MyFeedView : public test::DummyFeedView {
MyFeedView();
~MyFeedView();
- virtual const DocumentTypeRepo::SP &getDocumentTypeRepo() const override
- { return repo_sp; }
- virtual void handleRemove(FeedToken *, const RemoveOperation &) override
- { ++remove_handled; }
+ const DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return repo_sp; }
+ void handleRemove(FeedToken , const RemoveOperation &) override { ++remove_handled; }
};
MyFeedView::MyFeedView() : repo_sp(repo.getTypeRepoSp()), remove_handled(0) {}
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index ddb2711fac8..e31e578e1e0 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -256,7 +256,7 @@ summary.log.maxbucketspread double default=2.5
summary.log.minfilesizefactor double default=0.2
## Number of threads used for compressing incomming documents/compacting.
-## Deprecated. Use background.threads instead.
+## Deprecated. Use feeding.concurrency instead.
## TODO Remove
summary.log.numthreads int default=8 restart
@@ -378,11 +378,6 @@ visit.ignoremaxbytes bool default=true
## When set to 0 (default) we use 1 separate thread per document database.
initialize.threads int default = 0
-## Number of worker threads doing background compaction/compression tasks.
-## They all live i a shared thread pool.
-## When set to 0 (default), it will have enough threads to saturate half of the cores.
-background.threads int default=0
-
## Portion of enumstore address space that can be used before put and update
## portion of feed is blocked.
writefilter.attribute.enumstorelimit double default = 0.9
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
index 008fafa332b..b6223dd41ab 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
@@ -2,50 +2,35 @@
#include "feedtoken.h"
-namespace proton {
+namespace proton::feedtoken {
-FeedToken::FeedToken(ITransport &transport) :
- _state(new State(transport, 1))
-{
-}
-
-FeedToken::State::State(ITransport & transport, uint32_t numAcksRequired) :
+State::State(ITransport & transport) :
_transport(transport),
_result(new storage::spi::Result()),
_documentWasFound(false),
- _unAckedCount(numAcksRequired)
+ _alreadySent(false)
{
- assert(_unAckedCount > 0);
}
-FeedToken::State::~State()
+State::~State()
{
- assert(_unAckedCount == 0);
+ ack();
}
void
-FeedToken::State::ack()
+State::ack()
{
- uint32_t prev(_unAckedCount--);
- if (prev == 1) {
+ bool alreadySent = _alreadySent.exchange(true);
+ if ( !alreadySent ) {
_transport.send(std::move(_result), _documentWasFound);
}
- assert(prev >= 1);
-}
-
-void
-FeedToken::State::incNeededAcks()
-{
- uint32_t prev(_unAckedCount++);
- assert(prev >= 1);
- (void) prev;
}
void
-FeedToken::State::fail()
+State::fail()
{
- uint32_t prev = _unAckedCount.exchange(0);
- if (prev > 0) {
+ bool alreadySent = _alreadySent.exchange(true);
+ if ( !alreadySent ) {
_transport.send(std::move(_result), _documentWasFound);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
index 856c8a22652..bc7e249ca3b 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
@@ -2,7 +2,7 @@
#pragma once
#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/vespalib/util/exception.h>
+#include <vespa/searchlib/common/idestructorcallback.h>
#include <vespa/vespalib/util/sync.h>
#include <atomic>
@@ -15,23 +15,19 @@ typedef std::unique_ptr<storage::spi::Result> ResultUP;
* for an IFeedHandler to perform an async reply to an operation. A unique
* instance of this class is passed to every invokation of the IFeedHandler.
*/
-class FeedToken {
-public:
+namespace feedtoken {
class ITransport {
public:
virtual ~ITransport() { }
virtual void send(ResultUP result, bool documentWasFound) = 0;
};
-private:
- class State {
+ class State : public search::IDestructorCallback {
public:
State(const State &) = delete;
State & operator = (const State &) = delete;
- State(ITransport & transport, uint32_t numAcksRequired);
- ~State();
- void incNeededAcks();
- void ack();
+ State(ITransport & transport);
+ ~State() override;
void fail();
void setResult(ResultUP result, bool documentWasFound) {
_documentWasFound = documentWasFound;
@@ -39,68 +35,20 @@ private:
}
const storage::spi::Result &getResult() { return *_result; }
private:
+ void ack();
ITransport &_transport;
ResultUP _result;
bool _documentWasFound;
- std::atomic<uint32_t> _unAckedCount;
+ std::atomic<bool> _alreadySent;
};
- std::shared_ptr<State> _state;
-
-public:
- typedef std::unique_ptr<FeedToken> UP;
- typedef std::shared_ptr<FeedToken> SP;
-
- /**
- * Constructs a unique FeedToken. This is the constructor used by the
- * FeedEngine when processing input. If the given message is empty, or it
- * does not belong to the document protocol, this method throws a
- * vespalib::IllegalArgumentException.
- *
- * @param transport The transport to pass the reply to.
- */
- FeedToken(ITransport &transport);
-
- FeedToken(FeedToken &&) = default;
- FeedToken & operator =(FeedToken &&) = default;
- FeedToken(const FeedToken &) = default;
- FeedToken & operator =(const FeedToken &) = default;
- ~FeedToken() = default;
- /**
- * Passes a receipt back to the originating FeedEngine, declaring that this
- * operation succeeded. If an error occured while processing the operation,
- * use fail() instead. Invoking this and/or fail() more than once is void.
- */
- void ack() const { _state->ack(); }
-
- void incNeededAcks() const {
- _state->incNeededAcks();
+ inline std::shared_ptr<State>
+ make(ITransport & latch) {
+ return std::make_shared<State>(latch);
}
+}
- /**
- * Passes a receipt back to the originating FeedEngine, declaring that this
- * operation failed for some reason. Invoking this and/or fail() more than
- * once is void.
- *
- * @param errNum A numerical representation of the error.
- * @param errMsg A readable string detailing the error.
- */
- void fail() const { _state->fail(); }
-
- /**
- * Gives you access to the underlying result.
- *
- * @return The result
- */
- const storage::spi::Result &getResult() const { return _state->getResult(); }
-
- /**
- * Sets the underlying result.
- */
- void setResult(ResultUP result, bool documentWasFound) {
- _state->setResult(std::move(result), documentWasFound);
- }
-};
+using FeedToken = std::shared_ptr<feedtoken::State>;
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index feebbf4cf2a..6cdec1ec0f9 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -3,10 +3,6 @@
#include "persistenceengine.h"
#include "ipersistenceengineowner.h"
#include "transport_latch.h"
-#include <vespa/documentapi/messagebus/documentprotocol.h>
-#include <vespa/documentapi/messagebus/messages/feedreply.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentreply.h>
-#include <vespa/documentapi/messagebus/messages/updatedocumentreply.h>
#include <vespa/vespalib/stllike/hash_set.h>
#include <vespa/fastos/thread.h>
@@ -15,9 +11,6 @@ LOG_SETUP(".proton.persistenceengine.persistenceengine");
using document::Document;
using document::DocumentId;
-using documentapi::DocumentReply;
-using documentapi::RemoveDocumentReply;
-using mbus::Reply;
using storage::spi::BucketChecksum;
using storage::spi::BucketIdListResult;
using storage::spi::BucketInfo;
@@ -49,7 +42,7 @@ ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt)
: _lock(),
_latch(waitCnt)
{}
-ResultHandlerBase::~ResultHandlerBase() { }
+ResultHandlerBase::~ResultHandlerBase() = default;
class GenericResultHandler : public ResultHandlerBase, public IGenericResultHandler {
private:
@@ -74,7 +67,7 @@ public:
const Result &getResult() const { return _result; }
};
-GenericResultHandler::~GenericResultHandler() {}
+GenericResultHandler::~GenericResultHandler() = default;
class BucketIdListResultHandler : public IBucketIdListResultHandler
{
@@ -85,8 +78,8 @@ public:
BucketIdListResultHandler()
: _bucketSet()
{ }
- ~BucketIdListResultHandler();
- virtual void handle(const BucketIdListResult &result) override {
+ ~BucketIdListResultHandler() override;
+ void handle(const BucketIdListResult &result) override {
const BucketIdListResult::List &buckets = result.getList();
for (size_t i = 0; i < buckets.size(); ++i) {
_bucketSet.insert(buckets[i]);
@@ -103,7 +96,7 @@ public:
};
-BucketIdListResultHandler::~BucketIdListResultHandler() {}
+BucketIdListResultHandler::~BucketIdListResultHandler() = default;
class SynchronizedBucketIdListResultHandler : public ResultHandlerBase,
public BucketIdListResultHandler
@@ -113,8 +106,8 @@ public:
: ResultHandlerBase(waitCnt),
BucketIdListResultHandler()
{ }
- ~SynchronizedBucketIdListResultHandler();
- virtual void handle(const BucketIdListResult &result) override {
+ ~SynchronizedBucketIdListResultHandler() override;
+ void handle(const BucketIdListResult &result) override {
{
vespalib::LockGuard guard(_lock);
BucketIdListResultHandler::handle(result);
@@ -123,7 +116,7 @@ public:
}
};
-SynchronizedBucketIdListResultHandler::~SynchronizedBucketIdListResultHandler() {}
+SynchronizedBucketIdListResultHandler::~SynchronizedBucketIdListResultHandler() = default;
class BucketInfoResultHandler : public IBucketInfoResultHandler {
private:
@@ -135,8 +128,8 @@ public:
_first(true)
{
}
- ~BucketInfoResultHandler();
- virtual void handle(const BucketInfoResult &result) override {
+ ~BucketInfoResultHandler() override;
+ void handle(const BucketInfoResult &result) override {
if (_first) {
_result = result;
_first = false;
@@ -161,12 +154,10 @@ public:
const BucketInfoResult &getResult() const { return _result; }
};
-BucketInfoResultHandler::~BucketInfoResultHandler() {}
+BucketInfoResultHandler::~BucketInfoResultHandler() = default;
}
-#define NOT_YET throw vespalib::IllegalArgumentException("Not implemented yet")
-
PersistenceEngine::HandlerSnapshot::UP
PersistenceEngine::getHandlerSnapshot() const
{
@@ -343,26 +334,19 @@ PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::S
}
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
DocTypeName docType(doc->getType());
- LOG(spam,
- "put(%s, %" PRIu64 ", (\"%s\", \"%s\"))",
- b.toString().c_str(),
- static_cast<uint64_t>(t.getValue()),
- docType.toString().c_str(),
- doc->getId().toString().c_str());
+ LOG(spam, "put(%s, %" PRIu64 ", (\"%s\", \"%s\"))", b.toString().c_str(), static_cast<uint64_t>(t.getValue()),
+ docType.toString().c_str(), doc->getId().toString().c_str());
if (!doc->getId().hasDocType()) {
- return Result(Result::PERMANENT_ERROR, make_string(
- "Old id scheme not supported in elastic mode (%s)",
- doc->getId().toString().c_str()));
+ return Result(Result::PERMANENT_ERROR,
+ make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str()));
}
IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType);
- if (handler.get() == NULL) {
+ if (!handler) {
return Result(Result::PERMANENT_ERROR,
- make_string("No handler for document type '%s'",
- docType.toString().c_str()));
+ make_string("No handler for document type '%s'", docType.toString().c_str()));
}
TransportLatch latch(1);
- FeedToken token(latch);
- handler->handlePut(token, b, t, doc);
+ handler->handlePut(feedtoken::make(latch), b, t, doc);
latch.await();
return latch.getResult();
}
@@ -371,20 +355,16 @@ PersistenceEngine::RemoveResult
PersistenceEngine::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&)
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
- LOG(spam,
- "remove(%s, %" PRIu64 ", \"%s\")",
- b.toString().c_str(),
- static_cast<uint64_t>(t.getValue()),
- did.toString().c_str());
+ LOG(spam, "remove(%s, %" PRIu64 ", \"%s\")", b.toString().c_str(),
+ static_cast<uint64_t>(t.getValue()), did.toString().c_str());
HandlerSnapshot::UP snap = getHandlerSnapshot(b.getBucketSpace(), did);
- if (!snap.get()) {
+ if (!snap) {
return RemoveResult(false);
}
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch);
- handler->handleRemove(token, b, t, did);
+ handler->handleRemove(feedtoken::make(latch), b, t, did);
}
latch.await();
return latch.getRemoveResult();
@@ -404,24 +384,20 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP
}
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
DocTypeName docType(upd->getType());
- LOG(spam,
- "update(%s, %" PRIu64 ", (\"%s\", \"%s\"), createIfNonExistent='%s')",
- b.toString().c_str(),
- static_cast<uint64_t>(t.getValue()),
- docType.toString().c_str(),
- upd->getId().toString().c_str(),
- (upd->getCreateIfNonExistent() ? "true" : "false"));
+ LOG(spam, "update(%s, %" PRIu64 ", (\"%s\", \"%s\"), createIfNonExistent='%s')",
+ b.toString().c_str(), static_cast<uint64_t>(t.getValue()), docType.toString().c_str(),
+ upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false"));
IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType);
- TransportLatch latch(1);
- if (handler.get() != NULL) {
- FeedToken token(latch);
+
+ if (handler) {
+ TransportLatch latch(1);
LOG(debug, "update = %s", upd->toXml().c_str());
- handler->handleUpdate(token, b, t, upd);
+ handler->handleUpdate(feedtoken::make(latch), b, t, upd);
latch.await();
+ return latch.getUpdateResult();
} else {
return UpdateResult(Result::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()));
}
- return latch.getUpdateResult();
}
@@ -486,7 +462,7 @@ PersistenceEngine::iterate(IteratorId id, uint64_t maxByteSize, Context&) const
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
LockGuard guard(_iterators_lock);
- Iterators::const_iterator it = _iterators.find(id);
+ auto it = _iterators.find(id);
if (it == _iterators.end()) {
return IterateResult(Result::PERMANENT_ERROR, make_string("Unknown iterator with id %" PRIu64, id.getValue()));
}
@@ -517,7 +493,7 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&)
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
LockGuard guard(_iterators_lock);
- Iterators::iterator it = _iterators.find(id);
+ auto it = _iterators.find(id);
if (it == _iterators.end()) {
return Result();
}
@@ -539,8 +515,7 @@ PersistenceEngine::createBucket(const Bucket &b, Context &)
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch);
- handler->handleCreateBucket(token, b);
+ handler->handleCreateBucket(feedtoken::make(latch), b);
}
latch.await();
return latch.getResult();
@@ -556,8 +531,7 @@ PersistenceEngine::deleteBucket(const Bucket& b, Context&)
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch);
- handler->handleDeleteBucket(token, b);
+ handler->handleDeleteBucket(feedtoken::make(latch), b);
}
latch.await();
return latch.getResult();
@@ -599,8 +573,7 @@ PersistenceEngine::split(const Bucket& source, const Bucket& target1, const Buck
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch);
- handler->handleSplit(token, source, target1, target2);
+ handler->handleSplit(feedtoken::make(latch), source, target1, target2);
}
latch.await();
return latch.getResult();
@@ -618,8 +591,7 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch);
- handler->handleJoin(token, source1, source2, target);
+ handler->handleJoin(feedtoken::make(latch), source1, source2, target);
}
latch.await();
return latch.getResult();
@@ -676,7 +648,7 @@ void
PersistenceEngine::propagateSavedClusterState(IPersistenceHandler &handler)
{
ClusterState::SP clusterState(savedClusterState());
- if (clusterState.get() == NULL)
+ if (!clusterState)
return;
// Propagate saved cluster state.
// TODO: Fix race with new cluster state setting.
@@ -699,13 +671,13 @@ PersistenceEngine::grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenc
class ActiveBucketIdListResultHandler : public IBucketIdListResultHandler
{
private:
- typedef std::map<document::BucketId, size_t> BucketIdMap;
- typedef std::pair<BucketIdMap::iterator, bool> IR;
+ using BucketIdMap = std::map<document::BucketId, size_t>;
+ using IR = std::pair<BucketIdMap::iterator, bool>;
BucketIdMap _bucketMap;
public:
ActiveBucketIdListResultHandler() : _bucketMap() { }
- virtual void handle(const BucketIdListResult &result) override {
+ void handle(const BucketIdListResult &result) override {
const BucketIdListResult::List &buckets = result.getList();
for (size_t i = 0; i < buckets.size(); ++i) {
IR ir(_bucketMap.insert(std::make_pair(buckets[i], 1u)));
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
index e0d512ae6e0..12ba5e7ab29 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
@@ -3,6 +3,7 @@
#include "transport_latch.h"
#include <vespa/vespalib/util/stringfmt.h>
+using vespalib::make_string;
using storage::spi::Result;
namespace proton {
@@ -13,7 +14,7 @@ TransportLatch::TransportLatch(uint32_t cnt)
_result()
{}
-TransportLatch::~TransportLatch() {}
+TransportLatch::~TransportLatch() = default;
void
TransportLatch::send(ResultUP result, bool documentWasFound)
@@ -35,7 +36,7 @@ Result
TransportLatch::mergeErrorResults(const Result &lhs, const Result &rhs)
{
Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? lhs : rhs).getErrorCode();
- return Result(error, vespalib::make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str()));
+ return Result(error, make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str()));
}
} // proton
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
index 12f92722dfa..10dae553a80 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
@@ -11,8 +11,11 @@ namespace proton {
* Implementation of FeedToken::ITransport for handling the async reply for an operation.
* Uses an internal count down latch to keep track the number of outstanding replies.
*/
-class TransportLatch : public FeedToken::ITransport {
+class TransportLatch : public feedtoken::ITransport {
private:
+ using Result = storage::spi::Result;
+ using UpdateResult = storage::spi::UpdateResult;
+ using RemoveResult = storage::spi::RemoveResult;
vespalib::CountDownLatch _latch;
vespalib::Lock _lock;
ResultUP _result;
@@ -24,17 +27,16 @@ public:
void await() {
_latch.await();
}
- const storage::spi::UpdateResult &getUpdateResult() const {
- return dynamic_cast<const storage::spi::UpdateResult &>(*_result);
+ const UpdateResult &getUpdateResult() const {
+ return dynamic_cast<const UpdateResult &>(*_result);
}
- const storage::spi::Result &getResult() const {
+ const Result &getResult() const {
return *_result;
}
- const storage::spi::RemoveResult &getRemoveResult() const {
- return dynamic_cast<const storage::spi::RemoveResult &>(*_result);
+ const RemoveResult &getRemoveResult() const {
+ return dynamic_cast<const RemoveResult &>(*_result);
}
- static storage::spi::Result mergeErrorResults(const storage::spi::Result &lhs,
- const storage::spi::Result &rhs);
+ static Result mergeErrorResults(const Result &lhs, const Result &rhs);
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp
index 30ee6e1ba75..106ce846d99 100644
--- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp
@@ -117,19 +117,16 @@ CombiningFeedView::preparePut(PutOperation &putOp)
}
void
-CombiningFeedView::handlePut(FeedToken *token,
- const PutOperation &putOp)
+CombiningFeedView::handlePut(FeedToken token, const PutOperation &putOp)
{
assert(putOp.getValidDbdId());
uint32_t subDbId = putOp.getSubDbId();
uint32_t prevSubDbId = putOp.getPrevSubDbId();
if (putOp.getValidPrevDbdId() && prevSubDbId != subDbId) {
- if (token != NULL)
- token->incNeededAcks();
_views[subDbId]->handlePut(token, putOp);
- _views[prevSubDbId]->handlePut(token, putOp);
+ _views[prevSubDbId]->handlePut(std::move(token), putOp);
} else {
- _views[subDbId]->handlePut(token, putOp);
+ _views[subDbId]->handlePut(std::move(token), putOp);
}
}
@@ -143,14 +140,13 @@ CombiningFeedView::prepareUpdate(UpdateOperation &updOp)
}
void
-CombiningFeedView::handleUpdate(FeedToken *token,
- const UpdateOperation &updOp)
+CombiningFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp)
{
assert(updOp.getValidDbdId());
assert(updOp.getValidPrevDbdId());
assert(!updOp.changedDbdId());
uint32_t subDbId(updOp.getSubDbId());
- _views[subDbId]->handleUpdate(token, updOp);
+ _views[subDbId]->handleUpdate(std::move(token), updOp);
}
void
@@ -165,19 +161,16 @@ CombiningFeedView::prepareRemove(RemoveOperation &rmOp)
}
void
-CombiningFeedView::handleRemove(FeedToken *token,
- const RemoveOperation &rmOp)
+CombiningFeedView::handleRemove(FeedToken token, const RemoveOperation &rmOp)
{
if (rmOp.getValidDbdId()) {
uint32_t subDbId = rmOp.getSubDbId();
uint32_t prevSubDbId = rmOp.getPrevSubDbId();
if (rmOp.getValidPrevDbdId() && prevSubDbId != subDbId) {
- if (token != NULL)
- token->incNeededAcks();
_views[subDbId]->handleRemove(token, rmOp);
- _views[prevSubDbId]->handleRemove(token, rmOp);
+ _views[prevSubDbId]->handleRemove(std::move(token), rmOp);
} else {
- _views[subDbId]->handleRemove(token, rmOp);
+ _views[subDbId]->handleRemove(std::move(token), rmOp);
}
} else {
assert(rmOp.getValidPrevDbdId());
diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h
index ea4ac64176a..284004dfa81 100644
--- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h
@@ -16,8 +16,7 @@
#include "replaypacketdispatcher.h"
#include "ibucketstatecalculator.h"
-namespace proton
-{
+namespace proton {
class CombiningFeedView : public IFeedView
@@ -63,7 +62,7 @@ public:
document::BucketSpace bucketSpace,
const IBucketStateCalculator::SP &calc);
- virtual ~CombiningFeedView();
+ ~CombiningFeedView() override;
const document::DocumentTypeRepo::SP & getDocumentTypeRepo() const override;
@@ -72,11 +71,11 @@ public:
*/
void preparePut(PutOperation &putOp) override;
- void handlePut(FeedToken *token, const PutOperation &putOp) override;
+ void handlePut(FeedToken token, const PutOperation &putOp) override;
void prepareUpdate(UpdateOperation &updOp) override;
- void handleUpdate(FeedToken *token, const UpdateOperation &updOp) override;
+ void handleUpdate(FeedToken token, const UpdateOperation &updOp) override;
void prepareRemove(RemoveOperation &rmOp) override;
- void handleRemove(FeedToken *token, const RemoveOperation &rmOp) override;
+ void handleRemove(FeedToken token, const RemoveOperation &rmOp) override;
void prepareDeleteBucket(DeleteBucketOperation &delOp) override;
void handleDeleteBucket(const DeleteBucketOperation &delOp) override;
void prepareMove(MoveOperation &putOp) override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 9300af5c3f0..b01ba43cb49 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -8,11 +8,6 @@
#include "tlcproxy.h"
#include "configstore.h"
#include <vespa/document/datatype/documenttype.h>
-#include <vespa/documentapi/messagebus/documentprotocol.h>
-#include <vespa/documentapi/messagebus/messages/documentreply.h>
-#include <vespa/documentapi/messagebus/messages/feedreply.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentreply.h>
-#include <vespa/documentapi/messagebus/messages/updatedocumentreply.h>
#include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h>
#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
#include <vespa/searchcore/proton/persistenceengine/transport_latch.h>
@@ -27,11 +22,6 @@ LOG_SETUP(".proton.server.feedhandler");
using document::BucketId;
using document::Document;
using document::DocumentTypeRepo;
-using documentapi::DocumentProtocol;
-using documentapi::DocumentReply;
-using documentapi::FeedReply;
-using documentapi::RemoveDocumentReply;
-using documentapi::UpdateDocumentReply;
using storage::spi::PartitionId;
using storage::spi::RemoveResult;
using storage::spi::Result;
@@ -90,31 +80,30 @@ FeedHandler::doHandleOperation(FeedToken token, FeedOperation::UP op)
{
assert(_writeService.master().isCurrentThread());
LockGuard guard(_feedLock);
- _feedState->handleOperation(token, std::move(op));
+ _feedState->handleOperation(std::move(token), std::move(op));
}
-void FeedHandler::performPut(FeedToken::UP token, PutOperation &op) {
+void FeedHandler::performPut(FeedToken token, PutOperation &op) {
op.assertValid();
_activeFeedView->preparePut(op);
if (ignoreOperation(op)) {
LOG(debug, "performPut(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")",
op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp());
if (token) {
- token->setResult(ResultUP(new Result), false);
- token->ack();
+ token->setResult(std::make_unique<Result>(), false);
}
return;
}
storeOperation(op);
if (token) {
- token->setResult(ResultUP(new Result), false);
+ token->setResult(std::make_unique<Result>(), false);
}
- _activeFeedView->handlePut(token.get(), op);
+ _activeFeedView->handlePut(std::move(token), op);
}
void
-FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op)
+FeedHandler::performUpdate(FeedToken token, UpdateOperation &op)
{
_activeFeedView->prepareUpdate(op);
if (op.getPrevDbDocumentId().valid() && !op.getPrevMarkedAsRemoved()) {
@@ -123,26 +112,25 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op)
createNonExistingDocument(std::move(token), op);
} else {
if (token) {
- token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false);
- token->ack();
+ token->setResult(std::make_unique<UpdateResult>(Timestamp(0)), false);
}
}
}
void
-FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op)
+FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op)
{
storeOperation(op);
if (token) {
token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true);
}
- _activeFeedView->handleUpdate(token.get(), op);
+ _activeFeedView->handleUpdate(std::move(token), op);
}
void
-FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperation &op)
+FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &op)
{
Document::SP doc(new Document(op.getUpdate()->getType(), op.getUpdate()->getId()));
doc->setRepo(*_activeFeedView->getDocumentTypeRepo());
@@ -154,23 +142,18 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio
token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true);
}
TransportLatch latch(1);
- FeedToken putToken(latch);
- _activeFeedView->handlePut(&putToken, putOp);
+ _activeFeedView->handlePut(feedtoken::make(latch), putOp);
latch.await();
- if (token) {
- token->ack();
- }
}
-void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
+void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) {
_activeFeedView->prepareRemove(op);
if (ignoreOperation(op)) {
LOG(debug, "performRemove(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")",
op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp());
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
- token->ack();
}
return;
}
@@ -182,70 +165,60 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
bool documentWasFound = !op.getPrevMarkedAsRemoved();
token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound);
}
- _activeFeedView->handleRemove(token.get(), op);
+ _activeFeedView->handleRemove(std::move(token), op);
} else if (op.hasDocType()) {
assert(op.getDocType() == _docTypeName.getName());
storeOperation(op);
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
}
- _activeFeedView->handleRemove(token.get(), op);
+ _activeFeedView->handleRemove(std::move(token), op);
} else {
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
- token->ack();
}
}
}
void
-FeedHandler::performGarbageCollect(FeedToken::UP token)
+FeedHandler::performGarbageCollect(FeedToken token)
{
- if (token) {
- token->ack();
- }
+ (void) token;
}
void
-FeedHandler::performCreateBucket(FeedToken::UP token, CreateBucketOperation &op)
+FeedHandler::performCreateBucket(FeedToken token, CreateBucketOperation &op)
{
+ (void) token;
storeOperation(op);
_bucketDBHandler->handleCreateBucket(op.getBucketId());
- if (token) {
- token->ack();
- }
}
-void FeedHandler::performDeleteBucket(FeedToken::UP token, DeleteBucketOperation &op) {
+void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op) {
+ (void) token;
_activeFeedView->prepareDeleteBucket(op);
storeOperation(op);
// Delete documents in bucket
_activeFeedView->handleDeleteBucket(op);
// Delete bucket itself, should no longer have documents.
_bucketDBHandler->handleDeleteBucket(op.getBucketId());
- if (token) {
- token->ack();
- }
+
}
-void FeedHandler::performSplit(FeedToken::UP token, SplitBucketOperation &op) {
+void FeedHandler::performSplit(FeedToken token, SplitBucketOperation &op) {
+ (void) token;
storeOperation(op);
_bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2());
- if (token) {
- token->ack();
- }
}
-void FeedHandler::performJoin(FeedToken::UP token, JoinBucketsOperation &op) {
+void FeedHandler::performJoin(FeedToken token, JoinBucketsOperation &op) {
+ (void) token;
storeOperation(op);
_bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget());
- if (token) {
- token->ack();
- }
}
@@ -328,7 +301,7 @@ void
FeedHandler::changeFeedState(FeedState::SP newState)
{
LockGuard guard(_feedLock);
- changeFeedState(newState, guard);
+ changeFeedState(std::move(newState), guard);
}
@@ -466,8 +439,8 @@ isRejectableFeedOperation(FeedOperation::Type type)
}
template <typename ResultType>
-void feedOperationRejected(FeedToken *token, const vespalib::string &opType, const vespalib::string &docId,
- DocTypeName docTypeName, const vespalib::string &rejectMessage)
+void feedOperationRejected(FeedToken & token, const vespalib::string &opType, const vespalib::string &docId,
+ const DocTypeName & docTypeName, const vespalib::string &rejectMessage)
{
if (token) {
auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'",
@@ -478,8 +451,8 @@ void feedOperationRejected(FeedToken *token, const vespalib::string &opType, con
}
void
-notifyFeedOperationRejected(FeedToken *token, const FeedOperation &op,
- DocTypeName docTypeName, const vespalib::string &rejectMessage)
+notifyFeedOperationRejected(FeedToken & token, const FeedOperation &op,
+ const DocTypeName & docTypeName, const vespalib::string &rejectMessage)
{
if ((op.getType() == FeedOperation::UPDATE_42) || (op.getType() == FeedOperation::UPDATE)) {
vespalib::string docId = (static_cast<const UpdateOperation &>(op)).getUpdate()->getId().toString();
@@ -495,7 +468,7 @@ notifyFeedOperationRejected(FeedToken *token, const FeedOperation &op,
}
bool
-FeedHandler::considerWriteOperationForRejection(FeedToken *token, const FeedOperation &op)
+FeedHandler::considerWriteOperationForRejection(FeedToken & token, const FeedOperation &op)
{
if (!_writeFilter.acceptWriteOperation() && isRejectableFeedOperation(op.getType())) {
IResourceWriteFilter::State state = _writeFilter.getAcceptState();
@@ -508,9 +481,9 @@ FeedHandler::considerWriteOperationForRejection(FeedToken *token, const FeedOper
}
void
-FeedHandler::performOperation(FeedToken::UP token, FeedOperation::UP op)
+FeedHandler::performOperation(FeedToken token, FeedOperation::UP op)
{
- if (considerWriteOperationForRejection(token.get(), *op)) {
+ if (considerWriteOperationForRejection(token, *op)) {
return;
}
switch(op->getType()) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index dc955cfeb79..d717346883a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -11,6 +11,7 @@
#include "transactionlogmanager.h"
#include <persistence/spi/types.h>
#include <vespa/searchcore/proton/common/doctypename.h>
+#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchlib/transactionlog/translogclient.h>
namespace searchcorespi { namespace index { class IThreadingService; } }
@@ -22,7 +23,6 @@ class DDBState;
class DeleteBucketOperation;
class FeedConfigStore;
class FeedState;
-class FeedToken;
class IDocumentDBOwner;
class IFeedHandlerOwner;
class IFeedView;
@@ -54,7 +54,6 @@ private:
typedef storage::spi::Timestamp Timestamp;
typedef document::BucketId BucketId;
using FeedStateSP = std::shared_ptr<FeedState>;
- using FeedTokenUP = std::unique_ptr<FeedToken>;
using FeedOperationUP = std::unique_ptr<FeedOperation>;
class TlsMgrWriter : public TlsWriter {
@@ -101,24 +100,24 @@ private:
*/
void doHandleOperation(FeedToken token, FeedOperationUP op);
- bool considerWriteOperationForRejection(FeedToken *token, const FeedOperation &op);
+ bool considerWriteOperationForRejection(FeedToken & token, const FeedOperation &op);
/**
* Delayed execution of feed operations against feed view, in
* master write thread.
*/
- void performPut(FeedTokenUP token, PutOperation &op);
-
- void performUpdate(FeedTokenUP token, UpdateOperation &op);
- void performInternalUpdate(FeedTokenUP token, UpdateOperation &op);
- void createNonExistingDocument(FeedTokenUP, const UpdateOperation &op);
-
- void performRemove(FeedTokenUP token, RemoveOperation &op);
- void performGarbageCollect(FeedTokenUP token);
- void performCreateBucket(FeedTokenUP token, CreateBucketOperation &op);
- void performDeleteBucket(FeedTokenUP token, DeleteBucketOperation &op);
- void performSplit(FeedTokenUP token, SplitBucketOperation &op);
- void performJoin(FeedTokenUP token, JoinBucketsOperation &op);
+ void performPut(FeedToken token, PutOperation &op);
+
+ void performUpdate(FeedToken token, UpdateOperation &op);
+ void performInternalUpdate(FeedToken token, UpdateOperation &op);
+ void createNonExistingDocument(FeedToken, const UpdateOperation &op);
+
+ void performRemove(FeedToken token, RemoveOperation &op);
+ void performGarbageCollect(FeedToken token);
+ void performCreateBucket(FeedToken token, CreateBucketOperation &op);
+ void performDeleteBucket(FeedToken token, DeleteBucketOperation &op);
+ void performSplit(FeedToken token, SplitBucketOperation &op);
+ void performJoin(FeedToken token, JoinBucketsOperation &op);
void performSync();
void performEof();
@@ -223,7 +222,7 @@ public:
vespalib::string getDocTypeName() const { return _docTypeName.getName(); }
void tlsPrune(SerialNum oldest_to_keep);
- void performOperation(FeedTokenUP token, FeedOperationUP op);
+ void performOperation(FeedToken token, FeedOperationUP op);
void handleOperation(FeedToken token, FeedOperationUP op);
void handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCallback> moveDoneCtx) override;
@@ -240,4 +239,3 @@ public:
};
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index 6219cadd24f..91b310e3925 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -82,13 +82,13 @@ public:
}
virtual void replay(const PutOperation &op) override {
- _feed_view_ptr->handlePut(NULL, op);
+ _feed_view_ptr->handlePut(FeedToken(), op);
}
virtual void replay(const RemoveOperation &op) override {
- _feed_view_ptr->handleRemove(NULL, op);
+ _feed_view_ptr->handleRemove(FeedToken(), op);
}
virtual void replay(const UpdateOperation &op) override {
- _feed_view_ptr->handleUpdate(NULL, op);
+ _feed_view_ptr->handleUpdate(FeedToken(), op);
}
virtual void replay(const NoopOperation &) override {} // ignored
virtual void replay(const NewConfigOperation &op) override {
@@ -100,16 +100,12 @@ public:
_feed_view_ptr->handleDeleteBucket(op);
}
virtual void replay(const SplitBucketOperation &op) override {
- _bucketDBHandler.handleSplit(op.getSerialNum(),
- op.getSource(),
- op.getTarget1(),
- op.getTarget2());
+ _bucketDBHandler.handleSplit(op.getSerialNum(), op.getSource(),
+ op.getTarget1(), op.getTarget2());
}
virtual void replay(const JoinBucketsOperation &op) override {
- _bucketDBHandler.handleJoin(op.getSerialNum(),
- op.getSource1(),
- op.getSource2(),
- op.getTarget());
+ _bucketDBHandler.handleJoin(op.getSerialNum(), op.getSource1(),
+ op.getSource2(), op.getTarget());
}
virtual void replay(const PruneRemovedDocumentsOperation &op) override {
_feed_view_ptr->handlePruneRemovedDocuments(op);
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
index 61e80d1c7cc..963a78d0d6b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
@@ -79,13 +79,11 @@ public:
_handler(handler) {
}
- virtual void handleOperation(FeedToken token, FeedOperation::UP op) override {
- _handler.performOperation(FeedToken::UP(new FeedToken(token)), std::move(op));
+ void handleOperation(FeedToken token, FeedOperation::UP op) override {
+ _handler.performOperation(std::move(token), std::move(op));
}
- virtual void
- receive(const PacketWrapper::SP &wrap, vespalib::Executor &) override
- {
+ void receive(const PacketWrapper::SP &wrap, vespalib::Executor &) override {
throwExceptionInReceive(_handler.getDocTypeName().c_str(),
wrap->packet.range().from(),
wrap->packet.range().to(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
index c8222902c5d..3e1635d0f33 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
@@ -2,19 +2,17 @@
#pragma once
+#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchlib/common/serialnum.h>
-#include <memory>
namespace document { class DocumentTypeRepo; }
namespace search { class IDestructorCallback; }
-namespace proton
-{
+namespace proton {
class CompactLidSpaceOperation;
class DeleteBucketOperation;
-class FeedToken;
class ISimpleDocumentMetaStore;
class MoveOperation;
class PruneRemovedDocumentsOperation;
@@ -49,11 +47,11 @@ public:
*/
virtual void preparePut(PutOperation &putOp) = 0;
- virtual void handlePut(FeedToken *token, const PutOperation &putOp) = 0;
+ virtual void handlePut(FeedToken token, const PutOperation &putOp) = 0;
virtual void prepareUpdate(UpdateOperation &updOp) = 0;
- virtual void handleUpdate(FeedToken *token, const UpdateOperation &updOp) = 0;
+ virtual void handleUpdate(FeedToken token, const UpdateOperation &updOp) = 0;
virtual void prepareRemove(RemoveOperation &rmOp) = 0;
- virtual void handleRemove(FeedToken *token, const RemoveOperation &rmOp) = 0;
+ virtual void handleRemove(FeedToken token, const RemoveOperation &rmOp) = 0;
virtual void prepareDeleteBucket(DeleteBucketOperation &delOp) = 0;
virtual void handleDeleteBucket(const DeleteBucketOperation &delOp) = 0;
virtual void prepareMove(MoveOperation &putOp) = 0;
diff --git a/searchcore/src/vespa/searchcore/proton/server/iheartbeathandler.h b/searchcore/src/vespa/searchcore/proton/server/iheartbeathandler.h
index 7919d2f383b..3e8267f5aa5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/iheartbeathandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/iheartbeathandler.h
@@ -2,22 +2,14 @@
#pragma once
-namespace proton
-{
-
-class FeedToken;
+namespace proton {
class IHeartBeatHandler
{
public:
- virtual void
- heartBeat() = 0;
+ virtual void heartBeat() = 0;
- virtual
- ~IHeartBeatHandler()
- {
- }
+ virtual ~IHeartBeatHandler() = default;
};
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
index 9c0115f0084..31e4c87b352 100644
--- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
@@ -5,7 +5,7 @@
namespace proton {
-OperationDoneContext::OperationDoneContext(std::unique_ptr<FeedToken> token)
+OperationDoneContext::OperationDoneContext(FeedToken token)
: _token(std::move(token))
{
}
@@ -18,10 +18,7 @@ OperationDoneContext::~OperationDoneContext()
void
OperationDoneContext::ack()
{
- if (_token) {
- std::unique_ptr<FeedToken> token(std::move(_token));
- token->ack();
- }
+ _token.reset();
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
index b801987844b..4c310abf871 100644
--- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
@@ -3,12 +3,10 @@
#pragma once
#include <vespa/searchlib/common/idestructorcallback.h>
-#include <vespa/searchcore/proton/feedoperation/feedoperation.h>
+#include <vespa/searchcore/proton/common/feedtoken.h>
namespace proton {
-class FeedToken;
-
/**
* Context class for document operations that acks operation when
* instance is destroyed. Typically a shared pointer to an instance is
@@ -18,15 +16,15 @@ class FeedToken;
*/
class OperationDoneContext : public search::IDestructorCallback
{
- std::unique_ptr<FeedToken> _token;
+ FeedToken _token;
protected:
void ack();
public:
- OperationDoneContext(std::unique_ptr<FeedToken> token);
+ OperationDoneContext(FeedToken token);
~OperationDoneContext() override;
- FeedToken *getToken() { return _token.get(); }
+ bool hasToken() const { return static_cast<bool>(_token); }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
index 649eebb26f5..fd7871773c8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
@@ -1,19 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "putdonecontext.h"
-#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchcore/proton/common/docid_limit.h>
#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h>
namespace proton {
-PutDoneContext::PutDoneContext(std::unique_ptr<FeedToken> token,
-
- IGidToLidChangeHandler &gidToLidChangeHandler,
- const document::GlobalId &gid,
- uint32_t lid,
- search::SerialNum serialNum,
- bool enableNotifyPut)
+PutDoneContext::PutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid, uint32_t lid,
+ search::SerialNum serialNum, bool enableNotifyPut)
: OperationDoneContext(std::move(token)),
_lid(lid),
_docIdLimit(nullptr),
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
index 3e98b02dda6..587c8f9054d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
@@ -28,7 +28,7 @@ class PutDoneContext : public OperationDoneContext
bool _enableNotifyPut;
public:
- PutDoneContext(std::unique_ptr<FeedToken> token, IGidToLidChangeHandler &gidToLidChangeHandler,
+ PutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler,
const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut);
~PutDoneContext() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
index bd9a8240d73..194e190bb7b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
@@ -2,16 +2,13 @@
#include "removedonecontext.h"
#include "removedonetask.h"
-#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h>
namespace proton {
-RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token,
- vespalib::Executor &executor,
+RemoveDoneContext::RemoveDoneContext(FeedToken token, vespalib::Executor &executor,
IDocumentMetaStore &documentMetaStore,
- PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
- uint32_t lid)
+ PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid)
: OperationDoneContext(std::move(token)),
_executor(executor),
_task(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
index 83f6013dd85..912d31dde22 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
@@ -28,10 +28,8 @@ class RemoveDoneContext : public OperationDoneContext
PendingNotifyRemoveDone _pendingNotifyRemoveDone;
public:
- RemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
- uint32_t lid);
-
+ RemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore,
+ PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid);
~RemoveDoneContext() override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 8c8559115bd..33e8708799f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -36,26 +36,14 @@ namespace proton {
namespace {
-FeedToken::UP dupFeedToken(FeedToken *token)
-{
- // If token is not nullptr then a new feed token is created, referencing
- // same shared state as old token.
- if (token != nullptr) {
- return std::make_unique<FeedToken>(*token);
- } else {
- return FeedToken::UP();
- }
-}
-
class PutDoneContextForMove : public PutDoneContext {
private:
IDestructorCallback::SP _moveDoneCtx;
public:
- PutDoneContextForMove(std::unique_ptr<FeedToken> token,
- IGidToLidChangeHandler &gidToLidChangeHandler,
- const document::GlobalId &gid,
- uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx)
+ PutDoneContextForMove(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum,
+ bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx)
: PutDoneContext(std::move(token), gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut),
_moveDoneCtx(std::move(moveDoneCtx))
{}
@@ -63,7 +51,7 @@ public:
};
std::shared_ptr<PutDoneContext>
-createPutDoneContext(FeedToken::UP &token,
+createPutDoneContext(FeedToken token,
IGidToLidChangeHandler &gidToLidChangeHandler,
const document::GlobalId &gid, uint32_t lid,
SerialNum serialNum, bool enableNotifyPut,
@@ -79,14 +67,14 @@ createPutDoneContext(FeedToken::UP &token,
}
std::shared_ptr<PutDoneContext>
-createPutDoneContext(FeedToken::UP &token, IGidToLidChangeHandler &gidToLidChangeHandler,
+createPutDoneContext(FeedToken token, IGidToLidChangeHandler &gidToLidChangeHandler,
const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut)
{
return createPutDoneContext(token, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, IDestructorCallback::SP());
}
std::shared_ptr<UpdateDoneContext>
-createUpdateDoneContext(FeedToken::UP &token, const DocumentUpdate::SP &upd)
+createUpdateDoneContext(FeedToken token, const DocumentUpdate::SP &upd)
{
return std::make_shared<UpdateDoneContext>(std::move(token), upd);
}
@@ -106,11 +94,9 @@ private:
IDestructorCallback::SP _moveDoneCtx;
public:
- RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore,
+ RemoveDoneContextForMove(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore,
PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
- uint32_t lid,
- IDestructorCallback::SP moveDoneCtx)
+ uint32_t lid, IDestructorCallback::SP moveDoneCtx)
: RemoveDoneContext(std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid),
_moveDoneCtx(std::move(moveDoneCtx))
{}
@@ -118,13 +104,14 @@ public:
};
std::shared_ptr<RemoveDoneContext>
-createRemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
+createRemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore,
+ PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
uint32_t lid, IDestructorCallback::SP moveDoneCtx)
{
if (moveDoneCtx) {
return std::make_shared<RemoveDoneContextForMove>
- (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid, std::move(moveDoneCtx));
+ (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone),
+ lid, std::move(moveDoneCtx));
} else {
return std::make_shared<RemoveDoneContext>
(std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid);
@@ -132,7 +119,8 @@ createRemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &ex
}
std::vector<document::GlobalId> getGidsToRemove(const IDocumentMetaStore &metaStore,
- const LidVectorContext::LidVector &lidsToRemove) {
+ const LidVectorContext::LidVector &lidsToRemove)
+{
std::vector<document::GlobalId> gids;
gids.reserve(lidsToRemove.size());
for (const auto &lid : lidsToRemove) {
@@ -145,7 +133,8 @@ std::vector<document::GlobalId> getGidsToRemove(const IDocumentMetaStore &metaSt
}
void putMetaData(documentmetastore::IStore &meta_store, const DocumentId &doc_id,
- const DocumentOperation &op, bool is_removed_doc) {
+ const DocumentOperation &op, bool is_removed_doc)
+{
documentmetastore::IStore::Result putRes(
meta_store.put(doc_id.getGlobalId(),
op.getBucketId(), op.getTimestamp(), op.getSerializedDocSize(), op.getLid()));
@@ -234,10 +223,9 @@ StoreOnlyFeedView::forceCommit(SerialNum serialNum, OnForceCommitDoneType onComm
}
void
-StoreOnlyFeedView::considerEarlyAck(FeedToken::UP &token)
+StoreOnlyFeedView::considerEarlyAck(FeedToken & token)
{
if (_commitTimeTracker.hasVisibilityDelay() && token) {
- token->ack();
token.reset();
}
}
@@ -260,13 +248,13 @@ StoreOnlyFeedView::preparePut(PutOperation &putOp)
}
void
-StoreOnlyFeedView::handlePut(FeedToken *token, const PutOperation &putOp)
+StoreOnlyFeedView::handlePut(FeedToken token, const PutOperation &putOp)
{
- internalPut(dupFeedToken(token), putOp);
+ internalPut(std::move(token), putOp);
}
void
-StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
+StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
{
assert(putOp.getValidDbdId());
assert(putOp.notMovingLidInSameSubDb());
@@ -291,7 +279,7 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
bool immediateCommit = _commitTimeTracker.needCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
- createPutDoneContext(token, _gidToLidChangeHandler, gid, putOp.getLid(), serialNum,
+ createPutDoneContext(std::move(token), _gidToLidChangeHandler, gid, putOp.getLid(), serialNum,
putOp.changedDbdId() && useDocumentMetaStore(serialNum));
putSummary(serialNum, putOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone);
@@ -302,9 +290,6 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone),
putOp.getPrevLid(), IDestructorCallback::SP());
}
- if (token) {
- token->ack();
- }
}
void
@@ -347,9 +332,9 @@ StoreOnlyFeedView::prepareUpdate(UpdateOperation &updOp)
}
void
-StoreOnlyFeedView::handleUpdate(FeedToken *token, const UpdateOperation &updOp)
+StoreOnlyFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp)
{
- internalUpdate(dupFeedToken(token), updOp);
+ internalUpdate(std::move(token), updOp);
}
void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid,
@@ -400,7 +385,7 @@ void StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) {
}
void
-StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &updOp) {
+StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) {
if ( ! updOp.getUpdate()) {
LOG(warning, "database(%s): ignoring invalid update operation",
_params._docTypeName.toString().c_str());
@@ -430,7 +415,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up
considerEarlyAck(token);
bool immediateCommit = _commitTimeTracker.needCommit();
- auto onWriteDone = createUpdateDoneContext(token, updOp.getUpdate());
+ auto onWriteDone = createUpdateDoneContext(std::move(token), updOp.getUpdate());
updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone);
UpdateScope updateScope(getUpdateScope(upd));
@@ -470,19 +455,18 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpd
const DocumentUpdate & upd = *update;
Document::UP newDoc;
vespalib::nbostream newStream(12345);
- assert(onWriteDone->getToken() == nullptr || useDocumentStore(serialNum));
+ assert(!onWriteDone->hasToken() || useDocumentStore(serialNum));
if (useDocumentStore(serialNum)) {
assert(prevDoc);
}
if (!prevDoc) {
// Replaying, document removed later before summary was flushed.
- assert(onWriteDone->getToken() == nullptr);
+ assert(!onWriteDone->hasToken());
// If we've passed serial number for flushed index then we could
// also check that this operation is marked for ignore by index
// proxy.
} else {
if (upd.getId() == prevDoc->getId()) {
-
newDoc = std::move(prevDoc);
if (useDocumentStore(serialNum)) {
upd.applyTo(*newDoc);
@@ -491,7 +475,7 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpd
} else {
// Replaying, document removed and lid reused before summary
// was flushed.
- assert(onWriteDone->getToken() == nullptr && !useDocumentStore(serialNum));
+ assert(!onWriteDone->hasToken() && !useDocumentStore(serialNum));
}
}
promisedDoc.set_value(std::move(newDoc));
@@ -531,12 +515,12 @@ StoreOnlyFeedView::prepareRemove(RemoveOperation &rmOp)
}
void
-StoreOnlyFeedView::handleRemove(FeedToken *token, const RemoveOperation &rmOp) {
- internalRemove(dupFeedToken(token), rmOp);
+StoreOnlyFeedView::handleRemove(FeedToken token, const RemoveOperation &rmOp) {
+ internalRemove(std::move(token), rmOp);
}
void
-StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rmOp)
+StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperation &rmOp)
{
assert(rmOp.getValidNewOrPrevDbdId());
assert(rmOp.notMovingLidInSameSubDb());
@@ -564,13 +548,10 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm
rmOp.getPrevLid(), IDestructorCallback::SP());
}
}
- if (token) {
- token->ack();
- }
}
void
-StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum,
+StoreOnlyFeedView::internalRemove(FeedToken token, SerialNum serialNum,
PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid,
IDestructorCallback::SP moveDoneCtx)
{
@@ -727,16 +708,15 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
if (moveOp.getValidDbdId(_params._subDbId)) {
bool immediateCommit = _commitTimeTracker.needCommit();
const document::GlobalId &gid = docId.getGlobalId();
- FeedToken::UP token;
std::shared_ptr<PutDoneContext> onWriteDone =
- createPutDoneContext(token, _gidToLidChangeHandler, gid, moveOp.getLid(), serialNum,
+ createPutDoneContext(FeedToken(), _gidToLidChangeHandler, gid, moveOp.getLid(), serialNum,
moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx);
putSummary(serialNum, moveOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone);
putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone);
}
if (docAlreadyExists && moveOp.changedDbdId()) {
- internalRemove(FeedToken::UP(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx);
+ internalRemove(FeedToken(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index baaf77bbe59..a8119224ba8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -168,22 +168,22 @@ private:
}
PendingNotifyRemoveDone adjustMetaStore(const DocumentOperation &op, const document::DocumentId &docId);
- void internalPut(FeedTokenUP token, const PutOperation &putOp);
- void internalUpdate(FeedTokenUP token, const UpdateOperation &updOp);
+ void internalPut(FeedToken token, const PutOperation &putOp);
+ void internalUpdate(FeedToken token, const UpdateOperation &updOp);
bool lookupDocId(const document::DocumentId &docId, Lid & lid) const;
- void internalRemove(FeedTokenUP token, const RemoveOperation &rmOp);
+ void internalRemove(FeedToken token, const RemoveOperation &rmOp);
// Removes documents from meta store and document store.
// returns the number of documents removed.
size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields,
bool immediateCommit);
- void internalRemove(FeedTokenUP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
+ void internalRemove(FeedToken token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
Lid lid, std::shared_ptr<search::IDestructorCallback> moveDoneCtx);
// Ack token early if visibility delay is nonzero
- void considerEarlyAck(FeedTokenUP &token);
+ void considerEarlyAck(FeedToken &token);
void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdateSP upd, OnOperationDoneType onWriteDone,
PromisedDoc promisedDoc, PromisedStream promisedStream);
@@ -220,7 +220,6 @@ protected:
public:
StoreOnlyFeedView(const Context &ctx, const PersistentParams &params);
-
~StoreOnlyFeedView() override;
const ISummaryAdapter::SP &getSummaryAdapter() const { return _summaryAdapter; }
@@ -233,31 +232,22 @@ public:
CommitTimeTracker &getCommitTimeTracker() { return _commitTimeTracker; }
IGidToLidChangeHandler &getGidToLidChangeHandler() const { return _gidToLidChangeHandler; }
- /**
- * Implements IFeedView.
- */
- virtual const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return _repo; }
- virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override;
-
- /**
- * Similar to IPersistenceHandler functions.
- * Takes pointer to feed token instead of instance because
- * when replaying the spooler we don't have a feed token.
- */
-
- virtual void preparePut(PutOperation &putOp) override;
- virtual void handlePut(FeedToken *token, const PutOperation &putOp) override;
- virtual void prepareUpdate(UpdateOperation &updOp) override;
- virtual void handleUpdate(FeedToken *token, const UpdateOperation &updOp) override;
- virtual void prepareRemove(RemoveOperation &rmOp) override;
- virtual void handleRemove(FeedToken *token, const RemoveOperation &rmOp) override;
- virtual void prepareDeleteBucket(DeleteBucketOperation &delOp) override;
- virtual void handleDeleteBucket(const DeleteBucketOperation &delOp) override;
- virtual void prepareMove(MoveOperation &putOp) override;
- virtual void handleMove(const MoveOperation &putOp, std::shared_ptr<search::IDestructorCallback> doneCtx) override;
- virtual void heartBeat(search::SerialNum serialNum) override;
- virtual void sync() override;
- virtual void forceCommit(SerialNum serialNum) override;
+ const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override { return _repo; }
+ const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override;
+
+ void preparePut(PutOperation &putOp) override;
+ void handlePut(FeedToken token, const PutOperation &putOp) override;
+ void prepareUpdate(UpdateOperation &updOp) override;
+ void handleUpdate(FeedToken token, const UpdateOperation &updOp) override;
+ void prepareRemove(RemoveOperation &rmOp) override;
+ void handleRemove(FeedToken token, const RemoveOperation &rmOp) override;
+ void prepareDeleteBucket(DeleteBucketOperation &delOp) override;
+ void handleDeleteBucket(const DeleteBucketOperation &delOp) override;
+ void prepareMove(MoveOperation &putOp) override;
+ void handleMove(const MoveOperation &putOp, std::shared_ptr<search::IDestructorCallback> doneCtx) override;
+ void heartBeat(search::SerialNum serialNum) override;
+ void sync() override;
+ void forceCommit(SerialNum serialNum) override;
virtual void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone);
/**
@@ -266,8 +256,8 @@ public:
*
* Called by writer thread.
*/
- virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override;
- virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override;
+ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override;
+ void handleCompactLidSpace(const CompactLidSpaceOperation &op) override;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
index 171990c32d3..6fc1a788531 100644
--- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
@@ -1,11 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "updatedonecontext.h"
-#include <vespa/searchcore/proton/common/feedtoken.h>
namespace proton {
-UpdateDoneContext::UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd)
+UpdateDoneContext::UpdateDoneContext(FeedToken token, const document::DocumentUpdate::SP &upd)
: OperationDoneContext(std::move(token)),
_upd(upd)
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
index 4701db300de..2b8a018af87 100644
--- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
@@ -18,7 +18,7 @@ class UpdateDoneContext : public OperationDoneContext
{
document::DocumentUpdate::SP _upd;
public:
- UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd);
+ UpdateDoneContext(FeedToken token, const document::DocumentUpdate::SP &upd);
~UpdateDoneContext() override;
const document::DocumentUpdate &getUpdate() { return *_upd; }
diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h
index cafcbb64410..163168a1a09 100644
--- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h
+++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h
@@ -4,9 +4,7 @@
#include <vespa/searchcore/proton/server/ifeedview.h>
#include <vespa/document/repo/documenttyperepo.h>
-namespace proton {
-
-namespace test {
+namespace proton::test {
struct DummyFeedView : public IFeedView
{
@@ -18,33 +16,27 @@ struct DummyFeedView : public IFeedView
DummyFeedView(const document::DocumentTypeRepo::SP &docTypeRepo)
: _docTypeRepo(docTypeRepo)
{}
- virtual const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override {
+ const document::DocumentTypeRepo::SP &getDocumentTypeRepo() const override {
return _docTypeRepo;
}
- virtual const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override {
+ const ISimpleDocumentMetaStore *getDocumentMetaStorePtr() const override {
return std::nullptr_t();
}
- virtual void preparePut(PutOperation &) override {}
- virtual void handlePut(FeedToken *,
- const PutOperation &) override {}
- virtual void prepareUpdate(UpdateOperation &) override {}
- virtual void handleUpdate(FeedToken *,
- const UpdateOperation &) override {}
- virtual void prepareRemove(RemoveOperation &) override {}
- virtual void handleRemove(FeedToken *,
- const RemoveOperation &) override {}
- virtual void prepareDeleteBucket(DeleteBucketOperation &) override {}
- virtual void handleDeleteBucket(const DeleteBucketOperation &) override {}
- virtual void prepareMove(MoveOperation &) override {}
- virtual void handleMove(const MoveOperation &, std::shared_ptr<search::IDestructorCallback>) override {}
- virtual void heartBeat(search::SerialNum) override {}
- virtual void sync() override {}
- virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {}
- virtual void handleCompactLidSpace(const CompactLidSpaceOperation &) override {}
+ void preparePut(PutOperation &) override {}
+ void handlePut(FeedToken, const PutOperation &) override {}
+ void prepareUpdate(UpdateOperation &) override {}
+ void handleUpdate(FeedToken, const UpdateOperation &) override {}
+ void prepareRemove(RemoveOperation &) override {}
+ void handleRemove(FeedToken, const RemoveOperation &) override {}
+ void prepareDeleteBucket(DeleteBucketOperation &) override {}
+ void handleDeleteBucket(const DeleteBucketOperation &) override {}
+ void prepareMove(MoveOperation &) override {}
+ void handleMove(const MoveOperation &, std::shared_ptr<search::IDestructorCallback>) override {}
+ void heartBeat(search::SerialNum) override {}
+ void sync() override {}
+ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {}
+ void handleCompactLidSpace(const CompactLidSpaceOperation &) override {}
void forceCommit(search::SerialNum) override { }
};
-} // namespace test
-
-} // namespace proton
-
+}