author     Henning Baldersheim <balder@yahoo-inc.com>   2020-10-15 07:35:33 +0000
committer  Henning Baldersheim <balder@yahoo-inc.com>   2020-10-15 11:28:32 +0000
commit     c99d392f69713b5300bd5b75d4a7d7e6b08c2d46 (patch)
tree       b29b308ee23183ffd89b44ed016b77808439ece4 /storage
parent     94eafebf4f7f9b4cb0650bd7aa0c0030a66895bb (diff)
Avoid one indirection for the disk state.
Diffstat (limited to 'storage')
-rw-r--r--   storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp   211
-rw-r--r--   storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h     119
2 files changed, 120 insertions, 210 deletions
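
The change below removes the intermediate Disk struct that sat between FileStorHandlerImpl and its per-disk state: the stripe vector, the atomic disk state, the metrics pointer and the stripe-id counter now live directly in the handler, so every access saves one member indirection and the Disk forwarding methods disappear. A rough, compilable sketch of the shape of the change; the member names are taken from the diff, but the surrounding Vespa types are stubbed out and all behaviour is elided.

#include <atomic>
#include <cstdint>
#include <vector>

// Stub types standing in for the real Vespa classes (names only, assumption).
struct FileStorDiskMetrics {};
struct Stripe {};
enum class DiskState { AVAILABLE, DISABLED, CLOSED };

// Before: per-disk state was reached through a nested Disk object (one extra hop).
struct HandlerBefore {
    struct Disk {
        FileStorDiskMetrics   *metrics = nullptr;
        uint32_t               nextStripeId = 0;
        std::vector<Stripe>    stripes;
        std::atomic<DiskState> state{DiskState::AVAILABLE};
    };
    Disk disk;  // every call went via disk.<member>
    DiskState getDiskState() const { return disk.state.load(std::memory_order_relaxed); }
};

// After: the same members live directly in the handler, removing the indirection.
struct HandlerAfter {
    std::atomic<DiskState> state{DiskState::AVAILABLE};
    uint32_t               nextStripeId = 0;
    FileStorDiskMetrics   *metrics = nullptr;
    std::vector<Stripe>    stripes;
    DiskState getDiskState() const { return state.load(std::memory_order_relaxed); }
};

int main() {
    HandlerAfter h;
    return h.getDiskState() == DiskState::AVAILABLE ? 0 : 1;
}
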
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 88b3a6bcafc..82541f5e9b4 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -41,18 +41,27 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe
FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg)
: _component(compReg, "filestorhandlerimpl"),
- _disk(*this, sender, numStripes),
+ _state(FileStorHandler::AVAILABLE),
+ _nextStripeId(0),
+ _metrics(nullptr),
+ _stripes(),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
_getNextMessageTimeout(100ms),
_max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)),
_paused(false)
{
- _disk.metrics = metrics.disks[0].get();
- assert(_disk.metrics != nullptr);
+ assert(numStripes > 0);
+ _stripes.reserve(numStripes);
+ for (size_t i(0); i < numStripes; i++) {
+ _stripes.emplace_back(*this, sender);
+ }
+
+ _metrics = metrics.disks[0].get();
+ assert(_metrics != nullptr);
uint32_t j(0);
- for (Stripe & stripe : _disk.getStripes()) {
- stripe.setMetrics(metrics.disks[0]->stripes[j++].get());
+ for (Stripe & stripe : _stripes) {
+ stripe.setMetrics(_metrics->stripes[j++].get());
}
// Add update hook, so we will get callbacks each 5 seconds to update metrics.
@@ -141,9 +150,9 @@ FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api:
void
FileStorHandlerImpl::flush(bool killPendingMerges)
{
- LOG(debug, "Wait until queues and bucket locks released.");
- _disk.flush();
- LOG(debug, "All queues and bucket locks released.");
+ LOG(debug, "Wait until queues and bucket locks released.");
+ flush();
+ LOG(debug, "All queues and bucket locks released.");
if (killPendingMerges) {
api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down");
@@ -184,18 +193,17 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg, DiskState state) const
void
FileStorHandlerImpl::setDiskState(DiskState state)
{
-
// Mark disk closed
- _disk.setState(state);
+ setState(state);
if (state != FileStorHandler::AVAILABLE) {
- _disk.flush();
+ flush();
}
}
FileStorHandler::DiskState
FileStorHandlerImpl::getDiskState() const
{
- return _disk.getState();
+ return getState();
}
void
@@ -206,20 +214,31 @@ FileStorHandlerImpl::close()
setDiskState(FileStorHandler::CLOSED);
}
LOG(debug, "Closing");
- _disk.broadcast();
+ for (auto & stripe : _stripes) {
+ stripe.broadcast();
+ }
LOG(debug, "Closed");
}
uint32_t
FileStorHandlerImpl::getQueueSize() const
{
- return _disk.getQueueSize();
+ size_t sum(0);
+ for (const auto & stripe : _stripes) {
+ sum += stripe.getQueueSize();
+ }
+ return sum;
}
bool
FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg)
{
- return _disk.schedule(msg);
+ if (getState() == FileStorHandler::AVAILABLE) {
+ document::Bucket bucket = getStorageMessageBucket(*msg);
+ stripe(bucket).schedule(MessageEntry(msg, bucket));
+ return true;
+ }
+ return false;
}
bool
@@ -250,54 +269,49 @@ FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg)
}
void
-FileStorHandlerImpl::abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd)
+FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& cmd)
{
+ // Do queue clearing and active operation waiting in two passes
+ // to allow disk threads to drain running operations in parallel.
api::ReturnCode abortedCode(api::ReturnCode::ABORTED,
"Sending distributor no longer owns bucket operation was bound to, "
"or storage node went down");
- auto aborted = disk.abort(cmd);
+ std::vector<std::shared_ptr<api::StorageReply>> aborted;
+ for (auto & stripe : _stripes) {
+ stripe.abort(aborted, cmd);
+ }
for (auto & msgReply : aborted) {
msgReply->setResult(abortedCode);
_messageSender.sendReply(msgReply);
}
-}
-void
-FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& cmd)
-{
- // Do queue clearing and active operation waiting in two passes
- // to allow disk threads to drain running operations in parallel.
- abortQueuedCommandsForBuckets(_disk, cmd);
- _disk.waitInactive(cmd);
+ for (auto & stripe : _stripes) {
+ stripe.waitInactive(cmd);
+ }
}
void
FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
{
std::lock_guard lockGuard(_mergeStatesLock);
- _disk.metrics->pendingMerges.addValue(_mergeStates.size());
- _disk.metrics->queueSize.addValue(_disk.getQueueSize());
+ _metrics->pendingMerges.addValue(_mergeStates.size());
+ _metrics->queueSize.addValue(getQueueSize());
- for (auto & entry : _disk.metrics->averageQueueWaitingTime.getMetricMap()) {
+ for (auto & entry : _metrics->averageQueueWaitingTime.getMetricMap()) {
metrics::LoadType loadType(entry.first, "ignored");
- for (const auto & stripe : _disk.metrics->stripes) {
+ for (const auto & stripe : _metrics->stripes) {
const auto & m = stripe->averageQueueWaitingTime[loadType];
entry.second->addTotalValueWithCount(m.getTotal(), m.getCount());
}
}
}
-uint32_t
-FileStorHandlerImpl::getNextStripeId() {
- return _disk.getNextStripeId();
-}
-
bool
FileStorHandlerImpl::tryHandlePause() const
{
if (isPaused()) {
// Wait a single time to see if filestor gets unpaused.
- if (!_disk.isClosed()) {
+ if (!isClosed()) {
std::unique_lock g(_pauseMonitor);
_pauseCond.wait_for(g, 100ms);
}
@@ -331,7 +345,7 @@ FileStorHandlerImpl::getNextMessage(uint32_t stripeId)
return {}; // Still paused, return to allow tick.
}
- return _disk.getNextMessage(stripeId, _getNextMessageTimeout);
+ return getNextMessage(stripeId, _getNextMessageTimeout);
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
@@ -640,10 +654,9 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
}
void
-FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source,
- std::vector<RemapInfo*>& targets, Operation op)
+FileStorHandlerImpl::remapQueueNoLock(const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op)
{
- BucketIdx& idx(from.stripe(source.bucket).exposeBucketIdx());
+ BucketIdx& idx(stripe(source.bucket).exposeBucketIdx());
auto range(idx.equal_range(source.bucket));
std::vector<MessageEntry> entriesFound;
@@ -687,7 +700,7 @@ FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source,
assert(bucket == source.bucket || std::find_if(targets.begin(), targets.end(), [bucket](auto* e){
return e->bucket == bucket;
}) != targets.end());
- _disk.stripe(bucket).exposeQueue().emplace_back(std::move(entry));
+ stripe(bucket).exposeQueue().emplace_back(std::move(entry));
}
}
@@ -700,10 +713,10 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Oper
// the same bucket. Will fix order if we accept wrong order later.
MultiLockGuard guard;
- guard.addLock(_disk.stripe(source.bucket).exposeLock(), _disk.stripe_index(source.bucket));
+ guard.addLock(stripe(source.bucket).exposeLock(), stripe_index(source.bucket));
if (target.bucket.getBucketId().getRawId() != 0) {
- guard.addLock(_disk.stripe(target.bucket).exposeLock(), _disk.stripe_index(target.bucket));
+ guard.addLock(stripe(target.bucket).exposeLock(), stripe_index(target.bucket));
}
std::vector<RemapInfo*> targets;
@@ -711,7 +724,7 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Oper
guard.lock();
- remapQueueNoLock(_disk, source, targets, op);
+ remapQueueNoLock(source, targets, op);
}
void
@@ -721,14 +734,14 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem
// the same bucket. Will fix order if we accept wrong order later.
MultiLockGuard guard;
- guard.addLock(_disk.stripe(source.bucket).exposeLock(), _disk.stripe_index(source.bucket));
+ guard.addLock(stripe(source.bucket).exposeLock(), stripe_index(source.bucket));
if (target1.bucket.getBucketId().getRawId() != 0) {
- guard.addLock(_disk.stripe(target1.bucket).exposeLock(), _disk.stripe_index(target1.bucket));
+ guard.addLock(stripe(target1.bucket).exposeLock(), stripe_index(target1.bucket));
}
if (target2.bucket.getBucketId().getRawId() != 0) {
- guard.addLock(_disk.stripe(target2.bucket).exposeLock(), _disk.stripe_index(target2.bucket));
+ guard.addLock(stripe(target2.bucket).exposeLock(), stripe_index(target2.bucket));
}
guard.lock();
@@ -737,7 +750,7 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem
targets.push_back(&target1);
targets.push_back(&target2);
- remapQueueNoLock(_disk, source, targets, op);
+ remapQueueNoLock(source, targets, op);
}
void
@@ -808,64 +821,23 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(MessageEntry && entry) noexcept
{ }
FileStorHandlerImpl::MessageEntry::~MessageEntry() = default;
-
-FileStorHandlerImpl::Disk::Disk(const FileStorHandlerImpl & owner, MessageSender & messageSender, uint32_t numStripes)
- : metrics(0),
- _nextStripeId(0),
- _stripes(),
- state(FileStorHandler::AVAILABLE)
-{
- _stripes.reserve(numStripes);
- for (size_t i(0); i < numStripes; i++) {
- _stripes.emplace_back(owner, messageSender);
- }
- assert(numStripes > 0);
-}
-
-FileStorHandlerImpl::Disk::Disk(Disk && rhs) noexcept
- : metrics(std::move(rhs.metrics)),
- _nextStripeId(rhs._nextStripeId),
- _stripes(std::move(rhs._stripes)),
- state(rhs.state.load())
-{ }
-
-FileStorHandlerImpl::Disk::~Disk() = default;
FileStorHandlerImpl::Stripe::~Stripe() = default;
FileStorHandlerImpl::Stripe::Stripe(Stripe &&) noexcept = default;
void
-FileStorHandlerImpl::Disk::flush()
+FileStorHandlerImpl::flush()
{
for (auto & stripe : _stripes) {
stripe.flush();
}
}
-void
-FileStorHandlerImpl::Disk::broadcast()
-{
- for (auto & stripe : _stripes) {
- stripe.broadcast();
- }
-}
-
-uint64_t FileStorHandlerImpl::Disk::dispersed_bucket_bits(const document::Bucket& bucket) noexcept {
+uint64_t
+FileStorHandlerImpl::dispersed_bucket_bits(const document::Bucket& bucket) noexcept {
const uint64_t raw_id = bucket.getBucketId().getId();
return XXH3_64bits(&raw_id, sizeof(uint64_t));
}
-bool
-FileStorHandlerImpl::Disk::schedule(const std::shared_ptr<api::StorageMessage>& msg)
-{
- MessageEntry messageEntry(msg, getStorageMessageBucket(*msg));
- if (getState() == FileStorHandler::AVAILABLE) {
- stripe(messageEntry._bucket).schedule(std::move(messageEntry));
- } else {
- return false;
- }
- return true;
-}
-
FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender)
: _owner(owner),
_messageSender(messageSender),
@@ -878,14 +850,14 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe
{}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout, Disk & disk)
+FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
{
std::unique_lock guard(*_lock);
// Try to grab a message+lock, immediately retrying once after a wait
// if none can be found and then exiting if the same is the case on the
// second attempt. This is key to allowing the run loop to register
// ticks at regular intervals while not busy-waiting.
- for (int attempt = 0; (attempt < 2) && ! disk.isClosed() && !_owner.isPaused(); ++attempt) {
+ for (int attempt = 0; (attempt < 2) && ! _owner.isClosed() && !_owner.isPaused(); ++attempt) {
PriorityIdx& idx(bmi::get<1>(*_queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
@@ -928,14 +900,6 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx
}
void
-FileStorHandlerImpl::Disk::waitUntilNoLocks() const
-{
- for (const auto & stripe : _stripes) {
- stripe.waitUntilNoLocks();
- }
-}
-
-void
FileStorHandlerImpl::Stripe::waitUntilNoLocks() const
{
std::unique_lock guard(*_lock);
@@ -945,13 +909,6 @@ FileStorHandlerImpl::Stripe::waitUntilNoLocks() const
}
void
-FileStorHandlerImpl::Disk::waitInactive(const AbortBucketOperationsCommand& cmd) const {
- for (auto & stripe : _stripes) {
- stripe.waitInactive(cmd);
- }
-}
-
-void
FileStorHandlerImpl::Stripe::waitInactive(const AbortBucketOperationsCommand& cmd) const {
std::unique_lock guard(*_lock);
while (hasActive(guard, cmd)) {
@@ -971,16 +928,6 @@ FileStorHandlerImpl::Stripe::hasActive(monitor_guard &, const AbortBucketOperati
return false;
}
-std::vector<std::shared_ptr<api::StorageReply>>
-FileStorHandlerImpl::Disk::abort(const AbortBucketOperationsCommand& cmd)
-{
- std::vector<std::shared_ptr<api::StorageReply>> aborted;
- for (auto & stripe : _stripes) {
- stripe.abort(aborted, cmd);
- }
- return aborted;
-}
-
void FileStorHandlerImpl::Stripe::abort(std::vector<std::shared_ptr<api::StorageReply>> & aborted,
const AbortBucketOperationsCommand& cmd)
{
@@ -1113,16 +1060,6 @@ FileStorHandlerImpl::Stripe::operationIsInhibited(const monitor_guard & guard, c
return isLocked(guard, bucket, msg.lockingRequirements());
}
-uint32_t
-FileStorHandlerImpl::Disk::getQueueSize() const noexcept
-{
- size_t sum(0);
- for (const auto & stripe : _stripes) {
- sum += stripe.getQueueSize();
- }
- return sum;
-}
-
FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard & guard, Stripe& stripe,
const document::Bucket &bucket, uint8_t priority,
api::MessageType::Id msgType, api::StorageMessage::Id msgId,
@@ -1149,7 +1086,7 @@ FileStorHandlerImpl::BucketLock::~BucketLock() {
}
std::string
-FileStorHandlerImpl::Disk::dumpQueue() const
+FileStorHandlerImpl::dumpQueue() const
{
std::ostringstream os;
for (const Stripe & stripe : _stripes) {
@@ -1159,7 +1096,7 @@ FileStorHandlerImpl::Disk::dumpQueue() const
}
void
-FileStorHandlerImpl::Disk::dumpQueueHtml(std::ostream & os) const
+FileStorHandlerImpl::dumpQueueHtml(std::ostream & os) const
{
for (const Stripe & stripe : _stripes) {
stripe.dumpQueueHtml(os);
@@ -1167,7 +1104,7 @@ FileStorHandlerImpl::Disk::dumpQueueHtml(std::ostream & os) const
}
void
-FileStorHandlerImpl::Disk::dumpActiveHtml(std::ostream & os) const
+FileStorHandlerImpl::dumpActiveHtml(std::ostream & os) const
{
for (const Stripe & stripe : _stripes) {
stripe.dumpActiveHtml(os);
@@ -1234,19 +1171,19 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
out << "<h1>Filestor handler</h1>\n";
out << "<h2>Disk " << "</h2>\n";
- out << "Queue size: " << _disk.getQueueSize() << "<br>\n";
+ out << "Queue size: " << getQueueSize() << "<br>\n";
out << "Disk state: ";
- switch (_disk.getState()) {
+ switch (getState()) {
case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break;
case FileStorHandler::DISABLED: out << "DISABLED"; break;
case FileStorHandler::CLOSED: out << "CLOSED"; break;
}
out << "<h4>Active operations</h4>\n";
- _disk.dumpActiveHtml(out);
+ dumpActiveHtml(out);
if (verbose) {
out << "<h4>Input queue</h4>\n";
out << "<ul>\n";
- _disk.dumpQueueHtml(out);
+ dumpQueueHtml(out);
out << "</ul>\n";
}
@@ -1266,7 +1203,9 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
void
FileStorHandlerImpl::waitUntilNoLocks()
{
- _disk.waitUntilNoLocks();
+ for (const auto & stripe : _stripes) {
+ stripe.waitUntilNoLocks();
+ }
}
ResumeGuard
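
The routing logic itself is unchanged by the patch: a bucket's raw id is hashed with XXH3 (dispersed_bucket_bits) and the result is reduced modulo the stripe count (stripe_index), so operations on the same bucket always serialize on the same stripe while different buckets spread evenly across stripes. A minimal standalone sketch of that selection, assuming the xxhash header is available; the stripe count and bucket ids below are made up for illustration.

#include <cstdint>
#include <cstdio>
#include <vector>
#include <xxhash.h>   // provides XXH3_64bits; the diff uses the same call

// Hash the raw bucket id so the bits used for stripe selection are well
// dispersed even when raw ids share a common prefix or suffix.
static uint64_t dispersed_bucket_bits(uint64_t raw_bucket_id) {
    return XXH3_64bits(&raw_bucket_id, sizeof(uint64_t));
}

// Pick a stripe index; like the diff, assume fewer than 64k stripes (uint16_t).
static uint16_t stripe_index(uint64_t raw_bucket_id, size_t num_stripes) {
    return static_cast<uint16_t>(dispersed_bucket_bits(raw_bucket_id) % num_stripes);
}

int main() {
    const size_t num_stripes = 8;              // hypothetical stripe count
    std::vector<size_t> load(num_stripes, 0);
    for (uint64_t id = 0; id < 1000; ++id) {   // hypothetical raw bucket ids
        ++load[stripe_index(id, num_stripes)];
    }
    for (size_t i = 0; i < num_stripes; ++i) {
        std::printf("stripe %zu: %zu buckets\n", i, load[i]);
    }
    return 0;
}
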
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index f397b82d199..dfb830d72a5 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -75,8 +75,6 @@ public:
using Clock = std::chrono::steady_clock;
using monitor_guard = std::unique_lock<std::mutex>;
- struct Disk;
-
class Stripe {
public:
struct LockEntry {
@@ -131,7 +129,7 @@ public:
std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
- FileStorHandler::LockedMessage getNextMessage(vespalib::duration timeout, Disk & disk);
+ FileStorHandler::LockedMessage getNextMessage(vespalib::duration timeout);
void dumpQueue(std::ostream & os) const;
void dumpActiveHtml(std::ostream & os) const;
void dumpQueueHtml(std::ostream & os) const;
@@ -155,66 +153,6 @@ public:
LockedBuckets _lockedBuckets;
uint32_t _active_merges;
};
- struct Disk {
- FileStorDiskMetrics * metrics;
-
- /**
- * No assumption on memory ordering around disk state reads should
- * be made by callers.
- */
- DiskState getState() const noexcept {
- return state.load(std::memory_order_relaxed);
- }
- /**
- * No assumption on memory ordering around disk state writes should
- * be made by callers.
- */
- void setState(DiskState s) noexcept {
- state.store(s, std::memory_order_relaxed);
- }
-
- Disk(const FileStorHandlerImpl & owner, MessageSender & messageSender, uint32_t numStripes);
- Disk(Disk &&) noexcept;
- ~Disk();
-
- bool isClosed() const noexcept { return getState() == DiskState::CLOSED; }
-
- void flush();
- void broadcast();
- bool schedule(const std::shared_ptr<api::StorageMessage>& msg);
- void waitUntilNoLocks() const;
- std::vector<std::shared_ptr<api::StorageReply>> abort(const AbortBucketOperationsCommand& cmd);
- void waitInactive(const AbortBucketOperationsCommand& cmd) const;
- FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::duration timeout) {
- return _stripes[stripeId].getNextMessage(timeout, *this);
- }
- std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::Bucket & bucket, api::LockingRequirements lockReq) {
- return stripe(bucket).lock(bucket, lockReq);
- }
- void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) {
- stripe(bucket).failOperations(bucket, code);
- }
-
- uint32_t getQueueSize() const noexcept;
- uint32_t getNextStripeId() { return (_nextStripeId++)%_stripes.size(); }
- std::string dumpQueue() const;
- void dumpActiveHtml(std::ostream & os) const;
- void dumpQueueHtml(std::ostream & os) const;
- static uint64_t dispersed_bucket_bits(const document::Bucket& bucket) noexcept;
- // We make a fairly reasonable assumption that there will be less than 64k stripes.
- uint16_t stripe_index(const document::Bucket& bucket) const noexcept {
- return static_cast<uint16_t>(dispersed_bucket_bits(bucket) % _stripes.size());
- }
- Stripe & stripe(const document::Bucket & bucket) {
- return _stripes[stripe_index(bucket)];
- }
- std::vector<Stripe> & getStripes() { return _stripes; }
- private:
- uint32_t _nextStripeId;
- std::vector<Stripe> _stripes;
- std::atomic<DiskState> state;
- };
class BucketLock : public FileStorHandler::BucketLockInterface {
public:
@@ -254,7 +192,7 @@ public:
void remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) {
- _disk.failOperations(bucket, code);
+ stripe(bucket).failOperations(bucket, code);
}
void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
void sendReply(const std::shared_ptr<api::StorageReply>&) override;
@@ -263,11 +201,13 @@ public:
void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const;
uint32_t getQueueSize() const;
- uint32_t getNextStripeId();
+ uint32_t getNextStripeId() {
+ return (_nextStripeId++) % _stripes.size();
+ }
std::shared_ptr<FileStorHandler::BucketLockInterface>
lock(const document::Bucket & bucket, api::LockingRequirements lockReq) {
- return _disk.lock(bucket, lockReq);
+ return stripe(bucket).lock(bucket, lockReq);
}
void addMergeStatus(const document::Bucket&, MergeStatus::SP);
@@ -276,17 +216,18 @@ public:
uint32_t getNumActiveMerges() const;
void clearMergeStatus(const document::Bucket&, const api::ReturnCode*);
- std::string dumpQueue() const {
- return _disk.dumpQueue();
- }
+ std::string dumpQueue() const;
ResumeGuard pause();
void resume() override;
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd);
private:
- ServiceLayerComponent _component;
- Disk _disk;
- MessageSender& _messageSender;
+ ServiceLayerComponent _component;
+ std::atomic<DiskState> _state;
+ uint32_t _nextStripeId;
+ FileStorDiskMetrics * _metrics;
+ std::vector<Stripe> _stripes;
+ MessageSender& _messageSender;
const document::BucketIdFactory& _bucketIdFactory;
mutable std::mutex _mergeStatesLock;
std::map<document::Bucket, MergeStatus::SP> _mergeStates;
@@ -327,7 +268,6 @@ private:
*/
static std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg);
static bool messageMayBeAborted(const api::StorageMessage& msg);
- void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
// Update hook
void updateMetrics(const MetricLockGuard &) override;
@@ -336,13 +276,44 @@ private:
remapMessage(api::StorageMessage& msg, const document::Bucket &source, Operation op,
std::vector<RemapInfo*>& targets, api::ReturnCode& returnCode);
- void remapQueueNoLock(Disk& from, const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op);
+ void remapQueueNoLock(const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op);
/**
* Waits until the queue has no pending operations (i.e. no locks are
* being held.
*/
void waitUntilNoLocks();
+ /**
+ * No assumption on memory ordering around disk state reads should
+ * be made by callers.
+ */
+ DiskState getState() const noexcept {
+ return _state.load(std::memory_order_relaxed);
+ }
+ /**
+ * No assumption on memory ordering around disk state writes should
+ * be made by callers.
+ */
+ void setState(DiskState s) noexcept {
+ _state.store(s, std::memory_order_relaxed);
+ }
+ bool isClosed() const noexcept { return getState() == DiskState::CLOSED; }
+ void dumpActiveHtml(std::ostream & os) const;
+ void dumpQueueHtml(std::ostream & os) const;
+ void flush();
+ static uint64_t dispersed_bucket_bits(const document::Bucket& bucket) noexcept;
+
+ // We make a fairly reasonable assumption that there will be less than 64k stripes.
+ uint16_t stripe_index(const document::Bucket& bucket) const noexcept {
+ return static_cast<uint16_t>(dispersed_bucket_bits(bucket) % _stripes.size());
+ }
+ Stripe & stripe(const document::Bucket & bucket) {
+ return _stripes[stripe_index(bucket)];
+ }
+ FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::duration timeout) {
+ return _stripes[stripeId].getNextMessage(timeout);
+ }
+
};
} // storage
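
With the Disk wrapper gone, the disk state is a std::atomic<DiskState> member read and written with memory_order_relaxed; as the moved comments stress, callers must not assume any memory ordering around these accesses. The flag merely gates whether schedule() accepts new work and whether polling loops keep running via isClosed(). A minimal sketch of that pattern under those assumptions; the Handler class and worker thread below are hypothetical stand-ins, not the actual FileStorHandler threads.

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>

enum class DiskState { AVAILABLE, DISABLED, CLOSED };

class Handler {
public:
    // Relaxed loads/stores: the flag is a state gate, not a synchronization
    // point, so no ordering of other memory may be inferred from it.
    DiskState getState() const noexcept { return _state.load(std::memory_order_relaxed); }
    void setState(DiskState s) noexcept { _state.store(s, std::memory_order_relaxed); }
    bool isClosed() const noexcept { return getState() == DiskState::CLOSED; }

    // New work is only accepted while the state is AVAILABLE.
    bool schedule(int /*msg*/) { return getState() == DiskState::AVAILABLE; }

private:
    std::atomic<DiskState> _state{DiskState::AVAILABLE};
};

int main() {
    Handler h;
    std::thread worker([&h] {
        while (!h.isClosed()) {                        // poll the gate between ticks
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    });
    std::printf("scheduled while available: %d\n", h.schedule(42));
    h.setState(DiskState::CLOSED);                     // stop accepting work, let the worker exit
    worker.join();
    std::printf("scheduled after close: %d\n", h.schedule(43));
    return 0;
}
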