diff options
Diffstat (limited to 'storage')
6 files changed, 34 insertions, 28 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index fa989410137..e3b63c1bdf9 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -304,10 +304,10 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) EXPECT_FALSE(replySent.get()); LOG(debug, "Verifying that replying the diff sends on back"); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd2); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd2); MessageSenderStub stub; - handler.handleApplyBucketDiffReply(*reply, stub); + handler.handleApplyBucketDiffReply(*reply, stub, createTracker(reply, _bucket)); ASSERT_EQ(1, stub.replies.size()); replySent = stub.replies[0]; } @@ -353,12 +353,12 @@ TEST_F(MergeHandlerTest, master_message_flow) { ASSERT_EQ(2, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType()); auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]); - auto reply2 = std::make_unique<api::ApplyBucketDiffReply>(cmd3); + auto reply2 = std::make_shared<api::ApplyBucketDiffReply>(cmd3); ASSERT_EQ(1, reply2->getDiff().size()); reply2->getDiff()[0]._entry._hasMask |= 2u; MessageSenderStub stub; - handler.handleApplyBucketDiffReply(*reply2, stub); + handler.handleApplyBucketDiffReply(*reply2, stub, createTracker(reply2, _bucket)); ASSERT_EQ(1, stub.replies.size()); @@ -470,9 +470,9 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { } } - auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); + auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); { - handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper()); + handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper(), createTracker(applyBucketDiffReply, _bucket)); if (!messageKeeper()._msgs.empty()) { ASSERT_FALSE(reply.get()); @@ -672,10 +672,10 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) { handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper()); auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>(); - auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); + auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); MessageSenderStub stub; - handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub); + handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket)); ASSERT_EQ(1, stub.replies.size()); @@ -699,14 +699,14 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper()); auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>(); - auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); + auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); ASSERT_FALSE(applyBucketDiffReply->getDiff().empty()); // Change a hasMask to indicate something changed during merging. applyBucketDiffReply->getDiff()[0]._entry._hasMask = 0x5; MessageSenderStub stub; LOG(debug, "sending apply bucket diff reply"); - handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub); + handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket)); ASSERT_EQ(1, stub.commands.size()); @@ -1012,10 +1012,10 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( spi::Context&) { (void) test; - api::ApplyBucketDiffReply reply(*_applyCmd); - test.fillDummyApplyDiff(reply.getDiff()); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(*_applyCmd); + test.fillDummyApplyDiff(reply->getDiff()); _stub.clear(); - handler.handleApplyBucketDiffReply(reply, _stub); + handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket)); } std::string @@ -1301,7 +1301,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]); // ApplyBucketDiffCommand has a shorter node list, node 2 is not present EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd3); auto& diff = reply->getDiff(); EXPECT_EQ(2u, diff.size()); EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry); @@ -1312,7 +1312,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) */ fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo()); diff[0]._entry._hasMask |= 2u; // Simulate diff entry having been applied on node 1. - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled first ApplyBucketDiffReply"); } ASSERT_EQ(3u, messageKeeper()._msgs.size()); @@ -1326,13 +1326,13 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]); auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd4); auto& diff = reply->getDiff(); EXPECT_EQ(1u, diff.size()); EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry); // Simulate that node 3 somehow lost doc2 when trying to fill diff entry. diff[0]._entry._hasMask &= ~4u; - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled second ApplyBucketDiffReply"); } ASSERT_EQ(4u, messageKeeper()._msgs.size()); @@ -1347,14 +1347,14 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]); auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]); EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd5); auto& diff = reply->getDiff(); EXPECT_EQ(baseline_diff_size, diff.size()); for (auto& e : diff) { EXPECT_EQ(1u, e._entry._hasMask); e._entry._hasMask |= 2u; } - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled third ApplyBucketDiffReply"); } ASSERT_EQ(5u, messageKeeper()._msgs.size()); @@ -1369,12 +1369,12 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]); auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd6); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd6); auto& diff = reply->getDiff(); EXPECT_EQ(1u, diff.size()); fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo()); diff[0]._entry._hasMask |= 2u; - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled fourth ApplyBucketDiffReply"); } ASSERT_EQ(6u, messageKeeper()._msgs.size()); diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index 6611c3cba91..db660fb70d0 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -53,6 +53,10 @@ resource_exhaustion_merge_back_pressure_duration_secs double default=30.0 ## a busy-reply that would subsequently be unwound through the entire merge chain. disable_queue_limits_for_chained_merges bool default=false +## If set, portions of apply bucket diff handling will be performed asynchronously +## with persistence thread not waiting for local writes to complete. +async_apply_bucket_diff bool default=false + ## Whether the deadlock detector should be enabled or not. If disabled, it will ## still run, but it will never actually abort the process it is running in. enable_dead_lock_detector bool default=false restart diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7693156ae30..4b0560424e5 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -1282,8 +1282,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } void -MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) const +MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const { + (void) tracker; _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); ApplyBucketDiffState async_results(*this, bucket); diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index fa7e21dae78..e3b5b0899f6 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -68,7 +68,7 @@ public: MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const; void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const; MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const; - void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&) const; + void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; private: const framework::Clock &_clock; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 297185ac54c..2106d1385ae 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -87,19 +87,20 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr return MessageTracker::UP(); } -void -PersistenceHandler::handleReply(api::StorageReply& reply) const +MessageTracker::UP +PersistenceHandler::handleReply(api::StorageReply& reply, MessageTracker::UP tracker) const { switch (reply.getType().getId()) { case api::MessageType::GETBUCKETDIFF_REPLY_ID: _mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler); break; case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: - _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler); + _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler, std::move(tracker)); break; default: break; } + return tracker; } MessageTracker::UP @@ -112,7 +113,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP try{ LOG(debug, "Handling reply: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); - handleReply(static_cast<api::StorageReply&>(msg)); + return handleReply(static_cast<api::StorageReply&>(msg), std::move(tracker)); } catch (std::exception& e) { // It's a reply, so nothing we can do. LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index a800d1d4053..a92c2dc78ca 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -38,7 +38,7 @@ public: private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; - void handleReply(api::StorageReply&) const; + MessageTracker::UP handleReply(api::StorageReply&, MessageTracker::UP) const; MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const; |