author:    Henning Baldersheim <balder@oath.com>    2018-03-21 11:34:00 +0100
committer: Henning Baldersheim <balder@oath.com>    2018-04-09 14:19:49 +0200
commit:    96071a9312fbc01038050726b1bd7d07f44fde0b
tree:      d2f9e22c0a29266484426dc28757dd94e8ea0561 /storage
parent:    37999706a2755345effcf81a38e1acd22bef0237
Stripe the operations on bucketid to provide for a more efficient thread handover.
Diffstat (limited to 'storage')
9 files changed, 552 insertions, 413 deletions
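
What the change does: previously each disk had a single monitor-protected priority queue, so all persistence threads for that disk contended on one lock when scheduling and fetching work. After this commit each disk's queue is split into stripes; an operation is routed to a stripe by `bucket.getBucketId().getRawId() % numStripes`, so all operations on one bucket always land in the same stripe, and each persistence thread claims a stripe id via `getNextStripeId()` and only ever drains that stripe. Below is a minimal standalone sketch of that scheme, using only the standard library; the real code uses `vespalib::Monitor` and a `boost::multi_index` priority queue, and all names in the sketch are illustrative rather than the actual Vespa API.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <vector>

struct Message { uint64_t rawBucketId; /* payload elided */ };

class StripedQueue {
public:
    explicit StripedQueue(uint32_t numStripes) : _stripes(numStripes) {}

    // Each consumer thread claims a dedicated stripe id up front,
    // in the spirit of FileStorHandler::getNextStripeId().
    uint32_t getNextStripeId() { return _nextStripeId++ % _stripes.size(); }

    // Producers hash the bucket id onto a stripe, so every operation for a
    // given bucket lands in the same stripe (and thus on the same thread).
    void schedule(Message msg) {
        Stripe &s = stripeOf(msg.rawBucketId);
        std::lock_guard<std::mutex> guard(s.lock);
        s.queue.push_back(msg);
        s.cond.notify_all();
    }

    // A consumer only touches its own stripe's lock; threads serving the
    // other stripes of the same disk no longer contend with it.
    bool getNextMessage(uint32_t stripeId, Message &out) {
        Stripe &s = _stripes[stripeId];
        std::unique_lock<std::mutex> guard(s.lock);
        if (s.queue.empty()) {
            return false; // the real code waits on the monitor with a timeout
        }
        out = s.queue.front();
        s.queue.pop_front();
        return true;
    }

private:
    struct Stripe {
        std::mutex lock;              // per-stripe, where it used to be per-disk
        std::condition_variable cond;
        std::deque<Message> queue;    // stands in for the priority queue
    };
    Stripe &stripeOf(uint64_t rawBucketId) {
        return _stripes[rawBucketId % _stripes.size()];
    }
    std::vector<Stripe> _stripes;
    std::atomic<uint32_t> _nextStripeId{0};
};
```

The bucket locking and queue-timeout handling from the diff are left out of the sketch; the point is only the routing invariant that makes the per-stripe locks safe.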
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index c786f0bbaa8..168e9599b3d 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -615,6 +615,8 @@ FileStorManagerTest::testHandlerPriority()
     FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
     filestorHandler.setGetNextMessageTimeout(50);
+    uint32_t stripeId = filestorHandler.getNextStripeId(0);
+    CPPUNIT_ASSERT_EQUAL(0u, stripeId);

     std::string content("Here is some content which is in all documents");
     std::ostringstream uri;
@@ -633,11 +635,11 @@ FileStorManagerTest::testHandlerPriority()
         filestorHandler.schedule(cmd, 0);
     }

-    CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority());
-    CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority());
-    CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0).second->getPriority());
-    CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0).second->getPriority());
-    CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0).second->getPriority());
+    CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+    CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+    CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+    CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+    CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
 }

 class MessagePusherThread : public document::Runnable
@@ -672,6 +674,7 @@ MessagePusherThread::~MessagePusherThread() = default;

 class MessageFetchingThread : public document::Runnable {
 public:
+    const uint32_t _threadId;
     FileStorHandler& _handler;
     std::atomic<uint32_t> _config;
     uint32_t _fetchedCount;
@@ -680,12 +683,13 @@ public:
     bool _threadDone;

     MessageFetchingThread(FileStorHandler& handler)
-        : _handler(handler), _config(0), _fetchedCount(0), _done(false),
-          _failed(false), _threadDone(false) {}
+        : _threadId(handler.getNextStripeId(0)), _handler(handler), _config(0), _fetchedCount(0), _done(false),
+          _failed(false), _threadDone(false)
+    {}

     void run() override {
         while (!_done) {
-            FileStorHandler::LockedMessage msg = _handler.getNextMessage(0);
+            FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, _threadId);
             if (msg.second.get()) {
                 uint32_t originalConfig = _config.load();
                 _fetchedCount++;
@@ -761,8 +765,7 @@ FileStorManagerTest::testHandlerPause()
     // Setup a filestorthread to test
     DummyStorageLink top;
     DummyStorageLink *dummyManager;
-    top.push_back(std::unique_ptr<StorageLink>(
-            dummyManager = new DummyStorageLink));
+    top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
     top.open();
     ForwardingMessageSender messageSender(*dummyManager);
     // Since we fake time with small numbers, we need to make sure we dont
@@ -774,6 +777,7 @@ FileStorManagerTest::testHandlerPause()
     FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
     filestorHandler.setGetNextMessageTimeout(50);
+    uint32_t stripeId = filestorHandler.getNextStripeId(0);

     std::string content("Here is some content which is in all documents");
     std::ostringstream uri;
@@ -781,8 +785,7 @@ FileStorManagerTest::testHandlerPause()
     Document::SP doc(createDocument(content, "userdoc:footype:1234:bar").release());

     document::BucketIdFactory factory;
-    document::BucketId bucket(16, factory.getBucketId(
-            doc->getId()).getRawId());
+    document::BucketId bucket(16, factory.getBucketId(doc->getId()).getRawId());

     // Populate bucket with the given data
     for (uint32_t i = 1; i < 6; i++) {
@@ -793,15 +796,15 @@ FileStorManagerTest::testHandlerPause()
         filestorHandler.schedule(cmd, 0);
     }

-    CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority());
+    CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());

     {
         ResumeGuard guard = filestorHandler.pause();
         (void)guard;
-        CPPUNIT_ASSERT(filestorHandler.getNextMessage(0).second.get() == NULL);
+        CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, stripeId).second.get() == NULL);
     }

-    CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority());
+    CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
 }

 namespace {
@@ -898,9 +901,9 @@ FileStorManagerTest::testHandlerMulti()
     FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
     metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
-    FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
-                                    _node->getComponentRegister());
+    FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
     filestorHandler.setGetNextMessageTimeout(50);
+    uint32_t stripeId = filestorHandler.getNextStripeId(0);

     std::string content("Here is some content which is in all documents");
@@ -921,21 +924,21 @@ FileStorManagerTest::testHandlerMulti()
     }

     {
-        FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
+        FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 0);
         CPPUNIT_ASSERT_EQUAL((uint64_t)1, getPutTime(lock.second));

-        lock = filestorHandler.getNextMessage(0, lock);
+        lock = filestorHandler.getNextMessage(0, stripeId, lock);
         CPPUNIT_ASSERT_EQUAL((uint64_t)2, getPutTime(lock.second));

-        lock = filestorHandler.getNextMessage(0, lock);
+        lock = filestorHandler.getNextMessage(0, stripeId, lock);
         CPPUNIT_ASSERT_EQUAL((uint64_t)3, getPutTime(lock.second));
     }

     {
-        FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
+        FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId);
         CPPUNIT_ASSERT_EQUAL((uint64_t)11, getPutTime(lock.second));

-        lock = filestorHandler.getNextMessage(0, lock);
+        lock = filestorHandler.getNextMessage(0, stripeId, lock);
         CPPUNIT_ASSERT_EQUAL((uint64_t)12, getPutTime(lock.second));
     }
 }
@@ -959,9 +962,9 @@ FileStorManagerTest::testHandlerTimeout()
     FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
     metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
-    FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
-                                    _node->getComponentRegister());
+    FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
     filestorHandler.setGetNextMessageTimeout(50);
+    uint32_t stripeId = filestorHandler.getNextStripeId(0);

     std::string content("Here is some content which is in all documents");
     std::ostringstream uri;
@@ -969,16 +972,12 @@ FileStorManagerTest::testHandlerTimeout()
     Document::SP doc(createDocument(content, "userdoc:footype:1234:bar").release());

     document::BucketIdFactory factory;
-    document::BucketId bucket(16, factory.getBucketId(
-            doc->getId()).getRawId());
+    document::BucketId bucket(16, factory.getBucketId(doc->getId()).getRawId());

     // Populate bucket with the given data
     {
-        std::shared_ptr<api::PutCommand> cmd(
-                new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
-        std::unique_ptr<api::StorageMessageAddress> address(
-                new api::StorageMessageAddress(
-                    "storage", lib::NodeType::STORAGE, 3));
+        auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100);
+        auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
         cmd->setAddress(*address);
         cmd->setPriority(0);
         cmd->setTimeout(50);
@@ -986,11 +985,8 @@
     }

     {
-        std::shared_ptr<api::PutCommand> cmd(
-                new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
-        std::unique_ptr<api::StorageMessageAddress> address(
-                new api::StorageMessageAddress(
-                    "storage", lib::NodeType::STORAGE, 3));
+        auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100);
+        auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
         cmd->setAddress(*address);
         cmd->setPriority(200);
         cmd->setTimeout(10000);
@@ -999,7 +995,7 @@
     FastOS_Thread::Sleep(51);
     for (;;) {
-        auto lock = filestorHandler.getNextMessage(0);
+        auto lock = filestorHandler.getNextMessage(0, stripeId);
         if (lock.first.get()) {
             CPPUNIT_ASSERT_EQUAL(uint8_t(200), lock.second->getPriority());
             break;
@@ -1008,8 +1004,7 @@
     CPPUNIT_ASSERT_EQUAL(size_t(1), top.getNumReplies());
     CPPUNIT_ASSERT_EQUAL(api::ReturnCode::TIMEOUT,
-                         static_cast<api::StorageReply&>(*top.getReply(0))
-                             .getResult().getResult());
+                         static_cast<api::StorageReply&>(*top.getReply(0)).getResult().getResult());
 }

 void
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index f5f190ad718..1a0b549ceed 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -67,8 +67,7 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
     DummyStorageLink top;
     DummyStorageLink *dummyManager;
-    top.push_back(std::unique_ptr<StorageLink>(
-            dummyManager = new DummyStorageLink));
+    top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
     top.open();
     ForwardingMessageSender messageSender(*dummyManager);
@@ -78,6 +77,7 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
             loadTypes.getMetricLoadTypes(), 1);
     FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
+    uint32_t stripeId = filestorHandler.getNextStripeId(0);

     // Send 2 puts, 2 to the first bucket, 1 to the second. Calling
     // getNextMessage 2 times should then return a lock on the first bucket,
@@ -87,12 +87,12 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
     filestorHandler.schedule(createPut(1234, 1), 0);
     filestorHandler.schedule(createPut(5432, 0), 0);

-    auto lock0 = filestorHandler.getNextMessage(0);
+    auto lock0 = filestorHandler.getNextMessage(0, stripeId);
     CPPUNIT_ASSERT(lock0.first.get());
     CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234),
                          dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());

-    auto lock1 = filestorHandler.getNextMessage(0);
+    auto lock1 = filestorHandler.getNextMessage(0, stripeId);
     CPPUNIT_ASSERT(lock1.first.get());
     CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432),
                          dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 949ceb901e1..b067239a6f7 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -4,11 +4,16 @@

 namespace storage {

-FileStorHandler::FileStorHandler(MessageSender& sender,
-                                 FileStorMetrics& metrics,
-                                 const spi::PartitionStateList& partitions,
-                                 ServiceLayerComponentRegister& compReg)
-    : _impl(new FileStorHandlerImpl(sender, metrics, partitions, compReg))
+FileStorHandler::FileStorHandler(MessageSender& sender, FileStorMetrics& metrics,
+                                 const spi::PartitionStateList& partitions, ServiceLayerComponentRegister& compReg)
+    : _impl(new FileStorHandlerImpl(1, sender, metrics, partitions, compReg))
+{
+}
+
+
+FileStorHandler::FileStorHandler(uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics,
+                                 const spi::PartitionStateList& partitions, ServiceLayerComponentRegister& compReg)
+    : _impl(new FileStorHandlerImpl(numStripes, sender, metrics, partitions, compReg))
 {
 }

@@ -54,15 +59,15 @@ FileStorHandler::schedule(const api::StorageMessage::SP& msg, uint16_t thread)
 }

 FileStorHandler::LockedMessage
-FileStorHandler::getNextMessage(uint16_t thread)
+FileStorHandler::getNextMessage(uint16_t thread, uint32_t stripeId)
 {
-    return _impl->getNextMessage(thread);
+    return _impl->getNextMessage(thread, stripeId);
 }

 FileStorHandler::LockedMessage &
-FileStorHandler::getNextMessage(uint16_t thread, LockedMessage& lck)
+FileStorHandler::getNextMessage(uint16_t thread, uint32_t stripeId, LockedMessage& lck)
 {
-    return _impl->getNextMessage(thread, lck);
+    return _impl->getNextMessage(thread, stripeId, lck);
 }

 FileStorHandler::BucketLockInterface::SP
@@ -72,12 +77,9 @@ FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk)
 }

 void
-FileStorHandler::remapQueueAfterDiskMove(
-        const document::Bucket& bucket,
-        uint16_t sourceDisk, uint16_t targetDisk)
+FileStorHandler::remapQueueAfterDiskMove(const document::Bucket& bucket, uint16_t sourceDisk, uint16_t targetDisk)
 {
     RemapInfo target(bucket, targetDisk);
-
     _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, FileStorHandlerImpl::MOVE);
 }

@@ -153,6 +155,11 @@ FileStorHandler::getNumActiveMerges() const
     return _impl->getNumActiveMerges();
 }

+uint32_t
+FileStorHandler::getNextStripeId(uint32_t disk) {
+    return _impl->getNextStripeId(disk);
+}
+
 void
 FileStorHandler::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code)
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 2cfc97ca71b..0477845f3aa 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -62,8 +62,7 @@ public:
         virtual ~BucketLockInterface() {};
     };

-    typedef std::pair<BucketLockInterface::SP,
-                      api::StorageMessage::SP> LockedMessage;
+    typedef std::pair<BucketLockInterface::SP, api::StorageMessage::SP> LockedMessage;

     enum DiskState {
         AVAILABLE,
@@ -71,7 +70,10 @@ public:
         CLOSED
     };

-    FileStorHandler(MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&);
+    FileStorHandler(uint32_t numStripes, MessageSender&, FileStorMetrics&,
+                    const spi::PartitionStateList&, ServiceLayerComponentRegister&);
+    FileStorHandler(MessageSender&, FileStorMetrics&,
+                    const spi::PartitionStateList&, ServiceLayerComponentRegister&);
     ~FileStorHandler();

     // Commands used by file stor manager
@@ -116,12 +118,12 @@ public:
     *
     * @param disk The disk to get messages for
     */
-    LockedMessage getNextMessage(uint16_t disk);
+    LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId);

    /**
    * Returns the next message for the same bucket.
    */
-    LockedMessage & getNextMessage(uint16_t disk, LockedMessage& lock);
+    LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& lock);

    /**
    * Lock a bucket. By default, each file stor thread has the locks of all
@@ -229,6 +231,13 @@ public:
    */
    uint32_t getNumActiveMerges() const;

+    /**
+     * Provides the next stripe id for a certain disk.
+     * @param disk
+     * @return
+     */
+    uint32_t getNextStripeId(uint32_t disk);
+
    /** Removes the merge status for the given bucket. */
    void clearMergeStatus(const document::Bucket&);
    void clearMergeStatus(const document::Bucket&, const api::ReturnCode&);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index d282b0ee3ed..01e42133bac 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -23,17 +23,21 @@ using document::BucketSpace;

 namespace storage {

-FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
+FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics,
                                          const spi::PartitionStateList& partitions,
                                          ServiceLayerComponentRegister& compReg)
     : _partitions(partitions),
       _component(compReg, "filestorhandlerimpl"),
-      _diskInfo(_component.getDiskCount()),
+      _diskInfo(),
       _messageSender(sender),
       _bucketIdFactory(_component.getBucketIdFactory()),
       _getNextMessageTimeout(100),
       _paused(false)
 {
+    _diskInfo.reserve(_component.getDiskCount());
+    for (uint32_t i(0); i < _component.getDiskCount(); i++) {
+        _diskInfo.emplace_back(*this, sender, numStripes);
+    }
     for (uint32_t i=0; i<_diskInfo.size(); ++i) {
         _diskInfo[i].metrics = metrics.disks[i].get();
         assert(_diskInfo[i].metrics != 0);
@@ -131,13 +135,7 @@ FileStorHandlerImpl::flush(bool killPendingMerges)
 {
     for (uint32_t i=0; i<_diskInfo.size(); ++i) {
         LOG(debug, "Wait until queues and bucket locks released for disk '%d'", i);
-        Disk& t(_diskInfo[i]);
-        vespalib::MonitorGuard lockGuard(t.lock);
-        while (t.getQueueSize() != 0 || !t.lockedBuckets.empty()) {
-            LOG(debug, "Still %d in queue and %ld locked buckets for disk '%d'",
-                t.getQueueSize(), t.lockedBuckets.size(), i);
-            lockGuard.wait(100);
-        }
+        _diskInfo[i].flush();
         LOG(debug, "All queues and bucket locks released for disk '%d'", i);
     }

@@ -178,20 +176,15 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg, DiskState state) const
 }

 void
-FileStorHandlerImpl::setDiskState(uint16_t disk, DiskState state)
+FileStorHandlerImpl::setDiskState(uint16_t diskId, DiskState state)
 {
-    Disk& t(_diskInfo[disk]);
-    vespalib::MonitorGuard lockGuard(t.lock);
+    Disk& disk = _diskInfo[diskId];

     // Mark disk closed
-    t.setState(state);
+    disk.setState(state);
     if (state != FileStorHandler::AVAILABLE) {
-        while (t.queue.begin() != t.queue.end()) {
-            reply(*t.queue.begin()->_command, state);
-            t.queue.erase(t.queue.begin());
-        }
+        disk.flush();
     }
-    lockGuard.broadcast();
 }

 FileStorHandler::DiskState
@@ -209,9 +202,7 @@ FileStorHandlerImpl::close()
             setDiskState(i, FileStorHandler::CLOSED);
         }
         LOG(debug, "Closing disk[%d]", i);
-        Disk& t(_diskInfo[i]);
-        vespalib::MonitorGuard lockGuard(t.lock);
-        lockGuard.broadcast();
+        _diskInfo[i].broadcast();
         LOG(debug, "Closed disk[%d]", i);
     }
 }
@@ -219,38 +210,23 @@ FileStorHandlerImpl::close()
 uint32_t
 FileStorHandlerImpl::getQueueSize() const
 {
-    uint32_t count = 0;
-    for (uint32_t i=0; i<_diskInfo.size(); ++i) {
-        const Disk& t(_diskInfo[i]);
-        vespalib::MonitorGuard lockGuard(t.lock);
-        count += t.getQueueSize();
+    size_t count = 0;
+    for (const auto & disk : _diskInfo) {
+        count += disk.getQueueSize();
     }
     return count;
 }

 bool
-FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, uint16_t disk)
+FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, uint16_t diskId)
 {
-    assert(disk < _diskInfo.size());
-    Disk& t(_diskInfo[disk]);
-    MessageEntry messageEntry(msg, getStorageMessageBucket(*msg));
-    vespalib::MonitorGuard lockGuard(t.lock);
-
-    if (t.getState() == FileStorHandler::AVAILABLE) {
-        MBUS_TRACE(msg->getTrace(), 5, vespalib::make_string(
-                "FileStorHandler: Operation added to disk %d's queue with priority %u", disk, msg->getPriority()));
-
-        t.queue.emplace_back(std::move(messageEntry));
-        LOG(spam, "Queued operation %s with priority %u.", msg->getType().toString().c_str(), msg->getPriority());
-        lockGuard.broadcast();
-    } else {
-        return false;
-    }
-    return true;
+    assert(diskId < _diskInfo.size());
+    Disk& disk(_diskInfo[diskId]);
+    return disk.schedule(msg);
 }

 bool
-FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const
+FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg)
 {
     if (msg.getType().isReply()) {
         return false;
@@ -281,50 +257,14 @@ FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const
 void
 FileStorHandlerImpl::abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd)
 {
-    Disk& t(disk);
-    vespalib::MonitorGuard diskLock(t.lock);
-    typedef PriorityQueue::iterator iter_t;
     api::ReturnCode abortedCode(api::ReturnCode::ABORTED,
                                 "Sending distributor no longer owns bucket operation was bound to, "
                                 "or storage node went down");
-    for (iter_t it(t.queue.begin()), e(t.queue.end()); it != e;) {
-        api::StorageMessage& msg(*it->_command);
-        if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket)) {
-            LOG(debug, "Aborting operation %s as it is bound for bucket %s",
-                msg.toString().c_str(), it->_bucket.getBucketId().toString().c_str());
-            std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(msg).makeReply();
-            msgReply->setResult(abortedCode);
-            _messageSender.sendReply(msgReply);
-
-            it = t.queue.erase(it);
-        } else {
-            ++it;
-        }
-    }
-}
-
-bool
-FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket(const Disk& disk,
-                                                            const AbortBucketOperationsCommand& cmd) const
-{
-    for (auto& lockedBucket : disk.lockedBuckets) {
-        if (cmd.shouldAbort(lockedBucket.first)) {
-            LOG(spam, "Disk had active operation for aborted bucket %s, waiting for it to complete...",
-                lockedBucket.first.toString().c_str());
-            return true;
-        }
-    }
-    return false;
-}
-
-void
-FileStorHandlerImpl::waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd)
-{
-    vespalib::MonitorGuard guard(disk.lock);
-    while (diskHasActiveOperationForAbortedBucket(disk, cmd)) {
-        guard.wait();
+    auto aborted = disk.abort(cmd);
+    for (auto & msgReply : aborted) {
+        msgReply->setResult(abortedCode);
+        _messageSender.sendReply(msgReply);
     }
-    guard.broadcast();
 }

 void
@@ -332,82 +272,46 @@ FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& c
 {
     // Do queue clearing and active operation waiting in two passes
     // to allow disk threads to drain running operations in parallel.
-    for (uint32_t i = 0; i < _diskInfo.size(); ++i) {
-        abortQueuedCommandsForBuckets(_diskInfo[i], cmd);
+    for (Disk & disk : _diskInfo) {
+        abortQueuedCommandsForBuckets(disk, cmd);
     }
-    for (uint32_t i = 0; i < _diskInfo.size(); ++i) {
-        waitUntilNoActiveOperationsForAbortedBuckets(_diskInfo[i], cmd);
+    for (Disk & disk : _diskInfo) {
+        disk.waitInactive(cmd);
     }
 }

 void
 FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
 {
-    for (uint32_t i=0; i<_diskInfo.size(); ++i) {
-        const Disk& t(_diskInfo[i]);
-        vespalib::MonitorGuard lockGuard(t.lock);
-        t.metrics->pendingMerges.addValue(_mergeStates.size());
-        t.metrics->queueSize.addValue(t.getQueueSize());
+    for (Disk & disk : _diskInfo) {
+        vespalib::MonitorGuard lockGuard(_mergeStatesLock);
+        disk.metrics->pendingMerges.addValue(_mergeStates.size());
+        disk.metrics->queueSize.addValue(disk.getQueueSize());
     }
 }

+uint32_t
+FileStorHandlerImpl::getNextStripeId(uint32_t disk) {
+    return _diskInfo[disk].getNextStripeId();
+}
+
+
 FileStorHandler::LockedMessage &
-FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lck)
+FileStorHandlerImpl::getNextMessage(uint16_t diskId, uint32_t stripeId, FileStorHandler::LockedMessage& lck)
 {
     document::Bucket bucket(lck.first->getBucket());

-    LOG(spam, "Disk %d retrieving message for buffered bucket %s", disk, bucket.getBucketId().toString().c_str());
-
-    assert(disk < _diskInfo.size());
-    Disk& t(_diskInfo[disk]);
-
-    if (getDiskState(disk) == FileStorHandler::CLOSED) {
-        lck.second.reset();
-        return lck;
-    }
+    LOG(spam, "Disk %d retrieving message for buffered bucket %s", diskId, bucket.getBucketId().toString().c_str());

-    vespalib::MonitorGuard lockGuard(t.lock);
-    BucketIdx& idx = bmi::get<2>(t.queue);
-    std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket);
+    assert(diskId < _diskInfo.size());
+    Disk& disk(_diskInfo[diskId]);

-    // No more for this bucket.
-    if (range.first == range.second) {
+    if (disk.isClosed()) {
         lck.second.reset();
         return lck;
     }

-    api::StorageMessage & m(*range.first->_command);
-    mbus::Trace& trace = m.getTrace();
-
-    MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread looking for more requests to active bucket.");
-
-    uint64_t waitTime(const_cast<metrics::MetricTimer&>(range.first->_timer).stop(
-            t.metrics->averageQueueWaitingTime[m.getLoadType()]));
-
-    LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), timeout %d",
-        m.toString().c_str(), waitTime, bucket.getBucketId().toString().c_str(),
-        static_cast<api::StorageCommand&>(m).getTimeout());
-
-    if (m.getType().isReply() ||
-        waitTime < static_cast<api::StorageCommand&>(m).getTimeout())
-    {
-        std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command);
-        idx.erase(range.first);
-        lck.second.swap(msg);
-        lockGuard.broadcast();
-        lockGuard.unlock();
-        return lck;
-    } else {
-        std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(m).makeReply();
-        idx.erase(range.first);
-        lockGuard.broadcast();
-        lockGuard.unlock();
-        msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue"));
-        _messageSender.sendReply(msgReply);
-
-        lck.second.reset();
-        return lck;
-    }
+    return disk.getNextMessage(stripeId, lck);
 }

 bool
@@ -415,7 +319,7 @@ FileStorHandlerImpl::tryHandlePause(uint16_t disk) const
 {
     if (isPaused()) {
         // Wait a single time to see if filestor gets unpaused.
-        if (getDiskState(disk) != FileStorHandler::CLOSED) {
+        if (!_diskInfo[disk].isClosed()) {
             vespalib::MonitorGuard g(_pauseMonitor);
             g.wait(100);
         }
@@ -425,13 +329,7 @@ FileStorHandlerImpl::tryHandlePause(uint16_t disk) const
 }

 bool
-FileStorHandlerImpl::diskIsClosed(uint16_t disk) const
-{
-    return (getDiskState(disk) == FileStorHandler::CLOSED);
-}
-
-bool
-FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime) const
+FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime)
 {
     if (msg.getType().isReply()) {
         return false; // Replies must always be processed and cannot time out.
@@ -439,15 +337,8 @@ FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint
     return (waitTime >= static_cast<const api::StorageCommand&>(msg).getTimeout());
 }

-std::unique_ptr<FileStorHandler::BucketLockInterface>
-FileStorHandlerImpl::takeDiskBucketLockOwnership(const vespalib::MonitorGuard & guard, Disk& disk,
-                                                 const document::Bucket &bucket, const api::StorageMessage& msg)
-{
-    return std::make_unique<BucketLock>(guard, disk, bucket, msg.getPriority(), msg.getSummary());
-}
-
 std::unique_ptr<api::StorageReply>
-FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const
+FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg)
 {
     assert(!msg.getType().isReply());
     std::unique_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(msg).makeReply();
@@ -455,95 +346,30 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg)
     return msgReply;
 }

-namespace {
-    bool
-    bucketIsLockedOnDisk(const document::Bucket &id, const FileStorHandlerImpl::Disk &t) {
-        return (id.getBucketId().getRawId() != 0 && t.isLocked(id));
-    }
-}
-
 FileStorHandler::LockedMessage
-FileStorHandlerImpl::getNextMessage(uint16_t disk)
+FileStorHandlerImpl::getNextMessage(uint16_t disk, uint32_t stripeId)
 {
     assert(disk < _diskInfo.size());

     if (!tryHandlePause(disk)) {
         return {}; // Still paused, return to allow tick.
     }

-    Disk& t(_diskInfo[disk]);
-
-    vespalib::MonitorGuard lockGuard(t.lock);
-    // Try to grab a message+lock, immediately retrying once after a wait
-    // if none can be found and then exiting if the same is the case on the
-    // second attempt. This is key to allowing the run loop to register
-    // ticks at regular intervals while not busy-waiting.
-    for (int attempt = 0; (attempt < 2) && ! diskIsClosed(disk); ++attempt) {
-        PriorityIdx& idx(bmi::get<1>(t.queue));
-        PriorityIdx::iterator iter(idx.begin()), end(idx.end());
-
-        while (iter != end && bucketIsLockedOnDisk(iter->_bucket, t)) {
-            iter++;
-        }
-        if (iter != end) {
-            if (! isPaused()) {
-                return getMessage(lockGuard, t, idx, iter);
-            }
-        }
-        if (attempt == 0) {
-            lockGuard.wait(_getNextMessageTimeout);
-        }
-    }
-    return {}; // No message fetched.
-}
-
-FileStorHandler::LockedMessage
-FileStorHandlerImpl::getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter) {
-
-    api::StorageMessage & m(*iter->_command);
-    const uint64_t waitTime(
-            const_cast<metrics::MetricTimer &>(iter->_timer).stop(t.metrics->averageQueueWaitingTime[m.getLoadType()]));
-
-    mbus::Trace &trace(m.getTrace());
-    MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread.");
-    LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue, timeout %d",
-        m.toString().c_str(), waitTime, static_cast<api::StorageCommand &>(m).getTimeout());
-
-    std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
-    document::Bucket bucket(iter->_bucket);
-    idx.erase(iter); // iter not used after this point.
-
-    if (!messageTimedOutInQueue(*msg, waitTime)) {
-        auto locker = takeDiskBucketLockOwnership(guard, t, bucket, *msg);
-        guard.unlock();
-        MBUS_TRACE(trace, 9, "FileStorHandler: Got lock on bucket");
-        return std::move(FileStorHandler::LockedMessage(std::move(locker), std::move(msg)));
-    } else {
-        std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg));
-        guard.broadcast(); // XXX: needed here?
-        guard.unlock();
-        _messageSender.sendReply(msgReply);
-        return {};
-    }
+    return _diskInfo[disk].getNextMessage(stripeId, _getNextMessageTimeout);
 }

 std::shared_ptr<FileStorHandler::BucketLockInterface>
-FileStorHandlerImpl::lock(const document::Bucket &bucket, uint16_t disk)
+FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket)
 {
-    assert(disk < _diskInfo.size());
-
-    Disk& t(_diskInfo[disk]);
-    LOG(spam, "Acquiring filestor lock for %s on disk %d", bucket.getBucketId().toString().c_str(), disk);
-
-    vespalib::MonitorGuard lockGuard(t.lock);
+    vespalib::MonitorGuard guard(_lock);

-    while (bucket.getBucketId().getRawId() != 0 && t.isLocked(bucket)) {
+    while (isLocked(guard, bucket)) {
         LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str());
-        lockGuard.wait(100);
+        guard.wait(100);
     }

-    auto locker = std::make_shared<BucketLock>(lockGuard, t, bucket, 255, "External lock");
+    auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, "External lock");

-    lockGuard.broadcast();
+    guard.broadcast();
     return locker;
 }
@@ -868,8 +694,8 @@ void
 FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source,
                                       std::vector<RemapInfo*>& targets, Operation op)
 {
-    BucketIdx& idx(bmi::get<2>(from.queue));
-    std::pair<BucketIdx::iterator, BucketIdx::iterator> range(idx.equal_range(source.bucket));
+    BucketIdx& idx(from.stripe(source.bucket).exposeBucketIdx());
+    auto range(idx.equal_range(source.bucket));

     std::vector<MessageEntry> entriesFound;

@@ -910,7 +736,7 @@ FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source,
         } else {
             entry._bucket = bucket;
             // Move to correct disk queue if needed
-            _diskInfo[targetDisk].queue.emplace_back(std::move(entry));
+            _diskInfo[targetDisk].stripe(bucket).exposeQueue().emplace_back(std::move(entry));
         }
     }

@@ -924,11 +750,11 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Oper
     MultiLockGuard guard;

     Disk& from(_diskInfo[source.diskIndex]);
-    guard.addLock(from.lock, source.diskIndex);
+    guard.addLock(from.stripe(source.bucket).exposeLock(), source.diskIndex);

     Disk& to1(_diskInfo[target.diskIndex]);
     if (target.bucket.getBucketId().getRawId() != 0) {
-        guard.addLock(to1.lock, target.diskIndex);
+        guard.addLock(to1.stripe(target.bucket).exposeLock(), target.diskIndex);
     }

     std::vector<RemapInfo*> targets;
@@ -947,16 +773,16 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem
     MultiLockGuard guard;

     Disk& from(_diskInfo[source.diskIndex]);
-    guard.addLock(from.lock, source.diskIndex);
+    guard.addLock(from.stripe(source.bucket).exposeLock(), source.diskIndex);

     Disk& to1(_diskInfo[target1.diskIndex]);
     if (target1.bucket.getBucketId().getRawId() != 0) {
-        guard.addLock(to1.lock, target1.diskIndex);
+        guard.addLock(to1.stripe(target1.bucket).exposeLock(), target1.diskIndex);
     }

     Disk& to2(_diskInfo[target2.diskIndex]);
     if (target2.bucket.getBucketId().getRawId() != 0) {
-        guard.addLock(to2.lock, target2.diskIndex);
+        guard.addLock(to2.stripe(target2.bucket).exposeLock(), target2.diskIndex);
     }

     guard.lock();
@@ -969,12 +795,11 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem
 }

 void
-FileStorHandlerImpl::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err)
+FileStorHandlerImpl::Stripe::failOperations(const document::Bucket &bucket, const api::ReturnCode& err)
 {
-    Disk& from(_diskInfo[fromDisk]);
-    vespalib::MonitorGuard lockGuard(from.lock);
+    vespalib::MonitorGuard guard(_lock);

-    BucketIdx& idx(bmi::get<2>(from.queue));
+    BucketIdx& idx(bmi::get<2>(_queue));
     std::pair<BucketIdx::iterator, BucketIdx::iterator> range(idx.equal_range(bucket));

     for (auto iter = range.first; iter != range.second;) {
@@ -1032,47 +857,260 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(MessageEntry && entry) noexcept

 FileStorHandlerImpl::MessageEntry::~MessageEntry() { }

-FileStorHandlerImpl::Disk::Disk()
-    : lock(),
-      queue(),
-      lockedBuckets(100),
-      metrics(0),
+FileStorHandlerImpl::Disk::Disk(const FileStorHandlerImpl & owner, MessageSender & messageSender, uint32_t numThreads)
+    : metrics(0),
+      _nextStripeId(0),
+      _stripes(numThreads, Stripe(owner, messageSender)),
       state(FileStorHandler::AVAILABLE)
+{
+    assert(numThreads > 0);
+}
+
+FileStorHandlerImpl::Disk::Disk(Disk && rhs) noexcept
+    : metrics(std::move(rhs.metrics)),
+      _nextStripeId(rhs._nextStripeId),
+      _stripes(std::move(rhs._stripes)),
+      state(rhs.state.load())
 {
 }

-FileStorHandlerImpl::Disk::~Disk() { }
+FileStorHandlerImpl::Disk::~Disk() = default;
+FileStorHandlerImpl::Stripe::~Stripe() = default;
+
+void
+FileStorHandlerImpl::Disk::flush()
+{
+    for (auto & stripe : _stripes) {
+        stripe.flush();
+    }
+}
+
+void
+FileStorHandlerImpl::Disk::broadcast()
+{
+    for (auto & stripe : _stripes) {
+        stripe.broadcast();
+    }
+}

 bool
-FileStorHandlerImpl::Disk::isLocked(const document::Bucket& bucket) const noexcept
+FileStorHandlerImpl::Disk::schedule(const std::shared_ptr<api::StorageMessage>& msg)
 {
-    return (lockedBuckets.find(bucket) != lockedBuckets.end());
+    MessageEntry messageEntry(msg, getStorageMessageBucket(*msg));
+    if (getState() == FileStorHandler::AVAILABLE) {
+        stripe(messageEntry._bucket).schedule(std::move(messageEntry));
+    } else {
+        return false;
+    }
+    return true;
+}
+
+FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender)
+    : _owner(owner),
+      _messageSender(messageSender)
+{ }
+
+FileStorHandler::LockedMessage
+FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk)
+{
+    vespalib::MonitorGuard guard(_lock);
+    // Try to grab a message+lock, immediately retrying once after a wait
+    // if none can be found and then exiting if the same is the case on the
+    // second attempt. This is key to allowing the run loop to register
+    // ticks at regular intervals while not busy-waiting.
+    for (int attempt = 0; (attempt < 2) && ! disk.isClosed() && !_owner.isPaused(); ++attempt) {
+        PriorityIdx& idx(bmi::get<1>(_queue));
+        PriorityIdx::iterator iter(idx.begin()), end(idx.end());
+
+        while (iter != end && isLocked(guard, iter->_bucket)) {
+            iter++;
+        }
+        if (iter != end) {
+            return getMessage(guard, disk, idx, iter);
+        }
+        if (attempt == 0) {
+            guard.wait(timeout);
+        }
+    }
+    return {}; // No message fetched.
+}
+
+FileStorHandler::LockedMessage &
+FileStorHandlerImpl::Stripe::getNextMessage(Disk & disk, FileStorHandler::LockedMessage& lck)
+{
+    const document::Bucket & bucket = lck.second->getBucket();
+    vespalib::MonitorGuard guard(_lock);
+    BucketIdx& idx = bmi::get<2>(_queue);
+    std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket);
+
+    // No more for this bucket.
+    if (range.first == range.second) {
+        lck.second.reset();
+        return lck;
+    }
+
+    api::StorageMessage & m(*range.first->_command);
+
+    uint64_t waitTime(const_cast<metrics::MetricTimer&>(range.first->_timer).stop(disk.metrics->averageQueueWaitingTime[m.getLoadType()]));
+
+    if (!messageTimedOutInQueue(m, waitTime)) {
+        std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command);
+        idx.erase(range.first);
+        lck.second.swap(msg);
+        guard.broadcast();
+    } else {
+        std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(m).makeReply();
+        idx.erase(range.first);
+        guard.broadcast();
+        guard.unlock();
+        msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue"));
+        _messageSender.sendReply(msgReply);
+
+        lck.second.reset();
+    }
+    return lck;
+}
+
+FileStorHandler::LockedMessage
+FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, Disk & disk, PriorityIdx & idx, PriorityIdx::iterator iter) {
+
+    api::StorageMessage & m(*iter->_command);
+    uint64_t waitTime(const_cast<metrics::MetricTimer &>(iter->_timer).stop(disk.metrics->averageQueueWaitingTime[m.getLoadType()]));
+
+    std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
+    document::Bucket bucket(iter->_bucket);
+    idx.erase(iter); // iter not used after this point.
+
+    if (!messageTimedOutInQueue(*msg, waitTime)) {
+        auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(), msg->getSummary());
+        guard.unlock();
+        return FileStorHandler::LockedMessage(std::move(locker), std::move(msg));
+    } else {
+        std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg));
+        guard.broadcast(); // XXX: needed here?
+        guard.unlock();
+        _messageSender.sendReply(msgReply);
+        return {};
+    }
+}
+
+void
+FileStorHandlerImpl::Disk::waitUntilNoLocks() const
+{
+    for (const auto & stripe : _stripes) {
+        stripe.waitUntilNoLocks();
+    }
+}
+
+void
+FileStorHandlerImpl::Stripe::waitUntilNoLocks() const
+{
+    vespalib::MonitorGuard lockGuard(_lock);
+    while (!_lockedBuckets.empty()) {
+        lockGuard.wait();
+    }
+}
+
+void
+FileStorHandlerImpl::Disk::waitInactive(const AbortBucketOperationsCommand& cmd) const {
+    for (auto & stripe : _stripes) {
+        stripe.waitInactive(cmd);
+    }
+}
+
+void
+FileStorHandlerImpl::Stripe::waitInactive(const AbortBucketOperationsCommand& cmd) const {
+    vespalib::MonitorGuard lockGuard(_lock);
+    while (hasActive(lockGuard, cmd)) {
+        lockGuard.wait();
+    }
+}
+
+bool
+FileStorHandlerImpl::Stripe::hasActive(vespalib::MonitorGuard &, const AbortBucketOperationsCommand& cmd) const {
+    for (auto& lockedBucket : _lockedBuckets) {
+        if (cmd.shouldAbort(lockedBucket.first)) {
+            LOG(spam, "Disk had active operation for aborted bucket %s, waiting for it to complete...",
+                lockedBucket.first.toString().c_str());
+            return true;
+        }
+    }
+    return false;
+}
+
+std::vector<std::shared_ptr<api::StorageReply>>
+FileStorHandlerImpl::Disk::abort(const AbortBucketOperationsCommand& cmd)
+{
+    std::vector<std::shared_ptr<api::StorageReply>> aborted;
+    for (auto & stripe : _stripes) {
+        stripe.abort(aborted, cmd);
+    }
+    return aborted;
+}
+
+void FileStorHandlerImpl::Stripe::abort(std::vector<std::shared_ptr<api::StorageReply>> & aborted,
+                                        const AbortBucketOperationsCommand& cmd)
+{
+    vespalib::MonitorGuard lockGuard(_lock);
+    for (auto it(_queue.begin()); it != _queue.end();) {
+        api::StorageMessage& msg(*it->_command);
+        if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket)) {
+            aborted.emplace_back(static_cast<api::StorageCommand&>(msg).makeReply());
+            it = _queue.erase(it);
+        } else {
+            ++it;
+        }
+    }
+}
+
+bool FileStorHandlerImpl::Stripe::schedule(MessageEntry messageEntry)
+{
+    vespalib::MonitorGuard lockGuard(_lock);
+    _queue.emplace_back(std::move(messageEntry));
+    lockGuard.broadcast();
+    return true;
+}
+
+void
+FileStorHandlerImpl::Stripe::flush()
+{
+    vespalib::MonitorGuard lockGuard(_lock);
+    while (!(_queue.empty() && _lockedBuckets.empty())) {
+        LOG(debug, "Still %ld in queue and %ld locked buckets", _queue.size(), _lockedBuckets.size());
+        lockGuard.wait(100);
+    }
+}
+
+bool
+FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket) const noexcept
+{
+    return (bucket.getBucketId().getRawId() != 0) && (_lockedBuckets.find(bucket) != _lockedBuckets.end());
 }

 uint32_t
 FileStorHandlerImpl::Disk::getQueueSize() const noexcept
 {
-    return queue.size();
+    size_t sum(0);
+    for (const auto & stripe : _stripes) {
+        sum += stripe.getQueueSize();
+    }
+    return sum;
 }

 uint32_t
 FileStorHandlerImpl::getQueueSize(uint16_t disk) const
 {
-    const Disk& t(_diskInfo[disk]);
-    vespalib::MonitorGuard lockGuard(t.lock);
-    return t.getQueueSize();
+    return _diskInfo[disk].getQueueSize();
 }

-FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Disk& disk,
+FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Stripe& stripe,
                                             const document::Bucket &bucket, uint8_t priority,
                                             const vespalib::stringref & statusString)
-    : _disk(disk),
+    : _stripe(stripe),
       _bucket(bucket)
 {
     (void) guard;
     if (_bucket.getBucketId().getRawId() != 0) {
         // Lock the bucket and wait until it is not the current operation for
         // the disk itself.
-        _disk.lockedBuckets.insert(std::make_pair(_bucket, Disk::LockEntry(priority, statusString)));
+        _stripe.lock(guard, _bucket, Stripe::LockEntry(priority, statusString));
         LOG(debug, "Locked bucket %s with priority %u", bucket.getBucketId().toString().c_str(), priority);

@@ -1086,31 +1124,73 @@ FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard

 FileStorHandlerImpl::BucketLock::~BucketLock() {
     if (_bucket.getBucketId().getRawId() != 0) {
-        vespalib::MonitorGuard lockGuard(_disk.lock);
-        _disk.lockedBuckets.erase(_bucket);
+        _stripe.release(_bucket);
         LOG(debug, "Unlocked bucket %s", _bucket.getBucketId().toString().c_str());
         LOG_BUCKET_OPERATION_SET_LOCK_STATE(
                 _bucket.getBucketId(), "released filestor lock", true,
                 debug::BucketOperationLogger::State::BUCKET_UNLOCKED);
-        lockGuard.broadcast();
     }
 }

 std::string
-FileStorHandlerImpl::dumpQueue(uint16_t disk) const
+FileStorHandlerImpl::Disk::dumpQueue() const
+{
+    std::ostringstream os;
+    for (const Stripe & stripe : _stripes) {
+        stripe.dumpQueue(os);
+    }
+    return os.str();
+}
+
+void
+FileStorHandlerImpl::Disk::dumpQueueHtml(std::ostream & os) const
+{
+    for (const Stripe & stripe : _stripes) {
+        stripe.dumpQueueHtml(os);
+    }
+}
+
+void
+FileStorHandlerImpl::Disk::dumpActiveHtml(std::ostream & os) const
+{
+    for (const Stripe & stripe : _stripes) {
+        stripe.dumpActiveHtml(os);
+    }
+}
+
+void
+FileStorHandlerImpl::Stripe::dumpQueueHtml(std::ostream & os) const
 {
-    std::ostringstream ost;
+    vespalib::MonitorGuard guard(_lock);

-    const Disk& t(_diskInfo[disk]);
-    vespalib::MonitorGuard lockGuard(t.lock);
+    const PriorityIdx& idx = bmi::get<1>(_queue);
+    for (const auto & entry : idx) {
+        os << "<li>" << entry._command->toString() << " (priority: "
+           << (int)entry._command->getPriority() << ")</li>\n";
+    }
+}

-    const PriorityIdx& idx = bmi::get<1>(t.queue);
-    for (PriorityIdx::const_iterator it = idx.begin(); it != idx.end(); it++) {
-        ost << it->_bucket.getBucketId() << ": " << it->_command->toString() << " (priority: "
-            << (int)it->_command->getPriority() << ")\n";
+void
+FileStorHandlerImpl::Stripe::dumpActiveHtml(std::ostream & os) const
+{
+    uint32_t now = time(nullptr);
+    vespalib::MonitorGuard guard(_lock);
+    for (const auto & entry : _lockedBuckets) {
+        os << entry.second.statusString << " (" << entry.first.getBucketId()
+           << ") Running for " << (now - entry.second.timestamp) << " secs<br/>\n";
     }
+}
+
+void
+FileStorHandlerImpl::Stripe::dumpQueue(std::ostream & os) const
+{
+    vespalib::MonitorGuard guard(_lock);

-    return ost.str();
+    const PriorityIdx& idx = bmi::get<1>(_queue);
+    for (const auto & entry : idx) {
+        os << entry._bucket.getBucketId() << ": " << entry._command->toString() << " (priority: "
+           << (int)entry._command->getPriority() << ")\n";
+    }
 }

 void
@@ -1120,37 +1200,25 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
     out << "<h1>Filestor handler</h1>\n";
     for (uint32_t i=0; i<_diskInfo.size(); ++i) {
         out << "<h2>Disk " << i << "</h2>\n";
-        const Disk& t(_diskInfo[i]);
-        vespalib::MonitorGuard lockGuard(t.lock);
-        out << "Queue size: " << t.getQueueSize() << "<br>\n";
+        const Disk& disk(_diskInfo[i]);
+        out << "Queue size: " << disk.getQueueSize() << "<br>\n";
         out << "Disk state: ";
-        switch (t.getState()) {
-        case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break;
-        case FileStorHandler::DISABLED: out << "DISABLED"; break;
-        case FileStorHandler::CLOSED: out << "CLOSED"; break;
+        switch (disk.getState()) {
+        case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break;
+        case FileStorHandler::DISABLED: out << "DISABLED"; break;
+        case FileStorHandler::CLOSED: out << "CLOSED"; break;
         }
         out << "<h4>Active operations</h4>\n";
-        for (const auto& lockedBucket : t.lockedBuckets) {
-            out << lockedBucket.second.statusString
-                << " (" << lockedBucket.first.getBucketId() << ") Running for "
-                << (_component.getClock().getTimeInSeconds().getTime() - lockedBucket.second.timestamp)
-                << " secs<br/>\n";
-        }
+        disk.dumpActiveHtml(out);
         if (!verbose) continue;
         out << "<h4>Input queue</h4>\n";
-        out << "<ul>\n";
-        const PriorityIdx& idx = bmi::get<1>(t.queue);
-        for (PriorityIdx::const_iterator it = idx.begin(); it != idx.end(); it++) {
-            out << "<li>" << it->_command->toString() << " (priority: "
-                << (int)it->_command->getPriority() << ")</li>\n";
-        }
+        disk.dumpQueueHtml(out);
         out << "</ul>\n";
     }

+    vespalib::LockGuard mergeGuard(_mergeStatesLock);
     out << "<tr><td>Active merge operations</td><td>" << _mergeStates.size() << "</td></tr>\n";
-
-    // Print merge states
     if (verbose) {
         out << "<h4>Active merges</h4>\n";
         if (_mergeStates.size() == 0) {
@@ -1158,8 +1226,7 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
         }
         for (auto & entry : _mergeStates) {
             out << "<b>" << entry.first.toString() << "</b><br>\n";
-            // << "<p>" << it->second << "</p>\n"; // Gets very spammy with
-            // the complete state here..
+            // << "<p>" << it->second << "</p>\n"; // Gets very spammy with the complete state here..
         }
     }
 }
@@ -1167,12 +1234,8 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
 void
 FileStorHandlerImpl::waitUntilNoLocks()
 {
-    for (uint32_t i=0; i<_diskInfo.size(); ++i) {
-        const Disk& t(_diskInfo[i]);
-        vespalib::MonitorGuard lockGuard(t.lock);
-        while (!t.lockedBuckets.empty()) {
-            lockGuard.wait();
-        }
+    for (const auto & disk : _diskInfo) {
+        disk.waitUntilNoLocks();
     }
 }
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index fb084c41b7c..dd3dc01d69f 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -71,29 +71,73 @@ public:
     using PriorityIdx = bmi::nth_index<PriorityQueue, 1>::type;
     using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type;

-    struct Disk {
-        vespalib::Monitor lock;
-        PriorityQueue queue;
+    class Disk;
+    class Stripe {
+    public:
         struct LockEntry {
-            uint32_t timestamp;
-            uint8_t priority;
+            uint32_t         timestamp;
+            uint8_t          priority;
             vespalib::string statusString;

-            LockEntry()
-                : timestamp(0), priority(0), statusString()
-            { }
+            LockEntry() : timestamp(0), priority(0), statusString() { }

             LockEntry(uint8_t priority_, vespalib::stringref status)
-                : timestamp(time(NULL)),
-                  priority(priority_),
-                  statusString(status)
+                : timestamp(time(nullptr)), priority(priority_), statusString(status)
             { }
         };
+        Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender);
+        ~Stripe();
+        void flush();
+        bool schedule(MessageEntry messageEntry);
+        void waitUntilNoLocks() const;
+        void abort(std::vector<std::shared_ptr<api::StorageReply>> & aborted, const AbortBucketOperationsCommand& cmd);
+        void waitInactive(const AbortBucketOperationsCommand& cmd) const;
+
+        void broadcast() {
+            vespalib::MonitorGuard guard(_lock);
+            guard.broadcast();
+        }
+        size_t getQueueSize() const {
+            vespalib::MonitorGuard guard(_lock);
+            return _queue.size();
+        }
+        void release(const document::Bucket & bucket) {
+            vespalib::MonitorGuard guard(_lock);
+            _lockedBuckets.erase(bucket);
+            guard.broadcast();
+        }
+        bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&) const noexcept;
+
+        void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, const LockEntry & lockEntry) {
+            _lockedBuckets.insert(std::make_pair(bucket, lockEntry));
+        }
+
+        std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket);
+        void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
+
+        FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk);
+        FileStorHandler::LockedMessage & getNextMessage(Disk & disk, FileStorHandler::LockedMessage& lock);
+        void dumpQueue(std::ostream & os) const;
+        void dumpActiveHtml(std::ostream & os) const;
+        void dumpQueueHtml(std::ostream & os) const;
+        vespalib::Monitor & exposeLock() { return _lock; }
+        PriorityQueue & exposeQueue() { return _queue; }
+        BucketIdx & exposeBucketIdx() { return bmi::get<2>(_queue); }
+    private:
+        bool hasActive(vespalib::MonitorGuard & monitor, const AbortBucketOperationsCommand& cmd) const;
+        FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t,
+                                                  PriorityIdx & idx, PriorityIdx::iterator iter);
         typedef vespalib::hash_map<document::Bucket, LockEntry, document::Bucket::hash> LockedBuckets;
-        LockedBuckets lockedBuckets;
-        FileStorDiskMetrics* metrics;
+        const FileStorHandlerImpl &_owner;
+        MessageSender             &_messageSender;
+        vespalib::Monitor          _lock;
+        PriorityQueue              _queue;
+        LockedBuckets              _lockedBuckets;
+    };
+    struct Disk {
+        FileStorDiskMetrics * metrics;

         /**
          * No assumption on memory ordering around disk state reads should
@@ -110,29 +154,61 @@ public:
             state.store(s, std::memory_order_relaxed);
         }

-        Disk();
+        Disk(const FileStorHandlerImpl & owner, MessageSender & messageSender, uint32_t numThreads);
+        Disk(Disk &&) noexcept;
         ~Disk();

-        bool isLocked(const document::Bucket&) const noexcept;
+        bool isClosed() const noexcept { return getState() == DiskState::CLOSED; }
+
+        void flush();
+        void broadcast();
+        bool schedule(const std::shared_ptr<api::StorageMessage>& msg);
+        void waitUntilNoLocks() const;
+        std::vector<std::shared_ptr<api::StorageReply>> abort(const AbortBucketOperationsCommand& cmd);
+        void waitInactive(const AbortBucketOperationsCommand& cmd) const;
+        FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, uint32_t timeout) {
+            assert(_nextStripeId >= _stripes.size());
+            return _stripes[stripeId].getNextMessage(timeout, *this);
+        }
+        FileStorHandler::LockedMessage & getNextMessage(uint32_t stripeId, FileStorHandler::LockedMessage & lck) {
+            return _stripes[stripeId].getNextMessage(*this, lck);
+        }
+        std::shared_ptr<FileStorHandler::BucketLockInterface>
+        lock(const document::Bucket & bucket) {
+            return stripe(bucket).lock(bucket);
+        }
+        void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) {
+            stripe(bucket).failOperations(bucket, code);
+        }
+
         uint32_t getQueueSize() const noexcept;
+        uint32_t getNextStripeId() { return (_nextStripeId++) % _stripes.size(); }
+        std::string dumpQueue() const;
+        void dumpActiveHtml(std::ostream & os) const;
+        void dumpQueueHtml(std::ostream & os) const;
+
+        Stripe & stripe(const document::Bucket & bucket) {
+            return _stripes[bucket.getBucketId().getRawId() % _stripes.size()];
+        }
     private:
+        uint32_t            _nextStripeId;
+        std::vector<Stripe> _stripes;
         std::atomic<DiskState> state;
     };

     class BucketLock : public FileStorHandler::BucketLockInterface {
     public:
-        BucketLock(const vespalib::MonitorGuard & guard, Disk& disk, const document::Bucket &bucket, uint8_t priority,
-                   const vespalib::stringref & statusString);
+        BucketLock(const vespalib::MonitorGuard & guard, Stripe& disk, const document::Bucket &bucket,
+                   uint8_t priority, const vespalib::stringref & statusString);
         ~BucketLock();

         const document::Bucket &getBucket() const override { return _bucket; }

     private:
-        Disk& _disk;
+        Stripe & _stripe;
         document::Bucket _bucket;
     };

-    FileStorHandlerImpl(MessageSender&, FileStorMetrics&,
+    FileStorHandlerImpl(uint32_t numStripes, MessageSender&, FileStorMetrics&,
                         const spi::PartitionStateList&, ServiceLayerComponentRegister&);
     ~FileStorHandlerImpl();

@@ -144,17 +220,18 @@ public:
     void close();
     bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);

-    FileStorHandler::LockedMessage getNextMessage(uint16_t disk);
-    FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter);
+    FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId);

-    FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock);
+    FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, FileStorHandler::LockedMessage& lock);

     enum Operation { MOVE, SPLIT, JOIN };
     void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op);

     void remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op);

-    void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&);
+    void failOperations(const document::Bucket & bucket, uint16_t disk, const api::ReturnCode & code) {
+        _diskInfo[disk].failOperations(bucket, code);
+    }

     void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
     void sendReply(const std::shared_ptr<api::StorageReply>&) override;
@@ -162,9 +239,12 @@ public:

     uint32_t getQueueSize() const;
     uint32_t getQueueSize(uint16_t disk) const;
+    uint32_t getNextStripeId(uint32_t disk);

     std::shared_ptr<FileStorHandler::BucketLockInterface>
-    lock(const document::Bucket&, uint16_t disk);
+    lock(const document::Bucket & bucket, uint16_t disk) {
+        return _diskInfo[disk].lock(bucket);
+    }

     void addMergeStatus(const document::Bucket&, MergeStatus::SP);
     MergeStatus& editMergeStatus(const document::Bucket&);
@@ -172,7 +252,9 @@ public:
     uint32_t getNumActiveMerges() const;
     void clearMergeStatus(const document::Bucket&, const api::ReturnCode*);

-    std::string dumpQueue(uint16_t disk) const;
+    std::string dumpQueue(uint16_t disk) const {
+        return _diskInfo[disk].dumpQueue();
+    }

     ResumeGuard pause();
     void resume() override;
     void abortQueuedOperations(const AbortBucketOperationsCommand& cmd);
@@ -213,44 +295,25 @@ private:
     bool isPaused() const { return _paused.load(std::memory_order_relaxed); }

     /**
-     * Return whether a disk has been shut down by the system (IO failure is
-     * the most likely candidate here) and should not serve any more requests.
-     */
-    bool diskIsClosed(uint16_t disk) const;
-
-    /**
     * Return whether msg has timed out based on waitTime and the message's
    * specified timeout.
    */
-    bool messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime) const;
-
-    /**
-     * Assume ownership of lock for a given bucket on a given disk.
-     * Disk lock MUST have been taken prior to calling this function.
-     */
-    std::unique_ptr<FileStorHandler::BucketLockInterface>
-    takeDiskBucketLockOwnership(const vespalib::MonitorGuard & guard,
-                                Disk& disk, const document::Bucket &bucket, const api::StorageMessage& msg);
+    static bool messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime);

     /**
    * Creates and returns a reply with api::TIMEOUT return code for msg.
    * Swaps (invalidates) context from msg into reply.
    */
-    std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const;
-    bool messageMayBeAborted(const api::StorageMessage& msg) const;
+    static std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg);
+    static bool messageMayBeAborted(const api::StorageMessage& msg);

     void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
-    bool diskHasActiveOperationForAbortedBucket(const Disk& disk, const AbortBucketOperationsCommand& cmd) const;
-    void waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);

     // Update hook
     void updateMetrics(const MetricLockGuard &) override;

-    document::Bucket remapMessage(api::StorageMessage& msg,
-                                  const document::Bucket &source,
-                                  Operation op,
-                                  std::vector<RemapInfo*>& targets,
-                                  uint16_t& targetDisk,
-                                  api::ReturnCode& returnCode);
+    document::Bucket
+    remapMessage(api::StorageMessage& msg, const document::Bucket &source, Operation op,
+                 std::vector<RemapInfo*>& targets, uint16_t& targetDisk, api::ReturnCode& returnCode);

     void remapQueueNoLock(Disk& from, const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index b4735c2961a..1c2e415c39c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -116,7 +116,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
     size_t numThreads = _config->numThreads;
     _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numThreads);

-    _filestorHandler.reset(new FileStorHandler(*this, *_metrics, _partitions, _compReg));
+    _filestorHandler.reset(new FileStorHandler(std::min(numThreads, numThreads), *this, *_metrics, _partitions, _compReg));
     for (uint32_t i=0; i<_component.getDiskCount(); ++i) {
         if (_partitions[i].isUp()) {
             LOG(spam, "Setting up disk %u", i);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index be6f9577642..130fb91e199 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -21,7 +21,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
                                      FileStorHandler& filestorHandler,
                                      FileStorThreadMetrics& metrics,
                                      uint16_t deviceIndex)
-    : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider),
+    : _threadId(filestorHandler.getNextStripeId(deviceIndex)),
+      _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider),
       _warnOnSlowOperations(5000),
       _spi(provider),
       _processAllHandler(_env, provider),
@@ -33,7 +34,7 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
       _closed(false)
 {
     std::ostringstream threadName;
-    threadName << "Disk " << _env._partition << " thread " << (void*) this;
+    threadName << "Disk " << _env._partition << " thread " << _threadId;
     _component.reset(new ServiceLayerComponent(compReg, threadName.str()));
     _bucketOwnershipNotifier.reset(new BucketOwnershipNotifier(*_component, filestorHandler));
     framework::MilliSecTime maxProcessingTime(60 * 1000);
@@ -1063,7 +1064,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
         trackers.push_back(std::move(tracker));

         if (trackers.back()->getReply()->getResult().success()) {
-            _env._fileStorHandler.getNextMessage(_env._partition, lock);
+            _env._fileStorHandler.getNextMessage(_env._partition, _threadId, lock);
         } else {
             break;
         }
@@ -1087,7 +1088,7 @@ PersistenceThread::run(framework::ThreadHandle& thread)
     while (!thread.interrupted() && !_env._fileStorHandler.closed(_env._partition)) {
         thread.registerTick();

-        FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition));
+        FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _threadId));

         if (lock.first) {
             processMessages(lock);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 640614f7edf..c001a52a6be 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -47,6 +47,7 @@ public:
     MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd);

 private:
+    uint32_t _threadId;
     PersistenceUtil _env;
     uint32_t _warnOnSlowOperations;
     spi::PersistenceProvider& _spi;
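
For completeness, a hypothetical consumer loop in the style of the `PersistenceThread` changes above, written against the `StripedQueue` sketch near the top of the page (again illustrative, not the Vespa API): the stripe id is claimed once at construction time, mirroring `_threadId(filestorHandler.getNextStripeId(deviceIndex))`, and is then passed to every fetch so the thread only ever drains its own stripe.

```cpp
#include <atomic>
#include <thread>

void runConsumer(StripedQueue &handler, std::atomic<bool> &shutdown) {
    const uint32_t threadId = handler.getNextStripeId(); // claimed once, like PersistenceThread::_threadId
    Message msg{};
    while (!shutdown.load()) {
        if (handler.getNextMessage(threadId, msg)) {
            // process(msg): every message seen here hashed to this stripe, so
            // no other consumer can be working on the same bucket's entries.
        } else {
            std::this_thread::yield(); // the real code blocks on the stripe monitor with a timeout
        }
    }
}
```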