summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-14 23:07:22 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-15 04:59:11 +0000
commit30d76ff5080f3f911d9119125202d7bad0a2a9da (patch)
tree9123279b9637e8cbab0b116428892e8990215d48
parentf07e7cde693a73d99d6d3d27dc3aa65e44d1958b (diff)
GC disk related code.
-rw-r--r--storage/src/tests/bucketdb/bucketmanagertest.cpp9
-rw-r--r--storage/src/tests/bucketdb/initializertest.cpp14
-rw-r--r--storage/src/tests/common/metricstest.cpp3
-rw-r--r--storage/src/tests/common/teststorageapp.cpp3
-rw-r--r--storage/src/tests/common/teststorageapp.h5
-rw-r--r--storage/src/tests/persistence/common/filestortestfixture.cpp4
-rw-r--r--storage/src/tests/persistence/filestorage/deactivatebucketstest.cpp1
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp128
-rw-r--r--storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp3
-rw-r--r--storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp1
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp1
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp46
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp166
-rw-r--r--storage/src/tests/persistence/persistencetestutils.h57
-rw-r--r--storage/src/tests/persistence/persistencethread_splittest.cpp2
-rw-r--r--storage/src/tests/persistence/processalltest.cpp14
-rw-r--r--storage/src/tests/persistence/testandsettest.cpp2
-rw-r--r--storage/src/tests/storageserver/bouncertest.cpp2
-rw-r--r--storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp1
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp8
-rw-r--r--storage/src/tests/storageserver/statemanagertest.cpp2
-rw-r--r--storage/src/tests/storageserver/statereportertest.cpp2
-rw-r--r--storage/src/tests/visiting/visitormanagertest.cpp1
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp12
-rw-r--r--storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp13
-rw-r--r--storage/src/vespa/storage/bucketdb/storagebucketinfo.h5
-rw-r--r--storage/src/vespa/storage/bucketdb/storbucketdb.cpp11
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp44
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h37
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp206
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h29
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp151
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h5
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp5
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp43
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp27
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h10
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/testandsethelper.cpp8
40 files changed, 403 insertions, 686 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index 51b477a2980..7c241f63a39 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -150,8 +150,7 @@ void BucketManagerTest::setupTestEnvironment(bool fakePersistenceLayer,
*ConfigGetter<DocumenttypesConfig>::getConfig(
"config-doctypes", FileSpec("../config-doctypes.cfg")));
_top = std::make_unique<DummyStorageLink>();
- _node = std::make_unique<TestServiceLayerApp>(
- DiskCount(1), NodeIndex(0), config.getConfigId());
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), config.getConfigId());
_node->setTypeRepo(repo);
_node->setupDummyPersistence();
// Set up the 3 links
@@ -204,7 +203,6 @@ void BucketManagerTest::addBucketsToDB(uint32_t count)
++_emptyBuckets;
for (const auto& bi : _bucketInfo) {
bucketdb::StorageBucketInfo entry;
- entry.disk = bi.second.partition;
entry.setBucketInfo(api::BucketInfo(bi.second.crc,
bi.second.count,
bi.second.size));
@@ -224,7 +222,6 @@ BucketManagerTest::wasBlockedDueToLastModified(api::StorageMessage* msg,
{
bucketdb::StorageBucketInfo entry;
entry.setBucketInfo(info);
- entry.disk = 0;
_node->getStorageBucketDatabase().insert(id, entry, "foo");
}
@@ -438,7 +435,6 @@ TEST_F(BucketManagerTest, metrics_generation) {
// Add 3 buckets; 2 ready, 1 active. 300 docs total, 600 bytes total.
for (int i = 0; i < 3; ++i) {
bucketdb::StorageBucketInfo entry;
- entry.disk = 0;
api::BucketInfo info(50, 100, 200);
if (i > 0) {
info.setReady();
@@ -483,7 +479,6 @@ TEST_F(BucketManagerTest, metrics_are_tracked_per_bucket_space) {
auto& repo = _node->getComponentRegister().getBucketSpaceRepo();
{
bucketdb::StorageBucketInfo entry;
- entry.disk = 0;
api::BucketInfo info(50, 100, 200);
info.setReady(true);
entry.setBucketInfo(info);
@@ -492,7 +487,6 @@ TEST_F(BucketManagerTest, metrics_are_tracked_per_bucket_space) {
}
{
bucketdb::StorageBucketInfo entry;
- entry.disk = 0;
api::BucketInfo info(60, 150, 300);
info.setActive(true);
entry.setBucketInfo(info);
@@ -530,7 +524,6 @@ BucketManagerTest::insertSingleBucket(const document::BucketId& bucket,
const api::BucketInfo& info)
{
bucketdb::StorageBucketInfo entry;
- entry.disk = 0;
entry.setBucketInfo(info);
_node->getStorageBucketDatabase().insert(bucket, entry, "foo");
}
diff --git a/storage/src/tests/bucketdb/initializertest.cpp b/storage/src/tests/bucketdb/initializertest.cpp
index c11e067731e..c5d30204def 100644
--- a/storage/src/tests/bucketdb/initializertest.cpp
+++ b/storage/src/tests/bucketdb/initializertest.cpp
@@ -132,11 +132,10 @@ struct BucketInfoLogger {
StorBucketDatabase::Decision operator()(
uint64_t revBucket, const StorBucketDatabase::Entry& entry)
{
- document::BucketId bucket(
- document::BucketId::keyToBucketId(revBucket));
+ document::BucketId bucket(document::BucketId::keyToBucketId(revBucket));
assert(bucket.getRawId() != 0);
assert(entry.getBucketInfo().valid());
- DiskData& ddata(map[entry.disk]);
+ DiskData& ddata(map[0]);
BucketData& bdata(ddata[bucket]);
bdata.info = entry.getBucketInfo();
return StorBucketDatabase::Decision::CONTINUE;
@@ -356,8 +355,7 @@ struct FakePersistenceLayer : public StorageLink {
+ " did not exist in bucket database but we got "
+ "read bucket info request for it.");
} else {
- const BucketData* bucketData(getBucketData(
- entry->disk, rbi.getBucketId(), "readbucketinfo"));
+ const BucketData* bucketData(getBucketData(0, rbi.getBucketId(), "readbucketinfo"));
if (bucketData != 0) {
entry->setBucketInfo(bucketData->info);
entry.write();
@@ -412,8 +410,7 @@ InitializerTest::do_test_initialization(InitParams& params)
std::map<PartitionId, DiskData> data(buildBucketInfo(_docMan, params));
assert(params.diskCount == 1u);
- TestServiceLayerApp node(params.diskCount, params.nodeIndex,
- params.getConfig().getConfigId());
+ TestServiceLayerApp node(params.nodeIndex, params.getConfig().getConfigId());
DummyStorageLink top;
StorageBucketDBInitializer* initializer;
FakePersistenceLayer* bottom;
@@ -534,7 +531,6 @@ struct DatabaseInsertCallback : MessageCallback
d.info = api::BucketInfo(3+i, 4+i, 5+i, 6+i, 7+i);
}
_data[bid] = d;
- entry->disk = 0;
entry->setBucketInfo(d.info);
entry.write();
}
@@ -555,7 +551,7 @@ TEST_F(InitializerTest, buckets_initialized_by_load) {
std::map<PartitionId, DiskData> data(buildBucketInfo(_docMan, params));
assert(params.diskCount == 1u);
- TestServiceLayerApp node(params.diskCount, params.nodeIndex,
+ TestServiceLayerApp node(params.nodeIndex,
params.getConfig().getConfigId());
DummyStorageLink top;
StorageBucketDBInitializer* initializer;
diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp
index 59d8fa4b2d3..c0ccb5bc771 100644
--- a/storage/src/tests/common/metricstest.cpp
+++ b/storage/src/tests/common/metricstest.cpp
@@ -71,8 +71,7 @@ void MetricsTest::SetUp() {
_config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, "metricstest"));
assert(system(("rm -rf " + getRootFolder(*_config)).c_str()) == 0);
try {
- _node = std::make_unique<TestServiceLayerApp>(
- DiskCount(1), NodeIndex(0), _config->getConfigId());
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->getConfigId());
_node->setupDummyPersistence();
_clock = &_node->getClock();
_clock->setAbsoluteTimeInSeconds(1000000);
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index 992d6a54d91..1c69e87e38e 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -147,14 +147,13 @@ TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId)
_nodeStateUpdater.setReportedNodeState(ns);
}
-TestServiceLayerApp::TestServiceLayerApp(DiskCount dc, NodeIndex index,
+TestServiceLayerApp::TestServiceLayerApp(NodeIndex index,
vespalib::stringref configId)
: TestStorageApp(std::make_unique<ServiceLayerComponentRegisterImpl>(true), // TODO remove B-tree flag once default
lib::NodeType::STORAGE, index, configId),
_compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())),
_persistenceProvider()
{
- assert(dc == 1u);
_compReg.setDiskCount(1);
lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState());
ns.setDiskCount(1);
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h
index f7edf5e0678..37c9e3c7ffe 100644
--- a/storage/src/tests/common/teststorageapp.h
+++ b/storage/src/tests/common/teststorageapp.h
@@ -112,9 +112,8 @@ class TestServiceLayerApp : public TestStorageApp
PersistenceProviderUP _persistenceProvider;
public:
- TestServiceLayerApp(vespalib::stringref configId = "");
- TestServiceLayerApp(DiskCount diskCount, NodeIndex = NodeIndex(0xffff),
- vespalib::stringref configId = "");
+ TestServiceLayerApp(vespalib::stringref configId);
+ TestServiceLayerApp(NodeIndex = NodeIndex(0xffff), vespalib::stringref configId = "");
~TestServiceLayerApp();
void setupDummyPersistence();
diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp
index 1282bcf85c3..49dbf082e34 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.cpp
+++ b/storage/src/tests/persistence/common/filestortestfixture.cpp
@@ -28,8 +28,7 @@ FileStorTestFixture::setupPersistenceThreads(uint32_t threads)
_config->getConfig("stor-server").set("node_index", "1");
_config->getConfig("stor-filestor").set("num_threads", std::to_string(threads));
- _node = std::make_unique<TestServiceLayerApp>(
- DiskCount(1), NodeIndex(1), _config->getConfigId());
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(1), _config->getConfigId());
_testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1");
}
@@ -60,7 +59,6 @@ FileStorTestFixture::createBucket(const document::BucketId& bid)
StorBucketDatabase::WrappedEntry entry(
_node->getStorageBucketDatabase().get(bid, "foo",
StorBucketDatabase::CREATE_IF_NONEXISTING));
- entry->disk = 0;
entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false);
entry.write();
}
diff --git a/storage/src/tests/persistence/filestorage/deactivatebucketstest.cpp b/storage/src/tests/persistence/filestorage/deactivatebucketstest.cpp
index 18f8a235453..0fe18335c23 100644
--- a/storage/src/tests/persistence/filestorage/deactivatebucketstest.cpp
+++ b/storage/src/tests/persistence/filestorage/deactivatebucketstest.cpp
@@ -40,7 +40,6 @@ TEST_F(DeactivateBucketsTest, buckets_in_database_deactivated_when_node_down_in_
StorBucketDatabase::WrappedEntry entry(
_node->getStorageBucketDatabase().get(bucket, "foo",
StorBucketDatabase::CREATE_IF_NONEXISTING));
- entry->disk = 0;
entry->info = serviceLayerInfo;
entry.write();
}
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index b7165312785..3525563eb7a 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -82,7 +82,6 @@ struct FileStorManagerTest : Test{
StorBucketDatabase::WrappedEntry entry(
_node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING));
- entry->disk = disk;
entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false);
entry.write();
}
@@ -123,7 +122,7 @@ struct FileStorManagerTest : Test{
new lib::ClusterState(state)));
}
- void setupDisks(uint32_t diskCount) {
+ void setupDisks() {
std::string rootOfRoot = "filestormanagertest";
config.reset(new vdstestlib::DirConfig(getStandardConfig(true, rootOfRoot)));
@@ -144,8 +143,7 @@ struct FileStorManagerTest : Test{
assert(system(vespalib::make_string("mkdir -p %s/disks/d0", getRootFolder(*config).c_str()).c_str()) == 0);
assert(system(vespalib::make_string("mkdir -p %s/disks/d0", getRootFolder(*config2).c_str()).c_str()) == 0);
try {
- _node.reset(new TestServiceLayerApp(DiskCount(diskCount), NodeIndex(0),
- config->getConfigId()));
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), config->getConfigId());
_node->setupDummyPersistence();
} catch (config::InvalidConfigException& e) {
fprintf(stderr, "%s\n", e.what());
@@ -198,12 +196,11 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config,
TestServiceLayerApp& node,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
- FileStorThreadMetrics& metrics,
- uint16_t deviceIndex)
+ FileStorThreadMetrics& metrics)
{
(void) config;
return std::make_unique<PersistenceThread>(nullptr,node.getComponentRegister(), config.getConfigId(),
- provider, filestorHandler, metrics, deviceIndex);
+ provider, filestorHandler, metrics);
}
namespace {
@@ -227,7 +224,7 @@ struct TestFileStorComponents {
void
FileStorManagerTest::SetUp()
{
- setupDisks(1);
+ setupDisks();
}
void
@@ -399,7 +396,7 @@ TEST_F(FileStorManagerTest, handler_priority) {
FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
- uint32_t stripeId = filestorHandler.getNextStripeId(0);
+ uint32_t stripeId = filestorHandler.getNextStripeId();
ASSERT_EQ(0u, stripeId);
std::string content("Here is some content which is in all documents");
@@ -416,14 +413,14 @@ TEST_F(FileStorManagerTest, handler_priority) {
auto address = std::make_shared<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(i * 15);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(0, stripeId).second->getPriority());
- ASSERT_EQ(30, filestorHandler.getNextMessage(0, stripeId).second->getPriority());
- ASSERT_EQ(45, filestorHandler.getNextMessage(0, stripeId).second->getPriority());
- ASSERT_EQ(60, filestorHandler.getNextMessage(0, stripeId).second->getPriority());
- ASSERT_EQ(75, filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority());
}
class MessagePusherThread : public document::Runnable {
@@ -442,7 +439,7 @@ public:
document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId());
auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), _doc, 100);
- _handler.schedule(cmd, 0);
+ _handler.schedule(cmd);
std::this_thread::sleep_for(1ms);
}
@@ -466,13 +463,13 @@ public:
std::atomic<bool> _threadDone;
explicit MessageFetchingThread(FileStorHandler& handler)
- : _threadId(handler.getNextStripeId(0)), _handler(handler), _config(0), _fetchedCount(0), _done(false),
+ : _threadId(handler.getNextStripeId()), _handler(handler), _config(0), _fetchedCount(0), _done(false),
_failed(false), _threadDone(false)
{}
void run() override {
while (!_done) {
- FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, _threadId);
+ FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId);
if (msg.second.get()) {
uint32_t originalConfig = _config.load();
_fetchedCount++;
@@ -553,7 +550,7 @@ TEST_F(FileStorManagerTest, handler_pause) {
FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
- uint32_t stripeId = filestorHandler.getNextStripeId(0);
+ uint32_t stripeId = filestorHandler.getNextStripeId();
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
@@ -569,18 +566,18 @@ TEST_F(FileStorManagerTest, handler_pause) {
auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(i * 15);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
{
ResumeGuard guard = filestorHandler.pause();
(void)guard;
- ASSERT_EQ(filestorHandler.getNextMessage(0, stripeId).second.get(), nullptr);
+ ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr);
}
- ASSERT_EQ(30, filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
}
TEST_F(FileStorManagerTest, remap_split) {
@@ -612,8 +609,8 @@ TEST_F(FileStorManagerTest, remap_split) {
// Populate bucket with the given data
for (uint32_t i = 1; i < 4; i++) {
- filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket1), doc1, i), 0);
- filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket2), doc2, i + 10), 0);
+ filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket1), doc1, i));
+ filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket2), doc2, i + 10));
}
EXPECT_EQ("BucketId(0x40000000000004d2): Put(BucketId(0x40000000000004d2), id:footype:testdoctype1:n=1234:bar, timestamp 1, size 118) (priority: 127)\n"
@@ -622,11 +619,11 @@ TEST_F(FileStorManagerTest, remap_split) {
"BucketId(0x40000000000011d7): Put(BucketId(0x40000000000011d7), id:footype:testdoctype1:n=4567:bar, timestamp 12, size 118) (priority: 127)\n"
"BucketId(0x40000000000004d2): Put(BucketId(0x40000000000004d2), id:footype:testdoctype1:n=1234:bar, timestamp 3, size 118) (priority: 127)\n"
"BucketId(0x40000000000011d7): Put(BucketId(0x40000000000011d7), id:footype:testdoctype1:n=4567:bar, timestamp 13, size 118) (priority: 127)\n",
- filestorHandler.dumpQueue(0));
+ filestorHandler.dumpQueue());
- FileStorHandler::RemapInfo a(makeDocumentBucket(document::BucketId(17, 1234)), 0);
- FileStorHandler::RemapInfo b(makeDocumentBucket(document::BucketId(17, 1234 | 1 << 16)), 0);
- filestorHandler.remapQueueAfterSplit(FileStorHandler::RemapInfo(makeDocumentBucket(bucket1), 0), a, b);
+ FileStorHandler::RemapInfo a(makeDocumentBucket(document::BucketId(17, 1234)));
+ FileStorHandler::RemapInfo b(makeDocumentBucket(document::BucketId(17, 1234 | 1 << 16)));
+ filestorHandler.remapQueueAfterSplit(FileStorHandler::RemapInfo(makeDocumentBucket(bucket1)), a, b);
ASSERT_TRUE(a.foundInQueue);
ASSERT_FALSE(b.foundInQueue);
@@ -637,7 +634,7 @@ TEST_F(FileStorManagerTest, remap_split) {
"BucketId(0x44000000000004d2): Put(BucketId(0x44000000000004d2), id:footype:testdoctype1:n=1234:bar, timestamp 1, size 118) (priority: 127)\n"
"BucketId(0x44000000000004d2): Put(BucketId(0x44000000000004d2), id:footype:testdoctype1:n=1234:bar, timestamp 2, size 118) (priority: 127)\n"
"BucketId(0x44000000000004d2): Put(BucketId(0x44000000000004d2), id:footype:testdoctype1:n=1234:bar, timestamp 3, size 118) (priority: 127)\n",
- filestorHandler.dumpQueue(0));
+ filestorHandler.dumpQueue());
}
TEST_F(FileStorManagerTest, handler_timeout) {
@@ -657,7 +654,7 @@ TEST_F(FileStorManagerTest, handler_timeout) {
FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
- uint32_t stripeId = filestorHandler.getNextStripeId(0);
+ uint32_t stripeId = filestorHandler.getNextStripeId();
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
@@ -674,7 +671,7 @@ TEST_F(FileStorManagerTest, handler_timeout) {
cmd->setAddress(*address);
cmd->setPriority(0);
cmd->setTimeout(50ms);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
}
{
@@ -683,12 +680,12 @@ TEST_F(FileStorManagerTest, handler_timeout) {
cmd->setAddress(*address);
cmd->setPriority(200);
cmd->setTimeout(10000ms);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
}
std::this_thread::sleep_for(51ms);
for (;;) {
- auto lock = filestorHandler.getNextMessage(0, stripeId);
+ auto lock = filestorHandler.getNextMessage(stripeId);
if (lock.first.get()) {
ASSERT_EQ(200, lock.second->getPriority());
break;
@@ -718,10 +715,10 @@ TEST_F(FileStorManagerTest, priority) {
FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0));
+ filestorHandler, *metrics.disks[0]->threads[0]));
std::unique_ptr<DiskThread> thread2(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[1], 0));
+ filestorHandler, *metrics.disks[0]->threads[1]));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
@@ -753,7 +750,7 @@ TEST_F(FileStorManagerTest, priority) {
auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(i * 2);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
}
filestorHandler.flush(true);
@@ -798,7 +795,7 @@ TEST_F(FileStorManagerTest, split1) {
FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0));
+ filestorHandler, *metrics.disks[0]->threads[0]));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -828,7 +825,7 @@ TEST_F(FileStorManagerTest, split1) {
cmd->setAddress(*address);
cmd->setSourceIndex(0);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
LOG(debug, "Got %zu replies", top.getNumReplies());
ASSERT_EQ(1, top.getNumReplies());
@@ -842,7 +839,7 @@ TEST_F(FileStorManagerTest, split1) {
auto rcmd = std::make_shared<api::RemoveCommand>(
makeDocumentBucket(bucket), documents[i]->getId(), 1000000 + 100 + i);
rcmd->setAddress(*address);
- filestorHandler.schedule(rcmd, 0);
+ filestorHandler.schedule(rcmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto rreply = std::dynamic_pointer_cast<api::RemoveReply>(top.getReply(0));
@@ -856,7 +853,7 @@ TEST_F(FileStorManagerTest, split1) {
{
auto cmd = std::make_shared<api::SplitBucketCommand>(makeDocumentBucket(document::BucketId(16, 1)));
cmd->setSourceIndex(0);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto reply = std::dynamic_pointer_cast<api::SplitBucketReply>(top.getReply(0));
@@ -873,7 +870,7 @@ TEST_F(FileStorManagerTest, split1) {
makeDocumentBucket(bucket), documents[i]->getId(), document::AllFields::NAME);
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(address);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto reply = std::dynamic_pointer_cast<api::GetReply>(top.getReply(0));
@@ -887,7 +884,7 @@ TEST_F(FileStorManagerTest, split1) {
auto cmd = std::make_shared<api::SplitBucketCommand>(
makeDocumentBucket(document::BucketId(i, 0x0100001)));
cmd->setSourceIndex(0);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto reply = std::dynamic_pointer_cast<api::SplitBucketReply>(top.getReply(0));
@@ -909,7 +906,7 @@ TEST_F(FileStorManagerTest, split1) {
makeDocumentBucket(bucket), documents[i]->getId(), document::AllFields::NAME);
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(address);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto reply = std::dynamic_pointer_cast<api::GetReply>(top.getReply(0));
@@ -945,7 +942,7 @@ TEST_F(FileStorManagerTest, split_single_group) {
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0));
+ filestorHandler, *metrics.disks[0]->threads[0]));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP> documents;
for (uint32_t i=0; i<20; ++i) {
@@ -968,7 +965,7 @@ TEST_F(FileStorManagerTest, split_single_group) {
auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i);
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(address);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto reply = std::dynamic_pointer_cast<api::PutReply>(top.getReply(0));
@@ -980,7 +977,7 @@ TEST_F(FileStorManagerTest, split_single_group) {
{
auto cmd = std::make_shared<api::SplitBucketCommand>(makeDocumentBucket(document::BucketId(16, 1)));
cmd->setSourceIndex(0);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto reply = std::dynamic_pointer_cast<api::SplitBucketReply>(top.getReply(0));
@@ -996,7 +993,7 @@ TEST_F(FileStorManagerTest, split_single_group) {
(makeDocumentBucket(bucket), documents[i]->getId(), document::AllFields::NAME);
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(address);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto reply = std::dynamic_pointer_cast<api::GetReply>(top.getReply(0));
@@ -1023,19 +1020,15 @@ FileStorManagerTest::putDoc(DummyStorageLink& top,
document::DocumentId docId(vespalib::make_string("id:ns:testdoctype1:n=%" PRIu64 ":%d", target.getId(), docNum));
document::BucketId bucket(16, factory.getBucketId(docId).getRawId());
//std::cerr << "doc bucket is " << bucket << " vs source " << source << "\n";
- _node->getPersistenceProvider().createBucket(
- makeSpiBucket(target), context);
+ _node->getPersistenceProvider().createBucket(makeSpiBucket(target), context);
Document::SP doc(new Document(*_testdoctype1, docId));
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(target), doc, docNum+1));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(target), doc, docNum+1);
cmd->setAddress(address);
cmd->setPriority(120);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
- std::shared_ptr<api::PutReply> reply(
- std::dynamic_pointer_cast<api::PutReply>(
- top.getReply(0)));
+ std::shared_ptr<api::PutReply> reply(std::dynamic_pointer_cast<api::PutReply>(top.getReply(0)));
ASSERT_TRUE(reply.get());
ASSERT_EQ(ReturnCode(ReturnCode::OK), reply->getResult());
top.reset();
@@ -1044,8 +1037,7 @@ FileStorManagerTest::putDoc(DummyStorageLink& top,
TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) {
DummyStorageLink top;
DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
+ top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
setClusterState("storage:2 distributor:1");
top.open();
ForwardingMessageSender messageSender(*dummyManager);
@@ -1055,7 +1047,7 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) {
FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0));
+ filestorHandler, *metrics.disks[0]->threads[0]));
document::BucketId source(16, 0x10001);
@@ -1084,8 +1076,8 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) {
putCmd->setAddress(address);
putCmd->setPriority(120);
- filestorHandler.schedule(splitCmd, 0);
- filestorHandler.schedule(putCmd, 0);
+ filestorHandler.schedule(splitCmd);
+ filestorHandler.schedule(putCmd);
resumeGuard.reset(); // Unpause
filestorHandler.flush(true);
@@ -1120,7 +1112,7 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) {
FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0));
+ filestorHandler, *metrics.disks[0]->threads[0]));
document::BucketId source(getFirstBucketNotOwnedByDistributor(0));
createBucket(source, 0);
@@ -1132,7 +1124,7 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) {
splitCmd->setPriority(120);
splitCmd->setSourceIndex(0); // Source not owned by this distributor.
- filestorHandler.schedule(splitCmd, 0);
+ filestorHandler.schedule(splitCmd);
filestorHandler.flush(true);
top.waitForMessages(4, _waitTime); // 3 notify cmds + split reply
@@ -1161,7 +1153,7 @@ TEST_F(FileStorManagerTest, join) {
FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0));
+ filestorHandler, *metrics.disks[0]->threads[0]));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -1182,7 +1174,7 @@ TEST_F(FileStorManagerTest, join) {
auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i);
auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto reply = std::dynamic_pointer_cast<api::PutReply>(top.getReply(0));
@@ -1194,7 +1186,7 @@ TEST_F(FileStorManagerTest, join) {
auto rcmd = std::make_shared<api::RemoveCommand>(
makeDocumentBucket(bucket), documents[i]->getId(), 1000000 + 100 + i);
rcmd->setAddress(*address);
- filestorHandler.schedule(rcmd, 0);
+ filestorHandler.schedule(rcmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto rreply = std::dynamic_pointer_cast<api::RemoveReply>(top.getReply(0));
@@ -1209,7 +1201,7 @@ TEST_F(FileStorManagerTest, join) {
auto cmd = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(document::BucketId(16, 1)));
cmd->getSourceBuckets().emplace_back(document::BucketId(17, 0x00001));
cmd->getSourceBuckets().emplace_back(document::BucketId(17, 0x10001));
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto reply = std::dynamic_pointer_cast<api::JoinBucketsReply>(top.getReply(0));
@@ -1224,7 +1216,7 @@ TEST_F(FileStorManagerTest, join) {
makeDocumentBucket(bucket), documents[i]->getId(), document::AllFields::NAME);
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(address);
- filestorHandler.schedule(cmd, 0);
+ filestorHandler.schedule(cmd);
filestorHandler.flush(true);
ASSERT_EQ(1, top.getNumReplies());
auto reply = std::dynamic_pointer_cast<api::GetReply>(top.getReply(0));
diff --git a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp
index 1660fed9e38..732cc402641 100644
--- a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp
+++ b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp
@@ -41,8 +41,7 @@ void
ModifiedBucketCheckerTest::SetUp()
{
_config.reset(new vdstestlib::DirConfig(getStandardConfig(true)));
- _node.reset(new TestServiceLayerApp(DiskCount(1), NodeIndex(0),
- _config->getConfigId()));
+ _node.reset(new TestServiceLayerApp(NodeIndex(0), _config->getConfigId()));
_node->setupDummyPersistence();
_top.reset(new DummyStorageLink);
diff --git a/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp b/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp
index 787a63a618c..8c9ea11f1c8 100644
--- a/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp
+++ b/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp
@@ -33,7 +33,6 @@ TEST_F(SanityCheckedDeleteTest, delete_bucket_fails_when_provider_out_of_sync) {
StorBucketDatabase::WrappedEntry entry(
_node->getStorageBucketDatabase().get(bucket, "foo",
StorBucketDatabase::CREATE_IF_NONEXISTING));
- entry->disk = 0;
entry->info = serviceLayerInfo;
entry.write();
}
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index df0ea3e6680..565d8b5ee3c 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -176,7 +176,6 @@ MergeHandlerTest::SetUp() {
LOG(debug, "Creating %s in bucket database", _bucket.toString().c_str());
bucketdb::StorageBucketInfo bucketDBEntry;
- bucketDBEntry.disk = 0;
getEnv().getBucketDatabase(_bucket.getBucketSpace()).insert(_bucket.getBucketId(), bucketDBEntry, "mergetestsetup");
LOG(debug, "Creating bucket to merge");
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 8edb03b67fa..7e54b45f96a 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -34,8 +34,6 @@ public:
explicit Fixture(FileStorTestFixture& parent);
~Fixture();
};
-
- static constexpr uint16_t _disk = 0;
};
PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_)
@@ -58,7 +56,7 @@ PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_)
// and small enough to not slow down testing too much.
filestorHandler->setGetNextMessageTimeout(20ms);
- stripeId = filestorHandler->getNextStripeId(0);
+ stripeId = filestorHandler->getNextStripeId();
}
PersistenceQueueTest::Fixture::~Fixture() = default;
@@ -90,16 +88,16 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) {
// getNextMessage 2 times should then return a lock on the first bucket,
// then subsequently on the second, skipping the already locked bucket.
// Puts all have same pri, so order is well defined.
- f.filestorHandler->schedule(createPut(1234, 0), _disk);
- f.filestorHandler->schedule(createPut(1234, 1), _disk);
- f.filestorHandler->schedule(createPut(5432, 0), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0));
+ f.filestorHandler->schedule(createPut(1234, 1));
+ f.filestorHandler->schedule(createPut(5432, 0));
- auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
ASSERT_TRUE(lock0.first.get());
EXPECT_EQ(document::BucketId(16, 1234),
dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
- auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
ASSERT_TRUE(lock1.first.get());
EXPECT_EQ(document::BucketId(16, 5432),
dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
@@ -108,16 +106,16 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) {
TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_access) {
Fixture f(*this);
- f.filestorHandler->schedule(createGet(1234), _disk);
- f.filestorHandler->schedule(createGet(1234), _disk);
+ f.filestorHandler->schedule(createGet(1234));
+ f.filestorHandler->schedule(createGet(1234));
- auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
ASSERT_TRUE(lock0.first.get());
EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
// Even though we already have a lock on the bucket, Gets allow shared locking and we
// should therefore be able to get another lock.
- auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
ASSERT_TRUE(lock1.first.get());
EXPECT_EQ(api::LockingRequirements::Shared, lock1.first->lockingRequirements());
}
@@ -125,45 +123,45 @@ TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_ac
TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op_active) {
Fixture f(*this);
- f.filestorHandler->schedule(createGet(1234), _disk);
- f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createGet(1234));
+ f.filestorHandler->schedule(createPut(1234, 0));
- auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
ASSERT_TRUE(lock0.first.get());
EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
// Expected to time out
- auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
ASSERT_FALSE(lock1.first.get());
}
TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op_active) {
Fixture f(*this);
- f.filestorHandler->schedule(createPut(1234, 0), _disk);
- f.filestorHandler->schedule(createGet(1234), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0));
+ f.filestorHandler->schedule(createGet(1234));
- auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
ASSERT_TRUE(lock0.first.get());
EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
// Expected to time out
- auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
ASSERT_FALSE(lock1.first.get());
}
TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive_op_active) {
Fixture f(*this);
- f.filestorHandler->schedule(createPut(1234, 0), _disk);
- f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0));
+ f.filestorHandler->schedule(createPut(1234, 0));
- auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
ASSERT_TRUE(lock0.first.get());
EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
// Expected to time out
- auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
ASSERT_FALSE(lock1.first.get());
}
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 11a876ad0e5..7185271bea4 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -25,13 +25,11 @@ namespace {
spi::LoadType defaultLoadType(0, "default");
- vdstestlib::DirConfig initialize(uint32_t numDisks, const std::string & rootOfRoot) {
+ vdstestlib::DirConfig initialize(const std::string & rootOfRoot) {
vdstestlib::DirConfig config(getStandardConfig(true, rootOfRoot));
std::string rootFolder = getRootFolder(config);
vespalib::rmdir(rootFolder, true);
- for (uint32_t i = 0; i < numDisks; i++) {
- vespalib::mkdir(vespalib::make_string("%s/disks/d%d", rootFolder.c_str(), i), true);
- }
+ vespalib::mkdir(vespalib::make_string("%s/disks/d0", rootFolder.c_str()), true);
return config;
}
@@ -47,27 +45,23 @@ namespace {
};
}
-PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const std::string & rootOfRoot)
- : _config(initialize(numDisks, rootOfRoot)),
+PersistenceTestEnvironment::PersistenceTestEnvironment(const std::string & rootOfRoot)
+ : _config(initialize(rootOfRoot)),
_messageKeeper(),
- _node(numDisks, NodeIndex(0), _config.getConfigId()),
+ _node(NodeIndex(0), _config.getConfigId()),
_component(_node.getComponentRegister(), "persistence test env"),
_metrics(_component.getLoadTypes()->getMetricLoadTypes())
{
_node.setupDummyPersistence();
- _metrics.initDiskMetrics(numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1, 1);
- _handler = std::make_unique<FileStorHandler>(_messageKeeper, _metrics,
- _node.getComponentRegister());
- for (uint32_t i = 0; i < numDisks; i++) {
- _diskEnvs.push_back(
- std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler,
- *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider()));
- }
+ _metrics.initDiskMetrics(1, _node.getLoadTypes()->getMetricLoadTypes(), 1, 1);
+ _handler = std::make_unique<FileStorHandler>(_messageKeeper, _metrics, _node.getComponentRegister());
+ _diskEnv = std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler,
+ *_metrics.disks[0]->threads[0], _node.getPersistenceProvider());
}
PersistenceTestEnvironment::~PersistenceTestEnvironment() {
_handler->close();
- while (!_handler->closed(0)) {
+ while (!_handler->closed()) {
std::this_thread::sleep_for(1ms);
}
}
@@ -76,15 +70,14 @@ PersistenceTestUtils::PersistenceTestUtils() = default;
PersistenceTestUtils::~PersistenceTestUtils() = default;
std::string
-PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) {
- assert(disk == 0u);
+PersistenceTestUtils::dumpBucket(const document::BucketId& bid) {
return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(makeSpiBucket(bid));
}
void
-PersistenceTestUtils::setupDisks(uint32_t numDisks) {
- _env = std::make_unique<PersistenceTestEnvironment>(DiskCount(numDisks), "todo-make-unique-persistencetestutils");
- setupExecutor(numDisks);
+PersistenceTestUtils::setupDisks() {
+ _env = std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils");
+ setupExecutor(2);
}
void
@@ -93,27 +86,24 @@ PersistenceTestUtils::setupExecutor(uint32_t numThreads) {
}
std::unique_ptr<PersistenceThread>
-PersistenceTestUtils::createPersistenceThread(uint32_t disk)
+PersistenceTestUtils::createPersistenceThread()
{
return std::make_unique<PersistenceThread>(_sequenceTaskExecutor.get(), _env->_node.getComponentRegister(),
_env->_config.getConfigId(),getPersistenceProvider(),
- getEnv()._fileStorHandler, getEnv()._metrics, disk);
+ getEnv()._fileStorHandler, getEnv()._metrics);
}
document::Document::SP
PersistenceTestUtils::schedulePut(
uint32_t location,
spi::Timestamp timestamp,
- uint16_t disk,
uint32_t minSize,
uint32_t maxSize)
{
document::Document::SP doc(createRandomDocumentAtLocation(
location, timestamp, minSize, maxSize));
- std::shared_ptr<api::StorageMessage> msg(
- new api::PutCommand(
- makeDocumentBucket(document::BucketId(16, location)), doc, timestamp));
- fsHandler().schedule(msg, disk);
+ auto msg = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, location)), doc, timestamp);
+ fsHandler().schedule(msg);
return doc;
}
@@ -150,7 +140,7 @@ PersistenceTestUtils::getBucketStatus(const document::BucketId& id)
if (!entry.exist()) {
ost << "null";
} else {
- ost << entry->getBucketInfo().getDocumentCount() << "," << entry->disk;
+ ost << entry->getBucketInfo().getDocumentCount();
}
return ost.str();
@@ -158,76 +148,52 @@ PersistenceTestUtils::getBucketStatus(const document::BucketId& id)
document::Document::SP
PersistenceTestUtils::doPutOnDisk(
- uint16_t disk,
uint32_t location,
spi::Timestamp timestamp,
uint32_t minSize,
uint32_t maxSize)
{
- document::Document::SP doc(createRandomDocumentAtLocation(
- location, timestamp, minSize, maxSize));
- assert(disk == 0u);
+ document::Document::SP doc(createRandomDocumentAtLocation(location, timestamp, minSize, maxSize));
spi::Bucket b(makeSpiBucket(document::BucketId(16, location)));
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
-
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
getPersistenceProvider().createBucket(b, context);
-
getPersistenceProvider().put(spi::Bucket(b), timestamp, doc, context);
-
return doc;
}
bool
PersistenceTestUtils::doRemoveOnDisk(
- uint16_t disk,
const document::BucketId& bucketId,
const document::DocumentId& docId,
spi::Timestamp timestamp,
bool persistRemove)
{
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
- assert(disk == 0u);
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
if (persistRemove) {
- spi::RemoveResult result = getPersistenceProvider().removeIfFound(
- makeSpiBucket(bucketId),
- timestamp, docId, context);
+ spi::RemoveResult result = getPersistenceProvider().removeIfFound(makeSpiBucket(bucketId),timestamp, docId, context);
return result.wasFound();
}
- spi::RemoveResult result = getPersistenceProvider().remove(
- makeSpiBucket(bucketId),
- timestamp, docId, context);
+ spi::RemoveResult result = getPersistenceProvider().remove(makeSpiBucket(bucketId), timestamp, docId, context);
return result.wasFound();
}
bool
PersistenceTestUtils::doUnrevertableRemoveOnDisk(
- uint16_t disk,
const document::BucketId& bucketId,
const document::DocumentId& docId,
spi::Timestamp timestamp)
{
- assert(disk == 0u);
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
- spi::RemoveResult result = getPersistenceProvider().remove(
- makeSpiBucket(bucketId),
- timestamp, docId, context);
+ spi::Context context(defaultLoadType, spi::Priority(0),spi::Trace::TraceLevel(0));
+ spi::RemoveResult result = getPersistenceProvider().remove(makeSpiBucket(bucketId), timestamp, docId, context);
return result.wasFound();
}
spi::GetResult
-PersistenceTestUtils::doGetOnDisk(
- uint16_t disk,
- const document::BucketId& bucketId,
- const document::DocumentId& docId)
+PersistenceTestUtils::doGetOnDisk(const document::BucketId& bucketId, const document::DocumentId& docId)
{
auto fieldSet = std::make_unique<document::AllFields>();
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
- assert(disk == 0u);
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
return getPersistenceProvider().get(makeSpiBucket(bucketId), *fieldSet, docId, context);
}
@@ -255,47 +221,19 @@ PersistenceTestUtils::createHeaderUpdate(const document::DocumentId& docId, cons
return update;
}
-uint16_t
-PersistenceTestUtils::getDiskFromBucketDatabaseIfUnset(const document::Bucket& bucket, uint16_t disk)
-{
- if (disk == 0xffff) {
- StorBucketDatabase::WrappedEntry entry(
- getEnv().getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "createTestBucket"));
- if (entry.exist()) {
- return entry->disk;
- } else {
- std::ostringstream error;
- error << bucket.toString() << " not in db and disk unset";
- throw vespalib::IllegalStateException(error.str(), VESPA_STRLOC);
- }
- }
- return disk;
-}
-
void
-PersistenceTestUtils::doPut(const document::Document::SP& doc,
- spi::Timestamp time,
- uint16_t disk,
- uint16_t usedBits)
+PersistenceTestUtils::doPut(const document::Document::SP& doc, spi::Timestamp time, uint16_t usedBits)
{
- document::BucketId bucket(
- _env->_component.getBucketIdFactory().getBucketId(doc->getId()));
+ document::BucketId bucket(_env->_component.getBucketIdFactory().getBucketId(doc->getId()));
bucket.setUsedBits(usedBits);
- disk = getDiskFromBucketDatabaseIfUnset(makeDocumentBucket(bucket), disk);
-
- doPut(doc, bucket, time, disk);
+ doPut(doc, bucket, time);
}
void
-PersistenceTestUtils::doPut(const document::Document::SP& doc,
- document::BucketId bid,
- spi::Timestamp time,
- uint16_t disk)
+PersistenceTestUtils::doPut(const document::Document::SP& doc, document::BucketId bid, spi::Timestamp time)
{
- assert(disk == 0u);
spi::Bucket b(makeSpiBucket(bid));
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
getPersistenceProvider().createBucket(b, context);
getPersistenceProvider().put(b, time, std::move(doc), context);
}
@@ -303,28 +241,20 @@ PersistenceTestUtils::doPut(const document::Document::SP& doc,
spi::UpdateResult
PersistenceTestUtils::doUpdate(document::BucketId bid,
const document::DocumentUpdate::SP& update,
- spi::Timestamp time,
- uint16_t disk)
+ spi::Timestamp time)
{
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
- assert(disk == 0u);
- return getPersistenceProvider().update(
- makeSpiBucket(bid), time, update, context);
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
+ return getPersistenceProvider().update(makeSpiBucket(bid), time, update, context);
}
void
PersistenceTestUtils::doRemove(const document::DocumentId& id, spi::Timestamp time,
- uint16_t disk, bool unrevertableRemove,
- uint16_t usedBits)
+ bool unrevertableRemove, uint16_t usedBits)
{
document::BucketId bucket(
_env->_component.getBucketIdFactory().getBucketId(id));
bucket.setUsedBits(usedBits);
- disk = getDiskFromBucketDatabaseIfUnset(makeDocumentBucket(bucket), disk);
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
- assert(disk == 0u);
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
if (unrevertableRemove) {
getPersistenceProvider().remove(
makeSpiBucket(bucket), time, id, context);
@@ -360,8 +290,7 @@ PersistenceTestUtils::createRandomDocumentAtLocation(
}
void
-PersistenceTestUtils::createTestBucket(const document::Bucket& bucket,
- uint16_t disk)
+PersistenceTestUtils::createTestBucket(const document::Bucket& bucket)
{
document::BucketId bucketId(bucket.getBucketId());
uint32_t opsPerType = 2;
@@ -377,25 +306,20 @@ PersistenceTestUtils::createTestBucket(const document::Bucket& bucket,
location <<= 32;
location += (bucketId.getRawId() & 0xffffffff);
document::Document::SP doc(
- createRandomDocumentAtLocation(
- location, seed, minDocSize, maxDocSize));
+ createRandomDocumentAtLocation(location, seed, minDocSize, maxDocSize));
if (headerOnly) {
clearBody(*doc);
}
- doPut(doc, spi::Timestamp(seed), disk, bucketId.getUsedBits());
+ doPut(doc, spi::Timestamp(seed), bucketId.getUsedBits());
if (optype == 0) { // Regular put
} else if (optype == 1) { // Overwritten later in time
document::Document::SP doc2(new document::Document(*doc));
- doc2->setValue(doc2->getField("content"),
- document::StringFieldValue("overwritten"));
- doPut(doc2, spi::Timestamp(seed + 500),
- disk, bucketId.getUsedBits());
+ doc2->setValue(doc2->getField("content"), document::StringFieldValue("overwritten"));
+ doPut(doc2, spi::Timestamp(seed + 500), bucketId.getUsedBits());
} else if (optype == 2) { // Removed
- doRemove(doc->getId(), spi::Timestamp(seed + 500), disk, false,
- bucketId.getUsedBits());
+ doRemove(doc->getId(), spi::Timestamp(seed + 500), false, bucketId.getUsedBits());
} else if (optype == 3) { // Unrevertable removed
- doRemove(doc->getId(), spi::Timestamp(seed), disk, true,
- bucketId.getUsedBits());
+ doRemove(doc->getId(), spi::Timestamp(seed), true, bucketId.getUsedBits());
}
}
}
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index d889deabbd5..a0ab516754a 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -24,7 +24,7 @@ struct MessageKeeper : public MessageSender {
};
struct PersistenceTestEnvironment {
- PersistenceTestEnvironment(DiskCount numDisks, const std::string & rootOfRoot);
+ PersistenceTestEnvironment(const std::string & rootOfRoot);
~PersistenceTestEnvironment();
document::TestDocMan _testDocMan;
@@ -34,7 +34,7 @@ struct PersistenceTestEnvironment {
StorageComponent _component;
FileStorMetrics _metrics;
std::unique_ptr<FileStorHandler> _handler;
- std::vector<std::unique_ptr<PersistenceUtil> > _diskEnvs;
+ std::unique_ptr<PersistenceUtil> _diskEnv;
};
class PersistenceTestUtils : public testing::Test {
@@ -76,14 +76,9 @@ public:
PersistenceTestUtils();
virtual ~PersistenceTestUtils();
- document::Document::SP schedulePut(
- uint32_t location,
- spi::Timestamp timestamp,
- uint16_t disk,
- uint32_t minSize = 0,
- uint32_t maxSize = 128);
+ document::Document::SP schedulePut(uint32_t location, spi::Timestamp timestamp, uint32_t minSize = 0, uint32_t maxSize = 128);
- void setupDisks(uint32_t disks);
+ void setupDisks();
void setupExecutor(uint32_t numThreads);
void TearDown() override {
@@ -94,10 +89,9 @@ public:
_env.reset();
}
- std::string dumpBucket(const document::BucketId& bid, uint16_t disk = 0);
+ std::string dumpBucket(const document::BucketId& bid);
- PersistenceUtil& getEnv(uint32_t disk = 0)
- { return *_env->_diskEnvs[disk]; }
+ PersistenceUtil& getEnv() { return *_env->_diskEnv; }
FileStorHandler& fsHandler() { return *_env->_handler; }
FileStorMetrics& metrics() { return _env->_metrics; }
MessageKeeper& messageKeeper() { return _env->_messageKeeper; }
@@ -128,11 +122,9 @@ public:
}
/**
- Performs a put to the given disk.
Returns the document that was inserted.
*/
document::Document::SP doPutOnDisk(
- uint16_t disk,
uint32_t location,
spi::Timestamp timestamp,
uint32_t minSize = 0,
@@ -143,14 +135,12 @@ public:
spi::Timestamp timestamp,
uint32_t minSize = 0,
uint32_t maxSize = 128)
- { return doPutOnDisk(0, location, timestamp, minSize, maxSize); }
+ { return doPutOnDisk(location, timestamp, minSize, maxSize); }
/**
- Performs a remove to the given disk.
Returns the new doccount if document was removed, or -1 if not found.
*/
bool doRemoveOnDisk(
- uint16_t disk,
const document::BucketId& bid,
const document::DocumentId& id,
spi::Timestamp timestamp,
@@ -161,11 +151,10 @@ public:
const document::DocumentId& id,
spi::Timestamp timestamp,
bool persistRemove) {
- return doRemoveOnDisk(0, bid, id, timestamp, persistRemove);
+ return doRemoveOnDisk(bid, id, timestamp, persistRemove);
}
- bool doUnrevertableRemoveOnDisk(uint16_t disk,
- const document::BucketId& bid,
+ bool doUnrevertableRemoveOnDisk(const document::BucketId& bid,
const document::DocumentId& id,
spi::Timestamp timestamp);
@@ -173,29 +162,27 @@ public:
const document::DocumentId& id,
spi::Timestamp timestamp)
{
- return doUnrevertableRemoveOnDisk(0, bid, id, timestamp);
+ return doUnrevertableRemoveOnDisk(bid, id, timestamp);
}
/**
* Do a remove toward storage set up in test environment.
*
* @id Document to remove.
- * @disk If set, use this disk, otherwise lookup in bucket db.
* @unrevertableRemove If set, instead of adding put, turn put to remove.
* @usedBits Generate bucket to use from docid using this amount of bits.
*/
- void doRemove(const document::DocumentId& id, spi::Timestamp, uint16_t disk = 0xffff,
+ void doRemove(const document::DocumentId& id, spi::Timestamp,
bool unrevertableRemove = false, uint16_t usedBits = 16);
spi::GetResult doGetOnDisk(
- uint16_t disk,
const document::BucketId& bucketId,
const document::DocumentId& docId);
spi::GetResult doGet(
const document::BucketId& bucketId,
const document::DocumentId& docId)
- { return doGetOnDisk(0, bucketId, docId); }
+ { return doGetOnDisk(bucketId, docId); }
std::shared_ptr<document::DocumentUpdate> createBodyUpdate(
const document::DocumentId& id,
@@ -205,28 +192,23 @@ public:
const document::DocumentId& id,
const document::FieldValue& updateValue);
- uint16_t getDiskFromBucketDatabaseIfUnset(const document::Bucket &,
- uint16_t disk = 0xffff);
+ uint16_t getDiskFromBucketDatabaseIfUnset(const document::Bucket &);
/**
* Do a put toward storage set up in test environment.
*
* @doc Document to put. Use TestDocMan to generate easily.
- * @disk If set, use this disk, otherwise lookup in bucket db.
* @usedBits Generate bucket to use from docid using this amount of bits.
*/
- void doPut(const document::Document::SP& doc, spi::Timestamp,
- uint16_t disk = 0xffff, uint16_t usedBits = 16);
+ void doPut(const document::Document::SP& doc, spi::Timestamp, uint16_t usedBits = 16);
void doPut(const document::Document::SP& doc,
document::BucketId bid,
- spi::Timestamp time,
- uint16_t disk = 0);
+ spi::Timestamp time);
spi::UpdateResult doUpdate(document::BucketId bid,
const std::shared_ptr<document::DocumentUpdate>& update,
- spi::Timestamp time,
- uint16_t disk = 0);
+ spi::Timestamp time);
document::Document::UP createRandomDocumentAtLocation(
uint64_t location, uint32_t seed,
@@ -237,14 +219,13 @@ public:
* bucket can represent. (Such that tests have a nice test bucket to use
* that require operations to handle all the various bucket contents.
*
- * @disk If set, use this disk, otherwise lookup in bucket db.
*/
- void createTestBucket(const document::Bucket&, uint16_t disk = 0xffff);
+ void createTestBucket(const document::Bucket&);
/**
* Create a new persistence thread.
*/
- std::unique_ptr<PersistenceThread> createPersistenceThread(uint32_t disk);
+ std::unique_ptr<PersistenceThread> createPersistenceThread();
/**
* In-place modify doc so that it has no more body fields.
@@ -256,7 +237,7 @@ class SingleDiskPersistenceTestUtils : public PersistenceTestUtils
{
public:
void SetUp() override {
- setupDisks(1);
+ setupDisks();
}
};
diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp
index 3d7fc70db6a..10fb0b2e6b4 100644
--- a/storage/src/tests/persistence/persistencethread_splittest.cpp
+++ b/storage/src/tests/persistence/persistencethread_splittest.cpp
@@ -204,7 +204,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase)
spi.put(bucket, spi::Timestamp(1000 + i), std::move(doc), context);
}
- std::unique_ptr<PersistenceThread> thread(createPersistenceThread(0));
+ std::unique_ptr<PersistenceThread> thread(createPersistenceThread());
getNode().getStateUpdater().setClusterState(
std::make_shared<lib::ClusterState>("distributor:1 storage:1"));
document::Bucket docBucket = makeDocumentBucket(document::BucketId(currentSplitLevel, 1));
diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp
index 5174b733334..24c67fe93c1 100644
--- a/storage/src/tests/persistence/processalltest.cpp
+++ b/storage/src/tests/persistence/processalltest.cpp
@@ -42,7 +42,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
for (int i = 0; i < 10; ++i) {
document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i));
doc->setValue(doc->getField("headerval"), document::IntFieldValue(i));
- doPut(doc, bucketId, spi::Timestamp(100 + i), 0);
+ doPut(doc, bucketId, spi::Timestamp(100 + i));
}
document::Bucket bucket = makeDocumentBucket(bucketId);
@@ -102,7 +102,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc
for (int i = 0; i < 10; ++i) {
document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i));
doc->setValue(doc->getField("headerval"), document::IntFieldValue(i));
- doPut(doc, bucketId, spi::Timestamp(100 + i), 0);
+ doPut(doc, bucketId, spi::Timestamp(100 + i));
}
document::Bucket bucket = makeDocumentBucket(bucketId);
@@ -114,7 +114,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc
EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult());
vespalib::string expected =
- "Persistence bucket BucketId(0x4000000000000004), partition 0\n"
+ "Persistence bucket BucketId(0x4000000000000004)\n"
" Timestamp: 100, Doc(id:mail:testdoctype1:n=4:3619.html), gid(0x0400000092bb8d298934253a), size: 163\n"
" Timestamp: 102, Doc(id:mail:testdoctype1:n=4:62608.html), gid(0x04000000ce878d2488413bc4), size: 141\n"
" Timestamp: 104, Doc(id:mail:testdoctype1:n=4:56061.html), gid(0x040000002b8f80f0160f6c5c), size: 118\n"
@@ -132,7 +132,7 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries)
for (int i = 0; i < 10; ++i) {
document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i));
doc->setValue(doc->getField("headerval"), document::IntFieldValue(i));
- doPut(doc, bucketId, spi::Timestamp(100 + i), 0);
+ doPut(doc, bucketId, spi::Timestamp(100 + i));
doRemove(bucketId,
doc->getId(),
spi::Timestamp(200 + i),
@@ -148,7 +148,7 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries)
EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult());
vespalib::string expected =
- "Persistence bucket BucketId(0x4000000000000004), partition 0\n"
+ "Persistence bucket BucketId(0x4000000000000004)\n"
" Timestamp: 100, Doc(id:mail:testdoctype1:n=4:3619.html), gid(0x0400000092bb8d298934253a), size: 163\n"
" Timestamp: 101, Doc(id:mail:testdoctype1:n=4:33113.html), gid(0x04000000b121a632741db368), size: 89\n"
" Timestamp: 102, Doc(id:mail:testdoctype1:n=4:62608.html), gid(0x04000000ce878d2488413bc4), size: 141\n"
@@ -182,7 +182,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_
for (int i = 0; i < 10; ++i) {
document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i));
doc->setValue(doc->getField("headerval"), document::IntFieldValue(i));
- doPut(doc, bucketId, spi::Timestamp(100 + i), 0);
+ doPut(doc, bucketId, spi::Timestamp(100 + i));
}
document::Bucket bucket = makeDocumentBucket(bucketId);
@@ -194,7 +194,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_
EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult());
vespalib::string expected =
- "Persistence bucket BucketId(0x4000000000000004), partition 0\n"
+ "Persistence bucket BucketId(0x4000000000000004)\n"
" Timestamp: 100, Doc(id:mail:testdoctype1:n=4:3619.html), gid(0x0400000092bb8d298934253a), size: 163\n"
" Timestamp: 101, Doc(id:mail:testdoctype1:n=4:33113.html), gid(0x04000000b121a632741db368), size: 89\n"
" Timestamp: 102, Doc(id:mail:testdoctype1:n=4:62608.html), gid(0x04000000ce878d2488413bc4), size: 141\n"
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index 18d78e1e8cf..ce041660a2f 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -47,7 +47,7 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
createBucket(BUCKET_ID);
getPersistenceProvider().createBucket(makeSpiBucket(BUCKET_ID),context);
- thread = createPersistenceThread(0);
+ thread = createPersistenceThread();
testDoc = createTestDocument();
testDocId = testDoc->getId();
}
diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp
index 46dad62de48..bcfe2bd0830 100644
--- a/storage/src/tests/storageserver/bouncertest.cpp
+++ b/storage/src/tests/storageserver/bouncertest.cpp
@@ -65,7 +65,7 @@ BouncerTest::BouncerTest()
void BouncerTest::setUpAsNode(const lib::NodeType& type) {
vdstestlib::DirConfig config(getStandardConfig(type == lib::NodeType::STORAGE));
if (type == lib::NodeType::STORAGE) {
- _node.reset(new TestServiceLayerApp(DiskCount(1), NodeIndex(2), config.getConfigId()));
+ _node.reset(new TestServiceLayerApp(NodeIndex(2), config.getConfigId()));
} else {
_node.reset(new TestDistributorApp(NodeIndex(2), config.getConfigId()));
}
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
index 2f091572ed2..22441223c5c 100644
--- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
+++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
@@ -109,7 +109,6 @@ ChangedBucketOwnershipHandlerTest::insertBuckets(uint32_t numBuckets,
bucketdb::StorageBucketInfo sbi;
sbi.setBucketInfo(api::BucketInfo(1, 2, 3));
- sbi.disk = 0;
_app->getStorageBucketDatabase().insert(bucket, sbi, "test");
inserted.push_back(bucket);
}
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 6ac0bfcb5d8..e6067fa561c 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -156,13 +156,11 @@ MergeThrottlerTest::SetUp()
vdstestlib::DirConfig config(getStandardConfig(true));
for (int i = 0; i < _storageNodeCount; ++i) {
- std::unique_ptr<TestServiceLayerApp> server(
- new TestServiceLayerApp(DiskCount(1), NodeIndex(i)));
- server->setClusterState(lib::ClusterState(
- "distributor:100 storage:100 version:1"));
+ auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i));
+ server->setClusterState(lib::ClusterState("distributor:100 storage:100 version:1"));
std::unique_ptr<DummyStorageLink> top;
- top.reset(new DummyStorageLink);
+ top = std::make_unique<DummyStorageLink>();
MergeThrottler* throttler = new MergeThrottler(config.getConfigId(), server->getComponentRegister());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index f88b59f50a5..b55e62d5fd3 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -55,7 +55,7 @@ StateManagerTest::StateManagerTest()
void
StateManagerTest::SetUp() {
vdstestlib::DirConfig config(getStandardConfig(true));
- _node = std::make_unique<TestServiceLayerApp>(DiskCount(1), NodeIndex(2));
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(2));
// Clock will increase 1 sec per call.
_node->getClock().setAbsoluteTimeInSeconds(1);
_metricManager = std::make_unique<metrics::MetricManager>();
diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp
index f0907bffba1..728bbb7a473 100644
--- a/storage/src/tests/storageserver/statereportertest.cpp
+++ b/storage/src/tests/storageserver/statereportertest.cpp
@@ -70,7 +70,7 @@ void StateReporterTest::SetUp() {
_config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, "statereportertest"));
assert(system(("rm -rf " + getRootFolder(*_config)).c_str()) == 0);
- _node = std::make_unique<TestServiceLayerApp>(DiskCount(1), NodeIndex(0), _config->getConfigId());
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->getConfigId());
_node->setupDummyPersistence();
_clock = &_node->getClock();
_clock->setAbsoluteTimeInSeconds(1000000);
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index c6ce935b611..031290cecb4 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -161,7 +161,6 @@ VisitorManagerTest::initializeTest()
StorBucketDatabase::WrappedEntry entry(
_node->getStorageBucketDatabase().get(bid, "",
StorBucketDatabase::CREATE_IF_NONEXISTING));
- entry->disk = 0;
entry.write();
}
for (uint32_t i=0; i<docCount; ++i) {
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 7b9d5b77c98..ab22d8440fa 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -184,16 +184,15 @@ namespace {
document::BucketId::keyToBucketId(bucketId));
if (data.valid()) {
- assert(data.disk < diskCount);
- ++disk[data.disk].buckets;
+ ++disk[0].buckets;
if (data.getBucketInfo().isActive()) {
- ++disk[data.disk].active;
+ ++disk[0].active;
}
if (data.getBucketInfo().isReady()) {
- ++disk[data.disk].ready;
+ ++disk[0].ready;
}
- disk[data.disk].docs += data.getBucketInfo().getDocumentCount();
- disk[data.disk].bytes += data.getBucketInfo().getTotalDocumentSize();
+ disk[0].docs += data.getBucketInfo().getDocumentCount();
+ disk[0].bytes += data.getBucketInfo().getTotalDocumentSize();
if (bucket.getUsedBits() < lowestUsedBit) {
lowestUsedBit = bucket.getUsedBits();
@@ -357,7 +356,6 @@ namespace {
_xos << XmlTag("bucket")
<< XmlAttribute("id", ost.str());
info.getBucketInfo().printXml(_xos);
- _xos << XmlAttribute("disk", info.disk);
_xos << XmlEndTag();
};
};
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
index 0878ffb3d99..0761d845ce6 100644
--- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
+++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
@@ -382,7 +382,7 @@ StorageBucketDBInitializer::registerBucket(const document::Bucket &bucket,
bucketInfo.toString().c_str());
}
if (entry.preExisted()) {
- if (entry->disk == partition) {
+ if (0 == partition) {
LOG(debug, "%s already existed in bucket database on disk %i. "
"Might have been moved from wrong directory prior to "
"listing this directory.",
@@ -395,14 +395,14 @@ StorageBucketDBInitializer::registerBucket(const document::Bucket &bucket,
bucketId.stripUnused()) == partition)
{
keepOnDisk = partition;
- joinFromDisk = entry->disk;
+ joinFromDisk = 0;
} else {
- keepOnDisk = entry->disk;
+ keepOnDisk = 0;
joinFromDisk = partition;
}
- LOG(debug, "%s exist on both disk %u and disk %i. Joining two versions "
+ LOG(debug, "%s exist on both disk 0 and disk %i. Joining two versions "
"onto disk %u.",
- bucketId.toString().c_str(), entry->disk, int(partition), keepOnDisk);
+ bucketId.toString().c_str(), int(partition), keepOnDisk);
entry.unlock();
// Must not have bucket db lock while sending down
auto cmd = std::make_shared<InternalBucketJoinCommand>(bucket, keepOnDisk, joinFromDisk);
@@ -414,7 +414,6 @@ StorageBucketDBInitializer::registerBucket(const document::Bucket &bucket,
_system._component.getMinUsedBitsTracker().update(bucketId);
LOG(spam, "Inserted %s on disk %i into bucket database",
bucketId.toString().c_str(), int(partition));
- entry->disk = partition;
entry.write();
uint16_t disk(distribution.getIdealDisk(
_system._nodeState, _system._nodeIndex, bucketId.stripUnused(),
@@ -453,7 +452,7 @@ namespace {
return StorBucketDatabase::Decision::CONTINUE;
}
_iterator = bucket;
- if (entry.disk != _disk) {
+ if (0 != _disk) {
//LOG(spam, "Ignoring bucket %s as it is not on disk currently "
// "being processed", bucket.toString().c_str());
// Ignore. We only want to scan for one disk
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketinfo.h b/storage/src/vespa/storage/bucketdb/storagebucketinfo.h
index d9081432feb..f8732b7914b 100644
--- a/storage/src/vespa/storage/bucketdb/storagebucketinfo.h
+++ b/storage/src/vespa/storage/bucketdb/storagebucketinfo.h
@@ -8,9 +8,8 @@ namespace storage::bucketdb {
struct StorageBucketInfo {
api::BucketInfo info;
- unsigned disk : 8; // The disk containing the bucket
- StorageBucketInfo() : info(), disk(0xff) {}
+ StorageBucketInfo() : info() {}
static bool mayContain(const StorageBucketInfo&) { return true; }
void print(std::ostream&, bool verbose, const std::string& indent) const;
bool valid() const { return info.valid(); }
@@ -22,7 +21,7 @@ struct StorageBucketInfo {
info.setDocumentCount(0);
info.setTotalDocumentSize(0);
}
- bool verifyLegal() const { return (disk != 0xff); }
+ bool verifyLegal() const { return true; }
uint32_t getMetaCount() const { return info.getMetaCount(); }
void setChecksum(uint32_t crc) { info.setChecksum(crc); }
bool operator == (const StorageBucketInfo & b) const;
diff --git a/storage/src/vespa/storage/bucketdb/storbucketdb.cpp b/storage/src/vespa/storage/bucketdb/storbucketdb.cpp
index e64a19e9a4a..bb61867bcc5 100644
--- a/storage/src/vespa/storage/bucketdb/storbucketdb.cpp
+++ b/storage/src/vespa/storage/bucketdb/storbucketdb.cpp
@@ -17,19 +17,19 @@ void
StorageBucketInfo::
print(std::ostream& out, bool, const std::string&) const
{
- out << info << ", disk " << disk;
+ out << info;
}
-bool StorageBucketInfo::operator==(const StorageBucketInfo& b) const {
- return disk == b.disk;
+bool StorageBucketInfo::operator==(const StorageBucketInfo& ) const {
+ return true;
}
bool StorageBucketInfo::operator!=(const StorageBucketInfo& b) const {
return !(*this == b);
}
-bool StorageBucketInfo::operator<(const StorageBucketInfo& b) const {
- return disk < b.disk;
+bool StorageBucketInfo::operator<(const StorageBucketInfo& ) const {
+ return false;
}
std::ostream&
@@ -62,7 +62,6 @@ StorBucketDatabase::insert(const document::BucketId& bucket,
const bucketdb::StorageBucketInfo& entry,
const char* clientId)
{
- assert(entry.disk != 0xff);
bool preExisted;
return _impl->insert(bucket.toKey(), entry, clientId, false, preExisted);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 9d9c7e10111..af863f82d6f 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -25,15 +25,15 @@ FileStorHandler::flush(bool flushMerges)
}
void
-FileStorHandler::setDiskState(uint16_t disk, DiskState state)
+FileStorHandler::setDiskState(DiskState state)
{
- _impl->setDiskState(disk, state);
+ _impl->setDiskState(state);
}
FileStorHandler::DiskState
-FileStorHandler::getDiskState(uint16_t disk)
+FileStorHandler::getDiskState()
{
- return _impl->getDiskState(disk);
+ return _impl->getDiskState();
}
void
@@ -49,28 +49,28 @@ FileStorHandler::pause()
}
bool
-FileStorHandler::schedule(const api::StorageMessage::SP& msg, uint16_t disk)
+FileStorHandler::schedule(const api::StorageMessage::SP& msg)
{
- return _impl->schedule(msg, disk);
+ return _impl->schedule(msg);
}
FileStorHandler::LockedMessage
-FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId)
+FileStorHandler::getNextMessage(uint32_t stripeId)
{
- return _impl->getNextMessage(disk, stripeId);
+ return _impl->getNextMessage(stripeId);
}
FileStorHandler::BucketLockInterface::SP
-FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq)
+FileStorHandler::lock(const document::Bucket& bucket, api::LockingRequirements lockReq)
{
- return _impl->lock(bucket, disk, lockReq);
+ return _impl->lock(bucket, lockReq);
}
void
-FileStorHandler::remapQueueAfterDiskMove(const document::Bucket& bucket, uint16_t sourceDisk, uint16_t targetDisk)
+FileStorHandler::remapQueueAfterDiskMove(const document::Bucket& bucket)
{
- RemapInfo target(bucket, targetDisk);
- _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, FileStorHandlerImpl::MOVE);
+ RemapInfo target(bucket);
+ _impl->remapQueue(RemapInfo(bucket), target, FileStorHandlerImpl::MOVE);
}
void
@@ -86,9 +86,9 @@ FileStorHandler::remapQueueAfterSplit(const RemapInfo& source,RemapInfo& target1
}
void
-FileStorHandler::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err)
+FileStorHandler::failOperations(const document::Bucket &bucket, const api::ReturnCode& err)
{
- _impl->failOperations(bucket, fromDisk, err);
+ _impl->failOperations(bucket, err);
}
void
@@ -121,12 +121,6 @@ FileStorHandler::getQueueSize() const
return _impl->getQueueSize();
}
-uint32_t
-FileStorHandler::getQueueSize(uint16_t disk) const
-{
- return _impl->getQueueSize(disk);
-}
-
void
FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms)
{
@@ -152,8 +146,8 @@ FileStorHandler::getNumActiveMerges() const
}
uint32_t
-FileStorHandler::getNextStripeId(uint32_t disk) {
- return _impl->getNextStripeId(disk);
+FileStorHandler::getNextStripeId() {
+ return _impl->getNextStripeId();
}
void
@@ -182,9 +176,9 @@ FileStorHandler::setGetNextMessageTimeout(vespalib::duration timeout)
}
std::string
-FileStorHandler::dumpQueue(uint16_t disk) const
+FileStorHandler::dumpQueue() const
{
- return _impl->dumpQueue(disk);
+ return _impl->dumpQueue();
}
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index ccce5b7326a..bb210cd6b62 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -39,14 +39,12 @@ class FileStorHandler : public MessageSender {
public:
struct RemapInfo {
document::Bucket bucket;
- uint16_t diskIndex;
bool foundInQueue;
- RemapInfo(const document::Bucket &bucket_, uint16_t diskIdx)
+ RemapInfo(const document::Bucket &bucket_)
: bucket(bucket_),
- diskIndex(diskIdx),
foundInQueue(false)
- {}
+ {}
};
class BucketLockInterface {
@@ -69,8 +67,7 @@ public:
FileStorHandler(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
ServiceLayerComponentRegister&);
- FileStorHandler(MessageSender&, FileStorMetrics&,
- ServiceLayerComponentRegister&);
+ FileStorHandler(MessageSender&, FileStorMetrics&, ServiceLayerComponentRegister&);
~FileStorHandler();
// Commands used by file stor manager
@@ -84,17 +81,17 @@ public:
*/
void flush(bool killPendingMerges);
- void setDiskState(uint16_t disk, DiskState state);
- DiskState getDiskState(uint16_t disk);
+ void setDiskState(DiskState state);
+ DiskState getDiskState();
/** Check whether a given disk is enabled or not. */
- bool enabled(uint16_t disk) { return (getDiskState(disk) == AVAILABLE); }
- bool closed(uint16_t disk) { return (getDiskState(disk) == CLOSED); }
+ bool enabled() { return (getDiskState() == AVAILABLE); }
+ bool closed() { return (getDiskState() == CLOSED); }
/**
- * Disable the given disk. Operations towards threads using this disk will
+ * Disable the disk. Operations towards threads using this disk will
* start to fail. Typically called when disk errors are detected.
*/
- void disable(uint16_t disk) { setDiskState(disk, DISABLED); }
+ void disable() { setDiskState(DISABLED); }
/** Closes all disk threads. */
void close();
@@ -108,14 +105,14 @@ public:
* Schedule a storage message to be processed by the given disk
* @return True if we maanged to schedule operation. False if not
*/
- bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
+ bool schedule(const std::shared_ptr<api::StorageMessage>&);
/**
* Used by file stor threads to get their next message to process.
*
* @param disk The disk to get messages for
*/
- LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId);
+ LockedMessage getNextMessage(uint32_t stripeId);
/**
* Lock a bucket. By default, each file stor thread has the locks of all
@@ -131,7 +128,7 @@ public:
*
*
*/
- BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk, api::LockingRequirements lockReq);
+ BucketLockInterface::SP lock(const document::Bucket&, api::LockingRequirements lockReq);
/**
* Called by FileStorThread::onBucketDiskMove() after moving file, in case
@@ -143,8 +140,7 @@ public:
* requeststatus - Ignore
* readbucketinfo/bucketdiskmove/internalbucketjoin - Fail and log errors
*/
- void remapQueueAfterDiskMove(const document::Bucket &bucket,
- uint16_t sourceDisk, uint16_t targetDisk);
+ void remapQueueAfterDiskMove(const document::Bucket &bucket);
/**
* Called by FileStorThread::onJoin() after joining a bucket into another,
@@ -194,7 +190,7 @@ public:
* Fail all operations towards a single bucket currently queued to the
* given thread with the given error code.
*/
- void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&);
+ void failOperations(const document::Bucket&, const api::ReturnCode&);
/**
* Add a new merge state to the registry.
@@ -224,7 +220,7 @@ public:
uint32_t getNumActiveMerges() const;
/// Provides the next stripe id for a certain disk.
- uint32_t getNextStripeId(uint32_t disk);
+ uint32_t getNextStripeId();
/** Removes the merge status for the given bucket. */
void clearMergeStatus(const document::Bucket&);
@@ -243,12 +239,11 @@ public:
/** Utility function to fetch total size of queue. */
uint32_t getQueueSize() const;
- uint32_t getQueueSize(uint16_t disk) const;
// Commands used by testing
void setGetNextMessageTimeout(vespalib::duration timeout);
- std::string dumpQueue(uint16_t disk) const;
+ std::string dumpQueue() const;
private:
std::unique_ptr<FileStorHandlerImpl> _impl;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 518523be7a2..88b3a6bcafc 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -41,31 +41,21 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe
FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg)
: _component(compReg, "filestorhandlerimpl"),
- _diskInfo(),
+ _disk(*this, sender, numStripes),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
_getNextMessageTimeout(100ms),
_max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)),
_paused(false)
{
- _diskInfo.reserve(_component.getDiskCount());
- for (uint32_t i(0); i < _component.getDiskCount(); i++) {
- _diskInfo.emplace_back(*this, sender, numStripes);
- }
- for (uint32_t i=0; i<_diskInfo.size(); ++i) {
- _diskInfo[i].metrics = metrics.disks[i].get();
- assert(_diskInfo[i].metrics != nullptr);
- uint32_t j(0);
- for (Stripe & stripe : _diskInfo[i].getStripes()) {
- stripe.setMetrics(metrics.disks[i]->stripes[j++].get());
- }
+ _disk.metrics = metrics.disks[0].get();
+ assert(_disk.metrics != nullptr);
+ uint32_t j(0);
+ for (Stripe & stripe : _disk.getStripes()) {
+ stripe.setMetrics(metrics.disks[0]->stripes[j++].get());
}
- if (_diskInfo.size() == 0) {
- throw vespalib::IllegalArgumentException("No disks configured", VESPA_STRLOC);
- }
- // Add update hook, so we will get callbacks each 5 seconds to update
- // metrics.
+ // Add update hook, so we will get callbacks each 5 seconds to update metrics.
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
}
@@ -86,7 +76,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
{
std::lock_guard mlock(_mergeStatesLock);
MergeStatus::SP status = _mergeStates[bucket];
- if (status.get() == 0) {
+ if ( ! status ) {
throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC);
}
return *status;
@@ -151,11 +141,9 @@ FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api:
void
FileStorHandlerImpl::flush(bool killPendingMerges)
{
- for (uint32_t i=0; i<_diskInfo.size(); ++i) {
- LOG(debug, "Wait until queues and bucket locks released for disk '%d'", i);
- _diskInfo[i].flush();
- LOG(debug, "All queues and bucket locks released for disk '%d'", i);
- }
+ LOG(debug, "Wait until queues and bucket locks released.");
+ _disk.flush();
+ LOG(debug, "All queues and bucket locks released.");
if (killPendingMerges) {
api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down");
@@ -194,53 +182,44 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg, DiskState state) const
}
void
-FileStorHandlerImpl::setDiskState(uint16_t diskId, DiskState state)
+FileStorHandlerImpl::setDiskState(DiskState state)
{
- Disk& disk = _diskInfo[diskId];
// Mark disk closed
- disk.setState(state);
+ _disk.setState(state);
if (state != FileStorHandler::AVAILABLE) {
- disk.flush();
+ _disk.flush();
}
}
FileStorHandler::DiskState
-FileStorHandlerImpl::getDiskState(uint16_t disk) const
+FileStorHandlerImpl::getDiskState() const
{
- return _diskInfo[disk].getState();
+ return _disk.getState();
}
void
FileStorHandlerImpl::close()
{
- for (uint32_t i=0; i<_diskInfo.size(); ++i) {
- if (getDiskState(i) == FileStorHandler::AVAILABLE) {
- LOG(debug, "AVAILABLE -> CLOSED disk[%d]", i);
- setDiskState(i, FileStorHandler::CLOSED);
- }
- LOG(debug, "Closing disk[%d]", i);
- _diskInfo[i].broadcast();
- LOG(debug, "Closed disk[%d]", i);
+ if (getDiskState() == FileStorHandler::AVAILABLE) {
+ LOG(debug, "AVAILABLE -> CLOSED");
+ setDiskState(FileStorHandler::CLOSED);
}
+ LOG(debug, "Closing");
+ _disk.broadcast();
+ LOG(debug, "Closed");
}
uint32_t
FileStorHandlerImpl::getQueueSize() const
{
- size_t count = 0;
- for (const auto & disk : _diskInfo) {
- count += disk.getQueueSize();
- }
- return count;
+ return _disk.getQueueSize();
}
bool
-FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, uint16_t diskId)
+FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg)
{
- assert(diskId < _diskInfo.size());
- Disk& disk(_diskInfo[diskId]);
- return disk.schedule(msg);
+ return _disk.schedule(msg);
}
bool
@@ -288,43 +267,37 @@ FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& c
{
// Do queue clearing and active operation waiting in two passes
// to allow disk threads to drain running operations in parallel.
- for (Disk & disk : _diskInfo) {
- abortQueuedCommandsForBuckets(disk, cmd);
- }
- for (Disk & disk : _diskInfo) {
- disk.waitInactive(cmd);
- }
+ abortQueuedCommandsForBuckets(_disk, cmd);
+ _disk.waitInactive(cmd);
}
void
FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
{
- for (Disk & disk : _diskInfo) {
- std::lock_guard lockGuard(_mergeStatesLock);
- disk.metrics->pendingMerges.addValue(_mergeStates.size());
- disk.metrics->queueSize.addValue(disk.getQueueSize());
-
- for (auto & entry : disk.metrics->averageQueueWaitingTime.getMetricMap()) {
- metrics::LoadType loadType(entry.first, "ignored");
- for (const auto & stripe : disk.metrics->stripes) {
- const auto & m = stripe->averageQueueWaitingTime[loadType];
- entry.second->addTotalValueWithCount(m.getTotal(), m.getCount());
- }
+ std::lock_guard lockGuard(_mergeStatesLock);
+ _disk.metrics->pendingMerges.addValue(_mergeStates.size());
+ _disk.metrics->queueSize.addValue(_disk.getQueueSize());
+
+ for (auto & entry : _disk.metrics->averageQueueWaitingTime.getMetricMap()) {
+ metrics::LoadType loadType(entry.first, "ignored");
+ for (const auto & stripe : _disk.metrics->stripes) {
+ const auto & m = stripe->averageQueueWaitingTime[loadType];
+ entry.second->addTotalValueWithCount(m.getTotal(), m.getCount());
}
}
}
uint32_t
-FileStorHandlerImpl::getNextStripeId(uint32_t disk) {
- return _diskInfo[disk].getNextStripeId();
+FileStorHandlerImpl::getNextStripeId() {
+ return _disk.getNextStripeId();
}
bool
-FileStorHandlerImpl::tryHandlePause(uint16_t disk) const
+FileStorHandlerImpl::tryHandlePause() const
{
if (isPaused()) {
// Wait a single time to see if filestor gets unpaused.
- if (!_diskInfo[disk].isClosed()) {
+ if (!_disk.isClosed()) {
std::unique_lock g(_pauseMonitor);
_pauseCond.wait_for(g, 100ms);
}
@@ -352,14 +325,13 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg)
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::getNextMessage(uint16_t disk, uint32_t stripeId)
+FileStorHandlerImpl::getNextMessage(uint32_t stripeId)
{
- assert(disk < _diskInfo.size());
- if (!tryHandlePause(disk)) {
+ if (!tryHandlePause()) {
return {}; // Still paused, return to allow tick.
}
- return _diskInfo[disk].getNextMessage(stripeId, _getNextMessageTimeout);
+ return _disk.getNextMessage(stripeId, _getNextMessageTimeout);
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
@@ -381,30 +353,14 @@ FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRe
namespace {
struct MultiLockGuard {
using monitor_guard = FileStorHandlerImpl::monitor_guard;
- struct DiskAndStripe {
- uint16_t disk;
- uint16_t stripe;
-
- DiskAndStripe(uint16_t disk_, uint16_t stripe_) noexcept : disk(disk_), stripe(stripe_) {}
- bool operator==(const DiskAndStripe& rhs) const noexcept {
- return (disk == rhs.disk) && (stripe == rhs.stripe);
- }
- bool operator<(const DiskAndStripe& rhs) const noexcept {
- if (disk != rhs.disk) {
- return disk < rhs.disk;
- }
- return stripe < rhs.stripe;
- }
- };
-
- std::map<DiskAndStripe, std::mutex*> monitors;
+ std::map<uint16_t, std::mutex*> monitors;
std::vector<std::shared_ptr<monitor_guard>> guards;
MultiLockGuard() = default;
- void addLock(std::mutex & lock, uint16_t disk_index, uint16_t stripe_index) {
- monitors[DiskAndStripe(disk_index, stripe_index)] = & lock;
+ void addLock(std::mutex & lock, uint16_t stripe_index) {
+ monitors[stripe_index] = & lock;
}
void lock() {
for (auto & entry : monitors) {
@@ -476,7 +432,7 @@ splitOrJoin(FileStorHandlerImpl::Operation op) {
document::Bucket
FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Bucket& source, Operation op,
- std::vector<RemapInfo*>& targets, uint16_t& targetDisk, api::ReturnCode& returnCode)
+ std::vector<RemapInfo*>& targets, api::ReturnCode& returnCode)
{
document::Bucket newBucket = source;
@@ -496,7 +452,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
if (idx > -1) {
cmd.remapBucketId(targets[idx]->bucket.getBucketId());
targets[idx]->foundInQueue = true;
- targetDisk = targets[idx]->diskIndex;
#if defined(ENABLE_BUCKET_OPERATION_LOGGING)
{
vespalib::string desc = vespalib::make_string(
@@ -539,7 +494,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
cmd.toString().c_str(), targets[0]->bucket.getBucketId().toString().c_str());
cmd.remapBucketId(targets[0]->bucket.getBucketId());
newBucket = targets[0]->bucket;
- targetDisk = targets[0]->diskIndex;
#ifdef ENABLE_BUCKET_OPERATION_LOGGING
{
vespalib::string desc = vespalib::make_string(
@@ -588,7 +542,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg));
if (cmd.getBucket() == source) {
if (op == MOVE) {
- targetDisk = targets[0]->diskIndex;
} else if (op == SPLIT) {
returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, "Bucket split while operation enqueued");
} else {
@@ -607,7 +560,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg));
if (cmd.getBucket() == source) {
if (op == MOVE) {
- targetDisk = targets[0]->diskIndex;
} else {
returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op));
}
@@ -622,7 +574,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
api::BucketCommand& cmd(static_cast<api::BucketCommand&>(msg));
if (cmd.getBucket() == source) {
if (op == MOVE) {
- targetDisk = targets[0]->diskIndex;
}
}
break;
@@ -641,7 +592,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
// Fail with bucket not found if op != MOVE
if (bucket == source) {
if (op == MOVE) {
- targetDisk = targets[0]->diskIndex;
} else {
returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op));
}
@@ -653,7 +603,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck
// Fail with bucket not found if op != MOVE
if (bucket == source) {
if (op == MOVE) {
- targetDisk = targets[0]->diskIndex;
} else {
returnCode = api::ReturnCode(api::ReturnCode::BUCKET_DELETED, splitOrJoin(op));
}
@@ -714,14 +663,13 @@ FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source,
// If set to something other than source.diskIndex, move this message
// to that queue.
MessageEntry& entry = entriesFound[i];
- uint16_t targetDisk = source.diskIndex;
// If not OK, reply to this message with the following message
api::ReturnCode returnCode(api::ReturnCode::OK);
api::StorageMessage& msg(*entry._command);
assert(entry._bucket == source.bucket);
- document::Bucket bucket = remapMessage(msg, source.bucket, op, targets, targetDisk, returnCode);
+ document::Bucket bucket = remapMessage(msg, source.bucket, op, targets, returnCode);
if (returnCode.getResult() != api::ReturnCode::OK) {
// Fail message if errorcode set
@@ -739,7 +687,7 @@ FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source,
assert(bucket == source.bucket || std::find_if(targets.begin(), targets.end(), [bucket](auto* e){
return e->bucket == bucket;
}) != targets.end());
- _diskInfo[targetDisk].stripe(bucket).exposeQueue().emplace_back(std::move(entry));
+ _disk.stripe(bucket).exposeQueue().emplace_back(std::move(entry));
}
}
@@ -752,12 +700,10 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Oper
// the same bucket. Will fix order if we accept wrong order later.
MultiLockGuard guard;
- Disk& from(_diskInfo[source.diskIndex]);
- guard.addLock(from.stripe(source.bucket).exposeLock(), source.diskIndex, from.stripe_index(source.bucket));
+ guard.addLock(_disk.stripe(source.bucket).exposeLock(), _disk.stripe_index(source.bucket));
- Disk& to1(_diskInfo[target.diskIndex]);
if (target.bucket.getBucketId().getRawId() != 0) {
- guard.addLock(to1.stripe(target.bucket).exposeLock(), target.diskIndex, to1.stripe_index(target.bucket));
+ guard.addLock(_disk.stripe(target.bucket).exposeLock(), _disk.stripe_index(target.bucket));
}
std::vector<RemapInfo*> targets;
@@ -765,7 +711,7 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Oper
guard.lock();
- remapQueueNoLock(from, source, targets, op);
+ remapQueueNoLock(_disk, source, targets, op);
}
void
@@ -775,17 +721,14 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem
// the same bucket. Will fix order if we accept wrong order later.
MultiLockGuard guard;
- Disk& from(_diskInfo[source.diskIndex]);
- guard.addLock(from.stripe(source.bucket).exposeLock(), source.diskIndex, from.stripe_index(source.bucket));
+ guard.addLock(_disk.stripe(source.bucket).exposeLock(), _disk.stripe_index(source.bucket));
- Disk& to1(_diskInfo[target1.diskIndex]);
if (target1.bucket.getBucketId().getRawId() != 0) {
- guard.addLock(to1.stripe(target1.bucket).exposeLock(), target1.diskIndex, to1.stripe_index(target1.bucket));
+ guard.addLock(_disk.stripe(target1.bucket).exposeLock(), _disk.stripe_index(target1.bucket));
}
- Disk& to2(_diskInfo[target2.diskIndex]);
if (target2.bucket.getBucketId().getRawId() != 0) {
- guard.addLock(to2.stripe(target2.bucket).exposeLock(), target2.diskIndex, to2.stripe_index(target2.bucket));
+ guard.addLock(_disk.stripe(target2.bucket).exposeLock(), _disk.stripe_index(target2.bucket));
}
guard.lock();
@@ -794,7 +737,7 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem
targets.push_back(&target1);
targets.push_back(&target2);
- remapQueueNoLock(from, source, targets, op);
+ remapQueueNoLock(_disk, source, targets, op);
}
void
@@ -1180,12 +1123,6 @@ FileStorHandlerImpl::Disk::getQueueSize() const noexcept
return sum;
}
-uint32_t
-FileStorHandlerImpl::getQueueSize(uint16_t disk) const
-{
- return _diskInfo[disk].getQueueSize();
-}
-
FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard & guard, Stripe& stripe,
const document::Bucket &bucket, uint8_t priority,
api::MessageType::Id msgType, api::StorageMessage::Id msgId,
@@ -1295,22 +1232,21 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
{
bool verbose = path.hasAttribute("verbose");
out << "<h1>Filestor handler</h1>\n";
- for (uint32_t i=0; i<_diskInfo.size(); ++i) {
- out << "<h2>Disk " << i << "</h2>\n";
- const Disk& disk(_diskInfo[i]);
- out << "Queue size: " << disk.getQueueSize() << "<br>\n";
- out << "Disk state: ";
- switch (disk.getState()) {
- case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break;
- case FileStorHandler::DISABLED: out << "DISABLED"; break;
- case FileStorHandler::CLOSED: out << "CLOSED"; break;
- }
- out << "<h4>Active operations</h4>\n";
- disk.dumpActiveHtml(out);
- if (!verbose) continue;
+
+ out << "<h2>Disk " << "</h2>\n";
+ out << "Queue size: " << _disk.getQueueSize() << "<br>\n";
+ out << "Disk state: ";
+ switch (_disk.getState()) {
+ case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break;
+ case FileStorHandler::DISABLED: out << "DISABLED"; break;
+ case FileStorHandler::CLOSED: out << "CLOSED"; break;
+ }
+ out << "<h4>Active operations</h4>\n";
+ _disk.dumpActiveHtml(out);
+ if (verbose) {
out << "<h4>Input queue</h4>\n";
out << "<ul>\n";
- disk.dumpQueueHtml(out);
+ _disk.dumpQueueHtml(out);
out << "</ul>\n";
}
@@ -1330,9 +1266,7 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
void
FileStorHandlerImpl::waitUntilNoLocks()
{
- for (const auto & disk : _diskInfo) {
- disk.waitUntilNoLocks();
- }
+ _disk.waitUntilNoLocks();
}
ResumeGuard
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 90b0e559899..f397b82d199 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -241,20 +241,20 @@ public:
void setGetNextMessageTimeout(vespalib::duration timeout) { _getNextMessageTimeout = timeout; }
void flush(bool killPendingMerges);
- void setDiskState(uint16_t disk, DiskState state);
- DiskState getDiskState(uint16_t disk) const;
+ void setDiskState(DiskState state);
+ DiskState getDiskState() const;
void close();
- bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
+ bool schedule(const std::shared_ptr<api::StorageMessage>&);
- FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId);
+ FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId);
enum Operation { MOVE, SPLIT, JOIN };
void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op);
void remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op);
- void failOperations(const document::Bucket & bucket, uint16_t disk, const api::ReturnCode & code) {
- _diskInfo[disk].failOperations(bucket, code);
+ void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) {
+ _disk.failOperations(bucket, code);
}
void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
void sendReply(const std::shared_ptr<api::StorageReply>&) override;
@@ -263,12 +263,11 @@ public:
void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const;
uint32_t getQueueSize() const;
- uint32_t getQueueSize(uint16_t disk) const;
- uint32_t getNextStripeId(uint32_t disk);
+ uint32_t getNextStripeId();
std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::Bucket & bucket, uint16_t disk, api::LockingRequirements lockReq) {
- return _diskInfo[disk].lock(bucket, lockReq);
+ lock(const document::Bucket & bucket, api::LockingRequirements lockReq) {
+ return _disk.lock(bucket, lockReq);
}
void addMergeStatus(const document::Bucket&, MergeStatus::SP);
@@ -277,8 +276,8 @@ public:
uint32_t getNumActiveMerges() const;
void clearMergeStatus(const document::Bucket&, const api::ReturnCode*);
- std::string dumpQueue(uint16_t disk) const {
- return _diskInfo[disk].dumpQueue();
+ std::string dumpQueue() const {
+ return _disk.dumpQueue();
}
ResumeGuard pause();
void resume() override;
@@ -286,7 +285,7 @@ public:
private:
ServiceLayerComponent _component;
- std::vector<Disk> _diskInfo;
+ Disk _disk;
MessageSender& _messageSender;
const document::BucketIdFactory& _bucketIdFactory;
mutable std::mutex _mergeStatesLock;
@@ -307,7 +306,7 @@ private:
* recheck pause status. Returns true if filestor isn't paused at the time
* of the first check or after the wait, false if it's still paused.
*/
- bool tryHandlePause(uint16_t disk) const;
+ bool tryHandlePause() const;
/**
* Checks whether the entire filestor layer is paused.
@@ -335,7 +334,7 @@ private:
document::Bucket
remapMessage(api::StorageMessage& msg, const document::Bucket &source, Operation op,
- std::vector<RemapInfo*>& targets, uint16_t& targetDisk, api::ReturnCode& returnCode);
+ std::vector<RemapInfo*>& targets, api::ReturnCode& returnCode);
void remapQueueNoLock(Disk& from, const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index c4369a94161..88bfb18841c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -37,7 +37,7 @@ FileStorManager(const config::ConfigUri & configUri,
_provider(&_providerErrorWrapper),
_bucketIdFactory(_component.getBucketIdFactory()),
_configUri(configUri),
- _disks(),
+ _threads(),
_bucketOwnershipNotifier(std::make_unique<BucketOwnershipNotifier>(_component, *this)),
_configFetcher(_configUri.getContext()),
_threadLockCheckInterval(60),
@@ -59,25 +59,21 @@ FileStorManager::~FileStorManager()
LOG(debug, "Deleting link %s. Giving filestor threads stop signal.",
toString().c_str());
- for (uint32_t i = 0; i < _disks.size(); ++i) {
- for (uint32_t j = 0; j < _disks[i].size(); ++j) {
- if (_disks[i][j].get() != 0) {
- _disks[i][j]->getThread().interrupt();
- }
+ for (const auto & thread : _threads) {
+ if (thread) {
+ thread->getThread().interrupt();
}
}
- for (uint32_t i = 0; i < _disks.size(); ++i) {
- for (uint32_t j = 0; j < _disks[i].size(); ++j) {
- if (_disks[i][j].get() != 0) {
- _disks[i][j]->getThread().join();
- }
+ for (const auto & thread : _threads) {
+ if (thread) {
+ thread->getThread().join();
}
}
LOG(debug, "Closing all filestor queues, answering queued messages. New messages will be refused.");
_filestorHandler->close();
LOG(debug, "Deleting filestor threads. Waiting for their current operation "
"to finish. Stop their threads and delete objects.");
- _disks.clear();
+ _threads.clear();
}
void
@@ -115,29 +111,27 @@ void
FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config)
{
// If true, this is not the first configure.
- bool liveUpdate = (_disks.size() != 0);
+ bool liveUpdate = ! _threads.empty();
_threadLockCheckInterval = config->diskOperationTimeout;
_failDiskOnError = (config->failDiskAfterErrorCount > 0);
if (!liveUpdate) {
_config = std::move(config);
- _disks.resize(_component.getDiskCount());
+ assert(_component.getDiskCount() == 1);
size_t numThreads = _config->numThreads;
size_t numStripes = std::max(size_t(1u), numThreads / 2);
- _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads);
+ _metrics->initDiskMetrics(1, _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads);
_filestorHandler = std::make_unique<FileStorHandler>(numThreads, numStripes, *this, *_metrics, _compReg);
uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads);
if (numResponseThreads > 0) {
_sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType));
}
- for (uint32_t i=0; i<_component.getDiskCount(); ++i) {
- LOG(spam, "Setting up disk %u", i);
- for (uint32_t j = 0; j < numThreads; j++) {
- _disks[i].push_back(std::make_shared<PersistenceThread>(_sequencedExecutor.get(), _compReg, _configUri, *_provider,
- *_filestorHandler, *_metrics->disks[i]->threads[j], i));
- }
+ LOG(spam, "Setting up the disk");
+ for (uint32_t j = 0; j < numThreads; j++) {
+ _threads.push_back(std::make_shared<PersistenceThread>(_sequencedExecutor.get(), _compReg, _configUri, *_provider,
+ *_filestorHandler, *_metrics->disks[0]->threads[j]));
}
}
}
@@ -224,23 +218,23 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const docu
}
bool
-FileStorManager::handlePersistenceMessage( const shared_ptr<api::StorageMessage>& msg, uint16_t disk)
+FileStorManager::handlePersistenceMessage( const shared_ptr<api::StorageMessage>& msg)
{
api::ReturnCode errorCode(api::ReturnCode::OK);
do {
- LOG(spam, "Received %s. Attempting to queue it to disk %u.", msg->getType().getName().c_str(), disk);
+ LOG(spam, "Received %s. Attempting to queue it.", msg->getType().getName().c_str());
LOG_BUCKET_OPERATION_NO_LOCK(
getStorageMessageBucket(*msg).getBucketId(),
- vespalib::make_string("Attempting to queue %s to disk %u", msg->toString().c_str(), disk));
+ vespalib::make_string("Attempting to queue %s", msg->toString().c_str()));
- if (_filestorHandler->schedule(msg, disk)) {
- LOG(spam, "Received persistence message %s. Queued it to disk %u",
- msg->getType().getName().c_str(), disk);
+ if (_filestorHandler->schedule(msg)) {
+ LOG(spam, "Received persistence message %s. Queued it to disk",
+ msg->getType().getName().c_str());
return true;
}
- switch (_filestorHandler->getDiskState(disk)) {
+ switch (_filestorHandler->getDiskState()) {
case FileStorHandler::DISABLED:
errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled");
break;
@@ -277,7 +271,7 @@ FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd)
}
StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -296,7 +290,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd)
}
StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -306,7 +300,7 @@ FileStorManager::onGet(const shared_ptr<api::GetCommand>& cmd)
{
StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -325,7 +319,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd)
}
StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -335,7 +329,7 @@ FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd)
{
StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -345,7 +339,7 @@ FileStorManager::onRemoveLocation(const std::shared_ptr<api::RemoveLocationComma
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -355,7 +349,7 @@ FileStorManager::onStatBucket(const std::shared_ptr<api::StatBucketCommand>& cmd
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -377,17 +371,14 @@ FileStorManager::onCreateBucket(
entry->getBucketInfo().toString().c_str());
code = api::ReturnCode(api::ReturnCode::EXISTS, "Bucket already exist");
} else {
- entry->disk = _component.getIdealPartition(cmd->getBucket());
// Newly created buckets are ready but not active, unless
// explicitly marked as such by the distributor.
- entry->setBucketInfo(api::BucketInfo(
- 0, 0, 0, 0, 0, true, cmd->getActive()));
+ entry->setBucketInfo(api::BucketInfo(0, 0, 0, 0, 0, true, cmd->getActive()));
cmd->setPriority(0);
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
entry.write();
- LOG(debug, "Created bucket %s on disk %d (node index is %d)",
- cmd->getBucketId().toString().c_str(),
- entry->disk, _component.getIndex());
+ LOG(debug, "Created bucket %s (node index is %d)",
+ cmd->getBucketId().toString().c_str(), _component.getIndex());
return true;
}
}
@@ -401,7 +392,6 @@ FileStorManager::onCreateBucket(
bool
FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
{
- uint16_t disk;
{
document::Bucket bucket(cmd->getBucket());
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(),
@@ -443,11 +433,10 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
// higher priority.
cmd->setPriority(0);
LOG(debug, "Deleting %s", cmd->getBucketId().toString().c_str());
- handlePersistenceMessage(cmd, entry->disk);
- disk = entry->disk;
+ handlePersistenceMessage(cmd);
entry.remove();
}
- _filestorHandler->failOperations(cmd->getBucket(), disk,
+ _filestorHandler->failOperations(cmd->getBucket(),
api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
vespalib::make_string("Bucket %s about to be deleted anyway",
cmd->getBucketId().toString().c_str())));
@@ -486,41 +475,38 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
}
if (!entry.preExisted()) {
- entry->disk = _component.getIdealPartition(cmd->getBucket());
entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false);
- LOG(debug, "Created bucket %s on disk %d (node index is %d) due to merge being received.",
- cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex());
+ LOG(debug, "Created bucket %s (node index is %d) due to merge being received.",
+ cmd->getBucketId().toString().c_str(), _component.getIndex());
// Call before writing bucket entry as we need to have bucket
// lock while calling
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
entry.write();
} else {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
bool
-FileStorManager::onGetBucketDiff(
- const shared_ptr<api::GetBucketDiffCommand>& cmd)
+FileStorManager::onGetBucketDiff(const shared_ptr<api::GetBucketDiffCommand>& cmd)
{
StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onGetBucketDiff"));
if (!entry.exist()) {
return true;
}
if (!entry.preExisted()) {
- entry->disk = _component.getIdealPartition(cmd->getBucket());
- LOG(debug, "Created bucket %s on disk %d (node index is %d) due to get bucket diff being received.",
- cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex());
+ LOG(debug, "Created bucket %s (node index is %d) due to get bucket diff being received.",
+ cmd->getBucketId().toString().c_str(), _component.getIndex());
entry->info.setTotalDocumentSize(0);
entry->info.setUsedFileSize(0);
entry->info.setReady(true);
// Call before writing bucket entry as we need to have bucket
// lock while calling
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
entry.write();
} else {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -562,7 +548,7 @@ FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>&
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucket()));
if (validateDiffReplyBucket(entry, reply->getBucket())) {
- handlePersistenceMessage(reply, entry->disk);
+ handlePersistenceMessage(reply);
}
return true;
}
@@ -572,7 +558,7 @@ FileStorManager::onApplyBucketDiff(const shared_ptr<api::ApplyBucketDiffCommand>
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (validateApplyDiffCommandBucket(*cmd, entry)) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -580,10 +566,9 @@ FileStorManager::onApplyBucketDiff(const shared_ptr<api::ApplyBucketDiffCommand>
bool
FileStorManager::onApplyBucketDiffReply(const shared_ptr<api::ApplyBucketDiffReply>& reply)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *reply, reply->getBucket()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucket()));
if (validateDiffReplyBucket(entry, reply->getBucket())) {
- handlePersistenceMessage(reply, entry->disk);
+ handlePersistenceMessage(reply);
}
return true;
}
@@ -594,13 +579,7 @@ FileStorManager::onJoinBuckets(const std::shared_ptr<api::JoinBucketsCommand>& c
document::Bucket bucket(cmd->getBucket());
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "FileStorManager::onJoinBuckets"));
- uint16_t disk;
- if (entry.exist()) {
- disk = entry->disk;
- } else {
- disk = _component.getPreferredAvailablePartition(bucket);
- }
- return handlePersistenceMessage(cmd, disk);
+ return handlePersistenceMessage(cmd);
}
bool
@@ -608,7 +587,7 @@ FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& c
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -618,7 +597,7 @@ FileStorManager::onSetBucketState(const std::shared_ptr<api::SetBucketStateComma
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -632,7 +611,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
shared_ptr<GetIterCommand> cmd(std::static_pointer_cast<GetIterCommand>(msg));
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -641,7 +620,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
shared_ptr<CreateIteratorCommand> cmd(std::static_pointer_cast<CreateIteratorCommand>(msg));
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -656,7 +635,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
case ReadBucketList::ID:
{
shared_ptr<ReadBucketList> cmd(std::static_pointer_cast<ReadBucketList>(msg));
- handlePersistenceMessage(cmd, cmd->getPartition());
+ handlePersistenceMessage(cmd);
return true;
}
case ReadBucketInfo::ID:
@@ -664,7 +643,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
shared_ptr<ReadBucketInfo> cmd(std::static_pointer_cast<ReadBucketInfo>(msg));
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -673,7 +652,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
shared_ptr<InternalBucketJoinCommand> cmd(std::static_pointer_cast<InternalBucketJoinCommand>(msg));
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -682,7 +661,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
shared_ptr<RecheckBucketInfoCommand> cmd(std::static_pointer_cast<RecheckBucketInfoCommand>(msg));
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
- handlePersistenceMessage(cmd, entry->disk);
+ handlePersistenceMessage(cmd);
}
return true;
}
@@ -748,7 +727,7 @@ FileStorManager::sendReplyDirectly(const std::shared_ptr<api::StorageReply>& rep
if (reply->getType() == api::MessageType::INTERNAL_REPLY) {
std::shared_ptr<api::InternalReply> rep(std::dynamic_pointer_cast<api::InternalReply>(reply));
- assert(rep.get());
+ assert(rep);
if (onInternalReply(rep)) return;
}
sendUp(reply);
@@ -780,12 +759,10 @@ void FileStorManager::onFlush(bool downwards)
LOG(debug, "Start Flushing");
_filestorHandler->flush(!downwards);
LOG(debug, "Flushed _filestorHandler->flush(!downwards);");
- for (uint32_t i = 0; i < _disks.size(); ++i) {
- for (uint32_t j = 0; j < _disks[i].size(); ++j) {
- if (_disks[i][j]) {
- _disks[i][j]->flush();
- LOG(debug, "flushed disk[%d][%d]", i, j);
- }
+ for (const auto & thread : _threads) {
+ if (thread) {
+ thread->flush();
+ LOG(debug, "flushed thread[%s]", thread->getThread().getId().c_str());
}
}
uint32_t queueSize = _filestorHandler->getQueueSize();
@@ -822,9 +799,7 @@ FileStorManager::reportHtmlStatus(std::ostream& out, const framework::HttpUrlPat
out << "\">" << (verbose ? "Less verbose" : "More verbose") << "</a>\n"
<< " ]</font><br><br>\n";
- if (_disks.size()) {
- out << "<p>Using " << _disks[0].size() << " threads per disk</p>\n";
- }
+ out << "<p>Using " << _threads.size() << " threads</p>\n";
_filestorHandler->getStatus(out, path);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 6efd30419b8..54bfd927b18 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -56,8 +56,7 @@ class FileStorManager : public StorageLinkQueued,
const document::BucketIdFactory& _bucketIdFactory;
config::ConfigUri _configUri;
- typedef std::vector<DiskThread::SP> DiskThreads;
- std::vector<DiskThreads> _disks;
+ std::vector<DiskThread::SP> _threads;
std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier;
std::unique_ptr<vespa::config::content::StorFilestorConfig> _config;
@@ -114,7 +113,7 @@ private:
StorBucketDatabase::WrappedEntry mapOperationToDisk(api::StorageMessage&, const document::Bucket&);
StorBucketDatabase::WrappedEntry mapOperationToBucketAndDisk(api::BucketCommand&, const document::DocumentId*);
- bool handlePersistenceMessage(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
+ bool handlePersistenceMessage(const std::shared_ptr<api::StorageMessage>&);
// Document operations
bool onPut(const std::shared_ptr<api::PutCommand>&) override;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 4e1076d9214..acbbe782099 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -848,7 +848,6 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
{
tracker->setMetric(_env._metrics.mergeBuckets);
- assert(_env._partition == 0u);
spi::Bucket bucket(cmd.getBucket());
LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".",
bucket.toString().c_str(), cmd.getMaxTimestamp());
@@ -1059,7 +1058,6 @@ MessageTracker::UP
MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker)
{
tracker->setMetric(_env._metrics.getBucketDiff);
- assert(_env._partition == 0u);
spi::Bucket bucket(cmd.getBucket());
LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str());
checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
@@ -1171,7 +1169,6 @@ void
MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSender& sender)
{
_env._metrics.getBucketDiffReply.inc();
- assert(_env._partition == 0u);
spi::Bucket bucket(reply.getBucket());
LOG(debug, "GetBucketDiffReply(%s)", bucket.toString().c_str());
@@ -1246,7 +1243,6 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
{
tracker->setMetric(_env._metrics.applyBucketDiff);
- assert(_env._partition == 0u);
spi::Bucket bucket(cmd.getBucket());
LOG(debug, "%s", cmd.toString().c_str());
@@ -1334,7 +1330,6 @@ void
MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender)
{
_env._metrics.applyBucketDiffReply.inc();
- assert(_env._partition == 0u);
spi::Bucket bucket(reply.getBucket());
std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff());
LOG(debug, "%s", reply.toString().c_str());
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index da0f06bf662..1e0bcac28fa 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -99,10 +99,9 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequence
const config::ConfigUri & configUri,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
- FileStorThreadMetrics& metrics,
- uint16_t deviceIndex)
- : _stripeId(filestorHandler.getNextStripeId(deviceIndex)),
- _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider),
+ FileStorThreadMetrics& metrics)
+ : _stripeId(filestorHandler.getNextStripeId()),
+ _env(configUri, compReg, filestorHandler, metrics, provider),
_sequencedExecutor(sequencedExecutor),
_spi(provider),
_processAllHandler(_env, provider),
@@ -110,7 +109,7 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequence
_bucketOwnershipNotifier()
{
std::ostringstream threadName;
- threadName << "Disk " << _env._partition << " thread " << _stripeId;
+ threadName << "Thread " << _stripeId;
_component = std::make_unique<ServiceLayerComponent>(compReg, threadName.str());
_bucketOwnershipNotifier = std::make_unique<BucketOwnershipNotifier>(*_component, filestorHandler);
_thread = _component->startThread(*this, 60s, 1s);
@@ -137,7 +136,6 @@ PersistenceThread::getBucket(const DocumentId& id, const document::Bucket &bucke
+ "bucket " + bucket.getBucketId().toString() + ".", VESPA_STRLOC);
}
- assert(_env._partition == 0u);
return spi::Bucket(bucket);
}
@@ -329,7 +327,6 @@ MessageTracker::UP
PersistenceThread::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker)
{
tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]);
- assert(_env._partition == 0u);
spi::Bucket b = spi::Bucket(cmd.getBucket());
const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens();
for (const api::Timestamp & token : tokens) {
@@ -347,7 +344,6 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrac
LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId());
}
- assert(_env._partition == 0u);
spi::Bucket spiBucket(cmd.getBucket());
_spi.createBucket(spiBucket, tracker->context());
if (cmd.getActive()) {
@@ -407,7 +403,6 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrac
_env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
}
- assert(_env._partition == 0u);
spi::Bucket bucket(cmd.getBucket());
if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
return tracker;
@@ -486,7 +481,6 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTrack
if ( ! fieldSet) { return tracker; }
tracker->context().setReadConsistency(cmd.getReadConsistency());
- assert(_env._partition == 0u);
spi::CreateIteratorResult result(_spi.createIterator(
spi::Bucket(cmd.getBucket()),
std::move(fieldSet), cmd.getSelection(), cmd.getIncludedVersions(), tracker->context()));
@@ -514,7 +508,6 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracke
return tracker;
}
- assert(_env._partition == 0u);
spi::Bucket spiBucket(cmd.getBucket());
SplitBitDetector::Result targetInfo;
if (_env._config.enableMultibitSplitOptimalization) {
@@ -556,8 +549,6 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracke
}
}
#endif
- assert(lock1.disk == 0u);
- assert(lock2.disk == 0u);
spi::Result result = _spi.split(spiBucket, spi::Bucket(target1),
spi::Bucket(target2), tracker->context());
if (result.hasError()) {
@@ -576,14 +567,12 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracke
std::vector<TargetInfo> targets;
for (uint32_t i = 0; i < 2; i++) {
const document::Bucket &target(i == 0 ? target1 : target2);
- uint16_t disk(i == 0 ? lock1.disk : lock2.disk);
assert(target.getBucketId().getRawId() != 0);
targets.emplace_back(_env.getBucketDatabase(target.getBucketSpace()).get(
target.getBucketId(), "PersistenceThread::handleSplitBucket - Target",
StorBucketDatabase::CREATE_IF_NONEXISTING),
- FileStorHandler::RemapInfo(target, disk));
- targets.back().first->setBucketInfo(_env.getBucketInfo(target, disk));
- targets.back().first->disk = disk;
+ FileStorHandler::RemapInfo(target));
+ targets.back().first->setBucketInfo(_env.getBucketInfo(target));
}
if (LOG_WOULD_LOG(spam)) {
api::BucketInfo targ1(targets[0].first->getBucketInfo());
@@ -596,7 +585,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracke
target2.getBucketId().toString().c_str(),
targ2.getMetaCount());
}
- FileStorHandler::RemapInfo source(cmd.getBucket(), _env._partition);
+ FileStorHandler::RemapInfo source(cmd.getBucket());
_env._fileStorHandler.remapQueueAfterSplit(source, targets[0].second, targets[1].second);
bool ownershipChanged(!_bucketOwnershipNotifier->distributorOwns(cmd.getSourceIndex(), cmd.getBucket()));
// Now release all the bucketdb locks.
@@ -613,7 +602,6 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracke
// Must make sure target bucket exists when we have pending ops
// to an empty target bucket, since the provider will have
// implicitly erased it by this point.
- assert(target.second.diskIndex == 0u);
spi::Bucket createTarget(spi::Bucket(target.second.bucket));
LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it",
createTarget.toString().c_str());
@@ -679,7 +667,6 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracke
StorBucketDatabase::WrappedEntry entry =
_env.getBucketDatabase(destBucket.getBucketSpace()).get(destBucket.getBucketId(), "join",
StorBucketDatabase::CREATE_IF_NONEXISTING);
- entry->disk = _env._partition;
entry.write();
}
@@ -705,9 +692,6 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracke
}
}
#endif
- assert(lock1.disk == 0u);
- assert(lock2.disk == 0u);
- assert(_env._partition == 0u);
spi::Result result =
_spi.join(spi::Bucket(firstBucket),
spi::Bucket(secondBucket),
@@ -719,9 +703,8 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracke
uint64_t lastModified = 0;
for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) {
document::Bucket srcBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[i]);
- uint16_t disk = (i == 0) ? lock1.disk : lock2.disk;
- FileStorHandler::RemapInfo target(cmd.getBucket(), _env._partition);
- _env._fileStorHandler.remapQueueAfterJoin(FileStorHandler::RemapInfo(srcBucket, disk), target);
+ FileStorHandler::RemapInfo target(cmd.getBucket());
+ _env._fileStorHandler.remapQueueAfterJoin(FileStorHandler::RemapInfo(srcBucket), target);
// Remove source from bucket db.
StorBucketDatabase::WrappedEntry entry =
_env.getBucketDatabase(srcBucket.getBucketSpace()).get(srcBucket.getBucketId(), "join-remove-source");
@@ -749,7 +732,6 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd, Message
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
- assert(_env._partition == 0u);
spi::Bucket bucket(cmd.getBucket());
bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
@@ -785,7 +767,6 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, Mess
_env.getBucketDatabase(destBucket.getBucketSpace()).get(
destBucket.getBucketId(), "join", StorBucketDatabase::CREATE_IF_NONEXISTING);
- entry->disk = _env._partition;
entry.write();
}
assert(cmd.getDiskOfInstanceToJoin() == 0u);
@@ -939,7 +920,7 @@ PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP t
void
PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) {
- LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get());
+ LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get());
api::StorageMessage & msg(*lock.second);
// Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains
@@ -956,10 +937,10 @@ PersistenceThread::run(framework::ThreadHandle& thread)
{
LOG(debug, "Started persistence thread");
- while (!thread.interrupted() && !_env._fileStorHandler.closed(_env._partition)) {
+ while (!thread.interrupted() && !_env._fileStorHandler.closed()) {
thread.registerTick();
- FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _stripeId));
+ FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_stripeId));
if (lock.first) {
processLockedMessage(std::move(lock));
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index ddc1a0ac217..d2216d6fb5e 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -22,7 +22,7 @@ class PersistenceThread final : public DiskThread, public Types
public:
PersistenceThread(vespalib::ISequencedTaskExecutor *, ServiceLayerComponentRegister&,
const config::ConfigUri & configUri, spi::PersistenceProvider& provider,
- FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, uint16_t deviceIndex);
+ FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics);
~PersistenceThread() override;
/** Waits for current operation to be finished. */
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 0d9a4a06cee..24d91253358 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -89,11 +89,11 @@ MessageTracker::sendReply() {
vespalib::duration duration = vespalib::from_s(_timer.getElapsedTimeAsDouble()/1000.0);
if (duration >= WARN_ON_SLOW_OPERATIONS) {
LOGBT(warning, _msg->getType().toString(),
- "Slow processing of message %s on disk %u. Processing time: %1.1f s (>=%1.1f s)",
- _msg->toString().c_str(), _env._partition, vespalib::to_s(duration), vespalib::to_s(WARN_ON_SLOW_OPERATIONS));
+ "Slow processing of message %s. Processing time: %1.1f s (>=%1.1f s)",
+ _msg->toString().c_str(), vespalib::to_s(duration), vespalib::to_s(WARN_ON_SLOW_OPERATIONS));
} else {
- LOGBT(spam, _msg->getType().toString(), "Processing time of message %s on disk %u: %1.1f s",
- _msg->toString(true).c_str(), _env._partition, vespalib::to_s(duration));
+ LOGBT(spam, _msg->getType().toString(), "Processing time of message %s: %1.1f s",
+ _msg->toString(true).c_str(), vespalib::to_s(duration));
}
if (hasReply()) {
if ( ! _context.getTrace().getRoot().isEmpty()) {
@@ -166,13 +166,11 @@ PersistenceUtil::PersistenceUtil(
ServiceLayerComponentRegister& compReg,
FileStorHandler& fileStorHandler,
FileStorThreadMetrics& metrics,
- uint16_t partition,
spi::PersistenceProvider& provider)
: _config(*config::ConfigGetter<vespa::config::content::StorFilestorConfig>::getConfig(configUri.getConfigId(), configUri.getContext())),
_compReg(compReg),
_component(compReg, generateName(this)),
_fileStorHandler(fileStorHandler),
- _partition(partition),
_nodeIndex(_component.getIndex()),
_metrics(metrics),
_bucketFactory(_component.getBucketIdFactory()),
@@ -220,24 +218,18 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket,
// the bucket DB again to verify that the bucket is still on that
// disk after locking it, or we will have to retry on new disk.
LockResult result;
- result.disk = getPreferredAvailableDisk(bucket);
while (true) {
// This function is only called in a context where we require exclusive
// locking (split/join). Refactor if this no longer the case.
std::shared_ptr<FileStorHandler::BucketLockInterface> lock(
- _fileStorHandler.lock(bucket, result.disk, api::LockingRequirements::Exclusive));
+ _fileStorHandler.lock(bucket, api::LockingRequirements::Exclusive));
// TODO disks are no longer used in practice, can we safely discard this?
// Might need it for synchronization purposes if something has taken the
// disk lock _and_ the bucket lock...?
StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "join-lockAndGetDisk-1", flags));
- if (entry.exist() && entry->disk != result.disk) {
- result.disk = entry->disk;
- continue;
- }
-
result.lock = lock;
return result;
}
@@ -246,7 +238,7 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket,
void
PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket)
{
- api::BucketInfo info = getBucketInfo(bucket, _partition);
+ api::BucketInfo info = getBucketInfo(bucket);
static_cast<api::BucketInfoReply&>(tracker.getReply()).setBucketInfo(info);
@@ -254,13 +246,8 @@ PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket &
}
api::BucketInfo
-PersistenceUtil::getBucketInfo(const document::Bucket &bucket, int disk) const
+PersistenceUtil::getBucketInfo(const document::Bucket &bucket) const
{
- if (disk == -1) {
- disk = _partition;
- }
-
- assert(disk == 0u);
spi::BucketInfoResult response = _spi.getBucketInfo(spi::Bucket(bucket));
return convertBucketInfo(response.getBucketInfo());
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 51eb2b4b590..fe475d96077 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -99,7 +99,6 @@ struct PersistenceUtil {
ServiceLayerComponentRegister &_compReg;
ServiceLayerComponent _component;
FileStorHandler &_fileStorHandler;
- uint16_t _partition;
uint16_t _nodeIndex;
FileStorThreadMetrics &_metrics;
const document::BucketIdFactory &_bucketFactory;
@@ -111,7 +110,6 @@ struct PersistenceUtil {
ServiceLayerComponentRegister&,
FileStorHandler& fileStorHandler,
FileStorThreadMetrics& metrics,
- uint16_t partition,
spi::PersistenceProvider& provider);
~PersistenceUtil();
@@ -127,18 +125,16 @@ struct PersistenceUtil {
/** Lock the given bucket in the file stor handler. */
struct LockResult {
std::shared_ptr<FileStorHandler::BucketLockInterface> lock;
- uint16_t disk;
+ LockResult() : lock() {}
- LockResult() : lock(), disk(0) {}
-
- bool bucketExisted() const { return (lock.get() != 0); }
+ bool bucketExisted() const { return bool(lock); }
};
LockResult lockAndGetDisk(
const document::Bucket &bucket,
StorBucketDatabase::Flag flags = StorBucketDatabase::NONE);
- api::BucketInfo getBucketInfo(const document::Bucket &bucket, int disk = -1) const;
+ api::BucketInfo getBucketInfo(const document::Bucket &bucket) const;
api::BucketInfo convertBucketInfo(const spi::BucketInfo&) const;
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index cbc5c0fc6dd..15f7c1fffb7 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -88,7 +88,6 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, Message
cmd.getBucketId().toString().c_str(),
cmd.getDocumentSelection().c_str());
- assert(_env._partition == 0u);
spi::Bucket bucket(cmd.getBucket());
UnrevertableRemoveEntryProcessor processor(_spi, bucket, tracker->context());
BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
@@ -104,11 +103,8 @@ ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker:
{
tracker->setMetric(_env._metrics.statBucket[cmd.getLoadType()]);
std::ostringstream ost;
+ ost << "Persistence bucket " << cmd.getBucketId() << "\n";
- ost << "Persistence bucket " << cmd.getBucketId()
- << ", partition " << _env._partition << "\n";
-
- assert(_env._partition == 0u);
spi::Bucket bucket(cmd.getBucket());
StatEntryProcessor processor(ost);
BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
diff --git a/storage/src/vespa/storage/persistence/testandsethelper.cpp b/storage/src/vespa/storage/persistence/testandsethelper.cpp
index ea3ad3c6cb3..523f5a52885 100644
--- a/storage/src/vespa/storage/persistence/testandsethelper.cpp
+++ b/storage/src/vespa/storage/persistence/testandsethelper.cpp
@@ -71,8 +71,8 @@ TestAndSetHelper::retrieveAndMatch(spi::Context & context) {
auto docPtr = result.getDocumentPtr();
if (_docSelectionUp->contains(*docPtr) != document::select::Result::True) {
return api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
- vespalib::make_string("Condition did not match document partition=%d, nodeIndex=%d bucket=%" PRIx64 " %s",
- _thread._env._partition, _thread._env._nodeIndex, _cmd.getBucketId().getRawId(),
+ vespalib::make_string("Condition did not match document nodeIndex=%d bucket=%" PRIx64 " %s",
+ _thread._env._nodeIndex, _cmd.getBucketId().getRawId(),
_cmd.hasBeenRemapped() ? "remapped" : ""));
}
@@ -83,8 +83,8 @@ TestAndSetHelper::retrieveAndMatch(spi::Context & context) {
}
return api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
- vespalib::make_string("Document does not exist partition=%d, nodeIndex=%d bucket=%" PRIx64 " %s",
- _thread._env._partition, _thread._env._nodeIndex, _cmd.getBucketId().getRawId(),
+ vespalib::make_string("Document does not exist nodeIndex=%d bucket=%" PRIx64 " %s",
+ _thread._env._nodeIndex, _cmd.getBucketId().getRawId(),
_cmd.hasBeenRemapped() ? "remapped" : ""));
}