diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-08-10 16:29:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-10 16:29:41 +0200 |
commit | 23edd8f868f94a1698b7b5de1e4dcde3fe9ff752 (patch) | |
tree | 2fb6a98034d21a237eaa26840273fa8f2c2a4db5 | |
parent | faf0824b22f09ab0c959df4bb29e02df82b9afab (diff) | |
parent | d04d3ea48797d29ef957574b3fd0d6da78a21b17 (diff) |
Merge pull request #28017 from vespa-engine/balder/pass-time-along
Balder/pass time along
9 files changed, 76 insertions, 92 deletions
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 3bfa1027a82..8277281206d 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -162,10 +162,14 @@ TEST_F(PendingMessageTrackerTest, simple) { clock.setAbsoluteTimeInSeconds(1); PendingMessageTracker tracker(compReg, 0); + std::ostringstream dummy; // Enable time tracking + tracker.reportStatus(dummy, framework::HttpUrlPath("/pendingmessages?order=bucket")); + auto remove = std::make_shared<api::RemoveCommand>( makeDocumentBucket(document::BucketId(16, 1234)), document::DocumentId("id:footype:testdoc:n=1234:foo"), 1001); remove->setAddress(makeStorageAddress(0)); + tracker.insert(remove); { @@ -238,6 +242,8 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) { compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); PendingMessageTracker tracker(compReg, 0); + std::ostringstream dummy; // Enable time tracking + tracker.reportStatus(dummy, framework::HttpUrlPath("/pendingmessages?order=bucket")); insertMessages(tracker); diff --git a/storage/src/vespa/storage/distributor/nodeinfo.cpp b/storage/src/vespa/storage/distributor/nodeinfo.cpp index 6bb1949d606..3e645f57393 100644 --- a/storage/src/vespa/storage/distributor/nodeinfo.cpp +++ b/storage/src/vespa/storage/distributor/nodeinfo.cpp @@ -5,14 +5,16 @@ namespace storage::distributor { -NodeInfo::NodeInfo(const framework::Clock& clock) +NodeInfo::NodeInfo(const framework::Clock& clock) noexcept : _clock(clock) {} -uint32_t NodeInfo::getPendingCount(uint16_t idx) const { +uint32_t +NodeInfo::getPendingCount(uint16_t idx) const { return getNode(idx)._pending; } -bool NodeInfo::isBusy(uint16_t idx) const { +bool +NodeInfo::isBusy(uint16_t idx) const { const SingleNodeInfo& info = getNode(idx); if (info._busyUntilTime.time_since_epoch().count() != 0) { if (_clock.getMonotonicTime() > info._busyUntilTime) { @@ -25,15 +27,18 @@ bool NodeInfo::isBusy(uint16_t idx) const { return false; } -void NodeInfo::setBusy(uint16_t idx, vespalib::duration for_duration) { +void +NodeInfo::setBusy(uint16_t idx, vespalib::duration for_duration) { getNode(idx)._busyUntilTime = _clock.getMonotonicTime() + for_duration; } -void NodeInfo::incPending(uint16_t idx) { +void +NodeInfo::incPending(uint16_t idx) { getNode(idx)._pending++; } -void NodeInfo::decPending(uint16_t idx) { +void +NodeInfo::decPending(uint16_t idx) { SingleNodeInfo& info = getNode(idx); if (info._pending > 0) { @@ -41,12 +46,14 @@ void NodeInfo::decPending(uint16_t idx) { } } -void NodeInfo::clearPending(uint16_t idx) { +void +NodeInfo::clearPending(uint16_t idx) { SingleNodeInfo& info = getNode(idx); info._pending = 0; } -NodeInfo::SingleNodeInfo& NodeInfo::getNode(uint16_t idx) { +NodeInfo::SingleNodeInfo& +NodeInfo::getNode(uint16_t idx) { const auto index_lbound = static_cast<size_t>(idx) + 1; while (_nodes.size() < index_lbound) { _nodes.emplace_back(); @@ -55,7 +62,8 @@ NodeInfo::SingleNodeInfo& NodeInfo::getNode(uint16_t idx) { return _nodes[idx]; } -const NodeInfo::SingleNodeInfo& NodeInfo::getNode(uint16_t idx) const { +const NodeInfo::SingleNodeInfo& +NodeInfo::getNode(uint16_t idx) const { const auto index_lbound = static_cast<size_t>(idx) + 1; while (_nodes.size() < index_lbound) { _nodes.emplace_back(); diff --git a/storage/src/vespa/storage/distributor/nodeinfo.h b/storage/src/vespa/storage/distributor/nodeinfo.h index 7f0716d7804..446739ca7e9 100644 --- a/storage/src/vespa/storage/distributor/nodeinfo.h +++ b/storage/src/vespa/storage/distributor/nodeinfo.h @@ -17,30 +17,24 @@ namespace storage::distributor { class NodeInfo { public: - explicit NodeInfo(const framework::Clock& clock); - + explicit NodeInfo(const framework::Clock& clock) noexcept; uint32_t getPendingCount(uint16_t idx) const; - bool isBusy(uint16_t idx) const; - void setBusy(uint16_t idx, vespalib::duration for_duration); - void incPending(uint16_t idx); - void decPending(uint16_t idx); - void clearPending(uint16_t idx); private: struct SingleNodeInfo { - SingleNodeInfo() : _pending(0), _busyUntilTime() {} + SingleNodeInfo() noexcept : _pending(0), _busyUntilTime() {} - uint32_t _pending; + uint32_t _pending; mutable vespalib::steady_time _busyUntilTime; }; mutable std::vector<SingleNodeInfo> _nodes; - const framework::Clock& _clock; + const framework::Clock& _clock; const SingleNodeInfo& getNode(uint16_t idx) const; SingleNodeInfo& getNode(uint16_t idx); diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index 5b8fa6b69e3..7b3cdacf702 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -17,6 +17,7 @@ PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr, u _nodeInfo(_component.getClock()), _nodeBusyDuration(60s), _deferred_read_tasks(), + _trackTime(false), _lock() { _component.registerStatusPage(*this); @@ -69,6 +70,13 @@ pairAsRange(Pair pair) return PairAsRange<Pair>(std::move(pair)); } +document::Bucket +getBucket(const api::StorageMessage & msg) { + return (msg.getType() != api::MessageType::REQUESTBUCKETINFO) + ? msg.getBucket() + : document::Bucket(msg.getBucket().getBucketSpace(), dynamic_cast<const api::RequestBucketInfoCommand&>(msg).super_bucket_id()); +} + } std::vector<uint64_t> @@ -91,17 +99,19 @@ PendingMessageTracker::clearMessagesForNode(uint16_t node) void PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg) { - std::lock_guard guard(_lock); if (msg->getAddress()) { // TODO STRIPE reevaluate if getBucket() on RequestBucketInfo msgs should transparently return superbucket..! - document::Bucket bucket = (msg->getType() != api::MessageType::REQUESTBUCKETINFO) - ? msg->getBucket() - : document::Bucket(msg->getBucket().getBucketSpace(), - dynamic_cast<api::RequestBucketInfoCommand&>(*msg).super_bucket_id()); - _messages.emplace(currentTime(), msg->getType().getId(), msg->getPriority(), msg->getMsgId(), - bucket, msg->getAddress()->getIndex()); - - _nodeInfo.incPending(msg->getAddress()->getIndex()); + document::Bucket bucket = getBucket(*msg); + { + // We will not start tracking time until we have been asked for html at least once. + // Time tracking is only used for presenting pending messages for debugging. + TimePoint now = (_trackTime.load(std::memory_order_relaxed)) ? currentTime() : TimePoint(); + std::lock_guard guard(_lock); + _messages.emplace(now, msg->getType().getId(), msg->getPriority(), msg->getMsgId(), + bucket, msg->getAddress()->getIndex()); + + _nodeInfo.incPending(msg->getAddress()->getIndex()); + } LOG(debug, "Sending message %s with id %" PRIu64 " to %s", msg->toString().c_str(), msg->getMsgId(), msg->getAddress()->toString().c_str()); @@ -111,15 +121,13 @@ PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg) document::Bucket PendingMessageTracker::reply(const api::StorageReply& r) { - std::unique_lock guard(_lock); document::Bucket bucket; - LOG(debug, "Got reply: %s", r.toString().c_str()); uint64_t msgId = r.getMsgId(); + std::unique_lock guard(_lock); MessagesByMsgId& msgs = boost::multi_index::get<0>(_messages); MessagesByMsgId::iterator iter = msgs.find(msgId); - if (iter != msgs.end()) { bucket = iter->bucket; _nodeInfo.decPending(r.getAddress()->getIndex()); @@ -127,7 +135,6 @@ PendingMessageTracker::reply(const api::StorageReply& r) if (code == api::ReturnCode::BUSY || code == api::ReturnCode::TIMEOUT) { _nodeInfo.setBusy(r.getAddress()->getIndex(), _nodeBusyDuration); } - LOG(debug, "Erased message with id %" PRIu64 " for bucket %s", msgId, bucket.toString().c_str()); msgs.erase(msgId); auto deferred_tasks = get_deferred_ops_if_bucket_writes_drained(bucket); // Deferred tasks may try to send messages, which in turn will invoke the PendingMessageTracker. @@ -139,6 +146,7 @@ PendingMessageTracker::reply(const api::StorageReply& r) for (auto& task : deferred_tasks) { task->run(TaskRunState::OK); } + LOG(debug, "Erased message with id %" PRIu64 " for bucket %s", msgId, bucket.toString().c_str()); } return bucket; @@ -328,6 +336,7 @@ PendingMessageTracker::getStatusPerNode(std::ostream& out) const void PendingMessageTracker::reportHtmlStatus(std::ostream& out, const framework::HttpUrlPath& path) const { + _trackTime.store(true, std::memory_order_relaxed); if (!path.hasAttribute("order")) { getStatusStartPage(out); } else if (path.getAttribute("order") == "bucket") { diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index fb672d5ee31..4b5655d3f3c 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -178,11 +178,12 @@ private: document::Bucket::hash >; - Messages _messages; - framework::Component _component; - NodeInfo _nodeInfo; - vespalib::duration _nodeBusyDuration; - DeferredBucketTaskMap _deferred_read_tasks; + Messages _messages; + framework::Component _component; + NodeInfo _nodeInfo; + vespalib::duration _nodeBusyDuration; + DeferredBucketTaskMap _deferred_read_tasks; + mutable std::atomic<bool> _trackTime; // Since distributor is currently single-threaded, this will only // contend when status page is being accessed. It is, however, required diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index fe384a68d72..a4295613fd2 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -65,9 +65,7 @@ PersistenceMessageTrackerImpl::fail(MessageSender& sender, const api::ReturnCode } uint16_t -PersistenceMessageTrackerImpl::receiveReply( - MessageSender& sender, - api::BucketInfoReply& reply) +PersistenceMessageTrackerImpl::receiveReply(MessageSender& sender, api::BucketInfoReply& reply) { uint16_t node = handleReply(reply); @@ -79,9 +77,7 @@ PersistenceMessageTrackerImpl::receiveReply( } void -PersistenceMessageTrackerImpl::revert( - MessageSender& sender, - const std::vector<BucketNodePair>& revertNodes) +PersistenceMessageTrackerImpl::revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes) { if (_revertTimestamp != 0) { // Since we're reverting, all received bucket info is voided. @@ -156,24 +152,18 @@ PersistenceMessageTrackerImpl::canSendReplyEarly() const } void -PersistenceMessageTrackerImpl::addBucketInfoFromReply( - uint16_t node, - const api::BucketInfoReply& reply) +PersistenceMessageTrackerImpl::addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply) { document::Bucket bucket(reply.getBucket()); const api::BucketInfo& bucketInfo(reply.getBucketInfo()); if (reply.hasBeenRemapped()) { LOG(debug, "Bucket %s: Received remapped bucket info %s from node %d", - bucket.toString().c_str(), - bucketInfo.toString().c_str(), - node); + bucket.toString().c_str(), bucketInfo.toString().c_str(), node); _remapBucketInfo[bucket].emplace_back(_op_ctx.generate_unique_timestamp(), node, bucketInfo); } else { LOG(debug, "Bucket %s: Received bucket info %s from node %d", - bucket.toString().c_str(), - bucketInfo.toString().c_str(), - node); + bucket.toString().c_str(), bucketInfo.toString().c_str(), node); _bucketInfo[bucket].emplace_back(_op_ctx.generate_unique_timestamp(), node, bucketInfo); } } @@ -182,17 +172,12 @@ void PersistenceMessageTrackerImpl::logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const { LOG(spam, "Bucket %s: Received successful reply %s", - reply.getBucketId().toString().c_str(), - reply.toString().c_str()); + reply.getBucketId().toString().c_str(), reply.toString().c_str()); if (!reply.getBucketInfo().valid()) { - LOG(error, - "Reply %s from node %d contained invalid bucket " - "information %s. This is a bug! Please report " - "this to the Vespa team", - reply.toString().c_str(), - node, - reply.getBucketInfo().toString().c_str()); + LOG(error, "Reply %s from node %d contained invalid bucket information %s. This is a bug! " + "Please report this to the Vespa team", + reply.toString().c_str(), node, reply.getBucketInfo().toString().c_str()); } } @@ -236,12 +221,8 @@ void PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& reply) { LOG(debug, "Bucket %s: Received failed reply %s with result %s", - reply.getBucketId().toString().c_str(), - reply.toString().c_str(), - reply.getResult().toString().c_str()); - if (reply.getResult().getResult() > - _reply->getResult().getResult()) - { + reply.getBucketId().toString().c_str(), reply.toString().c_str(), reply.getResult().toString().c_str()); + if (reply.getResult().getResult() > _reply->getResult().getResult()) { _reply->setResult(reply.getResult()); } @@ -249,12 +230,9 @@ PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& r } void -PersistenceMessageTrackerImpl::handleCreateBucketReply( - api::BucketInfoReply& reply, - uint16_t node) +PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node) { - LOG(spam, "Received CreateBucket reply for %s from node %u", - reply.getBucketId().toString().c_str(), node); + LOG(spam, "Received CreateBucket reply for %s from node %u", reply.getBucketId().toString().c_str(), node); if (!reply.getResult().success() && reply.getResult().getResult() != api::ReturnCode::EXISTS) { @@ -271,9 +249,7 @@ PersistenceMessageTrackerImpl::handleCreateBucketReply( } void -PersistenceMessageTrackerImpl::handlePersistenceReply( - api::BucketInfoReply& reply, - uint16_t node) +PersistenceMessageTrackerImpl::handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node) { ++_n_persistence_replies_total; if (reply.getBucketInfo().valid()) { @@ -298,10 +274,7 @@ PersistenceMessageTrackerImpl::transfer_trace_state_to_reply() } void -PersistenceMessageTrackerImpl::updateFromReply( - MessageSender& sender, - api::BucketInfoReply& reply, - uint16_t node) +PersistenceMessageTrackerImpl::updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node) { _trace.addChild(reply.steal_trace()); diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h index ecc4732696b..9b06547dd98 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h @@ -8,7 +8,6 @@ #include <vespa/storageapi/messageapi/bucketinfocommand.h> #include <vespa/storageapi/messageapi/bucketinforeply.h> - namespace storage::distributor { struct PersistenceMessageTracker { diff --git a/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.cpp b/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.cpp index 9b7c4919403..4cc32a2fc3d 100644 --- a/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.cpp +++ b/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.cpp @@ -4,17 +4,14 @@ namespace storage::framework { -HtmlStatusReporter::HtmlStatusReporter(vespalib::stringref id, - vespalib::stringref name) +HtmlStatusReporter::HtmlStatusReporter(vespalib::stringref id, vespalib::stringref name) : StatusReporter(id, name) -{ -} +{ } HtmlStatusReporter::~HtmlStatusReporter() = default; void -HtmlStatusReporter::reportHtmlHeader(std::ostream& out, - const HttpUrlPath& path) const +HtmlStatusReporter::reportHtmlHeader(std::ostream& out, const HttpUrlPath& path) const { out << "<html>\n" << "<head>\n" @@ -26,8 +23,7 @@ HtmlStatusReporter::reportHtmlHeader(std::ostream& out, } void -HtmlStatusReporter::reportHtmlFooter(std::ostream& out, - const HttpUrlPath&) const +HtmlStatusReporter::reportHtmlFooter(std::ostream& out, const HttpUrlPath&) const { out << "</body>\n</html>\n"; } @@ -39,8 +35,7 @@ HtmlStatusReporter::getReportContentType(const HttpUrlPath&) const } bool -HtmlStatusReporter::reportStatus(std::ostream& out, - const HttpUrlPath& path) const +HtmlStatusReporter::reportStatus(std::ostream& out, const HttpUrlPath& path) const { if (!isValidStatusRequest()) return false; reportHtmlHeader(out, path); diff --git a/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.h b/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.h index 4ffba20a3fa..ee3d65b0de3 100644 --- a/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.h +++ b/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.h @@ -29,8 +29,7 @@ struct HtmlStatusReporter : public StatusReporter { * some code in the <head></head> part of the HTML, such as javascript * functions. */ - virtual void reportHtmlHeaderAdditions(std::ostream&, - const HttpUrlPath&) const {} + virtual void reportHtmlHeaderAdditions(std::ostream&, const HttpUrlPath&) const {} /** * Write a default HTML header. It writes the start of an HTML |