diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-14 23:07:22 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-15 04:59:11 +0000 |
commit | 30d76ff5080f3f911d9119125202d7bad0a2a9da (patch) | |
tree | 9123279b9637e8cbab0b116428892e8990215d48 | |
parent | f07e7cde693a73d99d6d3d27dc3aa65e44d1958b (diff) |
GC disk related code.
40 files changed, 403 insertions, 686 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index 51b477a2980..7c241f63a39 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -150,8 +150,7 @@ void BucketManagerTest::setupTestEnvironment(bool fakePersistenceLayer, *ConfigGetter<DocumenttypesConfig>::getConfig( "config-doctypes", FileSpec("../config-doctypes.cfg"))); _top = std::make_unique<DummyStorageLink>(); - _node = std::make_unique<TestServiceLayerApp>( - DiskCount(1), NodeIndex(0), config.getConfigId()); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), config.getConfigId()); _node->setTypeRepo(repo); _node->setupDummyPersistence(); // Set up the 3 links @@ -204,7 +203,6 @@ void BucketManagerTest::addBucketsToDB(uint32_t count) ++_emptyBuckets; for (const auto& bi : _bucketInfo) { bucketdb::StorageBucketInfo entry; - entry.disk = bi.second.partition; entry.setBucketInfo(api::BucketInfo(bi.second.crc, bi.second.count, bi.second.size)); @@ -224,7 +222,6 @@ BucketManagerTest::wasBlockedDueToLastModified(api::StorageMessage* msg, { bucketdb::StorageBucketInfo entry; entry.setBucketInfo(info); - entry.disk = 0; _node->getStorageBucketDatabase().insert(id, entry, "foo"); } @@ -438,7 +435,6 @@ TEST_F(BucketManagerTest, metrics_generation) { // Add 3 buckets; 2 ready, 1 active. 300 docs total, 600 bytes total. for (int i = 0; i < 3; ++i) { bucketdb::StorageBucketInfo entry; - entry.disk = 0; api::BucketInfo info(50, 100, 200); if (i > 0) { info.setReady(); @@ -483,7 +479,6 @@ TEST_F(BucketManagerTest, metrics_are_tracked_per_bucket_space) { auto& repo = _node->getComponentRegister().getBucketSpaceRepo(); { bucketdb::StorageBucketInfo entry; - entry.disk = 0; api::BucketInfo info(50, 100, 200); info.setReady(true); entry.setBucketInfo(info); @@ -492,7 +487,6 @@ TEST_F(BucketManagerTest, metrics_are_tracked_per_bucket_space) { } { bucketdb::StorageBucketInfo entry; - entry.disk = 0; api::BucketInfo info(60, 150, 300); info.setActive(true); entry.setBucketInfo(info); @@ -530,7 +524,6 @@ BucketManagerTest::insertSingleBucket(const document::BucketId& bucket, const api::BucketInfo& info) { bucketdb::StorageBucketInfo entry; - entry.disk = 0; entry.setBucketInfo(info); _node->getStorageBucketDatabase().insert(bucket, entry, "foo"); } diff --git a/storage/src/tests/bucketdb/initializertest.cpp b/storage/src/tests/bucketdb/initializertest.cpp index c11e067731e..c5d30204def 100644 --- a/storage/src/tests/bucketdb/initializertest.cpp +++ b/storage/src/tests/bucketdb/initializertest.cpp @@ -132,11 +132,10 @@ struct BucketInfoLogger { StorBucketDatabase::Decision operator()( uint64_t revBucket, const StorBucketDatabase::Entry& entry) { - document::BucketId bucket( - document::BucketId::keyToBucketId(revBucket)); + document::BucketId bucket(document::BucketId::keyToBucketId(revBucket)); assert(bucket.getRawId() != 0); assert(entry.getBucketInfo().valid()); - DiskData& ddata(map[entry.disk]); + DiskData& ddata(map[0]); BucketData& bdata(ddata[bucket]); bdata.info = entry.getBucketInfo(); return StorBucketDatabase::Decision::CONTINUE; @@ -356,8 +355,7 @@ struct FakePersistenceLayer : public StorageLink { + " did not exist in bucket database but we got " + "read bucket info request for it."); } else { - const BucketData* bucketData(getBucketData( - entry->disk, rbi.getBucketId(), "readbucketinfo")); + const BucketData* bucketData(getBucketData(0, rbi.getBucketId(), "readbucketinfo")); if (bucketData != 0) { entry->setBucketInfo(bucketData->info); entry.write(); @@ -412,8 +410,7 @@ InitializerTest::do_test_initialization(InitParams& params) std::map<PartitionId, DiskData> data(buildBucketInfo(_docMan, params)); assert(params.diskCount == 1u); - TestServiceLayerApp node(params.diskCount, params.nodeIndex, - params.getConfig().getConfigId()); + TestServiceLayerApp node(params.nodeIndex, params.getConfig().getConfigId()); DummyStorageLink top; StorageBucketDBInitializer* initializer; FakePersistenceLayer* bottom; @@ -534,7 +531,6 @@ struct DatabaseInsertCallback : MessageCallback d.info = api::BucketInfo(3+i, 4+i, 5+i, 6+i, 7+i); } _data[bid] = d; - entry->disk = 0; entry->setBucketInfo(d.info); entry.write(); } @@ -555,7 +551,7 @@ TEST_F(InitializerTest, buckets_initialized_by_load) { std::map<PartitionId, DiskData> data(buildBucketInfo(_docMan, params)); assert(params.diskCount == 1u); - TestServiceLayerApp node(params.diskCount, params.nodeIndex, + TestServiceLayerApp node(params.nodeIndex, params.getConfig().getConfigId()); DummyStorageLink top; StorageBucketDBInitializer* initializer; diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp index 59d8fa4b2d3..c0ccb5bc771 100644 --- a/storage/src/tests/common/metricstest.cpp +++ b/storage/src/tests/common/metricstest.cpp @@ -71,8 +71,7 @@ void MetricsTest::SetUp() { _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, "metricstest")); assert(system(("rm -rf " + getRootFolder(*_config)).c_str()) == 0); try { - _node = std::make_unique<TestServiceLayerApp>( - DiskCount(1), NodeIndex(0), _config->getConfigId()); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->getConfigId()); _node->setupDummyPersistence(); _clock = &_node->getClock(); _clock->setAbsoluteTimeInSeconds(1000000); diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index 992d6a54d91..1c69e87e38e 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -147,14 +147,13 @@ TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId) _nodeStateUpdater.setReportedNodeState(ns); } -TestServiceLayerApp::TestServiceLayerApp(DiskCount dc, NodeIndex index, +TestServiceLayerApp::TestServiceLayerApp(NodeIndex index, vespalib::stringref configId) : TestStorageApp(std::make_unique<ServiceLayerComponentRegisterImpl>(true), // TODO remove B-tree flag once default lib::NodeType::STORAGE, index, configId), _compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())), _persistenceProvider() { - assert(dc == 1u); _compReg.setDiskCount(1); lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState()); ns.setDiskCount(1); diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index f7edf5e0678..37c9e3c7ffe 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -112,9 +112,8 @@ class TestServiceLayerApp : public TestStorageApp PersistenceProviderUP _persistenceProvider; public: - TestServiceLayerApp(vespalib::stringref configId = ""); - TestServiceLayerApp(DiskCount diskCount, NodeIndex = NodeIndex(0xffff), - vespalib::stringref configId = ""); + TestServiceLayerApp(vespalib::stringref configId); + TestServiceLayerApp(NodeIndex = NodeIndex(0xffff), vespalib::stringref configId = ""); ~TestServiceLayerApp(); void setupDummyPersistence(); diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp index 1282bcf85c3..49dbf082e34 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.cpp +++ b/storage/src/tests/persistence/common/filestortestfixture.cpp @@ -28,8 +28,7 @@ FileStorTestFixture::setupPersistenceThreads(uint32_t threads) _config->getConfig("stor-server").set("node_index", "1"); _config->getConfig("stor-filestor").set("num_threads", std::to_string(threads)); - _node = std::make_unique<TestServiceLayerApp>( - DiskCount(1), NodeIndex(1), _config->getConfigId()); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(1), _config->getConfigId()); _testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1"); } @@ -60,7 +59,6 @@ FileStorTestFixture::createBucket(const document::BucketId& bid) StorBucketDatabase::WrappedEntry entry( _node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); - entry->disk = 0; entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false); entry.write(); } diff --git a/storage/src/tests/persistence/filestorage/deactivatebucketstest.cpp b/storage/src/tests/persistence/filestorage/deactivatebucketstest.cpp index 18f8a235453..0fe18335c23 100644 --- a/storage/src/tests/persistence/filestorage/deactivatebucketstest.cpp +++ b/storage/src/tests/persistence/filestorage/deactivatebucketstest.cpp @@ -40,7 +40,6 @@ TEST_F(DeactivateBucketsTest, buckets_in_database_deactivated_when_node_down_in_ StorBucketDatabase::WrappedEntry entry( _node->getStorageBucketDatabase().get(bucket, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); - entry->disk = 0; entry->info = serviceLayerInfo; entry.write(); } diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index b7165312785..3525563eb7a 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -82,7 +82,6 @@ struct FileStorManagerTest : Test{ StorBucketDatabase::WrappedEntry entry( _node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); - entry->disk = disk; entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false); entry.write(); } @@ -123,7 +122,7 @@ struct FileStorManagerTest : Test{ new lib::ClusterState(state))); } - void setupDisks(uint32_t diskCount) { + void setupDisks() { std::string rootOfRoot = "filestormanagertest"; config.reset(new vdstestlib::DirConfig(getStandardConfig(true, rootOfRoot))); @@ -144,8 +143,7 @@ struct FileStorManagerTest : Test{ assert(system(vespalib::make_string("mkdir -p %s/disks/d0", getRootFolder(*config).c_str()).c_str()) == 0); assert(system(vespalib::make_string("mkdir -p %s/disks/d0", getRootFolder(*config2).c_str()).c_str()) == 0); try { - _node.reset(new TestServiceLayerApp(DiskCount(diskCount), NodeIndex(0), - config->getConfigId())); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), config->getConfigId()); _node->setupDummyPersistence(); } catch (config::InvalidConfigException& e) { fprintf(stderr, "%s\n", e.what()); @@ -198,12 +196,11 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config, TestServiceLayerApp& node, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, - FileStorThreadMetrics& metrics, - uint16_t deviceIndex) + FileStorThreadMetrics& metrics) { (void) config; return std::make_unique<PersistenceThread>(nullptr,node.getComponentRegister(), config.getConfigId(), - provider, filestorHandler, metrics, deviceIndex); + provider, filestorHandler, metrics); } namespace { @@ -227,7 +224,7 @@ struct TestFileStorComponents { void FileStorManagerTest::SetUp() { - setupDisks(1); + setupDisks(); } void @@ -399,7 +396,7 @@ TEST_F(FileStorManagerTest, handler_priority) { FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); - uint32_t stripeId = filestorHandler.getNextStripeId(0); + uint32_t stripeId = filestorHandler.getNextStripeId(); ASSERT_EQ(0u, stripeId); std::string content("Here is some content which is in all documents"); @@ -416,14 +413,14 @@ TEST_F(FileStorManagerTest, handler_priority) { auto address = std::make_shared<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(i * 15); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); } - ASSERT_EQ(15, filestorHandler.getNextMessage(0, stripeId).second->getPriority()); - ASSERT_EQ(30, filestorHandler.getNextMessage(0, stripeId).second->getPriority()); - ASSERT_EQ(45, filestorHandler.getNextMessage(0, stripeId).second->getPriority()); - ASSERT_EQ(60, filestorHandler.getNextMessage(0, stripeId).second->getPriority()); - ASSERT_EQ(75, filestorHandler.getNextMessage(0, stripeId).second->getPriority()); + ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority()); } class MessagePusherThread : public document::Runnable { @@ -442,7 +439,7 @@ public: document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId()); auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), _doc, 100); - _handler.schedule(cmd, 0); + _handler.schedule(cmd); std::this_thread::sleep_for(1ms); } @@ -466,13 +463,13 @@ public: std::atomic<bool> _threadDone; explicit MessageFetchingThread(FileStorHandler& handler) - : _threadId(handler.getNextStripeId(0)), _handler(handler), _config(0), _fetchedCount(0), _done(false), + : _threadId(handler.getNextStripeId()), _handler(handler), _config(0), _fetchedCount(0), _done(false), _failed(false), _threadDone(false) {} void run() override { while (!_done) { - FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, _threadId); + FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId); if (msg.second.get()) { uint32_t originalConfig = _config.load(); _fetchedCount++; @@ -553,7 +550,7 @@ TEST_F(FileStorManagerTest, handler_pause) { FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); - uint32_t stripeId = filestorHandler.getNextStripeId(0); + uint32_t stripeId = filestorHandler.getNextStripeId(); std::string content("Here is some content which is in all documents"); std::ostringstream uri; @@ -569,18 +566,18 @@ TEST_F(FileStorManagerTest, handler_pause) { auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(i * 15); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); } - ASSERT_EQ(15, filestorHandler.getNextMessage(0, stripeId).second->getPriority()); + ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority()); { ResumeGuard guard = filestorHandler.pause(); (void)guard; - ASSERT_EQ(filestorHandler.getNextMessage(0, stripeId).second.get(), nullptr); + ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr); } - ASSERT_EQ(30, filestorHandler.getNextMessage(0, stripeId).second->getPriority()); + ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority()); } TEST_F(FileStorManagerTest, remap_split) { @@ -612,8 +609,8 @@ TEST_F(FileStorManagerTest, remap_split) { // Populate bucket with the given data for (uint32_t i = 1; i < 4; i++) { - filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket1), doc1, i), 0); - filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket2), doc2, i + 10), 0); + filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket1), doc1, i)); + filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket2), doc2, i + 10)); } EXPECT_EQ("BucketId(0x40000000000004d2): Put(BucketId(0x40000000000004d2), id:footype:testdoctype1:n=1234:bar, timestamp 1, size 118) (priority: 127)\n" @@ -622,11 +619,11 @@ TEST_F(FileStorManagerTest, remap_split) { "BucketId(0x40000000000011d7): Put(BucketId(0x40000000000011d7), id:footype:testdoctype1:n=4567:bar, timestamp 12, size 118) (priority: 127)\n" "BucketId(0x40000000000004d2): Put(BucketId(0x40000000000004d2), id:footype:testdoctype1:n=1234:bar, timestamp 3, size 118) (priority: 127)\n" "BucketId(0x40000000000011d7): Put(BucketId(0x40000000000011d7), id:footype:testdoctype1:n=4567:bar, timestamp 13, size 118) (priority: 127)\n", - filestorHandler.dumpQueue(0)); + filestorHandler.dumpQueue()); - FileStorHandler::RemapInfo a(makeDocumentBucket(document::BucketId(17, 1234)), 0); - FileStorHandler::RemapInfo b(makeDocumentBucket(document::BucketId(17, 1234 | 1 << 16)), 0); - filestorHandler.remapQueueAfterSplit(FileStorHandler::RemapInfo(makeDocumentBucket(bucket1), 0), a, b); + FileStorHandler::RemapInfo a(makeDocumentBucket(document::BucketId(17, 1234))); + FileStorHandler::RemapInfo b(makeDocumentBucket(document::BucketId(17, 1234 | 1 << 16))); + filestorHandler.remapQueueAfterSplit(FileStorHandler::RemapInfo(makeDocumentBucket(bucket1)), a, b); ASSERT_TRUE(a.foundInQueue); ASSERT_FALSE(b.foundInQueue); @@ -637,7 +634,7 @@ TEST_F(FileStorManagerTest, remap_split) { "BucketId(0x44000000000004d2): Put(BucketId(0x44000000000004d2), id:footype:testdoctype1:n=1234:bar, timestamp 1, size 118) (priority: 127)\n" "BucketId(0x44000000000004d2): Put(BucketId(0x44000000000004d2), id:footype:testdoctype1:n=1234:bar, timestamp 2, size 118) (priority: 127)\n" "BucketId(0x44000000000004d2): Put(BucketId(0x44000000000004d2), id:footype:testdoctype1:n=1234:bar, timestamp 3, size 118) (priority: 127)\n", - filestorHandler.dumpQueue(0)); + filestorHandler.dumpQueue()); } TEST_F(FileStorManagerTest, handler_timeout) { @@ -657,7 +654,7 @@ TEST_F(FileStorManagerTest, handler_timeout) { FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); - uint32_t stripeId = filestorHandler.getNextStripeId(0); + uint32_t stripeId = filestorHandler.getNextStripeId(); std::string content("Here is some content which is in all documents"); std::ostringstream uri; @@ -674,7 +671,7 @@ TEST_F(FileStorManagerTest, handler_timeout) { cmd->setAddress(*address); cmd->setPriority(0); cmd->setTimeout(50ms); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); } { @@ -683,12 +680,12 @@ TEST_F(FileStorManagerTest, handler_timeout) { cmd->setAddress(*address); cmd->setPriority(200); cmd->setTimeout(10000ms); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); } std::this_thread::sleep_for(51ms); for (;;) { - auto lock = filestorHandler.getNextMessage(0, stripeId); + auto lock = filestorHandler.getNextMessage(stripeId); if (lock.first.get()) { ASSERT_EQ(200, lock.second->getPriority()); break; @@ -718,10 +715,10 @@ TEST_F(FileStorManagerTest, priority) { FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0)); + filestorHandler, *metrics.disks[0]->threads[0])); std::unique_ptr<DiskThread> thread2(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[1], 0)); + filestorHandler, *metrics.disks[0]->threads[1])); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; @@ -753,7 +750,7 @@ TEST_F(FileStorManagerTest, priority) { auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(i * 2); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); } filestorHandler.flush(true); @@ -798,7 +795,7 @@ TEST_F(FileStorManagerTest, split1) { FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0)); + filestorHandler, *metrics.disks[0]->threads[0])); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -828,7 +825,7 @@ TEST_F(FileStorManagerTest, split1) { cmd->setAddress(*address); cmd->setSourceIndex(0); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); LOG(debug, "Got %zu replies", top.getNumReplies()); ASSERT_EQ(1, top.getNumReplies()); @@ -842,7 +839,7 @@ TEST_F(FileStorManagerTest, split1) { auto rcmd = std::make_shared<api::RemoveCommand>( makeDocumentBucket(bucket), documents[i]->getId(), 1000000 + 100 + i); rcmd->setAddress(*address); - filestorHandler.schedule(rcmd, 0); + filestorHandler.schedule(rcmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto rreply = std::dynamic_pointer_cast<api::RemoveReply>(top.getReply(0)); @@ -856,7 +853,7 @@ TEST_F(FileStorManagerTest, split1) { { auto cmd = std::make_shared<api::SplitBucketCommand>(makeDocumentBucket(document::BucketId(16, 1))); cmd->setSourceIndex(0); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto reply = std::dynamic_pointer_cast<api::SplitBucketReply>(top.getReply(0)); @@ -873,7 +870,7 @@ TEST_F(FileStorManagerTest, split1) { makeDocumentBucket(bucket), documents[i]->getId(), document::AllFields::NAME); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(address); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto reply = std::dynamic_pointer_cast<api::GetReply>(top.getReply(0)); @@ -887,7 +884,7 @@ TEST_F(FileStorManagerTest, split1) { auto cmd = std::make_shared<api::SplitBucketCommand>( makeDocumentBucket(document::BucketId(i, 0x0100001))); cmd->setSourceIndex(0); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto reply = std::dynamic_pointer_cast<api::SplitBucketReply>(top.getReply(0)); @@ -909,7 +906,7 @@ TEST_F(FileStorManagerTest, split1) { makeDocumentBucket(bucket), documents[i]->getId(), document::AllFields::NAME); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(address); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto reply = std::dynamic_pointer_cast<api::GetReply>(top.getReply(0)); @@ -945,7 +942,7 @@ TEST_F(FileStorManagerTest, split_single_group) { std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0)); + filestorHandler, *metrics.disks[0]->threads[0])); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP> documents; for (uint32_t i=0; i<20; ++i) { @@ -968,7 +965,7 @@ TEST_F(FileStorManagerTest, split_single_group) { auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(address); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto reply = std::dynamic_pointer_cast<api::PutReply>(top.getReply(0)); @@ -980,7 +977,7 @@ TEST_F(FileStorManagerTest, split_single_group) { { auto cmd = std::make_shared<api::SplitBucketCommand>(makeDocumentBucket(document::BucketId(16, 1))); cmd->setSourceIndex(0); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto reply = std::dynamic_pointer_cast<api::SplitBucketReply>(top.getReply(0)); @@ -996,7 +993,7 @@ TEST_F(FileStorManagerTest, split_single_group) { (makeDocumentBucket(bucket), documents[i]->getId(), document::AllFields::NAME); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(address); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto reply = std::dynamic_pointer_cast<api::GetReply>(top.getReply(0)); @@ -1023,19 +1020,15 @@ FileStorManagerTest::putDoc(DummyStorageLink& top, document::DocumentId docId(vespalib::make_string("id:ns:testdoctype1:n=%" PRIu64 ":%d", target.getId(), docNum)); document::BucketId bucket(16, factory.getBucketId(docId).getRawId()); //std::cerr << "doc bucket is " << bucket << " vs source " << source << "\n"; - _node->getPersistenceProvider().createBucket( - makeSpiBucket(target), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(target), context); Document::SP doc(new Document(*_testdoctype1, docId)); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(target), doc, docNum+1)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(target), doc, docNum+1); cmd->setAddress(address); cmd->setPriority(120); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); - std::shared_ptr<api::PutReply> reply( - std::dynamic_pointer_cast<api::PutReply>( - top.getReply(0))); + std::shared_ptr<api::PutReply> reply(std::dynamic_pointer_cast<api::PutReply>(top.getReply(0))); ASSERT_TRUE(reply.get()); ASSERT_EQ(ReturnCode(ReturnCode::OK), reply->getResult()); top.reset(); @@ -1044,8 +1037,7 @@ FileStorManagerTest::putDoc(DummyStorageLink& top, TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) { DummyStorageLink top; DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); + top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); setClusterState("storage:2 distributor:1"); top.open(); ForwardingMessageSender messageSender(*dummyManager); @@ -1055,7 +1047,7 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) { FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0)); + filestorHandler, *metrics.disks[0]->threads[0])); document::BucketId source(16, 0x10001); @@ -1084,8 +1076,8 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) { putCmd->setAddress(address); putCmd->setPriority(120); - filestorHandler.schedule(splitCmd, 0); - filestorHandler.schedule(putCmd, 0); + filestorHandler.schedule(splitCmd); + filestorHandler.schedule(putCmd); resumeGuard.reset(); // Unpause filestorHandler.flush(true); @@ -1120,7 +1112,7 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) { FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0)); + filestorHandler, *metrics.disks[0]->threads[0])); document::BucketId source(getFirstBucketNotOwnedByDistributor(0)); createBucket(source, 0); @@ -1132,7 +1124,7 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) { splitCmd->setPriority(120); splitCmd->setSourceIndex(0); // Source not owned by this distributor. - filestorHandler.schedule(splitCmd, 0); + filestorHandler.schedule(splitCmd); filestorHandler.flush(true); top.waitForMessages(4, _waitTime); // 3 notify cmds + split reply @@ -1161,7 +1153,7 @@ TEST_F(FileStorManagerTest, join) { FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0)); + filestorHandler, *metrics.disks[0]->threads[0])); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -1182,7 +1174,7 @@ TEST_F(FileStorManagerTest, join) { auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i); auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto reply = std::dynamic_pointer_cast<api::PutReply>(top.getReply(0)); @@ -1194,7 +1186,7 @@ TEST_F(FileStorManagerTest, join) { auto rcmd = std::make_shared<api::RemoveCommand>( makeDocumentBucket(bucket), documents[i]->getId(), 1000000 + 100 + i); rcmd->setAddress(*address); - filestorHandler.schedule(rcmd, 0); + filestorHandler.schedule(rcmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto rreply = std::dynamic_pointer_cast<api::RemoveReply>(top.getReply(0)); @@ -1209,7 +1201,7 @@ TEST_F(FileStorManagerTest, join) { auto cmd = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(document::BucketId(16, 1))); cmd->getSourceBuckets().emplace_back(document::BucketId(17, 0x00001)); cmd->getSourceBuckets().emplace_back(document::BucketId(17, 0x10001)); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto reply = std::dynamic_pointer_cast<api::JoinBucketsReply>(top.getReply(0)); @@ -1224,7 +1216,7 @@ TEST_F(FileStorManagerTest, join) { makeDocumentBucket(bucket), documents[i]->getId(), document::AllFields::NAME); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(address); - filestorHandler.schedule(cmd, 0); + filestorHandler.schedule(cmd); filestorHandler.flush(true); ASSERT_EQ(1, top.getNumReplies()); auto reply = std::dynamic_pointer_cast<api::GetReply>(top.getReply(0)); diff --git a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp index 1660fed9e38..732cc402641 100644 --- a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp +++ b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp @@ -41,8 +41,7 @@ void ModifiedBucketCheckerTest::SetUp() { _config.reset(new vdstestlib::DirConfig(getStandardConfig(true))); - _node.reset(new TestServiceLayerApp(DiskCount(1), NodeIndex(0), - _config->getConfigId())); + _node.reset(new TestServiceLayerApp(NodeIndex(0), _config->getConfigId())); _node->setupDummyPersistence(); _top.reset(new DummyStorageLink); diff --git a/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp b/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp index 787a63a618c..8c9ea11f1c8 100644 --- a/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp +++ b/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp @@ -33,7 +33,6 @@ TEST_F(SanityCheckedDeleteTest, delete_bucket_fails_when_provider_out_of_sync) { StorBucketDatabase::WrappedEntry entry( _node->getStorageBucketDatabase().get(bucket, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); - entry->disk = 0; entry->info = serviceLayerInfo; entry.write(); } diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index df0ea3e6680..565d8b5ee3c 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -176,7 +176,6 @@ MergeHandlerTest::SetUp() { LOG(debug, "Creating %s in bucket database", _bucket.toString().c_str()); bucketdb::StorageBucketInfo bucketDBEntry; - bucketDBEntry.disk = 0; getEnv().getBucketDatabase(_bucket.getBucketSpace()).insert(_bucket.getBucketId(), bucketDBEntry, "mergetestsetup"); LOG(debug, "Creating bucket to merge"); diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 8edb03b67fa..7e54b45f96a 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -34,8 +34,6 @@ public: explicit Fixture(FileStorTestFixture& parent); ~Fixture(); }; - - static constexpr uint16_t _disk = 0; }; PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_) @@ -58,7 +56,7 @@ PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_) // and small enough to not slow down testing too much. filestorHandler->setGetNextMessageTimeout(20ms); - stripeId = filestorHandler->getNextStripeId(0); + stripeId = filestorHandler->getNextStripeId(); } PersistenceQueueTest::Fixture::~Fixture() = default; @@ -90,16 +88,16 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) { // getNextMessage 2 times should then return a lock on the first bucket, // then subsequently on the second, skipping the already locked bucket. // Puts all have same pri, so order is well defined. - f.filestorHandler->schedule(createPut(1234, 0), _disk); - f.filestorHandler->schedule(createPut(1234, 1), _disk); - f.filestorHandler->schedule(createPut(5432, 0), _disk); + f.filestorHandler->schedule(createPut(1234, 0)); + f.filestorHandler->schedule(createPut(1234, 1)); + f.filestorHandler->schedule(createPut(5432, 0)); - auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); ASSERT_TRUE(lock0.first.get()); EXPECT_EQ(document::BucketId(16, 1234), dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); - auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); ASSERT_TRUE(lock1.first.get()); EXPECT_EQ(document::BucketId(16, 5432), dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); @@ -108,16 +106,16 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) { TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_access) { Fixture f(*this); - f.filestorHandler->schedule(createGet(1234), _disk); - f.filestorHandler->schedule(createGet(1234), _disk); + f.filestorHandler->schedule(createGet(1234)); + f.filestorHandler->schedule(createGet(1234)); - auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); ASSERT_TRUE(lock0.first.get()); EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); // Even though we already have a lock on the bucket, Gets allow shared locking and we // should therefore be able to get another lock. - auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); ASSERT_TRUE(lock1.first.get()); EXPECT_EQ(api::LockingRequirements::Shared, lock1.first->lockingRequirements()); } @@ -125,45 +123,45 @@ TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_ac TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op_active) { Fixture f(*this); - f.filestorHandler->schedule(createGet(1234), _disk); - f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createGet(1234)); + f.filestorHandler->schedule(createPut(1234, 0)); - auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); ASSERT_TRUE(lock0.first.get()); EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); // Expected to time out - auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); ASSERT_FALSE(lock1.first.get()); } TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op_active) { Fixture f(*this); - f.filestorHandler->schedule(createPut(1234, 0), _disk); - f.filestorHandler->schedule(createGet(1234), _disk); + f.filestorHandler->schedule(createPut(1234, 0)); + f.filestorHandler->schedule(createGet(1234)); - auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); ASSERT_TRUE(lock0.first.get()); EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); // Expected to time out - auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); ASSERT_FALSE(lock1.first.get()); } TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive_op_active) { Fixture f(*this); - f.filestorHandler->schedule(createPut(1234, 0), _disk); - f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createPut(1234, 0)); + f.filestorHandler->schedule(createPut(1234, 0)); - auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); ASSERT_TRUE(lock0.first.get()); EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); // Expected to time out - auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); ASSERT_FALSE(lock1.first.get()); } diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 11a876ad0e5..7185271bea4 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -25,13 +25,11 @@ namespace { spi::LoadType defaultLoadType(0, "default"); - vdstestlib::DirConfig initialize(uint32_t numDisks, const std::string & rootOfRoot) { + vdstestlib::DirConfig initialize(const std::string & rootOfRoot) { vdstestlib::DirConfig config(getStandardConfig(true, rootOfRoot)); std::string rootFolder = getRootFolder(config); vespalib::rmdir(rootFolder, true); - for (uint32_t i = 0; i < numDisks; i++) { - vespalib::mkdir(vespalib::make_string("%s/disks/d%d", rootFolder.c_str(), i), true); - } + vespalib::mkdir(vespalib::make_string("%s/disks/d0", rootFolder.c_str()), true); return config; } @@ -47,27 +45,23 @@ namespace { }; } -PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const std::string & rootOfRoot) - : _config(initialize(numDisks, rootOfRoot)), +PersistenceTestEnvironment::PersistenceTestEnvironment(const std::string & rootOfRoot) + : _config(initialize(rootOfRoot)), _messageKeeper(), - _node(numDisks, NodeIndex(0), _config.getConfigId()), + _node(NodeIndex(0), _config.getConfigId()), _component(_node.getComponentRegister(), "persistence test env"), _metrics(_component.getLoadTypes()->getMetricLoadTypes()) { _node.setupDummyPersistence(); - _metrics.initDiskMetrics(numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1, 1); - _handler = std::make_unique<FileStorHandler>(_messageKeeper, _metrics, - _node.getComponentRegister()); - for (uint32_t i = 0; i < numDisks; i++) { - _diskEnvs.push_back( - std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler, - *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider())); - } + _metrics.initDiskMetrics(1, _node.getLoadTypes()->getMetricLoadTypes(), 1, 1); + _handler = std::make_unique<FileStorHandler>(_messageKeeper, _metrics, _node.getComponentRegister()); + _diskEnv = std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler, + *_metrics.disks[0]->threads[0], _node.getPersistenceProvider()); } PersistenceTestEnvironment::~PersistenceTestEnvironment() { _handler->close(); - while (!_handler->closed(0)) { + while (!_handler->closed()) { std::this_thread::sleep_for(1ms); } } @@ -76,15 +70,14 @@ PersistenceTestUtils::PersistenceTestUtils() = default; PersistenceTestUtils::~PersistenceTestUtils() = default; std::string -PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) { - assert(disk == 0u); +PersistenceTestUtils::dumpBucket(const document::BucketId& bid) { return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(makeSpiBucket(bid)); } void -PersistenceTestUtils::setupDisks(uint32_t numDisks) { - _env = std::make_unique<PersistenceTestEnvironment>(DiskCount(numDisks), "todo-make-unique-persistencetestutils"); - setupExecutor(numDisks); +PersistenceTestUtils::setupDisks() { + _env = std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils"); + setupExecutor(2); } void @@ -93,27 +86,24 @@ PersistenceTestUtils::setupExecutor(uint32_t numThreads) { } std::unique_ptr<PersistenceThread> -PersistenceTestUtils::createPersistenceThread(uint32_t disk) +PersistenceTestUtils::createPersistenceThread() { return std::make_unique<PersistenceThread>(_sequenceTaskExecutor.get(), _env->_node.getComponentRegister(), _env->_config.getConfigId(),getPersistenceProvider(), - getEnv()._fileStorHandler, getEnv()._metrics, disk); + getEnv()._fileStorHandler, getEnv()._metrics); } document::Document::SP PersistenceTestUtils::schedulePut( uint32_t location, spi::Timestamp timestamp, - uint16_t disk, uint32_t minSize, uint32_t maxSize) { document::Document::SP doc(createRandomDocumentAtLocation( location, timestamp, minSize, maxSize)); - std::shared_ptr<api::StorageMessage> msg( - new api::PutCommand( - makeDocumentBucket(document::BucketId(16, location)), doc, timestamp)); - fsHandler().schedule(msg, disk); + auto msg = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, location)), doc, timestamp); + fsHandler().schedule(msg); return doc; } @@ -150,7 +140,7 @@ PersistenceTestUtils::getBucketStatus(const document::BucketId& id) if (!entry.exist()) { ost << "null"; } else { - ost << entry->getBucketInfo().getDocumentCount() << "," << entry->disk; + ost << entry->getBucketInfo().getDocumentCount(); } return ost.str(); @@ -158,76 +148,52 @@ PersistenceTestUtils::getBucketStatus(const document::BucketId& id) document::Document::SP PersistenceTestUtils::doPutOnDisk( - uint16_t disk, uint32_t location, spi::Timestamp timestamp, uint32_t minSize, uint32_t maxSize) { - document::Document::SP doc(createRandomDocumentAtLocation( - location, timestamp, minSize, maxSize)); - assert(disk == 0u); + document::Document::SP doc(createRandomDocumentAtLocation(location, timestamp, minSize, maxSize)); spi::Bucket b(makeSpiBucket(document::BucketId(16, location))); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); - + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); getPersistenceProvider().createBucket(b, context); - getPersistenceProvider().put(spi::Bucket(b), timestamp, doc, context); - return doc; } bool PersistenceTestUtils::doRemoveOnDisk( - uint16_t disk, const document::BucketId& bucketId, const document::DocumentId& docId, spi::Timestamp timestamp, bool persistRemove) { - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); - assert(disk == 0u); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); if (persistRemove) { - spi::RemoveResult result = getPersistenceProvider().removeIfFound( - makeSpiBucket(bucketId), - timestamp, docId, context); + spi::RemoveResult result = getPersistenceProvider().removeIfFound(makeSpiBucket(bucketId),timestamp, docId, context); return result.wasFound(); } - spi::RemoveResult result = getPersistenceProvider().remove( - makeSpiBucket(bucketId), - timestamp, docId, context); + spi::RemoveResult result = getPersistenceProvider().remove(makeSpiBucket(bucketId), timestamp, docId, context); return result.wasFound(); } bool PersistenceTestUtils::doUnrevertableRemoveOnDisk( - uint16_t disk, const document::BucketId& bucketId, const document::DocumentId& docId, spi::Timestamp timestamp) { - assert(disk == 0u); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); - spi::RemoveResult result = getPersistenceProvider().remove( - makeSpiBucket(bucketId), - timestamp, docId, context); + spi::Context context(defaultLoadType, spi::Priority(0),spi::Trace::TraceLevel(0)); + spi::RemoveResult result = getPersistenceProvider().remove(makeSpiBucket(bucketId), timestamp, docId, context); return result.wasFound(); } spi::GetResult -PersistenceTestUtils::doGetOnDisk( - uint16_t disk, - const document::BucketId& bucketId, - const document::DocumentId& docId) +PersistenceTestUtils::doGetOnDisk(const document::BucketId& bucketId, const document::DocumentId& docId) { auto fieldSet = std::make_unique<document::AllFields>(); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); - assert(disk == 0u); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); return getPersistenceProvider().get(makeSpiBucket(bucketId), *fieldSet, docId, context); } @@ -255,47 +221,19 @@ PersistenceTestUtils::createHeaderUpdate(const document::DocumentId& docId, cons return update; } -uint16_t -PersistenceTestUtils::getDiskFromBucketDatabaseIfUnset(const document::Bucket& bucket, uint16_t disk) -{ - if (disk == 0xffff) { - StorBucketDatabase::WrappedEntry entry( - getEnv().getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "createTestBucket")); - if (entry.exist()) { - return entry->disk; - } else { - std::ostringstream error; - error << bucket.toString() << " not in db and disk unset"; - throw vespalib::IllegalStateException(error.str(), VESPA_STRLOC); - } - } - return disk; -} - void -PersistenceTestUtils::doPut(const document::Document::SP& doc, - spi::Timestamp time, - uint16_t disk, - uint16_t usedBits) +PersistenceTestUtils::doPut(const document::Document::SP& doc, spi::Timestamp time, uint16_t usedBits) { - document::BucketId bucket( - _env->_component.getBucketIdFactory().getBucketId(doc->getId())); + document::BucketId bucket(_env->_component.getBucketIdFactory().getBucketId(doc->getId())); bucket.setUsedBits(usedBits); - disk = getDiskFromBucketDatabaseIfUnset(makeDocumentBucket(bucket), disk); - - doPut(doc, bucket, time, disk); + doPut(doc, bucket, time); } void -PersistenceTestUtils::doPut(const document::Document::SP& doc, - document::BucketId bid, - spi::Timestamp time, - uint16_t disk) +PersistenceTestUtils::doPut(const document::Document::SP& doc, document::BucketId bid, spi::Timestamp time) { - assert(disk == 0u); spi::Bucket b(makeSpiBucket(bid)); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); getPersistenceProvider().createBucket(b, context); getPersistenceProvider().put(b, time, std::move(doc), context); } @@ -303,28 +241,20 @@ PersistenceTestUtils::doPut(const document::Document::SP& doc, spi::UpdateResult PersistenceTestUtils::doUpdate(document::BucketId bid, const document::DocumentUpdate::SP& update, - spi::Timestamp time, - uint16_t disk) + spi::Timestamp time) { - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); - assert(disk == 0u); - return getPersistenceProvider().update( - makeSpiBucket(bid), time, update, context); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); + return getPersistenceProvider().update(makeSpiBucket(bid), time, update, context); } void PersistenceTestUtils::doRemove(const document::DocumentId& id, spi::Timestamp time, - uint16_t disk, bool unrevertableRemove, - uint16_t usedBits) + bool unrevertableRemove, uint16_t usedBits) { document::BucketId bucket( _env->_component.getBucketIdFactory().getBucketId(id)); bucket.setUsedBits(usedBits); - disk = getDiskFromBucketDatabaseIfUnset(makeDocumentBucket(bucket), disk); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); - assert(disk == 0u); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); if (unrevertableRemove) { getPersistenceProvider().remove( makeSpiBucket(bucket), time, id, context); @@ -360,8 +290,7 @@ PersistenceTestUtils::createRandomDocumentAtLocation( } void -PersistenceTestUtils::createTestBucket(const document::Bucket& bucket, - uint16_t disk) +PersistenceTestUtils::createTestBucket(const document::Bucket& bucket) { document::BucketId bucketId(bucket.getBucketId()); uint32_t opsPerType = 2; @@ -377,25 +306,20 @@ PersistenceTestUtils::createTestBucket(const document::Bucket& bucket, location <<= 32; location += (bucketId.getRawId() & 0xffffffff); document::Document::SP doc( - createRandomDocumentAtLocation( - location, seed, minDocSize, maxDocSize)); + createRandomDocumentAtLocation(location, seed, minDocSize, maxDocSize)); if (headerOnly) { clearBody(*doc); } - doPut(doc, spi::Timestamp(seed), disk, bucketId.getUsedBits()); + doPut(doc, spi::Timestamp(seed), bucketId.getUsedBits()); if (optype == 0) { // Regular put } else if (optype == 1) { // Overwritten later in time document::Document::SP doc2(new document::Document(*doc)); - doc2->setValue(doc2->getField("content"), - document::StringFieldValue("overwritten")); - doPut(doc2, spi::Timestamp(seed + 500), - disk, bucketId.getUsedBits()); + doc2->setValue(doc2->getField("content"), document::StringFieldValue("overwritten")); + doPut(doc2, spi::Timestamp(seed + 500), bucketId.getUsedBits()); } else if (optype == 2) { // Removed - doRemove(doc->getId(), spi::Timestamp(seed + 500), disk, false, - bucketId.getUsedBits()); + doRemove(doc->getId(), spi::Timestamp(seed + 500), false, bucketId.getUsedBits()); } else if (optype == 3) { // Unrevertable removed - doRemove(doc->getId(), spi::Timestamp(seed), disk, true, - bucketId.getUsedBits()); + doRemove(doc->getId(), spi::Timestamp(seed), true, bucketId.getUsedBits()); } } } diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index d889deabbd5..a0ab516754a 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -24,7 +24,7 @@ struct MessageKeeper : public MessageSender { }; struct PersistenceTestEnvironment { - PersistenceTestEnvironment(DiskCount numDisks, const std::string & rootOfRoot); + PersistenceTestEnvironment(const std::string & rootOfRoot); ~PersistenceTestEnvironment(); document::TestDocMan _testDocMan; @@ -34,7 +34,7 @@ struct PersistenceTestEnvironment { StorageComponent _component; FileStorMetrics _metrics; std::unique_ptr<FileStorHandler> _handler; - std::vector<std::unique_ptr<PersistenceUtil> > _diskEnvs; + std::unique_ptr<PersistenceUtil> _diskEnv; }; class PersistenceTestUtils : public testing::Test { @@ -76,14 +76,9 @@ public: PersistenceTestUtils(); virtual ~PersistenceTestUtils(); - document::Document::SP schedulePut( - uint32_t location, - spi::Timestamp timestamp, - uint16_t disk, - uint32_t minSize = 0, - uint32_t maxSize = 128); + document::Document::SP schedulePut(uint32_t location, spi::Timestamp timestamp, uint32_t minSize = 0, uint32_t maxSize = 128); - void setupDisks(uint32_t disks); + void setupDisks(); void setupExecutor(uint32_t numThreads); void TearDown() override { @@ -94,10 +89,9 @@ public: _env.reset(); } - std::string dumpBucket(const document::BucketId& bid, uint16_t disk = 0); + std::string dumpBucket(const document::BucketId& bid); - PersistenceUtil& getEnv(uint32_t disk = 0) - { return *_env->_diskEnvs[disk]; } + PersistenceUtil& getEnv() { return *_env->_diskEnv; } FileStorHandler& fsHandler() { return *_env->_handler; } FileStorMetrics& metrics() { return _env->_metrics; } MessageKeeper& messageKeeper() { return _env->_messageKeeper; } @@ -128,11 +122,9 @@ public: } /** - Performs a put to the given disk. Returns the document that was inserted. */ document::Document::SP doPutOnDisk( - uint16_t disk, uint32_t location, spi::Timestamp timestamp, uint32_t minSize = 0, @@ -143,14 +135,12 @@ public: spi::Timestamp timestamp, uint32_t minSize = 0, uint32_t maxSize = 128) - { return doPutOnDisk(0, location, timestamp, minSize, maxSize); } + { return doPutOnDisk(location, timestamp, minSize, maxSize); } /** - Performs a remove to the given disk. Returns the new doccount if document was removed, or -1 if not found. */ bool doRemoveOnDisk( - uint16_t disk, const document::BucketId& bid, const document::DocumentId& id, spi::Timestamp timestamp, @@ -161,11 +151,10 @@ public: const document::DocumentId& id, spi::Timestamp timestamp, bool persistRemove) { - return doRemoveOnDisk(0, bid, id, timestamp, persistRemove); + return doRemoveOnDisk(bid, id, timestamp, persistRemove); } - bool doUnrevertableRemoveOnDisk(uint16_t disk, - const document::BucketId& bid, + bool doUnrevertableRemoveOnDisk(const document::BucketId& bid, const document::DocumentId& id, spi::Timestamp timestamp); @@ -173,29 +162,27 @@ public: const document::DocumentId& id, spi::Timestamp timestamp) { - return doUnrevertableRemoveOnDisk(0, bid, id, timestamp); + return doUnrevertableRemoveOnDisk(bid, id, timestamp); } /** * Do a remove toward storage set up in test environment. * * @id Document to remove. - * @disk If set, use this disk, otherwise lookup in bucket db. * @unrevertableRemove If set, instead of adding put, turn put to remove. * @usedBits Generate bucket to use from docid using this amount of bits. */ - void doRemove(const document::DocumentId& id, spi::Timestamp, uint16_t disk = 0xffff, + void doRemove(const document::DocumentId& id, spi::Timestamp, bool unrevertableRemove = false, uint16_t usedBits = 16); spi::GetResult doGetOnDisk( - uint16_t disk, const document::BucketId& bucketId, const document::DocumentId& docId); spi::GetResult doGet( const document::BucketId& bucketId, const document::DocumentId& docId) - { return doGetOnDisk(0, bucketId, docId); } + { return doGetOnDisk(bucketId, docId); } std::shared_ptr<document::DocumentUpdate> createBodyUpdate( const document::DocumentId& id, @@ -205,28 +192,23 @@ public: const document::DocumentId& id, const document::FieldValue& updateValue); - uint16_t getDiskFromBucketDatabaseIfUnset(const document::Bucket &, - uint16_t disk = 0xffff); + uint16_t getDiskFromBucketDatabaseIfUnset(const document::Bucket &); /** * Do a put toward storage set up in test environment. * * @doc Document to put. Use TestDocMan to generate easily. - * @disk If set, use this disk, otherwise lookup in bucket db. * @usedBits Generate bucket to use from docid using this amount of bits. */ - void doPut(const document::Document::SP& doc, spi::Timestamp, - uint16_t disk = 0xffff, uint16_t usedBits = 16); + void doPut(const document::Document::SP& doc, spi::Timestamp, uint16_t usedBits = 16); void doPut(const document::Document::SP& doc, document::BucketId bid, - spi::Timestamp time, - uint16_t disk = 0); + spi::Timestamp time); spi::UpdateResult doUpdate(document::BucketId bid, const std::shared_ptr<document::DocumentUpdate>& update, - spi::Timestamp time, - uint16_t disk = 0); + spi::Timestamp time); document::Document::UP createRandomDocumentAtLocation( uint64_t location, uint32_t seed, @@ -237,14 +219,13 @@ public: * bucket can represent. (Such that tests have a nice test bucket to use * that require operations to handle all the various bucket contents. * - * @disk If set, use this disk, otherwise lookup in bucket db. */ - void createTestBucket(const document::Bucket&, uint16_t disk = 0xffff); + void createTestBucket(const document::Bucket&); /** * Create a new persistence thread. */ - std::unique_ptr<PersistenceThread> createPersistenceThread(uint32_t disk); + std::unique_ptr<PersistenceThread> createPersistenceThread(); /** * In-place modify doc so that it has no more body fields. @@ -256,7 +237,7 @@ class SingleDiskPersistenceTestUtils : public PersistenceTestUtils { public: void SetUp() override { - setupDisks(1); + setupDisks(); } }; diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index 3d7fc70db6a..10fb0b2e6b4 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -204,7 +204,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) spi.put(bucket, spi::Timestamp(1000 + i), std::move(doc), context); } - std::unique_ptr<PersistenceThread> thread(createPersistenceThread(0)); + std::unique_ptr<PersistenceThread> thread(createPersistenceThread()); getNode().getStateUpdater().setClusterState( std::make_shared<lib::ClusterState>("distributor:1 storage:1")); document::Bucket docBucket = makeDocumentBucket(document::BucketId(currentSplitLevel, 1)); diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 5174b733334..24c67fe93c1 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -42,7 +42,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { for (int i = 0; i < 10; ++i) { document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i)); doc->setValue(doc->getField("headerval"), document::IntFieldValue(i)); - doPut(doc, bucketId, spi::Timestamp(100 + i), 0); + doPut(doc, bucketId, spi::Timestamp(100 + i)); } document::Bucket bucket = makeDocumentBucket(bucketId); @@ -102,7 +102,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc for (int i = 0; i < 10; ++i) { document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i)); doc->setValue(doc->getField("headerval"), document::IntFieldValue(i)); - doPut(doc, bucketId, spi::Timestamp(100 + i), 0); + doPut(doc, bucketId, spi::Timestamp(100 + i)); } document::Bucket bucket = makeDocumentBucket(bucketId); @@ -114,7 +114,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); vespalib::string expected = - "Persistence bucket BucketId(0x4000000000000004), partition 0\n" + "Persistence bucket BucketId(0x4000000000000004)\n" " Timestamp: 100, Doc(id:mail:testdoctype1:n=4:3619.html), gid(0x0400000092bb8d298934253a), size: 163\n" " Timestamp: 102, Doc(id:mail:testdoctype1:n=4:62608.html), gid(0x04000000ce878d2488413bc4), size: 141\n" " Timestamp: 104, Doc(id:mail:testdoctype1:n=4:56061.html), gid(0x040000002b8f80f0160f6c5c), size: 118\n" @@ -132,7 +132,7 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries) for (int i = 0; i < 10; ++i) { document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i)); doc->setValue(doc->getField("headerval"), document::IntFieldValue(i)); - doPut(doc, bucketId, spi::Timestamp(100 + i), 0); + doPut(doc, bucketId, spi::Timestamp(100 + i)); doRemove(bucketId, doc->getId(), spi::Timestamp(200 + i), @@ -148,7 +148,7 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries) EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); vespalib::string expected = - "Persistence bucket BucketId(0x4000000000000004), partition 0\n" + "Persistence bucket BucketId(0x4000000000000004)\n" " Timestamp: 100, Doc(id:mail:testdoctype1:n=4:3619.html), gid(0x0400000092bb8d298934253a), size: 163\n" " Timestamp: 101, Doc(id:mail:testdoctype1:n=4:33113.html), gid(0x04000000b121a632741db368), size: 89\n" " Timestamp: 102, Doc(id:mail:testdoctype1:n=4:62608.html), gid(0x04000000ce878d2488413bc4), size: 141\n" @@ -182,7 +182,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_ for (int i = 0; i < 10; ++i) { document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i)); doc->setValue(doc->getField("headerval"), document::IntFieldValue(i)); - doPut(doc, bucketId, spi::Timestamp(100 + i), 0); + doPut(doc, bucketId, spi::Timestamp(100 + i)); } document::Bucket bucket = makeDocumentBucket(bucketId); @@ -194,7 +194,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_ EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); vespalib::string expected = - "Persistence bucket BucketId(0x4000000000000004), partition 0\n" + "Persistence bucket BucketId(0x4000000000000004)\n" " Timestamp: 100, Doc(id:mail:testdoctype1:n=4:3619.html), gid(0x0400000092bb8d298934253a), size: 163\n" " Timestamp: 101, Doc(id:mail:testdoctype1:n=4:33113.html), gid(0x04000000b121a632741db368), size: 89\n" " Timestamp: 102, Doc(id:mail:testdoctype1:n=4:62608.html), gid(0x04000000ce878d2488413bc4), size: 141\n" diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index 18d78e1e8cf..ce041660a2f 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -47,7 +47,7 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { createBucket(BUCKET_ID); getPersistenceProvider().createBucket(makeSpiBucket(BUCKET_ID),context); - thread = createPersistenceThread(0); + thread = createPersistenceThread(); testDoc = createTestDocument(); testDocId = testDoc->getId(); } diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp index 46dad62de48..bcfe2bd0830 100644 --- a/storage/src/tests/storageserver/bouncertest.cpp +++ b/storage/src/tests/storageserver/bouncertest.cpp @@ -65,7 +65,7 @@ BouncerTest::BouncerTest() void BouncerTest::setUpAsNode(const lib::NodeType& type) { vdstestlib::DirConfig config(getStandardConfig(type == lib::NodeType::STORAGE)); if (type == lib::NodeType::STORAGE) { - _node.reset(new TestServiceLayerApp(DiskCount(1), NodeIndex(2), config.getConfigId())); + _node.reset(new TestServiceLayerApp(NodeIndex(2), config.getConfigId())); } else { _node.reset(new TestDistributorApp(NodeIndex(2), config.getConfigId())); } diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp index 2f091572ed2..22441223c5c 100644 --- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -109,7 +109,6 @@ ChangedBucketOwnershipHandlerTest::insertBuckets(uint32_t numBuckets, bucketdb::StorageBucketInfo sbi; sbi.setBucketInfo(api::BucketInfo(1, 2, 3)); - sbi.disk = 0; _app->getStorageBucketDatabase().insert(bucket, sbi, "test"); inserted.push_back(bucket); } diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 6ac0bfcb5d8..e6067fa561c 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -156,13 +156,11 @@ MergeThrottlerTest::SetUp() vdstestlib::DirConfig config(getStandardConfig(true)); for (int i = 0; i < _storageNodeCount; ++i) { - std::unique_ptr<TestServiceLayerApp> server( - new TestServiceLayerApp(DiskCount(1), NodeIndex(i))); - server->setClusterState(lib::ClusterState( - "distributor:100 storage:100 version:1")); + auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i)); + server->setClusterState(lib::ClusterState("distributor:100 storage:100 version:1")); std::unique_ptr<DummyStorageLink> top; - top.reset(new DummyStorageLink); + top = std::make_unique<DummyStorageLink>(); MergeThrottler* throttler = new MergeThrottler(config.getConfigId(), server->getComponentRegister()); // MergeThrottler will be sandwiched in between two dummy links top->push_back(std::unique_ptr<StorageLink>(throttler)); diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index f88b59f50a5..b55e62d5fd3 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -55,7 +55,7 @@ StateManagerTest::StateManagerTest() void StateManagerTest::SetUp() { vdstestlib::DirConfig config(getStandardConfig(true)); - _node = std::make_unique<TestServiceLayerApp>(DiskCount(1), NodeIndex(2)); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(2)); // Clock will increase 1 sec per call. _node->getClock().setAbsoluteTimeInSeconds(1); _metricManager = std::make_unique<metrics::MetricManager>(); diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp index f0907bffba1..728bbb7a473 100644 --- a/storage/src/tests/storageserver/statereportertest.cpp +++ b/storage/src/tests/storageserver/statereportertest.cpp @@ -70,7 +70,7 @@ void StateReporterTest::SetUp() { _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, "statereportertest")); assert(system(("rm -rf " + getRootFolder(*_config)).c_str()) == 0); - _node = std::make_unique<TestServiceLayerApp>(DiskCount(1), NodeIndex(0), _config->getConfigId()); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->getConfigId()); _node->setupDummyPersistence(); _clock = &_node->getClock(); _clock->setAbsoluteTimeInSeconds(1000000); diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index c6ce935b611..031290cecb4 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -161,7 +161,6 @@ VisitorManagerTest::initializeTest() StorBucketDatabase::WrappedEntry entry( _node->getStorageBucketDatabase().get(bid, "", StorBucketDatabase::CREATE_IF_NONEXISTING)); - entry->disk = 0; entry.write(); } for (uint32_t i=0; i<docCount; ++i) { diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 7b9d5b77c98..ab22d8440fa 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -184,16 +184,15 @@ namespace { document::BucketId::keyToBucketId(bucketId)); if (data.valid()) { - assert(data.disk < diskCount); - ++disk[data.disk].buckets; + ++disk[0].buckets; if (data.getBucketInfo().isActive()) { - ++disk[data.disk].active; + ++disk[0].active; } if (data.getBucketInfo().isReady()) { - ++disk[data.disk].ready; + ++disk[0].ready; } - disk[data.disk].docs += data.getBucketInfo().getDocumentCount(); - disk[data.disk].bytes += data.getBucketInfo().getTotalDocumentSize(); + disk[0].docs += data.getBucketInfo().getDocumentCount(); + disk[0].bytes += data.getBucketInfo().getTotalDocumentSize(); if (bucket.getUsedBits() < lowestUsedBit) { lowestUsedBit = bucket.getUsedBits(); @@ -357,7 +356,6 @@ namespace { _xos << XmlTag("bucket") << XmlAttribute("id", ost.str()); info.getBucketInfo().printXml(_xos); - _xos << XmlAttribute("disk", info.disk); _xos << XmlEndTag(); }; }; diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp index 0878ffb3d99..0761d845ce6 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp +++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp @@ -382,7 +382,7 @@ StorageBucketDBInitializer::registerBucket(const document::Bucket &bucket, bucketInfo.toString().c_str()); } if (entry.preExisted()) { - if (entry->disk == partition) { + if (0 == partition) { LOG(debug, "%s already existed in bucket database on disk %i. " "Might have been moved from wrong directory prior to " "listing this directory.", @@ -395,14 +395,14 @@ StorageBucketDBInitializer::registerBucket(const document::Bucket &bucket, bucketId.stripUnused()) == partition) { keepOnDisk = partition; - joinFromDisk = entry->disk; + joinFromDisk = 0; } else { - keepOnDisk = entry->disk; + keepOnDisk = 0; joinFromDisk = partition; } - LOG(debug, "%s exist on both disk %u and disk %i. Joining two versions " + LOG(debug, "%s exist on both disk 0 and disk %i. Joining two versions " "onto disk %u.", - bucketId.toString().c_str(), entry->disk, int(partition), keepOnDisk); + bucketId.toString().c_str(), int(partition), keepOnDisk); entry.unlock(); // Must not have bucket db lock while sending down auto cmd = std::make_shared<InternalBucketJoinCommand>(bucket, keepOnDisk, joinFromDisk); @@ -414,7 +414,6 @@ StorageBucketDBInitializer::registerBucket(const document::Bucket &bucket, _system._component.getMinUsedBitsTracker().update(bucketId); LOG(spam, "Inserted %s on disk %i into bucket database", bucketId.toString().c_str(), int(partition)); - entry->disk = partition; entry.write(); uint16_t disk(distribution.getIdealDisk( _system._nodeState, _system._nodeIndex, bucketId.stripUnused(), @@ -453,7 +452,7 @@ namespace { return StorBucketDatabase::Decision::CONTINUE; } _iterator = bucket; - if (entry.disk != _disk) { + if (0 != _disk) { //LOG(spam, "Ignoring bucket %s as it is not on disk currently " // "being processed", bucket.toString().c_str()); // Ignore. We only want to scan for one disk diff --git a/storage/src/vespa/storage/bucketdb/storagebucketinfo.h b/storage/src/vespa/storage/bucketdb/storagebucketinfo.h index d9081432feb..f8732b7914b 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketinfo.h +++ b/storage/src/vespa/storage/bucketdb/storagebucketinfo.h @@ -8,9 +8,8 @@ namespace storage::bucketdb { struct StorageBucketInfo { api::BucketInfo info; - unsigned disk : 8; // The disk containing the bucket - StorageBucketInfo() : info(), disk(0xff) {} + StorageBucketInfo() : info() {} static bool mayContain(const StorageBucketInfo&) { return true; } void print(std::ostream&, bool verbose, const std::string& indent) const; bool valid() const { return info.valid(); } @@ -22,7 +21,7 @@ struct StorageBucketInfo { info.setDocumentCount(0); info.setTotalDocumentSize(0); } - bool verifyLegal() const { return (disk != 0xff); } + bool verifyLegal() const { return true; } uint32_t getMetaCount() const { return info.getMetaCount(); } void setChecksum(uint32_t crc) { info.setChecksum(crc); } bool operator == (const StorageBucketInfo & b) const; diff --git a/storage/src/vespa/storage/bucketdb/storbucketdb.cpp b/storage/src/vespa/storage/bucketdb/storbucketdb.cpp index e64a19e9a4a..bb61867bcc5 100644 --- a/storage/src/vespa/storage/bucketdb/storbucketdb.cpp +++ b/storage/src/vespa/storage/bucketdb/storbucketdb.cpp @@ -17,19 +17,19 @@ void StorageBucketInfo:: print(std::ostream& out, bool, const std::string&) const { - out << info << ", disk " << disk; + out << info; } -bool StorageBucketInfo::operator==(const StorageBucketInfo& b) const { - return disk == b.disk; +bool StorageBucketInfo::operator==(const StorageBucketInfo& ) const { + return true; } bool StorageBucketInfo::operator!=(const StorageBucketInfo& b) const { return !(*this == b); } -bool StorageBucketInfo::operator<(const StorageBucketInfo& b) const { - return disk < b.disk; +bool StorageBucketInfo::operator<(const StorageBucketInfo& ) const { + return false; } std::ostream& @@ -62,7 +62,6 @@ StorBucketDatabase::insert(const document::BucketId& bucket, const bucketdb::StorageBucketInfo& entry, const char* clientId) { - assert(entry.disk != 0xff); bool preExisted; return _impl->insert(bucket.toKey(), entry, clientId, false, preExisted); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index 9d9c7e10111..af863f82d6f 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -25,15 +25,15 @@ FileStorHandler::flush(bool flushMerges) } void -FileStorHandler::setDiskState(uint16_t disk, DiskState state) +FileStorHandler::setDiskState(DiskState state) { - _impl->setDiskState(disk, state); + _impl->setDiskState(state); } FileStorHandler::DiskState -FileStorHandler::getDiskState(uint16_t disk) +FileStorHandler::getDiskState() { - return _impl->getDiskState(disk); + return _impl->getDiskState(); } void @@ -49,28 +49,28 @@ FileStorHandler::pause() } bool -FileStorHandler::schedule(const api::StorageMessage::SP& msg, uint16_t disk) +FileStorHandler::schedule(const api::StorageMessage::SP& msg) { - return _impl->schedule(msg, disk); + return _impl->schedule(msg); } FileStorHandler::LockedMessage -FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId) +FileStorHandler::getNextMessage(uint32_t stripeId) { - return _impl->getNextMessage(disk, stripeId); + return _impl->getNextMessage(stripeId); } FileStorHandler::BucketLockInterface::SP -FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq) +FileStorHandler::lock(const document::Bucket& bucket, api::LockingRequirements lockReq) { - return _impl->lock(bucket, disk, lockReq); + return _impl->lock(bucket, lockReq); } void -FileStorHandler::remapQueueAfterDiskMove(const document::Bucket& bucket, uint16_t sourceDisk, uint16_t targetDisk) +FileStorHandler::remapQueueAfterDiskMove(const document::Bucket& bucket) { - RemapInfo target(bucket, targetDisk); - _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, FileStorHandlerImpl::MOVE); + RemapInfo target(bucket); + _impl->remapQueue(RemapInfo(bucket), target, FileStorHandlerImpl::MOVE); } void @@ -86,9 +86,9 @@ FileStorHandler::remapQueueAfterSplit(const RemapInfo& source,RemapInfo& target1 } void -FileStorHandler::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err) +FileStorHandler::failOperations(const document::Bucket &bucket, const api::ReturnCode& err) { - _impl->failOperations(bucket, fromDisk, err); + _impl->failOperations(bucket, err); } void @@ -121,12 +121,6 @@ FileStorHandler::getQueueSize() const return _impl->getQueueSize(); } -uint32_t -FileStorHandler::getQueueSize(uint16_t disk) const -{ - return _impl->getQueueSize(disk); -} - void FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms) { @@ -152,8 +146,8 @@ FileStorHandler::getNumActiveMerges() const } uint32_t -FileStorHandler::getNextStripeId(uint32_t disk) { - return _impl->getNextStripeId(disk); +FileStorHandler::getNextStripeId() { + return _impl->getNextStripeId(); } void @@ -182,9 +176,9 @@ FileStorHandler::setGetNextMessageTimeout(vespalib::duration timeout) } std::string -FileStorHandler::dumpQueue(uint16_t disk) const +FileStorHandler::dumpQueue() const { - return _impl->dumpQueue(disk); + return _impl->dumpQueue(); } } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index ccce5b7326a..bb210cd6b62 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -39,14 +39,12 @@ class FileStorHandler : public MessageSender { public: struct RemapInfo { document::Bucket bucket; - uint16_t diskIndex; bool foundInQueue; - RemapInfo(const document::Bucket &bucket_, uint16_t diskIdx) + RemapInfo(const document::Bucket &bucket_) : bucket(bucket_), - diskIndex(diskIdx), foundInQueue(false) - {} + {} }; class BucketLockInterface { @@ -69,8 +67,7 @@ public: FileStorHandler(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&, ServiceLayerComponentRegister&); - FileStorHandler(MessageSender&, FileStorMetrics&, - ServiceLayerComponentRegister&); + FileStorHandler(MessageSender&, FileStorMetrics&, ServiceLayerComponentRegister&); ~FileStorHandler(); // Commands used by file stor manager @@ -84,17 +81,17 @@ public: */ void flush(bool killPendingMerges); - void setDiskState(uint16_t disk, DiskState state); - DiskState getDiskState(uint16_t disk); + void setDiskState(DiskState state); + DiskState getDiskState(); /** Check whether a given disk is enabled or not. */ - bool enabled(uint16_t disk) { return (getDiskState(disk) == AVAILABLE); } - bool closed(uint16_t disk) { return (getDiskState(disk) == CLOSED); } + bool enabled() { return (getDiskState() == AVAILABLE); } + bool closed() { return (getDiskState() == CLOSED); } /** - * Disable the given disk. Operations towards threads using this disk will + * Disable the disk. Operations towards threads using this disk will * start to fail. Typically called when disk errors are detected. */ - void disable(uint16_t disk) { setDiskState(disk, DISABLED); } + void disable() { setDiskState(DISABLED); } /** Closes all disk threads. */ void close(); @@ -108,14 +105,14 @@ public: * Schedule a storage message to be processed by the given disk * @return True if we maanged to schedule operation. False if not */ - bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); + bool schedule(const std::shared_ptr<api::StorageMessage>&); /** * Used by file stor threads to get their next message to process. * * @param disk The disk to get messages for */ - LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId); + LockedMessage getNextMessage(uint32_t stripeId); /** * Lock a bucket. By default, each file stor thread has the locks of all @@ -131,7 +128,7 @@ public: * * */ - BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk, api::LockingRequirements lockReq); + BucketLockInterface::SP lock(const document::Bucket&, api::LockingRequirements lockReq); /** * Called by FileStorThread::onBucketDiskMove() after moving file, in case @@ -143,8 +140,7 @@ public: * requeststatus - Ignore * readbucketinfo/bucketdiskmove/internalbucketjoin - Fail and log errors */ - void remapQueueAfterDiskMove(const document::Bucket &bucket, - uint16_t sourceDisk, uint16_t targetDisk); + void remapQueueAfterDiskMove(const document::Bucket &bucket); /** * Called by FileStorThread::onJoin() after joining a bucket into another, @@ -194,7 +190,7 @@ public: * Fail all operations towards a single bucket currently queued to the * given thread with the given error code. */ - void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&); + void failOperations(const document::Bucket&, const api::ReturnCode&); /** * Add a new merge state to the registry. @@ -224,7 +220,7 @@ public: uint32_t getNumActiveMerges() const; /// Provides the next stripe id for a certain disk. - uint32_t getNextStripeId(uint32_t disk); + uint32_t getNextStripeId(); /** Removes the merge status for the given bucket. */ void clearMergeStatus(const document::Bucket&); @@ -243,12 +239,11 @@ public: /** Utility function to fetch total size of queue. */ uint32_t getQueueSize() const; - uint32_t getQueueSize(uint16_t disk) const; // Commands used by testing void setGetNextMessageTimeout(vespalib::duration timeout); - std::string dumpQueue(uint16_t disk) const; + std::string dumpQueue() const; private: std::unique_ptr<FileStorHandlerImpl> _impl; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 518523be7a2..88b3a6bcafc 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -41,31 +41,21 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg) : _component(compReg, "filestorhandlerimpl"), - _diskInfo(), + _disk(*this, sender, numStripes), _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), _getNextMessageTimeout(100ms), _max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)), _paused(false) { - _diskInfo.reserve(_component.getDiskCount()); - for (uint32_t i(0); i < _component.getDiskCount(); i++) { - _diskInfo.emplace_back(*this, sender, numStripes); - } - for (uint32_t i=0; i<_diskInfo.size(); ++i) { - _diskInfo[i].metrics = metrics.disks[i].get(); - assert(_diskInfo[i].metrics != nullptr); - uint32_t j(0); - for (Stripe & stripe : _diskInfo[i].getStripes()) { - stripe.setMetrics(metrics.disks[i]->stripes[j++].get()); - } + _disk.metrics = metrics.disks[0].get(); + assert(_disk.metrics != nullptr); + uint32_t j(0); + for (Stripe & stripe : _disk.getStripes()) { + stripe.setMetrics(metrics.disks[0]->stripes[j++].get()); } - if (_diskInfo.size() == 0) { - throw vespalib::IllegalArgumentException("No disks configured", VESPA_STRLOC); - } - // Add update hook, so we will get callbacks each 5 seconds to update - // metrics. + // Add update hook, so we will get callbacks each 5 seconds to update metrics. _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); } @@ -86,7 +76,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) { std::lock_guard mlock(_mergeStatesLock); MergeStatus::SP status = _mergeStates[bucket]; - if (status.get() == 0) { + if ( ! status ) { throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC); } return *status; @@ -151,11 +141,9 @@ FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api: void FileStorHandlerImpl::flush(bool killPendingMerges) { - for (uint32_t i=0; i<_diskInfo.size(); ++i) { - LOG(debug, "Wait until queues and bucket locks released for disk '%d'", i); - _diskInfo[i].flush(); - LOG(debug, "All queues and bucket locks released for disk '%d'", i); - } + LOG(debug, "Wait until queues and bucket locks released."); + _disk.flush(); + LOG(debug, "All queues and bucket locks released."); if (killPendingMerges) { api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down"); @@ -194,53 +182,44 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg, DiskState state) const } void -FileStorHandlerImpl::setDiskState(uint16_t diskId, DiskState state) +FileStorHandlerImpl::setDiskState(DiskState state) { - Disk& disk = _diskInfo[diskId]; // Mark disk closed - disk.setState(state); + _disk.setState(state); if (state != FileStorHandler::AVAILABLE) { - disk.flush(); + _disk.flush(); } } FileStorHandler::DiskState -FileStorHandlerImpl::getDiskState(uint16_t disk) const +FileStorHandlerImpl::getDiskState() const { - return _diskInfo[disk].getState(); + return _disk.getState(); } void FileStorHandlerImpl::close() { - for (uint32_t i=0; i<_diskInfo.size(); ++i) { - if (getDiskState(i) == FileStorHandler::AVAILABLE) { - LOG(debug, "AVAILABLE -> CLOSED disk[%d]", i); - setDiskState(i, FileStorHandler::CLOSED); - } - LOG(debug, "Closing disk[%d]", i); - _diskInfo[i].broadcast(); - LOG(debug, "Closed disk[%d]", i); + if (getDiskState() == FileStorHandler::AVAILABLE) { + LOG(debug, "AVAILABLE -> CLOSED"); + setDiskState(FileStorHandler::CLOSED); } + LOG(debug, "Closing"); + _disk.broadcast(); + LOG(debug, "Closed"); } uint32_t FileStorHandlerImpl::getQueueSize() const { - size_t count = 0; - for (const auto & disk : _diskInfo) { - count += disk.getQueueSize(); - } - return count; + return _disk.getQueueSize(); } bool -FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, uint16_t diskId) +FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg) { - assert(diskId < _diskInfo.size()); - Disk& disk(_diskInfo[diskId]); - return disk.schedule(msg); + return _disk.schedule(msg); } bool @@ -288,43 +267,37 @@ FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& c { // Do queue clearing and active operation waiting in two passes // to allow disk threads to drain running operations in parallel. - for (Disk & disk : _diskInfo) { - abortQueuedCommandsForBuckets(disk, cmd); - } - for (Disk & disk : _diskInfo) { - disk.waitInactive(cmd); - } + abortQueuedCommandsForBuckets(_disk, cmd); + _disk.waitInactive(cmd); } void FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) { - for (Disk & disk : _diskInfo) { - std::lock_guard lockGuard(_mergeStatesLock); - disk.metrics->pendingMerges.addValue(_mergeStates.size()); - disk.metrics->queueSize.addValue(disk.getQueueSize()); - - for (auto & entry : disk.metrics->averageQueueWaitingTime.getMetricMap()) { - metrics::LoadType loadType(entry.first, "ignored"); - for (const auto & stripe : disk.metrics->stripes) { - const auto & m = stripe->averageQueueWaitingTime[loadType]; - entry.second->addTotalValueWithCount(m.getTotal(), m.getCount()); - } + std::lock_guard lockGuard(_mergeStatesLock); + _disk.metrics->pendingMerges.addValue(_mergeStates.size()); + _disk.metrics->queueSize.addValue(_disk.getQueueSize()); + + for (auto & entry : _disk.metrics->averageQueueWaitingTime.getMetricMap()) { + metrics::LoadType loadType(entry.first, "ignored"); + for (const auto & stripe : _disk.metrics->stripes) { + const auto & m = stripe->averageQueueWaitingTime[loadType]; + entry.second->addTotalValueWithCount(m.getTotal(), m.getCount()); } } } uint32_t -FileStorHandlerImpl::getNextStripeId(uint32_t disk) { - return _diskInfo[disk].getNextStripeId(); +FileStorHandlerImpl::getNextStripeId() { + return _disk.getNextStripeId(); } bool -FileStorHandlerImpl::tryHandlePause(uint16_t disk) const +FileStorHandlerImpl::tryHandlePause() const { if (isPaused()) { // Wait a single time to see if filestor gets unpaused. - if (!_diskInfo[disk].isClosed()) { + if (!_disk.isClosed()) { std::unique_lock g(_pauseMonitor); _pauseCond.wait_for(g, 100ms); } @@ -352,14 +325,13 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) } FileStorHandler::LockedMessage -FileStorHandlerImpl::getNextMessage(uint16_t disk, uint32_t stripeId) +FileStorHandlerImpl::getNextMessage(uint32_t stripeId) { - assert(disk < _diskInfo.size()); - if (!tryHandlePause(disk)) { + if (!tryHandlePause()) { return {}; // Still paused, return to allow tick. } - return _diskInfo[disk].getNextMessage(stripeId, _getNextMessageTimeout); + return _disk.getNextMessage(stripeId, _getNextMessageTimeout); } std::shared_ptr<FileStorHandler::BucketLockInterface> @@ -381,30 +353,14 @@ FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRe namespace { struct MultiLockGuard { using monitor_guard = FileStorHandlerImpl::monitor_guard; - struct DiskAndStripe { - uint16_t disk; - uint16_t stripe; - - DiskAndStripe(uint16_t disk_, uint16_t stripe_) noexcept : disk(disk_), stripe(stripe_) {} - bool operator==(const DiskAndStripe& rhs) const noexcept { - return (disk == rhs.disk) && (stripe == rhs.stripe); - } - bool operator<(const DiskAndStripe& rhs) const noexcept { - if (disk != rhs.disk) { - return disk < rhs.disk; - } - return stripe < rhs.stripe; - } - }; - - std::map<DiskAndStripe, std::mutex*> monitors; + std::map<uint16_t, std::mutex*> monitors; std::vector<std::shared_ptr<monitor_guard>> guards; MultiLockGuard() = default; - void addLock(std::mutex & lock, uint16_t disk_index, uint16_t stripe_index) { - monitors[DiskAndStripe(disk_index, stripe_index)] = & lock; + void addLock(std::mutex & lock, uint16_t stripe_index) { + monitors[stripe_index] = & lock; } void lock() { for (auto & entry : monitors) { @@ -476,7 +432,7 @@ splitOrJoin(FileStorHandlerImpl::Operation op) { document::Bucket FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Bucket& source, Operation op, - std::vector<RemapInfo*>& targets, uint16_t& targetDisk, api::ReturnCode& returnCode) + std::vector<RemapInfo*>& targets, api::ReturnCode& returnCode) { document::Bucket newBucket = source; @@ -496,7 +452,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck if (idx > -1) { cmd.remapBucketId(targets[idx]->bucket.getBucketId()); targets[idx]->foundInQueue = true; - targetDisk = targets[idx]->diskIndex; #if defined(ENABLE_BUCKET_OPERATION_LOGGING) { vespalib::string desc = vespalib::make_string( @@ -539,7 +494,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck cmd.toString().c_str(), targets[0]->bucket.getBucketId().toString().c_str()); cmd.remapBucketId(targets[0]->bucket.getBucketId()); newBucket = targets[0]->bucket; - targetDisk = targets[0]->diskIndex; #ifdef ENABLE_BUCKET_OPERATION_LOGGING { vespalib::string desc = vespalib::make_string( @@ -588,7 +542,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg)); if (cmd.getBucket() == source) { if (op == MOVE) { - targetDisk = targets[0]->diskIndex; } else if (op == SPLIT) { returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, "Bucket split while operation enqueued"); } else { @@ -607,7 +560,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg)); if (cmd.getBucket() == source) { if (op == MOVE) { - targetDisk = targets[0]->diskIndex; } else { returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op)); } @@ -622,7 +574,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg)); if (cmd.getBucket() == source) { if (op == MOVE) { - targetDisk = targets[0]->diskIndex; } } break; @@ -641,7 +592,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck // Fail with bucket not found if op != MOVE if (bucket == source) { if (op == MOVE) { - targetDisk = targets[0]->diskIndex; } else { returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op)); } @@ -653,7 +603,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck // Fail with bucket not found if op != MOVE if (bucket == source) { if (op == MOVE) { - targetDisk = targets[0]->diskIndex; } else { returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op)); } @@ -714,14 +663,13 @@ FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source, // If set to something other than source.diskIndex, move this message // to that queue. MessageEntry& entry = entriesFound[i]; - uint16_t targetDisk = source.diskIndex; // If not OK, reply to this message with the following message api::ReturnCode returnCode(api::ReturnCode::OK); api::StorageMessage& msg(*entry._command); assert(entry._bucket == source.bucket); - document::Bucket bucket = remapMessage(msg, source.bucket, op, targets, targetDisk, returnCode); + document::Bucket bucket = remapMessage(msg, source.bucket, op, targets, returnCode); if (returnCode.getResult() != api::ReturnCode::OK) { // Fail message if errorcode set @@ -739,7 +687,7 @@ FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source, assert(bucket == source.bucket || std::find_if(targets.begin(), targets.end(), [bucket](auto* e){ return e->bucket == bucket; }) != targets.end()); - _diskInfo[targetDisk].stripe(bucket).exposeQueue().emplace_back(std::move(entry)); + _disk.stripe(bucket).exposeQueue().emplace_back(std::move(entry)); } } @@ -752,12 +700,10 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Oper // the same bucket. Will fix order if we accept wrong order later. MultiLockGuard guard; - Disk& from(_diskInfo[source.diskIndex]); - guard.addLock(from.stripe(source.bucket).exposeLock(), source.diskIndex, from.stripe_index(source.bucket)); + guard.addLock(_disk.stripe(source.bucket).exposeLock(), _disk.stripe_index(source.bucket)); - Disk& to1(_diskInfo[target.diskIndex]); if (target.bucket.getBucketId().getRawId() != 0) { - guard.addLock(to1.stripe(target.bucket).exposeLock(), target.diskIndex, to1.stripe_index(target.bucket)); + guard.addLock(_disk.stripe(target.bucket).exposeLock(), _disk.stripe_index(target.bucket)); } std::vector<RemapInfo*> targets; @@ -765,7 +711,7 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Oper guard.lock(); - remapQueueNoLock(from, source, targets, op); + remapQueueNoLock(_disk, source, targets, op); } void @@ -775,17 +721,14 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem // the same bucket. Will fix order if we accept wrong order later. MultiLockGuard guard; - Disk& from(_diskInfo[source.diskIndex]); - guard.addLock(from.stripe(source.bucket).exposeLock(), source.diskIndex, from.stripe_index(source.bucket)); + guard.addLock(_disk.stripe(source.bucket).exposeLock(), _disk.stripe_index(source.bucket)); - Disk& to1(_diskInfo[target1.diskIndex]); if (target1.bucket.getBucketId().getRawId() != 0) { - guard.addLock(to1.stripe(target1.bucket).exposeLock(), target1.diskIndex, to1.stripe_index(target1.bucket)); + guard.addLock(_disk.stripe(target1.bucket).exposeLock(), _disk.stripe_index(target1.bucket)); } - Disk& to2(_diskInfo[target2.diskIndex]); if (target2.bucket.getBucketId().getRawId() != 0) { - guard.addLock(to2.stripe(target2.bucket).exposeLock(), target2.diskIndex, to2.stripe_index(target2.bucket)); + guard.addLock(_disk.stripe(target2.bucket).exposeLock(), _disk.stripe_index(target2.bucket)); } guard.lock(); @@ -794,7 +737,7 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem targets.push_back(&target1); targets.push_back(&target2); - remapQueueNoLock(from, source, targets, op); + remapQueueNoLock(_disk, source, targets, op); } void @@ -1180,12 +1123,6 @@ FileStorHandlerImpl::Disk::getQueueSize() const noexcept return sum; } -uint32_t -FileStorHandlerImpl::getQueueSize(uint16_t disk) const -{ - return _diskInfo[disk].getQueueSize(); -} - FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard & guard, Stripe& stripe, const document::Bucket &bucket, uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id msgId, @@ -1295,22 +1232,21 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath& { bool verbose = path.hasAttribute("verbose"); out << "<h1>Filestor handler</h1>\n"; - for (uint32_t i=0; i<_diskInfo.size(); ++i) { - out << "<h2>Disk " << i << "</h2>\n"; - const Disk& disk(_diskInfo[i]); - out << "Queue size: " << disk.getQueueSize() << "<br>\n"; - out << "Disk state: "; - switch (disk.getState()) { - case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break; - case FileStorHandler::DISABLED: out << "DISABLED"; break; - case FileStorHandler::CLOSED: out << "CLOSED"; break; - } - out << "<h4>Active operations</h4>\n"; - disk.dumpActiveHtml(out); - if (!verbose) continue; + + out << "<h2>Disk " << "</h2>\n"; + out << "Queue size: " << _disk.getQueueSize() << "<br>\n"; + out << "Disk state: "; + switch (_disk.getState()) { + case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break; + case FileStorHandler::DISABLED: out << "DISABLED"; break; + case FileStorHandler::CLOSED: out << "CLOSED"; break; + } + out << "<h4>Active operations</h4>\n"; + _disk.dumpActiveHtml(out); + if (verbose) { out << "<h4>Input queue</h4>\n"; out << "<ul>\n"; - disk.dumpQueueHtml(out); + _disk.dumpQueueHtml(out); out << "</ul>\n"; } @@ -1330,9 +1266,7 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath& void FileStorHandlerImpl::waitUntilNoLocks() { - for (const auto & disk : _diskInfo) { - disk.waitUntilNoLocks(); - } + _disk.waitUntilNoLocks(); } ResumeGuard diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 90b0e559899..f397b82d199 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -241,20 +241,20 @@ public: void setGetNextMessageTimeout(vespalib::duration timeout) { _getNextMessageTimeout = timeout; } void flush(bool killPendingMerges); - void setDiskState(uint16_t disk, DiskState state); - DiskState getDiskState(uint16_t disk) const; + void setDiskState(DiskState state); + DiskState getDiskState() const; void close(); - bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); + bool schedule(const std::shared_ptr<api::StorageMessage>&); - FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId); + FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId); enum Operation { MOVE, SPLIT, JOIN }; void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op); void remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op); - void failOperations(const document::Bucket & bucket, uint16_t disk, const api::ReturnCode & code) { - _diskInfo[disk].failOperations(bucket, code); + void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) { + _disk.failOperations(bucket, code); } void sendCommand(const std::shared_ptr<api::StorageCommand>&) override; void sendReply(const std::shared_ptr<api::StorageReply>&) override; @@ -263,12 +263,11 @@ public: void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const; uint32_t getQueueSize() const; - uint32_t getQueueSize(uint16_t disk) const; - uint32_t getNextStripeId(uint32_t disk); + uint32_t getNextStripeId(); std::shared_ptr<FileStorHandler::BucketLockInterface> - lock(const document::Bucket & bucket, uint16_t disk, api::LockingRequirements lockReq) { - return _diskInfo[disk].lock(bucket, lockReq); + lock(const document::Bucket & bucket, api::LockingRequirements lockReq) { + return _disk.lock(bucket, lockReq); } void addMergeStatus(const document::Bucket&, MergeStatus::SP); @@ -277,8 +276,8 @@ public: uint32_t getNumActiveMerges() const; void clearMergeStatus(const document::Bucket&, const api::ReturnCode*); - std::string dumpQueue(uint16_t disk) const { - return _diskInfo[disk].dumpQueue(); + std::string dumpQueue() const { + return _disk.dumpQueue(); } ResumeGuard pause(); void resume() override; @@ -286,7 +285,7 @@ public: private: ServiceLayerComponent _component; - std::vector<Disk> _diskInfo; + Disk _disk; MessageSender& _messageSender; const document::BucketIdFactory& _bucketIdFactory; mutable std::mutex _mergeStatesLock; @@ -307,7 +306,7 @@ private: * recheck pause status. Returns true if filestor isn't paused at the time * of the first check or after the wait, false if it's still paused. */ - bool tryHandlePause(uint16_t disk) const; + bool tryHandlePause() const; /** * Checks whether the entire filestor layer is paused. @@ -335,7 +334,7 @@ private: document::Bucket remapMessage(api::StorageMessage& msg, const document::Bucket &source, Operation op, - std::vector<RemapInfo*>& targets, uint16_t& targetDisk, api::ReturnCode& returnCode); + std::vector<RemapInfo*>& targets, api::ReturnCode& returnCode); void remapQueueNoLock(Disk& from, const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index c4369a94161..88bfb18841c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -37,7 +37,7 @@ FileStorManager(const config::ConfigUri & configUri, _provider(&_providerErrorWrapper), _bucketIdFactory(_component.getBucketIdFactory()), _configUri(configUri), - _disks(), + _threads(), _bucketOwnershipNotifier(std::make_unique<BucketOwnershipNotifier>(_component, *this)), _configFetcher(_configUri.getContext()), _threadLockCheckInterval(60), @@ -59,25 +59,21 @@ FileStorManager::~FileStorManager() LOG(debug, "Deleting link %s. Giving filestor threads stop signal.", toString().c_str()); - for (uint32_t i = 0; i < _disks.size(); ++i) { - for (uint32_t j = 0; j < _disks[i].size(); ++j) { - if (_disks[i][j].get() != 0) { - _disks[i][j]->getThread().interrupt(); - } + for (const auto & thread : _threads) { + if (thread) { + thread->getThread().interrupt(); } } - for (uint32_t i = 0; i < _disks.size(); ++i) { - for (uint32_t j = 0; j < _disks[i].size(); ++j) { - if (_disks[i][j].get() != 0) { - _disks[i][j]->getThread().join(); - } + for (const auto & thread : _threads) { + if (thread) { + thread->getThread().join(); } } LOG(debug, "Closing all filestor queues, answering queued messages. New messages will be refused."); _filestorHandler->close(); LOG(debug, "Deleting filestor threads. Waiting for their current operation " "to finish. Stop their threads and delete objects."); - _disks.clear(); + _threads.clear(); } void @@ -115,29 +111,27 @@ void FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config) { // If true, this is not the first configure. - bool liveUpdate = (_disks.size() != 0); + bool liveUpdate = ! _threads.empty(); _threadLockCheckInterval = config->diskOperationTimeout; _failDiskOnError = (config->failDiskAfterErrorCount > 0); if (!liveUpdate) { _config = std::move(config); - _disks.resize(_component.getDiskCount()); + assert(_component.getDiskCount() == 1); size_t numThreads = _config->numThreads; size_t numStripes = std::max(size_t(1u), numThreads / 2); - _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads); + _metrics->initDiskMetrics(1, _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads); _filestorHandler = std::make_unique<FileStorHandler>(numThreads, numStripes, *this, *_metrics, _compReg); uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads); if (numResponseThreads > 0) { _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType)); } - for (uint32_t i=0; i<_component.getDiskCount(); ++i) { - LOG(spam, "Setting up disk %u", i); - for (uint32_t j = 0; j < numThreads; j++) { - _disks[i].push_back(std::make_shared<PersistenceThread>(_sequencedExecutor.get(), _compReg, _configUri, *_provider, - *_filestorHandler, *_metrics->disks[i]->threads[j], i)); - } + LOG(spam, "Setting up the disk"); + for (uint32_t j = 0; j < numThreads; j++) { + _threads.push_back(std::make_shared<PersistenceThread>(_sequencedExecutor.get(), _compReg, _configUri, *_provider, + *_filestorHandler, *_metrics->disks[0]->threads[j])); } } } @@ -224,23 +218,23 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const docu } bool -FileStorManager::handlePersistenceMessage( const shared_ptr<api::StorageMessage>& msg, uint16_t disk) +FileStorManager::handlePersistenceMessage( const shared_ptr<api::StorageMessage>& msg) { api::ReturnCode errorCode(api::ReturnCode::OK); do { - LOG(spam, "Received %s. Attempting to queue it to disk %u.", msg->getType().getName().c_str(), disk); + LOG(spam, "Received %s. Attempting to queue it.", msg->getType().getName().c_str()); LOG_BUCKET_OPERATION_NO_LOCK( getStorageMessageBucket(*msg).getBucketId(), - vespalib::make_string("Attempting to queue %s to disk %u", msg->toString().c_str(), disk)); + vespalib::make_string("Attempting to queue %s", msg->toString().c_str())); - if (_filestorHandler->schedule(msg, disk)) { - LOG(spam, "Received persistence message %s. Queued it to disk %u", - msg->getType().getName().c_str(), disk); + if (_filestorHandler->schedule(msg)) { + LOG(spam, "Received persistence message %s. Queued it to disk", + msg->getType().getName().c_str()); return true; } - switch (_filestorHandler->getDiskState(disk)) { + switch (_filestorHandler->getDiskState()) { case FileStorHandler::DISABLED: errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled"); break; @@ -277,7 +271,7 @@ FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd) } StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -296,7 +290,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd) } StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -306,7 +300,7 @@ FileStorManager::onGet(const shared_ptr<api::GetCommand>& cmd) { StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -325,7 +319,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd) } StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -335,7 +329,7 @@ FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd) { StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0)); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -345,7 +339,7 @@ FileStorManager::onRemoveLocation(const std::shared_ptr<api::RemoveLocationComma { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -355,7 +349,7 @@ FileStorManager::onStatBucket(const std::shared_ptr<api::StatBucketCommand>& cmd { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -377,17 +371,14 @@ FileStorManager::onCreateBucket( entry->getBucketInfo().toString().c_str()); code = api::ReturnCode(api::ReturnCode::EXISTS, "Bucket already exist"); } else { - entry->disk = _component.getIdealPartition(cmd->getBucket()); // Newly created buckets are ready but not active, unless // explicitly marked as such by the distributor. - entry->setBucketInfo(api::BucketInfo( - 0, 0, 0, 0, 0, true, cmd->getActive())); + entry->setBucketInfo(api::BucketInfo(0, 0, 0, 0, 0, true, cmd->getActive())); cmd->setPriority(0); - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); entry.write(); - LOG(debug, "Created bucket %s on disk %d (node index is %d)", - cmd->getBucketId().toString().c_str(), - entry->disk, _component.getIndex()); + LOG(debug, "Created bucket %s (node index is %d)", + cmd->getBucketId().toString().c_str(), _component.getIndex()); return true; } } @@ -401,7 +392,6 @@ FileStorManager::onCreateBucket( bool FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) { - uint16_t disk; { document::Bucket bucket(cmd->getBucket()); StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), @@ -443,11 +433,10 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) // higher priority. cmd->setPriority(0); LOG(debug, "Deleting %s", cmd->getBucketId().toString().c_str()); - handlePersistenceMessage(cmd, entry->disk); - disk = entry->disk; + handlePersistenceMessage(cmd); entry.remove(); } - _filestorHandler->failOperations(cmd->getBucket(), disk, + _filestorHandler->failOperations(cmd->getBucket(), api::ReturnCode(api::ReturnCode::BUCKET_DELETED, vespalib::make_string("Bucket %s about to be deleted anyway", cmd->getBucketId().toString().c_str()))); @@ -486,41 +475,38 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd) } if (!entry.preExisted()) { - entry->disk = _component.getIdealPartition(cmd->getBucket()); entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false); - LOG(debug, "Created bucket %s on disk %d (node index is %d) due to merge being received.", - cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex()); + LOG(debug, "Created bucket %s (node index is %d) due to merge being received.", + cmd->getBucketId().toString().c_str(), _component.getIndex()); // Call before writing bucket entry as we need to have bucket // lock while calling - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); entry.write(); } else { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } bool -FileStorManager::onGetBucketDiff( - const shared_ptr<api::GetBucketDiffCommand>& cmd) +FileStorManager::onGetBucketDiff(const shared_ptr<api::GetBucketDiffCommand>& cmd) { StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onGetBucketDiff")); if (!entry.exist()) { return true; } if (!entry.preExisted()) { - entry->disk = _component.getIdealPartition(cmd->getBucket()); - LOG(debug, "Created bucket %s on disk %d (node index is %d) due to get bucket diff being received.", - cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex()); + LOG(debug, "Created bucket %s (node index is %d) due to get bucket diff being received.", + cmd->getBucketId().toString().c_str(), _component.getIndex()); entry->info.setTotalDocumentSize(0); entry->info.setUsedFileSize(0); entry->info.setReady(true); // Call before writing bucket entry as we need to have bucket // lock while calling - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); entry.write(); } else { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -562,7 +548,7 @@ FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>& { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucket())); if (validateDiffReplyBucket(entry, reply->getBucket())) { - handlePersistenceMessage(reply, entry->disk); + handlePersistenceMessage(reply); } return true; } @@ -572,7 +558,7 @@ FileStorManager::onApplyBucketDiff(const shared_ptr<api::ApplyBucketDiffCommand> { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (validateApplyDiffCommandBucket(*cmd, entry)) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -580,10 +566,9 @@ FileStorManager::onApplyBucketDiff(const shared_ptr<api::ApplyBucketDiffCommand> bool FileStorManager::onApplyBucketDiffReply(const shared_ptr<api::ApplyBucketDiffReply>& reply) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *reply, reply->getBucket())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucket())); if (validateDiffReplyBucket(entry, reply->getBucket())) { - handlePersistenceMessage(reply, entry->disk); + handlePersistenceMessage(reply); } return true; } @@ -594,13 +579,7 @@ FileStorManager::onJoinBuckets(const std::shared_ptr<api::JoinBucketsCommand>& c document::Bucket bucket(cmd->getBucket()); StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "FileStorManager::onJoinBuckets")); - uint16_t disk; - if (entry.exist()) { - disk = entry->disk; - } else { - disk = _component.getPreferredAvailablePartition(bucket); - } - return handlePersistenceMessage(cmd, disk); + return handlePersistenceMessage(cmd); } bool @@ -608,7 +587,7 @@ FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& c { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -618,7 +597,7 @@ FileStorManager::onSetBucketState(const std::shared_ptr<api::SetBucketStateComma { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -632,7 +611,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) shared_ptr<GetIterCommand> cmd(std::static_pointer_cast<GetIterCommand>(msg)); StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -641,7 +620,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) shared_ptr<CreateIteratorCommand> cmd(std::static_pointer_cast<CreateIteratorCommand>(msg)); StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -656,7 +635,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) case ReadBucketList::ID: { shared_ptr<ReadBucketList> cmd(std::static_pointer_cast<ReadBucketList>(msg)); - handlePersistenceMessage(cmd, cmd->getPartition()); + handlePersistenceMessage(cmd); return true; } case ReadBucketInfo::ID: @@ -664,7 +643,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) shared_ptr<ReadBucketInfo> cmd(std::static_pointer_cast<ReadBucketInfo>(msg)); StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -673,7 +652,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) shared_ptr<InternalBucketJoinCommand> cmd(std::static_pointer_cast<InternalBucketJoinCommand>(msg)); StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -682,7 +661,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) shared_ptr<RecheckBucketInfoCommand> cmd(std::static_pointer_cast<RecheckBucketInfoCommand>(msg)); StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); + handlePersistenceMessage(cmd); } return true; } @@ -748,7 +727,7 @@ FileStorManager::sendReplyDirectly(const std::shared_ptr<api::StorageReply>& rep if (reply->getType() == api::MessageType::INTERNAL_REPLY) { std::shared_ptr<api::InternalReply> rep(std::dynamic_pointer_cast<api::InternalReply>(reply)); - assert(rep.get()); + assert(rep); if (onInternalReply(rep)) return; } sendUp(reply); @@ -780,12 +759,10 @@ void FileStorManager::onFlush(bool downwards) LOG(debug, "Start Flushing"); _filestorHandler->flush(!downwards); LOG(debug, "Flushed _filestorHandler->flush(!downwards);"); - for (uint32_t i = 0; i < _disks.size(); ++i) { - for (uint32_t j = 0; j < _disks[i].size(); ++j) { - if (_disks[i][j]) { - _disks[i][j]->flush(); - LOG(debug, "flushed disk[%d][%d]", i, j); - } + for (const auto & thread : _threads) { + if (thread) { + thread->flush(); + LOG(debug, "flushed thread[%s]", thread->getThread().getId().c_str()); } } uint32_t queueSize = _filestorHandler->getQueueSize(); @@ -822,9 +799,7 @@ FileStorManager::reportHtmlStatus(std::ostream& out, const framework::HttpUrlPat out << "\">" << (verbose ? "Less verbose" : "More verbose") << "</a>\n" << " ]</font><br><br>\n"; - if (_disks.size()) { - out << "<p>Using " << _disks[0].size() << " threads per disk</p>\n"; - } + out << "<p>Using " << _threads.size() << " threads</p>\n"; _filestorHandler->getStatus(out, path); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 6efd30419b8..54bfd927b18 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -56,8 +56,7 @@ class FileStorManager : public StorageLinkQueued, const document::BucketIdFactory& _bucketIdFactory; config::ConfigUri _configUri; - typedef std::vector<DiskThread::SP> DiskThreads; - std::vector<DiskThreads> _disks; + std::vector<DiskThread::SP> _threads; std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier; std::unique_ptr<vespa::config::content::StorFilestorConfig> _config; @@ -114,7 +113,7 @@ private: StorBucketDatabase::WrappedEntry mapOperationToDisk(api::StorageMessage&, const document::Bucket&); StorBucketDatabase::WrappedEntry mapOperationToBucketAndDisk(api::BucketCommand&, const document::DocumentId*); - bool handlePersistenceMessage(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); + bool handlePersistenceMessage(const std::shared_ptr<api::StorageMessage>&); // Document operations bool onPut(const std::shared_ptr<api::PutCommand>&) override; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 4e1076d9214..acbbe782099 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -848,7 +848,6 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP { tracker->setMetric(_env._metrics.mergeBuckets); - assert(_env._partition == 0u); spi::Bucket bucket(cmd.getBucket()); LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".", bucket.toString().c_str(), cmd.getMaxTimestamp()); @@ -1059,7 +1058,6 @@ MessageTracker::UP MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) { tracker->setMetric(_env._metrics.getBucketDiff); - assert(_env._partition == 0u); spi::Bucket bucket(cmd.getBucket()); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); @@ -1171,7 +1169,6 @@ void MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSender& sender) { _env._metrics.getBucketDiffReply.inc(); - assert(_env._partition == 0u); spi::Bucket bucket(reply.getBucket()); LOG(debug, "GetBucketDiffReply(%s)", bucket.toString().c_str()); @@ -1246,7 +1243,6 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra { tracker->setMetric(_env._metrics.applyBucketDiff); - assert(_env._partition == 0u); spi::Bucket bucket(cmd.getBucket()); LOG(debug, "%s", cmd.toString().c_str()); @@ -1334,7 +1330,6 @@ void MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) { _env._metrics.applyBucketDiffReply.inc(); - assert(_env._partition == 0u); spi::Bucket bucket(reply.getBucket()); std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index da0f06bf662..1e0bcac28fa 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -99,10 +99,9 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequence const config::ConfigUri & configUri, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, - FileStorThreadMetrics& metrics, - uint16_t deviceIndex) - : _stripeId(filestorHandler.getNextStripeId(deviceIndex)), - _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider), + FileStorThreadMetrics& metrics) + : _stripeId(filestorHandler.getNextStripeId()), + _env(configUri, compReg, filestorHandler, metrics, provider), _sequencedExecutor(sequencedExecutor), _spi(provider), _processAllHandler(_env, provider), @@ -110,7 +109,7 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequence _bucketOwnershipNotifier() { std::ostringstream threadName; - threadName << "Disk " << _env._partition << " thread " << _stripeId; + threadName << "Thread " << _stripeId; _component = std::make_unique<ServiceLayerComponent>(compReg, threadName.str()); _bucketOwnershipNotifier = std::make_unique<BucketOwnershipNotifier>(*_component, filestorHandler); _thread = _component->startThread(*this, 60s, 1s); @@ -137,7 +136,6 @@ PersistenceThread::getBucket(const DocumentId& id, const document::Bucket &bucke + "bucket " + bucket.getBucketId().toString() + ".", VESPA_STRLOC); } - assert(_env._partition == 0u); return spi::Bucket(bucket); } @@ -329,7 +327,6 @@ MessageTracker::UP PersistenceThread::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker) { tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]); - assert(_env._partition == 0u); spi::Bucket b = spi::Bucket(cmd.getBucket()); const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens(); for (const api::Timestamp & token : tokens) { @@ -347,7 +344,6 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrac LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); } - assert(_env._partition == 0u); spi::Bucket spiBucket(cmd.getBucket()); _spi.createBucket(spiBucket, tracker->context()); if (cmd.getActive()) { @@ -407,7 +403,6 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrac _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); } - assert(_env._partition == 0u); spi::Bucket bucket(cmd.getBucket()); if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { return tracker; @@ -486,7 +481,6 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTrack if ( ! fieldSet) { return tracker; } tracker->context().setReadConsistency(cmd.getReadConsistency()); - assert(_env._partition == 0u); spi::CreateIteratorResult result(_spi.createIterator( spi::Bucket(cmd.getBucket()), std::move(fieldSet), cmd.getSelection(), cmd.getIncludedVersions(), tracker->context())); @@ -514,7 +508,6 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracke return tracker; } - assert(_env._partition == 0u); spi::Bucket spiBucket(cmd.getBucket()); SplitBitDetector::Result targetInfo; if (_env._config.enableMultibitSplitOptimalization) { @@ -556,8 +549,6 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracke } } #endif - assert(lock1.disk == 0u); - assert(lock2.disk == 0u); spi::Result result = _spi.split(spiBucket, spi::Bucket(target1), spi::Bucket(target2), tracker->context()); if (result.hasError()) { @@ -576,14 +567,12 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracke std::vector<TargetInfo> targets; for (uint32_t i = 0; i < 2; i++) { const document::Bucket &target(i == 0 ? target1 : target2); - uint16_t disk(i == 0 ? lock1.disk : lock2.disk); assert(target.getBucketId().getRawId() != 0); targets.emplace_back(_env.getBucketDatabase(target.getBucketSpace()).get( target.getBucketId(), "PersistenceThread::handleSplitBucket - Target", StorBucketDatabase::CREATE_IF_NONEXISTING), - FileStorHandler::RemapInfo(target, disk)); - targets.back().first->setBucketInfo(_env.getBucketInfo(target, disk)); - targets.back().first->disk = disk; + FileStorHandler::RemapInfo(target)); + targets.back().first->setBucketInfo(_env.getBucketInfo(target)); } if (LOG_WOULD_LOG(spam)) { api::BucketInfo targ1(targets[0].first->getBucketInfo()); @@ -596,7 +585,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracke target2.getBucketId().toString().c_str(), targ2.getMetaCount()); } - FileStorHandler::RemapInfo source(cmd.getBucket(), _env._partition); + FileStorHandler::RemapInfo source(cmd.getBucket()); _env._fileStorHandler.remapQueueAfterSplit(source, targets[0].second, targets[1].second); bool ownershipChanged(!_bucketOwnershipNotifier->distributorOwns(cmd.getSourceIndex(), cmd.getBucket())); // Now release all the bucketdb locks. @@ -613,7 +602,6 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracke // Must make sure target bucket exists when we have pending ops // to an empty target bucket, since the provider will have // implicitly erased it by this point. - assert(target.second.diskIndex == 0u); spi::Bucket createTarget(spi::Bucket(target.second.bucket)); LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it", createTarget.toString().c_str()); @@ -679,7 +667,6 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracke StorBucketDatabase::WrappedEntry entry = _env.getBucketDatabase(destBucket.getBucketSpace()).get(destBucket.getBucketId(), "join", StorBucketDatabase::CREATE_IF_NONEXISTING); - entry->disk = _env._partition; entry.write(); } @@ -705,9 +692,6 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracke } } #endif - assert(lock1.disk == 0u); - assert(lock2.disk == 0u); - assert(_env._partition == 0u); spi::Result result = _spi.join(spi::Bucket(firstBucket), spi::Bucket(secondBucket), @@ -719,9 +703,8 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracke uint64_t lastModified = 0; for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) { document::Bucket srcBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[i]); - uint16_t disk = (i == 0) ? lock1.disk : lock2.disk; - FileStorHandler::RemapInfo target(cmd.getBucket(), _env._partition); - _env._fileStorHandler.remapQueueAfterJoin(FileStorHandler::RemapInfo(srcBucket, disk), target); + FileStorHandler::RemapInfo target(cmd.getBucket()); + _env._fileStorHandler.remapQueueAfterJoin(FileStorHandler::RemapInfo(srcBucket), target); // Remove source from bucket db. StorBucketDatabase::WrappedEntry entry = _env.getBucketDatabase(srcBucket.getBucketSpace()).get(srcBucket.getBucketId(), "join-remove-source"); @@ -749,7 +732,6 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd, Message NotificationGuard notifyGuard(*_bucketOwnershipNotifier); LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); - assert(_env._partition == 0u); spi::Bucket bucket(cmd.getBucket()); bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); @@ -785,7 +767,6 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, Mess _env.getBucketDatabase(destBucket.getBucketSpace()).get( destBucket.getBucketId(), "join", StorBucketDatabase::CREATE_IF_NONEXISTING); - entry->disk = _env._partition; entry.write(); } assert(cmd.getDiskOfInstanceToJoin() == 0u); @@ -939,7 +920,7 @@ PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP t void PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) { - LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); + LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get()); api::StorageMessage & msg(*lock.second); // Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains @@ -956,10 +937,10 @@ PersistenceThread::run(framework::ThreadHandle& thread) { LOG(debug, "Started persistence thread"); - while (!thread.interrupted() && !_env._fileStorHandler.closed(_env._partition)) { + while (!thread.interrupted() && !_env._fileStorHandler.closed()) { thread.registerTick(); - FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _stripeId)); + FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_stripeId)); if (lock.first) { processLockedMessage(std::move(lock)); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index ddc1a0ac217..d2216d6fb5e 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -22,7 +22,7 @@ class PersistenceThread final : public DiskThread, public Types public: PersistenceThread(vespalib::ISequencedTaskExecutor *, ServiceLayerComponentRegister&, const config::ConfigUri & configUri, spi::PersistenceProvider& provider, - FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, uint16_t deviceIndex); + FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics); ~PersistenceThread() override; /** Waits for current operation to be finished. */ diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 0d9a4a06cee..24d91253358 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -89,11 +89,11 @@ MessageTracker::sendReply() { vespalib::duration duration = vespalib::from_s(_timer.getElapsedTimeAsDouble()/1000.0); if (duration >= WARN_ON_SLOW_OPERATIONS) { LOGBT(warning, _msg->getType().toString(), - "Slow processing of message %s on disk %u. Processing time: %1.1f s (>=%1.1f s)", - _msg->toString().c_str(), _env._partition, vespalib::to_s(duration), vespalib::to_s(WARN_ON_SLOW_OPERATIONS)); + "Slow processing of message %s. Processing time: %1.1f s (>=%1.1f s)", + _msg->toString().c_str(), vespalib::to_s(duration), vespalib::to_s(WARN_ON_SLOW_OPERATIONS)); } else { - LOGBT(spam, _msg->getType().toString(), "Processing time of message %s on disk %u: %1.1f s", - _msg->toString(true).c_str(), _env._partition, vespalib::to_s(duration)); + LOGBT(spam, _msg->getType().toString(), "Processing time of message %s: %1.1f s", + _msg->toString(true).c_str(), vespalib::to_s(duration)); } if (hasReply()) { if ( ! _context.getTrace().getRoot().isEmpty()) { @@ -166,13 +166,11 @@ PersistenceUtil::PersistenceUtil( ServiceLayerComponentRegister& compReg, FileStorHandler& fileStorHandler, FileStorThreadMetrics& metrics, - uint16_t partition, spi::PersistenceProvider& provider) : _config(*config::ConfigGetter<vespa::config::content::StorFilestorConfig>::getConfig(configUri.getConfigId(), configUri.getContext())), _compReg(compReg), _component(compReg, generateName(this)), _fileStorHandler(fileStorHandler), - _partition(partition), _nodeIndex(_component.getIndex()), _metrics(metrics), _bucketFactory(_component.getBucketIdFactory()), @@ -220,24 +218,18 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket, // the bucket DB again to verify that the bucket is still on that // disk after locking it, or we will have to retry on new disk. LockResult result; - result.disk = getPreferredAvailableDisk(bucket); while (true) { // This function is only called in a context where we require exclusive // locking (split/join). Refactor if this no longer the case. std::shared_ptr<FileStorHandler::BucketLockInterface> lock( - _fileStorHandler.lock(bucket, result.disk, api::LockingRequirements::Exclusive)); + _fileStorHandler.lock(bucket, api::LockingRequirements::Exclusive)); // TODO disks are no longer used in practice, can we safely discard this? // Might need it for synchronization purposes if something has taken the // disk lock _and_ the bucket lock...? StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "join-lockAndGetDisk-1", flags)); - if (entry.exist() && entry->disk != result.disk) { - result.disk = entry->disk; - continue; - } - result.lock = lock; return result; } @@ -246,7 +238,7 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket, void PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket) { - api::BucketInfo info = getBucketInfo(bucket, _partition); + api::BucketInfo info = getBucketInfo(bucket); static_cast<api::BucketInfoReply&>(tracker.getReply()).setBucketInfo(info); @@ -254,13 +246,8 @@ PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket & } api::BucketInfo -PersistenceUtil::getBucketInfo(const document::Bucket &bucket, int disk) const +PersistenceUtil::getBucketInfo(const document::Bucket &bucket) const { - if (disk == -1) { - disk = _partition; - } - - assert(disk == 0u); spi::BucketInfoResult response = _spi.getBucketInfo(spi::Bucket(bucket)); return convertBucketInfo(response.getBucketInfo()); diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 51eb2b4b590..fe475d96077 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -99,7 +99,6 @@ struct PersistenceUtil { ServiceLayerComponentRegister &_compReg; ServiceLayerComponent _component; FileStorHandler &_fileStorHandler; - uint16_t _partition; uint16_t _nodeIndex; FileStorThreadMetrics &_metrics; const document::BucketIdFactory &_bucketFactory; @@ -111,7 +110,6 @@ struct PersistenceUtil { ServiceLayerComponentRegister&, FileStorHandler& fileStorHandler, FileStorThreadMetrics& metrics, - uint16_t partition, spi::PersistenceProvider& provider); ~PersistenceUtil(); @@ -127,18 +125,16 @@ struct PersistenceUtil { /** Lock the given bucket in the file stor handler. */ struct LockResult { std::shared_ptr<FileStorHandler::BucketLockInterface> lock; - uint16_t disk; + LockResult() : lock() {} - LockResult() : lock(), disk(0) {} - - bool bucketExisted() const { return (lock.get() != 0); } + bool bucketExisted() const { return bool(lock); } }; LockResult lockAndGetDisk( const document::Bucket &bucket, StorBucketDatabase::Flag flags = StorBucketDatabase::NONE); - api::BucketInfo getBucketInfo(const document::Bucket &bucket, int disk = -1) const; + api::BucketInfo getBucketInfo(const document::Bucket &bucket) const; api::BucketInfo convertBucketInfo(const spi::BucketInfo&) const; diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index cbc5c0fc6dd..15f7c1fffb7 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -88,7 +88,6 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, Message cmd.getBucketId().toString().c_str(), cmd.getDocumentSelection().c_str()); - assert(_env._partition == 0u); spi::Bucket bucket(cmd.getBucket()); UnrevertableRemoveEntryProcessor processor(_spi, bucket, tracker->context()); BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(), @@ -104,11 +103,8 @@ ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker: { tracker->setMetric(_env._metrics.statBucket[cmd.getLoadType()]); std::ostringstream ost; + ost << "Persistence bucket " << cmd.getBucketId() << "\n"; - ost << "Persistence bucket " << cmd.getBucketId() - << ", partition " << _env._partition << "\n"; - - assert(_env._partition == 0u); spi::Bucket bucket(cmd.getBucket()); StatEntryProcessor processor(ost); BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(), diff --git a/storage/src/vespa/storage/persistence/testandsethelper.cpp b/storage/src/vespa/storage/persistence/testandsethelper.cpp index ea3ad3c6cb3..523f5a52885 100644 --- a/storage/src/vespa/storage/persistence/testandsethelper.cpp +++ b/storage/src/vespa/storage/persistence/testandsethelper.cpp @@ -71,8 +71,8 @@ TestAndSetHelper::retrieveAndMatch(spi::Context & context) { auto docPtr = result.getDocumentPtr(); if (_docSelectionUp->contains(*docPtr) != document::select::Result::True) { return api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, - vespalib::make_string("Condition did not match document partition=%d, nodeIndex=%d bucket=%" PRIx64 " %s", - _thread._env._partition, _thread._env._nodeIndex, _cmd.getBucketId().getRawId(), + vespalib::make_string("Condition did not match document nodeIndex=%d bucket=%" PRIx64 " %s", + _thread._env._nodeIndex, _cmd.getBucketId().getRawId(), _cmd.hasBeenRemapped() ? "remapped" : "")); } @@ -83,8 +83,8 @@ TestAndSetHelper::retrieveAndMatch(spi::Context & context) { } return api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, - vespalib::make_string("Document does not exist partition=%d, nodeIndex=%d bucket=%" PRIx64 " %s", - _thread._env._partition, _thread._env._nodeIndex, _cmd.getBucketId().getRawId(), + vespalib::make_string("Document does not exist nodeIndex=%d bucket=%" PRIx64 " %s", + _thread._env._nodeIndex, _cmd.getBucketId().getRawId(), _cmd.hasBeenRemapped() ? "remapped" : "")); } |