diff options
author | Tor Brede Vekterli <vekterli@oath.com> | 2018-03-20 13:22:39 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-20 13:22:39 +0100 |
commit | 76cd17602b493a13d1e993b1f72cfdaf7b627e0b (patch) | |
tree | 2d3bd1ee963de6107b80b183a23d03708764fa1a /storage | |
parent | e31b9616921f9046a1fabbe9fca9cf3a5d535279 (diff) | |
parent | b9c175537bc743a0bf457d579d03b59867d1a1f1 (diff) |
Merge pull request #5378 from vespa-engine/balder/no-more-thread-priorities
Balder/no more thread priorities
Diffstat (limited to 'storage')
17 files changed, 352 insertions, 1120 deletions
diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp index 296a9a3cc0f..f7da854be68 100644 --- a/storage/src/tests/common/testhelper.cpp +++ b/storage/src/tests/common/testhelper.cpp @@ -9,23 +9,6 @@ LOG_SETUP(".testhelper"); namespace storage { -namespace { - bool useNewStorageCore() { - if ( // Unit test directory - vespalib::fileExists("use_new_storage_core") || - // src/cpp directory - vespalib::fileExists("../use_new_storage_core") || - // Top build directory where storage-HEAD remains - vespalib::fileExists("../../../../use_new_storage_core")) - { - std::cerr << "Using new storage core for unit tests\n"; - return true; - } - return false; - } - bool newStorageCore(useNewStorageCore()); -} - void addStorageDistributionConfig(vdstestlib::DirConfig& dc) { vdstestlib::DirConfig::Config* config; @@ -124,18 +107,14 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & ro config->set("minimum_file_meta_slots", "2"); config->set("minimum_file_header_block_size", "368"); config->set("minimum_file_size", "4096"); - config->set("threads[1]"); - config->set("threads[0].lowestpri 255"); + config->set("num_threads", "1"); config->set("dir_spread", "4"); config->set("dir_levels", "0"); - config->set("use_new_core", newStorageCore ? "true" : "false"); config->set("maximum_versions_of_single_document_stored", "0"); //config->set("enable_slotfile_cache", "false"); // Unit tests typically use fake low time values, so don't complain // about them or compact/delete them by default. Override in tests testing that // behavior - config->set("time_future_limit", "5"); - config->set("time_past_limit", "2000000000"); config->set("keep_remove_time_period", "2000000000"); config->set("revert_time_period", "2000000000"); // Don't want test to call exit() diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 8d03c9de54c..26bfeb41b42 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -7,17 +7,10 @@ #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/storageserver/statemanager.h> #include <vespa/storage/bucketdb/bucketmanager.h> -#include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/storage/persistence/persistencethread.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> #include <vespa/document/update/assignvalueupdate.h> -#include <vespa/document/datatype/datatype.h> -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/datatype/documenttype.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/document/fieldvalue/rawfieldvalue.h> -#include <vespa/document/fieldvalue/stringfieldvalue.h> #include <vespa/document/select/parser.h> #include <vespa/vdslib/state/random.h> #include <vespa/storageapi/message/bucketsplitting.h> @@ -76,8 +69,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture { void testFlush(); void testRemapSplit(); void testHandlerPriority(); - void testHandlerPriorityBlocking(); - void testHandlerPriorityPreempt(); void testHandlerMulti(); void testHandlerTimeout(); void testHandlerPause(); @@ -102,7 +93,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture { void testMergeBucketImplicitCreateBucket(); void testNewlyCreatedBucketIsReady(); void testCreateBucketSetsActiveFlagInDatabaseAndReply(); - void testFileStorThreadLockingStressTest(); void testStateChange(); void testRepairNotifiesDistributorOnChange(); void testDiskMove(); @@ -113,8 +103,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture { CPPUNIT_TEST(testFlush); CPPUNIT_TEST(testRemapSplit); CPPUNIT_TEST(testHandlerPriority); - CPPUNIT_TEST(testHandlerPriorityBlocking); - CPPUNIT_TEST(testHandlerPriorityPreempt); CPPUNIT_TEST(testHandlerMulti); CPPUNIT_TEST(testHandlerTimeout); CPPUNIT_TEST(testHandlerPause); @@ -146,21 +134,17 @@ struct FileStorManagerTest : public CppUnit::TestFixture { void createBucket(document::BucketId bid, uint16_t disk) { - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); - _node->getPersistenceProvider().createBucket( - makeSpiBucket(bid, spi::PartitionId(disk)), context); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bid, spi::PartitionId(disk)), context); StorBucketDatabase::WrappedEntry entry( - _node->getStorageBucketDatabase().get(bid, "foo", - StorBucketDatabase::CREATE_IF_NONEXISTING)); + _node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); entry->disk = disk; entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false); entry.write(); } - document::Document::UP createDocument( - const std::string& content, const std::string& id) + document::Document::UP createDocument(const std::string& content, const std::string& id) { return _node->getTestDocMan().createDocument(content, id); } @@ -265,15 +249,14 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, - uint16_t deviceIndex, - uint8_t lowestPriority) + uint16_t deviceIndex) { (void) config; std::unique_ptr<DiskThread> disk; disk.reset(new PersistenceThread( node.getComponentRegister(), config.getConfigId(), provider, filestorHandler, metrics, - deviceIndex, lowestPriority)); + deviceIndex)); return disk; } @@ -629,39 +612,31 @@ FileStorManagerTest::testHandlerPriority() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); std::ostringstream uri; - Document::SP doc(createDocument( - content, "userdoc:footype:1234:bar").release()); + Document::SP doc(createDocument(content, "userdoc:footype:1234:bar").release()); document::BucketIdFactory factory; - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(doc->getId()).getRawId()); // Populate bucket with the given data for (uint32_t i = 1; i < 6; i++) { - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100); + auto address = std::make_shared<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(i * 15); filestorHandler.schedule(cmd, 0); } - CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 20).second->getPriority()); - CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 20).second.get() == NULL); - CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 50).second->getPriority()); - CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0, 50).second->getPriority()); - CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 50).second.get() == NULL); - CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); - CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0).second->getPriority()); } class MessagePusherThread : public document::Runnable @@ -678,11 +653,9 @@ public: void run() override { while (!_done) { document::BucketIdFactory factory; - document::BucketId bucket(16, factory.getBucketId( - _doc->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), _doc, 100)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), _doc, 100); _handler.schedule(cmd, 0); FastOS_Thread::Sleep(1); } @@ -694,7 +667,7 @@ public: MessagePusherThread::MessagePusherThread(FileStorHandler& handler, Document::SP doc) : _handler(handler), _doc(doc), _done(false), _threadDone(false) {} -MessagePusherThread::~MessagePusherThread() {} +MessagePusherThread::~MessagePusherThread() = default; class MessageFetchingThread : public document::Runnable { public: @@ -711,7 +684,7 @@ public: void run() override { while (!_done) { - FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255); + FileStorHandler::LockedMessage msg = _handler.getNextMessage(0); if (msg.second.get()) { uint32_t originalConfig = _config.load(); _fetchedCount++; @@ -747,8 +720,7 @@ FileStorManagerTest::testHandlerPausedMultiThread() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -799,8 +771,7 @@ FileStorManagerTest::testHandlerPause() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -810,29 +781,26 @@ FileStorManagerTest::testHandlerPause() document::BucketIdFactory factory; document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); + doc->getId()).getRawId()); // Populate bucket with the given data for (uint32_t i = 1; i < 6; i++) { - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100); + auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(i * 15); filestorHandler.schedule(cmd, 0); } - CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority()); { ResumeGuard guard = filestorHandler.pause(); (void)guard; - CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 255).second.get() == NULL); + CPPUNIT_ASSERT(filestorHandler.getNextMessage(0).second.get() == NULL); } - CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority()); } namespace { @@ -855,8 +823,7 @@ FileStorManagerTest::testRemapSplit() // Setup a filestorthread to test DummyStorageLink top; DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); + top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); top.open(); ForwardingMessageSender messageSender(*dummyManager); // Since we fake time with small numbers, we need to make sure we dont @@ -867,7 +834,7 @@ FileStorManagerTest::testRemapSplit() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -882,10 +849,8 @@ FileStorManagerTest::testRemapSplit() // Populate bucket with the given data for (uint32_t i = 1; i < 4; i++) { - filestorHandler.schedule( - api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket1), doc1, i)), 0); - filestorHandler.schedule( - api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket2), doc2, i + 10)), 0); + filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket1), doc1, i), 0); + filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket2), doc2, i + 10), 0); } CPPUNIT_ASSERT_EQUAL(std::string("BucketId(0x40000000000004d2): Put(BucketId(0x40000000000004d2), userdoc:footype:1234:bar, timestamp 1, size 108) (priority: 127)\n" @@ -933,7 +898,7 @@ FileStorManagerTest::testHandlerMulti() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -943,10 +908,8 @@ FileStorManagerTest::testHandlerMulti() Document::SP doc2(createDocument(content, "userdoc:footype:4567:bar").release()); document::BucketIdFactory factory; - document::BucketId bucket1(16, factory.getBucketId( - doc1->getId()).getRawId()); - document::BucketId bucket2(16, factory.getBucketId( - doc2->getId()).getRawId()); + document::BucketId bucket1(16, factory.getBucketId(doc1->getId()).getRawId()); + document::BucketId bucket2(16, factory.getBucketId(doc2->getId()).getRawId()); // Populate bucket with the given data for (uint32_t i = 1; i < 10; i++) { @@ -957,21 +920,21 @@ FileStorManagerTest::testHandlerMulti() } { - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255); + FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL((uint64_t)1, getPutTime(lock.second)); - lock = filestorHandler.getNextMessage(0, lock, 255); + lock = filestorHandler.getNextMessage(0, lock); CPPUNIT_ASSERT_EQUAL((uint64_t)2, getPutTime(lock.second)); - lock = filestorHandler.getNextMessage(0, lock, 255); + lock = filestorHandler.getNextMessage(0, lock); CPPUNIT_ASSERT_EQUAL((uint64_t)3, getPutTime(lock.second)); } { - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255); + FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL((uint64_t)11, getPutTime(lock.second)); - lock = filestorHandler.getNextMessage(0, lock, 255); + lock = filestorHandler.getNextMessage(0, lock); CPPUNIT_ASSERT_EQUAL((uint64_t)12, getPutTime(lock.second)); } } @@ -984,8 +947,7 @@ FileStorManagerTest::testHandlerTimeout() // Setup a filestorthread to test DummyStorageLink top; DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); + top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); top.open(); ForwardingMessageSender messageSender(*dummyManager); @@ -997,7 +959,7 @@ FileStorManagerTest::testHandlerTimeout() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -1036,7 +998,7 @@ FileStorManagerTest::testHandlerTimeout() FastOS_Thread::Sleep(51); for (;;) { - auto lock = filestorHandler.getNextMessage(0, 255); + auto lock = filestorHandler.getNextMessage(0); if (lock.first.get()) { CPPUNIT_ASSERT_EQUAL(uint8_t(200), lock.second->getPriority()); break; @@ -1050,208 +1012,6 @@ FileStorManagerTest::testHandlerTimeout() } void -FileStorManagerTest::testHandlerPriorityBlocking() -{ - TestName testName("testHandlerPriorityBlocking"); - // Setup a filestorthread to test - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - // Since we fake time with small numbers, we need to make sure we dont - // compact them away, as they will seem to be from 1970 - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 21, 21); - filestorHandler.setGetNextMessageTimeout(50); - - std::string content("Here is some content which is in all documents"); - std::ostringstream uri; - - document::BucketIdFactory factory; - - // Populate bucket with the given data - for (uint32_t i = 1; i < 6; i++) { - Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d",i)).release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(i * 15); - filestorHandler.schedule(cmd, 0); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20); - CPPUNIT_ASSERT_EQUAL(15, (int)lock1.second->getPriority()); - - LOG(debug, "Waiting for request that should time out"); - FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 30); - LOG(debug, "Got request that should time out"); - CPPUNIT_ASSERT(lock2.second.get() == NULL); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 40); - CPPUNIT_ASSERT_EQUAL(30, (int)lock1.second->getPriority()); - - // New high-pri message comes in - Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d", 100)).release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(15); - filestorHandler.schedule(cmd, 0); - - FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 20); - CPPUNIT_ASSERT_EQUAL(15, (int)lock2.second->getPriority()); - - LOG(debug, "Waiting for request that should time out"); - FileStorHandler::LockedMessage lock3 = filestorHandler.getNextMessage(0, 255); - LOG(debug, "Got request that should time out"); - CPPUNIT_ASSERT(lock3.second.get() == NULL); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 255); - CPPUNIT_ASSERT_EQUAL(45, (int)lock1.second->getPriority()); - - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255); - CPPUNIT_ASSERT_EQUAL(60, (int)lock.second->getPriority()); - } - LOG(debug, "Test done"); -} - -class PausedThread : public document::Runnable { -private: - FileStorHandler& _handler; - -public: - bool pause; - bool done; - bool gotoperation; - - PausedThread(FileStorHandler& handler) - : _handler(handler), pause(false), done(false), gotoperation(false) {} - - void run() override { - FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255); - gotoperation = true; - - while (!done) { - if (pause) { - _handler.pause(0, msg.second->getPriority()); - pause = false; - } - FastOS_Thread::Sleep(10); - } - - done = false; - }; -}; - -void -FileStorManagerTest::testHandlerPriorityPreempt() -{ - TestName testName("testHandlerPriorityPreempt"); - // Setup a filestorthread to test - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - // Since we fake time with small numbers, we need to make sure we dont - // compact them away, as they will seem to be from 1970 - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 21, 21); - filestorHandler.setGetNextMessageTimeout(50); - - std::string content("Here is some content which is in all documents"); - std::ostringstream uri; - - document::BucketIdFactory factory; - - { - Document::SP doc(createDocument(content, "doc:foo:1").release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(60); - filestorHandler.schedule(cmd, 0); - } - - PausedThread thread(filestorHandler); - FastOS_ThreadPool pool(512 * 1024); - thread.start(pool); - - while (!thread.gotoperation) { - FastOS_Thread::Sleep(10); - } - - { - Document::SP doc(createDocument(content, "doc:foo:2").release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(20); - filestorHandler.schedule(cmd, 0); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20); - CPPUNIT_ASSERT_EQUAL(20, (int)lock1.second->getPriority()); - - thread.pause = true; - - for (uint32_t i = 0; i < 10; i++) { - CPPUNIT_ASSERT(thread.pause); - FastOS_Thread::Sleep(100); - } - } - - while (thread.pause) { - FastOS_Thread::Sleep(10); - } - - thread.done = true; - - while (thread.done) { - FastOS_Thread::Sleep(10); - } -} - -void FileStorManagerTest::testPriority() { TestName testName("testPriority"); @@ -1269,14 +1029,13 @@ FileStorManagerTest::testPriority() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 2); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 25)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); std::unique_ptr<DiskThread> thread2(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[1], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[1], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; @@ -1284,8 +1043,7 @@ FileStorManagerTest::testPriority() std::string content("Here is some content which is in all documents"); std::ostringstream uri; - uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) - << ":mydoc-" << i; + uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)<< ":mydoc-" << i; Document::SP doc(createDocument(content, uri.str()).release()); documents.push_back(doc); } @@ -1294,20 +1052,16 @@ FileStorManagerTest::testPriority() // Create buckets in separate, initial pass to avoid races with puts for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(16, factory.getBucketId( - documents[i]->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId()); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); - _node->getPersistenceProvider().createBucket( - makeSpiBucket(bucket), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context); } // Populate bucket with the given data for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(16, factory.getBucketId( - documents[i]->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId()); std::shared_ptr<api::PutCommand> cmd( new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i)); @@ -1342,9 +1096,8 @@ FileStorManagerTest::testPriority() CPPUNIT_ASSERT_EQUAL(uint64_t(documents.size()), metrics.disks[0]->threads[0]->operations.getValue() + metrics.disks[0]->threads[1]->operations.getValue()); - CPPUNIT_ASSERT(metrics.disks[0]->threads[0]->operations.getValue() <= 13); - // Closing file stor handler before threads are deleted, such that - // file stor threads getNextMessage calls returns. + // Closing file stor handler before threads are deleted, such that + // file stor threads getNextMessage calls returns. filestorHandler.close(); } @@ -1363,11 +1116,10 @@ FileStorManagerTest::testSplit1() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -1532,10 +1284,8 @@ FileStorManagerTest::testSplitSingleGroup() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); for (uint32_t j=0; j<1; ++j) { // Test this twice, once where all the data ends up in file with // splitbit set, and once where all the data ends up in file with @@ -1544,7 +1294,7 @@ FileStorManagerTest::testSplitSingleGroup() std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -1673,11 +1423,10 @@ FileStorManagerTest::testSplitEmptyTargetWithRemappedOps() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); document::BucketId source(16, 0x10001); @@ -1751,11 +1500,10 @@ FileStorManagerTest::testNotifyOnSplitSourceOwnershipChanged() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); document::BucketId source(getFirstBucketNotOwnedByDistributor(0)); createBucket(source, 0); @@ -1801,21 +1549,18 @@ FileStorManagerTest::testJoin() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { std::string content("Here is some content which is in all documents"); std::ostringstream uri; - uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) - << ":mydoc-" << i; - Document::SP doc(createDocument( - content, uri.str()).release()); + uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) << ":mydoc-" << i; + Document::SP doc(createDocument(content, uri.str()).release()); documents.push_back(doc); } document::BucketIdFactory factory; @@ -1826,36 +1571,26 @@ FileStorManagerTest::testJoin() { // Populate bucket with the given data for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(17, factory.getBucketId( - documents[i]->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); + document::BucketId bucket(17, factory.getBucketId(documents[i]->getId()).getRawId()); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i); + auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); filestorHandler.schedule(cmd, 0); filestorHandler.flush(true); CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies()); - std::shared_ptr<api::PutReply> reply( - std::dynamic_pointer_cast<api::PutReply>( - top.getReply(0))); + auto reply = std::dynamic_pointer_cast<api::PutReply>(top.getReply(0)); CPPUNIT_ASSERT(reply.get()); - CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), - reply->getResult()); + CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), reply->getResult()); top.reset(); // Delete every 5th document to have delete entries in file too if (i % 5 == 0) { - std::shared_ptr<api::RemoveCommand> rcmd( - new api::RemoveCommand( - makeDocumentBucket(bucket), documents[i]->getId(), 1000000 + 100 + i)); + auto rcmd = std::make_shared<api::RemoveCommand>(makeDocumentBucket(bucket), + documents[i]->getId(), 1000000 + 100 + i); rcmd->setAddress(*address); filestorHandler.schedule(rcmd, 0); filestorHandler.flush(true); CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies()); - std::shared_ptr<api::RemoveReply> rreply( - std::dynamic_pointer_cast<api::RemoveReply>( - top.getReply(0))); + auto rreply = std::dynamic_pointer_cast<api::RemoveReply>(top.getReply(0)); CPPUNIT_ASSERT_MSG(top.getReply(0)->getType().toString(), rreply.get()); CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 95642ac6215..64ef48b5719 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -43,10 +43,8 @@ public: _deleteBucketInvocations(0) {} - spi::Result put(const spi::Bucket& bucket, - spi::Timestamp timestamp, - const document::Document::SP& doc, - spi::Context& context) override + spi::Result put(const spi::Bucket& bucket, spi::Timestamp timestamp, + const document::Document::SP& doc, spi::Context& context) override { (void) bucket; (void) timestamp; @@ -90,26 +88,20 @@ public: void setupDisks(uint32_t diskCount, uint32_t queueBarrierThreads) { FileStorTestFixture::setupDisks(diskCount); - _dummyProvider.reset(new spi::dummy::DummyPersistence( - _node->getTypeRepo(), diskCount)); + _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), diskCount)); _queueBarrier.reset(new vespalib::Barrier(queueBarrierThreads)); _completionBarrier.reset(new vespalib::Barrier(2)); - _blockingProvider = new BlockingMockProvider(*_dummyProvider, - *_queueBarrier, *_completionBarrier); - - _node->setPersistenceProvider( - spi::PersistenceProvider::UP(_blockingProvider)); + _blockingProvider = new BlockingMockProvider(*_dummyProvider, *_queueBarrier, *_completionBarrier); + _node->setPersistenceProvider(spi::PersistenceProvider::UP(_blockingProvider)); } - void validateReplies(DummyStorageLink& link, - size_t repliesTotal, + void validateReplies(DummyStorageLink& link, size_t repliesTotal, const std::vector<document::BucketId>& okReplies, const std::vector<document::BucketId>& abortedGetDiffs); - void doTestSpecificOperationsNotAborted( - const char* testName, - const std::vector<api::StorageMessage::SP>& msgs, - bool shouldCreateBucketInitially); + void doTestSpecificOperationsNotAborted(const char* testName, + const std::vector<api::StorageMessage::SP>& msgs, + bool shouldCreateBucketInitially); api::BucketInfo getBucketInfoFromDB(const document::BucketId&) const; @@ -138,8 +130,7 @@ namespace { template <typename T, typename Collection> bool existsIn(const T& elem, const Collection& collection) { - return (std::find(collection.begin(), collection.end(), elem) - != collection.end()); + return (std::find(collection.begin(), collection.end(), elem) != collection.end()); } } @@ -150,18 +141,15 @@ OperationAbortingTest::setUp() } void -OperationAbortingTest::validateReplies( - DummyStorageLink& link, - size_t repliesTotal, - const std::vector<document::BucketId>& okReplies, - const std::vector<document::BucketId>& abortedGetDiffs) +OperationAbortingTest::validateReplies(DummyStorageLink& link, size_t repliesTotal, + const std::vector<document::BucketId>& okReplies, + const std::vector<document::BucketId>& abortedGetDiffs) { link.waitForMessages(repliesTotal, MSG_WAIT_TIME); CPPUNIT_ASSERT_EQUAL(repliesTotal, link.getNumReplies()); for (uint32_t i = 0; i < repliesTotal; ++i) { - api::StorageReply& reply( - dynamic_cast<api::StorageReply&>(*link.getReply(i))); + api::StorageReply& reply(dynamic_cast<api::StorageReply&>(*link.getReply(i))); LOG(info, "Checking reply %s", reply.toString(true).c_str()); switch (static_cast<uint32_t>(reply.getType().getId())) { case api::MessageType::PUT_REPLY_ID: @@ -222,11 +210,8 @@ template <typename Container> AbortBucketOperationsCommand::SP makeAbortCmd(const Container& buckets) { - std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate> pred( - new ExplicitBucketSetPredicate(buckets.begin(), buckets.end())); - AbortBucketOperationsCommand::SP cmd( - new AbortBucketOperationsCommand(std::move(pred))); - return cmd; + auto pred = std::make_unique<ExplicitBucketSetPredicate>(buckets.begin(), buckets.end()); + return std::make_shared<AbortBucketOperationsCommand>(std::move(pred)); } } @@ -320,8 +305,7 @@ public: void OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket() { - uint32_t queueBarrierThreads = 3; - setupDisks(1, queueBarrierThreads); + setupDisks(1, 3); TestFileStorComponents c(*this, "testWaitForCurrentOperationCompletionForAbortedBucket"); document::BucketId bucket(16, 1); @@ -350,10 +334,8 @@ OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket() // reply, as it must finish processing fully before the abort returns. c.top.waitForMessages(2, MSG_WAIT_TIME); CPPUNIT_ASSERT_EQUAL(size_t(2), c.top.getNumReplies()); - CPPUNIT_ASSERT_EQUAL(api::MessageType::PUT_REPLY, - c.top.getReply(0)->getType()); - CPPUNIT_ASSERT_EQUAL(api::MessageType::INTERNAL_REPLY, - c.top.getReply(1)->getType()); + CPPUNIT_ASSERT_EQUAL(api::MessageType::PUT_REPLY, c.top.getReply(0)->getType()); + CPPUNIT_ASSERT_EQUAL(api::MessageType::INTERNAL_REPLY, c.top.getReply(1)->getType()); } void @@ -364,10 +346,7 @@ OperationAbortingTest::testDoNotAbortCreateBucketCommands() msgs.push_back(api::StorageMessage::SP(new api::CreateBucketCommand(makeDocumentBucket(bucket)))); bool shouldCreateBucketInitially(false); - doTestSpecificOperationsNotAborted( - "testDoNotAbortCreateBucketCommands", - msgs, - shouldCreateBucketInitially); + doTestSpecificOperationsNotAborted("testDoNotAbortCreateBucketCommands", msgs, shouldCreateBucketInitially); } void @@ -378,18 +357,14 @@ OperationAbortingTest::testDoNotAbortRecheckBucketCommands() msgs.push_back(api::StorageMessage::SP(new RecheckBucketInfoCommand(makeDocumentBucket(bucket)))); bool shouldCreateBucketInitially(true); - doTestSpecificOperationsNotAborted( - "testDoNotAbortRecheckBucketCommands", - msgs, - shouldCreateBucketInitially); + doTestSpecificOperationsNotAborted("testDoNotAbortRecheckBucketCommands", msgs, shouldCreateBucketInitially); } api::BucketInfo OperationAbortingTest::getBucketInfoFromDB(const document::BucketId& id) const { StorBucketDatabase::WrappedEntry entry( - _node->getStorageBucketDatabase().get(id, "foo", - StorBucketDatabase::CREATE_IF_NONEXISTING)); + _node->getStorageBucketDatabase().get(id, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); CPPUNIT_ASSERT(entry.exist()); return entry->info; } @@ -403,20 +378,15 @@ OperationAbortingTest::testDoNotAbortDeleteBucketCommands() msgs.push_back(cmd); bool shouldCreateBucketInitially(true); - doTestSpecificOperationsNotAborted( - "testDoNotAbortRecheckBucketCommands", - msgs, - shouldCreateBucketInitially); + doTestSpecificOperationsNotAborted("testDoNotAbortRecheckBucketCommands", msgs, shouldCreateBucketInitially); } void -OperationAbortingTest::doTestSpecificOperationsNotAborted( - const char* testName, - const std::vector<api::StorageMessage::SP>& msgs, - bool shouldCreateBucketInitially) +OperationAbortingTest::doTestSpecificOperationsNotAborted(const char* testName, + const std::vector<api::StorageMessage::SP>& msgs, + bool shouldCreateBucketInitially) { - uint32_t queueBarrierThreads = 2; - setupDisks(1, queueBarrierThreads); + setupDisks(1, 2); TestFileStorComponents c(*this, testName); document::BucketId bucket(16, 1); document::BucketId blockerBucket(16, 2); @@ -443,8 +413,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted( break; case api::MessageType::DELETEBUCKET_ID: { - api::DeleteBucketCommand& delCmd( - dynamic_cast<api::DeleteBucketCommand&>(*msgs[i])); + api::DeleteBucketCommand& delCmd(dynamic_cast<api::DeleteBucketCommand&>(*msgs[i])); delCmd.setBucketInfo(getBucketInfoFromDB(delCmd.getBucketId())); } ++expectedDeleteBuckets; @@ -473,8 +442,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted( c.sendDummyGet(blockerBucket); // put+abort+get + any other creates/deletes/rechecks - size_t expectedMsgs(3 + expectedCreateBuckets + expectedDeleteBuckets - + expectedRecheckReplies); + size_t expectedMsgs(3 + expectedCreateBuckets + expectedDeleteBuckets + expectedRecheckReplies); LOG(info, "barrier passed, waiting for %zu replies", expectedMsgs); std::vector<document::BucketId> okReplies; @@ -483,13 +451,10 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted( std::vector<document::BucketId> abortedGetDiffs; validateReplies(c.top, expectedMsgs, okReplies, abortedGetDiffs); - CPPUNIT_ASSERT_EQUAL(expectedBucketInfoInvocations, - _blockingProvider->_bucketInfoInvocations); + CPPUNIT_ASSERT_EQUAL(expectedBucketInfoInvocations, _blockingProvider->_bucketInfoInvocations); CPPUNIT_ASSERT_EQUAL(expectedCreateBuckets + (shouldCreateBucketInitially ? 2 : 1), _blockingProvider->_createBucketInvocations); - CPPUNIT_ASSERT_EQUAL(expectedDeleteBuckets, - _blockingProvider->_deleteBucketInvocations); + CPPUNIT_ASSERT_EQUAL(expectedDeleteBuckets, _blockingProvider->_deleteBucketInvocations); } - - + } // storage diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 77e0ed6b71c..f5f190ad718 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -77,9 +77,7 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, - _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); // Send 2 puts, 2 to the first bucket, 1 to the second. Calling // getNextMessage 2 times should then return a lock on the first bucket, @@ -89,17 +87,15 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() filestorHandler.schedule(createPut(1234, 1), 0); filestorHandler.schedule(createPut(5432, 0), 0); - auto lock0 = filestorHandler.getNextMessage(0, 255); + auto lock0 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT(lock0.first.get()); - CPPUNIT_ASSERT_EQUAL( - document::BucketId(16, 1234), - dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), + dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); - auto lock1 = filestorHandler.getNextMessage(0, 255); + auto lock1 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT(lock1.first.get()); - CPPUNIT_ASSERT_EQUAL( - document::BucketId(16, 5432), - dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432), + dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); } } // namespace storage diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 9d3a3d5f008..d46708b3aaa 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -52,21 +52,13 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const _node.setupDummyPersistence(); _metrics.initDiskMetrics( numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1); - _handler.reset(new FileStorHandler( - _messageKeeper, _metrics, - _node.getPersistenceProvider().getPartitionStates().getList(), - _node.getComponentRegister(), 255, 0)); + _handler.reset(new FileStorHandler(_messageKeeper, _metrics, + _node.getPersistenceProvider().getPartitionStates().getList(), + _node.getComponentRegister())); for (uint32_t i = 0; i < numDisks; i++) { _diskEnvs.push_back( - std::unique_ptr<PersistenceUtil>( - new PersistenceUtil( - _config.getConfigId(), - _node.getComponentRegister(), - *_handler, - *_metrics.disks[i]->threads[0], - i, - 255, - _node.getPersistenceProvider()))); + std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler, + *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider())); } } @@ -79,8 +71,7 @@ PersistenceTestUtils::~PersistenceTestUtils() } std::string -PersistenceTestUtils::dumpBucket(const document::BucketId& bid, - uint16_t disk) { +PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) { return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(makeSpiBucket(bid, spi::PartitionId(disk))); } @@ -92,14 +83,9 @@ PersistenceTestUtils::setupDisks(uint32_t numDisks) { std::unique_ptr<PersistenceThread> PersistenceTestUtils::createPersistenceThread(uint32_t disk) { - return std::unique_ptr<PersistenceThread>( - new PersistenceThread(_env->_node.getComponentRegister(), - _env->_config.getConfigId(), - getPersistenceProvider(), - getEnv()._fileStorHandler, - getEnv()._metrics, - disk, - 255)); + return std::make_unique<PersistenceThread>(_env->_node.getComponentRegister(), _env->_config.getConfigId(), + getPersistenceProvider(), getEnv()._fileStorHandler, + getEnv()._metrics, disk); } document::Document::SP diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index 8636628c7cc..b53c8c270f2 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -14,21 +14,16 @@ using namespace storage::api; namespace storage { -StorageLink::~StorageLink() -{ -} +StorageLink::~StorageLink() = default; void StorageLink::push_back(StorageLink::UP link) { if (_state != CREATED) { - LOG(error, "Attempted to alter chain by adding link %s after link %s " - "while state is %s", - link->toString().c_str(), - toString().c_str(), - stateToString(_state)); + LOG(error, "Attempted to alter chain by adding link %s after link %s while state is %s", + link->toString().c_str(), toString().c_str(), stateToString(_state)); assert(false); } - assert(link.get()); + assert(link); if (isBottom()) { link->_up = this; _down = std::move(link); @@ -39,26 +34,24 @@ void StorageLink::push_back(StorageLink::UP link) void StorageLink::open() { - // First tag all states as opened, as components are allowed to send - // messages both ways in onOpen call, in case any component send message - // up, the link receiving them should have their state as opened. + // First tag all states as opened, as components are allowed to send + // messages both ways in onOpen call, in case any component send message + // up, the link receiving them should have their state as opened. StorageLink* link = this; while (true) { if (link->_state != CREATED) { - LOG(error, "During open(), link %s should be in CREATED state, " - "not in state %s.", - toString().c_str(), - stateToString(link->_state)); + LOG(error, "During open(), link %s should be in CREATED state, not in state %s.", + toString().c_str(), stateToString(link->_state)); assert(false); } link->_state = OPENED; if (link->_down.get() == 0) break; link = link->_down.get(); } - // When give all links an onOpen call, bottoms up. Do it bottoms up, as - // links are more likely to send messages down in their onOpen() call - // than up. Thus, chances are best that the component is ready to - // receive messages sent during onOpen(). + // When give all links an onOpen call, bottoms up. Do it bottoms up, as + // links are more likely to send messages down in their onOpen() call + // than up. Thus, chances are best that the component is ready to + // receive messages sent during onOpen(). while (link != 0) { link->onOpen(); link = link->_up; @@ -91,34 +84,31 @@ void StorageLink::closeNextLink() { void StorageLink::flush() { if (_state != CLOSING) { - LOG(error, "During flush(), link %s should be in CLOSING state, " - "not in state %s.", - toString().c_str(), - stateToString(_state)); + LOG(error, "During flush(), link %s should be in CLOSING state, not in state %s.", + toString().c_str(), stateToString(_state)); assert(false); } - // First flush down to get all requests out of the system. + // First flush down to get all requests out of the system. _state = FLUSHINGDOWN; LOG(debug, "Flushing link %s on the way down.", toString().c_str()); onFlush(true); LOG(debug, "Flushed link %s on the way down.", toString().c_str()); if (!isBottom()) { _down->flush(); - // Then flush up to get replies out of the system + // Then flush up to get replies out of the system LOG(debug, "Flushing link %s on the way back up.", toString().c_str()); _state = FLUSHINGUP; onFlush(false); LOG(debug, "Flushed link %s on the way back up.", toString().c_str()); } else { - // Then flush up to get replies out of the system + // Then flush up to get replies out of the system LOG(debug, "Flushing link %s on the way back up.", toString().c_str()); _state = FLUSHINGUP; onFlush(false); LOG(debug, "Flushed link %s on the way back up.", toString().c_str()); } _state = CLOSED; - LOG(debug, "Link %s is now closed and should do nothing more.", - toString().c_str()); + LOG(debug, "Link %s is now closed and should do nothing more.", toString().c_str()); } void StorageLink::sendDown(const StorageMessage::SP& msg) @@ -130,58 +120,31 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) case FLUSHINGDOWN: break; default: - LOG(error, - "Link %s trying to send %s down while in state %s", - toString().c_str(), - msg->toString().c_str(), - stateToString(_state)); + LOG(error, "Link %s trying to send %s down while in state %s", + toString().c_str(), msg->toString().c_str(), stateToString(_state)); assert(false); } assert(msg.get()); - LOG(spam, "Storage Link %s to handle %s", - toString().c_str(), msg->toString().c_str()); + LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str()); if (isBottom()) { - LOG(spam, "Storage link %s at bottom of chain got message %s.", - toString().c_str(), msg->toString().c_str()); - /* - if (isFlush(msg)) { + LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str()); + ostringstream ost; + ost << "Unhandled message at bottom of chain " << *msg << " (message type " + << msg->getType().getName() << "). " << vespalib::getStackTrace(0); + if (!msg->getType().isReply()) { + LOGBP(warning, "%s", ost.str().c_str()); StorageCommand& cmd = static_cast<StorageCommand&>(*msg); shared_ptr<StorageReply> reply(cmd.makeReply().release()); if (reply.get()) { + reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName())); sendUp(reply); } } else { - */ - ostringstream ost; - ost << "Unhandled message at bottom of chain " - << *msg << " (message type " - << msg->getType().getName() - << "). " - << vespalib::getStackTrace(0); - if (!msg->getType().isReply()) { - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - StorageCommand& cmd = static_cast<StorageCommand&>(*msg); - shared_ptr<StorageReply> reply(cmd.makeReply().release()); - - if (reply.get()) { - reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, - msg->getType().getName())); - sendUp(reply); - } - } else { - ost << " Return code: " - << static_cast<StorageReply&>(*msg).getResult(); - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - } - //} + ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult(); + LOGBP(warning, "%s", ost.str().c_str()); + } } else if (!_down->onDown(msg)) { - //LOG(spam, "Storage link %s forwarding message %s.", - // toString().c_str(), msg->toString().c_str()); _down->sendDown(msg); } else { LOG(spam, "Storage link %s handled message %s.", @@ -191,7 +154,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) { - // Verify acceptable state to send messages up + // Verify acceptable state to send messages up switch(_state) { case OPENED: case CLOSING: @@ -199,48 +162,28 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) case FLUSHINGUP: break; default: - LOG(error, - "Link %s trying to send %s up while in state %s", - toString().c_str(), - msg->toString(true).c_str(), - stateToString(_state)); + LOG(error, "Link %s trying to send %s up while in state %s", + toString().c_str(), msg->toString(true).c_str(), stateToString(_state)); assert(false); } assert(msg.get()); if (isTop()) { - /* - if (isFlush(msg)) { + ostringstream ost; + ost << "Unhandled message at top of chain " << *msg << "."; + ost << vespalib::getStackTrace(0); + if (!msg->getType().isReply()) { + LOGBP(warning, "%s", ost.str().c_str()); StorageCommand& cmd = static_cast<StorageCommand&>(*msg); shared_ptr<StorageReply> reply(cmd.makeReply().release()); if (reply.get()) { + reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName())); sendDown(reply); } } else { - */ - ostringstream ost; - ost << "Unhandled message at top of chain " << *msg << "."; - ost << vespalib::getStackTrace(0); - if (!msg->getType().isReply()) { - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - StorageCommand& cmd = static_cast<StorageCommand&>(*msg); - shared_ptr<StorageReply> reply(cmd.makeReply().release()); - - if (reply.get()) { - reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, - msg->getType().getName())); - sendDown(reply); - } - } else { - ost << " Return code: " - << static_cast<StorageReply&>(*msg).getResult(); - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - } - //} + ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult(); + LOGBP(warning, "%s", ost.str().c_str()); + } } else if (!_up->onUp(msg)) { _up->sendUp(msg); } @@ -261,19 +204,7 @@ void StorageLink::printChain(std::ostream& out, std::string indent) const { bool StorageLink::onDown(const shared_ptr<StorageMessage> & msg) { - //LOG(spam, "Checking if storage link %s handles %s.", - // toString().c_str(), msg->toString().c_str()); - bool result = msg->callHandler(*this, msg); - /* - if (result) { - LOG(spam, "Storage link %s handled message %s.", - toString().c_str(), msg->toString().c_str()); - } else { - LOG(spam, "Storage link %s did not handle message %s.", - toString().c_str(), msg->toString().c_str()); - } - */ - return result; + return msg->callHandler(*this, msg); } bool StorageLink::onUp(const shared_ptr<StorageMessage> & msg) diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index e23bfda192c..949ceb901e1 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -7,12 +7,8 @@ namespace storage { FileStorHandler::FileStorHandler(MessageSender& sender, FileStorMetrics& metrics, const spi::PartitionStateList& partitions, - ServiceLayerComponentRegister& compReg, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking) - : _impl(new FileStorHandlerImpl( - sender, metrics, partitions, compReg, - maxPriorityToBlock, minPriorityToBeBlocking)) + ServiceLayerComponentRegister& compReg) + : _impl(new FileStorHandlerImpl(sender, metrics, partitions, compReg)) { } @@ -57,23 +53,16 @@ FileStorHandler::schedule(const api::StorageMessage::SP& msg, uint16_t thread) return _impl->schedule(msg, thread); } -void -FileStorHandler::pause(uint16_t disk, uint8_t priority) const { - return _impl->pause(disk, priority); -} - FileStorHandler::LockedMessage -FileStorHandler::getNextMessage(uint16_t thread, uint8_t lowestPriority) +FileStorHandler::getNextMessage(uint16_t thread) { - return _impl->getNextMessage(thread, lowestPriority); + return _impl->getNextMessage(thread); } FileStorHandler::LockedMessage & -FileStorHandler::getNextMessage(uint16_t thread, - LockedMessage& lck, - uint8_t lowestPriority) +FileStorHandler::getNextMessage(uint16_t thread, LockedMessage& lck) { - return _impl->getNextMessage(thread, lck, lowestPriority); + return _impl->getNextMessage(thread, lck); } FileStorHandler::BucketLockInterface::SP @@ -89,30 +78,23 @@ FileStorHandler::remapQueueAfterDiskMove( { RemapInfo target(bucket, targetDisk); - _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, - FileStorHandlerImpl::MOVE); + _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, FileStorHandlerImpl::MOVE); } void -FileStorHandler::remapQueueAfterJoin( - const RemapInfo& source, - RemapInfo& target) +FileStorHandler::remapQueueAfterJoin(const RemapInfo& source,RemapInfo& target) { _impl->remapQueue(source, target, FileStorHandlerImpl::JOIN); } void -FileStorHandler::remapQueueAfterSplit( - const RemapInfo& source, - RemapInfo& target1, - RemapInfo& target2) +FileStorHandler::remapQueueAfterSplit(const RemapInfo& source,RemapInfo& target1, RemapInfo& target2) { _impl->remapQueue(source, target1, target2, FileStorHandlerImpl::SPLIT); } void -FileStorHandler::failOperations(const document::Bucket &bucket, - uint16_t fromDisk, const api::ReturnCode& err) +FileStorHandler::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err) { _impl->failOperations(bucket, fromDisk, err); } @@ -130,8 +112,7 @@ FileStorHandler::sendReply(const api::StorageReply::SP& msg) } void -FileStorHandler::getStatus(std::ostream& out, - const framework::HttpUrlPath& path) const +FileStorHandler::getStatus(std::ostream& out, const framework::HttpUrlPath& path) const { _impl->getStatus(out, path); } @@ -149,8 +130,7 @@ FileStorHandler::getQueueSize(uint16_t disk) const } void -FileStorHandler::addMergeStatus(const document::Bucket& bucket, - MergeStatus::SP ms) +FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms) { return _impl->addMergeStatus(bucket, ms); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 6e2d6a0fc07..2cfc97ca71b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -15,7 +15,6 @@ #include "mergestatus.h" #include <vespa/document/bucket/bucket.h> -#include <ostream> #include <vespa/storage/storageutil/resumeguard.h> #include <vespa/storage/common/messagesender.h> @@ -72,12 +71,7 @@ public: CLOSED }; - FileStorHandler(MessageSender&, - FileStorMetrics&, - const spi::PartitionStateList&, - ServiceLayerComponentRegister&, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking); + FileStorHandler(MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&); ~FileStorHandler(); // Commands used by file stor manager @@ -115,32 +109,19 @@ public: * Schedule a storage message to be processed by the given disk * @return True if we maanged to schedule operation. False if not */ - bool schedule(const std::shared_ptr<api::StorageMessage>&, - uint16_t disk); - - // Commands used by file stor threads - - /** - * When called, checks if any running operations have "preempting" - * priority. If so, and the given priority is less than that, this call - * will hang until the other operation is done. - */ - void pause(uint16_t disk, uint8_t priority) const; + bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); /** * Used by file stor threads to get their next message to process. * * @param disk The disk to get messages for - * @param lowestPriority The lowest priority of operation we should return */ - LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority); + LockedMessage getNextMessage(uint16_t disk); /** * Returns the next message for the same bucket. */ - LockedMessage & getNextMessage(uint16_t disk, - LockedMessage& lock, - uint8_t lowestPriority); + LockedMessage & getNextMessage(uint16_t disk, LockedMessage& lock); /** * Lock a bucket. By default, each file stor thread has the locks of all @@ -219,8 +200,7 @@ public: * Fail all operations towards a single bucket currently queued to the * given thread with the given error code. */ - void failOperations(const document::Bucket&, uint16_t fromDisk, - const api::ReturnCode&); + void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&); /** * Add a new merge state to the registry. diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index c8b5c71ee2e..5eb168a9a42 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -23,20 +23,14 @@ using document::BucketSpace; namespace storage { -FileStorHandlerImpl::FileStorHandlerImpl( - MessageSender& sender, - FileStorMetrics& metrics, - const spi::PartitionStateList& partitions, - ServiceLayerComponentRegister& compReg, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking) +FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, + const spi::PartitionStateList& partitions, + ServiceLayerComponentRegister& compReg) : _partitions(partitions), _component(compReg, "filestorhandlerimpl"), _diskInfo(_component.getDiskCount()), _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), - _maxPriorityToBlock(maxPriorityToBlock), - _minPriorityToBeBlocking(minPriorityToBeBlocking), _getNextMessageTimeout(100), _paused(false) { @@ -46,23 +40,21 @@ FileStorHandlerImpl::FileStorHandlerImpl( } if (_diskInfo.size() == 0) { - throw vespalib::IllegalArgumentException( - "No disks configured", VESPA_STRLOC); + throw vespalib::IllegalArgumentException("No disks configured", VESPA_STRLOC); } // Add update hook, so we will get callbacks each 5 seconds to update // metrics. _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); } -FileStorHandlerImpl::~FileStorHandlerImpl() { } +FileStorHandlerImpl::~FileStorHandlerImpl() = default; void FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status) { vespalib::LockGuard mlock(_mergeStatesLock); if (_mergeStates.find(bucket) != _mergeStates.end()) {; - LOG(warning, "A merge status already existed for %s. Overwriting it.", - bucket.toString().c_str()); + LOG(warning, "A merge status already existed for %s. Overwriting it.", bucket.toString().c_str()); } _mergeStates[bucket] = status; } @@ -73,8 +65,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) vespalib::LockGuard mlock(_mergeStatesLock); MergeStatus::SP status = _mergeStates[bucket]; if (status.get() == 0) { - throw vespalib::IllegalStateException( - "No merge state exist for " + bucket.toString(), VESPA_STRLOC); + throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC); } return *status; } @@ -94,8 +85,7 @@ FileStorHandlerImpl::getNumActiveMerges() const } void -FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, - const api::ReturnCode* code) +FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode* code) { vespalib::LockGuard mlock(_mergeStatesLock); auto it = _mergeStates.find(bucket); @@ -153,10 +143,9 @@ FileStorHandlerImpl::flush(bool killPendingMerges) if (killPendingMerges) { api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down"); - for (std::map<document::Bucket, MergeStatus::SP>::iterator it - = _mergeStates.begin(); it != _mergeStates.end(); ++it) + for (auto & entry : _mergeStates) { - MergeStatus& s(*it->second); + MergeStatus& s(*entry.second); if (s.pendingGetDiff.get() != 0) { s.pendingGetDiff->setResult(code); _messageSender.sendReply(s.pendingGetDiff); @@ -182,11 +171,9 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg, std::shared_ptr<api::StorageReply> rep( static_cast<api::StorageCommand&>(msg).makeReply().release()); if (state == FileStorHandler::DISABLED) { - rep->setResult(api::ReturnCode( - api::ReturnCode::DISK_FAILURE, "Disk disabled")); + rep->setResult(api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled")); } else { - rep->setResult(api::ReturnCode( - api::ReturnCode::ABORTED, "Shutting down storage node.")); + rep->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); } _messageSender.sendReply(rep); } @@ -270,29 +257,6 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, return true; } -void -FileStorHandlerImpl::pause(uint16_t disk, uint8_t priority) const { - if (priority < _maxPriorityToBlock) { - return; - } - - assert(disk < _diskInfo.size()); - const Disk& t(_diskInfo[disk]); - vespalib::MonitorGuard lockGuard(t.lock); - - bool paused = true; - while (paused) { - paused = false; - for (auto& lockedBucket : t.lockedBuckets) { - if (lockedBucket.second.priority <= _minPriorityToBeBlocking) { - paused = true; - lockGuard.wait(); - break; - } - } - } -} - bool FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const { @@ -395,18 +359,6 @@ FileStorHandlerImpl::abortQueuedOperations( } } -bool -FileStorHandlerImpl::hasBlockingOperations(const Disk& t) const -{ - for (auto& lockedBucket : t.lockedBuckets) { - if (lockedBucket.second.priority <= _minPriorityToBeBlocking) { - return true; - } - } - - return false; -} - void FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) { @@ -419,16 +371,11 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) } FileStorHandler::LockedMessage & -FileStorHandlerImpl::getNextMessage(uint16_t disk, - FileStorHandler::LockedMessage& lck, - uint8_t maxPriority) +FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lck) { document::Bucket bucket(lck.first->getBucket()); - LOG(spam, - "Disk %d retrieving message for buffered bucket %s", - disk, - bucket.getBucketId().toString().c_str()); + LOG(spam, "Disk %d retrieving message for buffered bucket %s", disk, bucket.getBucketId().toString().c_str()); assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); @@ -451,22 +398,12 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, api::StorageMessage & m(*range.first->_command); mbus::Trace& trace = m.getTrace(); - // Priority is too low, not buffering any more. - if (m.getPriority() > maxPriority || m.getPriority() >= _maxPriorityToBlock) { - lck.second.reset(); - return lck; - } + MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread looking for more requests to active bucket."); - MBUS_TRACE(trace, 9, - "FileStorHandler: Message identified by disk thread looking for " - "more requests to active bucket."); + uint64_t waitTime(const_cast<metrics::MetricTimer&>(range.first->_timer).stop( + t.metrics->averageQueueWaitingTime[m.getLoadType()])); - uint64_t waitTime( - const_cast<metrics::MetricTimer&>(range.first->_timer).stop( - t.metrics->averageQueueWaitingTime[m.getLoadType()])); - - LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), " - "timeout %d", + LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), timeout %d", m.toString().c_str(), waitTime, bucket.getBucketId().toString().c_str(), static_cast<api::StorageCommand&>(m).getTimeout()); @@ -480,15 +417,11 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, lockGuard.unlock(); return lck; } else { - std::shared_ptr<api::StorageReply> msgReply( - static_cast<api::StorageCommand&>(m) - .makeReply().release()); + std::shared_ptr<api::StorageReply> msgReply(static_cast<api::StorageCommand&>(m).makeReply().release()); idx.erase(range.first); lockGuard.broadcast(); lockGuard.unlock(); - msgReply->setResult(api::ReturnCode( - api::ReturnCode::TIMEOUT, - "Message waited too long in storage queue")); + msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue")); _messageSender.sendReply(msgReply); lck.second.reset(); @@ -517,23 +450,13 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const } bool -FileStorHandlerImpl::operationBlockedByHigherPriorityThread( - const api::StorageMessage& msg, - const Disk& disk) const -{ - return ((msg.getPriority() >= _maxPriorityToBlock) - && hasBlockingOperations(disk)); -} - -bool FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime) const { if (msg.getType().isReply()) { return false; // Replies must always be processed and cannot time out. } - return (waitTime >= static_cast<const api::StorageCommand&>( - msg).getTimeout()); + return (waitTime >= static_cast<const api::StorageCommand&>(msg).getTimeout()); } std::unique_ptr<FileStorHandler::BucketLockInterface> @@ -543,8 +466,7 @@ FileStorHandlerImpl::takeDiskBucketLockOwnership( const document::Bucket &bucket, const api::StorageMessage& msg) { - return std::unique_ptr<FileStorHandler::BucketLockInterface>( - new BucketLock(guard, disk, bucket, msg.getPriority(), msg.getSummary())); + return std::make_unique<BucketLock>(guard, disk, bucket, msg.getPriority(), msg.getSummary()); } std::unique_ptr<api::StorageReply> @@ -564,23 +486,10 @@ namespace { bucketIsLockedOnDisk(const document::Bucket &id, const FileStorHandlerImpl::Disk &t) { return (id.getBucketId().getRawId() != 0 && t.isLocked(id)); } - - /** - * Return whether msg has sufficiently high priority that a thread with - * a configured priority threshold of maxPriority can even run in. - * Often, operations such as streaming searches will have dedicated threads - * that refuse lower priority operations such as Puts etc. - */ - bool - operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority) - { - // NOTE: priority integral value 0 is considered highest pri. - return (msg.getPriority() <= maxPriority); - } } FileStorHandler::LockedMessage -FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) +FileStorHandlerImpl::getNextMessage(uint16_t disk) { assert(disk < _diskInfo.size()); if (!tryHandlePause(disk)) { @@ -602,12 +511,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) iter++; } if (iter != end) { - api::StorageMessage &m(*iter->_command); - - if (operationHasHighEnoughPriorityToBeRun(m, maxPriority) - && ! operationBlockedByHigherPriorityThread(m, t) - && ! isPaused()) - { + if (! isPaused()) { return getMessage(lockGuard, t, idx, iter); } } @@ -655,22 +559,16 @@ FileStorHandlerImpl::lock(const document::Bucket &bucket, uint16_t disk) assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); - LOG(spam, - "Acquiring filestor lock for %s on disk %d", - bucket.getBucketId().toString().c_str(), - disk); + LOG(spam, "Acquiring filestor lock for %s on disk %d", bucket.getBucketId().toString().c_str(), disk); vespalib::MonitorGuard lockGuard(t.lock); while (bucket.getBucketId().getRawId() != 0 && t.isLocked(bucket)) { - LOG(spam, - "Contending for filestor lock for %s", - bucket.getBucketId().toString().c_str()); + LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str()); lockGuard.wait(100); } - std::shared_ptr<FileStorHandler::BucketLockInterface> locker( - new BucketLock(lockGuard, t, bucket, 255, "External lock")); + auto locker = std::make_shared<BucketLock>(lockGuard, t, bucket, 255, "External lock"); lockGuard.broadcast(); return locker; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 56ac9ea0577..6b6d154e149 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -142,12 +142,8 @@ public: document::Bucket _bucket; }; - FileStorHandlerImpl(MessageSender&, - FileStorMetrics&, - const spi::PartitionStateList&, - ServiceLayerComponentRegister&, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking); + FileStorHandlerImpl(MessageSender&, FileStorMetrics&, + const spi::PartitionStateList&, ServiceLayerComponentRegister&); ~FileStorHandlerImpl(); void setGetNextMessageTimeout(uint32_t timeout) { _getNextMessageTimeout = timeout; } @@ -158,12 +154,10 @@ public: void close(); bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); - void pause(uint16_t disk, uint8_t priority) const; - FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority); + FileStorHandler::LockedMessage getNextMessage(uint16_t disk); FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter); - FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock, - uint8_t lowestPriority); + FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock); enum Operation { MOVE, SPLIT, JOIN }; void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op); @@ -204,8 +198,6 @@ private: std::map<document::Bucket, MergeStatus::SP> _mergeStates; - uint8_t _maxPriorityToBlock; - uint8_t _minPriorityToBeBlocking; uint32_t _getNextMessageTimeout; vespalib::Monitor _pauseMonitor; @@ -237,12 +229,6 @@ private: bool diskIsClosed(uint16_t disk) const; /** - * Return whether an already running high priority operation pre-empts - * (blocks) the operation in msg from even starting in the current thread. - */ - bool operationBlockedByHigherPriorityThread(const api::StorageMessage& msg, const Disk& disk) const; - - /** * Return whether msg has timed out based on waitTime and the message's * specified timeout. */ @@ -262,7 +248,6 @@ private: */ std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const; bool messageMayBeAborted(const api::StorageMessage& msg) const; - bool hasBlockingOperations(const Disk& t) const; void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd); bool diskHasActiveOperationForAbortedBucket(const Disk& disk, const AbortBucketOperationsCommand& cmd) const; void waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd); @@ -287,4 +272,3 @@ private: }; } // storage - diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 6644d09da7f..b4735c2961a 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -29,10 +29,8 @@ using document::BucketSpace; namespace storage { FileStorManager:: -FileStorManager(const config::ConfigUri & configUri, - const spi::PartitionStateList& partitions, - spi::PersistenceProvider& provider, - ServiceLayerComponentRegister& compReg) +FileStorManager(const config::ConfigUri & configUri, const spi::PartitionStateList& partitions, + spi::PersistenceProvider& provider, ServiceLayerComponentRegister& compReg) : StorageLinkQueued("File store manager", compReg), framework::HtmlStatusReporter("filestorman", "File store manager"), _compReg(compReg), @@ -83,8 +81,7 @@ FileStorManager::~FileStorManager() } } } - LOG(debug, "Closing all filestor queues, answering queued messages. " - "New messages will be refused."); + LOG(debug, "Closing all filestor queues, answering queued messages. New messages will be refused."); _filestorHandler->close(); LOG(debug, "Deleting filestor threads. Waiting for their current operation " "to finish. Stop their threads and delete objects."); @@ -116,38 +113,16 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _disks.resize(_component.getDiskCount()); - _metrics->initDiskMetrics( - _disks.size(), - _component.getLoadTypes()->getMetricLoadTypes(), - (_config->threads.size() > 0) ? (_config->threads.size()) : 6); + size_t numThreads = _config->numThreads; + _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numThreads); - _filestorHandler.reset(new FileStorHandler( - *this, *_metrics, _partitions, _compReg, - _config->maxPriorityToBlock, _config->minPriorityToBeBlocking)); + _filestorHandler.reset(new FileStorHandler(*this, *_metrics, _partitions, _compReg)); for (uint32_t i=0; i<_component.getDiskCount(); ++i) { if (_partitions[i].isUp()) { - if (_config->threads.size() == 0) { - LOG(spam, "Setting up disk %u", i); - for (uint32_t j = 0; j < 4; j++) { - _disks[i].push_back(DiskThread::SP( - new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i, 255))); - - } - for (uint32_t j = 4; j < 6; j++) { - _disks[i].push_back(DiskThread::SP( - new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i, 100))); - } - } - - for (uint16_t j = 0; j < _config->threads.size(); j++) { - LOG(spam, "Setting up disk %u, thread %u with priority %d", - i, j, _config->threads[j].lowestpri); - _disks[i].push_back(DiskThread::SP( - new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i, _config->threads[j].lowestpri))); - + LOG(spam, "Setting up disk %u", i); + for (uint32_t j = 0; j < numThreads; j++) { + _disks[i].push_back(std::make_shared<PersistenceThread>(_compReg, _configUri, *_provider, *_filestorHandler, + *_metrics->disks[i]->threads[j], i)); } } else { _filestorHandler->disable(i); @@ -157,36 +132,28 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC } void -FileStorManager::replyDroppedOperation(api::StorageMessage& msg, - const document::Bucket& bucket, - api::ReturnCode::Result returnCode, - vespalib::stringref reason) +FileStorManager::replyDroppedOperation(api::StorageMessage& msg, const document::Bucket& bucket, + api::ReturnCode::Result returnCode, vespalib::stringref reason) { std::ostringstream error; error << "Dropping " << msg.getType() << " to bucket " << bucket.toString() << ". Reason: " << reason; LOGBT(debug, bucket.toString(), "%s", error.str().c_str()); if (!msg.getType().isReply()) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(msg).makeReply(); reply->setResult(api::ReturnCode(returnCode, error.str())); sendUp(reply); } } void -FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg, - const document::Bucket& bucket) +FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg, const document::Bucket& bucket) { - replyDroppedOperation(msg, - bucket, - api::ReturnCode::BUCKET_NOT_FOUND, - "bucket does not exist"); + replyDroppedOperation(msg, bucket, api::ReturnCode::BUCKET_NOT_FOUND, "bucket does not exist"); } StorBucketDatabase::WrappedEntry -FileStorManager::mapOperationToDisk(api::StorageMessage& msg, - const document::Bucket& bucket) +FileStorManager::mapOperationToDisk(api::StorageMessage& msg, const document::Bucket& bucket) { StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "FileStorManager::mapOperationToDisk")); @@ -197,8 +164,7 @@ FileStorManager::mapOperationToDisk(api::StorageMessage& msg, } StorBucketDatabase::WrappedEntry -FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, - const document::DocumentId* docId) +FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const document::DocumentId* docId) { StorBucketDatabase &database = _component.getBucketDatabase(cmd.getBucket().getBucketSpace()); StorBucketDatabase::WrappedEntry entry(database.get( @@ -208,17 +174,12 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, if (docId) { specific = _bucketIdFactory.getBucketId(*docId); } - typedef std::map<document::BucketId, - StorBucketDatabase::WrappedEntry> BucketMap; + typedef std::map<document::BucketId, StorBucketDatabase::WrappedEntry> BucketMap; std::shared_ptr<api::StorageReply> reply; { - BucketMap results( - database.getContained( - specific, "FileStorManager::mapOperationToBucketAndDisk-2")); + BucketMap results( database.getContained( specific, "FileStorManager::mapOperationToBucketAndDisk-2")); if (results.size() == 1) { - LOG(debug, - "Remapping %s operation to specific %s versus " - "non-existing %s to %s.", + LOG(debug, "Remapping %s operation to specific %s versus non-existing %s to %s.", cmd.toString().c_str(), specific.toString().c_str(), cmd.getBucketId().toString().c_str(), results.begin()->first.toString().c_str()); @@ -243,10 +204,8 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, } LOGBT(debug, cmd.getBucketId().toString(), "%s", error.str().c_str()); - reply.reset(static_cast<api::StorageCommand&>(cmd).makeReply().release()); - reply->setResult( - api::ReturnCode( - api::ReturnCode::BUCKET_NOT_FOUND, error.str())); + reply = static_cast<api::StorageCommand&>(cmd).makeReply(); + reply->setResult( api::ReturnCode( api::ReturnCode::BUCKET_NOT_FOUND, error.str())); } sendUp(reply); } @@ -254,18 +213,15 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, } bool -FileStorManager::handlePersistenceMessage( - const shared_ptr<api::StorageMessage>& msg, uint16_t disk) +FileStorManager::handlePersistenceMessage( const shared_ptr<api::StorageMessage>& msg, uint16_t disk) { api::ReturnCode errorCode(api::ReturnCode::OK); do { - LOG(spam, "Received %s. Attempting to queue it to disk %u.", - msg->getType().getName().c_str(), disk); + LOG(spam, "Received %s. Attempting to queue it to disk %u.", msg->getType().getName().c_str(), disk); LOG_BUCKET_OPERATION_NO_LOCK( getStorageMessageBucket(*msg).getBucketId(), - vespalib::make_string("Attempting to queue %s to disk %u", - msg->toString().c_str(), disk)); + vespalib::make_string("Attempting to queue %s to disk %u", msg->toString().c_str(), disk)); if (_filestorHandler->schedule(msg, disk)) { @@ -275,12 +231,10 @@ FileStorManager::handlePersistenceMessage( } switch (_filestorHandler->getDiskState(disk)) { case FileStorHandler::DISABLED: - errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, - "Disk disabled"); + errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled"); break; case FileStorHandler::CLOSED: - errorCode = api::ReturnCode(api::ReturnCode::ABORTED, - "Shutting down storage node."); + errorCode = api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."); break; case FileStorHandler::AVAILABLE: assert(false); @@ -289,8 +243,7 @@ FileStorManager::handlePersistenceMessage( // If we get here, we failed to schedule message. errorCode says why // We need to reply to message (while not having bucket lock) if (!msg->getType().isReply()) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(*msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(*msg).makeReply(); reply->setResult(errorCode); LOG(spam, "Received persistence message %s. Returning reply: %s", msg->getType().getName().c_str(), errorCode.toString().c_str()); @@ -303,7 +256,7 @@ bool FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd) { if (cmd->getTimestamp() == 0) { - shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + shared_ptr<api::StorageReply> reply = cmd->makeReply(); std::string msg("Put command received without timestamp set. " "Distributor need to set timestamp to ensure equal " "timestamps between storage nodes. Rejecting."); @@ -311,8 +264,7 @@ FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd) sendUp(reply); return true; } - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -323,7 +275,7 @@ bool FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd) { if (cmd->getTimestamp() == 0) { - shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + shared_ptr<api::StorageReply> reply = cmd->makeReply(); std::string msg("Update command received without timestamp set. " "Distributor need to set timestamp to ensure equal " "timestamps between storage nodes. Rejecting."); @@ -331,8 +283,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd) sendUp(reply); return true; } - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -342,8 +293,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd) bool FileStorManager::onGet(const shared_ptr<api::GetCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -354,7 +304,7 @@ bool FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd) { if (cmd->getTimestamp() == 0) { - shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + shared_ptr<api::StorageReply> reply = cmd->makeReply(); std::string msg("Remove command received without timestamp set. " "Distributor need to set timestamp to ensure equal " "timestamps between storage nodes. Rejecting."); @@ -362,8 +312,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd) sendUp(reply); return true; } - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -373,8 +322,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd) bool FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, 0)); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0)); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -423,12 +371,10 @@ FileStorManager::onCreateBucket( bucket.getBucketId(), "FileStorManager::onCreateBucket", StorBucketDatabase::CREATE_IF_NONEXISTING)); if (entry.preExisted()) { - LOG(debug, - "Got create bucket request for %s which already exists: %s", + LOG(debug, "Got create bucket request for %s which already exists: %s", cmd->getBucketId().toString().c_str(), entry->getBucketInfo().toString().c_str()); - code = api::ReturnCode(api::ReturnCode::EXISTS, - "Bucket already exist"); + code = api::ReturnCode(api::ReturnCode::EXISTS, "Bucket already exist"); } else { entry->disk = _component.getIdealPartition(cmd->getBucket()); if (_partitions[entry->disk].isUp()) { @@ -445,12 +391,9 @@ FileStorManager::onCreateBucket( return true; } else { entry.remove(); - code = api::ReturnCode( - api::ReturnCode::IO_FAILURE, - vespalib::make_string( - "Trying to create bucket %s on disabled disk %d", - cmd->getBucketId().toString().c_str(), - entry->disk)); + code = api::ReturnCode(api::ReturnCode::IO_FAILURE, + vespalib::make_string("Trying to create bucket %s on disabled disk %d", + cmd->getBucketId().toString().c_str(), entry->disk)); } } } @@ -471,7 +414,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) "FileStorManager::onDeleteBucket")); if (!entry.exist()) { LOG(debug, "%s was already deleted", cmd->getBucketId().toString().c_str()); - std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + std::shared_ptr<api::StorageReply> reply = cmd->makeReply(); sendUp(reply); return true; } @@ -493,7 +436,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) << entry->getBucketInfo().toString(); LOG(debug, "Rejecting bucket delete: %s", ost.str().c_str()); - std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + std::shared_ptr<api::StorageReply> reply = cmd->makeReply(); static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(entry->getBucketInfo()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str())); entry.unlock(); @@ -543,8 +486,7 @@ FileStorManager::ensureConsistentBucket( bool FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, - "FileStorManager::onMergeBucket")); + StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onMergeBucket")); if (!entry.exist()) { return true; } @@ -567,7 +509,7 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd) cmd->toString().c_str(), entry->disk, cmd->getClusterStateVersion(), _component.getStateUpdater().getClusterStateBundle()->getVersion())); LOGBT(debug, cmd->getBucketId().toString(), "%s", code.getMessage().c_str()); - api::MergeBucketReply::SP reply(new api::MergeBucketReply(*cmd)); + auto reply = std::make_shared<api::MergeBucketReply>(*cmd); reply->setResult(code); sendUp(reply); return true; @@ -582,39 +524,30 @@ bool FileStorManager::onGetBucketDiff( const shared_ptr<api::GetBucketDiffCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry( - ensureConsistentBucket(cmd->getBucket(), - *cmd, - "FileStorManager::onGetBucketDiff")); + StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onGetBucketDiff")); if (!entry.exist()) { return true; } if (!entry.preExisted()) { entry->disk = _component.getIdealPartition(cmd->getBucket()); if (_partitions[entry->disk].isUp()) { - LOG(debug, "Created bucket %s on disk %d (node index is %d) due " - "to get bucket diff being received.", - cmd->getBucketId().toString().c_str(), - entry->disk, _component.getIndex()); + LOG(debug, "Created bucket %s on disk %d (node index is %d) due to get bucket diff being received.", + cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex()); entry->info.setTotalDocumentSize(0); entry->info.setUsedFileSize(0); entry->info.setReady(true); - // Call before writing bucket entry as we need to have bucket - // lock while calling + // Call before writing bucket entry as we need to have bucket + // lock while calling handlePersistenceMessage(cmd, entry->disk); entry.write(); } else { entry.remove(); api::ReturnCode code(api::ReturnCode::IO_FAILURE, vespalib::make_string( - "Trying to merge non-existing bucket %s, which " - "can't be created because target disk %d is down", - cmd->getBucketId().toString().c_str(), - entry->disk)); - LOGBT(warning, cmd->getBucketId().toString(), - "%s", code.getMessage().c_str()); - api::GetBucketDiffReply::SP reply( - new api::GetBucketDiffReply(*cmd)); + "Trying to merge non-existing bucket %s, which can't be created because target disk %d is down", + cmd->getBucketId().toString().c_str(), entry->disk)); + LOGBT(warning, cmd->getBucketId().toString(), "%s", code.getMessage().c_str()); + auto reply = std::make_shared<api::GetBucketDiffReply>(*cmd); reply->setResult(code); sendUp(reply); return true; @@ -634,8 +567,7 @@ FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const BucketSpace bucketSpace(msg.getBucket().getBucketSpace()); if (!_component.getBucketDatabase(bucketSpace).isConsistent(entry)) { document::Bucket bucket(bucketSpace, entry.getBucketId()); - replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, - "bucket became inconsistent during merging"); + replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket became inconsistent during merging"); return false; } return true; @@ -715,8 +647,7 @@ FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& c } bool -FileStorManager::onSetBucketState( - const std::shared_ptr<api::SetBucketStateCommand>& cmd) +FileStorManager::onSetBucketState(const std::shared_ptr<api::SetBucketStateCommand>& cmd) { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { @@ -821,7 +752,7 @@ void FileStorManager::handleAbortBucketOperations(const shared_ptr<AbortBucketOperationsCommand>& cmd) { _filestorHandler->abortQueuedOperations(*cmd); - sendReply(api::StorageReply::SP(cmd->makeReply().release())); + sendReply(api::StorageReply::SP(cmd->makeReply())); } bool @@ -913,8 +844,7 @@ void FileStorManager::onFlush(bool downwards) } void -FileStorManager::reportHtmlStatus(std::ostream& out, - const framework::HttpUrlPath& path) const +FileStorManager::reportHtmlStatus(std::ostream& out, const framework::HttpUrlPath& path) const { bool showStatus = !path.hasAttribute("thread"); bool verbose = path.hasAttribute("verbose"); @@ -962,7 +892,7 @@ FileStorManager::updateState() bool nodeUp = state->getNodeState(node).getState().oneOf("uir"); LOG(debug, "FileStorManager received cluster state '%s'", state->toString().c_str()); - // If edge where we go down + // If edge where we go down if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) { LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database"); Deactivator deactivator; diff --git a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h b/storage/src/vespa/storage/persistence/filestorage/pausehandler.h deleted file mode 100644 index 59c543b0067..00000000000 --- a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * @class PauseHandler - * @ingroup persistence - * - * @brief Object that can be used to possibly pause running operation - */ -#pragma once - -#include <vespa/storage/persistence/filestorage/filestorhandler.h> - -namespace storage { - -class PauseHandler { - FileStorHandler* _handler; - uint16_t _disk; - uint8_t _priority; - -public: - PauseHandler() : _handler(0), _disk(0), _priority(0) {} - PauseHandler(FileStorHandler& handler, uint16_t disk) - : _handler(&handler), - _disk(disk), - _priority(0) - { - } - - void setPriority(uint8_t priority) { _priority = priority; } - - void pause() const { if (_handler != 0) _handler->pause(_disk, _priority); } -}; - -} // storage - diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 6c4cd7b64d2..be6f9577642 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -9,7 +9,6 @@ #include <vespa/document/fieldset/fieldsetrepo.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/exceptions.h> -#include <algorithm> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".persistence.thread"); @@ -21,9 +20,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, - uint16_t deviceIndex, - uint8_t lowestPriority) - : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, lowestPriority, provider), + uint16_t deviceIndex) + : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider), _warnOnSlowOperations(5000), _spi(provider), _processAllHandler(_env, provider), @@ -106,19 +104,14 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker::UP PersistenceThread::handlePut(api::PutCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.put[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.put[cmd.getLoadType()],_env._component.getClock()); if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { return tracker; } - spi::Result response = - _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), - cmd.getDocument(), - _context); + spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), + spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), _context); checkForError(response, *tracker); return tracker; } @@ -126,22 +119,16 @@ PersistenceThread::handlePut(api::PutCommand& cmd) MessageTracker::UP PersistenceThread::handleRemove(api::RemoveCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.remove[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.remove[cmd.getLoadType()],_env._component.getClock()); if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { return tracker; } - spi::RemoveResult response = - _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), - cmd.getDocumentId(), _context); + spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), + spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), _context); if (checkForError(response, *tracker)) { - api::RemoveReply* reply(new api::RemoveReply( - cmd, response.wasFound() ? cmd.getTimestamp() : 0)); - tracker->setReply(api::StorageReply::SP(reply)); + tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); } if (!response.wasFound()) { ++_env._metrics.remove[cmd.getLoadType()].notFound; @@ -152,22 +139,18 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd) MessageTracker::UP PersistenceThread::handleUpdate(api::UpdateCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.update[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.update[cmd.getLoadType()],_env._component.getClock()); if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { return tracker; } - spi::UpdateResult response = - _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), - cmd.getUpdate(), _context); + spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), + spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), _context); if (checkForError(response, *tracker)) { - api::UpdateReply* reply = new api::UpdateReply(cmd); + auto reply = std::make_shared<api::UpdateReply>(cmd); reply->setOldTimestamp(response.getExistingTimestamp()); - tracker->setReply(api::StorageReply::SP(reply)); + tracker->setReply(std::move(reply)); } return tracker; } @@ -175,30 +158,18 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd) MessageTracker::UP PersistenceThread::handleGet(api::GetCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.get[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.get[cmd.getLoadType()],_env._component.getClock()); document::FieldSetRepo repo; - document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), - cmd.getFieldSet()); + document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet()); spi::GetResult result = - _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), - *fieldSet, - cmd.getDocumentId(), - _context); + _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context); if (checkForError(result, *tracker)) { if (!result.hasDocument()) { ++_env._metrics.get[cmd.getLoadType()].notFound; } - - api::GetReply::UP reply( - new api::GetReply(cmd, - Document::SP(result.getDocumentPtr()), - result.getTimestamp())); - - tracker->setReply(api::StorageReply::SP(reply.release())); + tracker->setReply(std::make_shared<api::GetReply>(cmd, result.getDocumentPtr(), result.getTimestamp())); } return tracker; @@ -207,19 +178,14 @@ PersistenceThread::handleGet(api::GetCommand& cmd) MessageTracker::UP PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.repairs, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.repairs,_env._component.getClock()); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); - LOG(debug, "Repair(%s): %s", - cmd.getBucketId().toString().c_str(), + LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(), (cmd.verifyBody() ? "Verifying body" : "Not verifying body")); api::BucketInfo before = _env.getBucketInfo(cmd.getBucket()); spi::Result result = - _spi.maintain(spi::Bucket(cmd.getBucket(), - spi::PartitionId(_env._partition)), - cmd.verifyBody() ? - spi::HIGH : spi::LOW); + _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), + cmd.verifyBody() ? spi::HIGH : spi::LOW); if (checkForError(result, *tracker)) { api::BucketInfo after = _env.getBucketInfo(cmd.getBucket()); @@ -239,15 +205,11 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) MessageTracker::UP PersistenceThread::handleRevert(api::RevertCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.revert[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock()); spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - const std::vector<api::Timestamp> tokens = cmd.getRevertTokens(); - for (uint32_t i = 0; i < tokens.size(); ++i) { - spi::Result result = _spi.removeEntry(b, - spi::Timestamp(tokens[i]), - _context); + const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens(); + for (const api::Timestamp & token : tokens) { + spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), _context); } return tracker; } @@ -255,9 +217,7 @@ PersistenceThread::handleRevert(api::RevertCommand& cmd) MessageTracker::UP PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.createBuckets, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock()); LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { LOG(warning, "Bucket %s was merging at create time. Unexpected.", @@ -299,8 +259,7 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, result.getErrorMessage().c_str()); return false; } - api::BucketInfo providerInfo( - _env.convertBucketInfo(result.getBucketInfo())); + api::BucketInfo providerInfo(_env.convertBucketInfo(result.getBucketInfo())); // Don't check meta fields or active/ready fields since these are not // that important and ready may change under the hood in a race with // getModifiedBuckets(). If bucket is empty it means it has already @@ -323,15 +282,12 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, MessageTracker::UP PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.deleteBuckets, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock()); LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()"); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), - api::ReturnCode(api::ReturnCode::ABORTED, - "Bucket was deleted during the merge")); + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); } spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { @@ -340,8 +296,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) _spi.deleteBucket(bucket, _context); StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); { - StorBucketDatabase::WrappedEntry entry(db.get( - cmd.getBucketId(), "FileStorThread::onDeleteBucket")); + StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); if (entry.exist() && entry->getMetaCount() > 0) { LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " "active operation when delete bucket was queued. " @@ -365,11 +320,8 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) MessageTracker::UP PersistenceThread::handleGetIter(GetIterCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.visit[cmd.getLoadType()], - _env._component.getClock())); - spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), - cmd.getMaxByteSize(), _context)); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock()); + spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), _context)); if (checkForError(result, *tracker)) { GetIterReply::SP reply(new GetIterReply(cmd)); reply->getEntries() = result.steal_entries(); @@ -386,13 +338,11 @@ PersistenceThread::handleGetIter(GetIterCommand& cmd) MessageTracker::UP PersistenceThread::handleReadBucketList(ReadBucketList& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.readBucketList, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketList,_env._component.getClock()); spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace(), cmd.getPartition())); if (checkForError(result, *tracker)) { - ReadBucketListReply::SP reply(new ReadBucketListReply(cmd)); + auto reply = std::make_shared<ReadBucketListReply>(cmd); result.getList().swap(reply->getBuckets()); tracker->setReply(reply); } @@ -403,36 +353,24 @@ PersistenceThread::handleReadBucketList(ReadBucketList& cmd) MessageTracker::UP PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.readBucketInfo, - _env._component.getClock())); - - _env.updateBucketDatabase(cmd.getBucket(), - _env.getBucketInfo(cmd.getBucket())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketInfo,_env._component.getClock()); + _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket())); return tracker; } MessageTracker::UP PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.createIterator, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock()); document::FieldSetRepo repo; - document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), - cmd.getFields()); + document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields()); // _context is reset per command, so it's safe to modify it like this. _context.setReadConsistency(cmd.getReadConsistency()); spi::CreateIteratorResult result(_spi.createIterator( spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), - *fieldSet, - cmd.getSelection(), - cmd.getIncludedVersions(), - _context)); + *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), _context)); if (checkForError(result, *tracker)) { - tracker->setReply(CreateIteratorReply::SP( - new CreateIteratorReply( - cmd, spi::IteratorId(result.getIteratorId())))); + tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId()))); } return tracker; } @@ -447,10 +385,8 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) // Calculate the various bucket ids involved. if (cmd.getBucketId().getUsedBits() >= 58) { - tracker->fail( - api::ReturnCode::ILLEGAL_PARAMETERS, - "Can't split anymore since maximum split bits " - "is already reached"); + tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS, + "Can't split anymore since maximum split bits is already reached"); return tracker; } if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) { @@ -470,13 +406,11 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) { document::BucketId src(cmd.getBucketId()); document::BucketId target1(src.getUsedBits() + 1, src.getId()); - document::BucketId target2(src.getUsedBits() + 1, src.getId() - | (uint64_t(1) << src.getUsedBits())); + document::BucketId target2(src.getUsedBits() + 1, src.getId() | (uint64_t(1) << src.getUsedBits())); targetInfo = SplitBitDetector::Result(target1, target2, false); } if (targetInfo.failed()) { - tracker->fail(api::ReturnCode::INTERNAL_FAILURE, - targetInfo.getReason()); + tracker->fail(api::ReturnCode::INTERNAL_FAILURE, targetInfo.getReason()); return tracker; } // If we get here, we're splitting data in two. @@ -519,8 +453,9 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) // Ensure to take them in rising order. StorBucketDatabase::WrappedEntry sourceEntry(_env.getBucketDatabase(spiBucket.getBucket().getBucketSpace()).get( cmd.getBucketId(), "PersistenceThread::handleSplitBucket-source")); - api::SplitBucketReply* splitReply(new api::SplitBucketReply(cmd)); - tracker->setReply(api::StorageReply::SP(splitReply)); + auto reply = std::make_shared<api::SplitBucketReply>(cmd); + api::SplitBucketReply & splitReply = *reply; + tracker->setReply(std::move(reply)); typedef std::pair<StorBucketDatabase::WrappedEntry, FileStorHandler::RemapInfo> TargetInfo; @@ -581,7 +516,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) createTarget.toString().c_str()); _spi.createBucket(createTarget, _context); } - splitReply->getSplitInfo().push_back( + splitReply.getSplitInfo().push_back( api::SplitBucketReply::Entry( targets[i].second.bucket.getBucketId(), targets[i].first->getBucketInfo())); @@ -953,28 +888,23 @@ PersistenceThread::processMessage(api::StorageMessage& msg) ++_env._metrics.operations; if (msg.getType().isReply()) { try{ - _env._pauseHandler.setPriority(msg.getPriority()); LOG(debug, "Handling reply: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); handleReply(static_cast<api::StorageReply&>(msg)); } catch (std::exception& e) { // It's a reply, so nothing we can do. - LOG(debug, "Caught exception for %s: %s", - msg.toString().c_str(), - e.what()); + LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); } } else { api::StorageCommand& initiatingCommand = static_cast<api::StorageCommand&>(msg); try { - int64_t startTime( - _component->getClock().getTimeInMillis().getTime()); + int64_t startTime(_component->getClock().getTimeInMillis().getTime()); LOG(debug, "Handling command: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); - std::unique_ptr<MessageTracker> tracker( - handleCommand(initiatingCommand)); + auto tracker(handleCommand(initiatingCommand)); if (!tracker.get()) { LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str()); @@ -1080,23 +1010,18 @@ PersistenceThread::flushAllReplies( if (errorCode != 0) { for (uint32_t i = 0; i < replies.size(); ++i) { replies[i]->getReply()->setResult( - api::ReturnCode( - (api::ReturnCode::Result)errorCode, - result.getErrorMessage())); + api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage())); } } } catch (std::exception& e) { for (uint32_t i = 0; i < replies.size(); ++i) { - replies[i]->getReply()->setResult(api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, e.what())); + replies[i]->getReply()->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); } } for (uint32_t i = 0; i < replies.size(); ++i) { - LOG(spam, - "Sending reply up (batched): %s %zu", - replies[i]->getReply()->toString().c_str(), - replies[i]->getReply()->getMsgId()); + LOG(spam, "Sending reply up (batched): %s %zu", + replies[i]->getReply()->toString().c_str(), replies[i]->getReply()->getMsgId()); _env._fileStorHandler.sendReply(replies[i]->getReply()); } @@ -1108,7 +1033,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) std::vector<MessageTracker::UP> trackers; document::Bucket bucket = lock.first->getBucket(); - while (lock.second.get() != 0) { + while (lock.second) { LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); std::shared_ptr<api::StorageMessage> msg(lock.second); @@ -1121,7 +1046,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) } std::unique_ptr<MessageTracker> tracker = processMessage(*msg); - if (!tracker.get() || !tracker->getReply().get()) { + if (!tracker || !tracker->getReply()) { // Was a reply break; } @@ -1133,24 +1058,18 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) } if (batchable) { LOG(spam, "Adding reply %s to batch for bucket %s", - tracker->getReply()->toString().c_str(), - bucket.getBucketId().toString().c_str()); + tracker->getReply()->toString().c_str(), bucket.getBucketId().toString().c_str()); trackers.push_back(std::move(tracker)); if (trackers.back()->getReply()->getResult().success()) { - _env._fileStorHandler.getNextMessage( - _env._partition, - lock, - _env._lowestPriority); + _env._fileStorHandler.getNextMessage(_env._partition, lock); } else { break; } } else { - LOG(spam, - "Sending reply up: %s %zu", - tracker->getReply()->toString().c_str(), - tracker->getReply()->getMsgId()); + LOG(spam, "Sending reply up: %s %zu", + tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId()); _env._fileStorHandler.sendReply(tracker->getReply()); break; @@ -1165,16 +1084,12 @@ PersistenceThread::run(framework::ThreadHandle& thread) { LOG(debug, "Started persistence thread with pid %d", getpid()); - while (!thread.interrupted() - && !_env._fileStorHandler.closed(_env._partition)) - { + while (!thread.interrupted() && !_env._fileStorHandler.closed(_env._partition)) { thread.registerTick(); - FileStorHandler::LockedMessage lock( - _env._fileStorHandler.getNextMessage( - _env._partition, _env._lowestPriority)); + FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition)); - if (lock.first.get()) { + if (lock.first) { processMessages(lock); } diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 07fadb875cc..640614f7edf 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -21,7 +21,7 @@ class PersistenceThread final : public DiskThread, public Types public: PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, - FileStorThreadMetrics& metrics, uint16_t deviceIndex, uint8_t lowestPriority); + FileStorThreadMetrics& metrics, uint16_t deviceIndex); ~PersistenceThread(); /** Waits for current operation to be finished. */ diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 8e96c05a66a..c2dcb8e2a29 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -66,7 +66,6 @@ PersistenceUtil::PersistenceUtil( FileStorHandler& fileStorHandler, FileStorThreadMetrics& metrics, uint16_t partition, - uint8_t lowestPriority, spi::PersistenceProvider& provider) : _config(*config::ConfigGetter<vespa::config::content::StorFilestorConfig>::getConfig(configUri.getConfigId(), configUri.getContext())), _compReg(compReg), @@ -77,24 +76,18 @@ PersistenceUtil::PersistenceUtil( _metrics(metrics), _bucketFactory(_component.getBucketIdFactory()), _repo(_component.getTypeRepo()), - _lowestPriority(lowestPriority), - _pauseHandler(), _spi(provider) { } -PersistenceUtil::~PersistenceUtil() -{ -} +PersistenceUtil::~PersistenceUtil() { } void -PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, - const api::BucketInfo& i) +PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i) { // Update bucket database - StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get( - bucket.getBucketId(), - "env::updatebucketdb")); + StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), + "env::updatebucketdb")); if (entry.exist()) { api::BucketInfo info = i; @@ -106,9 +99,7 @@ PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, entry->setBucketInfo(info); entry.write(); } else { - LOG(debug, - "Bucket(%s).getBucketInfo: Bucket does not exist.", - bucket.getBucketId().toString().c_str()); + LOG(debug, "Bucket(%s).getBucketInfo: Bucket does not exist.", bucket.getBucketId().toString().c_str()); } } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 5126931bab6..bf347ccb10a 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -1,12 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "types.h" #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storage/common/servicelayercomponent.h> #include <vespa/storage/persistence/filestorage/filestorhandler.h> #include <vespa/storage/persistence/filestorage/filestormetrics.h> -#include <vespa/storage/persistence/filestorage/pausehandler.h> -#include <vespa/storage/persistence/types.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/storage/storageutil/utils.h> #include <vespa/config-stor-filestor.h> @@ -70,8 +69,6 @@ struct PersistenceUtil { FileStorThreadMetrics& _metrics; const document::BucketIdFactory& _bucketFactory; const std::shared_ptr<document::DocumentTypeRepo> _repo; - uint8_t _lowestPriority; - PauseHandler _pauseHandler; spi::PersistenceProvider& _spi; PersistenceUtil( @@ -80,7 +77,6 @@ struct PersistenceUtil { FileStorHandler& fileStorHandler, FileStorThreadMetrics& metrics, uint16_t partition, - uint8_t lowestPriority, spi::PersistenceProvider& provider); ~PersistenceUtil(); diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 38a48ac2eed..c8739442c13 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -115,20 +115,20 @@ private: bool _attemptedStopped; vespalib::string _pidFile; - // First components that doesn't depend on others + // First components that doesn't depend on others std::unique_ptr<StatusWebServer> _statusWebServer; std::shared_ptr<StorageMetricSet> _metrics; std::unique_ptr<metrics::MetricManager> _metricManager; - // Depends on bucket databases and stop() functionality + // Depends on bucket databases and stop() functionality std::unique_ptr<DeadLockDetector> _deadLockDetector; - // Depends on metric manager + // Depends on metric manager std::unique_ptr<StatusMetricConsumer> _statusMetrics; - // Depends on metric manager + // Depends on metric manager std::unique_ptr<StateReporter> _stateReporter; std::unique_ptr<StateManager> _stateManager; - // The storage chain can depend on anything. + // The storage chain can depend on anything. std::unique_ptr<StorageLink> _chain; /** Implementation of config callbacks. */ |