diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-12 18:16:11 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-12 18:16:11 +0200 |
commit | 4130c7302a59f3cea0845fb1825600cc1549bc4a (patch) | |
tree | 8869768b617b19c8af3a1a2d3a8ade41c60a232c /searchcore | |
parent | 31553efdaafacaabb28dc7ce96719184be33a22b (diff) |
Get rid of mbus::reply leftovers.
Diffstat (limited to 'searchcore')
11 files changed, 87 insertions, 261 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 29f043e7caa..30c0556e812 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -305,7 +305,7 @@ struct MyTransport : public FeedToken::ITransport { bool documentWasFound; MyTransport(); ~MyTransport(); - virtual void send(Reply::UP, ResultUP res, bool documentWasFound_, double) override { + void send(ResultUP res, bool documentWasFound_) override { result = std::move(res); documentWasFound = documentWasFound_; gate.countDown(); @@ -315,21 +315,12 @@ struct MyTransport : public FeedToken::ITransport { MyTransport::MyTransport() : gate(), result(), documentWasFound(false) {} MyTransport::~MyTransport() {} -Reply::UP getReply(uint32_t type) { - if (type == DocumentProtocol::REPLY_REMOVEDOCUMENT) { - return Reply::UP(new RemoveDocumentReply); - } else if (type == DocumentProtocol::REPLY_UPDATEDOCUMENT) { - return Reply::UP(new UpdateDocumentReply); - } - return Reply::UP(new DocumentReply(type)); -} - struct FeedTokenContext { MyTransport transport; FeedToken::UP token_ap; FeedToken &token; - FeedTokenContext(uint32_t type = 0); + FeedTokenContext(); ~FeedTokenContext(); bool await(uint32_t timeout = 80000) { return transport.gate.await(timeout); } const Result *getResult() { @@ -340,24 +331,23 @@ struct FeedTokenContext { } }; -FeedTokenContext::FeedTokenContext(uint32_t type) +FeedTokenContext::FeedTokenContext() : transport(), - token_ap(new FeedToken(transport, getReply(type))), + token_ap(new FeedToken(transport)), token(*token_ap) { - token.getReply().getTrace().setLevel(9); } -FeedTokenContext::~FeedTokenContext() {} + +FeedTokenContext::~FeedTokenContext() = default; struct PutContext { FeedTokenContext tokenCtx; DocumentContext docCtx; typedef std::shared_ptr<PutContext> SP; PutContext(const vespalib::string &docId, DocBuilder &builder) : - tokenCtx(DocumentProtocol::REPLY_PUTDOCUMENT), + tokenCtx(), docCtx(docId, builder) - { - } + {} }; @@ -371,12 +361,10 @@ struct PutHandler { builder(db), timestamp(0), puts() - { - } + {} void put(const vespalib::string &docId) { PutContext::SP pc(new PutContext(docId, builder)); - FeedOperation::UP op(new PutOperation(pc->docCtx.bucketId, - timestamp, pc->docCtx.doc)); + FeedOperation::UP op(new PutOperation(pc->docCtx.bucketId, timestamp, pc->docCtx.doc)); handler.handleOperation(pc->tokenCtx.token, std::move(op)); timestamp = Timestamp(timestamp + 1); puts.push_back(pc); @@ -492,12 +480,10 @@ TEST_F("require that heartBeat calls FeedView's heartBeat", TEST_F("require that outdated remove is ignored", FeedHandlerFixture) { DocumentContext doc_context("doc:test:foo", *f.schema.builder); - FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, - Timestamp(10), - doc_context.doc->getId())); + FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId())); static_cast<DocumentOperation &>(*op).setPrevDbDocumentId(DbDocumentId(4)); static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000)); - FeedTokenContext token_context(DocumentProtocol::REPLY_REMOVEDOCUMENT); + FeedTokenContext token_context; f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); EXPECT_EQUAL(0, f.feedView.remove_count); EXPECT_EQUAL(0, f.tls_writer.store_count); @@ -584,15 +570,11 @@ TEST_F("require that flush cannot unprune", FeedHandlerFixture) EXPECT_EQUAL(10u, f.handler.getPrunedSerialNum()); } -TEST_F("require that remove of unknown document with known data type " - "stores remove", FeedHandlerFixture) +TEST_F("require that remove of unknown document with known data type stores remove", FeedHandlerFixture) { - DocumentContext doc_context("id:test:searchdocument::foo", - *f.schema.builder); - FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, - Timestamp(10), - doc_context.doc->getId())); - FeedTokenContext token_context(DocumentProtocol::REPLY_REMOVEDOCUMENT); + DocumentContext doc_context("id:test:searchdocument::foo", *f.schema.builder); + FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId())); + FeedTokenContext token_context; f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); EXPECT_EQUAL(1, f.feedView.remove_count); EXPECT_EQUAL(1, f.tls_writer.store_count); @@ -602,10 +584,8 @@ TEST_F("require that partial update for non-existing document is tagged as such" FeedHandlerFixture) { UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder); - FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, - Timestamp(10), - upCtx.update)); - FeedTokenContext token_context(DocumentProtocol::REPLY_UPDATEDOCUMENT); + FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update)); + FeedTokenContext token_context; f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult()); @@ -622,12 +602,9 @@ TEST_F("require that partial update for non-existing document is created if spec f.handler.setSerialNum(15); UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder); upCtx.update->setCreateIfNonExistent(true); - f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(), - MyDocumentMetaStore::Entry(5, 5, Timestamp(10))); - FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, - Timestamp(10), - upCtx.update)); - FeedTokenContext token_context(DocumentProtocol::REPLY_UPDATEDOCUMENT); + f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(), MyDocumentMetaStore::Entry(5, 5, Timestamp(10))); + FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update)); + FeedTokenContext token_context; f.handler.performOperation(std::move(token_context.token_ap), std::move(op)); const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult()); @@ -663,7 +640,7 @@ TEST_F("require that update is rejected if resource limit is reached", FeedHandl UpdateContext updCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op = std::make_unique<UpdateOperation>(updCtx.bucketId, Timestamp(10), updCtx.update); - FeedTokenContext token(DocumentProtocol::REPLY_UPDATEDOCUMENT); + FeedTokenContext token; f.handler.performOperation(std::move(token.token_ap), std::move(op)); EXPECT_EQUAL(0, f.feedView.update_count); EXPECT_TRUE(dynamic_cast<const UpdateResult *>(token.getResult())); @@ -679,7 +656,7 @@ TEST_F("require that remove is NOT rejected if resource limit is reached", FeedH DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder); FeedOperation::UP op = std::make_unique<RemoveOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc->getId()); - FeedTokenContext token(DocumentProtocol::REPLY_REMOVEDOCUMENT); + FeedTokenContext token; f.handler.performOperation(std::move(token.token_ap), std::move(op)); EXPECT_EQUAL(1, f.feedView.remove_count); EXPECT_EQUAL(Result::NONE, token.getResult()->getErrorCode()); diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index 6d4e5ae2478..4eefbed0a53 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -412,11 +412,7 @@ struct MyTransport : public FeedToken::ITransport MyTracer &_tracer; MyTransport(MyTracer &tracer); ~MyTransport(); - virtual void send(mbus::Reply::UP reply, - ResultUP result, - bool documentWasFound, - double latency_ms) override { - (void) reply; (void) documentWasFound, (void) latency_ms; + void send(ResultUP result, bool ) override { lastResult = std::move(result); _tracer.traceAck(lastResult); _gate.countDown(); @@ -484,36 +480,20 @@ DocumentContext::DocumentContext(const vespalib::string &docId, uint64_t timesta {} DocumentContext::~DocumentContext() {} -namespace { - -mbus::Reply::UP -createReply(MessageType mtype) -{ - if (mtype == DocumentProtocol::REPLY_UPDATEDOCUMENT) { - return mbus::Reply::UP(new documentapi::UpdateDocumentReply); - } else if (mtype == DocumentProtocol::REPLY_REMOVEDOCUMENT) { - return mbus::Reply::UP(new documentapi::RemoveDocumentReply); - } else { - return mbus::Reply::UP(new documentapi::DocumentReply(mtype)); - } -} - -} // namespace - struct FeedTokenContext { MyTransport mt; FeedToken ft; typedef std::shared_ptr<FeedTokenContext> SP; typedef std::vector<SP> List; - FeedTokenContext(MyTracer &tracer, MessageType mtype); + FeedTokenContext(MyTracer &tracer); ~FeedTokenContext(); }; -FeedTokenContext::FeedTokenContext(MyTracer &tracer, MessageType mtype) - : mt(tracer), ft(mt, createReply(mtype)) +FeedTokenContext::FeedTokenContext(MyTracer &tracer) + : mt(tracer), ft(mt) {} -FeedTokenContext::~FeedTokenContext() {} +FeedTokenContext::~FeedTokenContext() = default; struct FixtureBase { @@ -604,7 +584,7 @@ struct FixtureBase } void putAndWait(const DocumentContext &docCtx) { - FeedTokenContext token(_tracer, DocumentProtocol::REPLY_PUTDOCUMENT); + FeedTokenContext token(_tracer); PutOperation op(docCtx.bid, docCtx.ts, docCtx.doc); runInMaster([&] () { performPut(&token.ft, op); }); } @@ -616,7 +596,7 @@ struct FixtureBase } void updateAndWait(const DocumentContext &docCtx) { - FeedTokenContext token(_tracer, DocumentProtocol::REPLY_UPDATEDOCUMENT); + FeedTokenContext token(_tracer); UpdateOperation op(docCtx.bid, docCtx.ts, docCtx.upd); runInMaster([&] () { performUpdate(&token.ft, op); }); } @@ -634,7 +614,7 @@ struct FixtureBase } void removeAndWait(const DocumentContext &docCtx) { - FeedTokenContext token(_tracer, DocumentProtocol::REPLY_REMOVEDOCUMENT); + FeedTokenContext token(_tracer); RemoveOperation op(docCtx.bid, docCtx.ts, docCtx.doc->getId()); runInMaster([&] () { performRemove(&token.ft, op); }); } diff --git a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp index 9df65ae3437..301f21d04a1 100644 --- a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp +++ b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp @@ -1,57 +1,39 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/testlib/receptor.h> -#include <vespa/documentapi/messagebus/messages/removedocumentreply.h> -#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/vespalib/testkit/testapp.h> +#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/vespalib/util/exceptions.h> using namespace proton; class LocalTransport : public FeedToken::ITransport { private: - mbus::Receptor _receptor; - double _latency_ms; + size_t _receivedCount; public: LocalTransport() - : _receptor(), - _latency_ms(0.0) - { - // empty - } - - void send(mbus::Reply::UP reply, ResultUP, bool, double latency_ms) override { - _receptor.handleReply(std::move(reply)); - _latency_ms = latency_ms; - } + : _receivedCount(0) + { } - mbus::Reply::UP getReply() { - return _receptor.getReply(); + void send(ResultUP, bool) override { + _receivedCount++; } - double getLatencyMs() const { - return _latency_ms; - } + size_t getReceivedCount() const { return _receivedCount; } }; class Test : public vespalib::TestApp { private: void testAck(); - void testAutoReply(); void testFail(); void testHandover(); - void testIntegrity(); public: int Main() override { TEST_INIT("feedtoken_test"); testAck(); TEST_FLUSH(); -// testAutoReply(); TEST_FLUSH(); testFail(); TEST_FLUSH(); testHandover(); TEST_FLUSH(); -// testIntegrity(); TEST_FLUSH(); TEST_DONE(); } @@ -63,41 +45,18 @@ void Test::testAck() { LocalTransport transport; - mbus::Reply::UP msg(new documentapi::RemoveDocumentReply()); - FeedToken token(transport, std::move(msg)); + FeedToken token(transport); token.ack(); - mbus::Reply::UP reply = transport.getReply(); - ASSERT_TRUE(reply.get() != NULL); - EXPECT_TRUE(!reply->hasErrors()); -} - -void -Test::testAutoReply() -{ - mbus::Receptor receptor; - mbus::Reply::UP reply(new documentapi::RemoveDocumentReply()); - reply->pushHandler(receptor); - { - LocalTransport transport; - FeedToken token(transport, std::move(reply)); - } - reply = receptor.getReply(0); - ASSERT_TRUE(reply.get() != NULL); - EXPECT_TRUE(reply->hasErrors()); + EXPECT_EQUAL(1u, transport.getReceivedCount()); } void Test::testFail() { LocalTransport transport; - mbus::Reply::UP reply(new documentapi::RemoveDocumentReply()); - FeedToken token(transport, std::move(reply)); + FeedToken token(transport); token.fail(69, "6699"); - reply = transport.getReply(); - ASSERT_TRUE(reply.get() != NULL); - EXPECT_EQUAL(1u, reply->getNumErrors()); - EXPECT_EQUAL(69u, reply->getError(0).getCode()); - EXPECT_EQUAL("6699", reply->getError(0).getMessage()); + EXPECT_EQUAL(1u, transport.getReceivedCount()); } void @@ -110,25 +69,11 @@ Test::testHandover() }; LocalTransport transport; - mbus::Reply::UP reply(new documentapi::RemoveDocumentReply()); - FeedToken token(transport, std::move(reply)); + FeedToken token(transport); token = MyHandover::handover(token); token.ack(); - reply = transport.getReply(); - ASSERT_TRUE(reply.get() != NULL); - EXPECT_TRUE(!reply->hasErrors()); + EXPECT_EQUAL(1u, transport.getReceivedCount()); } -void -Test::testIntegrity() -{ - LocalTransport transport; - try { - FeedToken token(transport, mbus::Reply::UP()); - EXPECT_TRUE(false); // should throw an exception - } catch (vespalib::IllegalArgumentException &e) { - (void)e; // expected - } -} diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp index 272719f8e4a..0a4f00ef6e0 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp @@ -4,36 +4,32 @@ namespace proton { -FeedToken::FeedToken(ITransport &transport, mbus::Reply::UP reply) : - _state(new State(transport, std::move(reply), 1)) +FeedToken::FeedToken(ITransport &transport) : + _state(new State(transport, 1)) { } -FeedToken::State::State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired) : +FeedToken::State::State(ITransport & transport, uint32_t numAcksRequired) : _transport(transport), - _reply(std::move(reply)), _result(new storage::spi::Result()), _documentWasFound(false), _unAckedCount(numAcksRequired), - _lock(), - _startTime() + _lock() { - assert(_reply); - _startTime.SetNow(); + assert(_unAckedCount > 0); } FeedToken::State::~State() { - assert(!_reply); + assert(_unAckedCount == 0); } void FeedToken::State::ack() { - assert(_reply); uint32_t prev(_unAckedCount--); if (prev == 1) { - _transport.send(std::move(_reply), std::move(_result), _documentWasFound, _startTime.MilliSecsToNow()); + _transport.send(std::move(_result), _documentWasFound); } assert(prev >= 1); } @@ -41,20 +37,17 @@ FeedToken::State::ack() void FeedToken::State::incNeededAcks() { - assert(_reply); uint32_t prev(_unAckedCount++); assert(prev >= 1); (void) prev; } - void -FeedToken::State::fail(uint32_t errNum, const vespalib::string &errMsg) +FeedToken::State::fail(uint32_t, const vespalib::string &) { - assert(_reply); + _unAckedCount = 0; vespalib::LockGuard guard(_lock); - _reply->addError(mbus::Error(errNum, errMsg)); - _transport.send(std::move(_reply), std::move(_result), _documentWasFound, _startTime.MilliSecsToNow()); + _transport.send(std::move(_result), _documentWasFound); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index 0983819cefb..8f6b315f803 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -1,11 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/messagebus/reply.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/vespalib/util/exception.h> #include <vespa/vespalib/util/sync.h> -#include <vespa/searchcore/proton/feedoperation/feedoperation.h> #include <atomic> namespace proton { @@ -22,7 +20,7 @@ public: class ITransport { public: virtual ~ITransport() { } - virtual void send(mbus::Reply::UP reply, ResultUP result, bool documentWasFound, double latency_ms) = 0; + virtual void send(ResultUP result, bool documentWasFound) = 0; }; private: @@ -30,26 +28,22 @@ private: public: State(const State &) = delete; State & operator = (const State &) = delete; - State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired); + State(ITransport & transport, uint32_t numAcksRequired); ~State(); void incNeededAcks(); void ack(); void fail(uint32_t errNum, const vespalib::string &errMsg); - mbus::Reply & getReply() { return *_reply; } void setResult(ResultUP result, bool documentWasFound) { _documentWasFound = documentWasFound; _result = std::move(result); } const storage::spi::Result &getResult() { return *_result; } - FastOS_Time getStartTime() const { return _startTime; } private: ITransport &_transport; - mbus::Reply::UP _reply; ResultUP _result; bool _documentWasFound; std::atomic<uint32_t> _unAckedCount; vespalib::Lock _lock; - FastOS_Time _startTime; }; std::shared_ptr<State> _state; @@ -64,9 +58,8 @@ public: * vespalib::IllegalArgumentException. * * @param transport The transport to pass the reply to. - * @param reply The mbus::Reply corresponding to this operation. */ - FeedToken(ITransport &transport, mbus::Reply::UP reply); + FeedToken(ITransport &transport); FeedToken(FeedToken &&) = default; FeedToken & operator =(FeedToken &&) = default; @@ -96,13 +89,6 @@ public: void fail(uint32_t errNum, const vespalib::string &errMsg) const { _state->fail(errNum, errMsg); } /** - * Gives you access to the underlying reply message. - * - * @return The reply - */ - mbus::Reply & getReply() const { return _state->getReply(); } - - /** * Gives you access to the underlying result. * * @return The result @@ -115,8 +101,6 @@ public: void setResult(ResultUP result, bool documentWasFound) { _state->setResult(std::move(result), documentWasFound); } - - FastOS_Time getStartTime() const { return _state->getStartTime(); } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 69a3a902af8..feebbf4cf2a 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -182,17 +182,14 @@ PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace) const } PersistenceEngine::HandlerSnapshot::UP -PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace, - const DocumentId &id) const +PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace, const DocumentId &id) const { LockGuard guard(_lock); return _handlers.getHandlerSnapshot(bucketSpace, id); } -PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, - const IResourceWriteFilter &writeFilter, - ssize_t defaultSerializedSize, - bool ignoreMaxBytes) +PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter, + ssize_t defaultSerializedSize, bool ignoreMaxBytes) : AbstractPersistenceProvider(), _defaultSerializedSize(defaultSerializedSize), _ignoreMaxBytes(ignoreMaxBytes), @@ -216,8 +213,7 @@ PersistenceEngine::~PersistenceEngine() IPersistenceHandler::SP -PersistenceEngine::putHandler(document::BucketSpace bucketSpace, - const DocTypeName &docType, +PersistenceEngine::putHandler(document::BucketSpace bucketSpace, const DocTypeName &docType, const IPersistenceHandler::SP &handler) { LockGuard guard(_lock); @@ -226,8 +222,7 @@ PersistenceEngine::putHandler(document::BucketSpace bucketSpace, IPersistenceHandler::SP -PersistenceEngine::getHandler(document::BucketSpace bucketSpace, - const DocTypeName &docType) const +PersistenceEngine::getHandler(document::BucketSpace bucketSpace, const DocTypeName &docType) const { LockGuard guard(_lock); return _handlers.getHandler(bucketSpace, docType); @@ -235,8 +230,7 @@ PersistenceEngine::getHandler(document::BucketSpace bucketSpace, IPersistenceHandler::SP -PersistenceEngine::removeHandler(document::BucketSpace bucketSpace, - const DocTypeName &docType) +PersistenceEngine::removeHandler(document::BucketSpace bucketSpace, const DocTypeName &docType) { // TODO: Grab bucket list and treat them as modified LockGuard guard(_lock); @@ -367,8 +361,7 @@ PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::S docType.toString().c_str())); } TransportLatch latch(1); - FeedToken token(latch, mbus::Reply::UP(new documentapi::FeedReply( - documentapi::DocumentProtocol::REPLY_PUTDOCUMENT))); + FeedToken token(latch); handler->handlePut(token, b, t, doc); latch.await(); return latch.getResult(); @@ -390,7 +383,7 @@ PersistenceEngine::remove(const Bucket& b, Timestamp t, const DocumentId& did, C TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch, Reply::UP(new RemoveDocumentReply)); + FeedToken token(latch); handler->handleRemove(token, b, t, did); } latch.await(); @@ -421,7 +414,7 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType); TransportLatch latch(1); if (handler.get() != NULL) { - FeedToken token(latch, mbus::Reply::UP(new documentapi::UpdateDocumentReply())); + FeedToken token(latch); LOG(debug, "update = %s", upd->toXml().c_str()); handler->handleUpdate(token, b, t, upd); latch.await(); @@ -433,10 +426,7 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP PersistenceEngine::GetResult -PersistenceEngine::get(const Bucket& b, - const document::FieldSet& fields, - const DocumentId& did, - Context& context) const +PersistenceEngine::get(const Bucket& b, const document::FieldSet& fields, const DocumentId& did, Context& context) const { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); HandlerSnapshot::UP snapshot = getHandlerSnapshot(b.getBucketSpace()); @@ -465,11 +455,8 @@ PersistenceEngine::get(const Bucket& b, PersistenceEngine::CreateIteratorResult -PersistenceEngine::createIterator(const Bucket &bucket, - const document::FieldSet& fields, - const Selection &selection, - IncludedVersions versions, - Context & context) +PersistenceEngine::createIterator(const Bucket &bucket, const document::FieldSet& fields, const Selection &selection, + IncludedVersions versions, Context & context) { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); HandlerSnapshot::UP snapshot = getHandlerSnapshot(bucket.getBucketSpace()); @@ -552,7 +539,7 @@ PersistenceEngine::createBucket(const Bucket &b, Context &) TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch, Reply::UP(new DocumentReply(0))); + FeedToken token(latch); handler->handleCreateBucket(token, b); } latch.await(); @@ -569,7 +556,7 @@ PersistenceEngine::deleteBucket(const Bucket& b, Context&) TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch, Reply::UP(new DocumentReply(0))); + FeedToken token(latch); handler->handleDeleteBucket(token, b); } latch.await(); @@ -612,7 +599,7 @@ PersistenceEngine::split(const Bucket& source, const Bucket& target1, const Buck TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch, Reply::UP(new DocumentReply(0))); + FeedToken token(latch); handler->handleSplit(token, source, target1, target2); } latch.await(); @@ -631,7 +618,7 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); - FeedToken token(latch, Reply::UP(new DocumentReply(0))); + FeedToken token(latch); handler->handleJoin(token, source1, source2, target); } latch.await(); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp index e719a0aa962..16813d1b56c 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp @@ -16,13 +16,8 @@ TransportLatch::TransportLatch(uint32_t cnt) TransportLatch::~TransportLatch() {} void -TransportLatch::send(mbus::Reply::UP reply, - ResultUP result, - bool documentWasFound, - double latency_ms) +TransportLatch::send(ResultUP result, bool documentWasFound) { - (void) reply; - (void) latency_ms; { vespalib::LockGuard guard(_lock); if (!_result.get()) { diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h index 747c95358b4..12f92722dfa 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h @@ -20,10 +20,7 @@ private: public: TransportLatch(uint32_t cnt); ~TransportLatch(); - virtual void send(mbus::Reply::UP reply, - ResultUP result, - bool documentWasFound, - double latency_ms) override; + void send(ResultUP result, bool documentWasFound) override; void await() { _latch.await(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 5c6dd497cbd..7aeaea1d1a7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -49,27 +49,9 @@ namespace proton { namespace { -void -setUpdateWasFound(mbus::Reply &reply, bool was_found) -{ - assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_UPDATEDOCUMENT); - UpdateDocumentReply &update_rep = static_cast<UpdateDocumentReply&>(reply); - update_rep.setWasFound(was_found); -} - -void -setRemoveWasFound(mbus::Reply &reply, bool was_found) -{ - assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_REMOVEDOCUMENT); - RemoveDocumentReply &remove_rep = static_cast<RemoveDocumentReply&>(reply); - remove_rep.setWasFound(was_found); -} - bool -ignoreOperation(const DocumentOperation &op) -{ - return (op.getPrevTimestamp() != 0) - && (op.getTimestamp() < op.getPrevTimestamp()); +ignoreOperation(const DocumentOperation &op) { + return (op.getPrevTimestamp() != 0) && (op.getTimestamp() < op.getPrevTimestamp()); } } // namespace @@ -142,7 +124,6 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op) } else { if (token) { token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false); - setUpdateWasFound(token->getReply(), false); token->ack(); } } @@ -155,7 +136,6 @@ FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op) storeOperation(op); if (token) { token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true); - setUpdateWasFound(token->getReply(), true); } _activeFeedView->handleUpdate(token.get(), op); } @@ -172,10 +152,9 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio storeOperation(putOp); if (token) { token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true); - setUpdateWasFound(token->getReply(), true); } TransportLatch latch(1); - FeedToken putToken(latch, mbus::Reply::UP(new FeedReply(DocumentProtocol::REPLY_PUTDOCUMENT))); + FeedToken putToken(latch); _activeFeedView->handlePut(&putToken, putOp); latch.await(); if (token) { @@ -202,7 +181,6 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { if (token) { bool documentWasFound = !op.getPrevMarkedAsRemoved(); token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound); - setRemoveWasFound(token->getReply(), documentWasFound); } _activeFeedView->handleRemove(token.get(), op); } else if (op.hasDocType()) { @@ -210,13 +188,11 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { storeOperation(op); if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); - setRemoveWasFound(token->getReply(), false); } _activeFeedView->handleRemove(token.get(), op); } else { if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); - setRemoveWasFound(token->getReply(), false); token->ack(); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp index 2ec3de61dea..3628505ed66 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "feedstate.h" +#include <vespa/searchcore/proton/feedoperation/feedoperation.h> #include <vespa/vespalib/util/exceptions.h> using document::BucketId; @@ -26,8 +27,7 @@ void FeedState::throwExceptionInReceive(const vespalib::string &docType, } void -FeedState::throwExceptionInHandleOperation(const vespalib::string &docType, - const FeedOperation &op) +FeedState::throwExceptionInHandleOperation(const vespalib::string &docType, const FeedOperation &op) { throw IllegalStateException (make_string("We should not receive any feed operations" diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstate.h b/searchcore/src/vespa/searchcore/proton/server/feedstate.h index 13dd6ea9dc8..472f5cb224f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstate.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedstate.h @@ -12,27 +12,22 @@ namespace proton { +class FeedOperation; /** * Class representing the current state of a feed handler. */ class FeedState { public: - enum Type { NORMAL, - REPLAY_TRANSACTION_LOG, - INIT }; + enum Type { NORMAL, REPLAY_TRANSACTION_LOG, INIT }; private: Type _type; protected: - void throwExceptionInReceive(const vespalib::string &docType, - uint64_t serialRangeFrom, - uint64_t serialRangeTo, - size_t packetSize); - - void throwExceptionInHandleOperation(const vespalib::string &docType, - const FeedOperation &op); + void throwExceptionInReceive(const vespalib::string &docType, uint64_t serialRangeFrom, + uint64_t serialRangeTo, size_t packetSize); + void throwExceptionInHandleOperation(const vespalib::string &docType, const FeedOperation &op); public: typedef std::shared_ptr<FeedState> SP; @@ -43,11 +38,8 @@ public: Type getType() const { return _type; } vespalib::string getName() const; - virtual void handleOperation(FeedToken token, FeedOperation::UP op) = 0; - - virtual void receive(const PacketWrapper::SP &wrap, - vespalib::Executor &executor) = 0; + virtual void handleOperation(FeedToken token, std::unique_ptr<FeedOperation> op) = 0; + virtual void receive(const PacketWrapper::SP &wrap, vespalib::Executor &executor) = 0; }; } // namespace proton - |