summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-19 14:11:58 +0200
committerGitHub <noreply@github.com>2021-10-19 14:11:58 +0200
commita2a1d55e1b4cebbcbc716dbdf057df2977057ea8 (patch)
tree2dd3d13daa053a6c780bdd966e06eb24df0c98b6 /storage
parent13ce30b7d28ccb1e6388481b5b5aa3deafc52191 (diff)
parent8b5738894affddabdb73b15bb8c2eaa1bfb5f316 (diff)
Merge pull request #19633 from vespa-engine/toregge/pass-message-tracker-to-handle-apply-bucket-diff-reply
Pass message tracker to MergeHandler::handleApplyBucketDiffReply.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp42
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.h2
5 files changed, 30 insertions, 28 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index fa989410137..e3b63c1bdf9 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -304,10 +304,10 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
EXPECT_FALSE(replySent.get());
LOG(debug, "Verifying that replying the diff sends on back");
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd2);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd2);
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*reply, stub);
+ handler.handleApplyBucketDiffReply(*reply, stub, createTracker(reply, _bucket));
ASSERT_EQ(1, stub.replies.size());
replySent = stub.replies[0];
}
@@ -353,12 +353,12 @@ TEST_F(MergeHandlerTest, master_message_flow) {
ASSERT_EQ(2, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType());
auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
- auto reply2 = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
+ auto reply2 = std::make_shared<api::ApplyBucketDiffReply>(cmd3);
ASSERT_EQ(1, reply2->getDiff().size());
reply2->getDiff()[0]._entry._hasMask |= 2u;
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*reply2, stub);
+ handler.handleApplyBucketDiffReply(*reply2, stub, createTracker(reply2, _bucket));
ASSERT_EQ(1, stub.replies.size());
@@ -470,9 +470,9 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
}
}
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
{
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper(), createTracker(applyBucketDiffReply, _bucket));
if (!messageKeeper()._msgs.empty()) {
ASSERT_FALSE(reply.get());
@@ -672,10 +672,10 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());
auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub);
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket));
ASSERT_EQ(1, stub.replies.size());
@@ -699,14 +699,14 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());
auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
ASSERT_FALSE(applyBucketDiffReply->getDiff().empty());
// Change a hasMask to indicate something changed during merging.
applyBucketDiffReply->getDiff()[0]._entry._hasMask = 0x5;
MessageSenderStub stub;
LOG(debug, "sending apply bucket diff reply");
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub);
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket));
ASSERT_EQ(1, stub.commands.size());
@@ -1012,10 +1012,10 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
spi::Context&)
{
(void) test;
- api::ApplyBucketDiffReply reply(*_applyCmd);
- test.fillDummyApplyDiff(reply.getDiff());
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(*_applyCmd);
+ test.fillDummyApplyDiff(reply->getDiff());
_stub.clear();
- handler.handleApplyBucketDiffReply(reply, _stub);
+ handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket));
}
std::string
@@ -1301,7 +1301,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
// ApplyBucketDiffCommand has a shorter node list, node 2 is not present
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd3);
auto& diff = reply->getDiff();
EXPECT_EQ(2u, diff.size());
EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry);
@@ -1312,7 +1312,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
*/
fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo());
diff[0]._entry._hasMask |= 2u; // Simulate diff entry having been applied on node 1.
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled first ApplyBucketDiffReply");
}
ASSERT_EQ(3u, messageKeeper()._msgs.size());
@@ -1326,13 +1326,13 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]);
auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd4);
auto& diff = reply->getDiff();
EXPECT_EQ(1u, diff.size());
EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry);
// Simulate that node 3 somehow lost doc2 when trying to fill diff entry.
diff[0]._entry._hasMask &= ~4u;
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled second ApplyBucketDiffReply");
}
ASSERT_EQ(4u, messageKeeper()._msgs.size());
@@ -1347,14 +1347,14 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]);
auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]);
EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd5);
auto& diff = reply->getDiff();
EXPECT_EQ(baseline_diff_size, diff.size());
for (auto& e : diff) {
EXPECT_EQ(1u, e._entry._hasMask);
e._entry._hasMask |= 2u;
}
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled third ApplyBucketDiffReply");
}
ASSERT_EQ(5u, messageKeeper()._msgs.size());
@@ -1369,12 +1369,12 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]);
auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd6);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd6);
auto& diff = reply->getDiff();
EXPECT_EQ(1u, diff.size());
fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo());
diff[0]._entry._hasMask |= 2u;
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
ASSERT_EQ(6u, messageKeeper()._msgs.size());
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 7693156ae30..4b0560424e5 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -1282,8 +1282,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
void
-MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) const
+MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const
{
+ (void) tracker;
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
ApplyBucketDiffState async_results(*this, bucket);
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index fa7e21dae78..e3b5b0899f6 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -68,7 +68,7 @@ public:
MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;
void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const;
MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
- void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&) const;
+ void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
private:
const framework::Clock &_clock;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 297185ac54c..2106d1385ae 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -87,19 +87,20 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
return MessageTracker::UP();
}
-void
-PersistenceHandler::handleReply(api::StorageReply& reply) const
+MessageTracker::UP
+PersistenceHandler::handleReply(api::StorageReply& reply, MessageTracker::UP tracker) const
{
switch (reply.getType().getId()) {
case api::MessageType::GETBUCKETDIFF_REPLY_ID:
_mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler);
break;
case api::MessageType::APPLYBUCKETDIFF_REPLY_ID:
- _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler);
+ _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler, std::move(tracker));
break;
default:
break;
}
+ return tracker;
}
MessageTracker::UP
@@ -112,7 +113,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
try{
LOG(debug, "Handling reply: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
- handleReply(static_cast<api::StorageReply&>(msg));
+ return handleReply(static_cast<api::StorageReply&>(msg), std::move(tracker));
} catch (std::exception& e) {
// It's a reply, so nothing we can do.
LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index a800d1d4053..a92c2dc78ca 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -38,7 +38,7 @@ public:
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;
- void handleReply(api::StorageReply&) const;
+ MessageTracker::UP handleReply(api::StorageReply&, MessageTracker::UP) const;
MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const;