diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-02-07 15:01:04 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-07 15:01:04 +0100 |
commit | 2c8c643e3fcd0aaaf8e47094e5059194239f1600 (patch) | |
tree | 42a9677436c10c577cf8c7a639cafb661fc4dd7f | |
parent | 880182fec33634bec6bbfa9c0f1a2b85e4e1f8f2 (diff) | |
parent | e4ff2074e4a68a987b5c19bc41a15b19cdc03ecc (diff) |
Merge pull request #25921 from vespa-engine/balder/code-review-follow-up
Better name STALE_PROTOCOL_LIFETIME and extend from 10m to 1h
3 files changed, 25 insertions, 30 deletions
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index e68cbd75d52..2f1622750d7 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -182,7 +182,7 @@ struct MetricsUpdater { void add(const MetricsUpdater& rhs) noexcept { auto& d = count; - auto& s = rhs.count; + const auto& s = rhs.count; d.buckets += s.buckets; d.docs += s.docs; d.bytes += s.bytes; @@ -209,7 +209,7 @@ BucketManager::updateMetrics(bool updateDocCount) if (!updateDocCount || _doneInitialized) { MetricsUpdater total; - for (auto& space : _component.getBucketSpaceRepo()) { + for (const auto& space : _component.getBucketSpaceRepo()) { MetricsUpdater m; auto guard = space.second->bucketDatabase().acquire_read_guard(); guard->for_each(std::ref(m)); @@ -238,7 +238,7 @@ BucketManager::updateMetrics(bool updateDocCount) } void BucketManager::update_bucket_db_memory_usage_metrics() { - for (auto& space : _component.getBucketSpaceRepo()) { + for (const auto& space : _component.getBucketSpaceRepo()) { auto bm = _metrics->bucket_spaces.find(space.first); bm->second->bucket_db_metrics.memory_usage.update(space.second->bucketDatabase().detailed_memory_usage()); } @@ -342,7 +342,7 @@ BucketManager::reportStatus(std::ostream& out, using vespalib::xml::XmlAttribute; xmlReporter << vespalib::xml::XmlTag("buckets"); - for (auto& space : _component.getBucketSpaceRepo()) { + for (const auto& space : _component.getBucketSpaceRepo()) { xmlReporter << XmlTag("bucket-space") << XmlAttribute("name", document::FixedBucketSpaces::to_string(space.first)); BucketDBDumper dumper(xmlReporter.getStream()); @@ -404,7 +404,7 @@ bool BucketManager::onRequestBucketInfo( api::RequestBucketInfoReply::EntryVector info; if (!cmd->getBuckets().empty()) { for (auto bucketId : cmd->getBuckets()) { - for (auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) { + for (const auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) { info.emplace_back(entry.first, entry.second->getBucketInfo()); } } @@ -457,7 +457,7 @@ BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard) // may alter the relevant state. --_requestsCurrentlyProcessing; if (_requestsCurrentlyProcessing == 0) { - for (auto& qr : _queuedReplies) { + for (const auto& qr : _queuedReplies) { sendUp(qr); } _queuedReplies.clear(); @@ -494,7 +494,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac reqs.size(), bucketSpace.toString().c_str(), clusterState->toString().c_str(), our_hash.c_str()); std::lock_guard clusterStateGuard(_clusterStateLock); - for (auto & req : std::ranges::reverse_view(reqs)) { + for (const auto & req : std::ranges::reverse_view(reqs)) { // Currently small requests should not be forwarded to worker thread assert(req->hasSystemState()); const auto their_hash = req->getDistributionHash(); @@ -547,7 +547,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac std::ostringstream distrList; std::unordered_map<uint16_t, api::RequestBucketInfoReply::EntryVector> result; - for (auto& nodeAndCmd : requests) { + for (const auto& nodeAndCmd : requests) { result[nodeAndCmd.first]; if (LOG_WOULD_LOG(debug)) { distrList << ' ' << nodeAndCmd.first; @@ -576,7 +576,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac "BucketManager::processRequestBucketInfoCommands-2"); } _metrics->fullBucketInfoLatency.addValue(runStartTime.getElapsedTimeAsDouble()); - for (auto& nodeAndCmd : requests) { + for (const auto& nodeAndCmd : requests) { auto reply(std::make_shared<api::RequestBucketInfoReply>(*nodeAndCmd.second)); reply->getBucketInfo().swap(result[nodeAndCmd.first]); sendUp(reply); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 667afbf67a0..393136de654 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -15,6 +15,7 @@ LOG_SETUP(".distributor.operation.idealstate.merge"); using vespalib::to_utc; using vespalib::to_string; +using vespalib::make_string_short::fmt; namespace storage::distributor { MergeOperation::~MergeOperation() = default; @@ -24,8 +25,7 @@ MergeOperation::getStatus() const { return Operation::getStatus() + - vespalib::make_string(" . Sent MergeBucketCommand at %s", - to_string(to_utc(_sentMessageTime)).c_str()); + fmt(" . Sent MergeBucketCommand at %s", to_string(to_utc(_sentMessageTime)).c_str()); } void @@ -35,7 +35,7 @@ MergeOperation::addIdealNodes( std::vector<MergeMetaData>& result) { // Add all ideal nodes first. These are never marked source-only. - for (unsigned short idealNode : idealNodes) { + for (uint16_t idealNode : idealNodes) { const MergeMetaData* entry = nullptr; for (const auto & node : nodes) { if (idealNode == node._nodeIndex) { @@ -56,7 +56,7 @@ MergeOperation::addCopiesNotAlreadyAdded(uint16_t redundancy, const std::vector<MergeMetaData>& nodes, std::vector<MergeMetaData>& result) { - for (auto node : nodes) { + for (const auto & node : nodes) { bool found = false; for (const auto & mergeData : result) { if (mergeData._nodeIndex == node._nodeIndex) { @@ -123,7 +123,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) std::vector<std::unique_ptr<BucketCopy> > newCopies; std::vector<MergeMetaData> nodes; - for (unsigned short node : getNodes()) { + for (uint16_t node : getNodes()) { const BucketCopy* copy = entry->getNode(node); if (copy == nullptr) { // New copies? newCopies.emplace_back(std::make_unique<BucketCopy>(BucketCopy::recentlyCreatedCopy(0, node))); @@ -153,8 +153,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) msg->set_use_unordered_forwarding(true); } - LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), - _mnodes[0].index); + LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index); // Set timeout to one hour to prevent hung nodes that manage to keep // connections open from stalling merges in the cluster indefinitely. @@ -165,8 +164,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) _sentMessageTime = _manager->node_context().clock().getMonotonicTime(); } else { - LOGBP(debug, - "Unable to merge bucket %s, since only one copy is available. System state %s", + LOGBP(debug, "Unable to merge bucket %s, since only one copy is available. System state %s", getBucketId().toString().c_str(), clusterState.toString().c_str()); _ok = false; done(); @@ -178,7 +176,7 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge( const BucketDatabase::Entry& currentState) const { assert(currentState.valid()); - for (auto mnode : _mnodes) { + for (const auto & mnode : _mnodes) { const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index)); if (!copyBefore) { continue; @@ -206,7 +204,7 @@ MergeOperation::deleteSourceOnlyNodes( { assert(currentState.valid()); std::vector<uint16_t> sourceOnlyNodes; - for (auto & mnode : _mnodes) { + for (const auto & mnode : _mnodes) { const uint16_t nodeIndex = mnode.index; const BucketCopy* copy = currentState->getNode(nodeIndex); if (!copy) { @@ -338,7 +336,7 @@ bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx, // to enter the merge throttler queues, displacing lower priority merges. if (!is_global_bucket_merge()) { const auto& node_info = ctx.pending_message_tracker().getNodeInfo(); - for (auto node : getNodes()) { + for (uint16_t node : getNodes()) { if (node_info.isBusy(node)) { return true; } @@ -364,11 +362,9 @@ bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const MergeBucketMetricSet* MergeOperation::get_merge_metrics() { - if (_manager) { - return dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get()); - } else { - return nullptr; - } + return (_manager) + ? dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get()) + : nullptr; } } diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index ec22d7c064e..db88a22d500 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -55,7 +55,7 @@ vespalib::string getNodeId(StorageComponent& sc) { return ost.str(); } -vespalib::duration TEN_MINUTES = 600s; +constexpr vespalib::duration STALE_PROTOCOL_LIFETIME = 1h; } @@ -694,7 +694,7 @@ CommunicationManager::run(framework::ThreadHandle& thread) std::lock_guard<std::mutex> guard(_earlierGenerationsLock); for (auto it(_earlierGenerations.begin()); !_earlierGenerations.empty() && - ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime()); + ((it->first + STALE_PROTOCOL_LIFETIME) < _component.getClock().getMonotonicTime()); it = _earlierGenerations.begin()) { _earlierGenerations.erase(it); @@ -709,9 +709,8 @@ CommunicationManager::updateMetrics(const MetricLockGuard &) } void -CommunicationManager::print(std::ostream& out, bool verbose, const std::string& indent) const +CommunicationManager::print(std::ostream& out, bool , const std::string& ) const { - (void) verbose; (void) indent; out << "CommunicationManager"; } |