summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2021-10-19 15:56:27 +0200
committerGitHub <noreply@github.com>2021-10-19 15:56:27 +0200
commit031c3aa73c9dffb8bd69ea2e3c0f0590881f4279 (patch)
treeb9e171d2b6138b4e6fee9f082163b84111966f57 /storage
parentc36b42b9efeba0fd855ffc8463539270a2fa7d82 (diff)
parentef9844b6896a5eb5a283ff4cda81296d06661197 (diff)
Merge pull request #19636 from vespa-engine/toregge/protect-merge-status-lifetime
Change editMergeStatus to return a shared pointer to merge status
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp38
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h2
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp54
5 files changed, 51 insertions, 51 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index e3b63c1bdf9..60030004594 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -1275,8 +1275,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
EXPECT_EQ(1, cmd2.getAddress()->getIndex());
EXPECT_EQ(1234, cmd2.getSourceIndex());
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList);
baseline_diff_size = cmd2.getDiff().size();
auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2);
auto &diff = reply->getDiff();
@@ -1292,12 +1292,12 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking first ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
// Node 4 has been eliminated before the first ApplyBucketDiff command
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
- EXPECT_EQ(baseline_diff_size + 2u, s.diff.size());
- EXPECT_EQ(EntryCheck(20000, 24u), s.diff[baseline_diff_size]);
- EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size + 1]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList);
+ EXPECT_EQ(baseline_diff_size + 2u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20000, 24u), s->diff[baseline_diff_size]);
+ EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size + 1]);
auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
// ApplyBucketDiffCommand has a shorter node list, node 2 is not present
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes());
@@ -1320,10 +1320,10 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking second ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
- EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
- EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList);
+ EXPECT_EQ(baseline_diff_size + 1u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size]);
auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes());
auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd4);
@@ -1340,11 +1340,11 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking third ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
// Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command
- EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList);
- EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
- EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}}), s->nodeList);
+ EXPECT_EQ(baseline_diff_size + 1u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20100, 16u), s->diff[baseline_diff_size]);
auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]);
EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes());
auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd5);
@@ -1362,11 +1362,11 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking fourth ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
// All nodes in use again due to failure to fill diff entry for doc2
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList);
- EXPECT_EQ(1u, s.diff.size());
- EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList);
+ EXPECT_EQ(1u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20100, 16u), s->diff[0]);
auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes());
auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd6);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 0dcb8539bff..70ed9845cb0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -200,13 +200,13 @@ public:
virtual void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) = 0;
/**
- * Returns the reference to the current merge status for the given bucket.
+ * Returns a shared pointer to the current merge status for the given bucket.
* This allows unlocked access to an internal variable, so users should
* first check that noone else is using it by calling isMerging() first.
*
* @param bucket The bucket to start merging.
*/
- virtual MergeStatus& editMergeStatus(const document::Bucket& bucket) = 0;
+ virtual std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket& bucket) = 0;
/**
* Returns true if the bucket is currently being merged on this node.
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 62f9fa12a21..e395a7df9e0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -86,7 +86,7 @@ FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, std::shared_
_mergeStates[bucket] = status;
}
-MergeStatus&
+std::shared_ptr<MergeStatus>
FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
{
std::lock_guard mlock(_mergeStatesLock);
@@ -94,7 +94,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
if ( ! status ) {
throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC);
}
- return *status;
+ return status;
}
bool
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index a3dc316cdde..5f212b18a7f 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -218,7 +218,7 @@ public:
}
void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) override;
- MergeStatus& editMergeStatus(const document::Bucket&) override;
+ std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket&) override;
bool isMerging(const document::Bucket&) const override;
void clearMergeStatus(const document::Bucket& bucket) override;
void clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) override;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 4b0560424e5..b7ff18f4518 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -1136,49 +1136,49 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
return;
}
- MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
- if (s.pendingId != reply.getMsgId()) {
+ auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
+ if (s->pendingId != reply.getMsgId()) {
LOG(warning, "Got GetBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
- bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
+ bucket.toString().c_str(), reply.getMsgId(), s->pendingId);
return;
}
api::StorageReply::SP replyToSend;
bool clearState = true;
try {
- if (s.isFirstNode()) {
+ if (s->isFirstNode()) {
if (reply.getResult().failed()) {
// We failed, so we should reply to the pending message.
- replyToSend = s.reply;
+ replyToSend = s->reply;
} else {
// If we didn't fail, reply should have good content
// Sanity check for nodes
assert(reply.getNodes().size() >= 2);
// Get bucket diff should retrieve all info at once
- assert(s.diff.size() == 0);
- s.diff.insert(s.diff.end(),
+ assert(s->diff.size() == 0);
+ s->diff.insert(s->diff.end(),
reply.getDiff().begin(),
reply.getDiff().end());
- replyToSend = processBucketMerge(bucket, s, sender, s.context);
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context);
if (!replyToSend.get()) {
// We have sent something on, and shouldn't reply now.
clearState = false;
} else {
_env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(
- s.startTime.getElapsedTimeAsDouble());
+ s->startTime.getElapsedTimeAsDouble());
}
}
} else {
// Exists in send on list, send on!
- replyToSend = s.pendingGetDiff;
+ replyToSend = s->pendingGetDiff;
LOG(spam, "Received GetBucketDiffReply for %s with diff of "
"size %zu. Sending it on.",
bucket.toString().c_str(), reply.getDiff().size());
- s.pendingGetDiff->getDiff().swap(reply.getDiff());
+ s->pendingGetDiff->getDiff().swap(reply.getDiff());
}
} catch (std::exception& e) {
_env._fileStorHandler.clearMergeStatus(
@@ -1297,11 +1297,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
return;
}
- MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
- if (s.pendingId != reply.getMsgId()) {
+ auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
+ if (s->pendingId != reply.getMsgId()) {
LOG(warning, "Got ApplyBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
- bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
+ bucket.toString().c_str(), reply.getMsgId(), s->pendingId);
return;
}
bool clearState = true;
@@ -1316,12 +1316,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
uint8_t index = findOwnIndex(reply.getNodes(), _env._nodeIndex);
if (applyDiffNeedLocalData(diff, index, false)) {
framework::MilliSecTimer startTime(_clock);
- fetchLocalData(bucket, diff, index, s.context);
+ fetchLocalData(bucket, diff, index, s->context);
_env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble());
}
if (applyDiffHasLocallyNeededData(diff, index)) {
framework::MilliSecTimer startTime(_clock);
- applyDiffLocally(bucket, diff, index, s.context, async_results);
+ applyDiffLocally(bucket, diff, index, s->context, async_results);
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble());
async_results.check();
async_results.sync_bucket_info();
@@ -1333,50 +1333,50 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
}
- if (s.isFirstNode()) {
+ if (s->isFirstNode()) {
uint16_t hasMask = 0;
for (uint16_t i=0; i<reply.getNodes().size(); ++i) {
hasMask |= (1 << i);
}
- const size_t diffSizeBefore = s.diff.size();
- const bool altered = s.removeFromDiff(diff, hasMask, reply.getNodes());
+ const size_t diffSizeBefore = s->diff.size();
+ const bool altered = s->removeFromDiff(diff, hasMask, reply.getNodes());
if (reply.getResult().success()
- && s.diff.size() == diffSizeBefore
+ && s->diff.size() == diffSizeBefore
&& !altered)
{
std::string msg(
vespalib::make_string(
"Completed merge cycle without fixing "
"any entries (merge state diff at %zu entries)",
- s.diff.size()));
+ s->diff.size()));
returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, msg);
LOG(warning,
"Got reply indicating merge cycle did not fix any entries: %s",
reply.toString(true).c_str());
LOG(warning,
"Merge state for which there was no progress across a full merge cycle: %s",
- s.toString().c_str());
+ s->toString().c_str());
}
if (returnCode.failed()) {
// Should reply now, since we failed.
- replyToSend = s.reply;
+ replyToSend = s->reply;
} else {
- replyToSend = processBucketMerge(bucket, s, sender, s.context);
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context);
if (!replyToSend.get()) {
// We have sent something on and shouldn't reply now.
clearState = false;
} else {
- _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s.startTime.getElapsedTimeAsDouble());
+ _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble());
}
}
} else {
- replyToSend = s.pendingApplyDiff;
+ replyToSend = s->pendingApplyDiff;
LOG(debug, "ApplyBucketDiff(%s) finished. Sending reply.",
bucket.toString().c_str());
- s.pendingApplyDiff->getDiff().swap(reply.getDiff());
+ s->pendingApplyDiff->getDiff().swap(reply.getDiff());
}
} catch (std::exception& e) {
_env._fileStorHandler.clearMergeStatus(