diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-28 14:57:44 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-28 14:57:44 +0000 |
commit | 9370479561c4098775ca3c6ae3985eb563ce050d (patch) | |
tree | 2f62764f157bf2c132d4db9af4ef16c107df29cb /storage | |
parent | 7befd97b93525c96ef1d90d8d94393f7e881ccc0 (diff) |
Implement hasReply avoid copying the shared_ptr just to peak at the result.
Diffstat (limited to 'storage')
8 files changed, 64 insertions, 70 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 7e7a76b95a8..3b3846c3278 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -218,7 +218,7 @@ TEST_F(MergeHandlerTest, merge_bucket_command) { EXPECT_EQ(1234, cmd2.getSourceIndex()); tracker->generateReply(cmd); - EXPECT_FALSE(tracker->getReply().get()); + EXPECT_FALSE(tracker->hasReply()); } void @@ -230,7 +230,7 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) LOG(debug, "Verifying that get bucket diff is sent on"); api::GetBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp); MessageTracker::UP tracker1 = handler.handleGetBucketDiff(cmd, *_context); - api::StorageMessage::SP replySent = tracker1->getReply(); + api::StorageMessage::SP replySent = tracker1->getReplySP(); if (midChain) { LOG(debug, "Check state"); @@ -279,7 +279,7 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) LOG(debug, "Verifying that apply bucket diff is sent on"); api::ApplyBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp); MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(cmd, *_context); - api::StorageMessage::SP replySent = tracker1->getReply(); + api::StorageMessage::SP replySent = tracker1->getReplySP(); if (midChain) { LOG(debug, "Check state"); @@ -724,7 +724,7 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); - auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(tracker->getReply()); + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(tracker->getReplySP()); ASSERT_TRUE(applyBucketDiffReply.get()); auto& diff = applyBucketDiffReply->getDiff(); @@ -1129,8 +1129,7 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { setUpChain(BACK); document::TestDocMan docMan; - document::Document::SP doc( - docMan.createRandomDocumentAtLocation(_location)); + document::Document::SP doc(docMan.createRandomDocumentAtLocation(_location)); spi::Timestamp ts(10111); doPut(doc, ts); @@ -1150,9 +1149,7 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); - auto applyBucketDiffReply = - std::dynamic_pointer_cast<api::ApplyBucketDiffReply>( - tracker->getReply()); + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(tracker->getReplySP()); ASSERT_TRUE(applyBucketDiffReply.get()); api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); @@ -1163,8 +1160,7 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { // Timestamp should now be a regular remove bool foundTimestamp = false; for (size_t i = 0; i < getBucketDiffCmd->getDiff().size(); ++i) { - const api::GetBucketDiffCommand::Entry& e( - getBucketDiffCmd->getDiff()[i]); + const api::GetBucketDiffCommand::Entry& e(getBucketDiffCmd->getDiff()[i]); if (e._timestamp == ts) { EXPECT_EQ( uint16_t(MergeHandler::IN_USE | MergeHandler::DELETED), diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index c8318bef211..98a2be6880d 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -219,7 +219,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) if (!code.success()) { return; } - auto& reply = dynamic_cast<api::SplitBucketReply&>(*result->getReply()); + auto& reply = dynamic_cast<api::SplitBucketReply&>(result->getReply()); std::set<std::string> expected; for (uint32_t i=0; i<resultBuckets; ++i) { document::BucketId b(resultSplitLevel, diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 83f243ed1b2..e33e692b7d2 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -29,8 +29,8 @@ TEST_F(ProcessAllHandlerTest, remove_location) { "DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n", dumpBucket(bucketId)); - auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(tracker->getReply()); - ASSERT_TRUE(reply.get() != nullptr); + auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(tracker->getReplySP()); + ASSERT_TRUE(reply); EXPECT_EQ(2u, reply->documents_removed()); } @@ -62,8 +62,8 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { "DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n", dumpBucket(bucketId)); - auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(tracker->getReply()); - ASSERT_TRUE(reply.get() != nullptr); + auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(tracker->getReplySP()); + ASSERT_TRUE(reply); EXPECT_EQ(5u, reply->documents_removed()); } @@ -112,8 +112,8 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context); - ASSERT_TRUE(tracker->getReply().get()); - auto& reply = dynamic_cast<api::StatBucketReply&>(*tracker->getReply().get()); + ASSERT_TRUE(tracker->hasReply()); + auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); vespalib::string expected = @@ -146,8 +146,8 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries) spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context); - ASSERT_TRUE(tracker->getReply().get()); - auto& reply = dynamic_cast<api::StatBucketReply&>(*tracker->getReply().get()); + ASSERT_TRUE(tracker->hasReply()); + auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); vespalib::string expected = @@ -192,8 +192,8 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_ spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context); - ASSERT_TRUE(tracker->getReply().get()); - auto& reply = dynamic_cast<api::StatBucketReply&>(*tracker->getReply().get()); + ASSERT_TRUE(tracker->hasReply()); + auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); vespalib::string expected = diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index 1e67e90b540..08555fe0627 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -257,7 +257,7 @@ document::Document::SP TestAndSetTest::retrieveTestDocument() auto tracker = thread->handleGet(get, context); assert(tracker->getResult() == api::ReturnCode::Result::OK); - auto & reply = static_cast<api::GetReply &>(*tracker->getReply()); + auto & reply = static_cast<api::GetReply &>(tracker->getReply()); assert(reply.wasFound()); return reply.getDocument(); diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index d47ed28f636..4cb687bb753 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -1333,9 +1333,7 @@ MessageTracker::UP MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, spi::Context& context) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.applyBucketDiff, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.applyBucketDiff, _env._component.getClock()); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); LOG(debug, "%s", cmd.toString().c_str()); @@ -1391,35 +1389,30 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, } } - tracker->setReply(api::StorageReply::SP(new api::ApplyBucketDiffReply(cmd))); - static_cast<api::ApplyBucketDiffReply&>(*tracker->getReply()).getDiff().swap( - cmd.getDiff()); + tracker->setReply(std::make_shared<api::ApplyBucketDiffReply>(cmd)); + static_cast<api::ApplyBucketDiffReply&>(tracker->getReply()).getDiff().swap(cmd.getDiff()); LOG(spam, "Replying to ApplyBucketDiff for %s to node %d.", bucket.toString().c_str(), cmd.getNodes()[index - 1].index); } else { // When not the last node in merge chain, we must save reply, and // send command on. MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); - MergeStatus::SP s(new MergeStatus(_env._component.getClock(), - cmd.getLoadType(), cmd.getPriority(), - cmd.getTrace().getLevel())); + auto s = std::make_shared<MergeStatus>(_env._component.getClock(), + cmd.getLoadType(), cmd.getPriority(), + cmd.getTrace().getLevel()); _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); - s->pendingApplyDiff = - api::ApplyBucketDiffReply::SP(new api::ApplyBucketDiffReply(cmd)); + s->pendingApplyDiff = std::make_shared<api::ApplyBucketDiffReply>(cmd); LOG(spam, "Sending ApplyBucketDiff for %s on to node %d", bucket.toString().c_str(), cmd.getNodes()[index + 1].index); - std::shared_ptr<api::ApplyBucketDiffCommand> cmd2( - new api::ApplyBucketDiffCommand( - bucket.getBucket(), cmd.getNodes(), cmd.getMaxBufferSize())); - cmd2->setAddress(createAddress(_env._component.getClusterName(), - cmd.getNodes()[index + 1].index)); + auto cmd2 = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), cmd.getNodes(), cmd.getMaxBufferSize()); + cmd2->setAddress(createAddress(_env._component.getClusterName(), cmd.getNodes()[index + 1].index)); cmd2->getDiff().swap(cmd.getDiff()); cmd2->setPriority(cmd.getPriority()); cmd2->setTimeout(cmd.getTimeout()); s->pendingId = cmd2->getMsgId(); _env._fileStorHandler.sendCommand(cmd2); - // Everything went fine. Don't delete state but wait for reply + // Everything went fine. Don't delete state but wait for reply stateGuard.deactivate(); tracker->dontReply(); } diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index aaf62a85e87..7b220f39a90 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -788,8 +788,8 @@ PersistenceThread::handleCommand(api::StorageCommand& msg) spi::Context context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel()); MessageTracker::UP mtracker(handleCommandSplitByType(msg, context)); if (mtracker && ! context.getTrace().getRoot().isEmpty()) { - if (mtracker->getReply()) { - mtracker->getReply()->getTrace().getRoot().addChild(context.getTrace().getRoot()); + if (mtracker->hasReply()) { + mtracker->getReply().getTrace().getRoot().addChild(context.getTrace().getRoot()); } else { msg.getTrace().getRoot().addChild(context.getTrace().getRoot()); } @@ -840,8 +840,8 @@ PersistenceThread::processMessage(api::StorageMessage& msg) LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str()); } else { tracker->generateReply(initiatingCommand); - if ((tracker->getReply() - && tracker->getReply()->getResult().failed()) + if ((tracker->hasReply() + && tracker->getReply().getResult().failed()) || tracker->getResult().failed()) { _env._metrics.failedOperations.inc(); @@ -873,19 +873,19 @@ PersistenceThread::processMessage(api::StorageMessage& msg) namespace { -bool isBatchable(const api::StorageMessage& msg) +bool isBatchable(api::MessageType::Id id) { - return (msg.getType().getId() == api::MessageType::PUT_ID || - msg.getType().getId() == api::MessageType::REMOVE_ID || - msg.getType().getId() == api::MessageType::UPDATE_ID || - msg.getType().getId() == api::MessageType::REVERT_ID); + return (id == api::MessageType::PUT_ID || + id == api::MessageType::REMOVE_ID || + id == api::MessageType::UPDATE_ID || + id == api::MessageType::REVERT_ID); } -bool hasBucketInfo(const api::StorageMessage& msg) +bool hasBucketInfo(api::MessageType::Id id) { - return (isBatchable(msg) || - (msg.getType().getId() == api::MessageType::REMOVELOCATION_ID || - msg.getType().getId() == api::MessageType::JOINBUCKETS_ID)); + return (isBatchable(id) || + (id == api::MessageType::REMOVELOCATION_ID || + id == api::MessageType::JOINBUCKETS_ID)); } } @@ -899,15 +899,15 @@ PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage & lock) { api::StorageMessage & msg(*lock.second); std::unique_ptr<MessageTracker> tracker = processMessage(msg); - if (tracker && tracker->getReply()) { - if (hasBucketInfo(msg)) { - if (tracker->getReply()->getResult().success()) { + if (tracker && tracker->hasReply()) { + if (hasBucketInfo(msg.getType().getId())) { + if (tracker->getReply().getResult().success()) { _env.setBucketInfo(*tracker, bucket); } } LOG(spam, "Sending reply up: %s %" PRIu64, - tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId()); - _env._fileStorHandler.sendReply(tracker->getReply()); + tracker->getReply().toString().c_str(), tracker->getReply().getMsgId()); + _env._fileStorHandler.sendReply(tracker->getReplySP()); } } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 9cea0e6c5d7..9c49dc96750 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -147,8 +147,7 @@ PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket & { api::BucketInfo info = getBucketInfo(bucket, _partition); - static_cast<api::BucketInfoReply&>(*tracker.getReply()). - setBucketInfo(info); + static_cast<api::BucketInfoReply&>(tracker.getReply()).setBucketInfo(info); updateBucketDatabase(bucket, info); } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 0eba334c81d..de996a81390 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -17,8 +17,7 @@ class MessageTracker : protected Types { public: typedef std::unique_ptr<MessageTracker> UP; - MessageTracker(FileStorThreadMetrics::Op& metric, - framework::Clock& clock); + MessageTracker(FileStorThreadMetrics::Op& metric, framework::Clock& clock); ~MessageTracker(); @@ -28,8 +27,8 @@ public: * a reply, to ensure it is stored in case of failure after reply creation. */ void setReply(api::StorageReply::SP reply) { - assert(_reply.get() == 0); - _reply = reply; + assert( ! _reply ); + _reply = std::move(reply); } /** Utility function to be able to write a bit less in client. */ @@ -43,8 +42,15 @@ public: * commands like merge. */ void dontReply() { _sendReply = false; } - api::StorageReply::SP getReply() { - return _reply; + bool hasReply() const { return bool(_reply); } + const api::StorageReply & getReply() const { + return *_reply; + } + api::StorageReply & getReply() { + return *_reply; + } + api::StorageReply::SP getReplySP() { + return std::move(_reply); } void generateReply(api::StorageCommand& cmd); @@ -52,11 +58,11 @@ public: api::ReturnCode getResult() const { return _result; } private: - bool _sendReply; + bool _sendReply; FileStorThreadMetrics::Op& _metric; - api::StorageReply::SP _reply; - api::ReturnCode _result; - framework::MilliSecTimer _timer; + api::StorageReply::SP _reply; + api::ReturnCode _result; + framework::MilliSecTimer _timer; }; struct PersistenceUtil { |