aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-02-07 15:44:34 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2023-02-07 15:44:34 +0000
commit150f9cf546a73a7b77e40f6b25b36558a8fc517e (patch)
tree476ea49f10cce8271162d049a2cf99c338fa7f36 /storage
parent260788dbcc76f4b7b0855bfa48e23495a1561e91 (diff)
Avoid using duration::max, cap timeout at a sensible value
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp40
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp49
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h1
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp18
-rw-r--r--storage/src/vespa/storageapi/messageapi/storagecommand.cpp15
6 files changed, 49 insertions, 76 deletions
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index e4130f28eee..0e9873f3434 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -4,7 +4,6 @@
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/idealstatemanager.h>
-#include <climits>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".distributor.operation.idealstate.join");
@@ -30,9 +29,7 @@ JoinOperation::onStart(DistributorStripeMessageSender& sender)
_bucketsToJoin[0].toString().c_str(), getBucketId().toString().c_str());
} else {
LOG(debug, "Starting join operation for (%s,%s) -> %s",
- _bucketsToJoin[0].toString().c_str(),
- _bucketsToJoin[1].toString().c_str(),
- getBucketId().toString().c_str());
+ _bucketsToJoin[0].toString().c_str(), _bucketsToJoin[1].toString().c_str(), getBucketId().toString().c_str());
}
std::sort(_bucketsToJoin.begin(), _bucketsToJoin.end());
@@ -66,8 +63,7 @@ JoinOperation::resolveSourceBucketsPerTargetNode() const
}
void
-JoinOperation::fillMissingSourceBucketsForInconsistentJoins(
- NodeToBuckets& nodeToBuckets) const
+JoinOperation::fillMissingSourceBucketsForInconsistentJoins(NodeToBuckets& nodeToBuckets) const
{
for (auto& node : nodeToBuckets) {
if (node.second.size() == 1) {
@@ -88,7 +84,7 @@ JoinOperation::enqueueJoinMessagePerTargetNode(
std::shared_ptr<api::JoinBucketsCommand> msg(
new api::JoinBucketsCommand(getBucket()));
msg->getSourceBuckets() = node.second;
- msg->setTimeout(vespalib::duration::max());
+ msg->setTimeout(MAX_TIMEOUT);
setCommandMeta(*msg);
_tracker.queueCommand(msg, node.first);
}
@@ -98,7 +94,7 @@ JoinOperation::enqueueJoinMessagePerTargetNode(
void
JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg)
{
- api::JoinBucketsReply& rep = static_cast<api::JoinBucketsReply&>(*msg);
+ auto& rep = static_cast<api::JoinBucketsReply&>(*msg);
uint16_t node = _tracker.handleReply(rep);
if (node == 0xffff) {
LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons");
@@ -108,43 +104,35 @@ JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRepl
if (rep.getResult().success()) {
const std::vector<document::BucketId>& sourceBuckets(
rep.getSourceBuckets());
- for (uint32_t i = 0; i < sourceBuckets.size(); i++) {
- document::Bucket sourceBucket(msg->getBucket().getBucketSpace(), sourceBuckets[i]);
+ for (auto bucket : sourceBuckets) {
+ document::Bucket sourceBucket(msg->getBucket().getBucketSpace(), bucket);
_manager->operation_context().remove_node_from_bucket_database(sourceBucket, node);
}
// Add new buckets.
if (!rep.getBucketInfo().valid()) {
- LOG(debug, "Invalid bucketinfo for bucket %s returned in join",
- getBucketId().toString().c_str());
+ LOG(debug, "Invalid bucketinfo for bucket %s returned in join", getBucketId().toString().c_str());
} else {
_manager->operation_context().update_bucket_database(
getBucket(),
- BucketCopy(_manager->operation_context().generate_unique_timestamp(),
- node,
- rep.getBucketInfo()),
+ BucketCopy(_manager->operation_context().generate_unique_timestamp(), node, rep.getBucketInfo()),
DatabaseUpdate::CREATE_IF_NONEXISTING);
LOG(spam, "Adding joined bucket %s", getBucketId().toString().c_str());
}
} else if (rep.getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND
- && _bucketSpace->getBucketDatabase().get(getBucketId())->getNode(node) != 0)
+ && _bucketSpace->getBucketDatabase().get(getBucketId())->getNode(node) != nullptr)
{
_manager->operation_context().recheck_bucket_info(node, getBucket());
- LOGBP(warning, "Join failed to find %s: %s",
- getBucketId().toString().c_str(),
- rep.getResult().toString().c_str());
+ LOGBP(warning, "Join failed to find %s: %s", getBucketId().toString().c_str(), rep.getResult().toString().c_str());
} else if (rep.getResult().isBusy()) {
- LOG(debug, "Join failed for %s, node was busy. Will retry later",
- getBucketId().toString().c_str());
+ LOG(debug, "Join failed for %s, node was busy. Will retry later", getBucketId().toString().c_str());
} else if (rep.getResult().isCriticalForMaintenance()) {
LOGBP(warning, "Join failed for %s: %s with error '%s'",
- getBucketId().toString().c_str(), msg->toString().c_str(),
- msg->getResult().toString().c_str());
+ getBucketId().toString().c_str(), msg->toString().c_str(), msg->getResult().toString().c_str());
} else {
LOG(debug, "Join failed for %s with non-critical failure: %s",
- getBucketId().toString().c_str(),
- rep.getResult().toString().c_str());
+ getBucketId().toString().c_str(), rep.getResult().toString().c_str());
}
_ok = rep.getResult().success();
@@ -157,7 +145,7 @@ JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRepl
document::Bucket
JoinOperation::getJoinBucket(size_t idx) const
{
- return document::Bucket(getBucket().getBucketSpace(), _bucketsToJoin[idx]);
+ return {getBucket().getBucketSpace(), _bucketsToJoin[idx]};
}
bool
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 97d86528ea0..8e64fb227a7 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -4,7 +4,6 @@
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
-#include <climits>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".distributor.operation.idealstate.split");
@@ -30,12 +29,11 @@ SplitOperation::onStart(DistributorStripeMessageSender& sender)
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
for (uint32_t i = 0; i < entry->getNodeCount(); i++) {
- std::shared_ptr<api::SplitBucketCommand> msg(
- new api::SplitBucketCommand(getBucket()));
+ auto msg = std::make_shared<api::SplitBucketCommand>(getBucket());
msg->setMaxSplitBits(_maxBits);
msg->setMinDocCount(_splitCount);
msg->setMinByteSize(_splitSize);
- msg->setTimeout(vespalib::duration::max());
+ msg->setTimeout(MAX_TIMEOUT);
setCommandMeta(*msg);
_tracker.queueCommand(msg, entry->getNodeRef(i).getNode());
_ok = true;
@@ -52,28 +50,25 @@ SplitOperation::onStart(DistributorStripeMessageSender& sender)
void
SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg)
{
- api::SplitBucketReply& rep = static_cast<api::SplitBucketReply&>(*msg);
+ auto & rep = static_cast<api::SplitBucketReply&>(*msg);
uint16_t node = _tracker.handleReply(rep);
if (node == 0xffff) {
- LOG(debug, "Ignored reply since node was max uint16_t for unknown "
- "reasons");
+ LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons");
return;
}
std::ostringstream ost;
if (rep.getResult().success()) {
- BucketDatabase::Entry entry =
- _bucketSpace->getBucketDatabase().get(rep.getBucketId());
+ BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(rep.getBucketId());
if (entry.valid()) {
entry->removeNode(node);
if (entry->getNodeCount() == 0) {
- LOG(spam, "Removing split bucket %s",
- getBucketId().toString().c_str());
+ LOG(spam, "Removing split bucket %s", getBucketId().toString().c_str());
_bucketSpace->getBucketDatabase().remove(rep.getBucketId());
} else {
_bucketSpace->getBucketDatabase().update(entry);
@@ -83,44 +78,34 @@ SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRep
}
// Add new buckets.
- for (uint32_t i = 0; i < rep.getSplitInfo().size(); i++) {
- const api::SplitBucketReply::Entry& sinfo = rep.getSplitInfo()[i];
-
+ for (const auto & sinfo : rep.getSplitInfo()) {
if (!sinfo.second.valid()) {
- LOG(error, "Received invalid bucket %s from node %d as reply "
- "to split bucket",
+ LOG(error, "Received invalid bucket %s from node %d as reply to split bucket",
sinfo.first.toString().c_str(), node);
}
ost << sinfo.first << ",";
- BucketCopy copy(
- BucketCopy(_manager->operation_context().generate_unique_timestamp(),
- node,
- sinfo.second));
+ BucketCopy copy(_manager->operation_context().generate_unique_timestamp(), node, sinfo.second);
// Must reset trusted since otherwise trustedness of inconsistent
// copies would be arbitrarily determined by which copy managed
// to finish its split first.
_manager->operation_context().update_bucket_database(
document::Bucket(msg->getBucket().getBucketSpace(), sinfo.first), copy,
- (DatabaseUpdate::CREATE_IF_NONEXISTING
- | DatabaseUpdate::RESET_TRUSTED));
+ (DatabaseUpdate::CREATE_IF_NONEXISTING | DatabaseUpdate::RESET_TRUSTED));
}
} else if (
rep.getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND
- && _bucketSpace->getBucketDatabase().get(rep.getBucketId())->getNode(node) != 0)
+ && _bucketSpace->getBucketDatabase().get(rep.getBucketId())->getNode(node) != nullptr)
{
_manager->operation_context().recheck_bucket_info(node, getBucket());
- LOGBP(debug, "Split failed for %s: bucket not found. Storage and "
- "distributor bucket databases might be out of sync: %s",
- getBucketId().toString().c_str(),
- vespalib::string(rep.getResult().getMessage()).c_str());
+ LOGBP(debug, "Split failed for %s: bucket not found. Storage and distributor bucket databases might be out of sync: %s",
+ getBucketId().toString().c_str(), vespalib::string(rep.getResult().getMessage()).c_str());
_ok = false;
} else if (rep.getResult().isBusy()) {
- LOG(debug, "Split failed for %s, node was busy. Will retry later",
- getBucketId().toString().c_str());
+ LOG(debug, "Split failed for %s, node was busy. Will retry later", getBucketId().toString().c_str());
_ok = false;
} else if (rep.getResult().isCriticalForMaintenance()) {
LOGBP(warning, "Split failed for %s: %s with error '%s'",
@@ -134,12 +119,10 @@ SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRep
}
if (_tracker.finished()) {
- LOG(debug, "Split done on node %d: %s completed operation",
- node, ost.str().c_str());
+ LOG(debug, "Split done on node %d: %s completed operation", node, ost.str().c_str());
done();
} else {
- LOG(debug, "Split done on node %d: %s still pending on other nodes",
- node, ost.str().c_str());
+ LOG(debug, "Split done on node %d: %s still pending on other nodes", node, ost.str().c_str());
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
index 604b29e296c..d870f919b86 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
@@ -14,7 +14,7 @@ public:
uint32_t maxBits, uint32_t splitCount, uint32_t splitSize);
SplitOperation(const SplitOperation&) = delete;
SplitOperation& operator=(const SplitOperation&) = delete;
- ~SplitOperation();
+ ~SplitOperation() override;
void onStart(DistributorStripeMessageSender& sender) override;
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>&) override;
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index 8bb81b8d365..68358be4df8 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -93,6 +93,7 @@ private:
const std::shared_ptr<api::StorageReply> & msg) = 0;
protected:
+ static constexpr vespalib::duration MAX_TIMEOUT = 3600s;
vespalib::system_time _startTime;
};
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index c86254cb69a..995a3453dc5 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -22,6 +22,10 @@ using lib::Node;
using lib::NodeType;
using lib::NodeState;
+namespace {
+ constexpr vespalib::duration MAX_TIMEOUT=3600s;
+}
+
PendingClusterState::PendingClusterState(
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
@@ -160,10 +164,8 @@ PendingClusterState::iAmDown() const
void
PendingClusterState::requestNodes()
{
- LOG(debug,
- "New system state: Old state was %s, new state is %s",
- getPrevClusterStateBundleString().c_str(),
- getNewClusterStateBundleString().c_str());
+ LOG(debug, "New system state: Old state was %s, new state is %s",
+ getPrevClusterStateBundleString().c_str(), getNewClusterStateBundleString().c_str());
requestBucketInfoFromStorageNodesWithChangedState();
}
@@ -188,10 +190,8 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
vespalib::string distributionHash = distribution.getNodeGraph().getDistributionConfigHash();
LOG(debug,
- "Requesting bucket info for bucket space %" PRIu64 " node %d with cluster state '%s' "
- "and distribution hash '%s'",
- bucketSpaceAndNode.bucketSpace.getId(),
- bucketSpaceAndNode.node,
+ "Requesting bucket info for bucket space %" PRIu64 " node %d with cluster state '%s' and distribution hash '%s'",
+ bucketSpaceAndNode.bucketSpace.getId(), bucketSpaceAndNode.node,
_newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace)->toString().c_str(),
distributionHash.c_str());
@@ -202,7 +202,7 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
distributionHash);
cmd->setPriority(api::StorageMessage::HIGH);
- cmd->setTimeout(vespalib::duration::max());
+ cmd->setTimeout(MAX_TIMEOUT);
_sentMessages.emplace(cmd->getMsgId(), bucketSpaceAndNode);
diff --git a/storage/src/vespa/storageapi/messageapi/storagecommand.cpp b/storage/src/vespa/storageapi/messageapi/storagecommand.cpp
index 1e797ba4792..1641d1ae85b 100644
--- a/storage/src/vespa/storageapi/messageapi/storagecommand.cpp
+++ b/storage/src/vespa/storageapi/messageapi/storagecommand.cpp
@@ -6,6 +6,10 @@
namespace storage::api {
+namespace {
+ constexpr vespalib::duration MAX_TIMEOUT=3600s;
+}
+
StorageCommand::StorageCommand(const StorageCommand& other)
: StorageMessage(other, generateMsgId()),
_timeout(other._timeout),
@@ -15,10 +19,9 @@ StorageCommand::StorageCommand(const StorageCommand& other)
StorageCommand::StorageCommand(const MessageType& type, Priority p)
: StorageMessage(type, generateMsgId()),
- // Default timeout is unlimited. Set from mbus message. Some internal
- // use want unlimited timeout, (such as readbucketinfo, repair bucket
- // etc)
- _timeout(duration::max()),
+ // Default timeout one hour. Set from mbus message. Some internal
+ // use want unlimited timeout, (such as readbucketinfo, repair bucket, etc.)
+ _timeout(MAX_TIMEOUT),
_sourceIndex(0xFFFF)
{
setPriority(p);
@@ -27,10 +30,8 @@ StorageCommand::StorageCommand(const MessageType& type, Priority p)
StorageCommand::~StorageCommand() = default;
void
-StorageCommand::print(std::ostream& out, bool verbose,
- const std::string& indent) const
+StorageCommand::print(std::ostream& out, bool, const std::string&) const
{
- (void) verbose; (void) indent;
out << "StorageCommand(" << _type.getName();
if (_priority != NORMAL) out << ", priority = " << static_cast<int>(_priority);
if (_sourceIndex != 0xFFFF) out << ", source = " << _sourceIndex;