summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-12 00:06:58 +0100
committerGitHub <noreply@github.com>2017-01-12 00:06:58 +0100
commit3ddc12b47ef9678bdac9284a4b5c63dcab4e6151 (patch)
treeb2a5bdd470e04b9ae2449ed0d619f8cd75966870
parentd9a44c344a9a03cad572ff4f54cbcf83c829f11a (diff)
parent88198e860dbf6429442590dbe78d0f6b4b3c88b3 (diff)
Merge pull request #1481 from yahoo/balder/faster-and-more-readable-getnextmessage
Balder/faster and more readable getnextmessage
-rw-r--r--storage/src/vespa/storage/common/messagebucketid.cpp1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp173
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h20
-rw-r--r--storage/src/vespa/storage/persistence/providershutdownwrapper.cpp6
4 files changed, 100 insertions, 100 deletions
diff --git a/storage/src/vespa/storage/common/messagebucketid.cpp b/storage/src/vespa/storage/common/messagebucketid.cpp
index 2135cbcd9b6..ae71e47005f 100644
--- a/storage/src/vespa/storage/common/messagebucketid.cpp
+++ b/storage/src/vespa/storage/common/messagebucketid.cpp
@@ -3,7 +3,6 @@
#include "messagebucketid.h"
#include "statusmessages.h"
#include "bucketmessages.h"
-#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storageapi/message/multioperation.h>
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index bee8e7a6c49..a2d5ce1a647 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -8,13 +8,10 @@
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storage/bucketdb/storbucketdb.h>
#include <vespa/storage/common/bucketmessages.h>
-#include <vespa/storage/common/messagesender.h>
-#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/common/statusmessages.h>
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/storage/common/messagebucketid.h>
#include <vespa/storage/persistence/messages.h>
-#include <vespa/vespalib/util/random.h>
#include <vespa/storageapi/message/stat.h>
#include <vespa/storageapi/message/batch.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
@@ -258,8 +255,7 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg,
"FileStorHandler: Operation added to disk %d's queue with "
"priority %u", disk, msg->getPriority()));
- t.queue.push_back(MessageEntry(msg,
- getStorageMessageBucketId(*msg)));
+ t.queue.emplace_back(msg, getStorageMessageBucketId(*msg));
LOG(spam, "Queued operation %s with priority %u.",
msg->getType().toString().c_str(),
@@ -451,11 +447,11 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
return lck;
}
- std::shared_ptr<api::StorageMessage> msg(range.first->_command);
- mbus::Trace& trace = msg->getTrace();
+ api::StorageMessage & m(*range.first->_command);
+ mbus::Trace& trace = m.getTrace();
// Priority is too low, not buffering any more.
- if (msg->getPriority() > maxPriority || msg->getPriority() >= _maxPriorityToBlock) {
+ if (m.getPriority() > maxPriority || m.getPriority() >= _maxPriorityToBlock) {
lck.second.reset();
return lck;
}
@@ -466,16 +462,17 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
uint64_t waitTime(
const_cast<metrics::MetricTimer&>(range.first->_timer).stop(
- t.metrics->averageQueueWaitingTime[msg->getLoadType()]));
+ t.metrics->averageQueueWaitingTime[m.getLoadType()]));
LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), "
"timeout %d",
- msg->toString().c_str(), waitTime, id.toString().c_str(),
- static_cast<api::StorageCommand&>(*msg).getTimeout());
+ m.toString().c_str(), waitTime, id.toString().c_str(),
+ static_cast<api::StorageCommand&>(m).getTimeout());
- if (msg->getType().isReply() ||
- waitTime < static_cast<api::StorageCommand&>(*msg).getTimeout())
+ if (m.getType().isReply() ||
+ waitTime < static_cast<api::StorageCommand&>(m).getTimeout())
{
+ std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command);
idx.erase(range.first);
lck.second.swap(msg);
lockGuard.broadcast();
@@ -483,15 +480,14 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
return lck;
} else {
std::shared_ptr<api::StorageReply> msgReply(
- static_cast<api::StorageCommand&>(*msg)
+ static_cast<api::StorageCommand&>(m)
.makeReply().release());
- msgReply->setResult(api::ReturnCode(
- api::ReturnCode::TIMEOUT,
- "Message waited too long in storage queue"));
-
idx.erase(range.first);
lockGuard.broadcast();
lockGuard.unlock();
+ msgReply->setResult(api::ReturnCode(
+ api::ReturnCode::TIMEOUT,
+ "Message waited too long in storage queue"));
_messageSender.sendReply(msgReply);
lck.second.reset();
@@ -520,15 +516,6 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const
}
bool
-FileStorHandlerImpl::operationHasHighEnoughPriorityToBeRun(
- const api::StorageMessage& msg,
- uint8_t maxPriority) const
-{
- // NOTE: priority integral value 0 is considered highest pri.
- return (msg.getPriority() <= maxPriority);
-}
-
-bool
FileStorHandlerImpl::operationBlockedByHigherPriorityThread(
const api::StorageMessage& msg,
const Disk& disk) const
@@ -550,12 +537,13 @@ FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg,
std::unique_ptr<FileStorHandler::BucketLockInterface>
FileStorHandlerImpl::takeDiskBucketLockOwnership(
+ const vespalib::MonitorGuard & guard,
Disk& disk,
const document::BucketId& id,
const api::StorageMessage& msg)
{
return std::unique_ptr<FileStorHandler::BucketLockInterface>(
- new BucketLock(disk, id, msg.getPriority(), msg.getSummary()));
+ new BucketLock(guard, disk, id, msg.getPriority(), msg.getSummary()));
}
std::unique_ptr<api::StorageReply>
@@ -570,21 +558,34 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const
return msgReply;
}
-bool
-FileStorHandlerImpl::bucketIsLockedOnDisk(const document::BucketId& id,
- const Disk& t) const
-{
- return (id.getRawId() != 0 && t.isLocked(id));
+namespace {
+ bool
+ bucketIsLockedOnDisk(const document::BucketId &id, const FileStorHandlerImpl::Disk &t) {
+ return (id.getRawId() != 0 && t.isLocked(id));
+ }
+
+ /**
+ * Return whether msg has sufficiently high priority that a thread with
+ * a configured priority threshold of maxPriority can even run in.
+ * Often, operations such as streaming searches will have dedicated threads
+ * that refuse lower priority operations such as Puts etc.
+ */
+ bool
+ operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority)
+ {
+ // NOTE: priority integral value 0 is considered highest pri.
+ return (msg.getPriority() <= maxPriority);
+ }
}
FileStorHandler::LockedMessage
FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
{
+ assert(disk < _diskInfo.size());
if (!tryHandlePause(disk)) {
return {}; // Still paused, return to allow tick.
}
- assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
vespalib::MonitorGuard lockGuard(t.lock);
@@ -592,57 +593,21 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
// if none can be found and then exiting if the same is the case on the
// second attempt. This is key to allowing the run loop to register
// ticks at regular intervals while not busy-waiting.
- for (int attempt = 0; attempt < 2; ++attempt) {
+ for (int attempt = 0; (attempt < 2) && ! diskIsClosed(disk); ++attempt) {
PriorityIdx& idx(boost::multi_index::get<1>(t.queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
-
- if (diskIsClosed(disk)) {
- return {};
- }
- while (iter != end) {
- document::BucketId id(iter->_bucketId);
- if (bucketIsLockedOnDisk(id, t)) {
- ++iter; // Try next in queue, if any.
- continue;
- }
- std::shared_ptr<api::StorageMessage> msg(iter->_command);
- mbus::Trace& trace(msg->getTrace());
+ while (iter != end && bucketIsLockedOnDisk(iter->_bucketId, t)) {
+ iter++;
+ }
+ if (iter != end) {
+ api::StorageMessage &m(*iter->_command);
- if (!operationHasHighEnoughPriorityToBeRun(*msg, maxPriority)
- || operationBlockedByHigherPriorityThread(*msg, t)
- || isPaused())
+ if (operationHasHighEnoughPriorityToBeRun(m, maxPriority)
+ && ! operationBlockedByHigherPriorityThread(m, t)
+ && ! isPaused())
{
- break;
- }
-
- const uint64_t waitTime(
- const_cast<metrics::MetricTimer&>(iter->_timer).stop(
- t.metrics->averageQueueWaitingTime[
- msg->getLoadType()]));
-
- MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by "
- "disk thread.");
- LOG(debug,
- "Message %s waited %" PRIu64 " ms in storage queue, timeout %d",
- msg->toString().c_str(), waitTime,
- static_cast<api::StorageCommand&>(*msg).getTimeout());
-
- idx.erase(iter); // iter not used after this point.
-
- if (!messageTimedOutInQueue(*msg, waitTime)) {
- std::unique_ptr<FileStorHandler::BucketLockInterface> locker(
- takeDiskBucketLockOwnership(t, id, *msg));
- MBUS_TRACE(trace, 9, "FileStorHandler: Got lock on bucket");
- lockGuard.broadcast(); // XXX: needed here?
- return {std::move(locker), msg};
- } else {
- std::shared_ptr<api::StorageReply> msgReply(
- makeQueueTimeoutReply(*msg));
- lockGuard.broadcast(); // XXX: needed here?
- lockGuard.unlock();
- _messageSender.sendReply(msgReply);
- return {};
+ return getMessage(lockGuard, t, idx, iter);
}
}
if (attempt == 0) {
@@ -652,6 +617,37 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
return {}; // No message fetched.
}
+FileStorHandler::LockedMessage
+FileStorHandlerImpl::getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter) {
+
+ api::StorageMessage & m(*iter->_command);
+ const uint64_t waitTime(
+ const_cast<metrics::MetricTimer &>(iter->_timer).stop(
+ t.metrics->averageQueueWaitingTime[m.getLoadType()]));
+
+ mbus::Trace &trace(m.getTrace());
+ MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread.");
+ LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue, timeout %d",
+ m.toString().c_str(), waitTime, static_cast<api::StorageCommand &>(m).getTimeout());
+
+ std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
+ document::BucketId bucketId(iter->_bucketId);
+ idx.erase(iter); // iter not used after this point.
+
+ if (!messageTimedOutInQueue(*msg, waitTime)) {
+ auto locker = takeDiskBucketLockOwnership(guard, t, bucketId, *msg);
+ guard.unlock();
+ MBUS_TRACE(trace, 9, "FileStorHandler: Got lock on bucket");
+ return std::move(FileStorHandler::LockedMessage(std::move(locker), std::move(msg)));
+ } else {
+ std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg));
+ guard.broadcast(); // XXX: needed here?
+ guard.unlock();
+ _messageSender.sendReply(msgReply);
+ return {};
+ }
+}
+
std::shared_ptr<FileStorHandler::BucketLockInterface>
FileStorHandlerImpl::lock(const document::BucketId& bucket, uint16_t disk)
{
@@ -673,7 +669,7 @@ FileStorHandlerImpl::lock(const document::BucketId& bucket, uint16_t disk)
}
std::shared_ptr<FileStorHandler::BucketLockInterface> locker(
- new BucketLock(t, bucket, 255, "External lock"));
+ new BucketLock(lockGuard, t, bucket, 255, "External lock"));
lockGuard.broadcast();
return locker;
@@ -1056,7 +1052,7 @@ FileStorHandlerImpl::remapQueueNoLock(
for (BucketIdx::iterator i = range.first; i != range.second; ++i) {
assert(i->_bucketId == source.bid);
- entriesFound.push_back(*i);
+ entriesFound.push_back(std::move(*i));
}
// Remove them
@@ -1097,7 +1093,7 @@ FileStorHandlerImpl::remapQueueNoLock(
} else {
entry._bucketId = bid;
// Move to correct disk queue if needed
- _diskInfo[targetDisk].queue.push_back(entry);
+ _diskInfo[targetDisk].queue.emplace_back(std::move(entry));
}
}
@@ -1212,6 +1208,7 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(const std::shared_ptr<api::Stora
_priority(cmd->getPriority())
{ }
+
FileStorHandlerImpl::MessageEntry::MessageEntry(const MessageEntry& entry)
: _command(entry._command),
_timer(entry._timer),
@@ -1219,6 +1216,14 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(const MessageEntry& entry)
_priority(entry._priority)
{ }
+
+FileStorHandlerImpl::MessageEntry::MessageEntry(MessageEntry && entry)
+ : _command(std::move(entry._command)),
+ _timer(entry._timer),
+ _bucketId(entry._bucketId),
+ _priority(entry._priority)
+{ }
+
FileStorHandlerImpl::MessageEntry::~MessageEntry() { }
FileStorHandlerImpl::Disk::Disk()
@@ -1252,6 +1257,7 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const
}
FileStorHandlerImpl::BucketLock::BucketLock(
+ const vespalib::MonitorGuard & guard,
Disk& disk,
const document::BucketId& id,
uint8_t priority,
@@ -1259,6 +1265,7 @@ FileStorHandlerImpl::BucketLock::BucketLock(
: _disk(disk),
_id(id)
{
+ (void) guard;
if (_id.getRawId() != 0) {
// Lock the bucket and wait until it is not the current operation for
// the disk itself.
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 27bb537ab18..410d586f0a2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -15,11 +15,11 @@
#pragma once
+#include "filestorhandler.h"
+#include "mergestatus.h"
#include <vespa/document/bucket/bucketid.h>
#include <vespa/metrics/metrics.h>
#include <vespa/storage/common/servicelayercomponent.h>
-#include <vespa/storage/persistence/filestorage/filestorhandler.h>
-#include <vespa/storage/persistence/filestorage/mergestatus.h>
#include <vespa/storageframework/storageframework.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/identity.hpp>
@@ -51,6 +51,7 @@ public:
uint8_t _priority;
MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, const document::BucketId& bId);
+ MessageEntry(MessageEntry &&);
MessageEntry(const MessageEntry &);
MessageEntry & operator = (const MessageEntry &) = delete;
~MessageEntry();
@@ -130,7 +131,7 @@ public:
class BucketLock : public FileStorHandler::BucketLockInterface {
public:
- BucketLock(Disk& disk, const document::BucketId& id, uint8_t priority,
+ BucketLock(const vespalib::MonitorGuard & guard, Disk& disk, const document::BucketId& id, uint8_t priority,
const vespalib::stringref & statusString);
~BucketLock();
@@ -159,6 +160,7 @@ public:
void pause(uint16_t disk, uint8_t priority) const;
FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority);
+ FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter);
FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock,
uint8_t lowestPriority);
@@ -236,14 +238,6 @@ private:
bool diskIsClosed(uint16_t disk) const;
/**
- * Return whether msg has sufficiently high priority that a thread with
- * a configured priority threshold of maxPriority can even run in.
- * Often, operations such as streaming searches will have dedicated threads
- * that refuse lower priority operations such as Puts etc.
- */
- bool operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority) const;
-
- /**
* Return whether an already running high priority operation pre-empts
* (blocks) the operation in msg from even starting in the current thread.
*/
@@ -260,14 +254,14 @@ private:
* Disk lock MUST have been taken prior to calling this function.
*/
std::unique_ptr<FileStorHandler::BucketLockInterface>
- takeDiskBucketLockOwnership(Disk& disk, const document::BucketId& id, const api::StorageMessage& msg);
+ takeDiskBucketLockOwnership(const vespalib::MonitorGuard & guard,
+ Disk& disk, const document::BucketId& id, const api::StorageMessage& msg);
/**
* Creates and returns a reply with api::TIMEOUT return code for msg.
* Swaps (invalidates) context from msg into reply.
*/
std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const;
- bool bucketIsLockedOnDisk(const document::BucketId&, const Disk&) const;
bool messageMayBeAborted(const api::StorageMessage& msg) const;
bool hasBlockingOperations(const Disk& t) const;
void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp b/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp
index 504c3b09419..890c1dc7110 100644
--- a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp
+++ b/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp
@@ -1,7 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fastos/fastos.h>
-#include <vespa/storage/persistence/providershutdownwrapper.h>
-#include <vespa/storage/persistence/persistenceutil.h>
+
+#include "providershutdownwrapper.h"
+#include "persistenceutil.h"
#include <vespa/log/log.h>
LOG_SETUP(".persistence.shutdownwrapper");