summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-10 15:15:37 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-01-11 10:20:22 +0000
commit23faf0d6c74493b9efc602cb94cb90246ac61fd9 (patch)
tree8c53f0d09e52385a7b025883efb119fb35f4d7b3
parentc2987747ab678c5ce292b8d0cd21e68ab6b356fe (diff)
Rewrite the getNextMessage loop so that is easier to understand and follow
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp62
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h9
2 files changed, 29 insertions, 42 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 78ac8ee6e1d..7ce7f60daf8 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -516,15 +516,6 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const
}
bool
-FileStorHandlerImpl::operationHasHighEnoughPriorityToBeRun(
- const api::StorageMessage& msg,
- uint8_t maxPriority) const
-{
- // NOTE: priority integral value 0 is considered highest pri.
- return (msg.getPriority() <= maxPriority);
-}
-
-bool
FileStorHandlerImpl::operationBlockedByHigherPriorityThread(
const api::StorageMessage& msg,
const Disk& disk) const
@@ -567,21 +558,34 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const
return msgReply;
}
-bool
-FileStorHandlerImpl::bucketIsLockedOnDisk(const document::BucketId& id,
- const Disk& t) const
-{
- return (id.getRawId() != 0 && t.isLocked(id));
+namespace {
+ bool
+ bucketIsLockedOnDisk(const document::BucketId &id, const FileStorHandlerImpl::Disk &t) {
+ return (id.getRawId() != 0 && t.isLocked(id));
+ }
+
+ /**
+ * Return whether msg has sufficiently high priority that a thread with
+ * a configured priority threshold of maxPriority can even run in.
+ * Often, operations such as streaming searches will have dedicated threads
+ * that refuse lower priority operations such as Puts etc.
+ */
+ bool
+ operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority)
+ {
+ // NOTE: priority integral value 0 is considered highest pri.
+ return (msg.getPriority() <= maxPriority);
+ }
}
FileStorHandler::LockedMessage
FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
{
+ assert(disk < _diskInfo.size());
if (!tryHandlePause(disk)) {
return {}; // Still paused, return to allow tick.
}
- assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
vespalib::MonitorGuard lockGuard(t.lock);
@@ -589,30 +593,22 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
// if none can be found and then exiting if the same is the case on the
// second attempt. This is key to allowing the run loop to register
// ticks at regular intervals while not busy-waiting.
- for (int attempt = 0; attempt < 2; ++attempt) {
+ for (int attempt = 0; (attempt < 2) && ! diskIsClosed(disk); ++attempt) {
PriorityIdx& idx(boost::multi_index::get<1>(t.queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
-
- if (diskIsClosed(disk)) {
- return {};
- }
- while (iter != end) {
- document::BucketId id(iter->_bucketId);
- if (bucketIsLockedOnDisk(id, t)) {
- ++iter; // Try next in queue, if any.
- continue;
- }
- api::StorageMessage & m(*iter->_command);
+ while (iter != end && bucketIsLockedOnDisk(iter->_bucketId, t)) {
+ iter++;
+ }
+ if (iter != end) {
+ api::StorageMessage &m(*iter->_command);
- if (!operationHasHighEnoughPriorityToBeRun(m, maxPriority)
- || operationBlockedByHigherPriorityThread(m, t)
- || isPaused())
+ if (operationHasHighEnoughPriorityToBeRun(m, maxPriority)
+ && ! operationBlockedByHigherPriorityThread(m, t)
+ && ! isPaused())
{
- break;
+ return getMessage(lockGuard, t, idx, iter);
}
-
- return getMessage(lockGuard, t, idx, iter);
}
if (attempt == 0) {
lockGuard.wait(_getNextMessageTimeout);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index db55f9c512e..410d586f0a2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -238,14 +238,6 @@ private:
bool diskIsClosed(uint16_t disk) const;
/**
- * Return whether msg has sufficiently high priority that a thread with
- * a configured priority threshold of maxPriority can even run in.
- * Often, operations such as streaming searches will have dedicated threads
- * that refuse lower priority operations such as Puts etc.
- */
- bool operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority) const;
-
- /**
* Return whether an already running high priority operation pre-empts
* (blocks) the operation in msg from even starting in the current thread.
*/
@@ -270,7 +262,6 @@ private:
* Swaps (invalidates) context from msg into reply.
*/
std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const;
- bool bucketIsLockedOnDisk(const document::BucketId&, const Disk&) const;
bool messageMayBeAborted(const api::StorageMessage& msg) const;
bool hasBlockingOperations(const Disk& t) const;
void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);