diff options
Diffstat (limited to 'storage/src/vespa/storage/persistence/mergehandler.cpp')
-rw-r--r-- | storage/src/vespa/storage/persistence/mergehandler.cpp | 63 |
1 files changed, 33 insertions, 30 deletions
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7693156ae30..963fddd9fb5 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -23,13 +23,15 @@ namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, uint32_t maxChunkSize, - uint32_t commonMergeChainOptimalizationMinimumSize) + uint32_t commonMergeChainOptimalizationMinimumSize, + bool async_apply_bucket_diff) : _clock(clock), _cluster_context(cluster_context), _env(env), _spi(spi), _maxChunkSize(maxChunkSize), - _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize) + _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), + _async_apply_bucket_diff(async_apply_bucket_diff) { } @@ -1136,49 +1138,49 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); - if (s.pendingId != reply.getMsgId()) { + auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); + if (s->pendingId != reply.getMsgId()) { LOG(warning, "Got GetBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", - bucket.toString().c_str(), reply.getMsgId(), s.pendingId); + bucket.toString().c_str(), reply.getMsgId(), s->pendingId); return; } api::StorageReply::SP replyToSend; bool clearState = true; try { - if (s.isFirstNode()) { + if (s->isFirstNode()) { if (reply.getResult().failed()) { // We failed, so we should reply to the pending message. - replyToSend = s.reply; + replyToSend = s->reply; } else { // If we didn't fail, reply should have good content // Sanity check for nodes assert(reply.getNodes().size() >= 2); // Get bucket diff should retrieve all info at once - assert(s.diff.size() == 0); - s.diff.insert(s.diff.end(), + assert(s->diff.size() == 0); + s->diff.insert(s->diff.end(), reply.getDiff().begin(), reply.getDiff().end()); - replyToSend = processBucketMerge(bucket, s, sender, s.context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context); if (!replyToSend.get()) { // We have sent something on, and shouldn't reply now. clearState = false; } else { _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue( - s.startTime.getElapsedTimeAsDouble()); + s->startTime.getElapsedTimeAsDouble()); } } } else { // Exists in send on list, send on! - replyToSend = s.pendingGetDiff; + replyToSend = s->pendingGetDiff; LOG(spam, "Received GetBucketDiffReply for %s with diff of " "size %zu. Sending it on.", bucket.toString().c_str(), reply.getDiff().size()); - s.pendingGetDiff->getDiff().swap(reply.getDiff()); + s->pendingGetDiff->getDiff().swap(reply.getDiff()); } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( @@ -1282,8 +1284,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } void -MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) const +MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const { + (void) tracker; _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); ApplyBucketDiffState async_results(*this, bucket); @@ -1296,11 +1299,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); - if (s.pendingId != reply.getMsgId()) { + auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); + if (s->pendingId != reply.getMsgId()) { LOG(warning, "Got ApplyBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", - bucket.toString().c_str(), reply.getMsgId(), s.pendingId); + bucket.toString().c_str(), reply.getMsgId(), s->pendingId); return; } bool clearState = true; @@ -1315,12 +1318,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag uint8_t index = findOwnIndex(reply.getNodes(), _env._nodeIndex); if (applyDiffNeedLocalData(diff, index, false)) { framework::MilliSecTimer startTime(_clock); - fetchLocalData(bucket, diff, index, s.context); + fetchLocalData(bucket, diff, index, s->context); _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); - applyDiffLocally(bucket, diff, index, s.context, async_results); + applyDiffLocally(bucket, diff, index, s->context, async_results); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); async_results.check(); async_results.sync_bucket_info(); @@ -1332,50 +1335,50 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } } - if (s.isFirstNode()) { + if (s->isFirstNode()) { uint16_t hasMask = 0; for (uint16_t i=0; i<reply.getNodes().size(); ++i) { hasMask |= (1 << i); } - const size_t diffSizeBefore = s.diff.size(); - const bool altered = s.removeFromDiff(diff, hasMask, reply.getNodes()); + const size_t diffSizeBefore = s->diff.size(); + const bool altered = s->removeFromDiff(diff, hasMask, reply.getNodes()); if (reply.getResult().success() - && s.diff.size() == diffSizeBefore + && s->diff.size() == diffSizeBefore && !altered) { std::string msg( vespalib::make_string( "Completed merge cycle without fixing " "any entries (merge state diff at %zu entries)", - s.diff.size())); + s->diff.size())); returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, msg); LOG(warning, "Got reply indicating merge cycle did not fix any entries: %s", reply.toString(true).c_str()); LOG(warning, "Merge state for which there was no progress across a full merge cycle: %s", - s.toString().c_str()); + s->toString().c_str()); } if (returnCode.failed()) { // Should reply now, since we failed. - replyToSend = s.reply; + replyToSend = s->reply; } else { - replyToSend = processBucketMerge(bucket, s, sender, s.context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context); if (!replyToSend.get()) { // We have sent something on and shouldn't reply now. clearState = false; } else { - _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s.startTime.getElapsedTimeAsDouble()); + _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble()); } } } else { - replyToSend = s.pendingApplyDiff; + replyToSend = s->pendingApplyDiff; LOG(debug, "ApplyBucketDiff(%s) finished. Sending reply.", bucket.toString().c_str()); - s.pendingApplyDiff->getDiff().swap(reply.getDiff()); + s->pendingApplyDiff->getDiff().swap(reply.getDiff()); } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( |