summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-12 18:16:11 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-10-12 18:16:11 +0200
commit4130c7302a59f3cea0845fb1825600cc1549bc4a (patch)
tree8869768b617b19c8af3a1a2d3a8ade41c60a232c /searchcore
parent31553efdaafacaabb28dc7ce96719184be33a22b (diff)
Get rid of mbus::reply leftovers.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp69
-rw-r--r--searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp36
-rw-r--r--searchcore/src/tests/proton/feedtoken/feedtoken.cpp81
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp27
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.h22
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp45
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp30
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstate.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstate.h22
11 files changed, 87 insertions, 261 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index 29f043e7caa..30c0556e812 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -305,7 +305,7 @@ struct MyTransport : public FeedToken::ITransport {
bool documentWasFound;
MyTransport();
~MyTransport();
- virtual void send(Reply::UP, ResultUP res, bool documentWasFound_, double) override {
+ void send(ResultUP res, bool documentWasFound_) override {
result = std::move(res);
documentWasFound = documentWasFound_;
gate.countDown();
@@ -315,21 +315,12 @@ struct MyTransport : public FeedToken::ITransport {
MyTransport::MyTransport() : gate(), result(), documentWasFound(false) {}
MyTransport::~MyTransport() {}
-Reply::UP getReply(uint32_t type) {
- if (type == DocumentProtocol::REPLY_REMOVEDOCUMENT) {
- return Reply::UP(new RemoveDocumentReply);
- } else if (type == DocumentProtocol::REPLY_UPDATEDOCUMENT) {
- return Reply::UP(new UpdateDocumentReply);
- }
- return Reply::UP(new DocumentReply(type));
-}
-
struct FeedTokenContext {
MyTransport transport;
FeedToken::UP token_ap;
FeedToken &token;
- FeedTokenContext(uint32_t type = 0);
+ FeedTokenContext();
~FeedTokenContext();
bool await(uint32_t timeout = 80000) { return transport.gate.await(timeout); }
const Result *getResult() {
@@ -340,24 +331,23 @@ struct FeedTokenContext {
}
};
-FeedTokenContext::FeedTokenContext(uint32_t type)
+FeedTokenContext::FeedTokenContext()
: transport(),
- token_ap(new FeedToken(transport, getReply(type))),
+ token_ap(new FeedToken(transport)),
token(*token_ap)
{
- token.getReply().getTrace().setLevel(9);
}
-FeedTokenContext::~FeedTokenContext() {}
+
+FeedTokenContext::~FeedTokenContext() = default;
struct PutContext {
FeedTokenContext tokenCtx;
DocumentContext docCtx;
typedef std::shared_ptr<PutContext> SP;
PutContext(const vespalib::string &docId, DocBuilder &builder) :
- tokenCtx(DocumentProtocol::REPLY_PUTDOCUMENT),
+ tokenCtx(),
docCtx(docId, builder)
- {
- }
+ {}
};
@@ -371,12 +361,10 @@ struct PutHandler {
builder(db),
timestamp(0),
puts()
- {
- }
+ {}
void put(const vespalib::string &docId) {
PutContext::SP pc(new PutContext(docId, builder));
- FeedOperation::UP op(new PutOperation(pc->docCtx.bucketId,
- timestamp, pc->docCtx.doc));
+ FeedOperation::UP op(new PutOperation(pc->docCtx.bucketId, timestamp, pc->docCtx.doc));
handler.handleOperation(pc->tokenCtx.token, std::move(op));
timestamp = Timestamp(timestamp + 1);
puts.push_back(pc);
@@ -492,12 +480,10 @@ TEST_F("require that heartBeat calls FeedView's heartBeat",
TEST_F("require that outdated remove is ignored", FeedHandlerFixture)
{
DocumentContext doc_context("doc:test:foo", *f.schema.builder);
- FeedOperation::UP op(new RemoveOperation(doc_context.bucketId,
- Timestamp(10),
- doc_context.doc->getId()));
+ FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId()));
static_cast<DocumentOperation &>(*op).setPrevDbDocumentId(DbDocumentId(4));
static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000));
- FeedTokenContext token_context(DocumentProtocol::REPLY_REMOVEDOCUMENT);
+ FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
EXPECT_EQUAL(0, f.feedView.remove_count);
EXPECT_EQUAL(0, f.tls_writer.store_count);
@@ -584,15 +570,11 @@ TEST_F("require that flush cannot unprune", FeedHandlerFixture)
EXPECT_EQUAL(10u, f.handler.getPrunedSerialNum());
}
-TEST_F("require that remove of unknown document with known data type "
- "stores remove", FeedHandlerFixture)
+TEST_F("require that remove of unknown document with known data type stores remove", FeedHandlerFixture)
{
- DocumentContext doc_context("id:test:searchdocument::foo",
- *f.schema.builder);
- FeedOperation::UP op(new RemoveOperation(doc_context.bucketId,
- Timestamp(10),
- doc_context.doc->getId()));
- FeedTokenContext token_context(DocumentProtocol::REPLY_REMOVEDOCUMENT);
+ DocumentContext doc_context("id:test:searchdocument::foo", *f.schema.builder);
+ FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId()));
+ FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
EXPECT_EQUAL(1, f.feedView.remove_count);
EXPECT_EQUAL(1, f.tls_writer.store_count);
@@ -602,10 +584,8 @@ TEST_F("require that partial update for non-existing document is tagged as such"
FeedHandlerFixture)
{
UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder);
- FeedOperation::UP op(new UpdateOperation(upCtx.bucketId,
- Timestamp(10),
- upCtx.update));
- FeedTokenContext token_context(DocumentProtocol::REPLY_UPDATEDOCUMENT);
+ FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update));
+ FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult());
@@ -622,12 +602,9 @@ TEST_F("require that partial update for non-existing document is created if spec
f.handler.setSerialNum(15);
UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder);
upCtx.update->setCreateIfNonExistent(true);
- f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(),
- MyDocumentMetaStore::Entry(5, 5, Timestamp(10)));
- FeedOperation::UP op(new UpdateOperation(upCtx.bucketId,
- Timestamp(10),
- upCtx.update));
- FeedTokenContext token_context(DocumentProtocol::REPLY_UPDATEDOCUMENT);
+ f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(), MyDocumentMetaStore::Entry(5, 5, Timestamp(10)));
+ FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update));
+ FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult());
@@ -663,7 +640,7 @@ TEST_F("require that update is rejected if resource limit is reached", FeedHandl
UpdateContext updCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op = std::make_unique<UpdateOperation>(updCtx.bucketId, Timestamp(10), updCtx.update);
- FeedTokenContext token(DocumentProtocol::REPLY_UPDATEDOCUMENT);
+ FeedTokenContext token;
f.handler.performOperation(std::move(token.token_ap), std::move(op));
EXPECT_EQUAL(0, f.feedView.update_count);
EXPECT_TRUE(dynamic_cast<const UpdateResult *>(token.getResult()));
@@ -679,7 +656,7 @@ TEST_F("require that remove is NOT rejected if resource limit is reached", FeedH
DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op = std::make_unique<RemoveOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc->getId());
- FeedTokenContext token(DocumentProtocol::REPLY_REMOVEDOCUMENT);
+ FeedTokenContext token;
f.handler.performOperation(std::move(token.token_ap), std::move(op));
EXPECT_EQUAL(1, f.feedView.remove_count);
EXPECT_EQUAL(Result::NONE, token.getResult()->getErrorCode());
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
index 6d4e5ae2478..4eefbed0a53 100644
--- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
@@ -412,11 +412,7 @@ struct MyTransport : public FeedToken::ITransport
MyTracer &_tracer;
MyTransport(MyTracer &tracer);
~MyTransport();
- virtual void send(mbus::Reply::UP reply,
- ResultUP result,
- bool documentWasFound,
- double latency_ms) override {
- (void) reply; (void) documentWasFound, (void) latency_ms;
+ void send(ResultUP result, bool ) override {
lastResult = std::move(result);
_tracer.traceAck(lastResult);
_gate.countDown();
@@ -484,36 +480,20 @@ DocumentContext::DocumentContext(const vespalib::string &docId, uint64_t timesta
{}
DocumentContext::~DocumentContext() {}
-namespace {
-
-mbus::Reply::UP
-createReply(MessageType mtype)
-{
- if (mtype == DocumentProtocol::REPLY_UPDATEDOCUMENT) {
- return mbus::Reply::UP(new documentapi::UpdateDocumentReply);
- } else if (mtype == DocumentProtocol::REPLY_REMOVEDOCUMENT) {
- return mbus::Reply::UP(new documentapi::RemoveDocumentReply);
- } else {
- return mbus::Reply::UP(new documentapi::DocumentReply(mtype));
- }
-}
-
-} // namespace
-
struct FeedTokenContext
{
MyTransport mt;
FeedToken ft;
typedef std::shared_ptr<FeedTokenContext> SP;
typedef std::vector<SP> List;
- FeedTokenContext(MyTracer &tracer, MessageType mtype);
+ FeedTokenContext(MyTracer &tracer);
~FeedTokenContext();
};
-FeedTokenContext::FeedTokenContext(MyTracer &tracer, MessageType mtype)
- : mt(tracer), ft(mt, createReply(mtype))
+FeedTokenContext::FeedTokenContext(MyTracer &tracer)
+ : mt(tracer), ft(mt)
{}
-FeedTokenContext::~FeedTokenContext() {}
+FeedTokenContext::~FeedTokenContext() = default;
struct FixtureBase
{
@@ -604,7 +584,7 @@ struct FixtureBase
}
void putAndWait(const DocumentContext &docCtx) {
- FeedTokenContext token(_tracer, DocumentProtocol::REPLY_PUTDOCUMENT);
+ FeedTokenContext token(_tracer);
PutOperation op(docCtx.bid, docCtx.ts, docCtx.doc);
runInMaster([&] () { performPut(&token.ft, op); });
}
@@ -616,7 +596,7 @@ struct FixtureBase
}
void updateAndWait(const DocumentContext &docCtx) {
- FeedTokenContext token(_tracer, DocumentProtocol::REPLY_UPDATEDOCUMENT);
+ FeedTokenContext token(_tracer);
UpdateOperation op(docCtx.bid, docCtx.ts, docCtx.upd);
runInMaster([&] () { performUpdate(&token.ft, op); });
}
@@ -634,7 +614,7 @@ struct FixtureBase
}
void removeAndWait(const DocumentContext &docCtx) {
- FeedTokenContext token(_tracer, DocumentProtocol::REPLY_REMOVEDOCUMENT);
+ FeedTokenContext token(_tracer);
RemoveOperation op(docCtx.bid, docCtx.ts, docCtx.doc->getId());
runInMaster([&] () { performRemove(&token.ft, op); });
}
diff --git a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
index 9df65ae3437..301f21d04a1 100644
--- a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
+++ b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
@@ -1,57 +1,39 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/testlib/receptor.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentreply.h>
-#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/vespalib/util/exceptions.h>
using namespace proton;
class LocalTransport : public FeedToken::ITransport {
private:
- mbus::Receptor _receptor;
- double _latency_ms;
+ size_t _receivedCount;
public:
LocalTransport()
- : _receptor(),
- _latency_ms(0.0)
- {
- // empty
- }
-
- void send(mbus::Reply::UP reply, ResultUP, bool, double latency_ms) override {
- _receptor.handleReply(std::move(reply));
- _latency_ms = latency_ms;
- }
+ : _receivedCount(0)
+ { }
- mbus::Reply::UP getReply() {
- return _receptor.getReply();
+ void send(ResultUP, bool) override {
+ _receivedCount++;
}
- double getLatencyMs() const {
- return _latency_ms;
- }
+ size_t getReceivedCount() const { return _receivedCount; }
};
class Test : public vespalib::TestApp {
private:
void testAck();
- void testAutoReply();
void testFail();
void testHandover();
- void testIntegrity();
public:
int Main() override {
TEST_INIT("feedtoken_test");
testAck(); TEST_FLUSH();
-// testAutoReply(); TEST_FLUSH();
testFail(); TEST_FLUSH();
testHandover(); TEST_FLUSH();
-// testIntegrity(); TEST_FLUSH();
TEST_DONE();
}
@@ -63,41 +45,18 @@ void
Test::testAck()
{
LocalTransport transport;
- mbus::Reply::UP msg(new documentapi::RemoveDocumentReply());
- FeedToken token(transport, std::move(msg));
+ FeedToken token(transport);
token.ack();
- mbus::Reply::UP reply = transport.getReply();
- ASSERT_TRUE(reply.get() != NULL);
- EXPECT_TRUE(!reply->hasErrors());
-}
-
-void
-Test::testAutoReply()
-{
- mbus::Receptor receptor;
- mbus::Reply::UP reply(new documentapi::RemoveDocumentReply());
- reply->pushHandler(receptor);
- {
- LocalTransport transport;
- FeedToken token(transport, std::move(reply));
- }
- reply = receptor.getReply(0);
- ASSERT_TRUE(reply.get() != NULL);
- EXPECT_TRUE(reply->hasErrors());
+ EXPECT_EQUAL(1u, transport.getReceivedCount());
}
void
Test::testFail()
{
LocalTransport transport;
- mbus::Reply::UP reply(new documentapi::RemoveDocumentReply());
- FeedToken token(transport, std::move(reply));
+ FeedToken token(transport);
token.fail(69, "6699");
- reply = transport.getReply();
- ASSERT_TRUE(reply.get() != NULL);
- EXPECT_EQUAL(1u, reply->getNumErrors());
- EXPECT_EQUAL(69u, reply->getError(0).getCode());
- EXPECT_EQUAL("6699", reply->getError(0).getMessage());
+ EXPECT_EQUAL(1u, transport.getReceivedCount());
}
void
@@ -110,25 +69,11 @@ Test::testHandover()
};
LocalTransport transport;
- mbus::Reply::UP reply(new documentapi::RemoveDocumentReply());
- FeedToken token(transport, std::move(reply));
+ FeedToken token(transport);
token = MyHandover::handover(token);
token.ack();
- reply = transport.getReply();
- ASSERT_TRUE(reply.get() != NULL);
- EXPECT_TRUE(!reply->hasErrors());
+ EXPECT_EQUAL(1u, transport.getReceivedCount());
}
-void
-Test::testIntegrity()
-{
- LocalTransport transport;
- try {
- FeedToken token(transport, mbus::Reply::UP());
- EXPECT_TRUE(false); // should throw an exception
- } catch (vespalib::IllegalArgumentException &e) {
- (void)e; // expected
- }
-}
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
index 272719f8e4a..0a4f00ef6e0 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
@@ -4,36 +4,32 @@
namespace proton {
-FeedToken::FeedToken(ITransport &transport, mbus::Reply::UP reply) :
- _state(new State(transport, std::move(reply), 1))
+FeedToken::FeedToken(ITransport &transport) :
+ _state(new State(transport, 1))
{
}
-FeedToken::State::State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired) :
+FeedToken::State::State(ITransport & transport, uint32_t numAcksRequired) :
_transport(transport),
- _reply(std::move(reply)),
_result(new storage::spi::Result()),
_documentWasFound(false),
_unAckedCount(numAcksRequired),
- _lock(),
- _startTime()
+ _lock()
{
- assert(_reply);
- _startTime.SetNow();
+ assert(_unAckedCount > 0);
}
FeedToken::State::~State()
{
- assert(!_reply);
+ assert(_unAckedCount == 0);
}
void
FeedToken::State::ack()
{
- assert(_reply);
uint32_t prev(_unAckedCount--);
if (prev == 1) {
- _transport.send(std::move(_reply), std::move(_result), _documentWasFound, _startTime.MilliSecsToNow());
+ _transport.send(std::move(_result), _documentWasFound);
}
assert(prev >= 1);
}
@@ -41,20 +37,17 @@ FeedToken::State::ack()
void
FeedToken::State::incNeededAcks()
{
- assert(_reply);
uint32_t prev(_unAckedCount++);
assert(prev >= 1);
(void) prev;
}
-
void
-FeedToken::State::fail(uint32_t errNum, const vespalib::string &errMsg)
+FeedToken::State::fail(uint32_t, const vespalib::string &)
{
- assert(_reply);
+ _unAckedCount = 0;
vespalib::LockGuard guard(_lock);
- _reply->addError(mbus::Error(errNum, errMsg));
- _transport.send(std::move(_reply), std::move(_result), _documentWasFound, _startTime.MilliSecsToNow());
+ _transport.send(std::move(_result), _documentWasFound);
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
index 0983819cefb..8f6b315f803 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
@@ -1,11 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/messagebus/reply.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/vespalib/util/exception.h>
#include <vespa/vespalib/util/sync.h>
-#include <vespa/searchcore/proton/feedoperation/feedoperation.h>
#include <atomic>
namespace proton {
@@ -22,7 +20,7 @@ public:
class ITransport {
public:
virtual ~ITransport() { }
- virtual void send(mbus::Reply::UP reply, ResultUP result, bool documentWasFound, double latency_ms) = 0;
+ virtual void send(ResultUP result, bool documentWasFound) = 0;
};
private:
@@ -30,26 +28,22 @@ private:
public:
State(const State &) = delete;
State & operator = (const State &) = delete;
- State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired);
+ State(ITransport & transport, uint32_t numAcksRequired);
~State();
void incNeededAcks();
void ack();
void fail(uint32_t errNum, const vespalib::string &errMsg);
- mbus::Reply & getReply() { return *_reply; }
void setResult(ResultUP result, bool documentWasFound) {
_documentWasFound = documentWasFound;
_result = std::move(result);
}
const storage::spi::Result &getResult() { return *_result; }
- FastOS_Time getStartTime() const { return _startTime; }
private:
ITransport &_transport;
- mbus::Reply::UP _reply;
ResultUP _result;
bool _documentWasFound;
std::atomic<uint32_t> _unAckedCount;
vespalib::Lock _lock;
- FastOS_Time _startTime;
};
std::shared_ptr<State> _state;
@@ -64,9 +58,8 @@ public:
* vespalib::IllegalArgumentException.
*
* @param transport The transport to pass the reply to.
- * @param reply The mbus::Reply corresponding to this operation.
*/
- FeedToken(ITransport &transport, mbus::Reply::UP reply);
+ FeedToken(ITransport &transport);
FeedToken(FeedToken &&) = default;
FeedToken & operator =(FeedToken &&) = default;
@@ -96,13 +89,6 @@ public:
void fail(uint32_t errNum, const vespalib::string &errMsg) const { _state->fail(errNum, errMsg); }
/**
- * Gives you access to the underlying reply message.
- *
- * @return The reply
- */
- mbus::Reply & getReply() const { return _state->getReply(); }
-
- /**
* Gives you access to the underlying result.
*
* @return The result
@@ -115,8 +101,6 @@ public:
void setResult(ResultUP result, bool documentWasFound) {
_state->setResult(std::move(result), documentWasFound);
}
-
- FastOS_Time getStartTime() const { return _state->getStartTime(); }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 69a3a902af8..feebbf4cf2a 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -182,17 +182,14 @@ PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace) const
}
PersistenceEngine::HandlerSnapshot::UP
-PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace,
- const DocumentId &id) const
+PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace, const DocumentId &id) const
{
LockGuard guard(_lock);
return _handlers.getHandlerSnapshot(bucketSpace, id);
}
-PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner,
- const IResourceWriteFilter &writeFilter,
- ssize_t defaultSerializedSize,
- bool ignoreMaxBytes)
+PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter,
+ ssize_t defaultSerializedSize, bool ignoreMaxBytes)
: AbstractPersistenceProvider(),
_defaultSerializedSize(defaultSerializedSize),
_ignoreMaxBytes(ignoreMaxBytes),
@@ -216,8 +213,7 @@ PersistenceEngine::~PersistenceEngine()
IPersistenceHandler::SP
-PersistenceEngine::putHandler(document::BucketSpace bucketSpace,
- const DocTypeName &docType,
+PersistenceEngine::putHandler(document::BucketSpace bucketSpace, const DocTypeName &docType,
const IPersistenceHandler::SP &handler)
{
LockGuard guard(_lock);
@@ -226,8 +222,7 @@ PersistenceEngine::putHandler(document::BucketSpace bucketSpace,
IPersistenceHandler::SP
-PersistenceEngine::getHandler(document::BucketSpace bucketSpace,
- const DocTypeName &docType) const
+PersistenceEngine::getHandler(document::BucketSpace bucketSpace, const DocTypeName &docType) const
{
LockGuard guard(_lock);
return _handlers.getHandler(bucketSpace, docType);
@@ -235,8 +230,7 @@ PersistenceEngine::getHandler(document::BucketSpace bucketSpace,
IPersistenceHandler::SP
-PersistenceEngine::removeHandler(document::BucketSpace bucketSpace,
- const DocTypeName &docType)
+PersistenceEngine::removeHandler(document::BucketSpace bucketSpace, const DocTypeName &docType)
{
// TODO: Grab bucket list and treat them as modified
LockGuard guard(_lock);
@@ -367,8 +361,7 @@ PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::S
docType.toString().c_str()));
}
TransportLatch latch(1);
- FeedToken token(latch, mbus::Reply::UP(new documentapi::FeedReply(
- documentapi::DocumentProtocol::REPLY_PUTDOCUMENT)));
+ FeedToken token(latch);
handler->handlePut(token, b, t, doc);
latch.await();
return latch.getResult();
@@ -390,7 +383,7 @@ PersistenceEngine::remove(const Bucket& b, Timestamp t, const DocumentId& did, C
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch, Reply::UP(new RemoveDocumentReply));
+ FeedToken token(latch);
handler->handleRemove(token, b, t, did);
}
latch.await();
@@ -421,7 +414,7 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP
IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType);
TransportLatch latch(1);
if (handler.get() != NULL) {
- FeedToken token(latch, mbus::Reply::UP(new documentapi::UpdateDocumentReply()));
+ FeedToken token(latch);
LOG(debug, "update = %s", upd->toXml().c_str());
handler->handleUpdate(token, b, t, upd);
latch.await();
@@ -433,10 +426,7 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP
PersistenceEngine::GetResult
-PersistenceEngine::get(const Bucket& b,
- const document::FieldSet& fields,
- const DocumentId& did,
- Context& context) const
+PersistenceEngine::get(const Bucket& b, const document::FieldSet& fields, const DocumentId& did, Context& context) const
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
HandlerSnapshot::UP snapshot = getHandlerSnapshot(b.getBucketSpace());
@@ -465,11 +455,8 @@ PersistenceEngine::get(const Bucket& b,
PersistenceEngine::CreateIteratorResult
-PersistenceEngine::createIterator(const Bucket &bucket,
- const document::FieldSet& fields,
- const Selection &selection,
- IncludedVersions versions,
- Context & context)
+PersistenceEngine::createIterator(const Bucket &bucket, const document::FieldSet& fields, const Selection &selection,
+ IncludedVersions versions, Context & context)
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
HandlerSnapshot::UP snapshot = getHandlerSnapshot(bucket.getBucketSpace());
@@ -552,7 +539,7 @@ PersistenceEngine::createBucket(const Bucket &b, Context &)
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch, Reply::UP(new DocumentReply(0)));
+ FeedToken token(latch);
handler->handleCreateBucket(token, b);
}
latch.await();
@@ -569,7 +556,7 @@ PersistenceEngine::deleteBucket(const Bucket& b, Context&)
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch, Reply::UP(new DocumentReply(0)));
+ FeedToken token(latch);
handler->handleDeleteBucket(token, b);
}
latch.await();
@@ -612,7 +599,7 @@ PersistenceEngine::split(const Bucket& source, const Bucket& target1, const Buck
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch, Reply::UP(new DocumentReply(0)));
+ FeedToken token(latch);
handler->handleSplit(token, source, target1, target2);
}
latch.await();
@@ -631,7 +618,7 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch, Reply::UP(new DocumentReply(0)));
+ FeedToken token(latch);
handler->handleJoin(token, source1, source2, target);
}
latch.await();
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
index e719a0aa962..16813d1b56c 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
@@ -16,13 +16,8 @@ TransportLatch::TransportLatch(uint32_t cnt)
TransportLatch::~TransportLatch() {}
void
-TransportLatch::send(mbus::Reply::UP reply,
- ResultUP result,
- bool documentWasFound,
- double latency_ms)
+TransportLatch::send(ResultUP result, bool documentWasFound)
{
- (void) reply;
- (void) latency_ms;
{
vespalib::LockGuard guard(_lock);
if (!_result.get()) {
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
index 747c95358b4..12f92722dfa 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
@@ -20,10 +20,7 @@ private:
public:
TransportLatch(uint32_t cnt);
~TransportLatch();
- virtual void send(mbus::Reply::UP reply,
- ResultUP result,
- bool documentWasFound,
- double latency_ms) override;
+ void send(ResultUP result, bool documentWasFound) override;
void await() {
_latch.await();
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 5c6dd497cbd..7aeaea1d1a7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -49,27 +49,9 @@ namespace proton {
namespace {
-void
-setUpdateWasFound(mbus::Reply &reply, bool was_found)
-{
- assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_UPDATEDOCUMENT);
- UpdateDocumentReply &update_rep = static_cast<UpdateDocumentReply&>(reply);
- update_rep.setWasFound(was_found);
-}
-
-void
-setRemoveWasFound(mbus::Reply &reply, bool was_found)
-{
- assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_REMOVEDOCUMENT);
- RemoveDocumentReply &remove_rep = static_cast<RemoveDocumentReply&>(reply);
- remove_rep.setWasFound(was_found);
-}
-
bool
-ignoreOperation(const DocumentOperation &op)
-{
- return (op.getPrevTimestamp() != 0)
- && (op.getTimestamp() < op.getPrevTimestamp());
+ignoreOperation(const DocumentOperation &op) {
+ return (op.getPrevTimestamp() != 0) && (op.getTimestamp() < op.getPrevTimestamp());
}
} // namespace
@@ -142,7 +124,6 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op)
} else {
if (token) {
token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false);
- setUpdateWasFound(token->getReply(), false);
token->ack();
}
}
@@ -155,7 +136,6 @@ FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op)
storeOperation(op);
if (token) {
token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true);
- setUpdateWasFound(token->getReply(), true);
}
_activeFeedView->handleUpdate(token.get(), op);
}
@@ -172,10 +152,9 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio
storeOperation(putOp);
if (token) {
token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true);
- setUpdateWasFound(token->getReply(), true);
}
TransportLatch latch(1);
- FeedToken putToken(latch, mbus::Reply::UP(new FeedReply(DocumentProtocol::REPLY_PUTDOCUMENT)));
+ FeedToken putToken(latch);
_activeFeedView->handlePut(&putToken, putOp);
latch.await();
if (token) {
@@ -202,7 +181,6 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
if (token) {
bool documentWasFound = !op.getPrevMarkedAsRemoved();
token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound);
- setRemoveWasFound(token->getReply(), documentWasFound);
}
_activeFeedView->handleRemove(token.get(), op);
} else if (op.hasDocType()) {
@@ -210,13 +188,11 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
storeOperation(op);
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
- setRemoveWasFound(token->getReply(), false);
}
_activeFeedView->handleRemove(token.get(), op);
} else {
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
- setRemoveWasFound(token->getReply(), false);
token->ack();
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp
index 2ec3de61dea..3628505ed66 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "feedstate.h"
+#include <vespa/searchcore/proton/feedoperation/feedoperation.h>
#include <vespa/vespalib/util/exceptions.h>
using document::BucketId;
@@ -26,8 +27,7 @@ void FeedState::throwExceptionInReceive(const vespalib::string &docType,
}
void
-FeedState::throwExceptionInHandleOperation(const vespalib::string &docType,
- const FeedOperation &op)
+FeedState::throwExceptionInHandleOperation(const vespalib::string &docType, const FeedOperation &op)
{
throw IllegalStateException
(make_string("We should not receive any feed operations"
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstate.h b/searchcore/src/vespa/searchcore/proton/server/feedstate.h
index 13dd6ea9dc8..472f5cb224f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstate.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstate.h
@@ -12,27 +12,22 @@
namespace proton {
+class FeedOperation;
/**
* Class representing the current state of a feed handler.
*/
class FeedState {
public:
- enum Type { NORMAL,
- REPLAY_TRANSACTION_LOG,
- INIT };
+ enum Type { NORMAL, REPLAY_TRANSACTION_LOG, INIT };
private:
Type _type;
protected:
- void throwExceptionInReceive(const vespalib::string &docType,
- uint64_t serialRangeFrom,
- uint64_t serialRangeTo,
- size_t packetSize);
-
- void throwExceptionInHandleOperation(const vespalib::string &docType,
- const FeedOperation &op);
+ void throwExceptionInReceive(const vespalib::string &docType, uint64_t serialRangeFrom,
+ uint64_t serialRangeTo, size_t packetSize);
+ void throwExceptionInHandleOperation(const vespalib::string &docType, const FeedOperation &op);
public:
typedef std::shared_ptr<FeedState> SP;
@@ -43,11 +38,8 @@ public:
Type getType() const { return _type; }
vespalib::string getName() const;
- virtual void handleOperation(FeedToken token, FeedOperation::UP op) = 0;
-
- virtual void receive(const PacketWrapper::SP &wrap,
- vespalib::Executor &executor) = 0;
+ virtual void handleOperation(FeedToken token, std::unique_ptr<FeedOperation> op) = 0;
+ virtual void receive(const PacketWrapper::SP &wrap, vespalib::Executor &executor) = 0;
};
} // namespace proton
-