diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-02-07 15:44:34 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2023-02-07 15:44:34 +0000 |
commit | 150f9cf546a73a7b77e40f6b25b36558a8fc517e (patch) | |
tree | 476ea49f10cce8271162d049a2cf99c338fa7f36 /storage | |
parent | 260788dbcc76f4b7b0855bfa48e23495a1561e91 (diff) | |
Avoid using duration::max, cap timeout at a sensible value
Diffstat (limited to 'storage')
6 files changed, 49 insertions, 76 deletions
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index e4130f28eee..0e9873f3434 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -4,7 +4,6 @@ #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/idealstatemanager.h> -#include <climits> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".distributor.operation.idealstate.join"); @@ -30,9 +29,7 @@ JoinOperation::onStart(DistributorStripeMessageSender& sender) _bucketsToJoin[0].toString().c_str(), getBucketId().toString().c_str()); } else { LOG(debug, "Starting join operation for (%s,%s) -> %s", - _bucketsToJoin[0].toString().c_str(), - _bucketsToJoin[1].toString().c_str(), - getBucketId().toString().c_str()); + _bucketsToJoin[0].toString().c_str(), _bucketsToJoin[1].toString().c_str(), getBucketId().toString().c_str()); } std::sort(_bucketsToJoin.begin(), _bucketsToJoin.end()); @@ -66,8 +63,7 @@ JoinOperation::resolveSourceBucketsPerTargetNode() const } void -JoinOperation::fillMissingSourceBucketsForInconsistentJoins( - NodeToBuckets& nodeToBuckets) const +JoinOperation::fillMissingSourceBucketsForInconsistentJoins(NodeToBuckets& nodeToBuckets) const { for (auto& node : nodeToBuckets) { if (node.second.size() == 1) { @@ -88,7 +84,7 @@ JoinOperation::enqueueJoinMessagePerTargetNode( std::shared_ptr<api::JoinBucketsCommand> msg( new api::JoinBucketsCommand(getBucket())); msg->getSourceBuckets() = node.second; - msg->setTimeout(vespalib::duration::max()); + msg->setTimeout(MAX_TIMEOUT); setCommandMeta(*msg); _tracker.queueCommand(msg, node.first); } @@ -98,7 +94,7 @@ JoinOperation::enqueueJoinMessagePerTargetNode( void JoinOperation::onReceive(DistributorStripeMessageSender&, 
const api::StorageReply::SP& msg) { - api::JoinBucketsReply& rep = static_cast<api::JoinBucketsReply&>(*msg); + auto& rep = static_cast<api::JoinBucketsReply&>(*msg); uint16_t node = _tracker.handleReply(rep); if (node == 0xffff) { LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons"); @@ -108,43 +104,35 @@ JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRepl if (rep.getResult().success()) { const std::vector<document::BucketId>& sourceBuckets( rep.getSourceBuckets()); - for (uint32_t i = 0; i < sourceBuckets.size(); i++) { - document::Bucket sourceBucket(msg->getBucket().getBucketSpace(), sourceBuckets[i]); + for (auto bucket : sourceBuckets) { + document::Bucket sourceBucket(msg->getBucket().getBucketSpace(), bucket); _manager->operation_context().remove_node_from_bucket_database(sourceBucket, node); } // Add new buckets. if (!rep.getBucketInfo().valid()) { - LOG(debug, "Invalid bucketinfo for bucket %s returned in join", - getBucketId().toString().c_str()); + LOG(debug, "Invalid bucketinfo for bucket %s returned in join", getBucketId().toString().c_str()); } else { _manager->operation_context().update_bucket_database( getBucket(), - BucketCopy(_manager->operation_context().generate_unique_timestamp(), - node, - rep.getBucketInfo()), + BucketCopy(_manager->operation_context().generate_unique_timestamp(), node, rep.getBucketInfo()), DatabaseUpdate::CREATE_IF_NONEXISTING); LOG(spam, "Adding joined bucket %s", getBucketId().toString().c_str()); } } else if (rep.getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND - && _bucketSpace->getBucketDatabase().get(getBucketId())->getNode(node) != 0) + && _bucketSpace->getBucketDatabase().get(getBucketId())->getNode(node) != nullptr) { _manager->operation_context().recheck_bucket_info(node, getBucket()); - LOGBP(warning, "Join failed to find %s: %s", - getBucketId().toString().c_str(), - rep.getResult().toString().c_str()); + LOGBP(warning, "Join failed to find %s: 
%s", getBucketId().toString().c_str(), rep.getResult().toString().c_str()); } else if (rep.getResult().isBusy()) { - LOG(debug, "Join failed for %s, node was busy. Will retry later", - getBucketId().toString().c_str()); + LOG(debug, "Join failed for %s, node was busy. Will retry later", getBucketId().toString().c_str()); } else if (rep.getResult().isCriticalForMaintenance()) { LOGBP(warning, "Join failed for %s: %s with error '%s'", - getBucketId().toString().c_str(), msg->toString().c_str(), - msg->getResult().toString().c_str()); + getBucketId().toString().c_str(), msg->toString().c_str(), msg->getResult().toString().c_str()); } else { LOG(debug, "Join failed for %s with non-critical failure: %s", - getBucketId().toString().c_str(), - rep.getResult().toString().c_str()); + getBucketId().toString().c_str(), rep.getResult().toString().c_str()); } _ok = rep.getResult().success(); @@ -157,7 +145,7 @@ JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRepl document::Bucket JoinOperation::getJoinBucket(size_t idx) const { - return document::Bucket(getBucket().getBucketSpace(), _bucketsToJoin[idx]); + return {getBucket().getBucketSpace(), _bucketsToJoin[idx]}; } bool diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 97d86528ea0..8e64fb227a7 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -4,7 +4,6 @@ #include <vespa/storage/distributor/idealstatemanager.h> #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storage/distributor/distributor_bucket_space.h> -#include <climits> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".distributor.operation.idealstate.split"); @@ -30,12 +29,11 @@ SplitOperation::onStart(DistributorStripeMessageSender& sender) BucketDatabase::Entry 
entry = _bucketSpace->getBucketDatabase().get(getBucketId()); for (uint32_t i = 0; i < entry->getNodeCount(); i++) { - std::shared_ptr<api::SplitBucketCommand> msg( - new api::SplitBucketCommand(getBucket())); + auto msg = std::make_shared<api::SplitBucketCommand>(getBucket()); msg->setMaxSplitBits(_maxBits); msg->setMinDocCount(_splitCount); msg->setMinByteSize(_splitSize); - msg->setTimeout(vespalib::duration::max()); + msg->setTimeout(MAX_TIMEOUT); setCommandMeta(*msg); _tracker.queueCommand(msg, entry->getNodeRef(i).getNode()); _ok = true; @@ -52,28 +50,25 @@ SplitOperation::onStart(DistributorStripeMessageSender& sender) void SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg) { - api::SplitBucketReply& rep = static_cast<api::SplitBucketReply&>(*msg); + auto & rep = static_cast<api::SplitBucketReply&>(*msg); uint16_t node = _tracker.handleReply(rep); if (node == 0xffff) { - LOG(debug, "Ignored reply since node was max uint16_t for unknown " - "reasons"); + LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons"); return; } std::ostringstream ost; if (rep.getResult().success()) { - BucketDatabase::Entry entry = - _bucketSpace->getBucketDatabase().get(rep.getBucketId()); + BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(rep.getBucketId()); if (entry.valid()) { entry->removeNode(node); if (entry->getNodeCount() == 0) { - LOG(spam, "Removing split bucket %s", - getBucketId().toString().c_str()); + LOG(spam, "Removing split bucket %s", getBucketId().toString().c_str()); _bucketSpace->getBucketDatabase().remove(rep.getBucketId()); } else { _bucketSpace->getBucketDatabase().update(entry); @@ -83,44 +78,34 @@ SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRep } // Add new buckets. 
- for (uint32_t i = 0; i < rep.getSplitInfo().size(); i++) { - const api::SplitBucketReply::Entry& sinfo = rep.getSplitInfo()[i]; - + for (const auto & sinfo : rep.getSplitInfo()) { if (!sinfo.second.valid()) { - LOG(error, "Received invalid bucket %s from node %d as reply " - "to split bucket", + LOG(error, "Received invalid bucket %s from node %d as reply to split bucket", sinfo.first.toString().c_str(), node); } ost << sinfo.first << ","; - BucketCopy copy( - BucketCopy(_manager->operation_context().generate_unique_timestamp(), - node, - sinfo.second)); + BucketCopy copy(_manager->operation_context().generate_unique_timestamp(), node, sinfo.second); // Must reset trusted since otherwise trustedness of inconsistent // copies would be arbitrarily determined by which copy managed // to finish its split first. _manager->operation_context().update_bucket_database( document::Bucket(msg->getBucket().getBucketSpace(), sinfo.first), copy, - (DatabaseUpdate::CREATE_IF_NONEXISTING - | DatabaseUpdate::RESET_TRUSTED)); + (DatabaseUpdate::CREATE_IF_NONEXISTING | DatabaseUpdate::RESET_TRUSTED)); } } else if ( rep.getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND - && _bucketSpace->getBucketDatabase().get(rep.getBucketId())->getNode(node) != 0) + && _bucketSpace->getBucketDatabase().get(rep.getBucketId())->getNode(node) != nullptr) { _manager->operation_context().recheck_bucket_info(node, getBucket()); - LOGBP(debug, "Split failed for %s: bucket not found. Storage and " - "distributor bucket databases might be out of sync: %s", - getBucketId().toString().c_str(), - vespalib::string(rep.getResult().getMessage()).c_str()); + LOGBP(debug, "Split failed for %s: bucket not found. Storage and distributor bucket databases might be out of sync: %s", + getBucketId().toString().c_str(), vespalib::string(rep.getResult().getMessage()).c_str()); _ok = false; } else if (rep.getResult().isBusy()) { - LOG(debug, "Split failed for %s, node was busy. 
Will retry later", - getBucketId().toString().c_str()); + LOG(debug, "Split failed for %s, node was busy. Will retry later", getBucketId().toString().c_str()); _ok = false; } else if (rep.getResult().isCriticalForMaintenance()) { LOGBP(warning, "Split failed for %s: %s with error '%s'", @@ -134,12 +119,10 @@ SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRep } if (_tracker.finished()) { - LOG(debug, "Split done on node %d: %s completed operation", - node, ost.str().c_str()); + LOG(debug, "Split done on node %d: %s completed operation", node, ost.str().c_str()); done(); } else { - LOG(debug, "Split done on node %d: %s still pending on other nodes", - node, ost.str().c_str()); + LOG(debug, "Split done on node %d: %s still pending on other nodes", node, ost.str().c_str()); } } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h index 604b29e296c..d870f919b86 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h @@ -14,7 +14,7 @@ public: uint32_t maxBits, uint32_t splitCount, uint32_t splitSize); SplitOperation(const SplitOperation&) = delete; SplitOperation& operator=(const SplitOperation&) = delete; - ~SplitOperation(); + ~SplitOperation() override; void onStart(DistributorStripeMessageSender& sender) override; void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>&) override; diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h index 8bb81b8d365..68358be4df8 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.h +++ b/storage/src/vespa/storage/distributor/operations/operation.h @@ -93,6 +93,7 @@ private: const std::shared_ptr<api::StorageReply> & msg) = 0; 
protected: + static constexpr vespalib::duration MAX_TIMEOUT = 3600s; vespalib::system_time _startTime; }; diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index c86254cb69a..995a3453dc5 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -22,6 +22,10 @@ using lib::Node; using lib::NodeType; using lib::NodeState; +namespace { + constexpr vespalib::duration MAX_TIMEOUT=3600s; +} + PendingClusterState::PendingClusterState( const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, @@ -160,10 +164,8 @@ PendingClusterState::iAmDown() const void PendingClusterState::requestNodes() { - LOG(debug, - "New system state: Old state was %s, new state is %s", - getPrevClusterStateBundleString().c_str(), - getNewClusterStateBundleString().c_str()); + LOG(debug, "New system state: Old state was %s, new state is %s", + getPrevClusterStateBundleString().c_str(), getNewClusterStateBundleString().c_str()); requestBucketInfoFromStorageNodesWithChangedState(); } @@ -188,10 +190,8 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) vespalib::string distributionHash = distribution.getNodeGraph().getDistributionConfigHash(); LOG(debug, - "Requesting bucket info for bucket space %" PRIu64 " node %d with cluster state '%s' " - "and distribution hash '%s'", - bucketSpaceAndNode.bucketSpace.getId(), - bucketSpaceAndNode.node, + "Requesting bucket info for bucket space %" PRIu64 " node %d with cluster state '%s' and distribution hash '%s'", + bucketSpaceAndNode.bucketSpace.getId(), bucketSpaceAndNode.node, _newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace)->toString().c_str(), distributionHash.c_str()); @@ -202,7 +202,7 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) distributionHash); 
cmd->setPriority(api::StorageMessage::HIGH); - cmd->setTimeout(vespalib::duration::max()); + cmd->setTimeout(MAX_TIMEOUT); _sentMessages.emplace(cmd->getMsgId(), bucketSpaceAndNode); diff --git a/storage/src/vespa/storageapi/messageapi/storagecommand.cpp b/storage/src/vespa/storageapi/messageapi/storagecommand.cpp index 1e797ba4792..1641d1ae85b 100644 --- a/storage/src/vespa/storageapi/messageapi/storagecommand.cpp +++ b/storage/src/vespa/storageapi/messageapi/storagecommand.cpp @@ -6,6 +6,10 @@ namespace storage::api { +namespace { + constexpr vespalib::duration MAX_TIMEOUT=3600s; +} + StorageCommand::StorageCommand(const StorageCommand& other) : StorageMessage(other, generateMsgId()), _timeout(other._timeout), @@ -15,10 +19,9 @@ StorageCommand::StorageCommand(const StorageCommand& other) StorageCommand::StorageCommand(const MessageType& type, Priority p) : StorageMessage(type, generateMsgId()), - // Default timeout is unlimited. Set from mbus message. Some internal - // use want unlimited timeout, (such as readbucketinfo, repair bucket - // etc) - _timeout(duration::max()), + // Default timeout one hour. Set from mbus message. Some internal + // use want unlimited timeout, (such as readbucketinfo, repair bucket, etc.) + _timeout(MAX_TIMEOUT), _sourceIndex(0xFFFF) { setPriority(p); @@ -27,10 +30,8 @@ StorageCommand::StorageCommand(const MessageType& type, Priority p) StorageCommand::~StorageCommand() = default; void -StorageCommand::print(std::ostream& out, bool verbose, - const std::string& indent) const +StorageCommand::print(std::ostream& out, bool, const std::string&) const { - (void) verbose; (void) indent; out << "StorageCommand(" << _type.getName(); if (_priority != NORMAL) out << ", priority = " << static_cast<int>(_priority); if (_sourceIndex != 0xFFFF) out << ", source = " << _sourceIndex; |