summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-10-19 12:58:55 +0200
committerTor Egge <Tor.Egge@online.no>2021-10-19 12:58:55 +0200
commit8b5738894affddabdb73b15bb8c2eaa1bfb5f316 (patch)
tree24903b32138c26e37b2945b52e50be9aecb75911 /storage
parentf996eb3c39db4c83b856495ec64984ba7fddc344 (diff)
Pass message tracker to MergeHandler::handleApplyBucketDiffReply.
This enables handover of bucket lock (part of message tracker) to be forwarded to ApplyBucketDiffState to keep bucket locked until async writes have been completed and service layer bucket db has been updated.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp42
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.h2
5 files changed, 30 insertions, 28 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index fa989410137..e3b63c1bdf9 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -304,10 +304,10 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
EXPECT_FALSE(replySent.get());
LOG(debug, "Verifying that replying the diff sends on back");
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd2);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd2);
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*reply, stub);
+ handler.handleApplyBucketDiffReply(*reply, stub, createTracker(reply, _bucket));
ASSERT_EQ(1, stub.replies.size());
replySent = stub.replies[0];
}
@@ -353,12 +353,12 @@ TEST_F(MergeHandlerTest, master_message_flow) {
ASSERT_EQ(2, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType());
auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
- auto reply2 = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
+ auto reply2 = std::make_shared<api::ApplyBucketDiffReply>(cmd3);
ASSERT_EQ(1, reply2->getDiff().size());
reply2->getDiff()[0]._entry._hasMask |= 2u;
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*reply2, stub);
+ handler.handleApplyBucketDiffReply(*reply2, stub, createTracker(reply2, _bucket));
ASSERT_EQ(1, stub.replies.size());
@@ -470,9 +470,9 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
}
}
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
{
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper(), createTracker(applyBucketDiffReply, _bucket));
if (!messageKeeper()._msgs.empty()) {
ASSERT_FALSE(reply.get());
@@ -672,10 +672,10 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());
auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub);
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket));
ASSERT_EQ(1, stub.replies.size());
@@ -699,14 +699,14 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());
auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
ASSERT_FALSE(applyBucketDiffReply->getDiff().empty());
// Change a hasMask to indicate something changed during merging.
applyBucketDiffReply->getDiff()[0]._entry._hasMask = 0x5;
MessageSenderStub stub;
LOG(debug, "sending apply bucket diff reply");
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub);
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket));
ASSERT_EQ(1, stub.commands.size());
@@ -1012,10 +1012,10 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
spi::Context&)
{
(void) test;
- api::ApplyBucketDiffReply reply(*_applyCmd);
- test.fillDummyApplyDiff(reply.getDiff());
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(*_applyCmd);
+ test.fillDummyApplyDiff(reply->getDiff());
_stub.clear();
- handler.handleApplyBucketDiffReply(reply, _stub);
+ handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket));
}
std::string
@@ -1301,7 +1301,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
// ApplyBucketDiffCommand has a shorter node list, node 2 is not present
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd3);
auto& diff = reply->getDiff();
EXPECT_EQ(2u, diff.size());
EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry);
@@ -1312,7 +1312,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
*/
fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo());
diff[0]._entry._hasMask |= 2u; // Simulate diff entry having been applied on node 1.
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled first ApplyBucketDiffReply");
}
ASSERT_EQ(3u, messageKeeper()._msgs.size());
@@ -1326,13 +1326,13 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]);
auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd4);
auto& diff = reply->getDiff();
EXPECT_EQ(1u, diff.size());
EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry);
// Simulate that node 3 somehow lost doc2 when trying to fill diff entry.
diff[0]._entry._hasMask &= ~4u;
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled second ApplyBucketDiffReply");
}
ASSERT_EQ(4u, messageKeeper()._msgs.size());
@@ -1347,14 +1347,14 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]);
auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]);
EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd5);
auto& diff = reply->getDiff();
EXPECT_EQ(baseline_diff_size, diff.size());
for (auto& e : diff) {
EXPECT_EQ(1u, e._entry._hasMask);
e._entry._hasMask |= 2u;
}
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled third ApplyBucketDiffReply");
}
ASSERT_EQ(5u, messageKeeper()._msgs.size());
@@ -1369,12 +1369,12 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]);
auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd6);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd6);
auto& diff = reply->getDiff();
EXPECT_EQ(1u, diff.size());
fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo());
diff[0]._entry._hasMask |= 2u;
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
ASSERT_EQ(6u, messageKeeper()._msgs.size());
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 7693156ae30..4b0560424e5 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -1282,8 +1282,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
void
-MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) const
+MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const
{
+ (void) tracker;
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
ApplyBucketDiffState async_results(*this, bucket);
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index fa7e21dae78..e3b5b0899f6 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -68,7 +68,7 @@ public:
MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;
void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const;
MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
- void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&) const;
+ void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
private:
const framework::Clock &_clock;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 297185ac54c..2106d1385ae 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -87,19 +87,20 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
return MessageTracker::UP();
}
-void
-PersistenceHandler::handleReply(api::StorageReply& reply) const
+MessageTracker::UP
+PersistenceHandler::handleReply(api::StorageReply& reply, MessageTracker::UP tracker) const
{
switch (reply.getType().getId()) {
case api::MessageType::GETBUCKETDIFF_REPLY_ID:
_mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler);
break;
case api::MessageType::APPLYBUCKETDIFF_REPLY_ID:
- _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler);
+ _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler, std::move(tracker));
break;
default:
break;
}
+ return tracker;
}
MessageTracker::UP
@@ -112,7 +113,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
try{
LOG(debug, "Handling reply: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
- handleReply(static_cast<api::StorageReply&>(msg));
+ return handleReply(static_cast<api::StorageReply&>(msg), std::move(tracker));
} catch (std::exception& e) {
// It's a reply, so nothing we can do.
LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index a800d1d4053..a92c2dc78ca 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -38,7 +38,7 @@ public:
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;
- void handleReply(api::StorageReply&) const;
+ MessageTracker::UP handleReply(api::StorageReply&, MessageTracker::UP) const;
MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const;