diff options
author | Henning Baldersheim <balder@oath.com> | 2018-03-16 15:08:57 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-03-19 13:01:16 +0100 |
commit | 2a788ec1b3900d7ba853b3f5f89e49df7e016c7e (patch) | |
tree | 4154c20716ce69e501f17232372dc8c671fe0db2 | |
parent | 518d8a9a8bac25f96228dac74a2d74e65cc7e460 (diff) |
Remove thread priority concept, has not added anything but complexity for a very long time.
12 files changed, 94 insertions, 182 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 8d03c9de54c..318881364d7 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -265,15 +265,14 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, - uint16_t deviceIndex, - uint8_t lowestPriority) + uint16_t deviceIndex) { (void) config; std::unique_ptr<DiskThread> disk; disk.reset(new PersistenceThread( node.getComponentRegister(), config.getConfigId(), provider, filestorHandler, metrics, - deviceIndex, lowestPriority)); + deviceIndex)); return disk; } @@ -645,23 +644,18 @@ FileStorManagerTest::testHandlerPriority() // Populate bucket with the given data for (uint32_t i = 1; i < 6; i++) { - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100); + auto address = std::make_shared<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(i * 15); filestorHandler.schedule(cmd, 0); } - CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 20).second->getPriority()); - CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 20).second.get() == NULL); - CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 50).second->getPriority()); - CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0, 50).second->getPriority()); - CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 50).second.get() == NULL); - CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); - CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0).second->getPriority()); } class MessagePusherThread : public document::Runnable @@ -678,11 +672,9 @@ public: void run() override { while (!_done) { document::BucketIdFactory factory; - document::BucketId bucket(16, factory.getBucketId( - _doc->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), _doc, 100)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), _doc, 100); _handler.schedule(cmd, 0); FastOS_Thread::Sleep(1); } @@ -694,7 +686,7 @@ public: MessagePusherThread::MessagePusherThread(FileStorHandler& handler, Document::SP doc) : _handler(handler), _doc(doc), _done(false), _threadDone(false) {} -MessagePusherThread::~MessagePusherThread() {} +MessagePusherThread::~MessagePusherThread() = default; class MessageFetchingThread : public document::Runnable { public: @@ -711,7 +703,7 @@ public: void run() override { while (!_done) { - FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255); + FileStorHandler::LockedMessage msg = _handler.getNextMessage(0); if (msg.second.get()) { uint32_t originalConfig = _config.load(); _fetchedCount++; @@ -824,15 +816,15 @@ FileStorManagerTest::testHandlerPause() filestorHandler.schedule(cmd, 0); } - CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority()); { ResumeGuard guard = filestorHandler.pause(); (void)guard; - CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 255).second.get() == NULL); + CPPUNIT_ASSERT(filestorHandler.getNextMessage(0).second.get() == NULL); } - CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority()); } namespace { @@ -957,21 +949,21 @@ FileStorManagerTest::testHandlerMulti() } { - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255); + FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL((uint64_t)1, getPutTime(lock.second)); - lock = filestorHandler.getNextMessage(0, lock, 255); + lock = filestorHandler.getNextMessage(0, lock); CPPUNIT_ASSERT_EQUAL((uint64_t)2, getPutTime(lock.second)); - lock = filestorHandler.getNextMessage(0, lock, 255); + lock = filestorHandler.getNextMessage(0, lock); CPPUNIT_ASSERT_EQUAL((uint64_t)3, getPutTime(lock.second)); } { - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255); + FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL((uint64_t)11, getPutTime(lock.second)); - lock = filestorHandler.getNextMessage(0, lock, 255); + lock = filestorHandler.getNextMessage(0, lock); CPPUNIT_ASSERT_EQUAL((uint64_t)12, getPutTime(lock.second)); } } @@ -984,8 +976,7 @@ FileStorManagerTest::testHandlerTimeout() // Setup a filestorthread to test DummyStorageLink top; DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); + top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); top.open(); ForwardingMessageSender messageSender(*dummyManager); @@ -1036,7 +1027,7 @@ FileStorManagerTest::testHandlerTimeout() FastOS_Thread::Sleep(51); for (;;) { - auto lock = filestorHandler.getNextMessage(0, 255); + auto lock = filestorHandler.getNextMessage(0); if (lock.first.get()) { CPPUNIT_ASSERT_EQUAL(uint8_t(200), lock.second->getPriority()); break; @@ -1092,17 +1083,17 @@ FileStorManagerTest::testHandlerPriorityBlocking() } { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20); + FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL(15, (int)lock1.second->getPriority()); LOG(debug, "Waiting for request that should time out"); - FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 30); + FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0); LOG(debug, "Got request that should time out"); CPPUNIT_ASSERT(lock2.second.get() == NULL); } { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 40); + FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL(30, (int)lock1.second->getPriority()); // New high-pri message comes in @@ -1118,20 +1109,20 @@ FileStorManagerTest::testHandlerPriorityBlocking() cmd->setPriority(15); filestorHandler.schedule(cmd, 0); - FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 20); + FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL(15, (int)lock2.second->getPriority()); LOG(debug, "Waiting for request that should time out"); - FileStorHandler::LockedMessage lock3 = filestorHandler.getNextMessage(0, 255); + FileStorHandler::LockedMessage lock3 = filestorHandler.getNextMessage(0); LOG(debug, "Got request that should time out"); CPPUNIT_ASSERT(lock3.second.get() == NULL); } { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 255); + FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL(45, (int)lock1.second->getPriority()); - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255); + FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL(60, (int)lock.second->getPriority()); } LOG(debug, "Test done"); @@ -1150,7 +1141,7 @@ public: : _handler(handler), pause(false), done(false), gotoperation(false) {} void run() override { - FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255); + FileStorHandler::LockedMessage msg = _handler.getNextMessage(0); gotoperation = true; while (!done) { @@ -1229,7 +1220,7 @@ FileStorManagerTest::testHandlerPriorityPreempt() } { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20); + FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL(20, (int)lock1.second->getPriority()); thread.pause = true; @@ -1273,10 +1264,10 @@ FileStorManagerTest::testPriority() _node->getComponentRegister(), 255, 0); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 25)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); std::unique_ptr<DiskThread> thread2(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[1], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[1], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; @@ -1342,9 +1333,8 @@ FileStorManagerTest::testPriority() CPPUNIT_ASSERT_EQUAL(uint64_t(documents.size()), metrics.disks[0]->threads[0]->operations.getValue() + metrics.disks[0]->threads[1]->operations.getValue()); - CPPUNIT_ASSERT(metrics.disks[0]->threads[0]->operations.getValue() <= 13); - // Closing file stor handler before threads are deleted, such that - // file stor threads getNextMessage calls returns. + // Closing file stor handler before threads are deleted, such that + // file stor threads getNextMessage calls returns. filestorHandler.close(); } @@ -1367,7 +1357,7 @@ FileStorManagerTest::testSplit1() _node->getComponentRegister(), 255, 0); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -1544,7 +1534,7 @@ FileStorManagerTest::testSplitSingleGroup() std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -1677,7 +1667,7 @@ FileStorManagerTest::testSplitEmptyTargetWithRemappedOps() _node->getComponentRegister(), 255, 0); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); document::BucketId source(16, 0x10001); @@ -1755,7 +1745,7 @@ FileStorManagerTest::testNotifyOnSplitSourceOwnershipChanged() _node->getComponentRegister(), 255, 0); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); document::BucketId source(getFirstBucketNotOwnedByDistributor(0)); createBucket(source, 0); @@ -1805,7 +1795,7 @@ FileStorManagerTest::testJoin() _node->getComponentRegister(), 255, 0); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 77e0ed6b71c..8d18f75d3db 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -89,17 +89,15 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() filestorHandler.schedule(createPut(1234, 1), 0); filestorHandler.schedule(createPut(5432, 0), 0); - auto lock0 = filestorHandler.getNextMessage(0, 255); + auto lock0 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT(lock0.first.get()); - CPPUNIT_ASSERT_EQUAL( - document::BucketId(16, 1234), - dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), + dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); - auto lock1 = filestorHandler.getNextMessage(0, 255); + auto lock1 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT(lock1.first.get()); - CPPUNIT_ASSERT_EQUAL( - document::BucketId(16, 5432), - dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432), + dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); } } // namespace storage diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 9d3a3d5f008..e0769795bed 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -59,14 +59,8 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const for (uint32_t i = 0; i < numDisks; i++) { _diskEnvs.push_back( std::unique_ptr<PersistenceUtil>( - new PersistenceUtil( - _config.getConfigId(), - _node.getComponentRegister(), - *_handler, - *_metrics.disks[i]->threads[0], - i, - 255, - _node.getPersistenceProvider()))); + new PersistenceUtil(_config.getConfigId(), _node.getComponentRegister(), + *_handler, *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider()))); } } @@ -79,8 +73,7 @@ PersistenceTestUtils::~PersistenceTestUtils() } std::string -PersistenceTestUtils::dumpBucket(const document::BucketId& bid, - uint16_t disk) { +PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) { return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(makeSpiBucket(bid, spi::PartitionId(disk))); } @@ -92,14 +85,9 @@ PersistenceTestUtils::setupDisks(uint32_t numDisks) { std::unique_ptr<PersistenceThread> PersistenceTestUtils::createPersistenceThread(uint32_t disk) { - return std::unique_ptr<PersistenceThread>( - new PersistenceThread(_env->_node.getComponentRegister(), - _env->_config.getConfigId(), - getPersistenceProvider(), - getEnv()._fileStorHandler, - getEnv()._metrics, - disk, - 255)); + return std::make_unique<PersistenceThread>(_env->_node.getComponentRegister(), _env->_config.getConfigId(), + getPersistenceProvider(), getEnv()._fileStorHandler, + getEnv()._metrics, disk); } document::Document::SP diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index e23bfda192c..b53a2a361b3 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -63,17 +63,15 @@ FileStorHandler::pause(uint16_t disk, uint8_t priority) const { } FileStorHandler::LockedMessage -FileStorHandler::getNextMessage(uint16_t thread, uint8_t lowestPriority) +FileStorHandler::getNextMessage(uint16_t thread) { - return _impl->getNextMessage(thread, lowestPriority); + return _impl->getNextMessage(thread); } FileStorHandler::LockedMessage & -FileStorHandler::getNextMessage(uint16_t thread, - LockedMessage& lck, - uint8_t lowestPriority) +FileStorHandler::getNextMessage(uint16_t thread, LockedMessage& lck) { - return _impl->getNextMessage(thread, lck, lowestPriority); + return _impl->getNextMessage(thread, lck); } FileStorHandler::BucketLockInterface::SP @@ -89,30 +87,23 @@ FileStorHandler::remapQueueAfterDiskMove( { RemapInfo target(bucket, targetDisk); - _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, - FileStorHandlerImpl::MOVE); + _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, FileStorHandlerImpl::MOVE); } void -FileStorHandler::remapQueueAfterJoin( - const RemapInfo& source, - RemapInfo& target) +FileStorHandler::remapQueueAfterJoin(const RemapInfo& source,RemapInfo& target) { _impl->remapQueue(source, target, FileStorHandlerImpl::JOIN); } void -FileStorHandler::remapQueueAfterSplit( - const RemapInfo& source, - RemapInfo& target1, - RemapInfo& target2) +FileStorHandler::remapQueueAfterSplit(const RemapInfo& source,RemapInfo& target1, RemapInfo& target2) { _impl->remapQueue(source, target1, target2, FileStorHandlerImpl::SPLIT); } void -FileStorHandler::failOperations(const document::Bucket &bucket, - uint16_t fromDisk, const api::ReturnCode& err) +FileStorHandler::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err) { _impl->failOperations(bucket, fromDisk, err); } @@ -130,8 +121,7 @@ FileStorHandler::sendReply(const api::StorageReply::SP& msg) } void -FileStorHandler::getStatus(std::ostream& out, - const framework::HttpUrlPath& path) const +FileStorHandler::getStatus(std::ostream& out, const framework::HttpUrlPath& path) const { _impl->getStatus(out, path); } @@ -149,8 +139,7 @@ FileStorHandler::getQueueSize(uint16_t disk) const } void -FileStorHandler::addMergeStatus(const document::Bucket& bucket, - MergeStatus::SP ms) +FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms) { return _impl->addMergeStatus(bucket, ms); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 6e2d6a0fc07..8fc528b5ad7 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -131,16 +131,13 @@ public: * Used by file stor threads to get their next message to process. * * @param disk The disk to get messages for - * @param lowestPriority The lowest priority of operation we should return */ - LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority); + LockedMessage getNextMessage(uint16_t disk); /** * Returns the next message for the same bucket. */ - LockedMessage & getNextMessage(uint16_t disk, - LockedMessage& lock, - uint8_t lowestPriority); + LockedMessage & getNextMessage(uint16_t disk, LockedMessage& lock); /** * Lock a bucket. By default, each file stor thread has the locks of all diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index c8b5c71ee2e..0793d23b862 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -398,7 +398,7 @@ FileStorHandlerImpl::abortQueuedOperations( bool FileStorHandlerImpl::hasBlockingOperations(const Disk& t) const { - for (auto& lockedBucket : t.lockedBuckets) { + for (const auto & lockedBucket : t.lockedBuckets) { if (lockedBucket.second.priority <= _minPriorityToBeBlocking) { return true; } @@ -419,9 +419,7 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) } FileStorHandler::LockedMessage & -FileStorHandlerImpl::getNextMessage(uint16_t disk, - FileStorHandler::LockedMessage& lck, - uint8_t maxPriority) +FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lck) { document::Bucket bucket(lck.first->getBucket()); @@ -452,7 +450,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, mbus::Trace& trace = m.getTrace(); // Priority is too low, not buffering any more. - if (m.getPriority() > maxPriority || m.getPriority() >= _maxPriorityToBlock) { + if (m.getPriority() >= _maxPriorityToBlock) { lck.second.reset(); return lck; } @@ -564,23 +562,10 @@ namespace { bucketIsLockedOnDisk(const document::Bucket &id, const FileStorHandlerImpl::Disk &t) { return (id.getBucketId().getRawId() != 0 && t.isLocked(id)); } - - /** - * Return whether msg has sufficiently high priority that a thread with - * a configured priority threshold of maxPriority can even run in. - * Often, operations such as streaming searches will have dedicated threads - * that refuse lower priority operations such as Puts etc. - */ - bool - operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority) - { - // NOTE: priority integral value 0 is considered highest pri. - return (msg.getPriority() <= maxPriority); - } } FileStorHandler::LockedMessage -FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) +FileStorHandlerImpl::getNextMessage(uint16_t disk) { assert(disk < _diskInfo.size()); if (!tryHandlePause(disk)) { @@ -604,8 +589,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) if (iter != end) { api::StorageMessage &m(*iter->_command); - if (operationHasHighEnoughPriorityToBeRun(m, maxPriority) - && ! operationBlockedByHigherPriorityThread(m, t) + if (! operationBlockedByHigherPriorityThread(m, t) && ! isPaused()) { return getMessage(lockGuard, t, idx, iter); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 56ac9ea0577..0b1ab5661e2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -159,11 +159,10 @@ public: bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); void pause(uint16_t disk, uint8_t priority) const; - FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority); + FileStorHandler::LockedMessage getNextMessage(uint16_t disk); FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter); - FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock, - uint8_t lowestPriority); + FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock); enum Operation { MOVE, SPLIT, JOIN }; void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 6644d09da7f..20c08acc01c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -126,27 +126,12 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _config->maxPriorityToBlock, _config->minPriorityToBeBlocking)); for (uint32_t i=0; i<_component.getDiskCount(); ++i) { if (_partitions[i].isUp()) { - if (_config->threads.size() == 0) { - LOG(spam, "Setting up disk %u", i); - for (uint32_t j = 0; j < 4; j++) { - _disks[i].push_back(DiskThread::SP( - new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i, 255))); - - } - for (uint32_t j = 4; j < 6; j++) { - _disks[i].push_back(DiskThread::SP( - new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i, 100))); - } - } - - for (uint16_t j = 0; j < _config->threads.size(); j++) { - LOG(spam, "Setting up disk %u, thread %u with priority %d", - i, j, _config->threads[j].lowestpri); + size_t numThreads = (_config->threads.size() == 0) ? 6 : _config->threads.size(); + LOG(spam, "Setting up disk %u", i); + for (uint32_t j = 0; j < numThreads; j++) { _disks[i].push_back(DiskThread::SP( new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i, _config->threads[j].lowestpri))); + *_metrics->disks[i]->threads[j], i))); } } else { diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 6c4cd7b64d2..13d9d0ffbb6 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -21,9 +21,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, - uint16_t deviceIndex, - uint8_t lowestPriority) - : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, lowestPriority, provider), + uint16_t deviceIndex) + : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider), _warnOnSlowOperations(5000), _spi(provider), _processAllHandler(_env, provider), @@ -139,9 +138,7 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd) spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), _context); if (checkForError(response, *tracker)) { - api::RemoveReply* reply(new api::RemoveReply( - cmd, response.wasFound() ? cmd.getTimestamp() : 0)); - tracker->setReply(api::StorageReply::SP(reply)); + tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); } if (!response.wasFound()) { ++_env._metrics.remove[cmd.getLoadType()].notFound; @@ -165,9 +162,9 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd) spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), _context); if (checkForError(response, *tracker)) { - api::UpdateReply* reply = new api::UpdateReply(cmd); + auto reply = std::make_shared<api::UpdateReply>(cmd); reply->setOldTimestamp(response.getExistingTimestamp()); - tracker->setReply(api::StorageReply::SP(reply)); + tracker->setReply(std::move(reply)); } return tracker; } @@ -243,11 +240,9 @@ PersistenceThread::handleRevert(api::RevertCommand& cmd) _env._metrics.revert[cmd.getLoadType()], _env._component.getClock())); spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - const std::vector<api::Timestamp> tokens = cmd.getRevertTokens(); - for (uint32_t i = 0; i < tokens.size(); ++i) { - spi::Result result = _spi.removeEntry(b, - spi::Timestamp(tokens[i]), - _context); + const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens(); + for (const api::Timestamp & token : tokens) { + spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), _context); } return tracker; } @@ -430,9 +425,7 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd) cmd.getIncludedVersions(), _context)); if (checkForError(result, *tracker)) { - tracker->setReply(CreateIteratorReply::SP( - new CreateIteratorReply( - cmd, spi::IteratorId(result.getIteratorId())))); + tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId()))); } return tracker; } @@ -519,8 +512,9 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) // Ensure to take them in rising order. StorBucketDatabase::WrappedEntry sourceEntry(_env.getBucketDatabase(spiBucket.getBucket().getBucketSpace()).get( cmd.getBucketId(), "PersistenceThread::handleSplitBucket-source")); - api::SplitBucketReply* splitReply(new api::SplitBucketReply(cmd)); - tracker->setReply(api::StorageReply::SP(splitReply)); + auto reply = std::make_shared<api::SplitBucketReply>(cmd); + api::SplitBucketReply & splitReply = *reply; + tracker->setReply(std::move(reply)); typedef std::pair<StorBucketDatabase::WrappedEntry, FileStorHandler::RemapInfo> TargetInfo; @@ -581,7 +575,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) createTarget.toString().c_str()); _spi.createBucket(createTarget, _context); } - splitReply->getSplitInfo().push_back( + splitReply.getSplitInfo().push_back( api::SplitBucketReply::Entry( targets[i].second.bucket.getBucketId(), targets[i].first->getBucketInfo())); @@ -1139,10 +1133,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) trackers.push_back(std::move(tracker)); if (trackers.back()->getReply()->getResult().success()) { - _env._fileStorHandler.getNextMessage( - _env._partition, - lock, - _env._lowestPriority); + _env._fileStorHandler.getNextMessage(_env._partition, lock); } else { break; } @@ -1170,9 +1161,7 @@ PersistenceThread::run(framework::ThreadHandle& thread) { thread.registerTick(); - FileStorHandler::LockedMessage lock( - _env._fileStorHandler.getNextMessage( - _env._partition, _env._lowestPriority)); + FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition)); if (lock.first.get()) { processMessages(lock); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 07fadb875cc..640614f7edf 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -21,7 +21,7 @@ class PersistenceThread final : public DiskThread, public Types public: PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, - FileStorThreadMetrics& metrics, uint16_t deviceIndex, uint8_t lowestPriority); + FileStorThreadMetrics& metrics, uint16_t deviceIndex); ~PersistenceThread(); /** Waits for current operation to be finished. */ diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 8e96c05a66a..ec666925148 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -66,7 +66,6 @@ PersistenceUtil::PersistenceUtil( FileStorHandler& fileStorHandler, FileStorThreadMetrics& metrics, uint16_t partition, - uint8_t lowestPriority, spi::PersistenceProvider& provider) : _config(*config::ConfigGetter<vespa::config::content::StorFilestorConfig>::getConfig(configUri.getConfigId(), configUri.getContext())), _compReg(compReg), @@ -77,19 +76,15 @@ PersistenceUtil::PersistenceUtil( _metrics(metrics), _bucketFactory(_component.getBucketIdFactory()), _repo(_component.getTypeRepo()), - _lowestPriority(lowestPriority), _pauseHandler(), _spi(provider) { } -PersistenceUtil::~PersistenceUtil() -{ -} +PersistenceUtil::~PersistenceUtil() { } void -PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, - const api::BucketInfo& i) +PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i) { // Update bucket database StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get( diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 5126931bab6..c3bd706f4b7 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -70,7 +70,6 @@ struct PersistenceUtil { FileStorThreadMetrics& _metrics; const document::BucketIdFactory& _bucketFactory; const std::shared_ptr<document::DocumentTypeRepo> _repo; - uint8_t _lowestPriority; PauseHandler _pauseHandler; spi::PersistenceProvider& _spi; @@ -80,7 +79,6 @@ struct PersistenceUtil { FileStorHandler& fileStorHandler, FileStorThreadMetrics& metrics, uint16_t partition, - uint8_t lowestPriority, spi::PersistenceProvider& provider); ~PersistenceUtil(); |