diff options
author | Henning Baldersheim <balder@oath.com> | 2018-03-20 14:27:23 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-04-09 14:19:48 +0200 |
commit | 4f01536d66fdfcb7978d841c5aabb441b1e846ce (patch) | |
tree | fad90ed1216126682a52bb4294400a3385beedde /storage | |
parent | e5b2095628ed2aa7c24245f403d086b04ef0727c (diff) |
- Use std::make_
- C+11 for loops
- Use unique_ptr to shared_ptr built in assignment to avoid explicit release.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp | 314 | ||||
-rw-r--r-- | storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h | 28 |
2 files changed, 111 insertions, 231 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 42e5d05d650..94eb85a8c49 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -134,15 +134,15 @@ FileStorHandlerImpl::flush(bool killPendingMerges) Disk& t(_diskInfo[i]); vespalib::MonitorGuard lockGuard(t.lock); while (t.getQueueSize() != 0 || !t.lockedBuckets.empty()) { - LOG(debug, "Still %d in queue and %ld locked buckets for disk '%d'", t.getQueueSize(), t.lockedBuckets.size(), i); + LOG(debug, "Still %d in queue and %ld locked buckets for disk '%d'", + t.getQueueSize(), t.lockedBuckets.size(), i); lockGuard.wait(100); } LOG(debug, "All queues and bucket locks released for disk '%d'", i); } if (killPendingMerges) { - api::ReturnCode code(api::ReturnCode::ABORTED, - "Storage node is shutting down"); + api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down"); for (auto & entry : _mergeStates) { MergeStatus& s(*entry.second); @@ -164,12 +164,10 @@ FileStorHandlerImpl::flush(bool killPendingMerges) } void -FileStorHandlerImpl::reply(api::StorageMessage& msg, - DiskState state) const +FileStorHandlerImpl::reply(api::StorageMessage& msg, DiskState state) const { if (!msg.getType().isReply()) { - std::shared_ptr<api::StorageReply> rep( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + std::shared_ptr<api::StorageReply> rep = static_cast<api::StorageCommand&>(msg).makeReply(); if (state == FileStorHandler::DISABLED) { rep->setResult(api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled")); } else { @@ -231,8 +229,7 @@ FileStorHandlerImpl::getQueueSize() const } bool -FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, - uint16_t disk) +FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, uint16_t disk) { assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); @@ -241,15 +238,10 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, if (t.getState() == FileStorHandler::AVAILABLE) { MBUS_TRACE(msg->getTrace(), 5, vespalib::make_string( - "FileStorHandler: Operation added to disk %d's queue with " - "priority %u", disk, msg->getPriority())); + "FileStorHandler: Operation added to disk %d's queue with priority %u", disk, msg->getPriority())); t.queue.emplace_back(std::move(messageEntry)); - - LOG(spam, "Queued operation %s with priority %u.", - msg->getType().toString().c_str(), - msg->getPriority()); - + LOG(spam, "Queued operation %s with priority %u.", msg->getType().toString().c_str(), msg->getPriority()); lockGuard.broadcast(); } else { return false; @@ -287,25 +279,20 @@ FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const } void -FileStorHandlerImpl::abortQueuedCommandsForBuckets( - Disk& disk, - const AbortBucketOperationsCommand& cmd) +FileStorHandlerImpl::abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd) { Disk& t(disk); vespalib::MonitorGuard diskLock(t.lock); typedef PriorityQueue::iterator iter_t; api::ReturnCode abortedCode(api::ReturnCode::ABORTED, - "Sending distributor no longer owns " - "bucket operation was bound to or storage node went down"); + "Sending distributor no longer owns bucket operation was bound to, " + "or storage node went down"); for (iter_t it(t.queue.begin()), e(t.queue.end()); it != e;) { api::StorageMessage& msg(*it->_command); if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket)) { - LOG(debug, - "Aborting operation %s as it is bound for bucket %s", - msg.toString().c_str(), - it->_bucket.getBucketId().toString().c_str()); - std::shared_ptr<api::StorageReply> msgReply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + LOG(debug, "Aborting operation %s as it is bound for bucket %s", + msg.toString().c_str(), it->_bucket.getBucketId().toString().c_str()); + std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(msg).makeReply(); msgReply->setResult(abortedCode); _messageSender.sendReply(msgReply); @@ -317,15 +304,12 @@ FileStorHandlerImpl::abortQueuedCommandsForBuckets( } bool -FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket( - const Disk& disk, - const AbortBucketOperationsCommand& cmd) const +FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket(const Disk& disk, + const AbortBucketOperationsCommand& cmd) const { for (auto& lockedBucket : disk.lockedBuckets) { if (cmd.shouldAbort(lockedBucket.first)) { - LOG(spam, - "Disk had active operation for aborted bucket %s, " - "waiting for it to complete...", + LOG(spam, "Disk had active operation for aborted bucket %s, waiting for it to complete...", lockedBucket.first.toString().c_str()); return true; } @@ -334,9 +318,7 @@ FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket( } void -FileStorHandlerImpl::waitUntilNoActiveOperationsForAbortedBuckets( - Disk& disk, - const AbortBucketOperationsCommand& cmd) +FileStorHandlerImpl::waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd) { vespalib::MonitorGuard guard(disk.lock); while (diskHasActiveOperationForAbortedBucket(disk, cmd)) { @@ -346,8 +328,7 @@ FileStorHandlerImpl::waitUntilNoActiveOperationsForAbortedBuckets( } void -FileStorHandlerImpl::abortQueuedOperations( - const AbortBucketOperationsCommand& cmd) +FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& cmd) { // Do queue clearing and active operation waiting in two passes // to allow disk threads to drain running operations in parallel. @@ -386,7 +367,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessag } vespalib::MonitorGuard lockGuard(t.lock); - BucketIdx& idx = boost::multi_index::get<2>(t.queue); + BucketIdx& idx = bmi::get<2>(t.queue); std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket); // No more for this bucket. @@ -417,7 +398,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessag lockGuard.unlock(); return lck; } else { - std::shared_ptr<api::StorageReply> msgReply(static_cast<api::StorageCommand&>(m).makeReply().release()); + std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(m).makeReply(); idx.erase(range.first); lockGuard.broadcast(); lockGuard.unlock(); @@ -450,8 +431,7 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const } bool -FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, - uint64_t waitTime) const +FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime) const { if (msg.getType().isReply()) { return false; // Replies must always be processed and cannot time out. @@ -460,11 +440,8 @@ FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, } std::unique_ptr<FileStorHandler::BucketLockInterface> -FileStorHandlerImpl::takeDiskBucketLockOwnership( - const vespalib::MonitorGuard & guard, - Disk& disk, - const document::Bucket &bucket, - const api::StorageMessage& msg) +FileStorHandlerImpl::takeDiskBucketLockOwnership(const vespalib::MonitorGuard & guard, Disk& disk, + const document::Bucket &bucket, const api::StorageMessage& msg) { return std::make_unique<BucketLock>(guard, disk, bucket, msg.getPriority(), msg.getSummary()); } @@ -473,11 +450,8 @@ std::unique_ptr<api::StorageReply> FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const { assert(!msg.getType().isReply()); - std::unique_ptr<api::StorageReply> msgReply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); - msgReply->setResult(api::ReturnCode( - api::ReturnCode::TIMEOUT, - "Message waited too long in storage queue")); + std::unique_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(msg).makeReply(); + msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue")); return msgReply; } @@ -504,7 +478,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk) // second attempt. This is key to allowing the run loop to register // ticks at regular intervals while not busy-waiting. for (int attempt = 0; (attempt < 2) && ! diskIsClosed(disk); ++attempt) { - PriorityIdx& idx(boost::multi_index::get<1>(t.queue)); + PriorityIdx& idx(bmi::get<1>(t.queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); while (iter != end && bucketIsLockedOnDisk(iter->_bucket, t)) { @@ -527,8 +501,7 @@ FileStorHandlerImpl::getMessage(vespalib::MonitorGuard & guard, Disk & t, Priori api::StorageMessage & m(*iter->_command); const uint64_t waitTime( - const_cast<metrics::MetricTimer &>(iter->_timer).stop( - t.metrics->averageQueueWaitingTime[m.getLoadType()])); + const_cast<metrics::MetricTimer &>(iter->_timer).stop(t.metrics->averageQueueWaitingTime[m.getLoadType()])); mbus::Trace &trace(m.getTrace()); MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread."); @@ -585,11 +558,8 @@ namespace { monitors[index] = &monitor; } void lock() { - for (std::map<uint16_t, vespalib::Monitor*>::iterator it - = monitors.begin(); it != monitors.end(); ++it) - { - guards.push_back(std::shared_ptr<vespalib::MonitorGuard>( - new vespalib::MonitorGuard(*it->second))); + for (auto & entry : monitors) { + guards.push_back(std::make_shared<vespalib::MonitorGuard>(*entry.second)); } } }; @@ -605,12 +575,10 @@ namespace { return static_cast<const api::PutCommand&>(msg).getDocumentId(); break; case api::MessageType::UPDATE_ID: - return static_cast<const api::UpdateCommand&>(msg) - .getDocumentId(); + return static_cast<const api::UpdateCommand&>(msg).getDocumentId(); break; case api::MessageType::REMOVE_ID: - return static_cast<const api::RemoveCommand&>(msg) - .getDocumentId(); + return static_cast<const api::RemoveCommand&>(msg).getDocumentId(); break; default: assert(false); @@ -633,9 +601,7 @@ namespace { } int -FileStorHandlerImpl::calculateTargetBasedOnDocId( - const api::StorageMessage& msg, - std::vector<RemapInfo*>& targets) +FileStorHandlerImpl::calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets) { document::DocumentId id(getDocId(msg)); document::Bucket bucket(msg.getBucket().getBucketSpace(), _bucketIdFactory.getBucketId(id)); @@ -651,13 +617,18 @@ FileStorHandlerImpl::calculateTargetBasedOnDocId( return -1; } +namespace { + +const char * +splitOrJoin(FileStorHandlerImpl::Operation op) { + return (op == FileStorHandlerImpl::Operation::SPLIT) ? "Bucket was just split" : "Bucket was just joined"; +} + +} + document::Bucket -FileStorHandlerImpl::remapMessage( - api::StorageMessage& msg, - const document::Bucket& source, - Operation op, - std::vector<RemapInfo*>& targets, - uint16_t& targetDisk, api::ReturnCode& returnCode) +FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Bucket& source, Operation op, + std::vector<RemapInfo*>& targets, uint16_t& targetDisk, api::ReturnCode& returnCode) { document::Bucket newBucket = source; @@ -668,8 +639,7 @@ FileStorHandlerImpl::remapMessage( case api::MessageType::REMOVE_ID: // Move to correct queue { - api::BucketCommand& cmd( - static_cast<api::BucketCommand&>(msg)); + api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg)); if (cmd.getBucket() == source) { if (op == SPLIT) { @@ -693,8 +663,7 @@ FileStorHandlerImpl::remapMessage( } else { document::DocumentId did(getDocId(msg)); document::BucketId bucket = _bucketIdFactory.getBucketId(did); - uint32_t commonBits( - findCommonBits(targets[0]->bucket.getBucketId(), bucket)); + uint32_t commonBits(findCommonBits(targets[0]->bucket.getBucketId(), bucket)); if (commonBits < source.getBucketId().getUsedBits()) { std::ostringstream ost; ost << bucket << " belongs in neither " @@ -703,10 +672,8 @@ FileStorHandlerImpl::remapMessage( << "did not belong in the original " << "bucket " << source.getBucketId(); LOG(error, "Error remapping %s after split %s", - cmd.getType().toString().c_str(), - ost.str().c_str()); - returnCode = api::ReturnCode( - api::ReturnCode::REJECTED, ost.str()); + cmd.getType().toString().c_str(), ost.str().c_str()); + returnCode = api::ReturnCode(api::ReturnCode::REJECTED, ost.str()); } else { std::ostringstream ost; assert(targets.size() == 2); @@ -716,9 +683,7 @@ FileStorHandlerImpl::remapMessage( << "Failing operation so distributor can create " << "bucket on correct node."; LOG(debug, "%s", ost.str().c_str()); - returnCode = api::ReturnCode( - api::ReturnCode::BUCKET_DELETED, - ost.str()); + returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, ost.str()); } } } else { @@ -740,8 +705,7 @@ FileStorHandlerImpl::remapMessage( } } else { LOG(debug, "Did not remap %s with bucket %s from bucket %s", - cmd.toString().c_str(), cmd.getBucketId().toString().c_str(), - source.toString().c_str()); + cmd.toString().c_str(), cmd.getBucketId().toString().c_str(), source.toString().c_str()); assert(false); } break; @@ -755,15 +719,13 @@ FileStorHandlerImpl::remapMessage( // if op == MOVE. If op != MOVE, fail with bucket not found // and clear filestor thread state { - api::BucketCommand& cmd( - static_cast<api::BucketCommand&>(msg)); + api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg)); if (cmd.getBucket() == source) { if (op != MOVE) { std::ostringstream ost; ost << "Bucket " << (op == SPLIT ? "split" : "joined") << ". Cannot remap merge, so aborting it"; - api::ReturnCode code(api::ReturnCode::BUCKET_DELETED, - ost.str()); + api::ReturnCode code(api::ReturnCode::BUCKET_DELETED, ost.str()); clearMergeStatus(cmd.getBucket(), &code); } } @@ -775,19 +737,14 @@ FileStorHandlerImpl::remapMessage( // Fail with bucket not found if op is JOIN // Ok if op is SPLIT, as we have already done as requested. { - api::BucketCommand& cmd( - static_cast<api::BucketCommand&>(msg)); + api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg)); if (cmd.getBucket() == source) { if (op == MOVE) { targetDisk = targets[0]->diskIndex; } else if (op == SPLIT) { - returnCode = api::ReturnCode( - api::ReturnCode::BUCKET_DELETED, - "Bucket split while operation enqueued"); + returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, "Bucket split while operation enqueued"); } else { - returnCode = api::ReturnCode( - api::ReturnCode::BUCKET_DELETED, - "Bucket was just joined"); + returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, "Bucket was just joined"); } } break; @@ -805,10 +762,7 @@ FileStorHandlerImpl::remapMessage( if (op == MOVE) { targetDisk = targets[0]->diskIndex; } else { - returnCode = api::ReturnCode( - api::ReturnCode::BUCKET_DELETED, - op == SPLIT ? "Bucket was just split" - : "Bucket was just joined"); + returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op)); } } break; @@ -818,8 +772,7 @@ FileStorHandlerImpl::remapMessage( case api::MessageType::JOINBUCKETS_ID: // Move to correct queue if op == MOVE. Otherwise ignore. { - api::BucketCommand& cmd( - static_cast<api::BucketCommand&>(msg)); + api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg)); if (cmd.getBucket() == source) { if (op == MOVE) { targetDisk = targets[0]->diskIndex; @@ -829,8 +782,7 @@ FileStorHandlerImpl::remapMessage( } case api::MessageType::INTERNAL_ID: { - const api::InternalCommand& icmd( - static_cast<const api::InternalCommand&>(msg)); + const api::InternalCommand& icmd(static_cast<const api::InternalCommand&>(msg)); document::Bucket bucket; switch(icmd.getType()) { case RequestStatusPage::ID: @@ -844,11 +796,7 @@ FileStorHandlerImpl::remapMessage( if (op == MOVE) { targetDisk = targets[0]->diskIndex; } else { - returnCode = api::ReturnCode( - api::ReturnCode::BUCKET_DELETED, - op == SPLIT - ? "Bucket was just split" - : "Bucket was just joined"); + returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op)); } } break; @@ -857,8 +805,7 @@ FileStorHandlerImpl::remapMessage( //@fallthrough@ case RepairBucketCommand::ID: if (bucket.getBucketId().getRawId() == 0) { - bucket = static_cast<RepairBucketCommand&>(msg) - .getBucket(); + bucket = static_cast<RepairBucketCommand&>(msg).getBucket(); } // Move to correct queue if op == MOVE // Fail with bucket not found if op != MOVE @@ -866,11 +813,7 @@ FileStorHandlerImpl::remapMessage( if (op == MOVE) { targetDisk = targets[0]->diskIndex; } else { - returnCode = api::ReturnCode( - api::ReturnCode::BUCKET_DELETED, - op == SPLIT - ? "Bucket was just split" - : "Bucket was just joined"); + returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op)); } } break; @@ -878,22 +821,14 @@ FileStorHandlerImpl::remapMessage( // Fail bucket not found if op != MOVE // Fail and log error if op == MOVE { - api::BucketCommand& cmd( - static_cast<api::BucketCommand&>(msg)); + api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg)); if (cmd.getBucket() == source) { if (op == MOVE) { - returnCode = api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, - "Multiple bucket disk move " - "commands pending at the same time " - "towards bucket " - + source.toString()); + returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, + "Multiple bucket disk move commands pending at the same time " + " towards bucket " + source.toString()); } else { - returnCode = api::ReturnCode( - api::ReturnCode::BUCKET_DELETED, - op == SPLIT - ? "Bucket was just split" - : "Bucket was just joined"); + returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op)); } } break; @@ -910,13 +845,10 @@ FileStorHandlerImpl::remapMessage( default: // Fail and log error { - LOG(error, "Attempted (and failed) to remap %s which should " - "not be processed at this time", + LOG(error, "Attempted (and failed) to remap %s which should not be processed at this time", msg.toString(true).c_str()); - returnCode = api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, - "No such message should be processed at " - "this time."); + returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, + "No such message should be processed at this time."); break; } } @@ -924,12 +856,8 @@ FileStorHandlerImpl::remapMessage( } default: { - returnCode = api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, - "Unknown message type in persistence layer"); - LOG(error, - "Unknown message type in persistence layer: %s", - msg.toString().c_str()); + returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "Unknown message type in persistence layer"); + LOG(error, "Unknown message type in persistence layer: %s", msg.toString().c_str()); } } // End of switch @@ -937,15 +865,11 @@ FileStorHandlerImpl::remapMessage( } void -FileStorHandlerImpl::remapQueueNoLock( - Disk& from, - const RemapInfo& source, - std::vector<RemapInfo*>& targets, - Operation op) +FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source, + std::vector<RemapInfo*>& targets, Operation op) { - BucketIdx& idx(boost::multi_index::get<2>(from.queue)); - std::pair<BucketIdx::iterator, BucketIdx::iterator> range( - idx.equal_range(source.bucket)); + BucketIdx& idx(bmi::get<2>(from.queue)); + std::pair<BucketIdx::iterator, BucketIdx::iterator> range(idx.equal_range(source.bucket)); std::vector<MessageEntry> entriesFound; @@ -971,22 +895,14 @@ FileStorHandlerImpl::remapQueueNoLock( api::StorageMessage& msg(*entry._command); assert(entry._bucket == source.bucket); - document::Bucket bucket = remapMessage(msg, - source.bucket, - op, - targets, - targetDisk, - returnCode); + document::Bucket bucket = remapMessage(msg, source.bucket, op, targets, targetDisk, returnCode); if (returnCode.getResult() != api::ReturnCode::OK) { // Fail message if errorcode set if (!msg.getType().isReply()) { - std::shared_ptr<api::StorageReply> rep( - static_cast<api::StorageCommand&>(msg) - .makeReply().release()); + std::shared_ptr<api::StorageReply> rep = static_cast<api::StorageCommand&>(msg).makeReply(); LOG(spam, "Sending reply %s because remapping failed: %s", - msg.toString().c_str(), - returnCode.toString().c_str()); + msg.toString().c_str(), returnCode.toString().c_str()); rep->setResult(returnCode); _messageSender.sendReply(rep); @@ -1001,10 +917,8 @@ FileStorHandlerImpl::remapQueueNoLock( } void -FileStorHandlerImpl::remapQueue( - const RemapInfo& source, - RemapInfo& target, - Operation op) { +FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Operation op) +{ // Use a helper class to lock to solve issue that some buckets might be // the same bucket. Will fix order if we accept wrong order later. MultiLockGuard guard; @@ -1026,11 +940,7 @@ FileStorHandlerImpl::remapQueue( } void -FileStorHandlerImpl::remapQueue( - const RemapInfo& source, - RemapInfo& target1, - RemapInfo& target2, - Operation op) +FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op) { // Use a helper class to lock to solve issue that some buckets might be // the same bucket. Will fix order if we accept wrong order later. @@ -1059,16 +969,13 @@ FileStorHandlerImpl::remapQueue( } void -FileStorHandlerImpl::failOperations( - const document::Bucket &bucket, uint16_t fromDisk, - const api::ReturnCode& err) +FileStorHandlerImpl::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err) { Disk& from(_diskInfo[fromDisk]); vespalib::MonitorGuard lockGuard(from.lock); - BucketIdx& idx(boost::multi_index::get<2>(from.queue)); - std::pair<BucketIdx::iterator, BucketIdx::iterator> range( - idx.equal_range(bucket)); + BucketIdx& idx(bmi::get<2>(from.queue)); + std::pair<BucketIdx::iterator, BucketIdx::iterator> range(idx.equal_range(bucket)); for (auto iter = range.first; iter != range.second;) { // We want to post delete bucket to list before calling this @@ -1076,9 +983,7 @@ FileStorHandlerImpl::failOperations( // cannot delete the delete bucket operation itself if (iter->_command->getType() != api::MessageType::DELETEBUCKET) { if (!iter->_command->getType().isReply()) { - std::shared_ptr<api::StorageReply> msgReply( - static_cast<api::StorageCommand&>(*iter->_command) - .makeReply().release()); + std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(*iter->_command).makeReply(); msgReply->setResult(err); _messageSender.sendReply(msgReply); } @@ -1119,10 +1024,10 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(const MessageEntry& entry) FileStorHandlerImpl::MessageEntry::MessageEntry(MessageEntry && entry) noexcept - : _command(std::move(entry._command)), - _timer(entry._timer), - _bucket(entry._bucket), - _priority(entry._priority) + : _command(std::move(entry._command)), + _timer(entry._timer), + _bucket(entry._bucket), + _priority(entry._priority) { } FileStorHandlerImpl::MessageEntry::~MessageEntry() { } @@ -1157,12 +1062,9 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const return t.getQueueSize(); } -FileStorHandlerImpl::BucketLock::BucketLock( - const vespalib::MonitorGuard & guard, - Disk& disk, - const document::Bucket &bucket, - uint8_t priority, - const vespalib::stringref & statusString) +FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Disk& disk, + const document::Bucket &bucket, uint8_t priority, + const vespalib::stringref & statusString) : _disk(disk), _bucket(bucket) { @@ -1170,12 +1072,9 @@ FileStorHandlerImpl::BucketLock::BucketLock( if (_bucket.getBucketId().getRawId() != 0) { // Lock the bucket and wait until it is not the current operation for // the disk itself. - _disk.lockedBuckets.insert( - std::make_pair(_bucket, Disk::LockEntry(priority, statusString))); - LOG(debug, - "Locked bucket %s with priority %u", - bucket.getBucketId().toString().c_str(), - priority); + _disk.lockedBuckets.insert(std::make_pair(_bucket, Disk::LockEntry(priority, statusString))); + LOG(debug, "Locked bucket %s with priority %u", + bucket.getBucketId().toString().c_str(), priority); LOG_BUCKET_OPERATION_SET_LOCK_STATE( _bucket.getBucketId(), "acquired filestor lock", false, @@ -1205,11 +1104,8 @@ FileStorHandlerImpl::dumpQueue(uint16_t disk) const const Disk& t(_diskInfo[disk]); vespalib::MonitorGuard lockGuard(t.lock); - const PriorityIdx& idx = boost::multi_index::get<1>(t.queue); - for (PriorityIdx::const_iterator it = idx.begin(); - it != idx.end(); - it++) - { + const PriorityIdx& idx = bmi::get<1>(t.queue); + for (PriorityIdx::const_iterator it = idx.begin(); it != idx.end(); it++) { ost << it->_bucket.getBucketId() << ": " << it->_command->toString() << " (priority: " << (int)it->_command->getPriority() << ")\n"; } @@ -1218,8 +1114,7 @@ FileStorHandlerImpl::dumpQueue(uint16_t disk) const } void -FileStorHandlerImpl::getStatus(std::ostream& out, - const framework::HttpUrlPath& path) const +FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath& path) const { bool verbose = path.hasAttribute("verbose"); out << "<h1>Filestor handler</h1>\n"; @@ -1247,11 +1142,8 @@ FileStorHandlerImpl::getStatus(std::ostream& out, out << "<h4>Input queue</h4>\n"; out << "<ul>\n"; - const PriorityIdx& idx = boost::multi_index::get<1>(t.queue); - for (PriorityIdx::const_iterator it = idx.begin(); - it != idx.end(); - it++) - { + const PriorityIdx& idx = bmi::get<1>(t.queue); + for (PriorityIdx::const_iterator it = idx.begin(); it != idx.end(); it++) { out << "<li>" << it->_command->toString() << " (priority: " << (int)it->_command->getPriority() << ")</li>\n"; } @@ -1267,10 +1159,8 @@ FileStorHandlerImpl::getStatus(std::ostream& out, if (_mergeStates.size() == 0) { out << "None\n"; } - for (std::map<document::Bucket, MergeStatus::SP>::const_iterator it - = _mergeStates.begin(); it != _mergeStates.end(); ++it) - { - out << "<b>" << it->first.toString() << "</b><br>\n"; + for (auto & entry : _mergeStates) { + out << "<b>" << entry.first.toString() << "</b><br>\n"; // << "<p>" << it->second << "</p>\n"; // Gets very spammy with // the complete state here.. } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 6b6d154e149..fb084c41b7c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -37,6 +37,8 @@ class FileStorDiskMetrics; class StorBucketDatabase; class AbortBucketOperationsCommand; +namespace bmi = boost::multi_index; + class FileStorHandlerImpl : private framework::MetricUpdateHook, private ResumeGuard::Callback, public MessageSender { @@ -61,25 +63,13 @@ public: } }; - typedef boost::multi_index::ordered_non_unique< - boost::multi_index::identity<MessageEntry> > PriorityOrder; - - typedef boost::multi_index::ordered_non_unique< - boost::multi_index::member<MessageEntry, - document::Bucket, - &MessageEntry::_bucket> > BucketOrder; - - typedef boost::multi_index::multi_index_container< - MessageEntry, - boost::multi_index::indexed_by< - boost::multi_index::sequenced<>, - PriorityOrder, - BucketOrder - > - > PriorityQueue; - - typedef boost::multi_index::nth_index<PriorityQueue, 1>::type PriorityIdx; - typedef boost::multi_index::nth_index<PriorityQueue, 2>::type BucketIdx; + using PriorityOrder = bmi::ordered_non_unique<bmi::identity<MessageEntry> >; + using BucketOrder = bmi::ordered_non_unique<bmi::member<MessageEntry, document::Bucket, &MessageEntry::_bucket>>; + + using PriorityQueue = bmi::multi_index_container<MessageEntry, bmi::indexed_by<bmi::sequenced<>, PriorityOrder, BucketOrder>>; + + using PriorityIdx = bmi::nth_index<PriorityQueue, 1>::type; + using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type; struct Disk { vespalib::Monitor lock; |