summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/common/testhelper.cpp20
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp339
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp4
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp12
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp13
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h23
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp124
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h19
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp13
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/pausehandler.h34
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp11
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp10
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h4
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h10
14 files changed, 93 insertions, 543 deletions
diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp
index 296a9a3cc0f..8cfa403539a 100644
--- a/storage/src/tests/common/testhelper.cpp
+++ b/storage/src/tests/common/testhelper.cpp
@@ -9,23 +9,6 @@ LOG_SETUP(".testhelper");
namespace storage {
-namespace {
- bool useNewStorageCore() {
- if ( // Unit test directory
- vespalib::fileExists("use_new_storage_core") ||
- // src/cpp directory
- vespalib::fileExists("../use_new_storage_core") ||
- // Top build directory where storage-HEAD remains
- vespalib::fileExists("../../../../use_new_storage_core"))
- {
- std::cerr << "Using new storage core for unit tests\n";
- return true;
- }
- return false;
- }
- bool newStorageCore(useNewStorageCore());
-}
-
void addStorageDistributionConfig(vdstestlib::DirConfig& dc)
{
vdstestlib::DirConfig::Config* config;
@@ -128,14 +111,11 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & ro
config->set("threads[0].lowestpri 255");
config->set("dir_spread", "4");
config->set("dir_levels", "0");
- config->set("use_new_core", newStorageCore ? "true" : "false");
config->set("maximum_versions_of_single_document_stored", "0");
//config->set("enable_slotfile_cache", "false");
// Unit tests typically use fake low time values, so don't complain
// about them or compact/delete them by default. Override in tests testing that
// behavior
- config->set("time_future_limit", "5");
- config->set("time_past_limit", "2000000000");
config->set("keep_remove_time_period", "2000000000");
config->set("revert_time_period", "2000000000");
// Don't want test to call exit()
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 318881364d7..26bfeb41b42 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -7,17 +7,10 @@
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/storageserver/statemanager.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
-#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/storage/persistence/persistencethread.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
#include <vespa/document/update/assignvalueupdate.h>
-#include <vespa/document/datatype/datatype.h>
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/datatype/documenttype.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/fieldvalue/rawfieldvalue.h>
-#include <vespa/document/fieldvalue/stringfieldvalue.h>
#include <vespa/document/select/parser.h>
#include <vespa/vdslib/state/random.h>
#include <vespa/storageapi/message/bucketsplitting.h>
@@ -76,8 +69,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
void testFlush();
void testRemapSplit();
void testHandlerPriority();
- void testHandlerPriorityBlocking();
- void testHandlerPriorityPreempt();
void testHandlerMulti();
void testHandlerTimeout();
void testHandlerPause();
@@ -102,7 +93,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
void testMergeBucketImplicitCreateBucket();
void testNewlyCreatedBucketIsReady();
void testCreateBucketSetsActiveFlagInDatabaseAndReply();
- void testFileStorThreadLockingStressTest();
void testStateChange();
void testRepairNotifiesDistributorOnChange();
void testDiskMove();
@@ -113,8 +103,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testFlush);
CPPUNIT_TEST(testRemapSplit);
CPPUNIT_TEST(testHandlerPriority);
- CPPUNIT_TEST(testHandlerPriorityBlocking);
- CPPUNIT_TEST(testHandlerPriorityPreempt);
CPPUNIT_TEST(testHandlerMulti);
CPPUNIT_TEST(testHandlerTimeout);
CPPUNIT_TEST(testHandlerPause);
@@ -146,21 +134,17 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
void createBucket(document::BucketId bid, uint16_t disk)
{
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
- _node->getPersistenceProvider().createBucket(
- makeSpiBucket(bid, spi::PartitionId(disk)), context);
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
+ _node->getPersistenceProvider().createBucket(makeSpiBucket(bid, spi::PartitionId(disk)), context);
StorBucketDatabase::WrappedEntry entry(
- _node->getStorageBucketDatabase().get(bid, "foo",
- StorBucketDatabase::CREATE_IF_NONEXISTING));
+ _node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING));
entry->disk = disk;
entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false);
entry.write();
}
- document::Document::UP createDocument(
- const std::string& content, const std::string& id)
+ document::Document::UP createDocument(const std::string& content, const std::string& id)
{
return _node->getTestDocMan().createDocument(content, id);
}
@@ -628,19 +612,16 @@ FileStorManagerTest::testHandlerPriority()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
- Document::SP doc(createDocument(
- content, "userdoc:footype:1234:bar").release());
+ Document::SP doc(createDocument(content, "userdoc:footype:1234:bar").release());
document::BucketIdFactory factory;
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(doc->getId()).getRawId());
// Populate bucket with the given data
for (uint32_t i = 1; i < 6; i++) {
@@ -739,8 +720,7 @@ FileStorManagerTest::testHandlerPausedMultiThread()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -791,8 +771,7 @@ FileStorManagerTest::testHandlerPause()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -802,15 +781,12 @@ FileStorManagerTest::testHandlerPause()
document::BucketIdFactory factory;
document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
+ doc->getId()).getRawId());
// Populate bucket with the given data
for (uint32_t i = 1; i < 6; i++) {
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100);
+ auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(i * 15);
filestorHandler.schedule(cmd, 0);
@@ -847,8 +823,7 @@ FileStorManagerTest::testRemapSplit()
// Setup a filestorthread to test
DummyStorageLink top;
DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
+ top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
top.open();
ForwardingMessageSender messageSender(*dummyManager);
// Since we fake time with small numbers, we need to make sure we dont
@@ -859,7 +834,7 @@ FileStorManagerTest::testRemapSplit()
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -874,10 +849,8 @@ FileStorManagerTest::testRemapSplit()
// Populate bucket with the given data
for (uint32_t i = 1; i < 4; i++) {
- filestorHandler.schedule(
- api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket1), doc1, i)), 0);
- filestorHandler.schedule(
- api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket2), doc2, i + 10)), 0);
+ filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket1), doc1, i), 0);
+ filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket2), doc2, i + 10), 0);
}
CPPUNIT_ASSERT_EQUAL(std::string("BucketId(0x40000000000004d2): Put(BucketId(0x40000000000004d2), userdoc:footype:1234:bar, timestamp 1, size 108) (priority: 127)\n"
@@ -925,7 +898,7 @@ FileStorManagerTest::testHandlerMulti()
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -935,10 +908,8 @@ FileStorManagerTest::testHandlerMulti()
Document::SP doc2(createDocument(content, "userdoc:footype:4567:bar").release());
document::BucketIdFactory factory;
- document::BucketId bucket1(16, factory.getBucketId(
- doc1->getId()).getRawId());
- document::BucketId bucket2(16, factory.getBucketId(
- doc2->getId()).getRawId());
+ document::BucketId bucket1(16, factory.getBucketId(doc1->getId()).getRawId());
+ document::BucketId bucket2(16, factory.getBucketId(doc2->getId()).getRawId());
// Populate bucket with the given data
for (uint32_t i = 1; i < 10; i++) {
@@ -988,7 +959,7 @@ FileStorManagerTest::testHandlerTimeout()
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -1041,208 +1012,6 @@ FileStorManagerTest::testHandlerTimeout()
}
void
-FileStorManagerTest::testHandlerPriorityBlocking()
-{
- TestName testName("testHandlerPriorityBlocking");
- // Setup a filestorthread to test
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
- // Since we fake time with small numbers, we need to make sure we dont
- // compact them away, as they will seem to be from 1970
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 21, 21);
- filestorHandler.setGetNextMessageTimeout(50);
-
- std::string content("Here is some content which is in all documents");
- std::ostringstream uri;
-
- document::BucketIdFactory factory;
-
- // Populate bucket with the given data
- for (uint32_t i = 1; i < 6; i++) {
- Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d",i)).release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(i * 15);
- filestorHandler.schedule(cmd, 0);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0);
- CPPUNIT_ASSERT_EQUAL(15, (int)lock1.second->getPriority());
-
- LOG(debug, "Waiting for request that should time out");
- FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0);
- LOG(debug, "Got request that should time out");
- CPPUNIT_ASSERT(lock2.second.get() == NULL);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0);
- CPPUNIT_ASSERT_EQUAL(30, (int)lock1.second->getPriority());
-
- // New high-pri message comes in
- Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d", 100)).release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(15);
- filestorHandler.schedule(cmd, 0);
-
- FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0);
- CPPUNIT_ASSERT_EQUAL(15, (int)lock2.second->getPriority());
-
- LOG(debug, "Waiting for request that should time out");
- FileStorHandler::LockedMessage lock3 = filestorHandler.getNextMessage(0);
- LOG(debug, "Got request that should time out");
- CPPUNIT_ASSERT(lock3.second.get() == NULL);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0);
- CPPUNIT_ASSERT_EQUAL(45, (int)lock1.second->getPriority());
-
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
- CPPUNIT_ASSERT_EQUAL(60, (int)lock.second->getPriority());
- }
- LOG(debug, "Test done");
-}
-
-class PausedThread : public document::Runnable {
-private:
- FileStorHandler& _handler;
-
-public:
- bool pause;
- bool done;
- bool gotoperation;
-
- PausedThread(FileStorHandler& handler)
- : _handler(handler), pause(false), done(false), gotoperation(false) {}
-
- void run() override {
- FileStorHandler::LockedMessage msg = _handler.getNextMessage(0);
- gotoperation = true;
-
- while (!done) {
- if (pause) {
- _handler.pause(0, msg.second->getPriority());
- pause = false;
- }
- FastOS_Thread::Sleep(10);
- }
-
- done = false;
- };
-};
-
-void
-FileStorManagerTest::testHandlerPriorityPreempt()
-{
- TestName testName("testHandlerPriorityPreempt");
- // Setup a filestorthread to test
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
- // Since we fake time with small numbers, we need to make sure we dont
- // compact them away, as they will seem to be from 1970
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 21, 21);
- filestorHandler.setGetNextMessageTimeout(50);
-
- std::string content("Here is some content which is in all documents");
- std::ostringstream uri;
-
- document::BucketIdFactory factory;
-
- {
- Document::SP doc(createDocument(content, "doc:foo:1").release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(60);
- filestorHandler.schedule(cmd, 0);
- }
-
- PausedThread thread(filestorHandler);
- FastOS_ThreadPool pool(512 * 1024);
- thread.start(pool);
-
- while (!thread.gotoperation) {
- FastOS_Thread::Sleep(10);
- }
-
- {
- Document::SP doc(createDocument(content, "doc:foo:2").release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(20);
- filestorHandler.schedule(cmd, 0);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0);
- CPPUNIT_ASSERT_EQUAL(20, (int)lock1.second->getPriority());
-
- thread.pause = true;
-
- for (uint32_t i = 0; i < 10; i++) {
- CPPUNIT_ASSERT(thread.pause);
- FastOS_Thread::Sleep(100);
- }
- }
-
- while (thread.pause) {
- FastOS_Thread::Sleep(10);
- }
-
- thread.done = true;
-
- while (thread.done) {
- FastOS_Thread::Sleep(10);
- }
-}
-
-void
FileStorManagerTest::testPriority()
{
TestName testName("testPriority");
@@ -1260,8 +1029,7 @@ FileStorManagerTest::testPriority()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 2);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
filestorHandler, *metrics.disks[0]->threads[0], 0));
@@ -1275,8 +1043,7 @@ FileStorManagerTest::testPriority()
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
- uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)
- << ":mydoc-" << i;
+ uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)<< ":mydoc-" << i;
Document::SP doc(createDocument(content, uri.str()).release());
documents.push_back(doc);
}
@@ -1285,20 +1052,16 @@ FileStorManagerTest::testPriority()
// Create buckets in separate, initial pass to avoid races with puts
for (uint32_t i=0; i<documents.size(); ++i) {
- document::BucketId bucket(16, factory.getBucketId(
- documents[i]->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId());
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
- _node->getPersistenceProvider().createBucket(
- makeSpiBucket(bucket), context);
+ _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context);
}
// Populate bucket with the given data
for (uint32_t i=0; i<documents.size(); ++i) {
- document::BucketId bucket(16, factory.getBucketId(
- documents[i]->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId());
std::shared_ptr<api::PutCommand> cmd(
new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i));
@@ -1353,8 +1116,7 @@ FileStorManagerTest::testSplit1()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
filestorHandler, *metrics.disks[0]->threads[0], 0));
@@ -1522,10 +1284,8 @@ FileStorManagerTest::testSplitSingleGroup()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
for (uint32_t j=0; j<1; ++j) {
// Test this twice, once where all the data ends up in file with
// splitbit set, and once where all the data ends up in file with
@@ -1663,8 +1423,7 @@ FileStorManagerTest::testSplitEmptyTargetWithRemappedOps()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
filestorHandler, *metrics.disks[0]->threads[0], 0));
@@ -1741,8 +1500,7 @@ FileStorManagerTest::testNotifyOnSplitSourceOwnershipChanged()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
filestorHandler, *metrics.disks[0]->threads[0], 0));
@@ -1791,8 +1549,7 @@ FileStorManagerTest::testJoin()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
filestorHandler, *metrics.disks[0]->threads[0], 0));
@@ -1802,10 +1559,8 @@ FileStorManagerTest::testJoin()
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
- uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)
- << ":mydoc-" << i;
- Document::SP doc(createDocument(
- content, uri.str()).release());
+ uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) << ":mydoc-" << i;
+ Document::SP doc(createDocument(content, uri.str()).release());
documents.push_back(doc);
}
document::BucketIdFactory factory;
@@ -1816,36 +1571,26 @@ FileStorManagerTest::testJoin()
{
// Populate bucket with the given data
for (uint32_t i=0; i<documents.size(); ++i) {
- document::BucketId bucket(17, factory.getBucketId(
- documents[i]->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ document::BucketId bucket(17, factory.getBucketId(documents[i]->getId()).getRawId());
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i);
+ auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
filestorHandler.schedule(cmd, 0);
filestorHandler.flush(true);
CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies());
- std::shared_ptr<api::PutReply> reply(
- std::dynamic_pointer_cast<api::PutReply>(
- top.getReply(0)));
+ auto reply = std::dynamic_pointer_cast<api::PutReply>(top.getReply(0));
CPPUNIT_ASSERT(reply.get());
- CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK),
- reply->getResult());
+ CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), reply->getResult());
top.reset();
// Delete every 5th document to have delete entries in file too
if (i % 5 == 0) {
- std::shared_ptr<api::RemoveCommand> rcmd(
- new api::RemoveCommand(
- makeDocumentBucket(bucket), documents[i]->getId(), 1000000 + 100 + i));
+ auto rcmd = std::make_shared<api::RemoveCommand>(makeDocumentBucket(bucket),
+ documents[i]->getId(), 1000000 + 100 + i);
rcmd->setAddress(*address);
filestorHandler.schedule(rcmd, 0);
filestorHandler.flush(true);
CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies());
- std::shared_ptr<api::RemoveReply> rreply(
- std::dynamic_pointer_cast<api::RemoveReply>(
- top.getReply(0)));
+ auto rreply = std::dynamic_pointer_cast<api::RemoveReply>(top.getReply(0));
CPPUNIT_ASSERT_MSG(top.getReply(0)->getType().toString(),
rreply.get());
CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK),
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 8d18f75d3db..f5f190ad718 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -77,9 +77,7 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
metrics.initDiskMetrics(_node->getPartitions().size(),
loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics,
- _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
// Send 2 puts, 2 to the first bucket, 1 to the second. Calling
// getNextMessage 2 times should then return a lock on the first bucket,
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index e0769795bed..d46708b3aaa 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -52,15 +52,13 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const
_node.setupDummyPersistence();
_metrics.initDiskMetrics(
numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1);
- _handler.reset(new FileStorHandler(
- _messageKeeper, _metrics,
- _node.getPersistenceProvider().getPartitionStates().getList(),
- _node.getComponentRegister(), 255, 0));
+ _handler.reset(new FileStorHandler(_messageKeeper, _metrics,
+ _node.getPersistenceProvider().getPartitionStates().getList(),
+ _node.getComponentRegister()));
for (uint32_t i = 0; i < numDisks; i++) {
_diskEnvs.push_back(
- std::unique_ptr<PersistenceUtil>(
- new PersistenceUtil(_config.getConfigId(), _node.getComponentRegister(),
- *_handler, *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider())));
+ std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler,
+ *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider()));
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index b53a2a361b3..949ceb901e1 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -7,12 +7,8 @@ namespace storage {
FileStorHandler::FileStorHandler(MessageSender& sender,
FileStorMetrics& metrics,
const spi::PartitionStateList& partitions,
- ServiceLayerComponentRegister& compReg,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking)
- : _impl(new FileStorHandlerImpl(
- sender, metrics, partitions, compReg,
- maxPriorityToBlock, minPriorityToBeBlocking))
+ ServiceLayerComponentRegister& compReg)
+ : _impl(new FileStorHandlerImpl(sender, metrics, partitions, compReg))
{
}
@@ -57,11 +53,6 @@ FileStorHandler::schedule(const api::StorageMessage::SP& msg, uint16_t thread)
return _impl->schedule(msg, thread);
}
-void
-FileStorHandler::pause(uint16_t disk, uint8_t priority) const {
- return _impl->pause(disk, priority);
-}
-
FileStorHandler::LockedMessage
FileStorHandler::getNextMessage(uint16_t thread)
{
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 8fc528b5ad7..2cfc97ca71b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -15,7 +15,6 @@
#include "mergestatus.h"
#include <vespa/document/bucket/bucket.h>
-#include <ostream>
#include <vespa/storage/storageutil/resumeguard.h>
#include <vespa/storage/common/messagesender.h>
@@ -72,12 +71,7 @@ public:
CLOSED
};
- FileStorHandler(MessageSender&,
- FileStorMetrics&,
- const spi::PartitionStateList&,
- ServiceLayerComponentRegister&,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking);
+ FileStorHandler(MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&);
~FileStorHandler();
// Commands used by file stor manager
@@ -115,17 +109,7 @@ public:
* Schedule a storage message to be processed by the given disk
* @return True if we maanged to schedule operation. False if not
*/
- bool schedule(const std::shared_ptr<api::StorageMessage>&,
- uint16_t disk);
-
- // Commands used by file stor threads
-
- /**
- * When called, checks if any running operations have "preempting"
- * priority. If so, and the given priority is less than that, this call
- * will hang until the other operation is done.
- */
- void pause(uint16_t disk, uint8_t priority) const;
+ bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
/**
* Used by file stor threads to get their next message to process.
@@ -216,8 +200,7 @@ public:
* Fail all operations towards a single bucket currently queued to the
* given thread with the given error code.
*/
- void failOperations(const document::Bucket&, uint16_t fromDisk,
- const api::ReturnCode&);
+ void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&);
/**
* Add a new merge state to the registry.
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 0793d23b862..75f389815df 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -23,20 +23,14 @@ using document::BucketSpace;
namespace storage {
-FileStorHandlerImpl::FileStorHandlerImpl(
- MessageSender& sender,
- FileStorMetrics& metrics,
- const spi::PartitionStateList& partitions,
- ServiceLayerComponentRegister& compReg,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking)
+FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
+ const spi::PartitionStateList& partitions,
+ ServiceLayerComponentRegister& compReg)
: _partitions(partitions),
_component(compReg, "filestorhandlerimpl"),
_diskInfo(_component.getDiskCount()),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
- _maxPriorityToBlock(maxPriorityToBlock),
- _minPriorityToBeBlocking(minPriorityToBeBlocking),
_getNextMessageTimeout(100),
_paused(false)
{
@@ -46,23 +40,21 @@ FileStorHandlerImpl::FileStorHandlerImpl(
}
if (_diskInfo.size() == 0) {
- throw vespalib::IllegalArgumentException(
- "No disks configured", VESPA_STRLOC);
+ throw vespalib::IllegalArgumentException("No disks configured", VESPA_STRLOC);
}
// Add update hook, so we will get callbacks each 5 seconds to update
// metrics.
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
}
-FileStorHandlerImpl::~FileStorHandlerImpl() { }
+FileStorHandlerImpl::~FileStorHandlerImpl() = default;
void
FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status)
{
vespalib::LockGuard mlock(_mergeStatesLock);
if (_mergeStates.find(bucket) != _mergeStates.end()) {;
- LOG(warning, "A merge status already existed for %s. Overwriting it.",
- bucket.toString().c_str());
+ LOG(warning, "A merge status already existed for %s. Overwriting it.", bucket.toString().c_str());
}
_mergeStates[bucket] = status;
}
@@ -73,8 +65,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
vespalib::LockGuard mlock(_mergeStatesLock);
MergeStatus::SP status = _mergeStates[bucket];
if (status.get() == 0) {
- throw vespalib::IllegalStateException(
- "No merge state exist for " + bucket.toString(), VESPA_STRLOC);
+ throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC);
}
return *status;
}
@@ -94,8 +85,7 @@ FileStorHandlerImpl::getNumActiveMerges() const
}
void
-FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket,
- const api::ReturnCode* code)
+FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode* code)
{
vespalib::LockGuard mlock(_mergeStatesLock);
auto it = _mergeStates.find(bucket);
@@ -153,10 +143,9 @@ FileStorHandlerImpl::flush(bool killPendingMerges)
if (killPendingMerges) {
api::ReturnCode code(api::ReturnCode::ABORTED,
"Storage node is shutting down");
- for (std::map<document::Bucket, MergeStatus::SP>::iterator it
- = _mergeStates.begin(); it != _mergeStates.end(); ++it)
+ for (auto & entry : _mergeStates)
{
- MergeStatus& s(*it->second);
+ MergeStatus& s(*entry.second);
if (s.pendingGetDiff.get() != 0) {
s.pendingGetDiff->setResult(code);
_messageSender.sendReply(s.pendingGetDiff);
@@ -182,11 +171,9 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg,
std::shared_ptr<api::StorageReply> rep(
static_cast<api::StorageCommand&>(msg).makeReply().release());
if (state == FileStorHandler::DISABLED) {
- rep->setResult(api::ReturnCode(
- api::ReturnCode::DISK_FAILURE, "Disk disabled"));
+ rep->setResult(api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled"));
} else {
- rep->setResult(api::ReturnCode(
- api::ReturnCode::ABORTED, "Shutting down storage node."));
+ rep->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."));
}
_messageSender.sendReply(rep);
}
@@ -270,29 +257,6 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg,
return true;
}
-void
-FileStorHandlerImpl::pause(uint16_t disk, uint8_t priority) const {
- if (priority < _maxPriorityToBlock) {
- return;
- }
-
- assert(disk < _diskInfo.size());
- const Disk& t(_diskInfo[disk]);
- vespalib::MonitorGuard lockGuard(t.lock);
-
- bool paused = true;
- while (paused) {
- paused = false;
- for (auto& lockedBucket : t.lockedBuckets) {
- if (lockedBucket.second.priority <= _minPriorityToBeBlocking) {
- paused = true;
- lockGuard.wait();
- break;
- }
- }
- }
-}
-
bool
FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const
{
@@ -395,18 +359,6 @@ FileStorHandlerImpl::abortQueuedOperations(
}
}
-bool
-FileStorHandlerImpl::hasBlockingOperations(const Disk& t) const
-{
- for (const auto & lockedBucket : t.lockedBuckets) {
- if (lockedBucket.second.priority <= _minPriorityToBeBlocking) {
- return true;
- }
- }
-
- return false;
-}
-
void
FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
{
@@ -423,10 +375,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessag
{
document::Bucket bucket(lck.first->getBucket());
- LOG(spam,
- "Disk %d retrieving message for buffered bucket %s",
- disk,
- bucket.getBucketId().toString().c_str());
+ LOG(spam, "Disk %d retrieving message for buffered bucket %s", disk, bucket.getBucketId().toString().c_str());
assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
@@ -449,22 +398,12 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessag
api::StorageMessage & m(*range.first->_command);
mbus::Trace& trace = m.getTrace();
- // Priority is too low, not buffering any more.
- if (m.getPriority() >= _maxPriorityToBlock) {
- lck.second.reset();
- return lck;
- }
+ MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread looking for more requests to active bucket.");
- MBUS_TRACE(trace, 9,
- "FileStorHandler: Message identified by disk thread looking for "
- "more requests to active bucket.");
-
- uint64_t waitTime(
- const_cast<metrics::MetricTimer&>(range.first->_timer).stop(
- t.metrics->averageQueueWaitingTime[m.getLoadType()]));
+ uint64_t waitTime(const_cast<metrics::MetricTimer&>(range.first->_timer).stop(
+ t.metrics->averageQueueWaitingTime[m.getLoadType()]));
- LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), "
- "timeout %d",
+ LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), timeout %d",
m.toString().c_str(), waitTime, bucket.getBucketId().toString().c_str(),
static_cast<api::StorageCommand&>(m).getTimeout());
@@ -478,15 +417,11 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessag
lockGuard.unlock();
return lck;
} else {
- std::shared_ptr<api::StorageReply> msgReply(
- static_cast<api::StorageCommand&>(m)
- .makeReply().release());
+ std::shared_ptr<api::StorageReply> msgReply(static_cast<api::StorageCommand&>(m).makeReply().release());
idx.erase(range.first);
lockGuard.broadcast();
lockGuard.unlock();
- msgReply->setResult(api::ReturnCode(
- api::ReturnCode::TIMEOUT,
- "Message waited too long in storage queue"));
+ msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue"));
_messageSender.sendReply(msgReply);
lck.second.reset();
@@ -515,23 +450,13 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const
}
bool
-FileStorHandlerImpl::operationBlockedByHigherPriorityThread(
- const api::StorageMessage& msg,
- const Disk& disk) const
-{
- return ((msg.getPriority() >= _maxPriorityToBlock)
- && hasBlockingOperations(disk));
-}
-
-bool
FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg,
uint64_t waitTime) const
{
if (msg.getType().isReply()) {
return false; // Replies must always be processed and cannot time out.
}
- return (waitTime >= static_cast<const api::StorageCommand&>(
- msg).getTimeout());
+ return (waitTime >= static_cast<const api::StorageCommand&>(msg).getTimeout());
}
std::unique_ptr<FileStorHandler::BucketLockInterface>
@@ -541,8 +466,7 @@ FileStorHandlerImpl::takeDiskBucketLockOwnership(
const document::Bucket &bucket,
const api::StorageMessage& msg)
{
- return std::unique_ptr<FileStorHandler::BucketLockInterface>(
- new BucketLock(guard, disk, bucket, msg.getPriority(), msg.getSummary()));
+ return std::make_unique<BucketLock>(guard, disk, bucket, msg.getPriority(), msg.getSummary());
}
std::unique_ptr<api::StorageReply>
@@ -587,11 +511,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk)
iter++;
}
if (iter != end) {
- api::StorageMessage &m(*iter->_command);
-
- if (! operationBlockedByHigherPriorityThread(m, t)
- && ! isPaused())
- {
+ if (! isPaused()) {
return getMessage(lockGuard, t, idx, iter);
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 0b1ab5661e2..6b6d154e149 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -142,12 +142,8 @@ public:
document::Bucket _bucket;
};
- FileStorHandlerImpl(MessageSender&,
- FileStorMetrics&,
- const spi::PartitionStateList&,
- ServiceLayerComponentRegister&,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking);
+ FileStorHandlerImpl(MessageSender&, FileStorMetrics&,
+ const spi::PartitionStateList&, ServiceLayerComponentRegister&);
~FileStorHandlerImpl();
void setGetNextMessageTimeout(uint32_t timeout) { _getNextMessageTimeout = timeout; }
@@ -158,7 +154,6 @@ public:
void close();
bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
- void pause(uint16_t disk, uint8_t priority) const;
FileStorHandler::LockedMessage getNextMessage(uint16_t disk);
FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter);
@@ -203,8 +198,6 @@ private:
std::map<document::Bucket, MergeStatus::SP> _mergeStates;
- uint8_t _maxPriorityToBlock;
- uint8_t _minPriorityToBeBlocking;
uint32_t _getNextMessageTimeout;
vespalib::Monitor _pauseMonitor;
@@ -236,12 +229,6 @@ private:
bool diskIsClosed(uint16_t disk) const;
/**
- * Return whether an already running high priority operation pre-empts
- * (blocks) the operation in msg from even starting in the current thread.
- */
- bool operationBlockedByHigherPriorityThread(const api::StorageMessage& msg, const Disk& disk) const;
-
- /**
* Return whether msg has timed out based on waitTime and the message's
* specified timeout.
*/
@@ -261,7 +248,6 @@ private:
*/
std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const;
bool messageMayBeAborted(const api::StorageMessage& msg) const;
- bool hasBlockingOperations(const Disk& t) const;
void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
bool diskHasActiveOperationForAbortedBucket(const Disk& disk, const AbortBucketOperationsCommand& cmd) const;
void waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
@@ -286,4 +272,3 @@ private:
};
} // storage
-
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 20c08acc01c..5d4cae19b59 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -116,17 +116,12 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
_disks.resize(_component.getDiskCount());
- _metrics->initDiskMetrics(
- _disks.size(),
- _component.getLoadTypes()->getMetricLoadTypes(),
- (_config->threads.size() > 0) ? (_config->threads.size()) : 6);
-
- _filestorHandler.reset(new FileStorHandler(
- *this, *_metrics, _partitions, _compReg,
- _config->maxPriorityToBlock, _config->minPriorityToBeBlocking));
+ size_t numThreads = (_config->threads.size() == 0) ? 6 : _config->threads.size();
+ _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numThreads);
+
+ _filestorHandler.reset(new FileStorHandler(*this, *_metrics, _partitions, _compReg));
for (uint32_t i=0; i<_component.getDiskCount(); ++i) {
if (_partitions[i].isUp()) {
- size_t numThreads = (_config->threads.size() == 0) ? 6 : _config->threads.size();
LOG(spam, "Setting up disk %u", i);
for (uint32_t j = 0; j < numThreads; j++) {
_disks[i].push_back(DiskThread::SP(
diff --git a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h b/storage/src/vespa/storage/persistence/filestorage/pausehandler.h
deleted file mode 100644
index 59c543b0067..00000000000
--- a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * @class PauseHandler
- * @ingroup persistence
- *
- * @brief Object that can be used to possibly pause running operation
- */
-#pragma once
-
-#include <vespa/storage/persistence/filestorage/filestorhandler.h>
-
-namespace storage {
-
-class PauseHandler {
- FileStorHandler* _handler;
- uint16_t _disk;
- uint8_t _priority;
-
-public:
- PauseHandler() : _handler(0), _disk(0), _priority(0) {}
- PauseHandler(FileStorHandler& handler, uint16_t disk)
- : _handler(&handler),
- _disk(disk),
- _priority(0)
- {
- }
-
- void setPriority(uint8_t priority) { _priority = priority; }
-
- void pause() const { if (_handler != 0) _handler->pause(_disk, _priority); }
-};
-
-} // storage
-
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 13d9d0ffbb6..fded9c87978 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -947,28 +947,23 @@ PersistenceThread::processMessage(api::StorageMessage& msg)
++_env._metrics.operations;
if (msg.getType().isReply()) {
try{
- _env._pauseHandler.setPriority(msg.getPriority());
LOG(debug, "Handling reply: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
handleReply(static_cast<api::StorageReply&>(msg));
} catch (std::exception& e) {
// It's a reply, so nothing we can do.
- LOG(debug, "Caught exception for %s: %s",
- msg.toString().c_str(),
- e.what());
+ LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
}
} else {
api::StorageCommand& initiatingCommand =
static_cast<api::StorageCommand&>(msg);
try {
- int64_t startTime(
- _component->getClock().getTimeInMillis().getTime());
+ int64_t startTime(_component->getClock().getTimeInMillis().getTime());
LOG(debug, "Handling command: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
- std::unique_ptr<MessageTracker> tracker(
- handleCommand(initiatingCommand));
+ auto tracker(handleCommand(initiatingCommand));
if (!tracker.get()) {
LOG(debug, "Received unsupported command %s",
msg.getType().getName().c_str());
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index ec666925148..c2dcb8e2a29 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -76,7 +76,6 @@ PersistenceUtil::PersistenceUtil(
_metrics(metrics),
_bucketFactory(_component.getBucketIdFactory()),
_repo(_component.getTypeRepo()),
- _pauseHandler(),
_spi(provider)
{
}
@@ -87,9 +86,8 @@ void
PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i)
{
// Update bucket database
- StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(
- bucket.getBucketId(),
- "env::updatebucketdb"));
+ StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(),
+ "env::updatebucketdb"));
if (entry.exist()) {
api::BucketInfo info = i;
@@ -101,9 +99,7 @@ PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api:
entry->setBucketInfo(info);
entry.write();
} else {
- LOG(debug,
- "Bucket(%s).getBucketInfo: Bucket does not exist.",
- bucket.getBucketId().toString().c_str());
+ LOG(debug, "Bucket(%s).getBucketInfo: Bucket does not exist.", bucket.getBucketId().toString().c_str());
}
}
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index c3bd706f4b7..bf347ccb10a 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -1,12 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "types.h"
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/storage/common/servicelayercomponent.h>
#include <vespa/storage/persistence/filestorage/filestorhandler.h>
#include <vespa/storage/persistence/filestorage/filestormetrics.h>
-#include <vespa/storage/persistence/filestorage/pausehandler.h>
-#include <vespa/storage/persistence/types.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/storage/storageutil/utils.h>
#include <vespa/config-stor-filestor.h>
@@ -70,7 +69,6 @@ struct PersistenceUtil {
FileStorThreadMetrics& _metrics;
const document::BucketIdFactory& _bucketFactory;
const std::shared_ptr<document::DocumentTypeRepo> _repo;
- PauseHandler _pauseHandler;
spi::PersistenceProvider& _spi;
PersistenceUtil(
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 38a48ac2eed..c8739442c13 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -115,20 +115,20 @@ private:
bool _attemptedStopped;
vespalib::string _pidFile;
- // First components that doesn't depend on others
+ // First components that doesn't depend on others
std::unique_ptr<StatusWebServer> _statusWebServer;
std::shared_ptr<StorageMetricSet> _metrics;
std::unique_ptr<metrics::MetricManager> _metricManager;
- // Depends on bucket databases and stop() functionality
+ // Depends on bucket databases and stop() functionality
std::unique_ptr<DeadLockDetector> _deadLockDetector;
- // Depends on metric manager
+ // Depends on metric manager
std::unique_ptr<StatusMetricConsumer> _statusMetrics;
- // Depends on metric manager
+ // Depends on metric manager
std::unique_ptr<StateReporter> _stateReporter;
std::unique_ptr<StateManager> _stateManager;
- // The storage chain can depend on anything.
+ // The storage chain can depend on anything.
std::unique_ptr<StorageLink> _chain;
/** Implementation of config callbacks. */