diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-10-19 12:58:55 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-10-19 12:58:55 +0200 |
commit | 8b5738894affddabdb73b15bb8c2eaa1bfb5f316 (patch) | |
tree | 24903b32138c26e37b2945b52e50be9aecb75911 /storage | |
parent | f996eb3c39db4c83b856495ec64984ba7fddc344 (diff) |
Pass message tracker to MergeHandler::handleApplyBucketDiffReply.
This enables handover of bucket lock (part of message tracker) to be
forwarded to ApplyBucketDiffState to keep bucket locked until async writes
have been completed and service layer bucket db has been updated.
Diffstat (limited to 'storage')
5 files changed, 30 insertions, 28 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index fa989410137..e3b63c1bdf9 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -304,10 +304,10 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) EXPECT_FALSE(replySent.get()); LOG(debug, "Verifying that replying the diff sends on back"); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd2); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd2); MessageSenderStub stub; - handler.handleApplyBucketDiffReply(*reply, stub); + handler.handleApplyBucketDiffReply(*reply, stub, createTracker(reply, _bucket)); ASSERT_EQ(1, stub.replies.size()); replySent = stub.replies[0]; } @@ -353,12 +353,12 @@ TEST_F(MergeHandlerTest, master_message_flow) { ASSERT_EQ(2, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType()); auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]); - auto reply2 = std::make_unique<api::ApplyBucketDiffReply>(cmd3); + auto reply2 = std::make_shared<api::ApplyBucketDiffReply>(cmd3); ASSERT_EQ(1, reply2->getDiff().size()); reply2->getDiff()[0]._entry._hasMask |= 2u; MessageSenderStub stub; - handler.handleApplyBucketDiffReply(*reply2, stub); + handler.handleApplyBucketDiffReply(*reply2, stub, createTracker(reply2, _bucket)); ASSERT_EQ(1, stub.replies.size()); @@ -470,9 +470,9 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { } } - auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); + auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); { - handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper()); + handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper(), createTracker(applyBucketDiffReply, _bucket)); if (!messageKeeper()._msgs.empty()) { ASSERT_FALSE(reply.get()); @@ -672,10 +672,10 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) { handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper()); auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>(); - auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); + auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); MessageSenderStub stub; - handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub); + handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket)); ASSERT_EQ(1, stub.replies.size()); @@ -699,14 +699,14 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper()); auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>(); - auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); + auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); ASSERT_FALSE(applyBucketDiffReply->getDiff().empty()); // Change a hasMask to indicate something changed during merging. applyBucketDiffReply->getDiff()[0]._entry._hasMask = 0x5; MessageSenderStub stub; LOG(debug, "sending apply bucket diff reply"); - handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub); + handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket)); ASSERT_EQ(1, stub.commands.size()); @@ -1012,10 +1012,10 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( spi::Context&) { (void) test; - api::ApplyBucketDiffReply reply(*_applyCmd); - test.fillDummyApplyDiff(reply.getDiff()); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(*_applyCmd); + test.fillDummyApplyDiff(reply->getDiff()); _stub.clear(); - handler.handleApplyBucketDiffReply(reply, _stub); + handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket)); } std::string @@ -1301,7 +1301,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]); // ApplyBucketDiffCommand has a shorter node list, node 2 is not present EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd3); auto& diff = reply->getDiff(); EXPECT_EQ(2u, diff.size()); EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry); @@ -1312,7 +1312,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) */ fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo()); diff[0]._entry._hasMask |= 2u; // Simulate diff entry having been applied on node 1. - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled first ApplyBucketDiffReply"); } ASSERT_EQ(3u, messageKeeper()._msgs.size()); @@ -1326,13 +1326,13 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]); auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd4); auto& diff = reply->getDiff(); EXPECT_EQ(1u, diff.size()); EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry); // Simulate that node 3 somehow lost doc2 when trying to fill diff entry. diff[0]._entry._hasMask &= ~4u; - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled second ApplyBucketDiffReply"); } ASSERT_EQ(4u, messageKeeper()._msgs.size()); @@ -1347,14 +1347,14 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]); auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]); EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd5); auto& diff = reply->getDiff(); EXPECT_EQ(baseline_diff_size, diff.size()); for (auto& e : diff) { EXPECT_EQ(1u, e._entry._hasMask); e._entry._hasMask |= 2u; } - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled third ApplyBucketDiffReply"); } ASSERT_EQ(5u, messageKeeper()._msgs.size()); @@ -1369,12 +1369,12 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]); auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd6); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd6); auto& diff = reply->getDiff(); EXPECT_EQ(1u, diff.size()); fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo()); diff[0]._entry._hasMask |= 2u; - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled fourth ApplyBucketDiffReply"); } ASSERT_EQ(6u, messageKeeper()._msgs.size()); diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7693156ae30..4b0560424e5 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -1282,8 +1282,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } void -MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) const +MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const { + (void) tracker; _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); ApplyBucketDiffState async_results(*this, bucket); diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index fa7e21dae78..e3b5b0899f6 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -68,7 +68,7 @@ public: MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const; void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const; MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const; - void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&) const; + void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; private: const framework::Clock &_clock; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 297185ac54c..2106d1385ae 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -87,19 +87,20 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr return MessageTracker::UP(); } -void -PersistenceHandler::handleReply(api::StorageReply& reply) const +MessageTracker::UP +PersistenceHandler::handleReply(api::StorageReply& reply, MessageTracker::UP tracker) const { switch (reply.getType().getId()) { case api::MessageType::GETBUCKETDIFF_REPLY_ID: _mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler); break; case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: - _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler); + _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler, std::move(tracker)); break; default: break; } + return tracker; } MessageTracker::UP @@ -112,7 +113,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP try{ LOG(debug, "Handling reply: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); - handleReply(static_cast<api::StorageReply&>(msg)); + return handleReply(static_cast<api::StorageReply&>(msg), std::move(tracker)); } catch (std::exception& e) { // It's a reply, so nothing we can do. LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index a800d1d4053..a92c2dc78ca 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -38,7 +38,7 @@ public: private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; - void handleReply(api::StorageReply&) const; + MessageTracker::UP handleReply(api::StorageReply&, MessageTracker::UP) const; MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const; |