diff options
Diffstat (limited to 'storage')
14 files changed, 162 insertions, 283 deletions
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 0ba374f7190..3bfa1027a82 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -175,7 +175,7 @@ TEST_F(PendingMessageTrackerTest, simple) { EXPECT_THAT(ost.str(), HasSubstr( "<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> " + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> " "Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" "</ul>\n")); } @@ -248,17 +248,17 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) { EXPECT_THAT(ost.str(), HasSubstr( "<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" "</ul>\n" "<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000011d7))</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" "</ul>\n")); } { @@ -268,44 +268,23 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) { EXPECT_THAT(ost.str(), HasSubstr( "<b>Node 0 (pending count: 4)</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" "</ul>\n" "<b>Node 1 (pending count: 4)</b>\n" "<ul>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" "</ul>\n")); } } namespace { -template <typename T> -std::string setToString(const std::set<T>& s) -{ - std::ostringstream ost; - ost << '{'; - for (typename std::set<T>::const_iterator i(s.begin()), e(s.end()); - i != e; ++i) - { - if (i != s.begin()) { - ost << ','; - } - ost << *i; - } - ost << '}'; - return ost.str(); -} - -} - -namespace { - class TestChecker : public PendingMessageTracker::Checker { public: @@ -443,7 +422,7 @@ TEST_F(PendingMessageTrackerTest, busy_reply_marks_node_as_busy) { TEST_F(PendingMessageTrackerTest, busy_node_duration_can_be_adjusted) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); - f.tracker().setNodeBusyDuration(std::chrono::seconds(10)); + f.tracker().setNodeBusyDuration(10s); f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY)); EXPECT_TRUE(f.tracker().getNodeInfo().isBusy(0)); f.clock().addSecondsToTime(11); diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index be4e7270c69..a82514acb03 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -217,7 +217,7 @@ VisitorManagerTest::getSession(uint32_t n) // Wait until we have started the visitor const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions); framework::defaultimplementation::RealClock clock; - framework::MilliSecTime endTime(clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); + vespalib::steady_time endTime = clock.getMonotonicTime() + 30s; while (true) { { std::lock_guard lock(_messageSessionFactory->_accessLock); @@ -225,9 +225,8 @@ VisitorManagerTest::getSession(uint32_t n) return *sessions[n]; } } - if (clock.getTimeInMillis() > endTime) { - throw vespalib::IllegalStateException( - "Timed out waiting for visitor session", VESPA_STRLOC); + if (clock.getMonotonicTime() > endTime) { + throw vespalib::IllegalStateException("Timed out waiting for visitor session", VESPA_STRLOC); } std::this_thread::sleep_for(10ms); } @@ -255,12 +254,10 @@ VisitorManagerTest::getMessagesAndReply( switch (session.sentMessages[i]->getType()) { case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT: - docs.push_back(static_cast<documentapi::PutDocumentMessage&>( - *session.sentMessages[i]).getDocumentSP()); + docs.push_back(static_cast<documentapi::PutDocumentMessage&>(*session.sentMessages[i]).getDocumentSP()); break; case documentapi::DocumentProtocol::MESSAGE_REMOVEDOCUMENT: - docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>( - *session.sentMessages[i]).getDocumentId()); + docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>(*session.sentMessages[i]).getDocumentId()); break; default: break; @@ -355,10 +352,7 @@ TEST_F(VisitorManagerTest, normal_usage) { getMessagesAndReply(1, getSession(0), docs, docIds); // All data has been replied to, expecting to get a create visitor reply - ASSERT_NO_FATAL_FAILURE( - verifyCreateVisitorReply(api::ReturnCode::OK, - int(docs.size()), - getTotalSerializedSize(docs))); + ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::OK, int(docs.size()), getTotalSerializedSize(docs))); EXPECT_EQ(1u, getMatchingDocuments(docs)); EXPECT_FALSE(_manager->hasPendingMessageState()); diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index f3a538b7832..565131b3b99 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -256,11 +256,9 @@ TestVisitorMessageSession& VisitorTest::getSession(uint32_t n) { // Wait until we have started the visitor - const std::vector<TestVisitorMessageSession*>& sessions( - _messageSessionFactory->_visitorSessions); + const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions); framework::defaultimplementation::RealClock clock; - framework::MilliSecTime endTime( - clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); + vespalib::steady_time endTime = clock.getMonotonicTime() + 30s; while (true) { { std::lock_guard lock(_messageSessionFactory->_accessLock); @@ -268,7 +266,7 @@ VisitorTest::getSession(uint32_t n) return *sessions[n]; } } - if (clock.getTimeInMillis() > endTime) { + if (clock.getMonotonicTime() > endTime) { throw vespalib::IllegalStateException( "Timed out waiting for visitor session", VESPA_STRLOC); } diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index e68cbd75d52..2f1622750d7 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -182,7 +182,7 @@ struct MetricsUpdater { void add(const MetricsUpdater& rhs) noexcept { auto& d = count; - auto& s = rhs.count; + const auto& s = rhs.count; d.buckets += s.buckets; d.docs += s.docs; d.bytes += s.bytes; @@ -209,7 +209,7 @@ BucketManager::updateMetrics(bool updateDocCount) if (!updateDocCount || _doneInitialized) { MetricsUpdater total; - for (auto& space : _component.getBucketSpaceRepo()) { + for (const auto& space : _component.getBucketSpaceRepo()) { MetricsUpdater m; auto guard = space.second->bucketDatabase().acquire_read_guard(); guard->for_each(std::ref(m)); @@ -238,7 +238,7 @@ BucketManager::updateMetrics(bool updateDocCount) } void BucketManager::update_bucket_db_memory_usage_metrics() { - for (auto& space : _component.getBucketSpaceRepo()) { + for (const auto& space : _component.getBucketSpaceRepo()) { auto bm = _metrics->bucket_spaces.find(space.first); bm->second->bucket_db_metrics.memory_usage.update(space.second->bucketDatabase().detailed_memory_usage()); } @@ -342,7 +342,7 @@ BucketManager::reportStatus(std::ostream& out, using vespalib::xml::XmlAttribute; xmlReporter << vespalib::xml::XmlTag("buckets"); - for (auto& space : _component.getBucketSpaceRepo()) { + for (const auto& space : _component.getBucketSpaceRepo()) { xmlReporter << XmlTag("bucket-space") << XmlAttribute("name", document::FixedBucketSpaces::to_string(space.first)); BucketDBDumper dumper(xmlReporter.getStream()); @@ -404,7 +404,7 @@ bool BucketManager::onRequestBucketInfo( api::RequestBucketInfoReply::EntryVector info; if (!cmd->getBuckets().empty()) { for (auto bucketId : cmd->getBuckets()) { - for (auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) { + for (const auto & entry : _component.getBucketDatabase(bucketSpace).getAll(bucketId, "BucketManager::onRequestBucketInfo")) { info.emplace_back(entry.first, entry.second->getBucketInfo()); } } @@ -457,7 +457,7 @@ BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard) // may alter the relevant state. --_requestsCurrentlyProcessing; if (_requestsCurrentlyProcessing == 0) { - for (auto& qr : _queuedReplies) { + for (const auto& qr : _queuedReplies) { sendUp(qr); } _queuedReplies.clear(); @@ -494,7 +494,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac reqs.size(), bucketSpace.toString().c_str(), clusterState->toString().c_str(), our_hash.c_str()); std::lock_guard clusterStateGuard(_clusterStateLock); - for (auto & req : std::ranges::reverse_view(reqs)) { + for (const auto & req : std::ranges::reverse_view(reqs)) { // Currently small requests should not be forwarded to worker thread assert(req->hasSystemState()); const auto their_hash = req->getDistributionHash(); @@ -547,7 +547,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac std::ostringstream distrList; std::unordered_map<uint16_t, api::RequestBucketInfoReply::EntryVector> result; - for (auto& nodeAndCmd : requests) { + for (const auto& nodeAndCmd : requests) { result[nodeAndCmd.first]; if (LOG_WOULD_LOG(debug)) { distrList << ' ' << nodeAndCmd.first; @@ -576,7 +576,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac "BucketManager::processRequestBucketInfoCommands-2"); } _metrics->fullBucketInfoLatency.addValue(runStartTime.getElapsedTimeAsDouble()); - for (auto& nodeAndCmd : requests) { + for (const auto& nodeAndCmd : requests) { auto reply(std::make_shared<api::RequestBucketInfoReply>(*nodeAndCmd.second)); reply->getBucketInfo().swap(result[nodeAndCmd.first]); sendUp(reply); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 667afbf67a0..393136de654 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -15,6 +15,7 @@ LOG_SETUP(".distributor.operation.idealstate.merge"); using vespalib::to_utc; using vespalib::to_string; +using vespalib::make_string_short::fmt; namespace storage::distributor { MergeOperation::~MergeOperation() = default; @@ -24,8 +25,7 @@ MergeOperation::getStatus() const { return Operation::getStatus() + - vespalib::make_string(" . Sent MergeBucketCommand at %s", - to_string(to_utc(_sentMessageTime)).c_str()); + fmt(" . Sent MergeBucketCommand at %s", to_string(to_utc(_sentMessageTime)).c_str()); } void @@ -35,7 +35,7 @@ MergeOperation::addIdealNodes( std::vector<MergeMetaData>& result) { // Add all ideal nodes first. These are never marked source-only. - for (unsigned short idealNode : idealNodes) { + for (uint16_t idealNode : idealNodes) { const MergeMetaData* entry = nullptr; for (const auto & node : nodes) { if (idealNode == node._nodeIndex) { @@ -56,7 +56,7 @@ MergeOperation::addCopiesNotAlreadyAdded(uint16_t redundancy, const std::vector<MergeMetaData>& nodes, std::vector<MergeMetaData>& result) { - for (auto node : nodes) { + for (const auto & node : nodes) { bool found = false; for (const auto & mergeData : result) { if (mergeData._nodeIndex == node._nodeIndex) { @@ -123,7 +123,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) std::vector<std::unique_ptr<BucketCopy> > newCopies; std::vector<MergeMetaData> nodes; - for (unsigned short node : getNodes()) { + for (uint16_t node : getNodes()) { const BucketCopy* copy = entry->getNode(node); if (copy == nullptr) { // New copies? newCopies.emplace_back(std::make_unique<BucketCopy>(BucketCopy::recentlyCreatedCopy(0, node))); @@ -153,8 +153,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) msg->set_use_unordered_forwarding(true); } - LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), - _mnodes[0].index); + LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index); // Set timeout to one hour to prevent hung nodes that manage to keep // connections open from stalling merges in the cluster indefinitely. @@ -165,8 +164,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) _sentMessageTime = _manager->node_context().clock().getMonotonicTime(); } else { - LOGBP(debug, - "Unable to merge bucket %s, since only one copy is available. System state %s", + LOGBP(debug, "Unable to merge bucket %s, since only one copy is available. System state %s", getBucketId().toString().c_str(), clusterState.toString().c_str()); _ok = false; done(); @@ -178,7 +176,7 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge( const BucketDatabase::Entry& currentState) const { assert(currentState.valid()); - for (auto mnode : _mnodes) { + for (const auto & mnode : _mnodes) { const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index)); if (!copyBefore) { continue; @@ -206,7 +204,7 @@ MergeOperation::deleteSourceOnlyNodes( { assert(currentState.valid()); std::vector<uint16_t> sourceOnlyNodes; - for (auto & mnode : _mnodes) { + for (const auto & mnode : _mnodes) { const uint16_t nodeIndex = mnode.index; const BucketCopy* copy = currentState->getNode(nodeIndex); if (!copy) { @@ -338,7 +336,7 @@ bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx, // to enter the merge throttler queues, displacing lower priority merges. if (!is_global_bucket_merge()) { const auto& node_info = ctx.pending_message_tracker().getNodeInfo(); - for (auto node : getNodes()) { + for (uint16_t node : getNodes()) { if (node_info.isBusy(node)) { return true; } @@ -364,11 +362,9 @@ bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const MergeBucketMetricSet* MergeOperation::get_merge_metrics() { - if (_manager) { - return dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get()); - } else { - return nullptr; - } + return (_manager) + ? dynamic_cast<MergeBucketMetricSet *>(_manager->getMetrics().operations[getType()].get()) + : nullptr; } } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index 533493a79a2..8618d570685 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -3,7 +3,6 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/stringfmt.h> #include <map> -#include <algorithm> #include <vespa/log/log.h> LOG_SETUP(".pendingmessages"); @@ -15,7 +14,7 @@ PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr, u vespalib::make_string("Pending messages to storage nodes (stripe %u)", stripe_index)), _component(cr, "pendingmessagetracker"), _nodeInfo(_component.getClock()), - _nodeBusyDuration(60), + _nodeBusyDuration(60s), _deferred_read_tasks(), _lock() { @@ -38,7 +37,7 @@ vespalib::string PendingMessageTracker::MessageEntry::toHtml() const { vespalib::asciistream ss; ss << "<li><i>Node " << nodeIdx << "</i>: " - << "<b>" << framework::MilliSecTime(timeStamp.count()).toString() << "</b> " + << "<b>" << vespalib::to_string(timeStamp) << "</b> " << api::MessageType::get(api::MessageType::Id(msgType)).getName() << "(" << bucket.getBucketId() << ", priority=" << priority << ")</li>\n"; return ss.str(); } @@ -46,7 +45,7 @@ PendingMessageTracker::MessageEntry::toHtml() const { PendingMessageTracker::TimePoint PendingMessageTracker::currentTime() const { - return TimePoint(_component.getClock().getTimeInMillis().getTime()); + return _component.getClock().getSystemTime(); } namespace { diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 93238b5a83f..fb672d5ee31 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -68,13 +68,7 @@ public: virtual bool check(uint32_t messageType, uint16_t node, uint8_t priority) = 0; }; - /** - * Time point represented as the millisecond interval from the framework - * clock's epoch to a given point in time. Note that it'd be more - * semantically correct to use std::chrono::time_point, but it is bound - * to specific chrono clock types, their epochs and duration resolution. - */ - using TimePoint = std::chrono::milliseconds; + using TimePoint = vespalib::system_time; PendingMessageTracker(framework::ComponentRegister&, uint32_t stripe_index); ~PendingMessageTracker() override; @@ -119,8 +113,8 @@ public: */ std::vector<uint64_t> clearMessagesForNode(uint16_t node); - void setNodeBusyDuration(std::chrono::seconds secs) noexcept { - _nodeBusyDuration = secs; + void setNodeBusyDuration(vespalib::duration duration) noexcept { + _nodeBusyDuration = duration; } void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task); @@ -136,7 +130,7 @@ private: MessageEntry(TimePoint timeStamp, uint32_t msgType, uint32_t priority, uint64_t msgId, document::Bucket bucket, uint16_t nodeIdx) noexcept; - vespalib::string toHtml() const; + [[nodiscard]] vespalib::string toHtml() const; }; struct MessageIdKey : boost::multi_index::member<MessageEntry, uint64_t, &MessageEntry::msgId> {}; @@ -187,7 +181,7 @@ private: Messages _messages; framework::Component _component; NodeInfo _nodeInfo; - std::chrono::seconds _nodeBusyDuration; + vespalib::duration _nodeBusyDuration; DeferredBucketTaskMap _deferred_read_tasks; // Since distributor is currently single-threaded, this will only diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index ec22d7c064e..db88a22d500 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -55,7 +55,7 @@ vespalib::string getNodeId(StorageComponent& sc) { return ost.str(); } -vespalib::duration TEN_MINUTES = 600s; +constexpr vespalib::duration STALE_PROTOCOL_LIFETIME = 1h; } @@ -694,7 +694,7 @@ CommunicationManager::run(framework::ThreadHandle& thread) std::lock_guard<std::mutex> guard(_earlierGenerationsLock); for (auto it(_earlierGenerations.begin()); !_earlierGenerations.empty() && - ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime()); + ((it->first + STALE_PROTOCOL_LIFETIME) < _component.getClock().getMonotonicTime()); it = _earlierGenerations.begin()) { _earlierGenerations.erase(it); @@ -709,9 +709,8 @@ CommunicationManager::updateMetrics(const MetricLockGuard &) } void -CommunicationManager::print(std::ostream& out, bool verbose, const std::string& indent) const +CommunicationManager::print(std::ostream& out, bool , const std::string& ) const { - (void) verbose; (void) indent; out << "CommunicationManager"; } diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp index 91f304ad9a0..6d36abc896e 100644 --- a/storage/src/vespa/storage/visiting/visitor.cpp +++ b/storage/src/vespa/storage/visiting/visitor.cpp @@ -121,12 +121,9 @@ Visitor::VisitorTarget::metaForMessageId(uint64_t msgId) void Visitor::VisitorTarget::discardQueuedMessages() { - for (MessageQueue::iterator - it(_queuedMessages.begin()), e(_queuedMessages.end()); - it != e; ++it) - { - LOG(spam, "Erasing queued message with id %" PRIu64, it->second); - releaseMetaForMessageId(it->second); + for (const auto & entry : _queuedMessages) { + LOG(spam, "Erasing queued message with id %" PRIu64, entry.second); + releaseMetaForMessageId(entry.second); } _queuedMessages.clear(); } @@ -310,17 +307,14 @@ Visitor::getStateName(VisitorState s) return "COMPLETED"; default: assert(!"Unknown visitor state"); - return NULL; + return nullptr; } } Visitor::VisitorState Visitor::transitionTo(VisitorState newState) { - LOG(debug, "Visitor '%s' state transition %s -> %s", - _id.c_str(), - getStateName(_state), - getStateName(newState)); + LOG(debug, "Visitor '%s' state transition %s -> %s", _id.c_str(), getStateName(_state), getStateName(newState)); VisitorState oldState = _state; _state = newState; return oldState; @@ -339,12 +333,10 @@ Visitor::mayTransitionToCompleted() const void Visitor::forceClose() { - for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin(); - it != _bucketStates.end(); ++it) - { + for (auto * state : _bucketStates) { // Reset iterator id so no destroy iterator will be sent - (*it)->setIteratorId(spi::IteratorId(0)); - delete *it; + state->setIteratorId(spi::IteratorId(0)); + delete state; } _bucketStates.clear(); transitionTo(STATE_COMPLETED); @@ -358,7 +350,7 @@ Visitor::sendReplyOnce() std::shared_ptr<api::StorageReply> reply(_initiatingCmd->makeReply()); _hitCounter->updateVisitorStatistics(_visitorStatistics); - static_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics); + dynamic_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics); if (shouldAddMbusTrace()) { _trace.moveTraceTo(reply->getTrace()); } @@ -373,17 +365,15 @@ void Visitor::finalize() { if (_state != STATE_COMPLETED) { - LOG(error, "Attempting to finalize non-completed visitor %s", - _id.c_str()); + LOG(error, "Attempting to finalize non-completed visitor %s", _id.c_str()); assert(false); } assert(_bucketStates.empty()); if (_result.success()) { - if (_messageSession->pending() > 0) - { + if (_messageSession->pending() > 0) { _result = api::ReturnCode(api::ReturnCode::ABORTED); - try{ + try { abortedVisiting(); } catch (std::exception& e) { LOG(warning, "Visitor %s had a problem in abortVisiting(). As " @@ -404,43 +394,31 @@ Visitor::finalize() void Visitor::discardAllNoPendingBucketStates() { - for (BucketStateList::iterator - it(_bucketStates.begin()), e(_bucketStates.end()); - it != e;) - { + for (auto it = _bucketStates.begin(); it !=_bucketStates.end();) { BucketIterationState& bstate(**it); if (bstate.hasPendingControlCommand() || bstate.hasPendingIterators()) { - LOG(debug, - "Visitor '%s' not discarding bucket state %s " - "since it has pending operations", - _id.c_str(), - bstate.toString().c_str()); + LOG(debug, "Visitor '%s' not discarding bucket state %s since it has pending operations", + _id.c_str(), bstate.toString().c_str()); ++it; continue; } - LOG(debug, "Visitor '%s' discarding bucket state %s", - _id.c_str(), bstate.toString().c_str()); + LOG(debug, "Visitor '%s' discarding bucket state %s", _id.c_str(), bstate.toString().c_str()); delete *it; it = _bucketStates.erase(it); } } void -Visitor::fail(const api::ReturnCode& reason, - bool overrideExistingError) +Visitor::fail(const api::ReturnCode& reason, bool overrideExistingError) { assert(_state != STATE_COMPLETED); if (_result.getResult() < reason.getResult() || overrideExistingError) { - LOG(debug, "Setting result of visitor '%s' to %s", - _id.c_str(), reason.toString().c_str()); + LOG(debug, "Setting result of visitor '%s' to %s", _id.c_str(), reason.toString().c_str()); _result = reason; } if (_visitorTarget.hasQueuedMessages()) { - LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s " - "since visitor has failed", - _id.c_str(), - _visitorTarget._queuedMessages.size(), - _controlDestination->toString().c_str()); + LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s since visitor has failed", + _id.c_str(), _visitorTarget._queuedMessages.size(), _controlDestination->toString().c_str()); _visitorTarget.discardQueuedMessages(); } discardAllNoPendingBucketStates(); @@ -448,8 +426,7 @@ Visitor::fail(const api::ReturnCode& reason, } bool -Visitor::shouldReportProblemToClient(const api::ReturnCode& code, - size_t retryCount) const +Visitor::shouldReportProblemToClient(const api::ReturnCode& code, size_t retryCount) { // Report _once_ per message if we reach a certain retry threshold. if (retryCount == TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY) { @@ -521,7 +498,7 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId, _visitorOptions._fromTime = fromTimestamp; _visitorOptions._toTime = toTimestamp; _currentBucket = 0; - _hitCounter.reset(new HitCounter()); + _hitCounter = std::make_unique<HitCounter>(); _messageSession = std::move(messageSession); _documentPriority = documentPriority; @@ -612,8 +589,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met uint64_t messageId = reply->getContext().value.UINT64; uint32_t removed = _visitorTarget._pendingMessages.erase(messageId); - LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), - reply->toString().c_str(), messageId); + LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), reply->toString().c_str(), messageId); assert(removed == 1); (void) removed; @@ -634,20 +610,16 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met metrics.visitorDestinationFailureReplies.inc(); if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) { - LOG(debug, "Aborting visitor as we failed to talk to controller: %s", - reply->getError(0).toString().c_str()); - api::ReturnCode returnCode( - static_cast<api::ReturnCode::Result>( - reply->getError(0).getCode()), - reply->getError(0).getMessage()); + LOG(debug, "Aborting visitor as we failed to talk to controller: %s", reply->getError(0).toString().c_str()); + api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), + reply->getError(0).getMessage()); fail(returnCode, true); close(); return; } - api::ReturnCode returnCode( - static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), - reply->getError(0).getMessage()); + api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), + reply->getError(0).getMessage()); const bool should_fail = remap_docapi_message_error_code(returnCode); if (should_fail) { // Abort - something is wrong with target. @@ -657,8 +629,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met } if (failed()) { - LOG(debug, "Failed to send message from visitor '%s', due to " - "%s. Not resending since visitor has failed", + LOG(debug, "Failed to send message from visitor '%s', due to %s. Not resending since visitor has failed", _id.c_str(), returnCode.toString().c_str()); return; } @@ -709,8 +680,7 @@ Visitor::onCreateIteratorReply( if (reply->getResult().failed()) { LOG(debug, "Failed to create iterator for bucket %s: %s", - bucketId.toString().c_str(), - reply->getResult().toString().c_str()); + bucketId.toString().c_str(), reply->getResult().toString().c_str()); fail(reply->getResult()); delete *it; _bucketStates.erase((++it).base()); @@ -718,17 +688,14 @@ Visitor::onCreateIteratorReply( } bucketState.setIteratorId(reply->getIteratorId()); if (failed()) { - LOG(debug, "Create iterator for bucket %s is OK, " - "but visitor has failed: %s", - bucketId.toString().c_str(), - _result.toString().c_str()); + LOG(debug, "Create iterator for bucket %s is OK, but visitor has failed: %s", + bucketId.toString().c_str(), _result.toString().c_str()); delete *it; _bucketStates.erase((++it).base()); return; } - LOG(debug, "Visitor '%s' starting to visit bucket %s.", - _id.c_str(), bucketId.toString().c_str()); + LOG(debug, "Visitor '%s' starting to visit bucket %s.", _id.c_str(), bucketId.toString().c_str()); auto cmd = std::make_shared<GetIterCommand>(bucket, bucketState.getIteratorId(), _docBlockSize); cmd->getTrace().setLevel(_traceLevel); cmd->setPriority(_priority); @@ -737,13 +704,10 @@ Visitor::onCreateIteratorReply( } void -Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, - VisitorThreadMetrics& metrics) +Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, VisitorThreadMetrics& metrics) { LOG(debug, "Visitor '%s' got get iter reply for bucket %s: %s", - _id.c_str(), - reply->getBucketId().toString().c_str(), - reply->getResult().toString().c_str()); + _id.c_str(), reply->getBucketId().toString().c_str(), reply->getResult().toString().c_str()); auto it = _bucketStates.rbegin(); // New requests will be pushed on end of list.. So searching @@ -763,10 +727,8 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, !reply->getResult().isShutdownRelated() && !reply->getResult().isBucketDisappearance()) { - LOG(warning, "Failed to talk to persistence layer for bucket " - "%s. Aborting visitor '%s': %s", - reply->getBucketId().toString().c_str(), - _id.c_str(), reply->getResult().toString().c_str()); + LOG(warning, "Failed to talk to persistence layer for bucket %s. Aborting visitor '%s': %s", + reply->getBucketId().toString().c_str(), _id.c_str(), reply->getResult().toString().c_str()); } fail(reply->getResult()); BucketIterationState& bucketState(**it); @@ -783,17 +745,14 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, bucketState.setCompleted(reply->isCompleted()); --bucketState._pendingIterators; if (!reply->getEntries().empty()) { - LOG(debug, "Processing documents in handle given from bucket %s.", - reply->getBucketId().toString().c_str()); + LOG(debug, "Processing documents in handle given from bucket %s.", reply->getBucketId().toString().c_str()); // While handling documents we should not keep locks, such // that visitor may process several things at once. if (isRunning()) { MBUS_TRACE(reply->getTrace(), 5, vespalib::make_string("Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size())); - LOG(debug, "Visitor %s handling block of %zu documents.", - _id.c_str(), - reply->getEntries().size()); + LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size()); try { framework::MilliSecTimer processingTimer(_component.getClock()); handleDocuments(reply->getBucketId(), reply->getEntries(), *_hitCounter); @@ -913,15 +872,11 @@ Visitor::continueVisitor() } } - LOG(debug, "No pending messages, tagging visitor '%s' complete", - _id.c_str()); + LOG(debug, "No pending messages, tagging visitor '%s' complete", _id.c_str()); transitionTo(STATE_COMPLETED); } else { - LOG(debug, "Visitor %s waiting for all commands to be replied to " - "(pending=%zu, queued=%zu)", - _id.c_str(), - _visitorTarget._pendingMessages.size(), - _visitorTarget._queuedMessages.size()); + LOG(debug, "Visitor %s waiting for all commands to be replied to (pending=%zu, queued=%zu)", + _id.c_str(), _visitorTarget._pendingMessages.size(), _visitorTarget._queuedMessages.size()); } return false; } else { @@ -981,14 +936,14 @@ Visitor::getStatus(std::ostream& out, bool verbose) const << (_visitorOptions._visitRemoves ? "true" : "false") << "</td></tr>\n"; out << "<tr><td>Control destination</td><td>"; - if (_controlDestination.get()) { + if (_controlDestination) { out << xml_content_escaped(_controlDestination->toString()); } else { out << "nil"; } out << "</td></tr>\n"; out << "<tr><td>Data destination</td><td>"; - if (_dataDestination.get()) { + if (_dataDestination) { out << xml_content_escaped(_dataDestination->toString()); } else { out << "nil"; @@ -1078,17 +1033,13 @@ Visitor::getStatus(std::ostream& out, bool verbose) const bool Visitor::getIterators() { - LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, " - "_currentBucket = %d", - _id.c_str(), _buckets.size(), - _bucketStates.size(), _currentBucket); + LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, _currentBucket = %d", + _id.c_str(), _buckets.size(), _bucketStates.size(), _currentBucket); // Don't send any further GetIters if we're closing if (!isRunning()) { if (hasPendingIterators()) { - LOG(debug, "Visitor has failed but waiting for %zu " - "buckets to finish processing", - _bucketStates.size()); + LOG(debug, "Visitor has failed but waiting for %zu buckets to finish processing", _bucketStates.size()); return true; } else { return false; @@ -1097,13 +1048,10 @@ Visitor::getIterators() // Go through buckets found. Take the first that doesn't have requested // state and request a new piece. - for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin(); - it != _bucketStates.end();) - { + for (auto it = _bucketStates.begin();it != _bucketStates.end();) { assert(*it); BucketIterationState& bucketState(**it); - if ((bucketState._pendingIterators - >= _visitorOptions._maxParallelOneBucket) + if ((bucketState._pendingIterators >= _visitorOptions._maxParallelOneBucket) || bucketState.hasPendingControlCommand()) { ++it; @@ -1118,20 +1066,17 @@ Visitor::getIterators() } try{ completedBucket(bucketState.getBucketId(), *_hitCounter); - _visitorStatistics.setBucketsVisited( - _visitorStatistics.getBucketsVisited() + 1); + _visitorStatistics.setBucketsVisited(_visitorStatistics.getBucketsVisited() + 1); } catch (std::exception& e) { std::ostringstream ost; - ost << "Visitor fail to run completedBucket() notification: " - << e.what(); + ost << "Visitor fail to run completedBucket() notification: " << e.what(); reportProblem(ost.str()); } delete *it; it = _bucketStates.erase(it); continue; } - auto cmd = std::make_shared<GetIterCommand>( - bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize); + auto cmd = std::make_shared<GetIterCommand>(bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize); cmd->getTrace().setLevel(_traceLevel); cmd->setPriority(_priority); _messageHandler->send(cmd, *this); @@ -1143,7 +1088,7 @@ Visitor::getIterators() } // If there aren't anymore buckets to iterate, we're done - if (_bucketStates.size() == 0 && _currentBucket >= _buckets.size()) { + if (_bucketStates.empty() && _currentBucket >= _buckets.size()) { LOG(debug, "No more buckets to visit for visitor '%s'.", _id.c_str()); return false; } @@ -1157,17 +1102,13 @@ Visitor::getIterators() _currentBucket < _buckets.size()) { document::Bucket bucket(_bucketSpace, _buckets[_currentBucket]); - std::unique_ptr<BucketIterationState> newBucketState( - new BucketIterationState(*this, *_messageHandler, bucket)); + auto newBucketState = std::make_unique<BucketIterationState>(*this, *_messageHandler, bucket); LOG(debug, "Visitor '%s': Sending create iterator for bucket %s.", _id.c_str(), bucket.getBucketId().toString().c_str()); - spi::Selection selection - = spi::Selection(spi::DocumentSelection(_documentSelectionString)); - selection.setFromTimestamp( - spi::Timestamp(_visitorOptions._fromTime.getTime())); - selection.setToTimestamp( - spi::Timestamp(_visitorOptions._toTime.getTime())); + spi::Selection selection = spi::Selection(spi::DocumentSelection(_documentSelectionString)); + selection.setFromTimestamp(spi::Timestamp(_visitorOptions._fromTime.getTime())); + selection.setToTimestamp(spi::Timestamp(_visitorOptions._toTime.getTime())); auto cmd = std::make_shared<CreateIteratorCommand>(bucket, selection,_visitorOptions._fieldSet, _visitorOptions._visitRemoves @@ -1184,8 +1125,7 @@ Visitor::getIterators() } if (sentCount == 0) { if (LOG_WOULD_LOG(debug)) { - LOG(debug, "Enough iterators being processed. Doing nothing for " - "visitor '%s' bucketStates = %zu.", + LOG(debug, "Enough iterators being processed. Doing nothing for visitor '%s' bucketStates = %zu.", _id.c_str(), _bucketStates.size()); for (const auto& state : _bucketStates) { LOG(debug, "Existing: %s", state->toString().c_str()); diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h index 0737c5612c0..9b6d8e348b9 100644 --- a/storage/src/vespa/storage/visiting/visitor.h +++ b/storage/src/vespa/storage/visiting/visitor.h @@ -136,28 +136,24 @@ private: {} /** Sends DestroyIterator over _messageHandler if _iteratorId != 0 */ - ~BucketIterationState(); + ~BucketIterationState() override; void setCompleted(bool completed = true) { _completed = completed; } - bool isCompleted() const { return _completed; } + [[nodiscard]] bool isCompleted() const { return _completed; } - document::Bucket getBucket() const { return _bucket; } - document::BucketId getBucketId() const { return _bucket.getBucketId(); } + [[nodiscard]] document::Bucket getBucket() const { return _bucket; } + [[nodiscard]] document::BucketId getBucketId() const { return _bucket.getBucketId(); } void setIteratorId(spi::IteratorId iteratorId) { _iteratorId = iteratorId; } - spi::IteratorId getIteratorId() const { return _iteratorId; } + [[nodiscard]] spi::IteratorId getIteratorId() const { return _iteratorId; } - void setPendingControlCommand() { - _iteratorId = spi::IteratorId(0); - } - - bool hasPendingControlCommand() const { + [[nodiscard]] bool hasPendingControlCommand() const { return _iteratorId == spi::IteratorId(0); } - bool hasPendingIterators() const { return _pendingIterators > 0; } + [[nodiscard]] bool hasPendingIterators() const { return _pendingIterators > 0; } void print(std::ostream& out, bool, const std::string& ) const override { out << "BucketIterationState(" @@ -247,12 +243,10 @@ private: MessageMeta releaseMetaForMessageId(uint64_t msgId); void reinsertMeta(MessageMeta); - bool hasQueuedMessages() const { return !_queuedMessages.empty(); } + [[nodiscard]] bool hasQueuedMessages() const { return !_queuedMessages.empty(); } void discardQueuedMessages(); - uint32_t getMemoryUsage() const noexcept { - return _memoryUsage; - } + [[nodiscard]] uint32_t getMemoryUsage() const noexcept { return _memoryUsage; } VisitorTarget(); ~VisitorTarget(); @@ -326,9 +320,9 @@ protected: std::string _documentSelectionString; vdslib::VisitorStatistics _visitorStatistics; - bool isCompletedCalled() const { return _calledCompletedVisitor; } + [[nodiscard]] bool isCompletedCalled() const { return _calledCompletedVisitor; } - uint32_t traceLevel() const noexcept { return _traceLevel; } + [[nodiscard]] uint32_t traceLevel() const noexcept { return _traceLevel; } /** * Attempts to add the given trace message to the internal, memory bounded @@ -339,7 +333,7 @@ protected: */ bool addBoundedTrace(uint32_t level, const vespalib::string& message); - const vdslib::Parameters& visitor_parameters() const noexcept; + [[nodiscard]] const vdslib::Parameters& visitor_parameters() const noexcept; // Possibly modifies the ReturnCode parameter in-place if its return code should // be changed based on visitor subclass-specific behavior. @@ -417,7 +411,7 @@ public: * The consistency level provided here is propagated through the SPI * Context object for createIterator calls. */ - virtual spi::ReadConsistency getRequiredReadConsistency() const { + [[nodiscard]] virtual spi::ReadConsistency getRequiredReadConsistency() const { return spi::ReadConsistency::STRONG; } @@ -428,8 +422,7 @@ public: /** * Used to silence transient errors that can happen during normal operation. */ - bool shouldReportProblemToClient(const api::ReturnCode&, - size_t retryCount) const; + [[nodiscard]] static bool shouldReportProblemToClient(const api::ReturnCode&, size_t retryCount) ; /** Called to send report to client of potential non-critical problems. */ void reportProblem(const std::string& problem); @@ -492,18 +485,16 @@ public: void getStatus(std::ostream& out, bool verbose) const; - void setMaxParallel(uint32_t maxParallel) - { _visitorOptions._maxParallel = maxParallel; } - void setMaxParallelPerBucket(uint32_t max) - { _visitorOptions._maxParallelOneBucket = max; } + void setMaxParallel(uint32_t maxParallel) { _visitorOptions._maxParallel = maxParallel; } + void setMaxParallelPerBucket(uint32_t max) { _visitorOptions._maxParallelOneBucket = max; } /** * Sends a message to the data handler for this visitor. */ void sendMessage(std::unique_ptr<documentapi::DocumentMessage> documentMessage); - bool isRunning() const { return _state == STATE_RUNNING; } - bool isCompleted() const { return _state == STATE_COMPLETED; } + [[nodiscard]] bool isRunning() const { return _state == STATE_RUNNING; } + [[nodiscard]] bool isCompleted() const { return _state == STATE_COMPLETED; } private: /** @@ -542,11 +533,9 @@ private: void sendReplyOnce(); - bool hasFailedVisiting() const { return _result.failed(); } - - bool hasPendingIterators() const { return !_bucketStates.empty(); } - - bool mayTransitionToCompleted() const; + [[nodiscard]] bool hasFailedVisiting() const { return _result.failed(); } + [[nodiscard]] bool hasPendingIterators() const { return !_bucketStates.empty(); } + [[nodiscard]] bool mayTransitionToCompleted() const; void discardAllNoPendingBucketStates(); @@ -565,9 +554,7 @@ private: * * Precondition: attach() must have been called on `this`. */ - bool shouldAddMbusTrace() const noexcept { - return _traceLevel != 0; - } + [[nodiscard]] bool shouldAddMbusTrace() const noexcept { return _traceLevel != 0; } /** * Set internal state to the given state value. diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index a03b9a9a8a3..07938002746 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -187,9 +187,8 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi for (int32_t i=0; i<config->visitorthreads; ++i) { _visitorThread.emplace_back( // Naked new due to a lot of private inheritance in VisitorThread and VisitorManager - std::shared_ptr<VisitorThread>( - new VisitorThread(i, _componentRegister, _messageSessionFactory, - _visitorFactories, *_metrics->threads[i], *this)), + std::shared_ptr<VisitorThread>(new VisitorThread(i, _componentRegister, _messageSessionFactory, + _visitorFactories, *_metrics->threads[i], *this)), std::map<api::VisitorId, std::string>()); } } @@ -450,8 +449,7 @@ VisitorManager::processReply(const std::shared_ptr<api::StorageReply>& reply) } void -VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, - Visitor& visitor) +VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, Visitor& visitor) { assert(cmd->getType() == api::MessageType::INTERNAL); // Only add to internal state if not destroy iterator command, as @@ -460,7 +458,7 @@ VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) { MessageInfo inf; inf.id = visitor.getVisitorId(); - inf.timestamp = _component.getClock().getTimeInSeconds().getTime(); + inf.timestamp = _component.getClock().getSystemTime(); inf.timeout = cmd->getTimeout(); if (cmd->getAddress()) { @@ -623,7 +621,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out, out << "<tr>" << "<td>" << entry.first << "</td>" << "<td>" << entry.second.id << "</td>" - << "<td>" << entry.second.timestamp << "</td>" + << "<td>" << vespalib::to_string(entry.second.timestamp) << "</td>" << "<td>" << vespalib::count_ms(entry.second.timeout) << "</td>" << "<td>" << xml_content_escaped(entry.second.destination) << "</td>" << "</tr>\n"; diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h index 33703b392bc..3e331e1c9a2 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.h +++ b/storage/src/vespa/storage/visiting/visitormanager.h @@ -57,7 +57,7 @@ private: struct MessageInfo { api::VisitorId id; - time_t timestamp; + vespalib::system_time timestamp; vespalib::duration timeout; std::string destination; }; @@ -168,9 +168,7 @@ private: * by the formula: fixed + variable * ((255 - priority) / 255) */ uint32_t maximumConcurrent(const api::CreateVisitorCommand& cmd) const { - return _maxFixedConcurrentVisitors + static_cast<uint32_t>( - _maxVariableConcurrentVisitors - * ((255.0 - cmd.getPriority()) / 255.0)); + return _maxFixedConcurrentVisitors + static_cast<uint32_t>(_maxVariableConcurrentVisitors * ((255.0 - cmd.getPriority()) / 255.0)); } void updateMetrics(const MetricLockGuard &) override; diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index 55ef83ba658..e3ebef3a3ef 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -126,10 +126,10 @@ VisitorThread::shutdown() if (event._message.get()) { if (!event._message->getType().isReply() && (event._message->getType() != api::MessageType::INTERNAL - || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) + || dynamic_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) { std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(*event._message).makeReply()); + dynamic_cast<api::StorageCommand&>(*event._message).makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); _messageSender.send(reply); } @@ -197,7 +197,7 @@ VisitorThread::run(framework::ThreadHandle& thread) // disappear when no visiting is done) if (entry._message.get() && (entry._message->getType() != api::MessageType::INTERNAL - || static_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID)) + || dynamic_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID)) { entry._timer.stop(_metrics.averageQueueWaitingTime); } @@ -290,7 +290,7 @@ VisitorThread::close() } else { _metrics.completedVisitors.inc(1); } - framework::SecondTime currentTime(_component.getClock().getTimeInSeconds()); + vespalib::steady_time currentTime(_component.getClock().getMonotonicTime()); trimRecentlyCompletedList(currentTime); _recentlyCompleted.emplace_back(_currentlyRunningVisitor->first, currentTime); _visitors.erase(_currentlyRunningVisitor); @@ -298,9 +298,9 @@ VisitorThread::close() } void -VisitorThread::trimRecentlyCompletedList(framework::SecondTime currentTime) +VisitorThread::trimRecentlyCompletedList(vespalib::steady_time currentTime) { - framework::SecondTime recentLimit(currentTime - framework::SecondTime(30)); + vespalib::steady_time recentLimit(currentTime - 30s); // Dump all elements that aren't recent anymore while (!_recentlyCompleted.empty() && _recentlyCompleted.front().second < recentLimit) @@ -313,8 +313,7 @@ void VisitorThread::handleNonExistingVisitorCall(const Event& entry, ReturnCode& code) { // Get current time. Set the time that is the oldest still recent. - framework::SecondTime currentTime(_component.getClock().getTimeInSeconds()); - trimRecentlyCompletedList(currentTime); + trimRecentlyCompletedList(_component.getClock().getMonotonicTime()); // Go through all recent visitors. Ignore request if recent for (const auto& e : _recentlyCompleted) { @@ -344,7 +343,7 @@ VisitorThread::createVisitor(vespalib::stringref libName, auto it = _visitorFactories.find(str); if (it == _visitorFactories.end()) { error << "Visitor library " << str << " not found."; - return std::shared_ptr<Visitor>(); + return {}; } auto libIter = _libs.find(str); @@ -363,7 +362,7 @@ VisitorThread::createVisitor(vespalib::stringref libName, } catch (std::exception& e) { error << "Failed to create visitor instance of type " << libName << ": " << e.what(); - return std::shared_ptr<Visitor>(); + return {}; } } @@ -690,7 +689,7 @@ VisitorThread::getStatus(vespalib::asciistream& out, } for (const auto& cv : _recentlyCompleted) { out << "<li> Visitor " << cv.first << " done at " - << cv.second.getTime() << "\n"; + << vespalib::to_string(vespalib::to_utc(cv.second)) << "\n"; } out << "</ul>\n"; out << "<h3>Current queue size: " << _queue.size() << "</h3>\n"; @@ -736,12 +735,10 @@ VisitorThread::getStatus(vespalib::asciistream& out, if (_visitors.empty()) { out << "None\n"; } - for (VisitorMap::const_iterator it = _visitors.begin(); - it != _visitors.end(); ++it) - { - out << "<a href=\"?visitor=" << it->first + for (const auto & v : _visitors) { + out << "<a href=\"?visitor=" << v.first << (verbose ? "&verbose" : "") << "\">Visitor " - << it->first << "</a><br>\n"; + << v.first << "</a><br>\n"; } } } diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index 226e7c0631b..56e40328fda 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -38,7 +38,7 @@ class VisitorThread : public framework::Runnable, using VisitorMap = std::map<api::VisitorId, std::shared_ptr<Visitor>>; VisitorMap _visitors; - std::deque<std::pair<api::VisitorId, framework::SecondTime>> _recentlyCompleted; + std::deque<std::pair<api::VisitorId, vespalib::steady_time>> _recentlyCompleted; struct Event { enum class Type { @@ -118,7 +118,7 @@ private: */ Event popNextQueuedEventIfAvailable(); void tick(); - void trimRecentlyCompletedList(framework::SecondTime currentTime); + void trimRecentlyCompletedList(vespalib::steady_time currentTime); void handleNonExistingVisitorCall(const Event& entry, api::ReturnCode& code); std::shared_ptr<Visitor> createVisitor(vespalib::stringref libName, |