diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-10 15:15:37 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-11 10:20:22 +0000 |
commit | 23faf0d6c74493b9efc602cb94cb90246ac61fd9 (patch) | |
tree | 8c53f0d09e52385a7b025883efb119fb35f4d7b3 | |
parent | c2987747ab678c5ce292b8d0cd21e68ab6b356fe (diff) |
Rewrite the getNextMessage loop so that is easier to understand and follow
-rw-r--r-- | storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp | 62 | ||||
-rw-r--r-- | storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h | 9 |
2 files changed, 29 insertions, 42 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 78ac8ee6e1d..7ce7f60daf8 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -516,15 +516,6 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const } bool -FileStorHandlerImpl::operationHasHighEnoughPriorityToBeRun( - const api::StorageMessage& msg, - uint8_t maxPriority) const -{ - // NOTE: priority integral value 0 is considered highest pri. - return (msg.getPriority() <= maxPriority); -} - -bool FileStorHandlerImpl::operationBlockedByHigherPriorityThread( const api::StorageMessage& msg, const Disk& disk) const @@ -567,21 +558,34 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const return msgReply; } -bool -FileStorHandlerImpl::bucketIsLockedOnDisk(const document::BucketId& id, - const Disk& t) const -{ - return (id.getRawId() != 0 && t.isLocked(id)); +namespace { + bool + bucketIsLockedOnDisk(const document::BucketId &id, const FileStorHandlerImpl::Disk &t) { + return (id.getRawId() != 0 && t.isLocked(id)); + } + + /** + * Return whether msg has sufficiently high priority that a thread with + * a configured priority threshold of maxPriority can even run in. + * Often, operations such as streaming searches will have dedicated threads + * that refuse lower priority operations such as Puts etc. + */ + bool + operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority) + { + // NOTE: priority integral value 0 is considered highest pri. + return (msg.getPriority() <= maxPriority); + } } FileStorHandler::LockedMessage FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) { + assert(disk < _diskInfo.size()); if (!tryHandlePause(disk)) { return {}; // Still paused, return to allow tick. } - assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); vespalib::MonitorGuard lockGuard(t.lock); @@ -589,30 +593,22 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) // if none can be found and then exiting if the same is the case on the // second attempt. This is key to allowing the run loop to register // ticks at regular intervals while not busy-waiting. - for (int attempt = 0; attempt < 2; ++attempt) { + for (int attempt = 0; (attempt < 2) && ! diskIsClosed(disk); ++attempt) { PriorityIdx& idx(boost::multi_index::get<1>(t.queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); - - if (diskIsClosed(disk)) { - return {}; - } - while (iter != end) { - document::BucketId id(iter->_bucketId); - if (bucketIsLockedOnDisk(id, t)) { - ++iter; // Try next in queue, if any. - continue; - } - api::StorageMessage & m(*iter->_command); + while (iter != end && bucketIsLockedOnDisk(iter->_bucketId, t)) { + iter++; + } + if (iter != end) { + api::StorageMessage &m(*iter->_command); - if (!operationHasHighEnoughPriorityToBeRun(m, maxPriority) - || operationBlockedByHigherPriorityThread(m, t) - || isPaused()) + if (operationHasHighEnoughPriorityToBeRun(m, maxPriority) + && ! operationBlockedByHigherPriorityThread(m, t) + && ! isPaused()) { - break; + return getMessage(lockGuard, t, idx, iter); } - - return getMessage(lockGuard, t, idx, iter); } if (attempt == 0) { lockGuard.wait(_getNextMessageTimeout); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index db55f9c512e..410d586f0a2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -238,14 +238,6 @@ private: bool diskIsClosed(uint16_t disk) const; /** - * Return whether msg has sufficiently high priority that a thread with - * a configured priority threshold of maxPriority can even run in. - * Often, operations such as streaming searches will have dedicated threads - * that refuse lower priority operations such as Puts etc. - */ - bool operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority) const; - - /** * Return whether an already running high priority operation pre-empts * (blocks) the operation in msg from even starting in the current thread. */ @@ -270,7 +262,6 @@ private: * Swaps (invalidates) context from msg into reply. */ std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const; - bool bucketIsLockedOnDisk(const document::BucketId&, const Disk&) const; bool messageMayBeAborted(const api::StorageMessage& msg) const; bool hasBlockingOperations(const Disk& t) const; void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd); |