From 21a4d3c623ef93dc4ed37758fde1d7e0b493e998 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 7 Feb 2023 12:16:18 +0000 Subject: Minor code refactoring and layout changes. --- storage/src/vespa/storage/visiting/visitor.cpp | 176 ++++++++----------------- storage/src/vespa/storage/visiting/visitor.h | 57 ++++---- 2 files changed, 80 insertions(+), 153 deletions(-) (limited to 'storage') diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp index 91f304ad9a0..6d36abc896e 100644 --- a/storage/src/vespa/storage/visiting/visitor.cpp +++ b/storage/src/vespa/storage/visiting/visitor.cpp @@ -121,12 +121,9 @@ Visitor::VisitorTarget::metaForMessageId(uint64_t msgId) void Visitor::VisitorTarget::discardQueuedMessages() { - for (MessageQueue::iterator - it(_queuedMessages.begin()), e(_queuedMessages.end()); - it != e; ++it) - { - LOG(spam, "Erasing queued message with id %" PRIu64, it->second); - releaseMetaForMessageId(it->second); + for (const auto & entry : _queuedMessages) { + LOG(spam, "Erasing queued message with id %" PRIu64, entry.second); + releaseMetaForMessageId(entry.second); } _queuedMessages.clear(); } @@ -310,17 +307,14 @@ Visitor::getStateName(VisitorState s) return "COMPLETED"; default: assert(!"Unknown visitor state"); - return NULL; + return nullptr; } } Visitor::VisitorState Visitor::transitionTo(VisitorState newState) { - LOG(debug, "Visitor '%s' state transition %s -> %s", - _id.c_str(), - getStateName(_state), - getStateName(newState)); + LOG(debug, "Visitor '%s' state transition %s -> %s", _id.c_str(), getStateName(_state), getStateName(newState)); VisitorState oldState = _state; _state = newState; return oldState; @@ -339,12 +333,10 @@ Visitor::mayTransitionToCompleted() const void Visitor::forceClose() { - for (std::list::iterator it = _bucketStates.begin(); - it != _bucketStates.end(); ++it) - { + for (auto * state : _bucketStates) { // Reset iterator id so no destroy iterator will be sent - (*it)->setIteratorId(spi::IteratorId(0)); - delete *it; + state->setIteratorId(spi::IteratorId(0)); + delete state; } _bucketStates.clear(); transitionTo(STATE_COMPLETED); @@ -358,7 +350,7 @@ Visitor::sendReplyOnce() std::shared_ptr reply(_initiatingCmd->makeReply()); _hitCounter->updateVisitorStatistics(_visitorStatistics); - static_cast(reply.get())->setVisitorStatistics(_visitorStatistics); + dynamic_cast(reply.get())->setVisitorStatistics(_visitorStatistics); if (shouldAddMbusTrace()) { _trace.moveTraceTo(reply->getTrace()); } @@ -373,17 +365,15 @@ void Visitor::finalize() { if (_state != STATE_COMPLETED) { - LOG(error, "Attempting to finalize non-completed visitor %s", - _id.c_str()); + LOG(error, "Attempting to finalize non-completed visitor %s", _id.c_str()); assert(false); } assert(_bucketStates.empty()); if (_result.success()) { - if (_messageSession->pending() > 0) - { + if (_messageSession->pending() > 0) { _result = api::ReturnCode(api::ReturnCode::ABORTED); - try{ + try { abortedVisiting(); } catch (std::exception& e) { LOG(warning, "Visitor %s had a problem in abortVisiting(). As " @@ -404,43 +394,31 @@ Visitor::finalize() void Visitor::discardAllNoPendingBucketStates() { - for (BucketStateList::iterator - it(_bucketStates.begin()), e(_bucketStates.end()); - it != e;) - { + for (auto it = _bucketStates.begin(); it !=_bucketStates.end();) { BucketIterationState& bstate(**it); if (bstate.hasPendingControlCommand() || bstate.hasPendingIterators()) { - LOG(debug, - "Visitor '%s' not discarding bucket state %s " - "since it has pending operations", - _id.c_str(), - bstate.toString().c_str()); + LOG(debug, "Visitor '%s' not discarding bucket state %s since it has pending operations", + _id.c_str(), bstate.toString().c_str()); ++it; continue; } - LOG(debug, "Visitor '%s' discarding bucket state %s", - _id.c_str(), bstate.toString().c_str()); + LOG(debug, "Visitor '%s' discarding bucket state %s", _id.c_str(), bstate.toString().c_str()); delete *it; it = _bucketStates.erase(it); } } void -Visitor::fail(const api::ReturnCode& reason, - bool overrideExistingError) +Visitor::fail(const api::ReturnCode& reason, bool overrideExistingError) { assert(_state != STATE_COMPLETED); if (_result.getResult() < reason.getResult() || overrideExistingError) { - LOG(debug, "Setting result of visitor '%s' to %s", - _id.c_str(), reason.toString().c_str()); + LOG(debug, "Setting result of visitor '%s' to %s", _id.c_str(), reason.toString().c_str()); _result = reason; } if (_visitorTarget.hasQueuedMessages()) { - LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s " - "since visitor has failed", - _id.c_str(), - _visitorTarget._queuedMessages.size(), - _controlDestination->toString().c_str()); + LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s since visitor has failed", + _id.c_str(), _visitorTarget._queuedMessages.size(), _controlDestination->toString().c_str()); _visitorTarget.discardQueuedMessages(); } discardAllNoPendingBucketStates(); @@ -448,8 +426,7 @@ Visitor::fail(const api::ReturnCode& reason, } bool -Visitor::shouldReportProblemToClient(const api::ReturnCode& code, - size_t retryCount) const +Visitor::shouldReportProblemToClient(const api::ReturnCode& code, size_t retryCount) { // Report _once_ per message if we reach a certain retry threshold. if (retryCount == TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY) { @@ -521,7 +498,7 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId, _visitorOptions._fromTime = fromTimestamp; _visitorOptions._toTime = toTimestamp; _currentBucket = 0; - _hitCounter.reset(new HitCounter()); + _hitCounter = std::make_unique(); _messageSession = std::move(messageSession); _documentPriority = documentPriority; @@ -612,8 +589,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met uint64_t messageId = reply->getContext().value.UINT64; uint32_t removed = _visitorTarget._pendingMessages.erase(messageId); - LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), - reply->toString().c_str(), messageId); + LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), reply->toString().c_str(), messageId); assert(removed == 1); (void) removed; @@ -634,20 +610,16 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met metrics.visitorDestinationFailureReplies.inc(); if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) { - LOG(debug, "Aborting visitor as we failed to talk to controller: %s", - reply->getError(0).toString().c_str()); - api::ReturnCode returnCode( - static_cast( - reply->getError(0).getCode()), - reply->getError(0).getMessage()); + LOG(debug, "Aborting visitor as we failed to talk to controller: %s", reply->getError(0).toString().c_str()); + api::ReturnCode returnCode(static_cast(reply->getError(0).getCode()), + reply->getError(0).getMessage()); fail(returnCode, true); close(); return; } - api::ReturnCode returnCode( - static_cast(reply->getError(0).getCode()), - reply->getError(0).getMessage()); + api::ReturnCode returnCode(static_cast(reply->getError(0).getCode()), + reply->getError(0).getMessage()); const bool should_fail = remap_docapi_message_error_code(returnCode); if (should_fail) { // Abort - something is wrong with target. @@ -657,8 +629,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met } if (failed()) { - LOG(debug, "Failed to send message from visitor '%s', due to " - "%s. Not resending since visitor has failed", + LOG(debug, "Failed to send message from visitor '%s', due to %s. Not resending since visitor has failed", _id.c_str(), returnCode.toString().c_str()); return; } @@ -709,8 +680,7 @@ Visitor::onCreateIteratorReply( if (reply->getResult().failed()) { LOG(debug, "Failed to create iterator for bucket %s: %s", - bucketId.toString().c_str(), - reply->getResult().toString().c_str()); + bucketId.toString().c_str(), reply->getResult().toString().c_str()); fail(reply->getResult()); delete *it; _bucketStates.erase((++it).base()); @@ -718,17 +688,14 @@ Visitor::onCreateIteratorReply( } bucketState.setIteratorId(reply->getIteratorId()); if (failed()) { - LOG(debug, "Create iterator for bucket %s is OK, " - "but visitor has failed: %s", - bucketId.toString().c_str(), - _result.toString().c_str()); + LOG(debug, "Create iterator for bucket %s is OK, but visitor has failed: %s", + bucketId.toString().c_str(), _result.toString().c_str()); delete *it; _bucketStates.erase((++it).base()); return; } - LOG(debug, "Visitor '%s' starting to visit bucket %s.", - _id.c_str(), bucketId.toString().c_str()); + LOG(debug, "Visitor '%s' starting to visit bucket %s.", _id.c_str(), bucketId.toString().c_str()); auto cmd = std::make_shared(bucket, bucketState.getIteratorId(), _docBlockSize); cmd->getTrace().setLevel(_traceLevel); cmd->setPriority(_priority); @@ -737,13 +704,10 @@ Visitor::onCreateIteratorReply( } void -Visitor::onGetIterReply(const std::shared_ptr& reply, - VisitorThreadMetrics& metrics) +Visitor::onGetIterReply(const std::shared_ptr& reply, VisitorThreadMetrics& metrics) { LOG(debug, "Visitor '%s' got get iter reply for bucket %s: %s", - _id.c_str(), - reply->getBucketId().toString().c_str(), - reply->getResult().toString().c_str()); + _id.c_str(), reply->getBucketId().toString().c_str(), reply->getResult().toString().c_str()); auto it = _bucketStates.rbegin(); // New requests will be pushed on end of list.. So searching @@ -763,10 +727,8 @@ Visitor::onGetIterReply(const std::shared_ptr& reply, !reply->getResult().isShutdownRelated() && !reply->getResult().isBucketDisappearance()) { - LOG(warning, "Failed to talk to persistence layer for bucket " - "%s. Aborting visitor '%s': %s", - reply->getBucketId().toString().c_str(), - _id.c_str(), reply->getResult().toString().c_str()); + LOG(warning, "Failed to talk to persistence layer for bucket %s. Aborting visitor '%s': %s", + reply->getBucketId().toString().c_str(), _id.c_str(), reply->getResult().toString().c_str()); } fail(reply->getResult()); BucketIterationState& bucketState(**it); @@ -783,17 +745,14 @@ Visitor::onGetIterReply(const std::shared_ptr& reply, bucketState.setCompleted(reply->isCompleted()); --bucketState._pendingIterators; if (!reply->getEntries().empty()) { - LOG(debug, "Processing documents in handle given from bucket %s.", - reply->getBucketId().toString().c_str()); + LOG(debug, "Processing documents in handle given from bucket %s.", reply->getBucketId().toString().c_str()); // While handling documents we should not keep locks, such // that visitor may process several things at once. if (isRunning()) { MBUS_TRACE(reply->getTrace(), 5, vespalib::make_string("Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size())); - LOG(debug, "Visitor %s handling block of %zu documents.", - _id.c_str(), - reply->getEntries().size()); + LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size()); try { framework::MilliSecTimer processingTimer(_component.getClock()); handleDocuments(reply->getBucketId(), reply->getEntries(), *_hitCounter); @@ -913,15 +872,11 @@ Visitor::continueVisitor() } } - LOG(debug, "No pending messages, tagging visitor '%s' complete", - _id.c_str()); + LOG(debug, "No pending messages, tagging visitor '%s' complete", _id.c_str()); transitionTo(STATE_COMPLETED); } else { - LOG(debug, "Visitor %s waiting for all commands to be replied to " - "(pending=%zu, queued=%zu)", - _id.c_str(), - _visitorTarget._pendingMessages.size(), - _visitorTarget._queuedMessages.size()); + LOG(debug, "Visitor %s waiting for all commands to be replied to (pending=%zu, queued=%zu)", + _id.c_str(), _visitorTarget._pendingMessages.size(), _visitorTarget._queuedMessages.size()); } return false; } else { @@ -981,14 +936,14 @@ Visitor::getStatus(std::ostream& out, bool verbose) const << (_visitorOptions._visitRemoves ? "true" : "false") << "\n"; out << "Control destination"; - if (_controlDestination.get()) { + if (_controlDestination) { out << xml_content_escaped(_controlDestination->toString()); } else { out << "nil"; } out << "\n"; out << "Data destination"; - if (_dataDestination.get()) { + if (_dataDestination) { out << xml_content_escaped(_dataDestination->toString()); } else { out << "nil"; @@ -1078,17 +1033,13 @@ Visitor::getStatus(std::ostream& out, bool verbose) const bool Visitor::getIterators() { - LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, " - "_currentBucket = %d", - _id.c_str(), _buckets.size(), - _bucketStates.size(), _currentBucket); + LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, _currentBucket = %d", + _id.c_str(), _buckets.size(), _bucketStates.size(), _currentBucket); // Don't send any further GetIters if we're closing if (!isRunning()) { if (hasPendingIterators()) { - LOG(debug, "Visitor has failed but waiting for %zu " - "buckets to finish processing", - _bucketStates.size()); + LOG(debug, "Visitor has failed but waiting for %zu buckets to finish processing", _bucketStates.size()); return true; } else { return false; @@ -1097,13 +1048,10 @@ Visitor::getIterators() // Go through buckets found. Take the first that doesn't have requested // state and request a new piece. - for (std::list::iterator it = _bucketStates.begin(); - it != _bucketStates.end();) - { + for (auto it = _bucketStates.begin();it != _bucketStates.end();) { assert(*it); BucketIterationState& bucketState(**it); - if ((bucketState._pendingIterators - >= _visitorOptions._maxParallelOneBucket) + if ((bucketState._pendingIterators >= _visitorOptions._maxParallelOneBucket) || bucketState.hasPendingControlCommand()) { ++it; @@ -1118,20 +1066,17 @@ Visitor::getIterators() } try{ completedBucket(bucketState.getBucketId(), *_hitCounter); - _visitorStatistics.setBucketsVisited( - _visitorStatistics.getBucketsVisited() + 1); + _visitorStatistics.setBucketsVisited(_visitorStatistics.getBucketsVisited() + 1); } catch (std::exception& e) { std::ostringstream ost; - ost << "Visitor fail to run completedBucket() notification: " - << e.what(); + ost << "Visitor fail to run completedBucket() notification: " << e.what(); reportProblem(ost.str()); } delete *it; it = _bucketStates.erase(it); continue; } - auto cmd = std::make_shared( - bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize); + auto cmd = std::make_shared(bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize); cmd->getTrace().setLevel(_traceLevel); cmd->setPriority(_priority); _messageHandler->send(cmd, *this); @@ -1143,7 +1088,7 @@ Visitor::getIterators() } // If there aren't anymore buckets to iterate, we're done - if (_bucketStates.size() == 0 && _currentBucket >= _buckets.size()) { + if (_bucketStates.empty() && _currentBucket >= _buckets.size()) { LOG(debug, "No more buckets to visit for visitor '%s'.", _id.c_str()); return false; } @@ -1157,17 +1102,13 @@ Visitor::getIterators() _currentBucket < _buckets.size()) { document::Bucket bucket(_bucketSpace, _buckets[_currentBucket]); - std::unique_ptr newBucketState( - new BucketIterationState(*this, *_messageHandler, bucket)); + auto newBucketState = std::make_unique(*this, *_messageHandler, bucket); LOG(debug, "Visitor '%s': Sending create iterator for bucket %s.", _id.c_str(), bucket.getBucketId().toString().c_str()); - spi::Selection selection - = spi::Selection(spi::DocumentSelection(_documentSelectionString)); - selection.setFromTimestamp( - spi::Timestamp(_visitorOptions._fromTime.getTime())); - selection.setToTimestamp( - spi::Timestamp(_visitorOptions._toTime.getTime())); + spi::Selection selection = spi::Selection(spi::DocumentSelection(_documentSelectionString)); + selection.setFromTimestamp(spi::Timestamp(_visitorOptions._fromTime.getTime())); + selection.setToTimestamp(spi::Timestamp(_visitorOptions._toTime.getTime())); auto cmd = std::make_shared(bucket, selection,_visitorOptions._fieldSet, _visitorOptions._visitRemoves @@ -1184,8 +1125,7 @@ Visitor::getIterators() } if (sentCount == 0) { if (LOG_WOULD_LOG(debug)) { - LOG(debug, "Enough iterators being processed. Doing nothing for " - "visitor '%s' bucketStates = %zu.", + LOG(debug, "Enough iterators being processed. Doing nothing for visitor '%s' bucketStates = %zu.", _id.c_str(), _bucketStates.size()); for (const auto& state : _bucketStates) { LOG(debug, "Existing: %s", state->toString().c_str()); diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h index 0737c5612c0..9b6d8e348b9 100644 --- a/storage/src/vespa/storage/visiting/visitor.h +++ b/storage/src/vespa/storage/visiting/visitor.h @@ -136,28 +136,24 @@ private: {} /** Sends DestroyIterator over _messageHandler if _iteratorId != 0 */ - ~BucketIterationState(); + ~BucketIterationState() override; void setCompleted(bool completed = true) { _completed = completed; } - bool isCompleted() const { return _completed; } + [[nodiscard]] bool isCompleted() const { return _completed; } - document::Bucket getBucket() const { return _bucket; } - document::BucketId getBucketId() const { return _bucket.getBucketId(); } + [[nodiscard]] document::Bucket getBucket() const { return _bucket; } + [[nodiscard]] document::BucketId getBucketId() const { return _bucket.getBucketId(); } void setIteratorId(spi::IteratorId iteratorId) { _iteratorId = iteratorId; } - spi::IteratorId getIteratorId() const { return _iteratorId; } + [[nodiscard]] spi::IteratorId getIteratorId() const { return _iteratorId; } - void setPendingControlCommand() { - _iteratorId = spi::IteratorId(0); - } - - bool hasPendingControlCommand() const { + [[nodiscard]] bool hasPendingControlCommand() const { return _iteratorId == spi::IteratorId(0); } - bool hasPendingIterators() const { return _pendingIterators > 0; } + [[nodiscard]] bool hasPendingIterators() const { return _pendingIterators > 0; } void print(std::ostream& out, bool, const std::string& ) const override { out << "BucketIterationState(" @@ -247,12 +243,10 @@ private: MessageMeta releaseMetaForMessageId(uint64_t msgId); void reinsertMeta(MessageMeta); - bool hasQueuedMessages() const { return !_queuedMessages.empty(); } + [[nodiscard]] bool hasQueuedMessages() const { return !_queuedMessages.empty(); } void discardQueuedMessages(); - uint32_t getMemoryUsage() const noexcept { - return _memoryUsage; - } + [[nodiscard]] uint32_t getMemoryUsage() const noexcept { return _memoryUsage; } VisitorTarget(); ~VisitorTarget(); @@ -326,9 +320,9 @@ protected: std::string _documentSelectionString; vdslib::VisitorStatistics _visitorStatistics; - bool isCompletedCalled() const { return _calledCompletedVisitor; } + [[nodiscard]] bool isCompletedCalled() const { return _calledCompletedVisitor; } - uint32_t traceLevel() const noexcept { return _traceLevel; } + [[nodiscard]] uint32_t traceLevel() const noexcept { return _traceLevel; } /** * Attempts to add the given trace message to the internal, memory bounded @@ -339,7 +333,7 @@ protected: */ bool addBoundedTrace(uint32_t level, const vespalib::string& message); - const vdslib::Parameters& visitor_parameters() const noexcept; + [[nodiscard]] const vdslib::Parameters& visitor_parameters() const noexcept; // Possibly modifies the ReturnCode parameter in-place if its return code should // be changed based on visitor subclass-specific behavior. @@ -417,7 +411,7 @@ public: * The consistency level provided here is propagated through the SPI * Context object for createIterator calls. */ - virtual spi::ReadConsistency getRequiredReadConsistency() const { + [[nodiscard]] virtual spi::ReadConsistency getRequiredReadConsistency() const { return spi::ReadConsistency::STRONG; } @@ -428,8 +422,7 @@ public: /** * Used to silence transient errors that can happen during normal operation. */ - bool shouldReportProblemToClient(const api::ReturnCode&, - size_t retryCount) const; + [[nodiscard]] static bool shouldReportProblemToClient(const api::ReturnCode&, size_t retryCount) ; /** Called to send report to client of potential non-critical problems. */ void reportProblem(const std::string& problem); @@ -492,18 +485,16 @@ public: void getStatus(std::ostream& out, bool verbose) const; - void setMaxParallel(uint32_t maxParallel) - { _visitorOptions._maxParallel = maxParallel; } - void setMaxParallelPerBucket(uint32_t max) - { _visitorOptions._maxParallelOneBucket = max; } + void setMaxParallel(uint32_t maxParallel) { _visitorOptions._maxParallel = maxParallel; } + void setMaxParallelPerBucket(uint32_t max) { _visitorOptions._maxParallelOneBucket = max; } /** * Sends a message to the data handler for this visitor. */ void sendMessage(std::unique_ptr documentMessage); - bool isRunning() const { return _state == STATE_RUNNING; } - bool isCompleted() const { return _state == STATE_COMPLETED; } + [[nodiscard]] bool isRunning() const { return _state == STATE_RUNNING; } + [[nodiscard]] bool isCompleted() const { return _state == STATE_COMPLETED; } private: /** @@ -542,11 +533,9 @@ private: void sendReplyOnce(); - bool hasFailedVisiting() const { return _result.failed(); } - - bool hasPendingIterators() const { return !_bucketStates.empty(); } - - bool mayTransitionToCompleted() const; + [[nodiscard]] bool hasFailedVisiting() const { return _result.failed(); } + [[nodiscard]] bool hasPendingIterators() const { return !_bucketStates.empty(); } + [[nodiscard]] bool mayTransitionToCompleted() const; void discardAllNoPendingBucketStates(); @@ -565,9 +554,7 @@ private: * * Precondition: attach() must have been called on `this`. */ - bool shouldAddMbusTrace() const noexcept { - return _traceLevel != 0; - } + [[nodiscard]] bool shouldAddMbusTrace() const noexcept { return _traceLevel != 0; } /** * Set internal state to the given state value. -- cgit v1.2.3