From ef9844b6896a5eb5a283ff4cda81296d06661197 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Tue, 19 Oct 2021 14:46:11 +0200 Subject: Change editMergeStatus to return a shared pointer to merge status to ensure it remains live while being used in merge handler. --- storage/src/tests/persistence/mergehandlertest.cpp | 38 +++++++-------- .../persistence/filestorage/filestorhandler.h | 4 +- .../filestorage/filestorhandlerimpl.cpp | 4 +- .../persistence/filestorage/filestorhandlerimpl.h | 2 +- .../src/vespa/storage/persistence/mergehandler.cpp | 54 +++++++++++----------- 5 files changed, 51 insertions(+), 51 deletions(-) diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index e3b63c1bdf9..60030004594 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -1275,8 +1275,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) EXPECT_EQ(1, cmd2.getAddress()->getIndex()); EXPECT_EQ(1234, cmd2.getSourceIndex()); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList); baseline_diff_size = cmd2.getDiff().size(); auto reply = std::make_unique(cmd2); auto &diff = reply->getDiff(); @@ -1292,12 +1292,12 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking first ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); // Node 4 has been eliminated before the first ApplyBucketDiff command - EXPECT_EQ((NodeList{{0, false}, {1, 
false}, {2, true}, {3, true}}), s.nodeList); - EXPECT_EQ(baseline_diff_size + 2u, s.diff.size()); - EXPECT_EQ(EntryCheck(20000, 24u), s.diff[baseline_diff_size]); - EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size + 1]); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList); + EXPECT_EQ(baseline_diff_size + 2u, s->diff.size()); + EXPECT_EQ(EntryCheck(20000, 24u), s->diff[baseline_diff_size]); + EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size + 1]); auto& cmd3 = dynamic_cast(*messageKeeper()._msgs[1]); // ApplyBucketDiffCommand has a shorter node list, node 2 is not present EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes()); @@ -1320,10 +1320,10 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking second ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); - EXPECT_EQ(baseline_diff_size + 1u, s.diff.size()); - EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList); + EXPECT_EQ(baseline_diff_size + 1u, s->diff.size()); + EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size]); auto& cmd4 = dynamic_cast(*messageKeeper()._msgs[2]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes()); auto reply = std::make_shared(cmd4); @@ -1340,11 +1340,11 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking third ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); // Nodes 3 and 2 have been eliminated before the 
third ApplyBucketDiff command - EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList); - EXPECT_EQ(baseline_diff_size + 1u, s.diff.size()); - EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]); + EXPECT_EQ((NodeList{{0, false}, {1, false}}), s->nodeList); + EXPECT_EQ(baseline_diff_size + 1u, s->diff.size()); + EXPECT_EQ(EntryCheck(20100, 16u), s->diff[baseline_diff_size]); auto& cmd5 = dynamic_cast(*messageKeeper()._msgs[3]); EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes()); auto reply = std::make_shared(cmd5); @@ -1362,11 +1362,11 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking fourth ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); // All nodes in use again due to failure to fill diff entry for doc2 - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList); - EXPECT_EQ(1u, s.diff.size()); - EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList); + EXPECT_EQ(1u, s->diff.size()); + EXPECT_EQ(EntryCheck(20100, 16u), s->diff[0]); auto& cmd6 = dynamic_cast(*messageKeeper()._msgs[4]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes()); auto reply = std::make_shared(cmd6); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 0dcb8539bff..70ed9845cb0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -200,13 +200,13 @@ public: virtual void addMergeStatus(const document::Bucket&, std::shared_ptr) = 0; /** - * Returns the reference to the current merge status for the given bucket. 
+ * Returns a shared pointer to the current merge status for the given bucket. * This allows unlocked access to an internal variable, so users should * first check that noone else is using it by calling isMerging() first. * * @param bucket The bucket to start merging. */ - virtual MergeStatus& editMergeStatus(const document::Bucket& bucket) = 0; + virtual std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket& bucket) = 0; /** * Returns true if the bucket is currently being merged on this node. diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 62f9fa12a21..e395a7df9e0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -86,7 +86,7 @@ FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, std::shared_ _mergeStates[bucket] = status; } -MergeStatus& +std::shared_ptr<MergeStatus> FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) { std::lock_guard mlock(_mergeStatesLock); @@ -94,7 +94,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) if ( ! 
status ) { throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC); } - return *status; + return status; } bool diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index a3dc316cdde..5f212b18a7f 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -218,7 +218,7 @@ public: } void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) override; - MergeStatus& editMergeStatus(const document::Bucket&) override; + std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket&) override; bool isMerging(const document::Bucket&) const override; void clearMergeStatus(const document::Bucket& bucket) override; void clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) override; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 4b0560424e5..b7ff18f4518 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -1136,49 +1136,49 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); - if (s.pendingId != reply.getMsgId()) { + auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); + if (s->pendingId != reply.getMsgId()) { LOG(warning, "Got GetBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". 
Ignoring reply.", - bucket.toString().c_str(), reply.getMsgId(), s.pendingId); + bucket.toString().c_str(), reply.getMsgId(), s->pendingId); return; } api::StorageReply::SP replyToSend; bool clearState = true; try { - if (s.isFirstNode()) { + if (s->isFirstNode()) { if (reply.getResult().failed()) { // We failed, so we should reply to the pending message. - replyToSend = s.reply; + replyToSend = s->reply; } else { // If we didn't fail, reply should have good content // Sanity check for nodes assert(reply.getNodes().size() >= 2); // Get bucket diff should retrieve all info at once - assert(s.diff.size() == 0); - s.diff.insert(s.diff.end(), + assert(s->diff.size() == 0); + s->diff.insert(s->diff.end(), reply.getDiff().begin(), reply.getDiff().end()); - replyToSend = processBucketMerge(bucket, s, sender, s.context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context); if (!replyToSend.get()) { // We have sent something on, and shouldn't reply now. clearState = false; } else { _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue( - s.startTime.getElapsedTimeAsDouble()); + s->startTime.getElapsedTimeAsDouble()); } } } else { // Exists in send on list, send on! - replyToSend = s.pendingGetDiff; + replyToSend = s->pendingGetDiff; LOG(spam, "Received GetBucketDiffReply for %s with diff of " "size %zu. 
Sending it on.", bucket.toString().c_str(), reply.getDiff().size()); - s.pendingGetDiff->getDiff().swap(reply.getDiff()); + s->pendingGetDiff->getDiff().swap(reply.getDiff()); } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( @@ -1297,11 +1297,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); - if (s.pendingId != reply.getMsgId()) { + auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); + if (s->pendingId != reply.getMsgId()) { LOG(warning, "Got ApplyBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", - bucket.toString().c_str(), reply.getMsgId(), s.pendingId); + bucket.toString().c_str(), reply.getMsgId(), s->pendingId); return; } bool clearState = true; @@ -1316,12 +1316,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag uint8_t index = findOwnIndex(reply.getNodes(), _env._nodeIndex); if (applyDiffNeedLocalData(diff, index, false)) { framework::MilliSecTimer startTime(_clock); - fetchLocalData(bucket, diff, index, s.context); + fetchLocalData(bucket, diff, index, s->context); _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); - applyDiffLocally(bucket, diff, index, s.context, async_results); + applyDiffLocally(bucket, diff, index, s->context, async_results); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); async_results.check(); async_results.sync_bucket_info(); @@ -1333,50 +1333,50 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } } - if (s.isFirstNode()) { + if (s->isFirstNode()) { uint16_t hasMask = 0; for (uint16_t i=0; i<diff.size(); ++i) { hasMask |= diff[i]._entry._hasMask; } - const size_t diffSizeBefore = s.diff.size(); - const bool altered = s.removeFromDiff(diff, hasMask, reply.getNodes()); + const size_t diffSizeBefore = s->diff.size(); + const bool altered = 
s->removeFromDiff(diff, hasMask, reply.getNodes()); if (reply.getResult().success() - && s.diff.size() == diffSizeBefore + && s->diff.size() == diffSizeBefore && !altered) { std::string msg( vespalib::make_string( "Completed merge cycle without fixing " "any entries (merge state diff at %zu entries)", - s.diff.size())); + s->diff.size())); returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, msg); LOG(warning, "Got reply indicating merge cycle did not fix any entries: %s", reply.toString(true).c_str()); LOG(warning, "Merge state for which there was no progress across a full merge cycle: %s", - s.toString().c_str()); + s->toString().c_str()); } if (returnCode.failed()) { // Should reply now, since we failed. - replyToSend = s.reply; + replyToSend = s->reply; } else { - replyToSend = processBucketMerge(bucket, s, sender, s.context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context); if (!replyToSend.get()) { // We have sent something on and shouldn't reply now. clearState = false; } else { - _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s.startTime.getElapsedTimeAsDouble()); + _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble()); } } } else { - replyToSend = s.pendingApplyDiff; + replyToSend = s->pendingApplyDiff; LOG(debug, "ApplyBucketDiff(%s) finished. Sending reply.", bucket.toString().c_str()); - s.pendingApplyDiff->getDiff().swap(reply.getDiff()); + s->pendingApplyDiff->getDiff().swap(reply.getDiff()); } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( -- cgit v1.2.3