diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-12 00:06:58 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-12 00:06:58 +0100 |
commit | 3ddc12b47ef9678bdac9284a4b5c63dcab4e6151 (patch) | |
tree | b2a5bdd470e04b9ae2449ed0d619f8cd75966870 | |
parent | d9a44c344a9a03cad572ff4f54cbcf83c829f11a (diff) | |
parent | 88198e860dbf6429442590dbe78d0f6b4b3c88b3 (diff) |
Merge pull request #1481 from yahoo/balder/faster-and-more-readable-getnextmessage
Balder/faster and more readable getnextmessage
4 files changed, 100 insertions, 100 deletions
diff --git a/storage/src/vespa/storage/common/messagebucketid.cpp b/storage/src/vespa/storage/common/messagebucketid.cpp index 2135cbcd9b6..ae71e47005f 100644 --- a/storage/src/vespa/storage/common/messagebucketid.cpp +++ b/storage/src/vespa/storage/common/messagebucketid.cpp @@ -3,7 +3,6 @@ #include "messagebucketid.h" #include "statusmessages.h" #include "bucketmessages.h" -#include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/multioperation.h> diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index bee8e7a6c49..a2d5ce1a647 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -8,13 +8,10 @@ #include <vespa/storageapi/message/removelocation.h> #include <vespa/storage/bucketdb/storbucketdb.h> #include <vespa/storage/common/bucketmessages.h> -#include <vespa/storage/common/messagesender.h> -#include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/common/statusmessages.h> #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storage/common/messagebucketid.h> #include <vespa/storage/persistence/messages.h> -#include <vespa/vespalib/util/random.h> #include <vespa/storageapi/message/stat.h> #include <vespa/storageapi/message/batch.h> #include <vespa/vespalib/stllike/hash_map.hpp> @@ -258,8 +255,7 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, "FileStorHandler: Operation added to disk %d's queue with " "priority %u", disk, msg->getPriority())); - t.queue.push_back(MessageEntry(msg, - getStorageMessageBucketId(*msg))); + t.queue.emplace_back(msg, getStorageMessageBucketId(*msg)); LOG(spam, "Queued operation %s with priority %u.", msg->getType().toString().c_str(), @@ -451,11 +447,11 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, return lck; } - std::shared_ptr<api::StorageMessage> msg(range.first->_command); - mbus::Trace& trace = msg->getTrace(); + api::StorageMessage & m(*range.first->_command); + mbus::Trace& trace = m.getTrace(); // Priority is too low, not buffering any more. - if (msg->getPriority() > maxPriority || msg->getPriority() >= _maxPriorityToBlock) { + if (m.getPriority() > maxPriority || m.getPriority() >= _maxPriorityToBlock) { lck.second.reset(); return lck; } @@ -466,16 +462,17 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint64_t waitTime( const_cast<metrics::MetricTimer&>(range.first->_timer).stop( - t.metrics->averageQueueWaitingTime[msg->getLoadType()])); + t.metrics->averageQueueWaitingTime[m.getLoadType()])); LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), " "timeout %d", - msg->toString().c_str(), waitTime, id.toString().c_str(), - static_cast<api::StorageCommand&>(*msg).getTimeout()); + m.toString().c_str(), waitTime, id.toString().c_str(), + static_cast<api::StorageCommand&>(m).getTimeout()); - if (msg->getType().isReply() || - waitTime < static_cast<api::StorageCommand&>(*msg).getTimeout()) + if (m.getType().isReply() || + waitTime < static_cast<api::StorageCommand&>(m).getTimeout()) { + std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command); idx.erase(range.first); lck.second.swap(msg); lockGuard.broadcast(); @@ -483,15 +480,14 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, return lck; } else { std::shared_ptr<api::StorageReply> msgReply( - static_cast<api::StorageCommand&>(*msg) + static_cast<api::StorageCommand&>(m) .makeReply().release()); - msgReply->setResult(api::ReturnCode( - api::ReturnCode::TIMEOUT, - "Message waited too long in storage queue")); - idx.erase(range.first); lockGuard.broadcast(); lockGuard.unlock(); + msgReply->setResult(api::ReturnCode( + api::ReturnCode::TIMEOUT, + "Message waited too long in storage queue")); _messageSender.sendReply(msgReply); lck.second.reset(); @@ -520,15 +516,6 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const } bool -FileStorHandlerImpl::operationHasHighEnoughPriorityToBeRun( - const api::StorageMessage& msg, - uint8_t maxPriority) const -{ - // NOTE: priority integral value 0 is considered highest pri. - return (msg.getPriority() <= maxPriority); -} - -bool FileStorHandlerImpl::operationBlockedByHigherPriorityThread( const api::StorageMessage& msg, const Disk& disk) const @@ -550,12 +537,13 @@ FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, std::unique_ptr<FileStorHandler::BucketLockInterface> FileStorHandlerImpl::takeDiskBucketLockOwnership( + const vespalib::MonitorGuard & guard, Disk& disk, const document::BucketId& id, const api::StorageMessage& msg) { return std::unique_ptr<FileStorHandler::BucketLockInterface>( - new BucketLock(disk, id, msg.getPriority(), msg.getSummary())); + new BucketLock(guard, disk, id, msg.getPriority(), msg.getSummary())); } std::unique_ptr<api::StorageReply> @@ -570,21 +558,34 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const return msgReply; } -bool -FileStorHandlerImpl::bucketIsLockedOnDisk(const document::BucketId& id, - const Disk& t) const -{ - return (id.getRawId() != 0 && t.isLocked(id)); +namespace { + bool + bucketIsLockedOnDisk(const document::BucketId &id, const FileStorHandlerImpl::Disk &t) { + return (id.getRawId() != 0 && t.isLocked(id)); + } + + /** + * Return whether msg has sufficiently high priority that a thread with + * a configured priority threshold of maxPriority can even run in. + * Often, operations such as streaming searches will have dedicated threads + * that refuse lower priority operations such as Puts etc. + */ + bool + operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority) + { + // NOTE: priority integral value 0 is considered highest pri. + return (msg.getPriority() <= maxPriority); + } } FileStorHandler::LockedMessage FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) { + assert(disk < _diskInfo.size()); if (!tryHandlePause(disk)) { return {}; // Still paused, return to allow tick. } - assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); vespalib::MonitorGuard lockGuard(t.lock); @@ -592,57 +593,21 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) // if none can be found and then exiting if the same is the case on the // second attempt. This is key to allowing the run loop to register // ticks at regular intervals while not busy-waiting. - for (int attempt = 0; attempt < 2; ++attempt) { + for (int attempt = 0; (attempt < 2) && ! diskIsClosed(disk); ++attempt) { PriorityIdx& idx(boost::multi_index::get<1>(t.queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); - - if (diskIsClosed(disk)) { - return {}; - } - while (iter != end) { - document::BucketId id(iter->_bucketId); - if (bucketIsLockedOnDisk(id, t)) { - ++iter; // Try next in queue, if any. - continue; - } - std::shared_ptr<api::StorageMessage> msg(iter->_command); - mbus::Trace& trace(msg->getTrace()); + while (iter != end && bucketIsLockedOnDisk(iter->_bucketId, t)) { + iter++; + } + if (iter != end) { + api::StorageMessage &m(*iter->_command); - if (!operationHasHighEnoughPriorityToBeRun(*msg, maxPriority) - || operationBlockedByHigherPriorityThread(*msg, t) - || isPaused()) + if (operationHasHighEnoughPriorityToBeRun(m, maxPriority) + && ! operationBlockedByHigherPriorityThread(m, t) + && ! isPaused()) { - break; - } - - const uint64_t waitTime( - const_cast<metrics::MetricTimer&>(iter->_timer).stop( - t.metrics->averageQueueWaitingTime[ - msg->getLoadType()])); - - MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by " - "disk thread."); - LOG(debug, - "Message %s waited %" PRIu64 " ms in storage queue, timeout %d", - msg->toString().c_str(), waitTime, - static_cast<api::StorageCommand&>(*msg).getTimeout()); - - idx.erase(iter); // iter not used after this point. - - if (!messageTimedOutInQueue(*msg, waitTime)) { - std::unique_ptr<FileStorHandler::BucketLockInterface> locker( - takeDiskBucketLockOwnership(t, id, *msg)); - MBUS_TRACE(trace, 9, "FileStorHandler: Got lock on bucket"); - lockGuard.broadcast(); // XXX: needed here? - return {std::move(locker), msg}; - } else { - std::shared_ptr<api::StorageReply> msgReply( - makeQueueTimeoutReply(*msg)); - lockGuard.broadcast(); // XXX: needed here? - lockGuard.unlock(); - _messageSender.sendReply(msgReply); - return {}; + return getMessage(lockGuard, t, idx, iter); } } if (attempt == 0) { @@ -652,6 +617,37 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) return {}; // No message fetched. } +FileStorHandler::LockedMessage +FileStorHandlerImpl::getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter) { + + api::StorageMessage & m(*iter->_command); + const uint64_t waitTime( + const_cast<metrics::MetricTimer &>(iter->_timer).stop( + t.metrics->averageQueueWaitingTime[m.getLoadType()])); + + mbus::Trace &trace(m.getTrace()); + MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread."); + LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue, timeout %d", + m.toString().c_str(), waitTime, static_cast<api::StorageCommand &>(m).getTimeout()); + + std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command); + document::BucketId bucketId(iter->_bucketId); + idx.erase(iter); // iter not used after this point. + + if (!messageTimedOutInQueue(*msg, waitTime)) { + auto locker = takeDiskBucketLockOwnership(guard, t, bucketId, *msg); + guard.unlock(); + MBUS_TRACE(trace, 9, "FileStorHandler: Got lock on bucket"); + return std::move(FileStorHandler::LockedMessage(std::move(locker), std::move(msg))); + } else { + std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg)); + guard.broadcast(); // XXX: needed here? + guard.unlock(); + _messageSender.sendReply(msgReply); + return {}; + } +} + std::shared_ptr<FileStorHandler::BucketLockInterface> FileStorHandlerImpl::lock(const document::BucketId& bucket, uint16_t disk) { @@ -673,7 +669,7 @@ FileStorHandlerImpl::lock(const document::BucketId& bucket, uint16_t disk) } std::shared_ptr<FileStorHandler::BucketLockInterface> locker( - new BucketLock(t, bucket, 255, "External lock")); + new BucketLock(lockGuard, t, bucket, 255, "External lock")); lockGuard.broadcast(); return locker; @@ -1056,7 +1052,7 @@ FileStorHandlerImpl::remapQueueNoLock( for (BucketIdx::iterator i = range.first; i != range.second; ++i) { assert(i->_bucketId == source.bid); - entriesFound.push_back(*i); + entriesFound.push_back(std::move(*i)); } // Remove them @@ -1097,7 +1093,7 @@ FileStorHandlerImpl::remapQueueNoLock( } else { entry._bucketId = bid; // Move to correct disk queue if needed - _diskInfo[targetDisk].queue.push_back(entry); + _diskInfo[targetDisk].queue.emplace_back(std::move(entry)); } } @@ -1212,6 +1208,7 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(const std::shared_ptr<api::Stora _priority(cmd->getPriority()) { } + FileStorHandlerImpl::MessageEntry::MessageEntry(const MessageEntry& entry) : _command(entry._command), _timer(entry._timer), @@ -1219,6 +1216,14 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(const MessageEntry& entry) _priority(entry._priority) { } + +FileStorHandlerImpl::MessageEntry::MessageEntry(MessageEntry && entry) + : _command(std::move(entry._command)), + _timer(entry._timer), + _bucketId(entry._bucketId), + _priority(entry._priority) +{ } + FileStorHandlerImpl::MessageEntry::~MessageEntry() { } FileStorHandlerImpl::Disk::Disk() @@ -1252,6 +1257,7 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const } FileStorHandlerImpl::BucketLock::BucketLock( + const vespalib::MonitorGuard & guard, Disk& disk, const document::BucketId& id, uint8_t priority, @@ -1259,6 +1265,7 @@ FileStorHandlerImpl::BucketLock::BucketLock( : _disk(disk), _id(id) { + (void) guard; if (_id.getRawId() != 0) { // Lock the bucket and wait until it is not the current operation for // the disk itself. diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 27bb537ab18..410d586f0a2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -15,11 +15,11 @@ #pragma once +#include "filestorhandler.h" +#include "mergestatus.h" #include <vespa/document/bucket/bucketid.h> #include <vespa/metrics/metrics.h> #include <vespa/storage/common/servicelayercomponent.h> -#include <vespa/storage/persistence/filestorage/filestorhandler.h> -#include <vespa/storage/persistence/filestorage/mergestatus.h> #include <vespa/storageframework/storageframework.h> #include <boost/multi_index_container.hpp> #include <boost/multi_index/identity.hpp> @@ -51,6 +51,7 @@ public: uint8_t _priority; MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, const document::BucketId& bId); + MessageEntry(MessageEntry &&); MessageEntry(const MessageEntry &); MessageEntry & operator = (const MessageEntry &) = delete; ~MessageEntry(); @@ -130,7 +131,7 @@ public: class BucketLock : public FileStorHandler::BucketLockInterface { public: - BucketLock(Disk& disk, const document::BucketId& id, uint8_t priority, + BucketLock(const vespalib::MonitorGuard & guard, Disk& disk, const document::BucketId& id, uint8_t priority, const vespalib::stringref & statusString); ~BucketLock(); @@ -159,6 +160,7 @@ public: void pause(uint16_t disk, uint8_t priority) const; FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority); + FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter); FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock, uint8_t lowestPriority); @@ -236,14 +238,6 @@ private: bool diskIsClosed(uint16_t disk) const; /** - * Return whether msg has sufficiently high priority that a thread with - * a configured priority threshold of maxPriority can even run in. - * Often, operations such as streaming searches will have dedicated threads - * that refuse lower priority operations such as Puts etc. - */ - bool operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority) const; - - /** * Return whether an already running high priority operation pre-empts * (blocks) the operation in msg from even starting in the current thread. */ @@ -260,14 +254,14 @@ private: * Disk lock MUST have been taken prior to calling this function. */ std::unique_ptr<FileStorHandler::BucketLockInterface> - takeDiskBucketLockOwnership(Disk& disk, const document::BucketId& id, const api::StorageMessage& msg); + takeDiskBucketLockOwnership(const vespalib::MonitorGuard & guard, + Disk& disk, const document::BucketId& id, const api::StorageMessage& msg); /** * Creates and returns a reply with api::TIMEOUT return code for msg. * Swaps (invalidates) context from msg into reply. */ std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const; - bool bucketIsLockedOnDisk(const document::BucketId&, const Disk&) const; bool messageMayBeAborted(const api::StorageMessage& msg) const; bool hasBlockingOperations(const Disk& t) const; void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd); diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp b/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp index 504c3b09419..890c1dc7110 100644 --- a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp +++ b/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp @@ -1,7 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fastos/fastos.h> -#include <vespa/storage/persistence/providershutdownwrapper.h> -#include <vespa/storage/persistence/persistenceutil.h> + +#include "providershutdownwrapper.h" +#include "persistenceutil.h" #include <vespa/log/log.h> LOG_SETUP(".persistence.shutdownwrapper"); |