summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-06-09 12:53:41 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-06-09 13:05:49 +0000
commitbaca22920493a0781b473dcdf75893bc3f0a6bbc (patch)
treece2203b826d961413c087c35d76fcf1c596ee7df /storage
parent66c66aa167c2ba431943ad7287da3f20b11a05ab (diff)
Clean up some visiting code. No functional changes.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/visiting/visitor.cpp43
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.cpp149
2 files changed, 76 insertions, 116 deletions
diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp
index bdd066e8a4a..c9cda047784 100644
--- a/storage/src/vespa/storage/visiting/visitor.cpp
+++ b/storage/src/vespa/storage/visiting/visitor.cpp
@@ -14,7 +14,7 @@
#include <sstream>
#include <vespa/log/log.h>
-LOG_SETUP(".visitor.instance");
+LOG_SETUP(".visitor.instance.visitor");
using document::BucketSpace;
@@ -367,8 +367,7 @@ Visitor::sendReplyOnce()
{
assert(_initiatingCmd.get());
if (!_hasSentReply) {
- std::shared_ptr<api::StorageReply> reply(
- _initiatingCmd->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(_initiatingCmd->makeReply());
_hitCounter->updateVisitorStatistics(_visitorStatistics);
static_cast<api::CreateVisitorReply*>(reply.get())
@@ -563,8 +562,7 @@ Visitor::attach(std::shared_ptr<api::StorageCommand> initiatingCmd,
_priority = initiatingCmd->getPriority();
_timeToDie = _component.getClock().getTimeInMicros() + timeout.getMicros();
if (_initiatingCmd.get()) {
- std::shared_ptr<api::StorageReply> reply(
- _initiatingCmd->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(_initiatingCmd->makeReply());
reply->setResult(api::ReturnCode::ABORTED);
_messageHandler->send(reply);
}
@@ -594,7 +592,7 @@ Visitor::attach(std::shared_ptr<api::StorageCommand> initiatingCmd,
// In case there was no messages to resend we need to call
// continueVisitor to provoke it to resume.
- for (uint32_t i=0; i<_visitorOptions._maxParallelOneBucket; ++i) {
+ for (uint32_t i = 0; i < _visitorOptions._maxParallelOneBucket; ++i) {
if (!continueVisitor()) return;
}
}
@@ -669,8 +667,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply,
return;
}
assert(!meta.message);
- meta.message.reset(
- static_cast<documentapi::DocumentMessage*>(message.release()));
+ meta.message.reset(static_cast<documentapi::DocumentMessage*>(message.release()));
meta.retryCount++;
const size_t retryCount = meta.retryCount;
@@ -702,7 +699,7 @@ Visitor::onCreateIteratorReply(
const std::shared_ptr<CreateIteratorReply>& reply,
VisitorThreadMetrics& /*metrics*/)
{
- std::list<BucketIterationState*>::reverse_iterator it = _bucketStates.rbegin();
+ auto it = _bucketStates.rbegin();
document::Bucket bucket(reply->getBucket());
document::BucketId bucketId(bucket.getBucketId());
@@ -752,7 +749,7 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply,
_id.c_str(),
reply->getBucketId().toString().c_str(),
reply->getResult().toString().c_str());
- std::list<BucketIterationState*>::reverse_iterator it = _bucketStates.rbegin();
+ auto it = _bucketStates.rbegin();
// New requests will be pushed on end of list.. So searching
// in reverse order should quickly get correct result.
@@ -803,7 +800,7 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply,
LOG(debug, "Visitor %s handling block of %zu documents.",
_id.c_str(),
reply->getEntries().size());
- try{
+ try {
framework::MilliSecTimer processingTimer(_component.getClock());
handleDocuments(reply->getBucketId(),
reply->getEntries(),
@@ -814,18 +811,16 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply,
MBUS_TRACE(reply->getTrace(), 5, "Done processing data block in visitor plugin");
uint64_t size = 0;
- for (size_t i = 0; i < reply->getEntries().size(); ++i) {
- size += reply->getEntries()[i]->getPersistedDocumentSize();
+ for (const auto& entry : reply->getEntries()) {
+ size += entry->getPersistedDocumentSize();
}
_visitorStatistics.setDocumentsVisited(
_visitorStatistics.getDocumentsVisited()
+ reply->getEntries().size());
- _visitorStatistics.setBytesVisited(
- _visitorStatistics.getBytesVisited() + size);
+ _visitorStatistics.setBytesVisited(_visitorStatistics.getBytesVisited() + size);
} catch (std::exception& e) {
- LOG(warning, "handleDocuments threw exception %s",
- e.what());
+ LOG(warning, "handleDocuments threw exception %s", e.what());
reportProblem(e.what());
}
}
@@ -849,8 +844,7 @@ Visitor::sendDueQueuedMessages(framework::MicroSecTime timeNow)
while (!_visitorTarget._queuedMessages.empty()
&& (_visitorTarget._pendingMessages.size()
< _visitorOptions._maxPending)) {
- VisitorTarget::MessageQueue::iterator it(
- _visitorTarget._queuedMessages.begin());
+ auto it = _visitorTarget._queuedMessages.begin();
if (it->first < timeNow) {
auto& msgMeta = _visitorTarget.metaForMessageId(it->second);
_visitorTarget._queuedMessages.erase(it);
@@ -1204,13 +1198,10 @@ Visitor::getIterators()
if (sentCount == 0) {
if (LOG_WOULD_LOG(debug)) {
LOG(debug, "Enough iterators being processed. Doing nothing for "
- "visitor '%s' bucketStates = %d.",
- _id.c_str(), (int)_bucketStates.size());
- for (std::list<BucketIterationState*>::iterator it(
- _bucketStates.begin());
- it != _bucketStates.end(); ++it)
- {
- LOG(debug, "Existing: %s", (*it)->toString().c_str());
+ "visitor '%s' bucketStates = %zu.",
+ _id.c_str(), _bucketStates.size());
+ for (const auto& state : _bucketStates) {
+ LOG(debug, "Existing: %s", state->toString().c_str());
}
}
}
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index 006af5edf7d..c6e75735690 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -140,8 +140,7 @@ VisitorThread::shutdown()
.getType() != PropagateVisitorConfig::ID))
{
std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(*it->_message)
- .makeReply().release());
+ static_cast<api::StorageCommand&>(*it->_message).makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED,
"Shutting down storage node."));
_messageSender.send(reply);
@@ -225,9 +224,9 @@ VisitorThread::run(framework::ThreadHandle& thread)
if (entry._message.get()) {
// If visitor doesn't exist, log failure only if it wasn't
// recently deleted
- if (_currentlyRunningVisitor == _visitors.end() &&
- entry._message->getType() != api::MessageType::VISITOR_CREATE &&
- entry._message->getType() != api::MessageType::INTERNAL)
+ if ((_currentlyRunningVisitor == _visitors.end()) &&
+ (entry._message->getType() != api::MessageType::VISITOR_CREATE) &&
+ (entry._message->getType() != api::MessageType::INTERNAL))
{
handleNonExistingVisitorCall(entry, result);
} else {
@@ -264,10 +263,8 @@ VisitorThread::run(framework::ThreadHandle& thread)
if (!handled && entry._message.get() &&
!entry._message->getType().isReply())
{
- api::StorageCommand& cmd(
- dynamic_cast<api::StorageCommand&>(*entry._message));
- std::shared_ptr<api::StorageReply> reply(
- cmd.makeReply().release());
+ auto& cmd = dynamic_cast<api::StorageCommand&>(*entry._message);
+ std::shared_ptr<api::StorageReply> reply(cmd.makeReply());
reply->setResult(result);
_messageSender.send(reply);
}
@@ -278,10 +275,8 @@ void
VisitorThread::tick()
{
// Give all visitors an event
- for (VisitorMap::iterator it = _visitors.begin(); it != _visitors.end();)
- {
- LOG(spam, "Giving tick to visitor %s.",
- it->second->getVisitorName().c_str());
+ for (auto it = _visitors.begin(); it != _visitors.end();) {
+ LOG(spam, "Giving tick to visitor %s.", it->second->getVisitorName().c_str());
it->second->continueVisitor();
if (it->second->isCompleted()) {
LOG(debug, "Closing visitor %s. Visitor marked as completed",
@@ -312,11 +307,9 @@ VisitorThread::close()
} else {
_metrics.completedVisitors[loadType].inc(1);
}
- framework::SecondTime currentTime(
- _component.getClock().getTimeInSeconds());
+ framework::SecondTime currentTime(_component.getClock().getTimeInSeconds());
trimRecentlyCompletedList(currentTime);
- _recentlyCompleted.push_back(std::make_pair(
- _currentlyRunningVisitor->first, currentTime));
+ _recentlyCompleted.emplace_back(_currentlyRunningVisitor->first, currentTime);
_visitors.erase(_currentlyRunningVisitor);
_currentlyRunningVisitor = _visitors.end();
}
@@ -324,8 +317,7 @@ VisitorThread::close()
void
VisitorThread::trimRecentlyCompletedList(framework::SecondTime currentTime)
{
- framework::SecondTime recentLimit(
- currentTime - framework::SecondTime(30));
+ framework::SecondTime recentLimit(currentTime - framework::SecondTime(30));
// Dump all elements that aren't recent anymore
while (!_recentlyCompleted.empty()
&& _recentlyCompleted.front().second < recentLimit)
@@ -339,16 +331,12 @@ VisitorThread::handleNonExistingVisitorCall(const Event& entry,
ReturnCode& code)
{
// Get current time. Set the time that is the oldest still recent.
- framework::SecondTime currentTime(
- _component.getClock().getTimeInSeconds());;
+ framework::SecondTime currentTime(_component.getClock().getTimeInSeconds());;
trimRecentlyCompletedList(currentTime);
// Go through all recent visitors. Ignore request if recent
- for (std::deque<std::pair<api::VisitorId, framework::SecondTime> >
- ::iterator it = _recentlyCompleted.begin();
- it != _recentlyCompleted.end(); ++it)
- {
- if (it->first == entry._visitorId) {
+ for (const auto& e : _recentlyCompleted) {
+ if (e.first == entry._visitorId) {
code = ReturnCode(ReturnCode::ILLEGAL_PARAMETERS,
"Visitor recently completed/failed/aborted.");
return;
@@ -371,13 +359,13 @@ VisitorThread::createVisitor(vespalib::stringref libName,
vespalib::string str = libName;
std::transform(str.begin(), str.end(), str.begin(), tolower);
- VisitorFactory::Map::iterator it(_visitorFactories.find(str));
+ auto it = _visitorFactories.find(str);
if (it == _visitorFactories.end()) {
error << "Visitor library " << str << " not found.";
return std::shared_ptr<Visitor>();
}
- LibMap::iterator libIter = _libs.find(str);
+ auto libIter = _libs.find(str);
if (libIter == _libs.end()) {
_libs[str] = std::shared_ptr<VisitorEnvironment>(
it->second->makeVisitorEnvironment(_component).release());
@@ -402,17 +390,15 @@ namespace {
std::unique_ptr<api::StorageMessageAddress>
getDataAddress(const api::CreateVisitorCommand& cmd)
{
- return std::unique_ptr<api::StorageMessageAddress>(
- new api::StorageMessageAddress(
- mbus::Route::parse(cmd.getDataDestination())));
+ return std::make_unique<api::StorageMessageAddress>(
+ mbus::Route::parse(cmd.getDataDestination()));
}
std::unique_ptr<api::StorageMessageAddress>
getControlAddress(const api::CreateVisitorCommand& cmd)
{
- return std::unique_ptr<api::StorageMessageAddress>(
- new api::StorageMessageAddress(
- mbus::Route::parse(cmd.getControlDestination())));
+ return std::make_unique<api::StorageMessageAddress>(
+ mbus::Route::parse(cmd.getControlDestination()));
}
void
@@ -447,28 +433,27 @@ VisitorThread::onCreateVisitor(
std::unique_ptr<api::StorageMessageAddress> dataAddress;
std::shared_ptr<Visitor> visitor;
do {
- // If no buckets are specified, fail command
- if (cmd->getBuckets().size() == 0) {
+ // If no buckets are specified, fail command
+ if (cmd->getBuckets().empty()) {
result = ReturnCode(ReturnCode::ILLEGAL_PARAMETERS,
"No buckets specified");
LOG(warning, "CreateVisitor(%s): No buckets specified. Aborting.",
cmd->getInstanceId().c_str());
break;
}
- // Get the source address
+ // Get the source address
controlAddress = getControlAddress(*cmd);
dataAddress = getDataAddress(*cmd);
- // Attempt to load library containing visitor
+ // Attempt to load library containing visitor
vespalib::asciistream errors;
- visitor = createVisitor(cmd->getLibraryName(), cmd->getParameters(),
- errors);
- if (visitor.get() == 0) {
+ visitor = createVisitor(cmd->getLibraryName(), cmd->getParameters(), errors);
+ if (!visitor) {
result = ReturnCode(ReturnCode::ILLEGAL_PARAMETERS, errors.str());
LOG(warning, "CreateVisitor(%s): Failed to create visitor: %s",
cmd->getInstanceId().c_str(), errors.str().data());
break;
}
- // Set visitor parameters
+ // Set visitor parameters
if (cmd->getMaximumPendingReplyCount() != 0) {
visitor->setMaxPending(cmd->getMaximumPendingReplyCount());
} else {
@@ -494,11 +479,9 @@ VisitorThread::onCreateVisitor(
// Parse document selection
try{
- if (cmd->getDocumentSelection() != "") {
- std::shared_ptr<const document::DocumentTypeRepo> repo(
- _component.getTypeRepo());
- const document::BucketIdFactory& idFactory(
- _component.getBucketIdFactory());
+ if (!cmd->getDocumentSelection().empty()) {
+ std::shared_ptr<const document::DocumentTypeRepo> repo(_component.getTypeRepo());
+ const document::BucketIdFactory& idFactory(_component.getBucketIdFactory());
document::select::Parser parser(*repo, idFactory);
docSelection = parser.parse(cmd->getDocumentSelection());
validateDocumentSelection(*repo, *docSelection);
@@ -522,11 +505,11 @@ VisitorThread::onCreateVisitor(
}
LOG(debug, "CreateVisitor(%s): Successfully created visitor",
cmd->getInstanceId().c_str());
- // Insert visitor prior to creating successful reply.
+ // Insert visitor prior to creating successful reply.
} while (false);
- // Start the visitor last, as to ensure client will receive
- // visitor create reply first, and that all errors we could detect
- // resulted in proper error code in reply..
+ // Start the visitor last, as to ensure client will receive
+ // visitor create reply first, and that all errors we could detect
+ // resulted in proper error code in reply..
if (result.success()) {
_visitors[cmd->getVisitorId()] = visitor;
try{
@@ -548,18 +531,17 @@ VisitorThread::onCreateVisitor(
visitor->attach(cmd, *controlAddress, *dataAddress,
framework::MilliSecTime(vespalib::count_ms(cmd->getTimeout())));
} catch (std::exception& e) {
- // We don't handle exceptions from this code, as we've
- // added visitor to internal structs we'll end up calling
- // close() twice.
+ // We don't handle exceptions from this code, as we've
+ // added visitor to internal structs we'll end up calling
+ // close() twice.
LOG(error, "Got exception we can't handle: %s", e.what());
assert(false);
}
_metrics.createdVisitors[visitor->getLoadType()].inc(1);
visitorTimer.stop(_metrics.averageVisitorCreationTime[visitor->getLoadType()]);
} else {
- // Send reply
- std::shared_ptr<api::CreateVisitorReply> reply(
- new api::CreateVisitorReply(*cmd));
+ // Send reply
+ auto reply = std::make_shared<api::CreateVisitorReply>(*cmd);
reply->setResult(result);
_messageSender.closed(cmd->getVisitorId());
_messageSender.send(reply);
@@ -572,7 +554,7 @@ VisitorThread::handleMessageBusReply(mbus::Reply::UP reply,
Visitor& visitor)
{
vespalib::MonitorGuard sync(_queueMonitor);
- _queue.push_back(Event(visitor.getVisitorId(), std::move(reply)));
+ _queue.emplace_back(visitor.getVisitorId(), std::move(reply));
sync.broadcast();
}
@@ -582,8 +564,7 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd)
switch (cmd->getType()) {
case PropagateVisitorConfig::ID:
{
- PropagateVisitorConfig& pcmd(
- dynamic_cast<PropagateVisitorConfig&>(*cmd));
+ auto& pcmd = dynamic_cast<PropagateVisitorConfig&>(*cmd);
const vespa::config::content::core::StorVisitorConfig& config(pcmd.getConfig());
if (_defaultDocBlockSize != 0) { // Live update
LOG(config, "Updating visitor thread configuration in visitor "
@@ -655,12 +636,10 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd)
case RequestStatusPage::ID:
{
LOG(spam, "Got RequestStatusPage request");
- RequestStatusPage& rsp(dynamic_cast<RequestStatusPage&>(*cmd));
+ auto& rsp = dynamic_cast<RequestStatusPage&>(*cmd);
vespalib::asciistream ost;
getStatus(ost, rsp.getPath());
- std::shared_ptr<RequestStatusPageReply> reply(
- new RequestStatusPageReply(rsp, ost.str()));
- _messageSender.send(reply);
+ _messageSender.send(std::make_shared<RequestStatusPageReply>(rsp, ost.str()));
break;
}
default:
@@ -679,11 +658,9 @@ VisitorThread::onInternalReply(const std::shared_ptr<api::InternalReply>& r)
switch (r->getType()) {
case GetIterReply::ID:
{
- std::shared_ptr<GetIterReply> reply(
- std::dynamic_pointer_cast<GetIterReply>(r));
+ auto reply = std::dynamic_pointer_cast<GetIterReply>(r);
assert(reply.get());
- _currentlyRunningVisitor->second->onGetIterReply(
- reply, _metrics);
+ _currentlyRunningVisitor->second->onGetIterReply(reply, _metrics);
if (_currentlyRunningVisitor->second->isCompleted()) {
LOG(debug, "onGetIterReply(%s): Visitor completed.",
_currentlyRunningVisitor->second->getVisitorName().c_str());
@@ -693,11 +670,9 @@ VisitorThread::onInternalReply(const std::shared_ptr<api::InternalReply>& r)
}
case CreateIteratorReply::ID:
{
- std::shared_ptr<CreateIteratorReply> reply(
- std::dynamic_pointer_cast<CreateIteratorReply>(r));
+ auto reply = std::dynamic_pointer_cast<CreateIteratorReply>(r);
assert(reply.get());
- _currentlyRunningVisitor->second->onCreateIteratorReply(
- reply, _metrics);
+ _currentlyRunningVisitor->second->onCreateIteratorReply(reply, _metrics);
break;
}
default:
@@ -721,25 +696,21 @@ VisitorThread::getStatus(vespalib::asciistream& out,
if (status && verbose) {
out << "<h3>Visitor libraries loaded</h3>\n<ul>\n";
- if (_libs.size() == 0) {
+ if (_libs.empty()) {
out << "None\n";
}
- for (LibMap::const_iterator it = _libs.begin(); it != _libs.end(); ++it)
- {
- out << "<li>" << it->first << "\n";
+ for (const auto& lib : _libs) {
+ out << "<li>" << lib.first << "\n";
}
out << "</ul>\n";
out << "<h3>Recently completed/failed/aborted visitors</h3>\n<ul>\n";
- if (_recentlyCompleted.size() == 0) {
+ if (_recentlyCompleted.empty()) {
out << "None\n";
}
- for (std::deque<std::pair<api::VisitorId, framework::SecondTime> >
- ::const_iterator it = _recentlyCompleted.begin();
- it != _recentlyCompleted.end(); ++it)
- {
- out << "<li> Visitor " << it->first << " done at "
- << it->second.getTime() << "\n";
+ for (const auto& cv : _recentlyCompleted) {
+ out << "<li> Visitor " << cv.first << " done at "
+ << cv.second.getTime() << "\n";
}
out << "</ul>\n";
out << "<h3>Current queue size: " << _queue.size() << "</h3>\n";
@@ -764,17 +735,15 @@ VisitorThread::getStatus(vespalib::asciistream& out,
<< "</table>\n";
}
if (showAll) {
- for (VisitorMap::const_iterator it = _visitors.begin();
- it != _visitors.end(); ++it)
- {
- out << "<h3>Visitor " << it->first << "</h3>\n";
+ for (const auto& v : _visitors) {
+ out << "<h3>Visitor " << v.first << "</h3>\n";
std::ostringstream tmp;
- it->second->getStatus(tmp, verbose);
+ v.second->getStatus(tmp, verbose);
out << tmp.str();
}
} else if (path.hasAttribute("visitor")) {
out << "<h3>Visitor " << visitor << "</h3>\n";
- VisitorMap::const_iterator it = _visitors.find(visitor);
+ auto it = _visitors.find(visitor);
if (it == _visitors.end()) {
out << "Not found\n";
} else {
@@ -784,7 +753,7 @@ VisitorThread::getStatus(vespalib::asciistream& out,
}
} else { // List visitors
out << "<h3>Active visitors</h3>\n";
- if (_visitors.size() == 0) {
+ if (_visitors.empty()) {
out << "None\n";
}
for (VisitorMap::const_iterator it = _visitors.begin();