diff options
author | Henning Baldersheim <balder@oath.com> | 2018-03-16 17:26:38 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-03-19 13:01:16 +0100 |
commit | 99c0278cfa2118fb26a4e13b8ede982cb6abf1fe (patch) | |
tree | b59e9977e37833da59d937b0244b3df0bf373406 /storage | |
parent | 2a788ec1b3900d7ba853b3f5f89e49df7e016c7e (diff) |
Clean out priority blocking stuff in PersistenceThread
Conflicts:
storage/src/tests/persistence/filestorage/filestormanagertest.cpp
Resolve merge conflict.
Diffstat (limited to 'storage')
14 files changed, 93 insertions, 543 deletions
diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp index 296a9a3cc0f..8cfa403539a 100644 --- a/storage/src/tests/common/testhelper.cpp +++ b/storage/src/tests/common/testhelper.cpp @@ -9,23 +9,6 @@ LOG_SETUP(".testhelper"); namespace storage { -namespace { - bool useNewStorageCore() { - if ( // Unit test directory - vespalib::fileExists("use_new_storage_core") || - // src/cpp directory - vespalib::fileExists("../use_new_storage_core") || - // Top build directory where storage-HEAD remains - vespalib::fileExists("../../../../use_new_storage_core")) - { - std::cerr << "Using new storage core for unit tests\n"; - return true; - } - return false; - } - bool newStorageCore(useNewStorageCore()); -} - void addStorageDistributionConfig(vdstestlib::DirConfig& dc) { vdstestlib::DirConfig::Config* config; @@ -128,14 +111,11 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & ro config->set("threads[0].lowestpri 255"); config->set("dir_spread", "4"); config->set("dir_levels", "0"); - config->set("use_new_core", newStorageCore ? "true" : "false"); config->set("maximum_versions_of_single_document_stored", "0"); //config->set("enable_slotfile_cache", "false"); // Unit tests typically use fake low time values, so don't complain // about them or compact/delete them by default. Override in tests testing that // behavior - config->set("time_future_limit", "5"); - config->set("time_past_limit", "2000000000"); config->set("keep_remove_time_period", "2000000000"); config->set("revert_time_period", "2000000000"); // Don't want test to call exit() diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 318881364d7..26bfeb41b42 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -7,17 +7,10 @@ #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/storageserver/statemanager.h> #include <vespa/storage/bucketdb/bucketmanager.h> -#include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/storage/persistence/persistencethread.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> #include <vespa/document/update/assignvalueupdate.h> -#include <vespa/document/datatype/datatype.h> -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/datatype/documenttype.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/document/fieldvalue/rawfieldvalue.h> -#include <vespa/document/fieldvalue/stringfieldvalue.h> #include <vespa/document/select/parser.h> #include <vespa/vdslib/state/random.h> #include <vespa/storageapi/message/bucketsplitting.h> @@ -76,8 +69,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture { void testFlush(); void testRemapSplit(); void testHandlerPriority(); - void testHandlerPriorityBlocking(); - void testHandlerPriorityPreempt(); void testHandlerMulti(); void testHandlerTimeout(); void testHandlerPause(); @@ -102,7 +93,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture { void testMergeBucketImplicitCreateBucket(); void testNewlyCreatedBucketIsReady(); void testCreateBucketSetsActiveFlagInDatabaseAndReply(); - void testFileStorThreadLockingStressTest(); void testStateChange(); void testRepairNotifiesDistributorOnChange(); void testDiskMove(); @@ -113,8 +103,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture { CPPUNIT_TEST(testFlush); CPPUNIT_TEST(testRemapSplit); CPPUNIT_TEST(testHandlerPriority); - CPPUNIT_TEST(testHandlerPriorityBlocking); - CPPUNIT_TEST(testHandlerPriorityPreempt); CPPUNIT_TEST(testHandlerMulti); CPPUNIT_TEST(testHandlerTimeout); CPPUNIT_TEST(testHandlerPause); @@ -146,21 +134,17 @@ struct FileStorManagerTest : public CppUnit::TestFixture { void createBucket(document::BucketId bid, uint16_t disk) { - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); - _node->getPersistenceProvider().createBucket( - makeSpiBucket(bid, spi::PartitionId(disk)), context); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bid, spi::PartitionId(disk)), context); StorBucketDatabase::WrappedEntry entry( - _node->getStorageBucketDatabase().get(bid, "foo", - StorBucketDatabase::CREATE_IF_NONEXISTING)); + _node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); entry->disk = disk; entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false); entry.write(); } - document::Document::UP createDocument( - const std::string& content, const std::string& id) + document::Document::UP createDocument(const std::string& content, const std::string& id) { return _node->getTestDocMan().createDocument(content, id); } @@ -628,19 +612,16 @@ FileStorManagerTest::testHandlerPriority() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); std::ostringstream uri; - Document::SP doc(createDocument( - content, "userdoc:footype:1234:bar").release()); + Document::SP doc(createDocument(content, "userdoc:footype:1234:bar").release()); document::BucketIdFactory factory; - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(doc->getId()).getRawId()); // Populate bucket with the given data for (uint32_t i = 1; i < 6; i++) { @@ -739,8 +720,7 @@ FileStorManagerTest::testHandlerPausedMultiThread() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -791,8 +771,7 @@ FileStorManagerTest::testHandlerPause() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -802,15 +781,12 @@ FileStorManagerTest::testHandlerPause() document::BucketIdFactory factory; document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); + doc->getId()).getRawId()); // Populate bucket with the given data for (uint32_t i = 1; i < 6; i++) { - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100); + auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(i * 15); filestorHandler.schedule(cmd, 0); @@ -847,8 +823,7 @@ FileStorManagerTest::testRemapSplit() // Setup a filestorthread to test DummyStorageLink top; DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); + top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); top.open(); ForwardingMessageSender messageSender(*dummyManager); // Since we fake time with small numbers, we need to make sure we dont @@ -859,7 +834,7 @@ FileStorManagerTest::testRemapSplit() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -874,10 +849,8 @@ FileStorManagerTest::testRemapSplit() // Populate bucket with the given data for (uint32_t i = 1; i < 4; i++) { - filestorHandler.schedule( - api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket1), doc1, i)), 0); - filestorHandler.schedule( - api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket2), doc2, i + 10)), 0); + filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket1), doc1, i), 0); + filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket2), doc2, i + 10), 0); } CPPUNIT_ASSERT_EQUAL(std::string("BucketId(0x40000000000004d2): Put(BucketId(0x40000000000004d2), userdoc:footype:1234:bar, timestamp 1, size 108) (priority: 127)\n" @@ -925,7 +898,7 @@ FileStorManagerTest::testHandlerMulti() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -935,10 +908,8 @@ FileStorManagerTest::testHandlerMulti() Document::SP doc2(createDocument(content, "userdoc:footype:4567:bar").release()); document::BucketIdFactory factory; - document::BucketId bucket1(16, factory.getBucketId( - doc1->getId()).getRawId()); - document::BucketId bucket2(16, factory.getBucketId( - doc2->getId()).getRawId()); + document::BucketId bucket1(16, factory.getBucketId(doc1->getId()).getRawId()); + document::BucketId bucket2(16, factory.getBucketId(doc2->getId()).getRawId()); // Populate bucket with the given data for (uint32_t i = 1; i < 10; i++) { @@ -988,7 +959,7 @@ FileStorManagerTest::testHandlerTimeout() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -1041,208 +1012,6 @@ FileStorManagerTest::testHandlerTimeout() } void -FileStorManagerTest::testHandlerPriorityBlocking() -{ - TestName testName("testHandlerPriorityBlocking"); - // Setup a filestorthread to test - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - // Since we fake time with small numbers, we need to make sure we dont - // compact them away, as they will seem to be from 1970 - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 21, 21); - filestorHandler.setGetNextMessageTimeout(50); - - std::string content("Here is some content which is in all documents"); - std::ostringstream uri; - - document::BucketIdFactory factory; - - // Populate bucket with the given data - for (uint32_t i = 1; i < 6; i++) { - Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d",i)).release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(i * 15); - filestorHandler.schedule(cmd, 0); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0); - CPPUNIT_ASSERT_EQUAL(15, (int)lock1.second->getPriority()); - - LOG(debug, "Waiting for request that should time out"); - FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0); - LOG(debug, "Got request that should time out"); - CPPUNIT_ASSERT(lock2.second.get() == NULL); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0); - CPPUNIT_ASSERT_EQUAL(30, (int)lock1.second->getPriority()); - - // New high-pri message comes in - Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d", 100)).release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(15); - filestorHandler.schedule(cmd, 0); - - FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0); - CPPUNIT_ASSERT_EQUAL(15, (int)lock2.second->getPriority()); - - LOG(debug, "Waiting for request that should time out"); - FileStorHandler::LockedMessage lock3 = filestorHandler.getNextMessage(0); - LOG(debug, "Got request that should time out"); - CPPUNIT_ASSERT(lock3.second.get() == NULL); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0); - CPPUNIT_ASSERT_EQUAL(45, (int)lock1.second->getPriority()); - - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0); - CPPUNIT_ASSERT_EQUAL(60, (int)lock.second->getPriority()); - } - LOG(debug, "Test done"); -} - -class PausedThread : public document::Runnable { -private: - FileStorHandler& _handler; - -public: - bool pause; - bool done; - bool gotoperation; - - PausedThread(FileStorHandler& handler) - : _handler(handler), pause(false), done(false), gotoperation(false) {} - - void run() override { - FileStorHandler::LockedMessage msg = _handler.getNextMessage(0); - gotoperation = true; - - while (!done) { - if (pause) { - _handler.pause(0, msg.second->getPriority()); - pause = false; - } - FastOS_Thread::Sleep(10); - } - - done = false; - }; -}; - -void -FileStorManagerTest::testHandlerPriorityPreempt() -{ - TestName testName("testHandlerPriorityPreempt"); - // Setup a filestorthread to test - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - // Since we fake time with small numbers, we need to make sure we dont - // compact them away, as they will seem to be from 1970 - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 21, 21); - filestorHandler.setGetNextMessageTimeout(50); - - std::string content("Here is some content which is in all documents"); - std::ostringstream uri; - - document::BucketIdFactory factory; - - { - Document::SP doc(createDocument(content, "doc:foo:1").release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(60); - filestorHandler.schedule(cmd, 0); - } - - PausedThread thread(filestorHandler); - FastOS_ThreadPool pool(512 * 1024); - thread.start(pool); - - while (!thread.gotoperation) { - FastOS_Thread::Sleep(10); - } - - { - Document::SP doc(createDocument(content, "doc:foo:2").release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(20); - filestorHandler.schedule(cmd, 0); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0); - CPPUNIT_ASSERT_EQUAL(20, (int)lock1.second->getPriority()); - - thread.pause = true; - - for (uint32_t i = 0; i < 10; i++) { - CPPUNIT_ASSERT(thread.pause); - FastOS_Thread::Sleep(100); - } - } - - while (thread.pause) { - FastOS_Thread::Sleep(10); - } - - thread.done = true; - - while (thread.done) { - FastOS_Thread::Sleep(10); - } -} - -void FileStorManagerTest::testPriority() { TestName testName("testPriority"); @@ -1260,8 +1029,7 @@ FileStorManagerTest::testPriority() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 2); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), filestorHandler, *metrics.disks[0]->threads[0], 0)); @@ -1275,8 +1043,7 @@ FileStorManagerTest::testPriority() std::string content("Here is some content which is in all documents"); std::ostringstream uri; - uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) - << ":mydoc-" << i; + uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)<< ":mydoc-" << i; Document::SP doc(createDocument(content, uri.str()).release()); documents.push_back(doc); } @@ -1285,20 +1052,16 @@ FileStorManagerTest::testPriority() // Create buckets in separate, initial pass to avoid races with puts for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(16, factory.getBucketId( - documents[i]->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId()); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); - _node->getPersistenceProvider().createBucket( - makeSpiBucket(bucket), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context); } // Populate bucket with the given data for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(16, factory.getBucketId( - documents[i]->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId()); std::shared_ptr<api::PutCommand> cmd( new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i)); @@ -1353,8 +1116,7 @@ FileStorManagerTest::testSplit1() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), filestorHandler, *metrics.disks[0]->threads[0], 0)); @@ -1522,10 +1284,8 @@ FileStorManagerTest::testSplitSingleGroup() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); for (uint32_t j=0; j<1; ++j) { // Test this twice, once where all the data ends up in file with // splitbit set, and once where all the data ends up in file with @@ -1663,8 +1423,7 @@ FileStorManagerTest::testSplitEmptyTargetWithRemappedOps() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), filestorHandler, *metrics.disks[0]->threads[0], 0)); @@ -1741,8 +1500,7 @@ FileStorManagerTest::testNotifyOnSplitSourceOwnershipChanged() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), filestorHandler, *metrics.disks[0]->threads[0], 0)); @@ -1791,8 +1549,7 @@ FileStorManagerTest::testJoin() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), filestorHandler, *metrics.disks[0]->threads[0], 0)); @@ -1802,10 +1559,8 @@ FileStorManagerTest::testJoin() std::string content("Here is some content which is in all documents"); std::ostringstream uri; - uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) - << ":mydoc-" << i; - Document::SP doc(createDocument( - content, uri.str()).release()); + uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) << ":mydoc-" << i; + Document::SP doc(createDocument(content, uri.str()).release()); documents.push_back(doc); } document::BucketIdFactory factory; @@ -1816,36 +1571,26 @@ FileStorManagerTest::testJoin() { // Populate bucket with the given data for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(17, factory.getBucketId( - documents[i]->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); + document::BucketId bucket(17, factory.getBucketId(documents[i]->getId()).getRawId()); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i); + auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); filestorHandler.schedule(cmd, 0); filestorHandler.flush(true); CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies()); - std::shared_ptr<api::PutReply> reply( - std::dynamic_pointer_cast<api::PutReply>( - top.getReply(0))); + auto reply = std::dynamic_pointer_cast<api::PutReply>(top.getReply(0)); CPPUNIT_ASSERT(reply.get()); - CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), - reply->getResult()); + CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), reply->getResult()); top.reset(); // Delete every 5th document to have delete entries in file too if (i % 5 == 0) { - std::shared_ptr<api::RemoveCommand> rcmd( - new api::RemoveCommand( - makeDocumentBucket(bucket), documents[i]->getId(), 1000000 + 100 + i)); + auto rcmd = std::make_shared<api::RemoveCommand>(makeDocumentBucket(bucket), + documents[i]->getId(), 1000000 + 100 + i); rcmd->setAddress(*address); filestorHandler.schedule(rcmd, 0); filestorHandler.flush(true); CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies()); - std::shared_ptr<api::RemoveReply> rreply( - std::dynamic_pointer_cast<api::RemoveReply>( - top.getReply(0))); + auto rreply = std::dynamic_pointer_cast<api::RemoveReply>(top.getReply(0)); CPPUNIT_ASSERT_MSG(top.getReply(0)->getType().toString(), rreply.get()); CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 8d18f75d3db..f5f190ad718 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -77,9 +77,7 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, - _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); // Send 2 puts, 2 to the first bucket, 1 to the second. Calling // getNextMessage 2 times should then return a lock on the first bucket, diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index e0769795bed..d46708b3aaa 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -52,15 +52,13 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const _node.setupDummyPersistence(); _metrics.initDiskMetrics( numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1); - _handler.reset(new FileStorHandler( - _messageKeeper, _metrics, - _node.getPersistenceProvider().getPartitionStates().getList(), - _node.getComponentRegister(), 255, 0)); + _handler.reset(new FileStorHandler(_messageKeeper, _metrics, + _node.getPersistenceProvider().getPartitionStates().getList(), + _node.getComponentRegister())); for (uint32_t i = 0; i < numDisks; i++) { _diskEnvs.push_back( - std::unique_ptr<PersistenceUtil>( - new PersistenceUtil(_config.getConfigId(), _node.getComponentRegister(), - *_handler, *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider()))); + std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler, + *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider())); } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index b53a2a361b3..949ceb901e1 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -7,12 +7,8 @@ namespace storage { FileStorHandler::FileStorHandler(MessageSender& sender, FileStorMetrics& metrics, const spi::PartitionStateList& partitions, - ServiceLayerComponentRegister& compReg, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking) - : _impl(new FileStorHandlerImpl( - sender, metrics, partitions, compReg, - maxPriorityToBlock, minPriorityToBeBlocking)) + ServiceLayerComponentRegister& compReg) + : _impl(new FileStorHandlerImpl(sender, metrics, partitions, compReg)) { } @@ -57,11 +53,6 @@ FileStorHandler::schedule(const api::StorageMessage::SP& msg, uint16_t thread) return _impl->schedule(msg, thread); } -void -FileStorHandler::pause(uint16_t disk, uint8_t priority) const { - return _impl->pause(disk, priority); -} - FileStorHandler::LockedMessage FileStorHandler::getNextMessage(uint16_t thread) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 8fc528b5ad7..2cfc97ca71b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -15,7 +15,6 @@ #include "mergestatus.h" #include <vespa/document/bucket/bucket.h> -#include <ostream> #include <vespa/storage/storageutil/resumeguard.h> #include <vespa/storage/common/messagesender.h> @@ -72,12 +71,7 @@ public: CLOSED }; - FileStorHandler(MessageSender&, - FileStorMetrics&, - const spi::PartitionStateList&, - ServiceLayerComponentRegister&, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking); + FileStorHandler(MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&); ~FileStorHandler(); // Commands used by file stor manager @@ -115,17 +109,7 @@ public: * Schedule a storage message to be processed by the given disk * @return True if we maanged to schedule operation. False if not */ - bool schedule(const std::shared_ptr<api::StorageMessage>&, - uint16_t disk); - - // Commands used by file stor threads - - /** - * When called, checks if any running operations have "preempting" - * priority. If so, and the given priority is less than that, this call - * will hang until the other operation is done. - */ - void pause(uint16_t disk, uint8_t priority) const; + bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); /** * Used by file stor threads to get their next message to process. @@ -216,8 +200,7 @@ public: * Fail all operations towards a single bucket currently queued to the * given thread with the given error code. */ - void failOperations(const document::Bucket&, uint16_t fromDisk, - const api::ReturnCode&); + void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&); /** * Add a new merge state to the registry. diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 0793d23b862..75f389815df 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -23,20 +23,14 @@ using document::BucketSpace; namespace storage { -FileStorHandlerImpl::FileStorHandlerImpl( - MessageSender& sender, - FileStorMetrics& metrics, - const spi::PartitionStateList& partitions, - ServiceLayerComponentRegister& compReg, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking) +FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, + const spi::PartitionStateList& partitions, + ServiceLayerComponentRegister& compReg) : _partitions(partitions), _component(compReg, "filestorhandlerimpl"), _diskInfo(_component.getDiskCount()), _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), - _maxPriorityToBlock(maxPriorityToBlock), - _minPriorityToBeBlocking(minPriorityToBeBlocking), _getNextMessageTimeout(100), _paused(false) { @@ -46,23 +40,21 @@ FileStorHandlerImpl::FileStorHandlerImpl( } if (_diskInfo.size() == 0) { - throw vespalib::IllegalArgumentException( - "No disks configured", VESPA_STRLOC); + throw vespalib::IllegalArgumentException("No disks configured", VESPA_STRLOC); } // Add update hook, so we will get callbacks each 5 seconds to update // metrics. _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); } -FileStorHandlerImpl::~FileStorHandlerImpl() { } +FileStorHandlerImpl::~FileStorHandlerImpl() = default; void FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status) { vespalib::LockGuard mlock(_mergeStatesLock); if (_mergeStates.find(bucket) != _mergeStates.end()) {; - LOG(warning, "A merge status already existed for %s. Overwriting it.", - bucket.toString().c_str()); + LOG(warning, "A merge status already existed for %s. Overwriting it.", bucket.toString().c_str()); } _mergeStates[bucket] = status; } @@ -73,8 +65,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) vespalib::LockGuard mlock(_mergeStatesLock); MergeStatus::SP status = _mergeStates[bucket]; if (status.get() == 0) { - throw vespalib::IllegalStateException( - "No merge state exist for " + bucket.toString(), VESPA_STRLOC); + throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC); } return *status; } @@ -94,8 +85,7 @@ FileStorHandlerImpl::getNumActiveMerges() const } void -FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, - const api::ReturnCode* code) +FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode* code) { vespalib::LockGuard mlock(_mergeStatesLock); auto it = _mergeStates.find(bucket); @@ -153,10 +143,9 @@ FileStorHandlerImpl::flush(bool killPendingMerges) if (killPendingMerges) { api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down"); - for (std::map<document::Bucket, MergeStatus::SP>::iterator it - = _mergeStates.begin(); it != _mergeStates.end(); ++it) + for (auto & entry : _mergeStates) { - MergeStatus& s(*it->second); + MergeStatus& s(*entry.second); if (s.pendingGetDiff.get() != 0) { s.pendingGetDiff->setResult(code); _messageSender.sendReply(s.pendingGetDiff); @@ -182,11 +171,9 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg, std::shared_ptr<api::StorageReply> rep( static_cast<api::StorageCommand&>(msg).makeReply().release()); if (state == FileStorHandler::DISABLED) { - rep->setResult(api::ReturnCode( - api::ReturnCode::DISK_FAILURE, "Disk disabled")); + rep->setResult(api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled")); } else { - rep->setResult(api::ReturnCode( - api::ReturnCode::ABORTED, "Shutting down storage node.")); + rep->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); } _messageSender.sendReply(rep); } @@ -270,29 +257,6 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, return true; } -void -FileStorHandlerImpl::pause(uint16_t disk, uint8_t priority) const { - if (priority < _maxPriorityToBlock) { - return; - } - - assert(disk < _diskInfo.size()); - const Disk& t(_diskInfo[disk]); - vespalib::MonitorGuard lockGuard(t.lock); - - bool paused = true; - while (paused) { - paused = false; - for (auto& lockedBucket : t.lockedBuckets) { - if (lockedBucket.second.priority <= _minPriorityToBeBlocking) { - paused = true; - lockGuard.wait(); - break; - } - } - } -} - bool FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const { @@ -395,18 +359,6 @@ FileStorHandlerImpl::abortQueuedOperations( } } -bool -FileStorHandlerImpl::hasBlockingOperations(const Disk& t) const -{ - for (const auto & lockedBucket : t.lockedBuckets) { - if (lockedBucket.second.priority <= _minPriorityToBeBlocking) { - return true; - } - } - - return false; -} - void FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) { @@ -423,10 +375,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessag { document::Bucket bucket(lck.first->getBucket()); - LOG(spam, - "Disk %d retrieving message for buffered bucket %s", - disk, - bucket.getBucketId().toString().c_str()); + LOG(spam, "Disk %d retrieving message for buffered bucket %s", disk, bucket.getBucketId().toString().c_str()); assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); @@ -449,22 +398,12 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessag api::StorageMessage & m(*range.first->_command); mbus::Trace& trace = m.getTrace(); - // Priority is too low, not buffering any more. - if (m.getPriority() >= _maxPriorityToBlock) { - lck.second.reset(); - return lck; - } + MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread looking for more requests to active bucket."); - MBUS_TRACE(trace, 9, - "FileStorHandler: Message identified by disk thread looking for " - "more requests to active bucket."); - - uint64_t waitTime( - const_cast<metrics::MetricTimer&>(range.first->_timer).stop( - t.metrics->averageQueueWaitingTime[m.getLoadType()])); + uint64_t waitTime(const_cast<metrics::MetricTimer&>(range.first->_timer).stop( + t.metrics->averageQueueWaitingTime[m.getLoadType()])); - LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), " - "timeout %d", + LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), timeout %d", m.toString().c_str(), waitTime, bucket.getBucketId().toString().c_str(), static_cast<api::StorageCommand&>(m).getTimeout()); @@ -478,15 +417,11 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessag lockGuard.unlock(); return lck; } else { - std::shared_ptr<api::StorageReply> msgReply( - static_cast<api::StorageCommand&>(m) - .makeReply().release()); + std::shared_ptr<api::StorageReply> msgReply(static_cast<api::StorageCommand&>(m).makeReply().release()); idx.erase(range.first); lockGuard.broadcast(); lockGuard.unlock(); - msgReply->setResult(api::ReturnCode( - api::ReturnCode::TIMEOUT, - "Message waited too long in storage queue")); + msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue")); _messageSender.sendReply(msgReply); lck.second.reset(); @@ -515,23 +450,13 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const } bool -FileStorHandlerImpl::operationBlockedByHigherPriorityThread( - const api::StorageMessage& msg, - const Disk& disk) const -{ - return ((msg.getPriority() >= _maxPriorityToBlock) - && hasBlockingOperations(disk)); -} - -bool FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime) const { if (msg.getType().isReply()) { return false; // Replies must always be processed and cannot time out. } - return (waitTime >= static_cast<const api::StorageCommand&>( - msg).getTimeout()); + return (waitTime >= static_cast<const api::StorageCommand&>(msg).getTimeout()); } std::unique_ptr<FileStorHandler::BucketLockInterface> @@ -541,8 +466,7 @@ FileStorHandlerImpl::takeDiskBucketLockOwnership( const document::Bucket &bucket, const api::StorageMessage& msg) { - return std::unique_ptr<FileStorHandler::BucketLockInterface>( - new BucketLock(guard, disk, bucket, msg.getPriority(), msg.getSummary())); + return std::make_unique<BucketLock>(guard, disk, bucket, msg.getPriority(), msg.getSummary()); } std::unique_ptr<api::StorageReply> @@ -587,11 +511,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk) iter++; } if (iter != end) { - api::StorageMessage &m(*iter->_command); - - if (! operationBlockedByHigherPriorityThread(m, t) - && ! isPaused()) - { + if (! isPaused()) { return getMessage(lockGuard, t, idx, iter); } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 0b1ab5661e2..6b6d154e149 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -142,12 +142,8 @@ public: document::Bucket _bucket; }; - FileStorHandlerImpl(MessageSender&, - FileStorMetrics&, - const spi::PartitionStateList&, - ServiceLayerComponentRegister&, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking); + FileStorHandlerImpl(MessageSender&, FileStorMetrics&, + const spi::PartitionStateList&, ServiceLayerComponentRegister&); ~FileStorHandlerImpl(); void setGetNextMessageTimeout(uint32_t timeout) { _getNextMessageTimeout = timeout; } @@ -158,7 +154,6 @@ public: void close(); bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); - void pause(uint16_t disk, uint8_t priority) const; FileStorHandler::LockedMessage getNextMessage(uint16_t disk); FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter); @@ -203,8 +198,6 @@ private: std::map<document::Bucket, MergeStatus::SP> _mergeStates; - uint8_t _maxPriorityToBlock; - uint8_t _minPriorityToBeBlocking; uint32_t _getNextMessageTimeout; vespalib::Monitor _pauseMonitor; @@ -236,12 +229,6 @@ private: bool diskIsClosed(uint16_t disk) const; /** - * Return whether an already running high priority operation pre-empts - * (blocks) the operation in msg from even starting in the current thread. - */ - bool operationBlockedByHigherPriorityThread(const api::StorageMessage& msg, const Disk& disk) const; - - /** * Return whether msg has timed out based on waitTime and the message's * specified timeout. */ @@ -261,7 +248,6 @@ private: */ std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const; bool messageMayBeAborted(const api::StorageMessage& msg) const; - bool hasBlockingOperations(const Disk& t) const; void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd); bool diskHasActiveOperationForAbortedBucket(const Disk& disk, const AbortBucketOperationsCommand& cmd) const; void waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd); @@ -286,4 +272,3 @@ private: }; } // storage - diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 20c08acc01c..5d4cae19b59 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -116,17 +116,12 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _disks.resize(_component.getDiskCount()); - _metrics->initDiskMetrics( - _disks.size(), - _component.getLoadTypes()->getMetricLoadTypes(), - (_config->threads.size() > 0) ? (_config->threads.size()) : 6); - - _filestorHandler.reset(new FileStorHandler( - *this, *_metrics, _partitions, _compReg, - _config->maxPriorityToBlock, _config->minPriorityToBeBlocking)); + size_t numThreads = (_config->threads.size() == 0) ? 6 : _config->threads.size(); + _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numThreads); + + _filestorHandler.reset(new FileStorHandler(*this, *_metrics, _partitions, _compReg)); for (uint32_t i=0; i<_component.getDiskCount(); ++i) { if (_partitions[i].isUp()) { - size_t numThreads = (_config->threads.size() == 0) ? 6 : _config->threads.size(); LOG(spam, "Setting up disk %u", i); for (uint32_t j = 0; j < numThreads; j++) { _disks[i].push_back(DiskThread::SP( diff --git a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h b/storage/src/vespa/storage/persistence/filestorage/pausehandler.h deleted file mode 100644 index 59c543b0067..00000000000 --- a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * @class PauseHandler - * @ingroup persistence - * - * @brief Object that can be used to possibly pause running operation - */ -#pragma once - -#include <vespa/storage/persistence/filestorage/filestorhandler.h> - -namespace storage { - -class PauseHandler { - FileStorHandler* _handler; - uint16_t _disk; - uint8_t _priority; - -public: - PauseHandler() : _handler(0), _disk(0), _priority(0) {} - PauseHandler(FileStorHandler& handler, uint16_t disk) - : _handler(&handler), - _disk(disk), - _priority(0) - { - } - - void setPriority(uint8_t priority) { _priority = priority; } - - void pause() const { if (_handler != 0) _handler->pause(_disk, _priority); } -}; - -} // storage - diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 13d9d0ffbb6..fded9c87978 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -947,28 +947,23 @@ PersistenceThread::processMessage(api::StorageMessage& msg) ++_env._metrics.operations; if (msg.getType().isReply()) { try{ - _env._pauseHandler.setPriority(msg.getPriority()); LOG(debug, "Handling reply: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); handleReply(static_cast<api::StorageReply&>(msg)); } catch (std::exception& e) { // It's a reply, so nothing we can do. - LOG(debug, "Caught exception for %s: %s", - msg.toString().c_str(), - e.what()); + LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); } } else { api::StorageCommand& initiatingCommand = static_cast<api::StorageCommand&>(msg); try { - int64_t startTime( - _component->getClock().getTimeInMillis().getTime()); + int64_t startTime(_component->getClock().getTimeInMillis().getTime()); LOG(debug, "Handling command: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); - std::unique_ptr<MessageTracker> tracker( - handleCommand(initiatingCommand)); + auto tracker(handleCommand(initiatingCommand)); if (!tracker.get()) { LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str()); diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index ec666925148..c2dcb8e2a29 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -76,7 +76,6 @@ PersistenceUtil::PersistenceUtil( _metrics(metrics), _bucketFactory(_component.getBucketIdFactory()), _repo(_component.getTypeRepo()), - _pauseHandler(), _spi(provider) { } @@ -87,9 +86,8 @@ void PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i) { // Update bucket database - StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get( - bucket.getBucketId(), - "env::updatebucketdb")); + StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), + "env::updatebucketdb")); if (entry.exist()) { api::BucketInfo info = i; @@ -101,9 +99,7 @@ PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api: entry->setBucketInfo(info); entry.write(); } else { - LOG(debug, - "Bucket(%s).getBucketInfo: Bucket does not exist.", - bucket.getBucketId().toString().c_str()); + LOG(debug, "Bucket(%s).getBucketInfo: Bucket does not exist.", bucket.getBucketId().toString().c_str()); } } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index c3bd706f4b7..bf347ccb10a 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -1,12 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "types.h" #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storage/common/servicelayercomponent.h> #include <vespa/storage/persistence/filestorage/filestorhandler.h> #include <vespa/storage/persistence/filestorage/filestormetrics.h> -#include <vespa/storage/persistence/filestorage/pausehandler.h> -#include <vespa/storage/persistence/types.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/storage/storageutil/utils.h> #include <vespa/config-stor-filestor.h> @@ -70,7 +69,6 @@ struct PersistenceUtil { FileStorThreadMetrics& _metrics; const document::BucketIdFactory& _bucketFactory; const std::shared_ptr<document::DocumentTypeRepo> _repo; - PauseHandler _pauseHandler; spi::PersistenceProvider& _spi; PersistenceUtil( diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 38a48ac2eed..c8739442c13 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -115,20 +115,20 @@ private: bool _attemptedStopped; vespalib::string _pidFile; - // First components that doesn't depend on others + // First components that doesn't depend on others std::unique_ptr<StatusWebServer> _statusWebServer; std::shared_ptr<StorageMetricSet> _metrics; std::unique_ptr<metrics::MetricManager> _metricManager; - // Depends on bucket databases and stop() functionality + // Depends on bucket databases and stop() functionality std::unique_ptr<DeadLockDetector> _deadLockDetector; - // Depends on metric manager + // Depends on metric manager std::unique_ptr<StatusMetricConsumer> _statusMetrics; - // Depends on metric manager + // Depends on metric manager std::unique_ptr<StateReporter> _stateReporter; std::unique_ptr<StateManager> _stateManager; - // The storage chain can depend on anything. + // The storage chain can depend on anything. std::unique_ptr<StorageLink> _chain; /** Implementation of config callbacks. */ |