summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp18
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp30
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp7
3 files changed, 25 insertions, 30 deletions
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index e68cbd75d52..2f1622750d7 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -182,7 +182,7 @@ struct MetricsUpdater {
void add(const MetricsUpdater& rhs) noexcept {
auto& d = count;
- auto& s = rhs.count;
+ const auto& s = rhs.count;
d.buckets += s.buckets;
d.docs += s.docs;
d.bytes += s.bytes;
@@ -209,7 +209,7 @@ BucketManager::updateMetrics(bool updateDocCount)
if (!updateDocCount || _doneInitialized) {
MetricsUpdater total;
- for (auto& space : _component.getBucketSpaceRepo()) {
+ for (const auto& space : _component.getBucketSpaceRepo()) {
MetricsUpdater m;
auto guard = space.second->bucketDatabase().acquire_read_guard();
guard->for_each(std::ref(m));
@@ -238,7 +238,7 @@ BucketManager::updateMetrics(bool updateDocCount)
}
void BucketManager::update_bucket_db_memory_usage_metrics() {
- for (auto& space : _component.getBucketSpaceRepo()) {
+ for (const auto& space : _component.getBucketSpaceRepo()) {
auto bm = _metrics->bucket_spaces.find(space.first);
bm->second->bucket_db_metrics.memory_usage.update(space.second->bucketDatabase().detailed_memory_usage());
}
@@ -342,7 +342,7 @@ BucketManager::reportStatus(std::ostream& out,
using vespalib::xml::XmlAttribute;
xmlReporter << vespalib::xml::XmlTag("buckets");
- for (auto& space : _component.getBucketSpaceRepo()) {
+ for (const auto& space : _component.getBucketSpaceRepo()) {
xmlReporter << XmlTag("bucket-space")
<< XmlAttribute("name", document::FixedBucketSpaces::to_string(space.first));
BucketDBDumper dumper(xmlReporter.getStream());
@@ -404,7 +404,7 @@ bool BucketManager::onRequestBucketInfo(
api::RequestBucketInfoReply::EntryVector info;
if (!cmd->getBuckets().empty()) {
for (auto bucketId : cmd->getBuckets()) {
- for (auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) {
+ for (const auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) {
info.emplace_back(entry.first, entry.second->getBucketInfo());
}
}
@@ -457,7 +457,7 @@ BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard)
// may alter the relevant state.
--_requestsCurrentlyProcessing;
if (_requestsCurrentlyProcessing == 0) {
- for (auto& qr : _queuedReplies) {
+ for (const auto& qr : _queuedReplies) {
sendUp(qr);
}
_queuedReplies.clear();
@@ -494,7 +494,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
reqs.size(), bucketSpace.toString().c_str(), clusterState->toString().c_str(), our_hash.c_str());
std::lock_guard clusterStateGuard(_clusterStateLock);
- for (auto & req : std::ranges::reverse_view(reqs)) {
+ for (const auto & req : std::ranges::reverse_view(reqs)) {
// Currently small requests should not be forwarded to worker thread
assert(req->hasSystemState());
const auto their_hash = req->getDistributionHash();
@@ -547,7 +547,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
std::ostringstream distrList;
std::unordered_map<uint16_t, api::RequestBucketInfoReply::EntryVector> result;
- for (auto& nodeAndCmd : requests) {
+ for (const auto& nodeAndCmd : requests) {
result[nodeAndCmd.first];
if (LOG_WOULD_LOG(debug)) {
distrList << ' ' << nodeAndCmd.first;
@@ -576,7 +576,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
"BucketManager::processRequestBucketInfoCommands-2");
}
_metrics->fullBucketInfoLatency.addValue(runStartTime.getElapsedTimeAsDouble());
- for (auto& nodeAndCmd : requests) {
+ for (const auto& nodeAndCmd : requests) {
auto reply(std::make_shared<api::RequestBucketInfoReply>(*nodeAndCmd.second));
reply->getBucketInfo().swap(result[nodeAndCmd.first]);
sendUp(reply);
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 667afbf67a0..393136de654 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -15,6 +15,7 @@ LOG_SETUP(".distributor.operation.idealstate.merge");
using vespalib::to_utc;
using vespalib::to_string;
+using vespalib::make_string_short::fmt;
namespace storage::distributor {
MergeOperation::~MergeOperation() = default;
@@ -24,8 +25,7 @@ MergeOperation::getStatus() const
{
return
Operation::getStatus() +
- vespalib::make_string(" . Sent MergeBucketCommand at %s",
- to_string(to_utc(_sentMessageTime)).c_str());
+ fmt(" . Sent MergeBucketCommand at %s", to_string(to_utc(_sentMessageTime)).c_str());
}
void
@@ -35,7 +35,7 @@ MergeOperation::addIdealNodes(
std::vector<MergeMetaData>& result)
{
// Add all ideal nodes first. These are never marked source-only.
- for (unsigned short idealNode : idealNodes) {
+ for (uint16_t idealNode : idealNodes) {
const MergeMetaData* entry = nullptr;
for (const auto & node : nodes) {
if (idealNode == node._nodeIndex) {
@@ -56,7 +56,7 @@ MergeOperation::addCopiesNotAlreadyAdded(uint16_t redundancy,
const std::vector<MergeMetaData>& nodes,
std::vector<MergeMetaData>& result)
{
- for (auto node : nodes) {
+ for (const auto & node : nodes) {
bool found = false;
for (const auto & mergeData : result) {
if (mergeData._nodeIndex == node._nodeIndex) {
@@ -123,7 +123,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
std::vector<std::unique_ptr<BucketCopy> > newCopies;
std::vector<MergeMetaData> nodes;
- for (unsigned short node : getNodes()) {
+ for (uint16_t node : getNodes()) {
const BucketCopy* copy = entry->getNode(node);
if (copy == nullptr) { // New copies?
newCopies.emplace_back(std::make_unique<BucketCopy>(BucketCopy::recentlyCreatedCopy(0, node)));
@@ -153,8 +153,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
msg->set_use_unordered_forwarding(true);
}
- LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(),
- _mnodes[0].index);
+ LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index);
// Set timeout to one hour to prevent hung nodes that manage to keep
// connections open from stalling merges in the cluster indefinitely.
@@ -165,8 +164,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
_sentMessageTime = _manager->node_context().clock().getMonotonicTime();
} else {
- LOGBP(debug,
- "Unable to merge bucket %s, since only one copy is available. System state %s",
+ LOGBP(debug, "Unable to merge bucket %s, since only one copy is available. System state %s",
getBucketId().toString().c_str(), clusterState.toString().c_str());
_ok = false;
done();
@@ -178,7 +176,7 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge(
const BucketDatabase::Entry& currentState) const
{
assert(currentState.valid());
- for (auto mnode : _mnodes) {
+ for (const auto & mnode : _mnodes) {
const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index));
if (!copyBefore) {
continue;
@@ -206,7 +204,7 @@ MergeOperation::deleteSourceOnlyNodes(
{
assert(currentState.valid());
std::vector<uint16_t> sourceOnlyNodes;
- for (auto & mnode : _mnodes) {
+ for (const auto & mnode : _mnodes) {
const uint16_t nodeIndex = mnode.index;
const BucketCopy* copy = currentState->getNode(nodeIndex);
if (!copy) {
@@ -338,7 +336,7 @@ bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx,
// to enter the merge throttler queues, displacing lower priority merges.
if (!is_global_bucket_merge()) {
const auto& node_info = ctx.pending_message_tracker().getNodeInfo();
- for (auto node : getNodes()) {
+ for (uint16_t node : getNodes()) {
if (node_info.isBusy(node)) {
return true;
}
@@ -364,11 +362,9 @@ bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const
MergeBucketMetricSet*
MergeOperation::get_merge_metrics()
{
- if (_manager) {
- return dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get());
- } else {
- return nullptr;
- }
+ return (_manager)
+ ? dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get())
+ : nullptr;
}
}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index ec22d7c064e..db88a22d500 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -55,7 +55,7 @@ vespalib::string getNodeId(StorageComponent& sc) {
return ost.str();
}
-vespalib::duration TEN_MINUTES = 600s;
+constexpr vespalib::duration STALE_PROTOCOL_LIFETIME = 1h;
}
@@ -694,7 +694,7 @@ CommunicationManager::run(framework::ThreadHandle& thread)
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
for (auto it(_earlierGenerations.begin());
!_earlierGenerations.empty() &&
- ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime());
+ ((it->first + STALE_PROTOCOL_LIFETIME) < _component.getClock().getMonotonicTime());
it = _earlierGenerations.begin())
{
_earlierGenerations.erase(it);
@@ -709,9 +709,8 @@ CommunicationManager::updateMetrics(const MetricLockGuard &)
}
void
-CommunicationManager::print(std::ostream& out, bool verbose, const std::string& indent) const
+CommunicationManager::print(std::ostream& out, bool , const std::string& ) const
{
- (void) verbose; (void) indent;
out << "CommunicationManager";
}