summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@oath.com>2018-03-16 15:08:57 +0100
committerHenning Baldersheim <balder@oath.com>2018-03-19 13:01:16 +0100
commit2a788ec1b3900d7ba853b3f5f89e49df7e016c7e (patch)
tree4154c20716ce69e501f17232372dc8c671fe0db2
parent518d8a9a8bac25f96228dac74a2d74e65cc7e460 (diff)
Remove thread priority concept, has not added anything but complexity for a very long time.
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp92
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp14
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp24
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp31
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h7
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp26
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h5
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp23
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp41
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h2
12 files changed, 94 insertions, 182 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 8d03c9de54c..318881364d7 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -265,15 +265,14 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
- uint16_t deviceIndex,
- uint8_t lowestPriority)
+ uint16_t deviceIndex)
{
(void) config;
std::unique_ptr<DiskThread> disk;
disk.reset(new PersistenceThread(
node.getComponentRegister(), config.getConfigId(), provider,
filestorHandler, metrics,
- deviceIndex, lowestPriority));
+ deviceIndex));
return disk;
}
@@ -645,23 +644,18 @@ FileStorManagerTest::testHandlerPriority()
// Populate bucket with the given data
for (uint32_t i = 1; i < 6; i++) {
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100);
+ auto address = std::make_shared<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(i * 15);
filestorHandler.schedule(cmd, 0);
}
- CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 20).second->getPriority());
- CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 20).second.get() == NULL);
- CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 50).second->getPriority());
- CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0, 50).second->getPriority());
- CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 50).second.get() == NULL);
- CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
- CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0).second->getPriority());
}
class MessagePusherThread : public document::Runnable
@@ -678,11 +672,9 @@ public:
void run() override {
while (!_done) {
document::BucketIdFactory factory;
- document::BucketId bucket(16, factory.getBucketId(
- _doc->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), _doc, 100));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), _doc, 100);
_handler.schedule(cmd, 0);
FastOS_Thread::Sleep(1);
}
@@ -694,7 +686,7 @@ public:
MessagePusherThread::MessagePusherThread(FileStorHandler& handler, Document::SP doc)
: _handler(handler), _doc(doc), _done(false), _threadDone(false)
{}
-MessagePusherThread::~MessagePusherThread() {}
+MessagePusherThread::~MessagePusherThread() = default;
class MessageFetchingThread : public document::Runnable {
public:
@@ -711,7 +703,7 @@ public:
void run() override {
while (!_done) {
- FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage msg = _handler.getNextMessage(0);
if (msg.second.get()) {
uint32_t originalConfig = _config.load();
_fetchedCount++;
@@ -824,15 +816,15 @@ FileStorManagerTest::testHandlerPause()
filestorHandler.schedule(cmd, 0);
}
- CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority());
{
ResumeGuard guard = filestorHandler.pause();
(void)guard;
- CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 255).second.get() == NULL);
+ CPPUNIT_ASSERT(filestorHandler.getNextMessage(0).second.get() == NULL);
}
- CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority());
}
namespace {
@@ -957,21 +949,21 @@ FileStorManagerTest::testHandlerMulti()
}
{
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL((uint64_t)1, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock, 255);
+ lock = filestorHandler.getNextMessage(0, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)2, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock, 255);
+ lock = filestorHandler.getNextMessage(0, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)3, getPutTime(lock.second));
}
{
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL((uint64_t)11, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock, 255);
+ lock = filestorHandler.getNextMessage(0, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)12, getPutTime(lock.second));
}
}
@@ -984,8 +976,7 @@ FileStorManagerTest::testHandlerTimeout()
// Setup a filestorthread to test
DummyStorageLink top;
DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
+ top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
top.open();
ForwardingMessageSender messageSender(*dummyManager);
@@ -1036,7 +1027,7 @@ FileStorManagerTest::testHandlerTimeout()
FastOS_Thread::Sleep(51);
for (;;) {
- auto lock = filestorHandler.getNextMessage(0, 255);
+ auto lock = filestorHandler.getNextMessage(0);
if (lock.first.get()) {
CPPUNIT_ASSERT_EQUAL(uint8_t(200), lock.second->getPriority());
break;
@@ -1092,17 +1083,17 @@ FileStorManagerTest::testHandlerPriorityBlocking()
}
{
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20);
+ FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL(15, (int)lock1.second->getPriority());
LOG(debug, "Waiting for request that should time out");
- FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 30);
+ FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0);
LOG(debug, "Got request that should time out");
CPPUNIT_ASSERT(lock2.second.get() == NULL);
}
{
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 40);
+ FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL(30, (int)lock1.second->getPriority());
// New high-pri message comes in
@@ -1118,20 +1109,20 @@ FileStorManagerTest::testHandlerPriorityBlocking()
cmd->setPriority(15);
filestorHandler.schedule(cmd, 0);
- FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 20);
+ FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL(15, (int)lock2.second->getPriority());
LOG(debug, "Waiting for request that should time out");
- FileStorHandler::LockedMessage lock3 = filestorHandler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage lock3 = filestorHandler.getNextMessage(0);
LOG(debug, "Got request that should time out");
CPPUNIT_ASSERT(lock3.second.get() == NULL);
}
{
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL(45, (int)lock1.second->getPriority());
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL(60, (int)lock.second->getPriority());
}
LOG(debug, "Test done");
@@ -1150,7 +1141,7 @@ public:
: _handler(handler), pause(false), done(false), gotoperation(false) {}
void run() override {
- FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage msg = _handler.getNextMessage(0);
gotoperation = true;
while (!done) {
@@ -1229,7 +1220,7 @@ FileStorManagerTest::testHandlerPriorityPreempt()
}
{
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20);
+ FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL(20, (int)lock1.second->getPriority());
thread.pause = true;
@@ -1273,10 +1264,10 @@ FileStorManagerTest::testPriority()
_node->getComponentRegister(), 255, 0);
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 25));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
std::unique_ptr<DiskThread> thread2(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[1], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[1], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
@@ -1342,9 +1333,8 @@ FileStorManagerTest::testPriority()
CPPUNIT_ASSERT_EQUAL(uint64_t(documents.size()),
metrics.disks[0]->threads[0]->operations.getValue()
+ metrics.disks[0]->threads[1]->operations.getValue());
- CPPUNIT_ASSERT(metrics.disks[0]->threads[0]->operations.getValue() <= 13);
- // Closing file stor handler before threads are deleted, such that
- // file stor threads getNextMessage calls returns.
+ // Closing file stor handler before threads are deleted, such that
+ // file stor threads getNextMessage calls returns.
filestorHandler.close();
}
@@ -1367,7 +1357,7 @@ FileStorManagerTest::testSplit1()
_node->getComponentRegister(), 255, 0);
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -1544,7 +1534,7 @@ FileStorManagerTest::testSplitSingleGroup()
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -1677,7 +1667,7 @@ FileStorManagerTest::testSplitEmptyTargetWithRemappedOps()
_node->getComponentRegister(), 255, 0);
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
document::BucketId source(16, 0x10001);
@@ -1755,7 +1745,7 @@ FileStorManagerTest::testNotifyOnSplitSourceOwnershipChanged()
_node->getComponentRegister(), 255, 0);
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
document::BucketId source(getFirstBucketNotOwnedByDistributor(0));
createBucket(source, 0);
@@ -1805,7 +1795,7 @@ FileStorManagerTest::testJoin()
_node->getComponentRegister(), 255, 0);
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 77e0ed6b71c..8d18f75d3db 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -89,17 +89,15 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
filestorHandler.schedule(createPut(1234, 1), 0);
filestorHandler.schedule(createPut(5432, 0), 0);
- auto lock0 = filestorHandler.getNextMessage(0, 255);
+ auto lock0 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT(lock0.first.get());
- CPPUNIT_ASSERT_EQUAL(
- document::BucketId(16, 1234),
- dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234),
+ dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
- auto lock1 = filestorHandler.getNextMessage(0, 255);
+ auto lock1 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT(lock1.first.get());
- CPPUNIT_ASSERT_EQUAL(
- document::BucketId(16, 5432),
- dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432),
+ dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
}
} // namespace storage
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 9d3a3d5f008..e0769795bed 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -59,14 +59,8 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const
for (uint32_t i = 0; i < numDisks; i++) {
_diskEnvs.push_back(
std::unique_ptr<PersistenceUtil>(
- new PersistenceUtil(
- _config.getConfigId(),
- _node.getComponentRegister(),
- *_handler,
- *_metrics.disks[i]->threads[0],
- i,
- 255,
- _node.getPersistenceProvider())));
+ new PersistenceUtil(_config.getConfigId(), _node.getComponentRegister(),
+ *_handler, *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider())));
}
}
@@ -79,8 +73,7 @@ PersistenceTestUtils::~PersistenceTestUtils()
}
std::string
-PersistenceTestUtils::dumpBucket(const document::BucketId& bid,
- uint16_t disk) {
+PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) {
return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(makeSpiBucket(bid, spi::PartitionId(disk)));
}
@@ -92,14 +85,9 @@ PersistenceTestUtils::setupDisks(uint32_t numDisks) {
std::unique_ptr<PersistenceThread>
PersistenceTestUtils::createPersistenceThread(uint32_t disk)
{
- return std::unique_ptr<PersistenceThread>(
- new PersistenceThread(_env->_node.getComponentRegister(),
- _env->_config.getConfigId(),
- getPersistenceProvider(),
- getEnv()._fileStorHandler,
- getEnv()._metrics,
- disk,
- 255));
+ return std::make_unique<PersistenceThread>(_env->_node.getComponentRegister(), _env->_config.getConfigId(),
+ getPersistenceProvider(), getEnv()._fileStorHandler,
+ getEnv()._metrics, disk);
}
document::Document::SP
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index e23bfda192c..b53a2a361b3 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -63,17 +63,15 @@ FileStorHandler::pause(uint16_t disk, uint8_t priority) const {
}
FileStorHandler::LockedMessage
-FileStorHandler::getNextMessage(uint16_t thread, uint8_t lowestPriority)
+FileStorHandler::getNextMessage(uint16_t thread)
{
- return _impl->getNextMessage(thread, lowestPriority);
+ return _impl->getNextMessage(thread);
}
FileStorHandler::LockedMessage &
-FileStorHandler::getNextMessage(uint16_t thread,
- LockedMessage& lck,
- uint8_t lowestPriority)
+FileStorHandler::getNextMessage(uint16_t thread, LockedMessage& lck)
{
- return _impl->getNextMessage(thread, lck, lowestPriority);
+ return _impl->getNextMessage(thread, lck);
}
FileStorHandler::BucketLockInterface::SP
@@ -89,30 +87,23 @@ FileStorHandler::remapQueueAfterDiskMove(
{
RemapInfo target(bucket, targetDisk);
- _impl->remapQueue(RemapInfo(bucket, sourceDisk), target,
- FileStorHandlerImpl::MOVE);
+ _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, FileStorHandlerImpl::MOVE);
}
void
-FileStorHandler::remapQueueAfterJoin(
- const RemapInfo& source,
- RemapInfo& target)
+FileStorHandler::remapQueueAfterJoin(const RemapInfo& source,RemapInfo& target)
{
_impl->remapQueue(source, target, FileStorHandlerImpl::JOIN);
}
void
-FileStorHandler::remapQueueAfterSplit(
- const RemapInfo& source,
- RemapInfo& target1,
- RemapInfo& target2)
+FileStorHandler::remapQueueAfterSplit(const RemapInfo& source,RemapInfo& target1, RemapInfo& target2)
{
_impl->remapQueue(source, target1, target2, FileStorHandlerImpl::SPLIT);
}
void
-FileStorHandler::failOperations(const document::Bucket &bucket,
- uint16_t fromDisk, const api::ReturnCode& err)
+FileStorHandler::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err)
{
_impl->failOperations(bucket, fromDisk, err);
}
@@ -130,8 +121,7 @@ FileStorHandler::sendReply(const api::StorageReply::SP& msg)
}
void
-FileStorHandler::getStatus(std::ostream& out,
- const framework::HttpUrlPath& path) const
+FileStorHandler::getStatus(std::ostream& out, const framework::HttpUrlPath& path) const
{
_impl->getStatus(out, path);
}
@@ -149,8 +139,7 @@ FileStorHandler::getQueueSize(uint16_t disk) const
}
void
-FileStorHandler::addMergeStatus(const document::Bucket& bucket,
- MergeStatus::SP ms)
+FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms)
{
return _impl->addMergeStatus(bucket, ms);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 6e2d6a0fc07..8fc528b5ad7 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -131,16 +131,13 @@ public:
* Used by file stor threads to get their next message to process.
*
* @param disk The disk to get messages for
- * @param lowestPriority The lowest priority of operation we should return
*/
- LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority);
+ LockedMessage getNextMessage(uint16_t disk);
/**
* Returns the next message for the same bucket.
*/
- LockedMessage & getNextMessage(uint16_t disk,
- LockedMessage& lock,
- uint8_t lowestPriority);
+ LockedMessage & getNextMessage(uint16_t disk, LockedMessage& lock);
/**
* Lock a bucket. By default, each file stor thread has the locks of all
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index c8b5c71ee2e..0793d23b862 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -398,7 +398,7 @@ FileStorHandlerImpl::abortQueuedOperations(
bool
FileStorHandlerImpl::hasBlockingOperations(const Disk& t) const
{
- for (auto& lockedBucket : t.lockedBuckets) {
+ for (const auto & lockedBucket : t.lockedBuckets) {
if (lockedBucket.second.priority <= _minPriorityToBeBlocking) {
return true;
}
@@ -419,9 +419,7 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
}
FileStorHandler::LockedMessage &
-FileStorHandlerImpl::getNextMessage(uint16_t disk,
- FileStorHandler::LockedMessage& lck,
- uint8_t maxPriority)
+FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lck)
{
document::Bucket bucket(lck.first->getBucket());
@@ -452,7 +450,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
mbus::Trace& trace = m.getTrace();
// Priority is too low, not buffering any more.
- if (m.getPriority() > maxPriority || m.getPriority() >= _maxPriorityToBlock) {
+ if (m.getPriority() >= _maxPriorityToBlock) {
lck.second.reset();
return lck;
}
@@ -564,23 +562,10 @@ namespace {
bucketIsLockedOnDisk(const document::Bucket &id, const FileStorHandlerImpl::Disk &t) {
return (id.getBucketId().getRawId() != 0 && t.isLocked(id));
}
-
- /**
- * Return whether msg has sufficiently high priority that a thread with
- * a configured priority threshold of maxPriority can even run in.
- * Often, operations such as streaming searches will have dedicated threads
- * that refuse lower priority operations such as Puts etc.
- */
- bool
- operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority)
- {
- // NOTE: priority integral value 0 is considered highest pri.
- return (msg.getPriority() <= maxPriority);
- }
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
+FileStorHandlerImpl::getNextMessage(uint16_t disk)
{
assert(disk < _diskInfo.size());
if (!tryHandlePause(disk)) {
@@ -604,8 +589,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
if (iter != end) {
api::StorageMessage &m(*iter->_command);
- if (operationHasHighEnoughPriorityToBeRun(m, maxPriority)
- && ! operationBlockedByHigherPriorityThread(m, t)
+ if (! operationBlockedByHigherPriorityThread(m, t)
&& ! isPaused())
{
return getMessage(lockGuard, t, idx, iter);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 56ac9ea0577..0b1ab5661e2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -159,11 +159,10 @@ public:
bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
void pause(uint16_t disk, uint8_t priority) const;
- FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority);
+ FileStorHandler::LockedMessage getNextMessage(uint16_t disk);
FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter);
- FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock,
- uint8_t lowestPriority);
+ FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock);
enum Operation { MOVE, SPLIT, JOIN };
void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 6644d09da7f..20c08acc01c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -126,27 +126,12 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
_config->maxPriorityToBlock, _config->minPriorityToBeBlocking));
for (uint32_t i=0; i<_component.getDiskCount(); ++i) {
if (_partitions[i].isUp()) {
- if (_config->threads.size() == 0) {
- LOG(spam, "Setting up disk %u", i);
- for (uint32_t j = 0; j < 4; j++) {
- _disks[i].push_back(DiskThread::SP(
- new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i, 255)));
-
- }
- for (uint32_t j = 4; j < 6; j++) {
- _disks[i].push_back(DiskThread::SP(
- new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i, 100)));
- }
- }
-
- for (uint16_t j = 0; j < _config->threads.size(); j++) {
- LOG(spam, "Setting up disk %u, thread %u with priority %d",
- i, j, _config->threads[j].lowestpri);
+ size_t numThreads = (_config->threads.size() == 0) ? 6 : _config->threads.size();
+ LOG(spam, "Setting up disk %u", i);
+ for (uint32_t j = 0; j < numThreads; j++) {
_disks[i].push_back(DiskThread::SP(
new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i, _config->threads[j].lowestpri)));
+ *_metrics->disks[i]->threads[j], i)));
}
} else {
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 6c4cd7b64d2..13d9d0ffbb6 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -21,9 +21,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
- uint16_t deviceIndex,
- uint8_t lowestPriority)
- : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, lowestPriority, provider),
+ uint16_t deviceIndex)
+ : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider),
_warnOnSlowOperations(5000),
_spi(provider),
_processAllHandler(_env, provider),
@@ -139,9 +138,7 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd)
spi::Timestamp(cmd.getTimestamp()),
cmd.getDocumentId(), _context);
if (checkForError(response, *tracker)) {
- api::RemoveReply* reply(new api::RemoveReply(
- cmd, response.wasFound() ? cmd.getTimestamp() : 0));
- tracker->setReply(api::StorageReply::SP(reply));
+ tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
}
if (!response.wasFound()) {
++_env._metrics.remove[cmd.getLoadType()].notFound;
@@ -165,9 +162,9 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd)
spi::Timestamp(cmd.getTimestamp()),
cmd.getUpdate(), _context);
if (checkForError(response, *tracker)) {
- api::UpdateReply* reply = new api::UpdateReply(cmd);
+ auto reply = std::make_shared<api::UpdateReply>(cmd);
reply->setOldTimestamp(response.getExistingTimestamp());
- tracker->setReply(api::StorageReply::SP(reply));
+ tracker->setReply(std::move(reply));
}
return tracker;
}
@@ -243,11 +240,9 @@ PersistenceThread::handleRevert(api::RevertCommand& cmd)
_env._metrics.revert[cmd.getLoadType()],
_env._component.getClock()));
spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- const std::vector<api::Timestamp> tokens = cmd.getRevertTokens();
- for (uint32_t i = 0; i < tokens.size(); ++i) {
- spi::Result result = _spi.removeEntry(b,
- spi::Timestamp(tokens[i]),
- _context);
+ const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens();
+ for (const api::Timestamp & token : tokens) {
+ spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), _context);
}
return tracker;
}
@@ -430,9 +425,7 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd)
cmd.getIncludedVersions(),
_context));
if (checkForError(result, *tracker)) {
- tracker->setReply(CreateIteratorReply::SP(
- new CreateIteratorReply(
- cmd, spi::IteratorId(result.getIteratorId()))));
+ tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId())));
}
return tracker;
}
@@ -519,8 +512,9 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
// Ensure to take them in rising order.
StorBucketDatabase::WrappedEntry sourceEntry(_env.getBucketDatabase(spiBucket.getBucket().getBucketSpace()).get(
cmd.getBucketId(), "PersistenceThread::handleSplitBucket-source"));
- api::SplitBucketReply* splitReply(new api::SplitBucketReply(cmd));
- tracker->setReply(api::StorageReply::SP(splitReply));
+ auto reply = std::make_shared<api::SplitBucketReply>(cmd);
+ api::SplitBucketReply & splitReply = *reply;
+ tracker->setReply(std::move(reply));
typedef std::pair<StorBucketDatabase::WrappedEntry,
FileStorHandler::RemapInfo> TargetInfo;
@@ -581,7 +575,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
createTarget.toString().c_str());
_spi.createBucket(createTarget, _context);
}
- splitReply->getSplitInfo().push_back(
+ splitReply.getSplitInfo().push_back(
api::SplitBucketReply::Entry(
targets[i].second.bucket.getBucketId(),
targets[i].first->getBucketInfo()));
@@ -1139,10 +1133,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
trackers.push_back(std::move(tracker));
if (trackers.back()->getReply()->getResult().success()) {
- _env._fileStorHandler.getNextMessage(
- _env._partition,
- lock,
- _env._lowestPriority);
+ _env._fileStorHandler.getNextMessage(_env._partition, lock);
} else {
break;
}
@@ -1170,9 +1161,7 @@ PersistenceThread::run(framework::ThreadHandle& thread)
{
thread.registerTick();
- FileStorHandler::LockedMessage lock(
- _env._fileStorHandler.getNextMessage(
- _env._partition, _env._lowestPriority));
+ FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition));
if (lock.first.get()) {
processMessages(lock);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 07fadb875cc..640614f7edf 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -21,7 +21,7 @@ class PersistenceThread final : public DiskThread, public Types
public:
PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri,
spi::PersistenceProvider& provider, FileStorHandler& filestorHandler,
- FileStorThreadMetrics& metrics, uint16_t deviceIndex, uint8_t lowestPriority);
+ FileStorThreadMetrics& metrics, uint16_t deviceIndex);
~PersistenceThread();
/** Waits for current operation to be finished. */
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 8e96c05a66a..ec666925148 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -66,7 +66,6 @@ PersistenceUtil::PersistenceUtil(
FileStorHandler& fileStorHandler,
FileStorThreadMetrics& metrics,
uint16_t partition,
- uint8_t lowestPriority,
spi::PersistenceProvider& provider)
: _config(*config::ConfigGetter<vespa::config::content::StorFilestorConfig>::getConfig(configUri.getConfigId(), configUri.getContext())),
_compReg(compReg),
@@ -77,19 +76,15 @@ PersistenceUtil::PersistenceUtil(
_metrics(metrics),
_bucketFactory(_component.getBucketIdFactory()),
_repo(_component.getTypeRepo()),
- _lowestPriority(lowestPriority),
_pauseHandler(),
_spi(provider)
{
}
-PersistenceUtil::~PersistenceUtil()
-{
-}
+PersistenceUtil::~PersistenceUtil() { }
void
-PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket,
- const api::BucketInfo& i)
+PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i)
{
// Update bucket database
StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 5126931bab6..c3bd706f4b7 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -70,7 +70,6 @@ struct PersistenceUtil {
FileStorThreadMetrics& _metrics;
const document::BucketIdFactory& _bucketFactory;
const std::shared_ptr<document::DocumentTypeRepo> _repo;
- uint8_t _lowestPriority;
PauseHandler _pauseHandler;
spi::PersistenceProvider& _spi;
@@ -80,7 +79,6 @@ struct PersistenceUtil {
FileStorHandler& fileStorHandler,
FileStorThreadMetrics& metrics,
uint16_t partition,
- uint8_t lowestPriority,
spi::PersistenceProvider& provider);
~PersistenceUtil();