Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/distributor/pendingmessagetrackertest.cpp                     |  57
-rw-r--r--  storage/src/tests/visiting/visitormanagertest.cpp                               |  18
-rw-r--r--  storage/src/tests/visiting/visitortest.cpp                                      |   8
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketmanager.cpp                            |  18
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp  |  30
-rw-r--r--  storage/src/vespa/storage/distributor/pendingmessagetracker.cpp                 |   7
-rw-r--r--  storage/src/vespa/storage/distributor/pendingmessagetracker.h                   |  16
-rw-r--r--  storage/src/vespa/storage/storageserver/communicationmanager.cpp                |   7
-rw-r--r--  storage/src/vespa/storage/visiting/visitor.cpp                                  | 176
-rw-r--r--  storage/src/vespa/storage/visiting/visitor.h                                    |  57
-rw-r--r--  storage/src/vespa/storage/visiting/visitormanager.cpp                           |  12
-rw-r--r--  storage/src/vespa/storage/visiting/visitormanager.h                             |   6
-rw-r--r--  storage/src/vespa/storage/visiting/visitorthread.cpp                            |  29
-rw-r--r--  storage/src/vespa/storage/visiting/visitorthread.h                              |   4
14 files changed, 162 insertions(+), 283 deletions(-)
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 0ba374f7190..3bfa1027a82 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -175,7 +175,7 @@ TEST_F(PendingMessageTrackerTest, simple) {
EXPECT_THAT(ost.str(), HasSubstr(
"<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> "
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> "
"Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
"</ul>\n"));
}
@@ -248,17 +248,17 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) {
EXPECT_THAT(ost.str(), HasSubstr(
"<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
"</ul>\n"
"<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000011d7))</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
"</ul>\n"));
}
{
@@ -268,44 +268,23 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) {
EXPECT_THAT(ost.str(), HasSubstr(
"<b>Node 0 (pending count: 4)</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
"</ul>\n"
"<b>Node 1 (pending count: 4)</b>\n"
"<ul>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
"</ul>\n"));
}
}
namespace {
-template <typename T>
-std::string setToString(const std::set<T>& s)
-{
- std::ostringstream ost;
- ost << '{';
- for (typename std::set<T>::const_iterator i(s.begin()), e(s.end());
- i != e; ++i)
- {
- if (i != s.begin()) {
- ost << ',';
- }
- ost << *i;
- }
- ost << '}';
- return ost.str();
-}
-
-}
-
-namespace {
-
class TestChecker : public PendingMessageTracker::Checker
{
public:
@@ -443,7 +422,7 @@ TEST_F(PendingMessageTrackerTest, busy_reply_marks_node_as_busy) {
TEST_F(PendingMessageTrackerTest, busy_node_duration_can_be_adjusted) {
Fixture f;
auto cmd = f.sendPut(RequestBuilder().toNode(0));
- f.tracker().setNodeBusyDuration(std::chrono::seconds(10));
+ f.tracker().setNodeBusyDuration(10s);
f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY));
EXPECT_TRUE(f.tracker().getNodeInfo().isBusy(0));
f.clock().addSecondsToTime(11);
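
The test above now passes the duration literal 10s instead of spelling out
std::chrono::seconds(10). A minimal sketch of how such literals work, with a
hypothetical BusyTracker standing in for PendingMessageTracker:

    #include <chrono>
    #include <cstdio>

    using namespace std::chrono_literals; // enables 10s, 500ms, 1h, ...

    // Hypothetical stand-in for the tracker: accepting a generic duration
    // lets callers pass 10s, 500ms or 1h alike.
    struct BusyTracker {
        std::chrono::steady_clock::duration busy_duration{60s};
        void set_busy_duration(std::chrono::steady_clock::duration d) { busy_duration = d; }
    };

    int main() {
        BusyTracker t;
        t.set_busy_duration(10s); // same value as std::chrono::seconds(10)
        std::printf("%lld ms\n", static_cast<long long>(
            std::chrono::duration_cast<std::chrono::milliseconds>(t.busy_duration).count()));
    }
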
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index be4e7270c69..a82514acb03 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -217,7 +217,7 @@ VisitorManagerTest::getSession(uint32_t n)
// Wait until we have started the visitor
const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions);
framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime endTime(clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000));
+ vespalib::steady_time endTime = clock.getMonotonicTime() + 30s;
while (true) {
{
std::lock_guard lock(_messageSessionFactory->_accessLock);
@@ -225,9 +225,8 @@ VisitorManagerTest::getSession(uint32_t n)
return *sessions[n];
}
}
- if (clock.getTimeInMillis() > endTime) {
- throw vespalib::IllegalStateException(
- "Timed out waiting for visitor session", VESPA_STRLOC);
+ if (clock.getMonotonicTime() > endTime) {
+ throw vespalib::IllegalStateException("Timed out waiting for visitor session", VESPA_STRLOC);
}
std::this_thread::sleep_for(10ms);
}
@@ -255,12 +254,10 @@ VisitorManagerTest::getMessagesAndReply(
switch (session.sentMessages[i]->getType()) {
case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT:
- docs.push_back(static_cast<documentapi::PutDocumentMessage&>(
- *session.sentMessages[i]).getDocumentSP());
+ docs.push_back(static_cast<documentapi::PutDocumentMessage&>(*session.sentMessages[i]).getDocumentSP());
break;
case documentapi::DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
- docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>(
- *session.sentMessages[i]).getDocumentId());
+ docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>(*session.sentMessages[i]).getDocumentId());
break;
default:
break;
@@ -355,10 +352,7 @@ TEST_F(VisitorManagerTest, normal_usage) {
getMessagesAndReply(1, getSession(0), docs, docIds);
// All data has been replied to, expecting to get a create visitor reply
- ASSERT_NO_FATAL_FAILURE(
- verifyCreateVisitorReply(api::ReturnCode::OK,
- int(docs.size()),
- getTotalSerializedSize(docs)));
+ ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::OK, int(docs.size()), getTotalSerializedSize(docs)));
EXPECT_EQ(1u, getMatchingDocuments(docs));
EXPECT_FALSE(_manager->hasPendingMessageState());
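
The getSession() change above replaces a wall-clock deadline
(framework::MilliSecTime) with a monotonic one. A monotonic clock cannot jump
when NTP or DST adjusts the system time, so the 30-second timeout can neither
fire early nor hang. A self-contained sketch of the same wait loop, with
ready() standing in for the session lookup:

    #include <chrono>
    #include <stdexcept>
    #include <thread>

    using namespace std::chrono_literals;

    // Poll until ready() is true, failing after 30s measured on a clock
    // that only moves forward.
    template <typename Pred>
    void wait_until_ready(Pred ready) {
        const auto deadline = std::chrono::steady_clock::now() + 30s;
        while (!ready()) {
            if (std::chrono::steady_clock::now() > deadline) {
                throw std::runtime_error("Timed out waiting for visitor session");
            }
            std::this_thread::sleep_for(10ms);
        }
    }
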
diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp
index f3a538b7832..565131b3b99 100644
--- a/storage/src/tests/visiting/visitortest.cpp
+++ b/storage/src/tests/visiting/visitortest.cpp
@@ -256,11 +256,9 @@ TestVisitorMessageSession&
VisitorTest::getSession(uint32_t n)
{
// Wait until we have started the visitor
- const std::vector<TestVisitorMessageSession*>& sessions(
- _messageSessionFactory->_visitorSessions);
+ const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions);
framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime endTime(
- clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000));
+ vespalib::steady_time endTime = clock.getMonotonicTime() + 30s;
while (true) {
{
std::lock_guard lock(_messageSessionFactory->_accessLock);
@@ -268,7 +266,7 @@ VisitorTest::getSession(uint32_t n)
return *sessions[n];
}
}
- if (clock.getTimeInMillis() > endTime) {
+ if (clock.getMonotonicTime() > endTime) {
throw vespalib::IllegalStateException(
"Timed out waiting for visitor session", VESPA_STRLOC);
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index e68cbd75d52..2f1622750d7 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -182,7 +182,7 @@ struct MetricsUpdater {
void add(const MetricsUpdater& rhs) noexcept {
auto& d = count;
- auto& s = rhs.count;
+ const auto& s = rhs.count;
d.buckets += s.buckets;
d.docs += s.docs;
d.bytes += s.bytes;
@@ -209,7 +209,7 @@ BucketManager::updateMetrics(bool updateDocCount)
if (!updateDocCount || _doneInitialized) {
MetricsUpdater total;
- for (auto& space : _component.getBucketSpaceRepo()) {
+ for (const auto& space : _component.getBucketSpaceRepo()) {
MetricsUpdater m;
auto guard = space.second->bucketDatabase().acquire_read_guard();
guard->for_each(std::ref(m));
@@ -238,7 +238,7 @@ BucketManager::updateMetrics(bool updateDocCount)
}
void BucketManager::update_bucket_db_memory_usage_metrics() {
- for (auto& space : _component.getBucketSpaceRepo()) {
+ for (const auto& space : _component.getBucketSpaceRepo()) {
auto bm = _metrics->bucket_spaces.find(space.first);
bm->second->bucket_db_metrics.memory_usage.update(space.second->bucketDatabase().detailed_memory_usage());
}
@@ -342,7 +342,7 @@ BucketManager::reportStatus(std::ostream& out,
using vespalib::xml::XmlAttribute;
xmlReporter << vespalib::xml::XmlTag("buckets");
- for (auto& space : _component.getBucketSpaceRepo()) {
+ for (const auto& space : _component.getBucketSpaceRepo()) {
xmlReporter << XmlTag("bucket-space")
<< XmlAttribute("name", document::FixedBucketSpaces::to_string(space.first));
BucketDBDumper dumper(xmlReporter.getStream());
@@ -404,7 +404,7 @@ bool BucketManager::onRequestBucketInfo(
api::RequestBucketInfoReply::EntryVector info;
if (!cmd->getBuckets().empty()) {
for (auto bucketId : cmd->getBuckets()) {
- for (auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) {
+ for (const auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) {
info.emplace_back(entry.first, entry.second->getBucketInfo());
}
}
@@ -457,7 +457,7 @@ BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard)
// may alter the relevant state.
--_requestsCurrentlyProcessing;
if (_requestsCurrentlyProcessing == 0) {
- for (auto& qr : _queuedReplies) {
+ for (const auto& qr : _queuedReplies) {
sendUp(qr);
}
_queuedReplies.clear();
@@ -494,7 +494,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
reqs.size(), bucketSpace.toString().c_str(), clusterState->toString().c_str(), our_hash.c_str());
std::lock_guard clusterStateGuard(_clusterStateLock);
- for (auto & req : std::ranges::reverse_view(reqs)) {
+ for (const auto & req : std::ranges::reverse_view(reqs)) {
// Currently small requests should not be forwarded to worker thread
assert(req->hasSystemState());
const auto their_hash = req->getDistributionHash();
@@ -547,7 +547,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
std::ostringstream distrList;
std::unordered_map<uint16_t, api::RequestBucketInfoReply::EntryVector> result;
- for (auto& nodeAndCmd : requests) {
+ for (const auto& nodeAndCmd : requests) {
result[nodeAndCmd.first];
if (LOG_WOULD_LOG(debug)) {
distrList << ' ' << nodeAndCmd.first;
@@ -576,7 +576,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
"BucketManager::processRequestBucketInfoCommands-2");
}
_metrics->fullBucketInfoLatency.addValue(runStartTime.getElapsedTimeAsDouble());
- for (auto& nodeAndCmd : requests) {
+ for (const auto& nodeAndCmd : requests) {
auto reply(std::make_shared<api::RequestBucketInfoReply>(*nodeAndCmd.second));
reply->getBucketInfo().swap(result[nodeAndCmd.first]);
sendUp(reply);
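
Every loop touched in bucketmanager.cpp gains a const qualifier. Binding
read-only elements as const auto& documents intent and makes accidental
mutation a compile error; for map-like containers it also avoids copying each
key/value pair. A small illustration:

    #include <cstdio>
    #include <map>
    #include <string>

    int main() {
        std::map<int, std::string> spaces{{1, "default"}, {2, "global"}};
        for (const auto& space : spaces) {   // no copy, read-only view
            std::printf("%d -> %s\n", space.first, space.second.c_str());
            // space.second += "!";          // would not compile: element is const
        }
    }
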
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 667afbf67a0..393136de654 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -15,6 +15,7 @@ LOG_SETUP(".distributor.operation.idealstate.merge");
using vespalib::to_utc;
using vespalib::to_string;
+using vespalib::make_string_short::fmt;
namespace storage::distributor {
MergeOperation::~MergeOperation() = default;
@@ -24,8 +25,7 @@ MergeOperation::getStatus() const
{
return
Operation::getStatus() +
- vespalib::make_string(" . Sent MergeBucketCommand at %s",
- to_string(to_utc(_sentMessageTime)).c_str());
+ fmt(" . Sent MergeBucketCommand at %s", to_string(to_utc(_sentMessageTime)).c_str());
}
void
@@ -35,7 +35,7 @@ MergeOperation::addIdealNodes(
std::vector<MergeMetaData>& result)
{
// Add all ideal nodes first. These are never marked source-only.
- for (unsigned short idealNode : idealNodes) {
+ for (uint16_t idealNode : idealNodes) {
const MergeMetaData* entry = nullptr;
for (const auto & node : nodes) {
if (idealNode == node._nodeIndex) {
@@ -56,7 +56,7 @@ MergeOperation::addCopiesNotAlreadyAdded(uint16_t redundancy,
const std::vector<MergeMetaData>& nodes,
std::vector<MergeMetaData>& result)
{
- for (auto node : nodes) {
+ for (const auto & node : nodes) {
bool found = false;
for (const auto & mergeData : result) {
if (mergeData._nodeIndex == node._nodeIndex) {
@@ -123,7 +123,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
std::vector<std::unique_ptr<BucketCopy> > newCopies;
std::vector<MergeMetaData> nodes;
- for (unsigned short node : getNodes()) {
+ for (uint16_t node : getNodes()) {
const BucketCopy* copy = entry->getNode(node);
if (copy == nullptr) { // New copies?
newCopies.emplace_back(std::make_unique<BucketCopy>(BucketCopy::recentlyCreatedCopy(0, node)));
@@ -153,8 +153,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
msg->set_use_unordered_forwarding(true);
}
- LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(),
- _mnodes[0].index);
+ LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index);
// Set timeout to one hour to prevent hung nodes that manage to keep
// connections open from stalling merges in the cluster indefinitely.
@@ -165,8 +164,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
_sentMessageTime = _manager->node_context().clock().getMonotonicTime();
} else {
- LOGBP(debug,
- "Unable to merge bucket %s, since only one copy is available. System state %s",
+ LOGBP(debug, "Unable to merge bucket %s, since only one copy is available. System state %s",
getBucketId().toString().c_str(), clusterState.toString().c_str());
_ok = false;
done();
@@ -178,7 +176,7 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge(
const BucketDatabase::Entry& currentState) const
{
assert(currentState.valid());
- for (auto mnode : _mnodes) {
+ for (const auto & mnode : _mnodes) {
const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index));
if (!copyBefore) {
continue;
@@ -206,7 +204,7 @@ MergeOperation::deleteSourceOnlyNodes(
{
assert(currentState.valid());
std::vector<uint16_t> sourceOnlyNodes;
- for (auto & mnode : _mnodes) {
+ for (const auto & mnode : _mnodes) {
const uint16_t nodeIndex = mnode.index;
const BucketCopy* copy = currentState->getNode(nodeIndex);
if (!copy) {
@@ -338,7 +336,7 @@ bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx,
// to enter the merge throttler queues, displacing lower priority merges.
if (!is_global_bucket_merge()) {
const auto& node_info = ctx.pending_message_tracker().getNodeInfo();
- for (auto node : getNodes()) {
+ for (uint16_t node : getNodes()) {
if (node_info.isBusy(node)) {
return true;
}
@@ -364,11 +362,9 @@ bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const
MergeBucketMetricSet*
MergeOperation::get_merge_metrics()
{
- if (_manager) {
- return dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get());
- } else {
- return nullptr;
- }
+ return (_manager)
+ ? dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get())
+ : nullptr;
}
}
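
getStatus() above switches from vespalib::make_string to the shorter fmt
alias imported at the top of the file. For readers outside the vespalib tree,
this is roughly a printf-style formatter returning std::string; a minimal
portable sketch of the idea (not the actual vespalib implementation):

    #include <cstdarg>
    #include <cstdio>
    #include <string>

    // Format like printf, but return a std::string of exactly the right size.
    std::string fmt(const char* format, ...) {
        va_list args;
        va_start(args, format);
        va_list copy;
        va_copy(copy, args);
        const int needed = std::vsnprintf(nullptr, 0, format, copy); // measure
        va_end(copy);
        std::string out(needed > 0 ? needed : 0, '\0');
        std::vsnprintf(out.data(), out.size() + 1, format, args);    // fill
        va_end(args);
        return out;
    }

    int main() {
        std::printf("%s\n", fmt("Sent MergeBucketCommand at %s", "<timestamp>").c_str());
    }
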
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index 533493a79a2..8618d570685 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -3,7 +3,6 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <map>
-#include <algorithm>
#include <vespa/log/log.h>
LOG_SETUP(".pendingmessages");
@@ -15,7 +14,7 @@ PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr, u
vespalib::make_string("Pending messages to storage nodes (stripe %u)", stripe_index)),
_component(cr, "pendingmessagetracker"),
_nodeInfo(_component.getClock()),
- _nodeBusyDuration(60),
+ _nodeBusyDuration(60s),
_deferred_read_tasks(),
_lock()
{
@@ -38,7 +37,7 @@ vespalib::string
PendingMessageTracker::MessageEntry::toHtml() const {
vespalib::asciistream ss;
ss << "<li><i>Node " << nodeIdx << "</i>: "
- << "<b>" << framework::MilliSecTime(timeStamp.count()).toString() << "</b> "
+ << "<b>" << vespalib::to_string(timeStamp) << "</b> "
<< api::MessageType::get(api::MessageType::Id(msgType)).getName() << "(" << bucket.getBucketId() << ", priority=" << priority << ")</li>\n";
return ss.str();
}
@@ -46,7 +45,7 @@ PendingMessageTracker::MessageEntry::toHtml() const {
PendingMessageTracker::TimePoint
PendingMessageTracker::currentTime() const
{
- return TimePoint(_component.getClock().getTimeInMillis().getTime());
+ return _component.getClock().getSystemTime();
}
namespace {
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 93238b5a83f..fb672d5ee31 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -68,13 +68,7 @@ public:
virtual bool check(uint32_t messageType, uint16_t node, uint8_t priority) = 0;
};
- /**
- * Time point represented as the millisecond interval from the framework
- * clock's epoch to a given point in time. Note that it'd be more
- * semantically correct to use std::chrono::time_point, but it is bound
- * to specific chrono clock types, their epochs and duration resolution.
- */
- using TimePoint = std::chrono::milliseconds;
+ using TimePoint = vespalib::system_time;
PendingMessageTracker(framework::ComponentRegister&, uint32_t stripe_index);
~PendingMessageTracker() override;
@@ -119,8 +113,8 @@ public:
*/
std::vector<uint64_t> clearMessagesForNode(uint16_t node);
- void setNodeBusyDuration(std::chrono::seconds secs) noexcept {
- _nodeBusyDuration = secs;
+ void setNodeBusyDuration(vespalib::duration duration) noexcept {
+ _nodeBusyDuration = duration;
}
void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task);
@@ -136,7 +130,7 @@ private:
MessageEntry(TimePoint timeStamp, uint32_t msgType, uint32_t priority,
uint64_t msgId, document::Bucket bucket, uint16_t nodeIdx) noexcept;
- vespalib::string toHtml() const;
+ [[nodiscard]] vespalib::string toHtml() const;
};
struct MessageIdKey : boost::multi_index::member<MessageEntry, uint64_t, &MessageEntry::msgId> {};
@@ -187,7 +181,7 @@ private:
Messages _messages;
framework::Component _component;
NodeInfo _nodeInfo;
- std::chrono::seconds _nodeBusyDuration;
+ vespalib::duration _nodeBusyDuration;
DeferredBucketTaskMap _deferred_read_tasks;
// Since distributor is currently single-threaded, this will only
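
The header change above retires the old TimePoint comment: with
vespalib::system_time the tracker stores a real std::chrono time_point rather
than a bare millisecond count since epoch, which is what lets the HTML
rendering print the "1970-01-01 00:00:01.000 UTC" form seen in the updated
tests. A sketch of producing that format from a chrono time_point, assuming
only the standard library:

    #include <chrono>
    #include <cstdio>
    #include <ctime>

    using TimePoint = std::chrono::system_clock::time_point;

    int main() {
        const TimePoint now = std::chrono::system_clock::now();
        const std::time_t secs = std::chrono::system_clock::to_time_t(now);
        const long long ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                now.time_since_epoch()).count() % 1000;
        char buf[32];
        std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", std::gmtime(&secs));
        std::printf("%s.%03lld UTC\n", buf, ms);  // e.g. 1970-01-01 00:00:01.000 UTC
    }
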
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index ec22d7c064e..db88a22d500 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -55,7 +55,7 @@ vespalib::string getNodeId(StorageComponent& sc) {
return ost.str();
}
-vespalib::duration TEN_MINUTES = 600s;
+constexpr vespalib::duration STALE_PROTOCOL_LIFETIME = 1h;
}
@@ -694,7 +694,7 @@ CommunicationManager::run(framework::ThreadHandle& thread)
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
for (auto it(_earlierGenerations.begin());
!_earlierGenerations.empty() &&
- ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime());
+ ((it->first + STALE_PROTOCOL_LIFETIME) < _component.getClock().getMonotonicTime());
it = _earlierGenerations.begin())
{
_earlierGenerations.erase(it);
@@ -709,9 +709,8 @@ CommunicationManager::updateMetrics(const MetricLockGuard &)
}
void
-CommunicationManager::print(std::ostream& out, bool verbose, const std::string& indent) const
+CommunicationManager::print(std::ostream& out, bool, const std::string&) const
{
- (void) verbose; (void) indent;
out << "CommunicationManager";
}
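
The print() signature above drops the (void) casts by leaving the unused
parameters unnamed, which is the standard way to tell both the compiler and
the reader that the arguments are intentionally ignored. Both idioms side by
side:

    #include <iostream>
    #include <string>

    struct Old {
        void print(std::ostream& out, bool verbose, const std::string& indent) const {
            (void) verbose; (void) indent;      // manual unused-parameter silencing
            out << "CommunicationManager";
        }
    };

    struct New {
        // Unnamed parameters cannot trigger -Wunused-parameter at all.
        void print(std::ostream& out, bool /*verbose*/, const std::string& /*indent*/) const {
            out << "CommunicationManager";
        }
    };

    int main() {
        Old{}.print(std::cout, true, "  "); std::cout << '\n';
        New{}.print(std::cout, true, "  "); std::cout << '\n';
    }
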
diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp
index 91f304ad9a0..6d36abc896e 100644
--- a/storage/src/vespa/storage/visiting/visitor.cpp
+++ b/storage/src/vespa/storage/visiting/visitor.cpp
@@ -121,12 +121,9 @@ Visitor::VisitorTarget::metaForMessageId(uint64_t msgId)
void
Visitor::VisitorTarget::discardQueuedMessages()
{
- for (MessageQueue::iterator
- it(_queuedMessages.begin()), e(_queuedMessages.end());
- it != e; ++it)
- {
- LOG(spam, "Erasing queued message with id %" PRIu64, it->second);
- releaseMetaForMessageId(it->second);
+ for (const auto & entry : _queuedMessages) {
+ LOG(spam, "Erasing queued message with id %" PRIu64, entry.second);
+ releaseMetaForMessageId(entry.second);
}
_queuedMessages.clear();
}
@@ -310,17 +307,14 @@ Visitor::getStateName(VisitorState s)
return "COMPLETED";
default:
assert(!"Unknown visitor state");
- return NULL;
+ return nullptr;
}
}
Visitor::VisitorState
Visitor::transitionTo(VisitorState newState)
{
- LOG(debug, "Visitor '%s' state transition %s -> %s",
- _id.c_str(),
- getStateName(_state),
- getStateName(newState));
+ LOG(debug, "Visitor '%s' state transition %s -> %s", _id.c_str(), getStateName(_state), getStateName(newState));
VisitorState oldState = _state;
_state = newState;
return oldState;
@@ -339,12 +333,10 @@ Visitor::mayTransitionToCompleted() const
void
Visitor::forceClose()
{
- for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin();
- it != _bucketStates.end(); ++it)
- {
+ for (auto * state : _bucketStates) {
// Reset iterator id so no destroy iterator will be sent
- (*it)->setIteratorId(spi::IteratorId(0));
- delete *it;
+ state->setIteratorId(spi::IteratorId(0));
+ delete state;
}
_bucketStates.clear();
transitionTo(STATE_COMPLETED);
@@ -358,7 +350,7 @@ Visitor::sendReplyOnce()
std::shared_ptr<api::StorageReply> reply(_initiatingCmd->makeReply());
_hitCounter->updateVisitorStatistics(_visitorStatistics);
- static_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics);
+ dynamic_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics);
if (shouldAddMbusTrace()) {
_trace.moveTraceTo(reply->getTrace());
}
@@ -373,17 +365,15 @@ void
Visitor::finalize()
{
if (_state != STATE_COMPLETED) {
- LOG(error, "Attempting to finalize non-completed visitor %s",
- _id.c_str());
+ LOG(error, "Attempting to finalize non-completed visitor %s", _id.c_str());
assert(false);
}
assert(_bucketStates.empty());
if (_result.success()) {
- if (_messageSession->pending() > 0)
- {
+ if (_messageSession->pending() > 0) {
_result = api::ReturnCode(api::ReturnCode::ABORTED);
- try{
+ try {
abortedVisiting();
} catch (std::exception& e) {
LOG(warning, "Visitor %s had a problem in abortVisiting(). As "
@@ -404,43 +394,31 @@ Visitor::finalize()
void
Visitor::discardAllNoPendingBucketStates()
{
- for (BucketStateList::iterator
- it(_bucketStates.begin()), e(_bucketStates.end());
- it != e;)
- {
+ for (auto it = _bucketStates.begin(); it != _bucketStates.end();) {
BucketIterationState& bstate(**it);
if (bstate.hasPendingControlCommand() || bstate.hasPendingIterators()) {
- LOG(debug,
- "Visitor '%s' not discarding bucket state %s "
- "since it has pending operations",
- _id.c_str(),
- bstate.toString().c_str());
+ LOG(debug, "Visitor '%s' not discarding bucket state %s since it has pending operations",
+ _id.c_str(), bstate.toString().c_str());
++it;
continue;
}
- LOG(debug, "Visitor '%s' discarding bucket state %s",
- _id.c_str(), bstate.toString().c_str());
+ LOG(debug, "Visitor '%s' discarding bucket state %s", _id.c_str(), bstate.toString().c_str());
delete *it;
it = _bucketStates.erase(it);
}
}
void
-Visitor::fail(const api::ReturnCode& reason,
- bool overrideExistingError)
+Visitor::fail(const api::ReturnCode& reason, bool overrideExistingError)
{
assert(_state != STATE_COMPLETED);
if (_result.getResult() < reason.getResult() || overrideExistingError) {
- LOG(debug, "Setting result of visitor '%s' to %s",
- _id.c_str(), reason.toString().c_str());
+ LOG(debug, "Setting result of visitor '%s' to %s", _id.c_str(), reason.toString().c_str());
_result = reason;
}
if (_visitorTarget.hasQueuedMessages()) {
- LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s "
- "since visitor has failed",
- _id.c_str(),
- _visitorTarget._queuedMessages.size(),
- _controlDestination->toString().c_str());
+ LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s since visitor has failed",
+ _id.c_str(), _visitorTarget._queuedMessages.size(), _controlDestination->toString().c_str());
_visitorTarget.discardQueuedMessages();
}
discardAllNoPendingBucketStates();
@@ -448,8 +426,7 @@ Visitor::fail(const api::ReturnCode& reason,
}
bool
-Visitor::shouldReportProblemToClient(const api::ReturnCode& code,
- size_t retryCount) const
+Visitor::shouldReportProblemToClient(const api::ReturnCode& code, size_t retryCount)
{
// Report _once_ per message if we reach a certain retry threshold.
if (retryCount == TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY) {
@@ -521,7 +498,7 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId,
_visitorOptions._fromTime = fromTimestamp;
_visitorOptions._toTime = toTimestamp;
_currentBucket = 0;
- _hitCounter.reset(new HitCounter());
+ _hitCounter = std::make_unique<HitCounter>();
_messageSession = std::move(messageSession);
_documentPriority = documentPriority;
@@ -612,8 +589,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
uint64_t messageId = reply->getContext().value.UINT64;
uint32_t removed = _visitorTarget._pendingMessages.erase(messageId);
- LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(),
- reply->toString().c_str(), messageId);
+ LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), reply->toString().c_str(), messageId);
assert(removed == 1);
(void) removed;
@@ -634,20 +610,16 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
metrics.visitorDestinationFailureReplies.inc();
if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) {
- LOG(debug, "Aborting visitor as we failed to talk to controller: %s",
- reply->getError(0).toString().c_str());
- api::ReturnCode returnCode(
- static_cast<api::ReturnCode::Result>(
- reply->getError(0).getCode()),
- reply->getError(0).getMessage());
+ LOG(debug, "Aborting visitor as we failed to talk to controller: %s", reply->getError(0).toString().c_str());
+ api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
+ reply->getError(0).getMessage());
fail(returnCode, true);
close();
return;
}
- api::ReturnCode returnCode(
- static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
- reply->getError(0).getMessage());
+ api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
+ reply->getError(0).getMessage());
const bool should_fail = remap_docapi_message_error_code(returnCode);
if (should_fail) {
// Abort - something is wrong with target.
@@ -657,8 +629,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
}
if (failed()) {
- LOG(debug, "Failed to send message from visitor '%s', due to "
- "%s. Not resending since visitor has failed",
+ LOG(debug, "Failed to send message from visitor '%s', due to %s. Not resending since visitor has failed",
_id.c_str(), returnCode.toString().c_str());
return;
}
@@ -709,8 +680,7 @@ Visitor::onCreateIteratorReply(
if (reply->getResult().failed()) {
LOG(debug, "Failed to create iterator for bucket %s: %s",
- bucketId.toString().c_str(),
- reply->getResult().toString().c_str());
+ bucketId.toString().c_str(), reply->getResult().toString().c_str());
fail(reply->getResult());
delete *it;
_bucketStates.erase((++it).base());
@@ -718,17 +688,14 @@ Visitor::onCreateIteratorReply(
}
bucketState.setIteratorId(reply->getIteratorId());
if (failed()) {
- LOG(debug, "Create iterator for bucket %s is OK, "
- "but visitor has failed: %s",
- bucketId.toString().c_str(),
- _result.toString().c_str());
+ LOG(debug, "Create iterator for bucket %s is OK, but visitor has failed: %s",
+ bucketId.toString().c_str(), _result.toString().c_str());
delete *it;
_bucketStates.erase((++it).base());
return;
}
- LOG(debug, "Visitor '%s' starting to visit bucket %s.",
- _id.c_str(), bucketId.toString().c_str());
+ LOG(debug, "Visitor '%s' starting to visit bucket %s.", _id.c_str(), bucketId.toString().c_str());
auto cmd = std::make_shared<GetIterCommand>(bucket, bucketState.getIteratorId(), _docBlockSize);
cmd->getTrace().setLevel(_traceLevel);
cmd->setPriority(_priority);
@@ -737,13 +704,10 @@ Visitor::onCreateIteratorReply(
}
void
-Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply,
- VisitorThreadMetrics& metrics)
+Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, VisitorThreadMetrics& metrics)
{
LOG(debug, "Visitor '%s' got get iter reply for bucket %s: %s",
- _id.c_str(),
- reply->getBucketId().toString().c_str(),
- reply->getResult().toString().c_str());
+ _id.c_str(), reply->getBucketId().toString().c_str(), reply->getResult().toString().c_str());
auto it = _bucketStates.rbegin();
// New requests will be pushed on end of list.. So searching
@@ -763,10 +727,8 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply,
!reply->getResult().isShutdownRelated() &&
!reply->getResult().isBucketDisappearance())
{
- LOG(warning, "Failed to talk to persistence layer for bucket "
- "%s. Aborting visitor '%s': %s",
- reply->getBucketId().toString().c_str(),
- _id.c_str(), reply->getResult().toString().c_str());
+ LOG(warning, "Failed to talk to persistence layer for bucket %s. Aborting visitor '%s': %s",
+ reply->getBucketId().toString().c_str(), _id.c_str(), reply->getResult().toString().c_str());
}
fail(reply->getResult());
BucketIterationState& bucketState(**it);
@@ -783,17 +745,14 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply,
bucketState.setCompleted(reply->isCompleted());
--bucketState._pendingIterators;
if (!reply->getEntries().empty()) {
- LOG(debug, "Processing documents in handle given from bucket %s.",
- reply->getBucketId().toString().c_str());
+ LOG(debug, "Processing documents in handle given from bucket %s.", reply->getBucketId().toString().c_str());
// While handling documents we should not keep locks, such
// that visitor may process several things at once.
if (isRunning()) {
MBUS_TRACE(reply->getTrace(), 5,
vespalib::make_string("Visitor %s handling block of %zu documents.",
_id.c_str(), reply->getEntries().size()));
- LOG(debug, "Visitor %s handling block of %zu documents.",
- _id.c_str(),
- reply->getEntries().size());
+ LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size());
try {
framework::MilliSecTimer processingTimer(_component.getClock());
handleDocuments(reply->getBucketId(), reply->getEntries(), *_hitCounter);
@@ -913,15 +872,11 @@ Visitor::continueVisitor()
}
}
- LOG(debug, "No pending messages, tagging visitor '%s' complete",
- _id.c_str());
+ LOG(debug, "No pending messages, tagging visitor '%s' complete", _id.c_str());
transitionTo(STATE_COMPLETED);
} else {
- LOG(debug, "Visitor %s waiting for all commands to be replied to "
- "(pending=%zu, queued=%zu)",
- _id.c_str(),
- _visitorTarget._pendingMessages.size(),
- _visitorTarget._queuedMessages.size());
+ LOG(debug, "Visitor %s waiting for all commands to be replied to (pending=%zu, queued=%zu)",
+ _id.c_str(), _visitorTarget._pendingMessages.size(), _visitorTarget._queuedMessages.size());
}
return false;
} else {
@@ -981,14 +936,14 @@ Visitor::getStatus(std::ostream& out, bool verbose) const
<< (_visitorOptions._visitRemoves ? "true" : "false")
<< "</td></tr>\n";
out << "<tr><td>Control destination</td><td>";
- if (_controlDestination.get()) {
+ if (_controlDestination) {
out << xml_content_escaped(_controlDestination->toString());
} else {
out << "nil";
}
out << "</td></tr>\n";
out << "<tr><td>Data destination</td><td>";
- if (_dataDestination.get()) {
+ if (_dataDestination) {
out << xml_content_escaped(_dataDestination->toString());
} else {
out << "nil";
@@ -1078,17 +1033,13 @@ Visitor::getStatus(std::ostream& out, bool verbose) const
bool
Visitor::getIterators()
{
- LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, "
- "_currentBucket = %d",
- _id.c_str(), _buckets.size(),
- _bucketStates.size(), _currentBucket);
+ LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, _currentBucket = %d",
+ _id.c_str(), _buckets.size(), _bucketStates.size(), _currentBucket);
// Don't send any further GetIters if we're closing
if (!isRunning()) {
if (hasPendingIterators()) {
- LOG(debug, "Visitor has failed but waiting for %zu "
- "buckets to finish processing",
- _bucketStates.size());
+ LOG(debug, "Visitor has failed but waiting for %zu buckets to finish processing", _bucketStates.size());
return true;
} else {
return false;
@@ -1097,13 +1048,10 @@ Visitor::getIterators()
// Go through buckets found. Take the first that doesn't have requested
// state and request a new piece.
- for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin();
- it != _bucketStates.end();)
- {
+ for (auto it = _bucketStates.begin(); it != _bucketStates.end();) {
assert(*it);
BucketIterationState& bucketState(**it);
- if ((bucketState._pendingIterators
- >= _visitorOptions._maxParallelOneBucket)
+ if ((bucketState._pendingIterators >= _visitorOptions._maxParallelOneBucket)
|| bucketState.hasPendingControlCommand())
{
++it;
@@ -1118,20 +1066,17 @@ Visitor::getIterators()
}
try{
completedBucket(bucketState.getBucketId(), *_hitCounter);
- _visitorStatistics.setBucketsVisited(
- _visitorStatistics.getBucketsVisited() + 1);
+ _visitorStatistics.setBucketsVisited(_visitorStatistics.getBucketsVisited() + 1);
} catch (std::exception& e) {
std::ostringstream ost;
- ost << "Visitor fail to run completedBucket() notification: "
- << e.what();
+ ost << "Visitor fail to run completedBucket() notification: " << e.what();
reportProblem(ost.str());
}
delete *it;
it = _bucketStates.erase(it);
continue;
}
- auto cmd = std::make_shared<GetIterCommand>(
- bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize);
+ auto cmd = std::make_shared<GetIterCommand>(bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize);
cmd->getTrace().setLevel(_traceLevel);
cmd->setPriority(_priority);
_messageHandler->send(cmd, *this);
@@ -1143,7 +1088,7 @@ Visitor::getIterators()
}
// If there aren't anymore buckets to iterate, we're done
- if (_bucketStates.size() == 0 && _currentBucket >= _buckets.size()) {
+ if (_bucketStates.empty() && _currentBucket >= _buckets.size()) {
LOG(debug, "No more buckets to visit for visitor '%s'.", _id.c_str());
return false;
}
@@ -1157,17 +1102,13 @@ Visitor::getIterators()
_currentBucket < _buckets.size())
{
document::Bucket bucket(_bucketSpace, _buckets[_currentBucket]);
- std::unique_ptr<BucketIterationState> newBucketState(
- new BucketIterationState(*this, *_messageHandler, bucket));
+ auto newBucketState = std::make_unique<BucketIterationState>(*this, *_messageHandler, bucket);
LOG(debug, "Visitor '%s': Sending create iterator for bucket %s.",
_id.c_str(), bucket.getBucketId().toString().c_str());
- spi::Selection selection
- = spi::Selection(spi::DocumentSelection(_documentSelectionString));
- selection.setFromTimestamp(
- spi::Timestamp(_visitorOptions._fromTime.getTime()));
- selection.setToTimestamp(
- spi::Timestamp(_visitorOptions._toTime.getTime()));
+ spi::Selection selection = spi::Selection(spi::DocumentSelection(_documentSelectionString));
+ selection.setFromTimestamp(spi::Timestamp(_visitorOptions._fromTime.getTime()));
+ selection.setToTimestamp(spi::Timestamp(_visitorOptions._toTime.getTime()));
auto cmd = std::make_shared<CreateIteratorCommand>(bucket, selection,_visitorOptions._fieldSet,
_visitorOptions._visitRemoves
@@ -1184,8 +1125,7 @@ Visitor::getIterators()
}
if (sentCount == 0) {
if (LOG_WOULD_LOG(debug)) {
- LOG(debug, "Enough iterators being processed. Doing nothing for "
- "visitor '%s' bucketStates = %zu.",
+ LOG(debug, "Enough iterators being processed. Doing nothing for visitor '%s' bucketStates = %zu.",
_id.c_str(), _bucketStates.size());
for (const auto& state : _bucketStates) {
LOG(debug, "Existing: %s", state->toString().c_str());
diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h
index 0737c5612c0..9b6d8e348b9 100644
--- a/storage/src/vespa/storage/visiting/visitor.h
+++ b/storage/src/vespa/storage/visiting/visitor.h
@@ -136,28 +136,24 @@ private:
{}
/** Sends DestroyIterator over _messageHandler if _iteratorId != 0 */
- ~BucketIterationState();
+ ~BucketIterationState() override;
void setCompleted(bool completed = true) { _completed = completed; }
- bool isCompleted() const { return _completed; }
+ [[nodiscard]] bool isCompleted() const { return _completed; }
- document::Bucket getBucket() const { return _bucket; }
- document::BucketId getBucketId() const { return _bucket.getBucketId(); }
+ [[nodiscard]] document::Bucket getBucket() const { return _bucket; }
+ [[nodiscard]] document::BucketId getBucketId() const { return _bucket.getBucketId(); }
void setIteratorId(spi::IteratorId iteratorId) {
_iteratorId = iteratorId;
}
- spi::IteratorId getIteratorId() const { return _iteratorId; }
+ [[nodiscard]] spi::IteratorId getIteratorId() const { return _iteratorId; }
- void setPendingControlCommand() {
- _iteratorId = spi::IteratorId(0);
- }
-
- bool hasPendingControlCommand() const {
+ [[nodiscard]] bool hasPendingControlCommand() const {
return _iteratorId == spi::IteratorId(0);
}
- bool hasPendingIterators() const { return _pendingIterators > 0; }
+ [[nodiscard]] bool hasPendingIterators() const { return _pendingIterators > 0; }
void print(std::ostream& out, bool, const std::string& ) const override {
out << "BucketIterationState("
@@ -247,12 +243,10 @@ private:
MessageMeta releaseMetaForMessageId(uint64_t msgId);
void reinsertMeta(MessageMeta);
- bool hasQueuedMessages() const { return !_queuedMessages.empty(); }
+ [[nodiscard]] bool hasQueuedMessages() const { return !_queuedMessages.empty(); }
void discardQueuedMessages();
- uint32_t getMemoryUsage() const noexcept {
- return _memoryUsage;
- }
+ [[nodiscard]] uint32_t getMemoryUsage() const noexcept { return _memoryUsage; }
VisitorTarget();
~VisitorTarget();
@@ -326,9 +320,9 @@ protected:
std::string _documentSelectionString;
vdslib::VisitorStatistics _visitorStatistics;
- bool isCompletedCalled() const { return _calledCompletedVisitor; }
+ [[nodiscard]] bool isCompletedCalled() const { return _calledCompletedVisitor; }
- uint32_t traceLevel() const noexcept { return _traceLevel; }
+ [[nodiscard]] uint32_t traceLevel() const noexcept { return _traceLevel; }
/**
* Attempts to add the given trace message to the internal, memory bounded
@@ -339,7 +333,7 @@ protected:
*/
bool addBoundedTrace(uint32_t level, const vespalib::string& message);
- const vdslib::Parameters& visitor_parameters() const noexcept;
+ [[nodiscard]] const vdslib::Parameters& visitor_parameters() const noexcept;
// Possibly modifies the ReturnCode parameter in-place if its return code should
// be changed based on visitor subclass-specific behavior.
@@ -417,7 +411,7 @@ public:
* The consistency level provided here is propagated through the SPI
* Context object for createIterator calls.
*/
- virtual spi::ReadConsistency getRequiredReadConsistency() const {
+ [[nodiscard]] virtual spi::ReadConsistency getRequiredReadConsistency() const {
return spi::ReadConsistency::STRONG;
}
@@ -428,8 +422,7 @@ public:
/**
* Used to silence transient errors that can happen during normal operation.
*/
- bool shouldReportProblemToClient(const api::ReturnCode&,
- size_t retryCount) const;
+ [[nodiscard]] static bool shouldReportProblemToClient(const api::ReturnCode&, size_t retryCount);
/** Called to send report to client of potential non-critical problems. */
void reportProblem(const std::string& problem);
@@ -492,18 +485,16 @@ public:
void getStatus(std::ostream& out, bool verbose) const;
- void setMaxParallel(uint32_t maxParallel)
- { _visitorOptions._maxParallel = maxParallel; }
- void setMaxParallelPerBucket(uint32_t max)
- { _visitorOptions._maxParallelOneBucket = max; }
+ void setMaxParallel(uint32_t maxParallel) { _visitorOptions._maxParallel = maxParallel; }
+ void setMaxParallelPerBucket(uint32_t max) { _visitorOptions._maxParallelOneBucket = max; }
/**
* Sends a message to the data handler for this visitor.
*/
void sendMessage(std::unique_ptr<documentapi::DocumentMessage> documentMessage);
- bool isRunning() const { return _state == STATE_RUNNING; }
- bool isCompleted() const { return _state == STATE_COMPLETED; }
+ [[nodiscard]] bool isRunning() const { return _state == STATE_RUNNING; }
+ [[nodiscard]] bool isCompleted() const { return _state == STATE_COMPLETED; }
private:
/**
@@ -542,11 +533,9 @@ private:
void sendReplyOnce();
- bool hasFailedVisiting() const { return _result.failed(); }
-
- bool hasPendingIterators() const { return !_bucketStates.empty(); }
-
- bool mayTransitionToCompleted() const;
+ [[nodiscard]] bool hasFailedVisiting() const { return _result.failed(); }
+ [[nodiscard]] bool hasPendingIterators() const { return !_bucketStates.empty(); }
+ [[nodiscard]] bool mayTransitionToCompleted() const;
void discardAllNoPendingBucketStates();
@@ -565,9 +554,7 @@ private:
*
* Precondition: attach() must have been called on `this`.
*/
- bool shouldAddMbusTrace() const noexcept {
- return _traceLevel != 0;
- }
+ [[nodiscard]] bool shouldAddMbusTrace() const noexcept { return _traceLevel != 0; }
/**
* Set internal state to the given state value.
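
Most of the visitor.h diff is mechanical: const accessors gain [[nodiscard]],
so a caller that invokes them purely for the return value gets a warning if
that value is silently dropped. The effect in isolation:

    #include <cstdio>

    struct Visitor {
        bool running = true;
        [[nodiscard]] bool isRunning() const { return running; }
    };

    int main() {
        Visitor v;
        // v.isRunning();         // warning: ignoring [[nodiscard]] return value
        if (v.isRunning()) {      // fine: the result is actually consumed
            std::printf("running\n");
        }
    }
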
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index a03b9a9a8a3..07938002746 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -187,9 +187,8 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi
for (int32_t i=0; i<config->visitorthreads; ++i) {
_visitorThread.emplace_back(
// Naked new due to a lot of private inheritance in VisitorThread and VisitorManager
- std::shared_ptr<VisitorThread>(
- new VisitorThread(i, _componentRegister, _messageSessionFactory,
- _visitorFactories, *_metrics->threads[i], *this)),
+ std::shared_ptr<VisitorThread>(new VisitorThread(i, _componentRegister, _messageSessionFactory,
+ _visitorFactories, *_metrics->threads[i], *this)),
std::map<api::VisitorId, std::string>());
}
}
@@ -450,8 +449,7 @@ VisitorManager::processReply(const std::shared_ptr<api::StorageReply>& reply)
}
void
-VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd,
- Visitor& visitor)
+VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, Visitor& visitor)
{
assert(cmd->getType() == api::MessageType::INTERNAL);
// Only add to internal state if not destroy iterator command, as
@@ -460,7 +458,7 @@ VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd,
if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) {
MessageInfo inf;
inf.id = visitor.getVisitorId();
- inf.timestamp = _component.getClock().getTimeInSeconds().getTime();
+ inf.timestamp = _component.getClock().getSystemTime();
inf.timeout = cmd->getTimeout();
if (cmd->getAddress()) {
@@ -623,7 +621,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out,
out << "<tr>"
<< "<td>" << entry.first << "</td>"
<< "<td>" << entry.second.id << "</td>"
- << "<td>" << entry.second.timestamp << "</td>"
+ << "<td>" << vespalib::to_string(entry.second.timestamp) << "</td>"
<< "<td>" << vespalib::count_ms(entry.second.timeout) << "</td>"
<< "<td>" << xml_content_escaped(entry.second.destination) << "</td>"
<< "</tr>\n";
diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h
index 33703b392bc..3e331e1c9a2 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.h
+++ b/storage/src/vespa/storage/visiting/visitormanager.h
@@ -57,7 +57,7 @@ private:
struct MessageInfo {
api::VisitorId id;
- time_t timestamp;
+ vespalib::system_time timestamp;
vespalib::duration timeout;
std::string destination;
};
@@ -168,9 +168,7 @@ private:
* by the formula: fixed + variable * ((255 - priority) / 255)
*/
uint32_t maximumConcurrent(const api::CreateVisitorCommand& cmd) const {
- return _maxFixedConcurrentVisitors + static_cast<uint32_t>(
- _maxVariableConcurrentVisitors
- * ((255.0 - cmd.getPriority()) / 255.0));
+ return _maxFixedConcurrentVisitors + static_cast<uint32_t>(_maxVariableConcurrentVisitors * ((255.0 - cmd.getPriority()) / 255.0));
}
void updateMetrics(const MetricLockGuard &) override;
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index 55ef83ba658..e3ebef3a3ef 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -126,10 +126,10 @@ VisitorThread::shutdown()
if (event._message.get()) {
if (!event._message->getType().isReply()
&& (event._message->getType() != api::MessageType::INTERNAL
- || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID))
+ || dynamic_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID))
{
std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(*event._message).makeReply());
+ dynamic_cast<api::StorageCommand&>(*event._message).makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."));
_messageSender.send(reply);
}
@@ -197,7 +197,7 @@ VisitorThread::run(framework::ThreadHandle& thread)
// disappear when no visiting is done)
if (entry._message.get() &&
(entry._message->getType() != api::MessageType::INTERNAL
- || static_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID))
+ || dynamic_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID))
{
entry._timer.stop(_metrics.averageQueueWaitingTime);
}
@@ -290,7 +290,7 @@ VisitorThread::close()
} else {
_metrics.completedVisitors.inc(1);
}
- framework::SecondTime currentTime(_component.getClock().getTimeInSeconds());
+ vespalib::steady_time currentTime(_component.getClock().getMonotonicTime());
trimRecentlyCompletedList(currentTime);
_recentlyCompleted.emplace_back(_currentlyRunningVisitor->first, currentTime);
_visitors.erase(_currentlyRunningVisitor);
@@ -298,9 +298,9 @@ VisitorThread::close()
}
void
-VisitorThread::trimRecentlyCompletedList(framework::SecondTime currentTime)
+VisitorThread::trimRecentlyCompletedList(vespalib::steady_time currentTime)
{
- framework::SecondTime recentLimit(currentTime - framework::SecondTime(30));
+ vespalib::steady_time recentLimit(currentTime - 30s);
// Dump all elements that aren't recent anymore
while (!_recentlyCompleted.empty()
&& _recentlyCompleted.front().second < recentLimit)
@@ -313,8 +313,7 @@ void
VisitorThread::handleNonExistingVisitorCall(const Event& entry, ReturnCode& code)
{
// Get current time. Set the time that is the oldest still recent.
- framework::SecondTime currentTime(_component.getClock().getTimeInSeconds());
- trimRecentlyCompletedList(currentTime);
+ trimRecentlyCompletedList(_component.getClock().getMonotonicTime());
// Go through all recent visitors. Ignore request if recent
for (const auto& e : _recentlyCompleted) {
@@ -344,7 +343,7 @@ VisitorThread::createVisitor(vespalib::stringref libName,
auto it = _visitorFactories.find(str);
if (it == _visitorFactories.end()) {
error << "Visitor library " << str << " not found.";
- return std::shared_ptr<Visitor>();
+ return {};
}
auto libIter = _libs.find(str);
@@ -363,7 +362,7 @@ VisitorThread::createVisitor(vespalib::stringref libName,
} catch (std::exception& e) {
error << "Failed to create visitor instance of type " << libName
<< ": " << e.what();
- return std::shared_ptr<Visitor>();
+ return {};
}
}
@@ -690,7 +689,7 @@ VisitorThread::getStatus(vespalib::asciistream& out,
}
for (const auto& cv : _recentlyCompleted) {
out << "<li> Visitor " << cv.first << " done at "
- << cv.second.getTime() << "\n";
+ << vespalib::to_string(vespalib::to_utc(cv.second)) << "\n";
}
out << "</ul>\n";
out << "<h3>Current queue size: " << _queue.size() << "</h3>\n";
@@ -736,12 +735,10 @@ VisitorThread::getStatus(vespalib::asciistream& out,
if (_visitors.empty()) {
out << "None\n";
}
- for (VisitorMap::const_iterator it = _visitors.begin();
- it != _visitors.end(); ++it)
- {
- out << "<a href=\"?visitor=" << it->first
+ for (const auto & v : _visitors) {
+ out << "<a href=\"?visitor=" << v.first
<< (verbose ? "&verbose" : "") << "\">Visitor "
- << it->first << "</a><br>\n";
+ << v.first << "</a><br>\n";
}
}
}
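
visitorthread.cpp swaps several static_casts for dynamic_casts when
downcasting messages. On a reference, a failed dynamic_cast throws
std::bad_cast instead of producing undefined behavior, so a message of an
unexpected concrete type now fails loudly. A compact illustration:

    #include <cstdio>
    #include <typeinfo>

    struct Message { virtual ~Message() = default; };
    struct Command : Message { int type = 42; };
    struct Reply   : Message {};

    int main() {
        Reply r;
        Message& m = r;
        try {
            auto& cmd = dynamic_cast<Command&>(m);  // wrong type: throws
            std::printf("type %d\n", cmd.type);
        } catch (const std::bad_cast&) {
            std::printf("not a Command\n");         // this branch runs
        }
    }
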
diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h
index 226e7c0631b..56e40328fda 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.h
+++ b/storage/src/vespa/storage/visiting/visitorthread.h
@@ -38,7 +38,7 @@ class VisitorThread : public framework::Runnable,
using VisitorMap = std::map<api::VisitorId, std::shared_ptr<Visitor>>;
VisitorMap _visitors;
- std::deque<std::pair<api::VisitorId, framework::SecondTime>> _recentlyCompleted;
+ std::deque<std::pair<api::VisitorId, vespalib::steady_time>> _recentlyCompleted;
struct Event {
enum class Type {
@@ -118,7 +118,7 @@ private:
*/
Event popNextQueuedEventIfAvailable();
void tick();
- void trimRecentlyCompletedList(framework::SecondTime currentTime);
+ void trimRecentlyCompletedList(vespalib::steady_time currentTime);
void handleNonExistingVisitorCall(const Event& entry, api::ReturnCode& code);
std::shared_ptr<Visitor> createVisitor(vespalib::stringref libName,