diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-15 07:35:33 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-15 11:28:32 +0000 |
commit | c99d392f69713b5300bd5b75d4a7d7e6b08c2d46 (patch) | |
tree | b29b308ee23183ffd89b44ed016b77808439ece4 /storage | |
parent | 94eafebf4f7f9b4cb0650bd7aa0c0030a66895bb (diff) |
Avoid one indirection for the disk state.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp | 211 | ||||
-rw-r--r-- | storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h | 119 |
2 files changed, 120 insertions, 210 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 88b3a6bcafc..82541f5e9b4 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -41,18 +41,27 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg) : _component(compReg, "filestorhandlerimpl"), - _disk(*this, sender, numStripes), + _state(FileStorHandler::AVAILABLE), + _nextStripeId(0), + _metrics(nullptr), + _stripes(), _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), _getNextMessageTimeout(100ms), _max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)), _paused(false) { - _disk.metrics = metrics.disks[0].get(); - assert(_disk.metrics != nullptr); + assert(numStripes > 0); + _stripes.reserve(numStripes); + for (size_t i(0); i < numStripes; i++) { + _stripes.emplace_back(*this, sender); + } + + _metrics = metrics.disks[0].get(); + assert(_metrics != nullptr); uint32_t j(0); - for (Stripe & stripe : _disk.getStripes()) { - stripe.setMetrics(metrics.disks[0]->stripes[j++].get()); + for (Stripe & stripe : _stripes) { + stripe.setMetrics(_metrics->stripes[j++].get()); } // Add update hook, so we will get callbacks each 5 seconds to update metrics. @@ -141,9 +150,9 @@ FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api: void FileStorHandlerImpl::flush(bool killPendingMerges) { - LOG(debug, "Wait until queues and bucket locks released."); - _disk.flush(); - LOG(debug, "All queues and bucket locks released."); + LOG(debug, "Wait until queues and bucket locks released."); + flush(); + LOG(debug, "All queues and bucket locks released."); if (killPendingMerges) { api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down"); @@ -184,18 +193,17 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg, DiskState state) const void FileStorHandlerImpl::setDiskState(DiskState state) { - // Mark disk closed - _disk.setState(state); + setState(state); if (state != FileStorHandler::AVAILABLE) { - _disk.flush(); + flush(); } } FileStorHandler::DiskState FileStorHandlerImpl::getDiskState() const { - return _disk.getState(); + return getState(); } void @@ -206,20 +214,31 @@ FileStorHandlerImpl::close() setDiskState(FileStorHandler::CLOSED); } LOG(debug, "Closing"); - _disk.broadcast(); + for (auto & stripe : _stripes) { + stripe.broadcast(); + } LOG(debug, "Closed"); } uint32_t FileStorHandlerImpl::getQueueSize() const { - return _disk.getQueueSize(); + size_t sum(0); + for (const auto & stripe : _stripes) { + sum += stripe.getQueueSize(); + } + return sum; } bool FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg) { - return _disk.schedule(msg); + if (getState() == FileStorHandler::AVAILABLE) { + document::Bucket bucket = getStorageMessageBucket(*msg); + stripe(bucket).schedule(MessageEntry(msg, bucket)); + return true; + } + return false; } bool @@ -250,54 +269,49 @@ FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) } void -FileStorHandlerImpl::abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd) +FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& cmd) { + // Do queue clearing and active operation waiting in two passes + // to allow disk threads to drain running operations in parallel. api::ReturnCode abortedCode(api::ReturnCode::ABORTED, "Sending distributor no longer owns bucket operation was bound to, " "or storage node went down"); - auto aborted = disk.abort(cmd); + std::vector<std::shared_ptr<api::StorageReply>> aborted; + for (auto & stripe : _stripes) { + stripe.abort(aborted, cmd); + } for (auto & msgReply : aborted) { msgReply->setResult(abortedCode); _messageSender.sendReply(msgReply); } -} -void -FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& cmd) -{ - // Do queue clearing and active operation waiting in two passes - // to allow disk threads to drain running operations in parallel. - abortQueuedCommandsForBuckets(_disk, cmd); - _disk.waitInactive(cmd); + for (auto & stripe : _stripes) { + stripe.waitInactive(cmd); + } } void FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) { std::lock_guard lockGuard(_mergeStatesLock); - _disk.metrics->pendingMerges.addValue(_mergeStates.size()); - _disk.metrics->queueSize.addValue(_disk.getQueueSize()); + _metrics->pendingMerges.addValue(_mergeStates.size()); + _metrics->queueSize.addValue(getQueueSize()); - for (auto & entry : _disk.metrics->averageQueueWaitingTime.getMetricMap()) { + for (auto & entry : _metrics->averageQueueWaitingTime.getMetricMap()) { metrics::LoadType loadType(entry.first, "ignored"); - for (const auto & stripe : _disk.metrics->stripes) { + for (const auto & stripe : _metrics->stripes) { const auto & m = stripe->averageQueueWaitingTime[loadType]; entry.second->addTotalValueWithCount(m.getTotal(), m.getCount()); } } } -uint32_t -FileStorHandlerImpl::getNextStripeId() { - return _disk.getNextStripeId(); -} - bool FileStorHandlerImpl::tryHandlePause() const { if (isPaused()) { // Wait a single time to see if filestor gets unpaused. - if (!_disk.isClosed()) { + if (!isClosed()) { std::unique_lock g(_pauseMonitor); _pauseCond.wait_for(g, 100ms); } @@ -331,7 +345,7 @@ FileStorHandlerImpl::getNextMessage(uint32_t stripeId) return {}; // Still paused, return to allow tick. } - return _disk.getNextMessage(stripeId, _getNextMessageTimeout); + return getNextMessage(stripeId, _getNextMessageTimeout); } std::shared_ptr<FileStorHandler::BucketLockInterface> @@ -640,10 +654,9 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck } void -FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source, - std::vector<RemapInfo*>& targets, Operation op) +FileStorHandlerImpl::remapQueueNoLock(const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op) { - BucketIdx& idx(from.stripe(source.bucket).exposeBucketIdx()); + BucketIdx& idx(stripe(source.bucket).exposeBucketIdx()); auto range(idx.equal_range(source.bucket)); std::vector<MessageEntry> entriesFound; @@ -687,7 +700,7 @@ FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source, assert(bucket == source.bucket || std::find_if(targets.begin(), targets.end(), [bucket](auto* e){ return e->bucket == bucket; }) != targets.end()); - _disk.stripe(bucket).exposeQueue().emplace_back(std::move(entry)); + stripe(bucket).exposeQueue().emplace_back(std::move(entry)); } } @@ -700,10 +713,10 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Oper // the same bucket. Will fix order if we accept wrong order later. MultiLockGuard guard; - guard.addLock(_disk.stripe(source.bucket).exposeLock(), _disk.stripe_index(source.bucket)); + guard.addLock(stripe(source.bucket).exposeLock(), stripe_index(source.bucket)); if (target.bucket.getBucketId().getRawId() != 0) { - guard.addLock(_disk.stripe(target.bucket).exposeLock(), _disk.stripe_index(target.bucket)); + guard.addLock(stripe(target.bucket).exposeLock(), stripe_index(target.bucket)); } std::vector<RemapInfo*> targets; @@ -711,7 +724,7 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Oper guard.lock(); - remapQueueNoLock(_disk, source, targets, op); + remapQueueNoLock(source, targets, op); } void @@ -721,14 +734,14 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem // the same bucket. Will fix order if we accept wrong order later. MultiLockGuard guard; - guard.addLock(_disk.stripe(source.bucket).exposeLock(), _disk.stripe_index(source.bucket)); + guard.addLock(stripe(source.bucket).exposeLock(), stripe_index(source.bucket)); if (target1.bucket.getBucketId().getRawId() != 0) { - guard.addLock(_disk.stripe(target1.bucket).exposeLock(), _disk.stripe_index(target1.bucket)); + guard.addLock(stripe(target1.bucket).exposeLock(), stripe_index(target1.bucket)); } if (target2.bucket.getBucketId().getRawId() != 0) { - guard.addLock(_disk.stripe(target2.bucket).exposeLock(), _disk.stripe_index(target2.bucket)); + guard.addLock(stripe(target2.bucket).exposeLock(), stripe_index(target2.bucket)); } guard.lock(); @@ -737,7 +750,7 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem targets.push_back(&target1); targets.push_back(&target2); - remapQueueNoLock(_disk, source, targets, op); + remapQueueNoLock(source, targets, op); } void @@ -808,64 +821,23 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(MessageEntry && entry) noexcept { } FileStorHandlerImpl::MessageEntry::~MessageEntry() = default; - -FileStorHandlerImpl::Disk::Disk(const FileStorHandlerImpl & owner, MessageSender & messageSender, uint32_t numStripes) - : metrics(0), - _nextStripeId(0), - _stripes(), - state(FileStorHandler::AVAILABLE) -{ - _stripes.reserve(numStripes); - for (size_t i(0); i < numStripes; i++) { - _stripes.emplace_back(owner, messageSender); - } - assert(numStripes > 0); -} - -FileStorHandlerImpl::Disk::Disk(Disk && rhs) noexcept - : metrics(std::move(rhs.metrics)), - _nextStripeId(rhs._nextStripeId), - _stripes(std::move(rhs._stripes)), - state(rhs.state.load()) -{ } - -FileStorHandlerImpl::Disk::~Disk() = default; FileStorHandlerImpl::Stripe::~Stripe() = default; FileStorHandlerImpl::Stripe::Stripe(Stripe &&) noexcept = default; void -FileStorHandlerImpl::Disk::flush() +FileStorHandlerImpl::flush() { for (auto & stripe : _stripes) { stripe.flush(); } } -void -FileStorHandlerImpl::Disk::broadcast() -{ - for (auto & stripe : _stripes) { - stripe.broadcast(); - } -} - -uint64_t FileStorHandlerImpl::Disk::dispersed_bucket_bits(const document::Bucket& bucket) noexcept { +uint64_t +FileStorHandlerImpl::dispersed_bucket_bits(const document::Bucket& bucket) noexcept { const uint64_t raw_id = bucket.getBucketId().getId(); return XXH3_64bits(&raw_id, sizeof(uint64_t)); } -bool -FileStorHandlerImpl::Disk::schedule(const std::shared_ptr<api::StorageMessage>& msg) -{ - MessageEntry messageEntry(msg, getStorageMessageBucket(*msg)); - if (getState() == FileStorHandler::AVAILABLE) { - stripe(messageEntry._bucket).schedule(std::move(messageEntry)); - } else { - return false; - } - return true; -} - FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender) : _owner(owner), _messageSender(messageSender), @@ -878,14 +850,14 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe {} FileStorHandler::LockedMessage -FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout, Disk & disk) +FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) { std::unique_lock guard(*_lock); // Try to grab a message+lock, immediately retrying once after a wait // if none can be found and then exiting if the same is the case on the // second attempt. This is key to allowing the run loop to register // ticks at regular intervals while not busy-waiting. - for (int attempt = 0; (attempt < 2) && ! disk.isClosed() && !_owner.isPaused(); ++attempt) { + for (int attempt = 0; (attempt < 2) && ! _owner.isClosed() && !_owner.isPaused(); ++attempt) { PriorityIdx& idx(bmi::get<1>(*_queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); @@ -928,14 +900,6 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx } void -FileStorHandlerImpl::Disk::waitUntilNoLocks() const -{ - for (const auto & stripe : _stripes) { - stripe.waitUntilNoLocks(); - } -} - -void FileStorHandlerImpl::Stripe::waitUntilNoLocks() const { std::unique_lock guard(*_lock); @@ -945,13 +909,6 @@ FileStorHandlerImpl::Stripe::waitUntilNoLocks() const } void -FileStorHandlerImpl::Disk::waitInactive(const AbortBucketOperationsCommand& cmd) const { - for (auto & stripe : _stripes) { - stripe.waitInactive(cmd); - } -} - -void FileStorHandlerImpl::Stripe::waitInactive(const AbortBucketOperationsCommand& cmd) const { std::unique_lock guard(*_lock); while (hasActive(guard, cmd)) { @@ -971,16 +928,6 @@ FileStorHandlerImpl::Stripe::hasActive(monitor_guard &, const AbortBucketOperati return false; } -std::vector<std::shared_ptr<api::StorageReply>> -FileStorHandlerImpl::Disk::abort(const AbortBucketOperationsCommand& cmd) -{ - std::vector<std::shared_ptr<api::StorageReply>> aborted; - for (auto & stripe : _stripes) { - stripe.abort(aborted, cmd); - } - return aborted; -} - void FileStorHandlerImpl::Stripe::abort(std::vector<std::shared_ptr<api::StorageReply>> & aborted, const AbortBucketOperationsCommand& cmd) { @@ -1113,16 +1060,6 @@ FileStorHandlerImpl::Stripe::operationIsInhibited(const monitor_guard & guard, c return isLocked(guard, bucket, msg.lockingRequirements()); } -uint32_t -FileStorHandlerImpl::Disk::getQueueSize() const noexcept -{ - size_t sum(0); - for (const auto & stripe : _stripes) { - sum += stripe.getQueueSize(); - } - return sum; -} - FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard & guard, Stripe& stripe, const document::Bucket &bucket, uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id msgId, @@ -1149,7 +1086,7 @@ FileStorHandlerImpl::BucketLock::~BucketLock() { } std::string -FileStorHandlerImpl::Disk::dumpQueue() const +FileStorHandlerImpl::dumpQueue() const { std::ostringstream os; for (const Stripe & stripe : _stripes) { @@ -1159,7 +1096,7 @@ FileStorHandlerImpl::Disk::dumpQueue() const } void -FileStorHandlerImpl::Disk::dumpQueueHtml(std::ostream & os) const +FileStorHandlerImpl::dumpQueueHtml(std::ostream & os) const { for (const Stripe & stripe : _stripes) { stripe.dumpQueueHtml(os); @@ -1167,7 +1104,7 @@ FileStorHandlerImpl::Disk::dumpQueueHtml(std::ostream & os) const } void -FileStorHandlerImpl::Disk::dumpActiveHtml(std::ostream & os) const +FileStorHandlerImpl::dumpActiveHtml(std::ostream & os) const { for (const Stripe & stripe : _stripes) { stripe.dumpActiveHtml(os); @@ -1234,19 +1171,19 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath& out << "<h1>Filestor handler</h1>\n"; out << "<h2>Disk " << "</h2>\n"; - out << "Queue size: " << _disk.getQueueSize() << "<br>\n"; + out << "Queue size: " << getQueueSize() << "<br>\n"; out << "Disk state: "; - switch (_disk.getState()) { + switch (getState()) { case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break; case FileStorHandler::DISABLED: out << "DISABLED"; break; case FileStorHandler::CLOSED: out << "CLOSED"; break; } out << "<h4>Active operations</h4>\n"; - _disk.dumpActiveHtml(out); + dumpActiveHtml(out); if (verbose) { out << "<h4>Input queue</h4>\n"; out << "<ul>\n"; - _disk.dumpQueueHtml(out); + dumpQueueHtml(out); out << "</ul>\n"; } @@ -1266,7 +1203,9 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath& void FileStorHandlerImpl::waitUntilNoLocks() { - _disk.waitUntilNoLocks(); + for (const auto & stripe : _stripes) { + stripe.waitUntilNoLocks(); + } } ResumeGuard diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index f397b82d199..dfb830d72a5 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -75,8 +75,6 @@ public: using Clock = std::chrono::steady_clock; using monitor_guard = std::unique_lock<std::mutex>; - struct Disk; - class Stripe { public: struct LockEntry { @@ -131,7 +129,7 @@ public: std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); - FileStorHandler::LockedMessage getNextMessage(vespalib::duration timeout, Disk & disk); + FileStorHandler::LockedMessage getNextMessage(vespalib::duration timeout); void dumpQueue(std::ostream & os) const; void dumpActiveHtml(std::ostream & os) const; void dumpQueueHtml(std::ostream & os) const; @@ -155,66 +153,6 @@ public: LockedBuckets _lockedBuckets; uint32_t _active_merges; }; - struct Disk { - FileStorDiskMetrics * metrics; - - /** - * No assumption on memory ordering around disk state reads should - * be made by callers. - */ - DiskState getState() const noexcept { - return state.load(std::memory_order_relaxed); - } - /** - * No assumption on memory ordering around disk state writes should - * be made by callers. - */ - void setState(DiskState s) noexcept { - state.store(s, std::memory_order_relaxed); - } - - Disk(const FileStorHandlerImpl & owner, MessageSender & messageSender, uint32_t numStripes); - Disk(Disk &&) noexcept; - ~Disk(); - - bool isClosed() const noexcept { return getState() == DiskState::CLOSED; } - - void flush(); - void broadcast(); - bool schedule(const std::shared_ptr<api::StorageMessage>& msg); - void waitUntilNoLocks() const; - std::vector<std::shared_ptr<api::StorageReply>> abort(const AbortBucketOperationsCommand& cmd); - void waitInactive(const AbortBucketOperationsCommand& cmd) const; - FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::duration timeout) { - return _stripes[stripeId].getNextMessage(timeout, *this); - } - std::shared_ptr<FileStorHandler::BucketLockInterface> - lock(const document::Bucket & bucket, api::LockingRequirements lockReq) { - return stripe(bucket).lock(bucket, lockReq); - } - void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) { - stripe(bucket).failOperations(bucket, code); - } - - uint32_t getQueueSize() const noexcept; - uint32_t getNextStripeId() { return (_nextStripeId++)%_stripes.size(); } - std::string dumpQueue() const; - void dumpActiveHtml(std::ostream & os) const; - void dumpQueueHtml(std::ostream & os) const; - static uint64_t dispersed_bucket_bits(const document::Bucket& bucket) noexcept; - // We make a fairly reasonable assumption that there will be less than 64k stripes. - uint16_t stripe_index(const document::Bucket& bucket) const noexcept { - return static_cast<uint16_t>(dispersed_bucket_bits(bucket) % _stripes.size()); - } - Stripe & stripe(const document::Bucket & bucket) { - return _stripes[stripe_index(bucket)]; - } - std::vector<Stripe> & getStripes() { return _stripes; } - private: - uint32_t _nextStripeId; - std::vector<Stripe> _stripes; - std::atomic<DiskState> state; - }; class BucketLock : public FileStorHandler::BucketLockInterface { public: @@ -254,7 +192,7 @@ public: void remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) { - _disk.failOperations(bucket, code); + stripe(bucket).failOperations(bucket, code); } void sendCommand(const std::shared_ptr<api::StorageCommand>&) override; void sendReply(const std::shared_ptr<api::StorageReply>&) override; @@ -263,11 +201,13 @@ public: void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const; uint32_t getQueueSize() const; - uint32_t getNextStripeId(); + uint32_t getNextStripeId() { + return (_nextStripeId++) % _stripes.size(); + } std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq) { - return _disk.lock(bucket, lockReq); + return stripe(bucket).lock(bucket, lockReq); } void addMergeStatus(const document::Bucket&, MergeStatus::SP); @@ -276,17 +216,18 @@ public: uint32_t getNumActiveMerges() const; void clearMergeStatus(const document::Bucket&, const api::ReturnCode*); - std::string dumpQueue() const { - return _disk.dumpQueue(); - } + std::string dumpQueue() const; ResumeGuard pause(); void resume() override; void abortQueuedOperations(const AbortBucketOperationsCommand& cmd); private: - ServiceLayerComponent _component; - Disk _disk; - MessageSender& _messageSender; + ServiceLayerComponent _component; + std::atomic<DiskState> _state; + uint32_t _nextStripeId; + FileStorDiskMetrics * _metrics; + std::vector<Stripe> _stripes; + MessageSender& _messageSender; const document::BucketIdFactory& _bucketIdFactory; mutable std::mutex _mergeStatesLock; std::map<document::Bucket, MergeStatus::SP> _mergeStates; @@ -327,7 +268,6 @@ private: */ static std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg); static bool messageMayBeAborted(const api::StorageMessage& msg); - void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd); // Update hook void updateMetrics(const MetricLockGuard &) override; @@ -336,13 +276,44 @@ private: remapMessage(api::StorageMessage& msg, const document::Bucket &source, Operation op, std::vector<RemapInfo*>& targets, api::ReturnCode& returnCode); - void remapQueueNoLock(Disk& from, const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op); + void remapQueueNoLock(const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op); /** * Waits until the queue has no pending operations (i.e. no locks are * being held. */ void waitUntilNoLocks(); + /** + * No assumption on memory ordering around disk state reads should + * be made by callers. + */ + DiskState getState() const noexcept { + return _state.load(std::memory_order_relaxed); + } + /** + * No assumption on memory ordering around disk state writes should + * be made by callers. + */ + void setState(DiskState s) noexcept { + _state.store(s, std::memory_order_relaxed); + } + bool isClosed() const noexcept { return getState() == DiskState::CLOSED; } + void dumpActiveHtml(std::ostream & os) const; + void dumpQueueHtml(std::ostream & os) const; + void flush(); + static uint64_t dispersed_bucket_bits(const document::Bucket& bucket) noexcept; + + // We make a fairly reasonable assumption that there will be less than 64k stripes. + uint16_t stripe_index(const document::Bucket& bucket) const noexcept { + return static_cast<uint16_t>(dispersed_bucket_bits(bucket) % _stripes.size()); + } + Stripe & stripe(const document::Bucket & bucket) { + return _stripes[stripe_index(bucket)]; + } + FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::duration timeout) { + return _stripes[stripeId].getNextMessage(timeout); + } + }; } // storage |