diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-06-09 12:53:41 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-06-09 13:05:49 +0000 |
commit | baca22920493a0781b473dcdf75893bc3f0a6bbc (patch) | |
tree | ce2203b826d961413c087c35d76fcf1c596ee7df /storage | |
parent | 66c66aa167c2ba431943ad7287da3f20b11a05ab (diff) |
Clean up some visiting code. No functional changes.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/visiting/visitor.cpp | 43 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/visitorthread.cpp | 149 |
2 files changed, 76 insertions, 116 deletions
diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp index bdd066e8a4a..c9cda047784 100644 --- a/storage/src/vespa/storage/visiting/visitor.cpp +++ b/storage/src/vespa/storage/visiting/visitor.cpp @@ -14,7 +14,7 @@ #include <sstream> #include <vespa/log/log.h> -LOG_SETUP(".visitor.instance"); +LOG_SETUP(".visitor.instance.visitor"); using document::BucketSpace; @@ -367,8 +367,7 @@ Visitor::sendReplyOnce() { assert(_initiatingCmd.get()); if (!_hasSentReply) { - std::shared_ptr<api::StorageReply> reply( - _initiatingCmd->makeReply().release()); + std::shared_ptr<api::StorageReply> reply(_initiatingCmd->makeReply()); _hitCounter->updateVisitorStatistics(_visitorStatistics); static_cast<api::CreateVisitorReply*>(reply.get()) @@ -563,8 +562,7 @@ Visitor::attach(std::shared_ptr<api::StorageCommand> initiatingCmd, _priority = initiatingCmd->getPriority(); _timeToDie = _component.getClock().getTimeInMicros() + timeout.getMicros(); if (_initiatingCmd.get()) { - std::shared_ptr<api::StorageReply> reply( - _initiatingCmd->makeReply().release()); + std::shared_ptr<api::StorageReply> reply(_initiatingCmd->makeReply()); reply->setResult(api::ReturnCode::ABORTED); _messageHandler->send(reply); } @@ -594,7 +592,7 @@ Visitor::attach(std::shared_ptr<api::StorageCommand> initiatingCmd, // In case there was no messages to resend we need to call // continueVisitor to provoke it to resume. - for (uint32_t i=0; i<_visitorOptions._maxParallelOneBucket; ++i) { + for (uint32_t i = 0; i < _visitorOptions._maxParallelOneBucket; ++i) { if (!continueVisitor()) return; } } @@ -669,8 +667,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, return; } assert(!meta.message); - meta.message.reset( - static_cast<documentapi::DocumentMessage*>(message.release())); + meta.message.reset(static_cast<documentapi::DocumentMessage*>(message.release())); meta.retryCount++; const size_t retryCount = meta.retryCount; @@ -702,7 +699,7 @@ Visitor::onCreateIteratorReply( const std::shared_ptr<CreateIteratorReply>& reply, VisitorThreadMetrics& /*metrics*/) { - std::list<BucketIterationState*>::reverse_iterator it = _bucketStates.rbegin(); + auto it = _bucketStates.rbegin(); document::Bucket bucket(reply->getBucket()); document::BucketId bucketId(bucket.getBucketId()); @@ -752,7 +749,7 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, _id.c_str(), reply->getBucketId().toString().c_str(), reply->getResult().toString().c_str()); - std::list<BucketIterationState*>::reverse_iterator it = _bucketStates.rbegin(); + auto it = _bucketStates.rbegin(); // New requests will be pushed on end of list.. So searching // in reverse order should quickly get correct result. @@ -803,7 +800,7 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size()); - try{ + try { framework::MilliSecTimer processingTimer(_component.getClock()); handleDocuments(reply->getBucketId(), reply->getEntries(), @@ -814,18 +811,16 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, MBUS_TRACE(reply->getTrace(), 5, "Done processing data block in visitor plugin"); uint64_t size = 0; - for (size_t i = 0; i < reply->getEntries().size(); ++i) { - size += reply->getEntries()[i]->getPersistedDocumentSize(); + for (const auto& entry : reply->getEntries()) { + size += entry->getPersistedDocumentSize(); } _visitorStatistics.setDocumentsVisited( _visitorStatistics.getDocumentsVisited() + reply->getEntries().size()); - _visitorStatistics.setBytesVisited( - _visitorStatistics.getBytesVisited() + size); + _visitorStatistics.setBytesVisited(_visitorStatistics.getBytesVisited() + size); } catch (std::exception& e) { - LOG(warning, "handleDocuments threw exception %s", - e.what()); + LOG(warning, "handleDocuments threw exception %s", e.what()); reportProblem(e.what()); } } @@ -849,8 +844,7 @@ Visitor::sendDueQueuedMessages(framework::MicroSecTime timeNow) while (!_visitorTarget._queuedMessages.empty() && (_visitorTarget._pendingMessages.size() < _visitorOptions._maxPending)) { - VisitorTarget::MessageQueue::iterator it( - _visitorTarget._queuedMessages.begin()); + auto it = _visitorTarget._queuedMessages.begin(); if (it->first < timeNow) { auto& msgMeta = _visitorTarget.metaForMessageId(it->second); _visitorTarget._queuedMessages.erase(it); @@ -1204,13 +1198,10 @@ Visitor::getIterators() if (sentCount == 0) { if (LOG_WOULD_LOG(debug)) { LOG(debug, "Enough iterators being processed. Doing nothing for " - "visitor '%s' bucketStates = %d.", - _id.c_str(), (int)_bucketStates.size()); - for (std::list<BucketIterationState*>::iterator it( - _bucketStates.begin()); - it != _bucketStates.end(); ++it) - { - LOG(debug, "Existing: %s", (*it)->toString().c_str()); + "visitor '%s' bucketStates = %zu.", + _id.c_str(), _bucketStates.size()); + for (const auto& state : _bucketStates) { + LOG(debug, "Existing: %s", state->toString().c_str()); } } } diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index 006af5edf7d..c6e75735690 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -140,8 +140,7 @@ VisitorThread::shutdown() .getType() != PropagateVisitorConfig::ID)) { std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(*it->_message) - .makeReply().release()); + static_cast<api::StorageCommand&>(*it->_message).makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); _messageSender.send(reply); @@ -225,9 +224,9 @@ VisitorThread::run(framework::ThreadHandle& thread) if (entry._message.get()) { // If visitor doesn't exist, log failure only if it wasn't // recently deleted - if (_currentlyRunningVisitor == _visitors.end() && - entry._message->getType() != api::MessageType::VISITOR_CREATE && - entry._message->getType() != api::MessageType::INTERNAL) + if ((_currentlyRunningVisitor == _visitors.end()) && + (entry._message->getType() != api::MessageType::VISITOR_CREATE) && + (entry._message->getType() != api::MessageType::INTERNAL)) { handleNonExistingVisitorCall(entry, result); } else { @@ -264,10 +263,8 @@ VisitorThread::run(framework::ThreadHandle& thread) if (!handled && entry._message.get() && !entry._message->getType().isReply()) { - api::StorageCommand& cmd( - dynamic_cast<api::StorageCommand&>(*entry._message)); - std::shared_ptr<api::StorageReply> reply( - cmd.makeReply().release()); + auto& cmd = dynamic_cast<api::StorageCommand&>(*entry._message); + std::shared_ptr<api::StorageReply> reply(cmd.makeReply()); reply->setResult(result); _messageSender.send(reply); } @@ -278,10 +275,8 @@ void VisitorThread::tick() { // Give all visitors an event - for (VisitorMap::iterator it = _visitors.begin(); it != _visitors.end();) - { - LOG(spam, "Giving tick to visitor %s.", - it->second->getVisitorName().c_str()); + for (auto it = _visitors.begin(); it != _visitors.end();) { + LOG(spam, "Giving tick to visitor %s.", it->second->getVisitorName().c_str()); it->second->continueVisitor(); if (it->second->isCompleted()) { LOG(debug, "Closing visitor %s. Visitor marked as completed", @@ -312,11 +307,9 @@ VisitorThread::close() } else { _metrics.completedVisitors[loadType].inc(1); } - framework::SecondTime currentTime( - _component.getClock().getTimeInSeconds()); + framework::SecondTime currentTime(_component.getClock().getTimeInSeconds()); trimRecentlyCompletedList(currentTime); - _recentlyCompleted.push_back(std::make_pair( - _currentlyRunningVisitor->first, currentTime)); + _recentlyCompleted.emplace_back(_currentlyRunningVisitor->first, currentTime); _visitors.erase(_currentlyRunningVisitor); _currentlyRunningVisitor = _visitors.end(); } @@ -324,8 +317,7 @@ VisitorThread::close() void VisitorThread::trimRecentlyCompletedList(framework::SecondTime currentTime) { - framework::SecondTime recentLimit( - currentTime - framework::SecondTime(30)); + framework::SecondTime recentLimit(currentTime - framework::SecondTime(30)); // Dump all elements that aren't recent anymore while (!_recentlyCompleted.empty() && _recentlyCompleted.front().second < recentLimit) @@ -339,16 +331,12 @@ VisitorThread::handleNonExistingVisitorCall(const Event& entry, ReturnCode& code) { // Get current time. Set the time that is the oldest still recent. - framework::SecondTime currentTime( - _component.getClock().getTimeInSeconds());; + framework::SecondTime currentTime(_component.getClock().getTimeInSeconds());; trimRecentlyCompletedList(currentTime); // Go through all recent visitors. Ignore request if recent - for (std::deque<std::pair<api::VisitorId, framework::SecondTime> > - ::iterator it = _recentlyCompleted.begin(); - it != _recentlyCompleted.end(); ++it) - { - if (it->first == entry._visitorId) { + for (const auto& e : _recentlyCompleted) { + if (e.first == entry._visitorId) { code = ReturnCode(ReturnCode::ILLEGAL_PARAMETERS, "Visitor recently completed/failed/aborted."); return; @@ -371,13 +359,13 @@ VisitorThread::createVisitor(vespalib::stringref libName, vespalib::string str = libName; std::transform(str.begin(), str.end(), str.begin(), tolower); - VisitorFactory::Map::iterator it(_visitorFactories.find(str)); + auto it = _visitorFactories.find(str); if (it == _visitorFactories.end()) { error << "Visitor library " << str << " not found."; return std::shared_ptr<Visitor>(); } - LibMap::iterator libIter = _libs.find(str); + auto libIter = _libs.find(str); if (libIter == _libs.end()) { _libs[str] = std::shared_ptr<VisitorEnvironment>( it->second->makeVisitorEnvironment(_component).release()); @@ -402,17 +390,15 @@ namespace { std::unique_ptr<api::StorageMessageAddress> getDataAddress(const api::CreateVisitorCommand& cmd) { - return std::unique_ptr<api::StorageMessageAddress>( - new api::StorageMessageAddress( - mbus::Route::parse(cmd.getDataDestination()))); + return std::make_unique<api::StorageMessageAddress>( + mbus::Route::parse(cmd.getDataDestination())); } std::unique_ptr<api::StorageMessageAddress> getControlAddress(const api::CreateVisitorCommand& cmd) { - return std::unique_ptr<api::StorageMessageAddress>( - new api::StorageMessageAddress( - mbus::Route::parse(cmd.getControlDestination()))); + return std::make_unique<api::StorageMessageAddress>( + mbus::Route::parse(cmd.getControlDestination())); } void @@ -447,28 +433,27 @@ VisitorThread::onCreateVisitor( std::unique_ptr<api::StorageMessageAddress> dataAddress; std::shared_ptr<Visitor> visitor; do { - // If no buckets are specified, fail command - if (cmd->getBuckets().size() == 0) { + // If no buckets are specified, fail command + if (cmd->getBuckets().empty()) { result = ReturnCode(ReturnCode::ILLEGAL_PARAMETERS, "No buckets specified"); LOG(warning, "CreateVisitor(%s): No buckets specified. Aborting.", cmd->getInstanceId().c_str()); break; } - // Get the source address + // Get the source address controlAddress = getControlAddress(*cmd); dataAddress = getDataAddress(*cmd); - // Attempt to load library containing visitor + // Attempt to load library containing visitor vespalib::asciistream errors; - visitor = createVisitor(cmd->getLibraryName(), cmd->getParameters(), - errors); - if (visitor.get() == 0) { + visitor = createVisitor(cmd->getLibraryName(), cmd->getParameters(), errors); + if (!visitor) { result = ReturnCode(ReturnCode::ILLEGAL_PARAMETERS, errors.str()); LOG(warning, "CreateVisitor(%s): Failed to create visitor: %s", cmd->getInstanceId().c_str(), errors.str().data()); break; } - // Set visitor parameters + // Set visitor parameters if (cmd->getMaximumPendingReplyCount() != 0) { visitor->setMaxPending(cmd->getMaximumPendingReplyCount()); } else { @@ -494,11 +479,9 @@ VisitorThread::onCreateVisitor( // Parse document selection try{ - if (cmd->getDocumentSelection() != "") { - std::shared_ptr<const document::DocumentTypeRepo> repo( - _component.getTypeRepo()); - const document::BucketIdFactory& idFactory( - _component.getBucketIdFactory()); + if (!cmd->getDocumentSelection().empty()) { + std::shared_ptr<const document::DocumentTypeRepo> repo(_component.getTypeRepo()); + const document::BucketIdFactory& idFactory(_component.getBucketIdFactory()); document::select::Parser parser(*repo, idFactory); docSelection = parser.parse(cmd->getDocumentSelection()); validateDocumentSelection(*repo, *docSelection); @@ -522,11 +505,11 @@ VisitorThread::onCreateVisitor( } LOG(debug, "CreateVisitor(%s): Successfully created visitor", cmd->getInstanceId().c_str()); - // Insert visitor prior to creating successful reply. + // Insert visitor prior to creating successful reply. } while (false); - // Start the visitor last, as to ensure client will receive - // visitor create reply first, and that all errors we could detect - // resulted in proper error code in reply.. + // Start the visitor last, as to ensure client will receive + // visitor create reply first, and that all errors we could detect + // resulted in proper error code in reply.. if (result.success()) { _visitors[cmd->getVisitorId()] = visitor; try{ @@ -548,18 +531,17 @@ VisitorThread::onCreateVisitor( visitor->attach(cmd, *controlAddress, *dataAddress, framework::MilliSecTime(vespalib::count_ms(cmd->getTimeout()))); } catch (std::exception& e) { - // We don't handle exceptions from this code, as we've - // added visitor to internal structs we'll end up calling - // close() twice. + // We don't handle exceptions from this code, as we've + // added visitor to internal structs we'll end up calling + // close() twice. LOG(error, "Got exception we can't handle: %s", e.what()); assert(false); } _metrics.createdVisitors[visitor->getLoadType()].inc(1); visitorTimer.stop(_metrics.averageVisitorCreationTime[visitor->getLoadType()]); } else { - // Send reply - std::shared_ptr<api::CreateVisitorReply> reply( - new api::CreateVisitorReply(*cmd)); + // Send reply + auto reply = std::make_shared<api::CreateVisitorReply>(*cmd); reply->setResult(result); _messageSender.closed(cmd->getVisitorId()); _messageSender.send(reply); @@ -572,7 +554,7 @@ VisitorThread::handleMessageBusReply(mbus::Reply::UP reply, Visitor& visitor) { vespalib::MonitorGuard sync(_queueMonitor); - _queue.push_back(Event(visitor.getVisitorId(), std::move(reply))); + _queue.emplace_back(visitor.getVisitorId(), std::move(reply)); sync.broadcast(); } @@ -582,8 +564,7 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd) switch (cmd->getType()) { case PropagateVisitorConfig::ID: { - PropagateVisitorConfig& pcmd( - dynamic_cast<PropagateVisitorConfig&>(*cmd)); + auto& pcmd = dynamic_cast<PropagateVisitorConfig&>(*cmd); const vespa::config::content::core::StorVisitorConfig& config(pcmd.getConfig()); if (_defaultDocBlockSize != 0) { // Live update LOG(config, "Updating visitor thread configuration in visitor " @@ -655,12 +636,10 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd) case RequestStatusPage::ID: { LOG(spam, "Got RequestStatusPage request"); - RequestStatusPage& rsp(dynamic_cast<RequestStatusPage&>(*cmd)); + auto& rsp = dynamic_cast<RequestStatusPage&>(*cmd); vespalib::asciistream ost; getStatus(ost, rsp.getPath()); - std::shared_ptr<RequestStatusPageReply> reply( - new RequestStatusPageReply(rsp, ost.str())); - _messageSender.send(reply); + _messageSender.send(std::make_shared<RequestStatusPageReply>(rsp, ost.str())); break; } default: @@ -679,11 +658,9 @@ VisitorThread::onInternalReply(const std::shared_ptr<api::InternalReply>& r) switch (r->getType()) { case GetIterReply::ID: { - std::shared_ptr<GetIterReply> reply( - std::dynamic_pointer_cast<GetIterReply>(r)); + auto reply = std::dynamic_pointer_cast<GetIterReply>(r); assert(reply.get()); - _currentlyRunningVisitor->second->onGetIterReply( - reply, _metrics); + _currentlyRunningVisitor->second->onGetIterReply(reply, _metrics); if (_currentlyRunningVisitor->second->isCompleted()) { LOG(debug, "onGetIterReply(%s): Visitor completed.", _currentlyRunningVisitor->second->getVisitorName().c_str()); @@ -693,11 +670,9 @@ VisitorThread::onInternalReply(const std::shared_ptr<api::InternalReply>& r) } case CreateIteratorReply::ID: { - std::shared_ptr<CreateIteratorReply> reply( - std::dynamic_pointer_cast<CreateIteratorReply>(r)); + auto reply = std::dynamic_pointer_cast<CreateIteratorReply>(r); assert(reply.get()); - _currentlyRunningVisitor->second->onCreateIteratorReply( - reply, _metrics); + _currentlyRunningVisitor->second->onCreateIteratorReply(reply, _metrics); break; } default: @@ -721,25 +696,21 @@ VisitorThread::getStatus(vespalib::asciistream& out, if (status && verbose) { out << "<h3>Visitor libraries loaded</h3>\n<ul>\n"; - if (_libs.size() == 0) { + if (_libs.empty()) { out << "None\n"; } - for (LibMap::const_iterator it = _libs.begin(); it != _libs.end(); ++it) - { - out << "<li>" << it->first << "\n"; + for (const auto& lib : _libs) { + out << "<li>" << lib.first << "\n"; } out << "</ul>\n"; out << "<h3>Recently completed/failed/aborted visitors</h3>\n<ul>\n"; - if (_recentlyCompleted.size() == 0) { + if (_recentlyCompleted.empty()) { out << "None\n"; } - for (std::deque<std::pair<api::VisitorId, framework::SecondTime> > - ::const_iterator it = _recentlyCompleted.begin(); - it != _recentlyCompleted.end(); ++it) - { - out << "<li> Visitor " << it->first << " done at " - << it->second.getTime() << "\n"; + for (const auto& cv : _recentlyCompleted) { + out << "<li> Visitor " << cv.first << " done at " + << cv.second.getTime() << "\n"; } out << "</ul>\n"; out << "<h3>Current queue size: " << _queue.size() << "</h3>\n"; @@ -764,17 +735,15 @@ VisitorThread::getStatus(vespalib::asciistream& out, << "</table>\n"; } if (showAll) { - for (VisitorMap::const_iterator it = _visitors.begin(); - it != _visitors.end(); ++it) - { - out << "<h3>Visitor " << it->first << "</h3>\n"; + for (const auto& v : _visitors) { + out << "<h3>Visitor " << v.first << "</h3>\n"; std::ostringstream tmp; - it->second->getStatus(tmp, verbose); + v.second->getStatus(tmp, verbose); out << tmp.str(); } } else if (path.hasAttribute("visitor")) { out << "<h3>Visitor " << visitor << "</h3>\n"; - VisitorMap::const_iterator it = _visitors.find(visitor); + auto it = _visitors.find(visitor); if (it == _visitors.end()) { out << "Not found\n"; } else { @@ -784,7 +753,7 @@ VisitorThread::getStatus(vespalib::asciistream& out, } } else { // List visitors out << "<h3>Active visitors</h3>\n"; - if (_visitors.size() == 0) { + if (_visitors.empty()) { out << "None\n"; } for (VisitorMap::const_iterator it = _visitors.begin(); |