path: root/storage
author    Henning Baldersheim <balder@oath.com>  2018-03-20 14:27:23 +0100
committer Henning Baldersheim <balder@oath.com>  2018-04-09 14:19:48 +0200
commit    4f01536d66fdfcb7978d841c5aabb441b1e846ce (patch)
tree      fad90ed1216126682a52bb4294400a3385beedde /storage
parent    e5b2095628ed2aa7c24245f403d086b04ef0727c (diff)
- Use std::make_
- C++11 for loops
- Use the built-in unique_ptr-to-shared_ptr assignment to avoid explicit release.
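
For context, a minimal sketch of the pattern behind the last bullet, as applied to the makeReply() call sites in this diff; StorageReply and makeReply() here are hypothetical stand-ins for the api:: types:

#include <memory>

struct StorageReply {};

// Stand-in for api::StorageCommand::makeReply(), which returns a
// std::unique_ptr<api::StorageReply> in the real code base.
std::unique_ptr<StorageReply> makeReply() {
    return std::make_unique<StorageReply>();
}

int main() {
    // Before: explicitly release() the unique_ptr and re-wrap the raw
    // pointer in a shared_ptr.
    std::shared_ptr<StorageReply> before(makeReply().release());

    // After: shared_ptr's converting constructor adopts the unique_ptr
    // rvalue directly, so ownership transfers without touching a raw pointer.
    std::shared_ptr<StorageReply> after = makeReply();
    return 0;
}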
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp | 314
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h   |  28
2 files changed, 111 insertions(+), 231 deletions(-)
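
The first two bullets are the same kind of modernization: range-based for loops replace explicit iterator loops, and std::make_unique/std::make_shared replace naked new. A minimal sketch, with Monitor and MonitorGuard as hypothetical stand-ins for the vespalib types used in the MultiLockGuard hunk below:

#include <cstdint>
#include <map>
#include <memory>
#include <vector>

struct Monitor {};                    // stand-in for vespalib::Monitor
struct MonitorGuard {                 // stand-in for vespalib::MonitorGuard
    explicit MonitorGuard(Monitor&) {}
};

int main() {
    std::map<std::uint16_t, Monitor*> monitors;
    std::vector<std::shared_ptr<MonitorGuard>> guards;

    // Before: explicit iterator loop and a raw `new` wrapped in a shared_ptr.
    for (std::map<std::uint16_t, Monitor*>::iterator it = monitors.begin();
         it != monitors.end(); ++it) {
        guards.push_back(std::shared_ptr<MonitorGuard>(new MonitorGuard(*it->second)));
    }

    // After: C++11 range-based for plus std::make_shared, which allocates the
    // object and its control block in a single allocation.
    for (auto & entry : monitors) {
        guards.push_back(std::make_shared<MonitorGuard>(*entry.second));
    }
    return 0;
}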
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 42e5d05d650..94eb85a8c49 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -134,15 +134,15 @@ FileStorHandlerImpl::flush(bool killPendingMerges)
Disk& t(_diskInfo[i]);
vespalib::MonitorGuard lockGuard(t.lock);
while (t.getQueueSize() != 0 || !t.lockedBuckets.empty()) {
- LOG(debug, "Still %d in queue and %ld locked buckets for disk '%d'", t.getQueueSize(), t.lockedBuckets.size(), i);
+ LOG(debug, "Still %d in queue and %ld locked buckets for disk '%d'",
+ t.getQueueSize(), t.lockedBuckets.size(), i);
lockGuard.wait(100);
}
LOG(debug, "All queues and bucket locks released for disk '%d'", i);
}
if (killPendingMerges) {
- api::ReturnCode code(api::ReturnCode::ABORTED,
- "Storage node is shutting down");
+ api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down");
for (auto & entry : _mergeStates)
{
MergeStatus& s(*entry.second);
@@ -164,12 +164,10 @@ FileStorHandlerImpl::flush(bool killPendingMerges)
}
void
-FileStorHandlerImpl::reply(api::StorageMessage& msg,
- DiskState state) const
+FileStorHandlerImpl::reply(api::StorageMessage& msg, DiskState state) const
{
if (!msg.getType().isReply()) {
- std::shared_ptr<api::StorageReply> rep(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> rep = static_cast<api::StorageCommand&>(msg).makeReply();
if (state == FileStorHandler::DISABLED) {
rep->setResult(api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled"));
} else {
@@ -231,8 +229,7 @@ FileStorHandlerImpl::getQueueSize() const
}
bool
-FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg,
- uint16_t disk)
+FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, uint16_t disk)
{
assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
@@ -241,15 +238,10 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg,
if (t.getState() == FileStorHandler::AVAILABLE) {
MBUS_TRACE(msg->getTrace(), 5, vespalib::make_string(
- "FileStorHandler: Operation added to disk %d's queue with "
- "priority %u", disk, msg->getPriority()));
+ "FileStorHandler: Operation added to disk %d's queue with priority %u", disk, msg->getPriority()));
t.queue.emplace_back(std::move(messageEntry));
-
- LOG(spam, "Queued operation %s with priority %u.",
- msg->getType().toString().c_str(),
- msg->getPriority());
-
+ LOG(spam, "Queued operation %s with priority %u.", msg->getType().toString().c_str(), msg->getPriority());
lockGuard.broadcast();
} else {
return false;
@@ -287,25 +279,20 @@ FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const
}
void
-FileStorHandlerImpl::abortQueuedCommandsForBuckets(
- Disk& disk,
- const AbortBucketOperationsCommand& cmd)
+FileStorHandlerImpl::abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd)
{
Disk& t(disk);
vespalib::MonitorGuard diskLock(t.lock);
typedef PriorityQueue::iterator iter_t;
api::ReturnCode abortedCode(api::ReturnCode::ABORTED,
- "Sending distributor no longer owns "
- "bucket operation was bound to or storage node went down");
+ "Sending distributor no longer owns bucket operation was bound to, "
+ "or storage node went down");
for (iter_t it(t.queue.begin()), e(t.queue.end()); it != e;) {
api::StorageMessage& msg(*it->_command);
if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket)) {
- LOG(debug,
- "Aborting operation %s as it is bound for bucket %s",
- msg.toString().c_str(),
- it->_bucket.getBucketId().toString().c_str());
- std::shared_ptr<api::StorageReply> msgReply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ LOG(debug, "Aborting operation %s as it is bound for bucket %s",
+ msg.toString().c_str(), it->_bucket.getBucketId().toString().c_str());
+ std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(msg).makeReply();
msgReply->setResult(abortedCode);
_messageSender.sendReply(msgReply);
@@ -317,15 +304,12 @@ FileStorHandlerImpl::abortQueuedCommandsForBuckets(
}
bool
-FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket(
- const Disk& disk,
- const AbortBucketOperationsCommand& cmd) const
+FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket(const Disk& disk,
+ const AbortBucketOperationsCommand& cmd) const
{
for (auto& lockedBucket : disk.lockedBuckets) {
if (cmd.shouldAbort(lockedBucket.first)) {
- LOG(spam,
- "Disk had active operation for aborted bucket %s, "
- "waiting for it to complete...",
+ LOG(spam, "Disk had active operation for aborted bucket %s, waiting for it to complete...",
lockedBucket.first.toString().c_str());
return true;
}
@@ -334,9 +318,7 @@ FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket(
}
void
-FileStorHandlerImpl::waitUntilNoActiveOperationsForAbortedBuckets(
- Disk& disk,
- const AbortBucketOperationsCommand& cmd)
+FileStorHandlerImpl::waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd)
{
vespalib::MonitorGuard guard(disk.lock);
while (diskHasActiveOperationForAbortedBucket(disk, cmd)) {
@@ -346,8 +328,7 @@ FileStorHandlerImpl::waitUntilNoActiveOperationsForAbortedBuckets(
}
void
-FileStorHandlerImpl::abortQueuedOperations(
- const AbortBucketOperationsCommand& cmd)
+FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& cmd)
{
// Do queue clearing and active operation waiting in two passes
// to allow disk threads to drain running operations in parallel.
@@ -386,7 +367,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessag
}
vespalib::MonitorGuard lockGuard(t.lock);
- BucketIdx& idx = boost::multi_index::get<2>(t.queue);
+ BucketIdx& idx = bmi::get<2>(t.queue);
std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket);
// No more for this bucket.
@@ -417,7 +398,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessag
lockGuard.unlock();
return lck;
} else {
- std::shared_ptr<api::StorageReply> msgReply(static_cast<api::StorageCommand&>(m).makeReply().release());
+ std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(m).makeReply();
idx.erase(range.first);
lockGuard.broadcast();
lockGuard.unlock();
@@ -450,8 +431,7 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const
}
bool
-FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg,
- uint64_t waitTime) const
+FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime) const
{
if (msg.getType().isReply()) {
return false; // Replies must always be processed and cannot time out.
@@ -460,11 +440,8 @@ FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg,
}
std::unique_ptr<FileStorHandler::BucketLockInterface>
-FileStorHandlerImpl::takeDiskBucketLockOwnership(
- const vespalib::MonitorGuard & guard,
- Disk& disk,
- const document::Bucket &bucket,
- const api::StorageMessage& msg)
+FileStorHandlerImpl::takeDiskBucketLockOwnership(const vespalib::MonitorGuard & guard, Disk& disk,
+ const document::Bucket &bucket, const api::StorageMessage& msg)
{
return std::make_unique<BucketLock>(guard, disk, bucket, msg.getPriority(), msg.getSummary());
}
@@ -473,11 +450,8 @@ std::unique_ptr<api::StorageReply>
FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const
{
assert(!msg.getType().isReply());
- std::unique_ptr<api::StorageReply> msgReply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
- msgReply->setResult(api::ReturnCode(
- api::ReturnCode::TIMEOUT,
- "Message waited too long in storage queue"));
+ std::unique_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(msg).makeReply();
+ msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue"));
return msgReply;
}
@@ -504,7 +478,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk)
// second attempt. This is key to allowing the run loop to register
// ticks at regular intervals while not busy-waiting.
for (int attempt = 0; (attempt < 2) && ! diskIsClosed(disk); ++attempt) {
- PriorityIdx& idx(boost::multi_index::get<1>(t.queue));
+ PriorityIdx& idx(bmi::get<1>(t.queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
while (iter != end && bucketIsLockedOnDisk(iter->_bucket, t)) {
@@ -527,8 +501,7 @@ FileStorHandlerImpl::getMessage(vespalib::MonitorGuard & guard, Disk & t, Priori
api::StorageMessage & m(*iter->_command);
const uint64_t waitTime(
- const_cast<metrics::MetricTimer &>(iter->_timer).stop(
- t.metrics->averageQueueWaitingTime[m.getLoadType()]));
+ const_cast<metrics::MetricTimer &>(iter->_timer).stop(t.metrics->averageQueueWaitingTime[m.getLoadType()]));
mbus::Trace &trace(m.getTrace());
MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread.");
@@ -585,11 +558,8 @@ namespace {
monitors[index] = &monitor;
}
void lock() {
- for (std::map<uint16_t, vespalib::Monitor*>::iterator it
- = monitors.begin(); it != monitors.end(); ++it)
- {
- guards.push_back(std::shared_ptr<vespalib::MonitorGuard>(
- new vespalib::MonitorGuard(*it->second)));
+ for (auto & entry : monitors) {
+ guards.push_back(std::make_shared<vespalib::MonitorGuard>(*entry.second));
}
}
};
@@ -605,12 +575,10 @@ namespace {
return static_cast<const api::PutCommand&>(msg).getDocumentId();
break;
case api::MessageType::UPDATE_ID:
- return static_cast<const api::UpdateCommand&>(msg)
- .getDocumentId();
+ return static_cast<const api::UpdateCommand&>(msg).getDocumentId();
break;
case api::MessageType::REMOVE_ID:
- return static_cast<const api::RemoveCommand&>(msg)
- .getDocumentId();
+ return static_cast<const api::RemoveCommand&>(msg).getDocumentId();
break;
default:
assert(false);
@@ -633,9 +601,7 @@ namespace {
}
int
-FileStorHandlerImpl::calculateTargetBasedOnDocId(
- const api::StorageMessage& msg,
- std::vector<RemapInfo*>& targets)
+FileStorHandlerImpl::calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets)
{
document::DocumentId id(getDocId(msg));
document::Bucket bucket(msg.getBucket().getBucketSpace(), _bucketIdFactory.getBucketId(id));
@@ -651,13 +617,18 @@ FileStorHandlerImpl::calculateTargetBasedOnDocId(
return -1;
}
+namespace {
+
+const char *
+splitOrJoin(FileStorHandlerImpl::Operation op) {
+ return (op == FileStorHandlerImpl::Operation::SPLIT) ? "Bucket was just split" : "Bucket was just joined";
+}
+
+}
+
document::Bucket
-FileStorHandlerImpl::remapMessage(
- api::StorageMessage& msg,
- const document::Bucket& source,
- Operation op,
- std::vector<RemapInfo*>& targets,
- uint16_t& targetDisk, api::ReturnCode& returnCode)
+FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Bucket& source, Operation op,
+ std::vector<RemapInfo*>& targets, uint16_t& targetDisk, api::ReturnCode& returnCode)
{
document::Bucket newBucket = source;
@@ -668,8 +639,7 @@ FileStorHandlerImpl::remapMessage(
case api::MessageType::REMOVE_ID:
// Move to correct queue
{
- api::BucketCommand& cmd(
- static_cast<api::BucketCommand&>(msg));
+ api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg));
if (cmd.getBucket() == source) {
if (op == SPLIT) {
@@ -693,8 +663,7 @@ FileStorHandlerImpl::remapMessage(
} else {
document::DocumentId did(getDocId(msg));
document::BucketId bucket = _bucketIdFactory.getBucketId(did);
- uint32_t commonBits(
- findCommonBits(targets[0]->bucket.getBucketId(), bucket));
+ uint32_t commonBits(findCommonBits(targets[0]->bucket.getBucketId(), bucket));
if (commonBits < source.getBucketId().getUsedBits()) {
std::ostringstream ost;
ost << bucket << " belongs in neither "
@@ -703,10 +672,8 @@ FileStorHandlerImpl::remapMessage(
<< "did not belong in the original "
<< "bucket " << source.getBucketId();
LOG(error, "Error remapping %s after split %s",
- cmd.getType().toString().c_str(),
- ost.str().c_str());
- returnCode = api::ReturnCode(
- api::ReturnCode::REJECTED, ost.str());
+ cmd.getType().toString().c_str(), ost.str().c_str());
+ returnCode = api::ReturnCode(api::ReturnCode::REJECTED, ost.str());
} else {
std::ostringstream ost;
assert(targets.size() == 2);
@@ -716,9 +683,7 @@ FileStorHandlerImpl::remapMessage(
<< "Failing operation so distributor can create "
<< "bucket on correct node.";
LOG(debug, "%s", ost.str().c_str());
- returnCode = api::ReturnCode(
- api::ReturnCode::BUCKET_DELETED,
- ost.str());
+ returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, ost.str());
}
}
} else {
@@ -740,8 +705,7 @@ FileStorHandlerImpl::remapMessage(
}
} else {
LOG(debug, "Did not remap %s with bucket %s from bucket %s",
- cmd.toString().c_str(), cmd.getBucketId().toString().c_str(),
- source.toString().c_str());
+ cmd.toString().c_str(), cmd.getBucketId().toString().c_str(), source.toString().c_str());
assert(false);
}
break;
@@ -755,15 +719,13 @@ FileStorHandlerImpl::remapMessage(
// if op == MOVE. If op != MOVE, fail with bucket not found
// and clear filestor thread state
{
- api::BucketCommand& cmd(
- static_cast<api::BucketCommand&>(msg));
+ api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg));
if (cmd.getBucket() == source) {
if (op != MOVE) {
std::ostringstream ost;
ost << "Bucket " << (op == SPLIT ? "split" : "joined")
<< ". Cannot remap merge, so aborting it";
- api::ReturnCode code(api::ReturnCode::BUCKET_DELETED,
- ost.str());
+ api::ReturnCode code(api::ReturnCode::BUCKET_DELETED, ost.str());
clearMergeStatus(cmd.getBucket(), &code);
}
}
@@ -775,19 +737,14 @@ FileStorHandlerImpl::remapMessage(
// Fail with bucket not found if op is JOIN
// Ok if op is SPLIT, as we have already done as requested.
{
- api::BucketCommand& cmd(
- static_cast<api::BucketCommand&>(msg));
+ api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg));
if (cmd.getBucket() == source) {
if (op == MOVE) {
targetDisk = targets[0]->diskIndex;
} else if (op == SPLIT) {
- returnCode = api::ReturnCode(
- api::ReturnCode::BUCKET_DELETED,
- "Bucket split while operation enqueued");
+ returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, "Bucket split while operation enqueued");
} else {
- returnCode = api::ReturnCode(
- api::ReturnCode::BUCKET_DELETED,
- "Bucket was just joined");
+ returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, "Bucket was just joined");
}
}
break;
@@ -805,10 +762,7 @@ FileStorHandlerImpl::remapMessage(
if (op == MOVE) {
targetDisk = targets[0]->diskIndex;
} else {
- returnCode = api::ReturnCode(
- api::ReturnCode::BUCKET_DELETED,
- op == SPLIT ? "Bucket was just split"
- : "Bucket was just joined");
+ returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op));
}
}
break;
@@ -818,8 +772,7 @@ FileStorHandlerImpl::remapMessage(
case api::MessageType::JOINBUCKETS_ID:
// Move to correct queue if op == MOVE. Otherwise ignore.
{
- api::BucketCommand& cmd(
- static_cast<api::BucketCommand&>(msg));
+ api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg));
if (cmd.getBucket() == source) {
if (op == MOVE) {
targetDisk = targets[0]->diskIndex;
@@ -829,8 +782,7 @@ FileStorHandlerImpl::remapMessage(
}
case api::MessageType::INTERNAL_ID:
{
- const api::InternalCommand& icmd(
- static_cast<const api::InternalCommand&>(msg));
+ const api::InternalCommand& icmd(static_cast<const api::InternalCommand&>(msg));
document::Bucket bucket;
switch(icmd.getType()) {
case RequestStatusPage::ID:
@@ -844,11 +796,7 @@ FileStorHandlerImpl::remapMessage(
if (op == MOVE) {
targetDisk = targets[0]->diskIndex;
} else {
- returnCode = api::ReturnCode(
- api::ReturnCode::BUCKET_DELETED,
- op == SPLIT
- ? "Bucket was just split"
- : "Bucket was just joined");
+ returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op));
}
}
break;
@@ -857,8 +805,7 @@ FileStorHandlerImpl::remapMessage(
//@fallthrough@
case RepairBucketCommand::ID:
if (bucket.getBucketId().getRawId() == 0) {
- bucket = static_cast<RepairBucketCommand&>(msg)
- .getBucket();
+ bucket = static_cast<RepairBucketCommand&>(msg).getBucket();
}
// Move to correct queue if op == MOVE
// Fail with bucket not found if op != MOVE
@@ -866,11 +813,7 @@ FileStorHandlerImpl::remapMessage(
if (op == MOVE) {
targetDisk = targets[0]->diskIndex;
} else {
- returnCode = api::ReturnCode(
- api::ReturnCode::BUCKET_DELETED,
- op == SPLIT
- ? "Bucket was just split"
- : "Bucket was just joined");
+ returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op));
}
}
break;
@@ -878,22 +821,14 @@ FileStorHandlerImpl::remapMessage(
// Fail bucket not found if op != MOVE
// Fail and log error if op == MOVE
{
- api::BucketCommand& cmd(
- static_cast<api::BucketCommand&>(msg));
+ api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg));
if (cmd.getBucket() == source) {
if (op == MOVE) {
- returnCode = api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE,
- "Multiple bucket disk move "
- "commands pending at the same time "
- "towards bucket "
- + source.toString());
+ returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE,
+ "Multiple bucket disk move commands pending at the same time "
+ " towards bucket " + source.toString());
} else {
- returnCode = api::ReturnCode(
- api::ReturnCode::BUCKET_DELETED,
- op == SPLIT
- ? "Bucket was just split"
- : "Bucket was just joined");
+ returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op));
}
}
break;
@@ -910,13 +845,10 @@ FileStorHandlerImpl::remapMessage(
default:
// Fail and log error
{
- LOG(error, "Attempted (and failed) to remap %s which should "
- "not be processed at this time",
+ LOG(error, "Attempted (and failed) to remap %s which should not be processed at this time",
msg.toString(true).c_str());
- returnCode = api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE,
- "No such message should be processed at "
- "this time.");
+ returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE,
+ "No such message should be processed at this time.");
break;
}
}
@@ -924,12 +856,8 @@ FileStorHandlerImpl::remapMessage(
}
default:
{
- returnCode = api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE,
- "Unknown message type in persistence layer");
- LOG(error,
- "Unknown message type in persistence layer: %s",
- msg.toString().c_str());
+ returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "Unknown message type in persistence layer");
+ LOG(error, "Unknown message type in persistence layer: %s", msg.toString().c_str());
}
} // End of switch
@@ -937,15 +865,11 @@ FileStorHandlerImpl::remapMessage(
}
void
-FileStorHandlerImpl::remapQueueNoLock(
- Disk& from,
- const RemapInfo& source,
- std::vector<RemapInfo*>& targets,
- Operation op)
+FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source,
+ std::vector<RemapInfo*>& targets, Operation op)
{
- BucketIdx& idx(boost::multi_index::get<2>(from.queue));
- std::pair<BucketIdx::iterator, BucketIdx::iterator> range(
- idx.equal_range(source.bucket));
+ BucketIdx& idx(bmi::get<2>(from.queue));
+ std::pair<BucketIdx::iterator, BucketIdx::iterator> range(idx.equal_range(source.bucket));
std::vector<MessageEntry> entriesFound;
@@ -971,22 +895,14 @@ FileStorHandlerImpl::remapQueueNoLock(
api::StorageMessage& msg(*entry._command);
assert(entry._bucket == source.bucket);
- document::Bucket bucket = remapMessage(msg,
- source.bucket,
- op,
- targets,
- targetDisk,
- returnCode);
+ document::Bucket bucket = remapMessage(msg, source.bucket, op, targets, targetDisk, returnCode);
if (returnCode.getResult() != api::ReturnCode::OK) {
// Fail message if errorcode set
if (!msg.getType().isReply()) {
- std::shared_ptr<api::StorageReply> rep(
- static_cast<api::StorageCommand&>(msg)
- .makeReply().release());
+ std::shared_ptr<api::StorageReply> rep = static_cast<api::StorageCommand&>(msg).makeReply();
LOG(spam, "Sending reply %s because remapping failed: %s",
- msg.toString().c_str(),
- returnCode.toString().c_str());
+ msg.toString().c_str(), returnCode.toString().c_str());
rep->setResult(returnCode);
_messageSender.sendReply(rep);
@@ -1001,10 +917,8 @@ FileStorHandlerImpl::remapQueueNoLock(
}
void
-FileStorHandlerImpl::remapQueue(
- const RemapInfo& source,
- RemapInfo& target,
- Operation op) {
+FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Operation op)
+{
// Use a helper class to lock to solve issue that some buckets might be
// the same bucket. Will fix order if we accept wrong order later.
MultiLockGuard guard;
@@ -1026,11 +940,7 @@ FileStorHandlerImpl::remapQueue(
}
void
-FileStorHandlerImpl::remapQueue(
- const RemapInfo& source,
- RemapInfo& target1,
- RemapInfo& target2,
- Operation op)
+FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op)
{
// Use a helper class to lock to solve issue that some buckets might be
// the same bucket. Will fix order if we accept wrong order later.
@@ -1059,16 +969,13 @@ FileStorHandlerImpl::remapQueue(
}
void
-FileStorHandlerImpl::failOperations(
- const document::Bucket &bucket, uint16_t fromDisk,
- const api::ReturnCode& err)
+FileStorHandlerImpl::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err)
{
Disk& from(_diskInfo[fromDisk]);
vespalib::MonitorGuard lockGuard(from.lock);
- BucketIdx& idx(boost::multi_index::get<2>(from.queue));
- std::pair<BucketIdx::iterator, BucketIdx::iterator> range(
- idx.equal_range(bucket));
+ BucketIdx& idx(bmi::get<2>(from.queue));
+ std::pair<BucketIdx::iterator, BucketIdx::iterator> range(idx.equal_range(bucket));
for (auto iter = range.first; iter != range.second;) {
// We want to post delete bucket to list before calling this
@@ -1076,9 +983,7 @@ FileStorHandlerImpl::failOperations(
// cannot delete the delete bucket operation itself
if (iter->_command->getType() != api::MessageType::DELETEBUCKET) {
if (!iter->_command->getType().isReply()) {
- std::shared_ptr<api::StorageReply> msgReply(
- static_cast<api::StorageCommand&>(*iter->_command)
- .makeReply().release());
+ std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(*iter->_command).makeReply();
msgReply->setResult(err);
_messageSender.sendReply(msgReply);
}
@@ -1119,10 +1024,10 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(const MessageEntry& entry)
FileStorHandlerImpl::MessageEntry::MessageEntry(MessageEntry && entry) noexcept
- : _command(std::move(entry._command)),
- _timer(entry._timer),
- _bucket(entry._bucket),
- _priority(entry._priority)
+ : _command(std::move(entry._command)),
+ _timer(entry._timer),
+ _bucket(entry._bucket),
+ _priority(entry._priority)
{ }
FileStorHandlerImpl::MessageEntry::~MessageEntry() { }
@@ -1157,12 +1062,9 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const
return t.getQueueSize();
}
-FileStorHandlerImpl::BucketLock::BucketLock(
- const vespalib::MonitorGuard & guard,
- Disk& disk,
- const document::Bucket &bucket,
- uint8_t priority,
- const vespalib::stringref & statusString)
+FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Disk& disk,
+ const document::Bucket &bucket, uint8_t priority,
+ const vespalib::stringref & statusString)
: _disk(disk),
_bucket(bucket)
{
@@ -1170,12 +1072,9 @@ FileStorHandlerImpl::BucketLock::BucketLock(
if (_bucket.getBucketId().getRawId() != 0) {
// Lock the bucket and wait until it is not the current operation for
// the disk itself.
- _disk.lockedBuckets.insert(
- std::make_pair(_bucket, Disk::LockEntry(priority, statusString)));
- LOG(debug,
- "Locked bucket %s with priority %u",
- bucket.getBucketId().toString().c_str(),
- priority);
+ _disk.lockedBuckets.insert(std::make_pair(_bucket, Disk::LockEntry(priority, statusString)));
+ LOG(debug, "Locked bucket %s with priority %u",
+ bucket.getBucketId().toString().c_str(), priority);
LOG_BUCKET_OPERATION_SET_LOCK_STATE(
_bucket.getBucketId(), "acquired filestor lock", false,
@@ -1205,11 +1104,8 @@ FileStorHandlerImpl::dumpQueue(uint16_t disk) const
const Disk& t(_diskInfo[disk]);
vespalib::MonitorGuard lockGuard(t.lock);
- const PriorityIdx& idx = boost::multi_index::get<1>(t.queue);
- for (PriorityIdx::const_iterator it = idx.begin();
- it != idx.end();
- it++)
- {
+ const PriorityIdx& idx = bmi::get<1>(t.queue);
+ for (PriorityIdx::const_iterator it = idx.begin(); it != idx.end(); it++) {
ost << it->_bucket.getBucketId() << ": " << it->_command->toString() << " (priority: "
<< (int)it->_command->getPriority() << ")\n";
}
@@ -1218,8 +1114,7 @@ FileStorHandlerImpl::dumpQueue(uint16_t disk) const
}
void
-FileStorHandlerImpl::getStatus(std::ostream& out,
- const framework::HttpUrlPath& path) const
+FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath& path) const
{
bool verbose = path.hasAttribute("verbose");
out << "<h1>Filestor handler</h1>\n";
@@ -1247,11 +1142,8 @@ FileStorHandlerImpl::getStatus(std::ostream& out,
out << "<h4>Input queue</h4>\n";
out << "<ul>\n";
- const PriorityIdx& idx = boost::multi_index::get<1>(t.queue);
- for (PriorityIdx::const_iterator it = idx.begin();
- it != idx.end();
- it++)
- {
+ const PriorityIdx& idx = bmi::get<1>(t.queue);
+ for (PriorityIdx::const_iterator it = idx.begin(); it != idx.end(); it++) {
out << "<li>" << it->_command->toString() << " (priority: "
<< (int)it->_command->getPriority() << ")</li>\n";
}
@@ -1267,10 +1159,8 @@ FileStorHandlerImpl::getStatus(std::ostream& out,
if (_mergeStates.size() == 0) {
out << "None\n";
}
- for (std::map<document::Bucket, MergeStatus::SP>::const_iterator it
- = _mergeStates.begin(); it != _mergeStates.end(); ++it)
- {
- out << "<b>" << it->first.toString() << "</b><br>\n";
+ for (auto & entry : _mergeStates) {
+ out << "<b>" << entry.first.toString() << "</b><br>\n";
// << "<p>" << it->second << "</p>\n"; // Gets very spammy with
// the complete state here..
}
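
The header diff that follows replaces the verbose boost::multi_index typedefs with a bmi namespace alias and C++11 using-aliases. A sketch of the resulting container definition in isolation, with MessageEntry reduced to a hypothetical two-field struct so the three indexes (FIFO, priority, bucket) are visible:

#include <cstdint>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/identity.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>

namespace bmi = boost::multi_index;

struct MessageEntry {                 // reduced stand-in for the real entry
    int           _priority;
    std::uint64_t _bucket;
    // PriorityOrder sorts whole entries, so the entry must be comparable.
    bool operator<(const MessageEntry& rhs) const { return _priority < rhs._priority; }
};

// Index 0: insertion (FIFO) order; index 1: priority order; index 2: grouped by bucket.
using PriorityOrder = bmi::ordered_non_unique<bmi::identity<MessageEntry>>;
using BucketOrder   = bmi::ordered_non_unique<bmi::member<MessageEntry, std::uint64_t, &MessageEntry::_bucket>>;
using PriorityQueue = bmi::multi_index_container<MessageEntry,
        bmi::indexed_by<bmi::sequenced<>, PriorityOrder, BucketOrder>>;
using PriorityIdx   = bmi::nth_index<PriorityQueue, 1>::type;
using BucketIdx     = bmi::nth_index<PriorityQueue, 2>::type;

int main() {
    PriorityQueue queue;
    queue.emplace_back(MessageEntry{1, 42});
    BucketIdx& byBucket = bmi::get<2>(queue);  // same lookup as the bmi::get<2>(t.queue) calls above
    return byBucket.count(42) == 1 ? 0 : 1;
}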
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 6b6d154e149..fb084c41b7c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -37,6 +37,8 @@ class FileStorDiskMetrics;
class StorBucketDatabase;
class AbortBucketOperationsCommand;
+namespace bmi = boost::multi_index;
+
class FileStorHandlerImpl : private framework::MetricUpdateHook,
private ResumeGuard::Callback,
public MessageSender {
@@ -61,25 +63,13 @@ public:
}
};
- typedef boost::multi_index::ordered_non_unique<
- boost::multi_index::identity<MessageEntry> > PriorityOrder;
-
- typedef boost::multi_index::ordered_non_unique<
- boost::multi_index::member<MessageEntry,
- document::Bucket,
- &MessageEntry::_bucket> > BucketOrder;
-
- typedef boost::multi_index::multi_index_container<
- MessageEntry,
- boost::multi_index::indexed_by<
- boost::multi_index::sequenced<>,
- PriorityOrder,
- BucketOrder
- >
- > PriorityQueue;
-
- typedef boost::multi_index::nth_index<PriorityQueue, 1>::type PriorityIdx;
- typedef boost::multi_index::nth_index<PriorityQueue, 2>::type BucketIdx;
+ using PriorityOrder = bmi::ordered_non_unique<bmi::identity<MessageEntry> >;
+ using BucketOrder = bmi::ordered_non_unique<bmi::member<MessageEntry, document::Bucket, &MessageEntry::_bucket>>;
+
+ using PriorityQueue = bmi::multi_index_container<MessageEntry, bmi::indexed_by<bmi::sequenced<>, PriorityOrder, BucketOrder>>;
+
+ using PriorityIdx = bmi::nth_index<PriorityQueue, 1>::type;
+ using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type;
struct Disk {
vespalib::Monitor lock;