aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-03-20 13:22:39 +0100
committerGitHub <noreply@github.com>2018-03-20 13:22:39 +0100
commit76cd17602b493a13d1e993b1f72cfdaf7b627e0b (patch)
tree2d3bd1ee963de6107b80b183a23d03708764fa1a /storage
parente31b9616921f9046a1fabbe9fca9cf3a5d535279 (diff)
parentb9c175537bc743a0bf457d579d03b59867d1a1f1 (diff)
Merge pull request #5378 from vespa-engine/balder/no-more-thread-priorities
Balder/no more thread priorities
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/common/testhelper.cpp23
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp413
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp99
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp18
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp32
-rw-r--r--storage/src/vespa/storage/common/storagelink.cpp159
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp44
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h30
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp156
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h24
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp188
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/pausehandler.h34
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp215
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp19
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h6
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h10
17 files changed, 352 insertions, 1120 deletions
diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp
index 296a9a3cc0f..f7da854be68 100644
--- a/storage/src/tests/common/testhelper.cpp
+++ b/storage/src/tests/common/testhelper.cpp
@@ -9,23 +9,6 @@ LOG_SETUP(".testhelper");
namespace storage {
-namespace {
- bool useNewStorageCore() {
- if ( // Unit test directory
- vespalib::fileExists("use_new_storage_core") ||
- // src/cpp directory
- vespalib::fileExists("../use_new_storage_core") ||
- // Top build directory where storage-HEAD remains
- vespalib::fileExists("../../../../use_new_storage_core"))
- {
- std::cerr << "Using new storage core for unit tests\n";
- return true;
- }
- return false;
- }
- bool newStorageCore(useNewStorageCore());
-}
-
void addStorageDistributionConfig(vdstestlib::DirConfig& dc)
{
vdstestlib::DirConfig::Config* config;
@@ -124,18 +107,14 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & ro
config->set("minimum_file_meta_slots", "2");
config->set("minimum_file_header_block_size", "368");
config->set("minimum_file_size", "4096");
- config->set("threads[1]");
- config->set("threads[0].lowestpri 255");
+ config->set("num_threads", "1");
config->set("dir_spread", "4");
config->set("dir_levels", "0");
- config->set("use_new_core", newStorageCore ? "true" : "false");
config->set("maximum_versions_of_single_document_stored", "0");
//config->set("enable_slotfile_cache", "false");
// Unit tests typically use fake low time values, so don't complain
// about them or compact/delete them by default. Override in tests testing that
// behavior
- config->set("time_future_limit", "5");
- config->set("time_past_limit", "2000000000");
config->set("keep_remove_time_period", "2000000000");
config->set("revert_time_period", "2000000000");
// Don't want test to call exit()
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 8d03c9de54c..26bfeb41b42 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -7,17 +7,10 @@
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/storageserver/statemanager.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
-#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/storage/persistence/persistencethread.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
#include <vespa/document/update/assignvalueupdate.h>
-#include <vespa/document/datatype/datatype.h>
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/datatype/documenttype.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/fieldvalue/rawfieldvalue.h>
-#include <vespa/document/fieldvalue/stringfieldvalue.h>
#include <vespa/document/select/parser.h>
#include <vespa/vdslib/state/random.h>
#include <vespa/storageapi/message/bucketsplitting.h>
@@ -76,8 +69,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
void testFlush();
void testRemapSplit();
void testHandlerPriority();
- void testHandlerPriorityBlocking();
- void testHandlerPriorityPreempt();
void testHandlerMulti();
void testHandlerTimeout();
void testHandlerPause();
@@ -102,7 +93,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
void testMergeBucketImplicitCreateBucket();
void testNewlyCreatedBucketIsReady();
void testCreateBucketSetsActiveFlagInDatabaseAndReply();
- void testFileStorThreadLockingStressTest();
void testStateChange();
void testRepairNotifiesDistributorOnChange();
void testDiskMove();
@@ -113,8 +103,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testFlush);
CPPUNIT_TEST(testRemapSplit);
CPPUNIT_TEST(testHandlerPriority);
- CPPUNIT_TEST(testHandlerPriorityBlocking);
- CPPUNIT_TEST(testHandlerPriorityPreempt);
CPPUNIT_TEST(testHandlerMulti);
CPPUNIT_TEST(testHandlerTimeout);
CPPUNIT_TEST(testHandlerPause);
@@ -146,21 +134,17 @@ struct FileStorManagerTest : public CppUnit::TestFixture {
void createBucket(document::BucketId bid, uint16_t disk)
{
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
- _node->getPersistenceProvider().createBucket(
- makeSpiBucket(bid, spi::PartitionId(disk)), context);
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
+ _node->getPersistenceProvider().createBucket(makeSpiBucket(bid, spi::PartitionId(disk)), context);
StorBucketDatabase::WrappedEntry entry(
- _node->getStorageBucketDatabase().get(bid, "foo",
- StorBucketDatabase::CREATE_IF_NONEXISTING));
+ _node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING));
entry->disk = disk;
entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false);
entry.write();
}
- document::Document::UP createDocument(
- const std::string& content, const std::string& id)
+ document::Document::UP createDocument(const std::string& content, const std::string& id)
{
return _node->getTestDocMan().createDocument(content, id);
}
@@ -265,15 +249,14 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
- uint16_t deviceIndex,
- uint8_t lowestPriority)
+ uint16_t deviceIndex)
{
(void) config;
std::unique_ptr<DiskThread> disk;
disk.reset(new PersistenceThread(
node.getComponentRegister(), config.getConfigId(), provider,
filestorHandler, metrics,
- deviceIndex, lowestPriority));
+ deviceIndex));
return disk;
}
@@ -629,39 +612,31 @@ FileStorManagerTest::testHandlerPriority()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
- Document::SP doc(createDocument(
- content, "userdoc:footype:1234:bar").release());
+ Document::SP doc(createDocument(content, "userdoc:footype:1234:bar").release());
document::BucketIdFactory factory;
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(doc->getId()).getRawId());
// Populate bucket with the given data
for (uint32_t i = 1; i < 6; i++) {
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100);
+ auto address = std::make_shared<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(i * 15);
filestorHandler.schedule(cmd, 0);
}
- CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 20).second->getPriority());
- CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 20).second.get() == NULL);
- CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 50).second->getPriority());
- CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0, 50).second->getPriority());
- CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 50).second.get() == NULL);
- CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
- CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0).second->getPriority());
}
class MessagePusherThread : public document::Runnable
@@ -678,11 +653,9 @@ public:
void run() override {
while (!_done) {
document::BucketIdFactory factory;
- document::BucketId bucket(16, factory.getBucketId(
- _doc->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), _doc, 100));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), _doc, 100);
_handler.schedule(cmd, 0);
FastOS_Thread::Sleep(1);
}
@@ -694,7 +667,7 @@ public:
MessagePusherThread::MessagePusherThread(FileStorHandler& handler, Document::SP doc)
: _handler(handler), _doc(doc), _done(false), _threadDone(false)
{}
-MessagePusherThread::~MessagePusherThread() {}
+MessagePusherThread::~MessagePusherThread() = default;
class MessageFetchingThread : public document::Runnable {
public:
@@ -711,7 +684,7 @@ public:
void run() override {
while (!_done) {
- FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage msg = _handler.getNextMessage(0);
if (msg.second.get()) {
uint32_t originalConfig = _config.load();
_fetchedCount++;
@@ -747,8 +720,7 @@ FileStorManagerTest::testHandlerPausedMultiThread()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -799,8 +771,7 @@ FileStorManagerTest::testHandlerPause()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -810,29 +781,26 @@ FileStorManagerTest::testHandlerPause()
document::BucketIdFactory factory;
document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
+ doc->getId()).getRawId());
// Populate bucket with the given data
for (uint32_t i = 1; i < 6; i++) {
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100);
+ auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(i * 15);
filestorHandler.schedule(cmd, 0);
}
- CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority());
{
ResumeGuard guard = filestorHandler.pause();
(void)guard;
- CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 255).second.get() == NULL);
+ CPPUNIT_ASSERT(filestorHandler.getNextMessage(0).second.get() == NULL);
}
- CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 255).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority());
}
namespace {
@@ -855,8 +823,7 @@ FileStorManagerTest::testRemapSplit()
// Setup a filestorthread to test
DummyStorageLink top;
DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
+ top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
top.open();
ForwardingMessageSender messageSender(*dummyManager);
// Since we fake time with small numbers, we need to make sure we dont
@@ -867,7 +834,7 @@ FileStorManagerTest::testRemapSplit()
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -882,10 +849,8 @@ FileStorManagerTest::testRemapSplit()
// Populate bucket with the given data
for (uint32_t i = 1; i < 4; i++) {
- filestorHandler.schedule(
- api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket1), doc1, i)), 0);
- filestorHandler.schedule(
- api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket2), doc2, i + 10)), 0);
+ filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket1), doc1, i), 0);
+ filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket2), doc2, i + 10), 0);
}
CPPUNIT_ASSERT_EQUAL(std::string("BucketId(0x40000000000004d2): Put(BucketId(0x40000000000004d2), userdoc:footype:1234:bar, timestamp 1, size 108) (priority: 127)\n"
@@ -933,7 +898,7 @@ FileStorManagerTest::testHandlerMulti()
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -943,10 +908,8 @@ FileStorManagerTest::testHandlerMulti()
Document::SP doc2(createDocument(content, "userdoc:footype:4567:bar").release());
document::BucketIdFactory factory;
- document::BucketId bucket1(16, factory.getBucketId(
- doc1->getId()).getRawId());
- document::BucketId bucket2(16, factory.getBucketId(
- doc2->getId()).getRawId());
+ document::BucketId bucket1(16, factory.getBucketId(doc1->getId()).getRawId());
+ document::BucketId bucket2(16, factory.getBucketId(doc2->getId()).getRawId());
// Populate bucket with the given data
for (uint32_t i = 1; i < 10; i++) {
@@ -957,21 +920,21 @@ FileStorManagerTest::testHandlerMulti()
}
{
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL((uint64_t)1, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock, 255);
+ lock = filestorHandler.getNextMessage(0, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)2, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock, 255);
+ lock = filestorHandler.getNextMessage(0, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)3, getPutTime(lock.second));
}
{
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255);
+ FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT_EQUAL((uint64_t)11, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock, 255);
+ lock = filestorHandler.getNextMessage(0, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)12, getPutTime(lock.second));
}
}
@@ -984,8 +947,7 @@ FileStorManagerTest::testHandlerTimeout()
// Setup a filestorthread to test
DummyStorageLink top;
DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
+ top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
top.open();
ForwardingMessageSender messageSender(*dummyManager);
@@ -997,7 +959,7 @@ FileStorManagerTest::testHandlerTimeout()
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
std::string content("Here is some content which is in all documents");
@@ -1036,7 +998,7 @@ FileStorManagerTest::testHandlerTimeout()
FastOS_Thread::Sleep(51);
for (;;) {
- auto lock = filestorHandler.getNextMessage(0, 255);
+ auto lock = filestorHandler.getNextMessage(0);
if (lock.first.get()) {
CPPUNIT_ASSERT_EQUAL(uint8_t(200), lock.second->getPriority());
break;
@@ -1050,208 +1012,6 @@ FileStorManagerTest::testHandlerTimeout()
}
void
-FileStorManagerTest::testHandlerPriorityBlocking()
-{
- TestName testName("testHandlerPriorityBlocking");
- // Setup a filestorthread to test
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
- // Since we fake time with small numbers, we need to make sure we dont
- // compact them away, as they will seem to be from 1970
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 21, 21);
- filestorHandler.setGetNextMessageTimeout(50);
-
- std::string content("Here is some content which is in all documents");
- std::ostringstream uri;
-
- document::BucketIdFactory factory;
-
- // Populate bucket with the given data
- for (uint32_t i = 1; i < 6; i++) {
- Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d",i)).release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(i * 15);
- filestorHandler.schedule(cmd, 0);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20);
- CPPUNIT_ASSERT_EQUAL(15, (int)lock1.second->getPriority());
-
- LOG(debug, "Waiting for request that should time out");
- FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 30);
- LOG(debug, "Got request that should time out");
- CPPUNIT_ASSERT(lock2.second.get() == NULL);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 40);
- CPPUNIT_ASSERT_EQUAL(30, (int)lock1.second->getPriority());
-
- // New high-pri message comes in
- Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d", 100)).release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(15);
- filestorHandler.schedule(cmd, 0);
-
- FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 20);
- CPPUNIT_ASSERT_EQUAL(15, (int)lock2.second->getPriority());
-
- LOG(debug, "Waiting for request that should time out");
- FileStorHandler::LockedMessage lock3 = filestorHandler.getNextMessage(0, 255);
- LOG(debug, "Got request that should time out");
- CPPUNIT_ASSERT(lock3.second.get() == NULL);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 255);
- CPPUNIT_ASSERT_EQUAL(45, (int)lock1.second->getPriority());
-
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255);
- CPPUNIT_ASSERT_EQUAL(60, (int)lock.second->getPriority());
- }
- LOG(debug, "Test done");
-}
-
-class PausedThread : public document::Runnable {
-private:
- FileStorHandler& _handler;
-
-public:
- bool pause;
- bool done;
- bool gotoperation;
-
- PausedThread(FileStorHandler& handler)
- : _handler(handler), pause(false), done(false), gotoperation(false) {}
-
- void run() override {
- FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255);
- gotoperation = true;
-
- while (!done) {
- if (pause) {
- _handler.pause(0, msg.second->getPriority());
- pause = false;
- }
- FastOS_Thread::Sleep(10);
- }
-
- done = false;
- };
-};
-
-void
-FileStorManagerTest::testHandlerPriorityPreempt()
-{
- TestName testName("testHandlerPriorityPreempt");
- // Setup a filestorthread to test
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
- // Since we fake time with small numbers, we need to make sure we dont
- // compact them away, as they will seem to be from 1970
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 21, 21);
- filestorHandler.setGetNextMessageTimeout(50);
-
- std::string content("Here is some content which is in all documents");
- std::ostringstream uri;
-
- document::BucketIdFactory factory;
-
- {
- Document::SP doc(createDocument(content, "doc:foo:1").release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(60);
- filestorHandler.schedule(cmd, 0);
- }
-
- PausedThread thread(filestorHandler);
- FastOS_ThreadPool pool(512 * 1024);
- thread.start(pool);
-
- while (!thread.gotoperation) {
- FastOS_Thread::Sleep(10);
- }
-
- {
- Document::SP doc(createDocument(content, "doc:foo:2").release());
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
- cmd->setAddress(*address);
- cmd->setPriority(20);
- filestorHandler.schedule(cmd, 0);
- }
-
- {
- FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20);
- CPPUNIT_ASSERT_EQUAL(20, (int)lock1.second->getPriority());
-
- thread.pause = true;
-
- for (uint32_t i = 0; i < 10; i++) {
- CPPUNIT_ASSERT(thread.pause);
- FastOS_Thread::Sleep(100);
- }
- }
-
- while (thread.pause) {
- FastOS_Thread::Sleep(10);
- }
-
- thread.done = true;
-
- while (thread.done) {
- FastOS_Thread::Sleep(10);
- }
-}
-
-void
FileStorManagerTest::testPriority()
{
TestName testName("testPriority");
@@ -1269,14 +1029,13 @@ FileStorManagerTest::testPriority()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 2);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 25));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
std::unique_ptr<DiskThread> thread2(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[1], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[1], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
@@ -1284,8 +1043,7 @@ FileStorManagerTest::testPriority()
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
- uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)
- << ":mydoc-" << i;
+ uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)<< ":mydoc-" << i;
Document::SP doc(createDocument(content, uri.str()).release());
documents.push_back(doc);
}
@@ -1294,20 +1052,16 @@ FileStorManagerTest::testPriority()
// Create buckets in separate, initial pass to avoid races with puts
for (uint32_t i=0; i<documents.size(); ++i) {
- document::BucketId bucket(16, factory.getBucketId(
- documents[i]->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId());
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
- _node->getPersistenceProvider().createBucket(
- makeSpiBucket(bucket), context);
+ _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context);
}
// Populate bucket with the given data
for (uint32_t i=0; i<documents.size(); ++i) {
- document::BucketId bucket(16, factory.getBucketId(
- documents[i]->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId());
std::shared_ptr<api::PutCommand> cmd(
new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i));
@@ -1342,9 +1096,8 @@ FileStorManagerTest::testPriority()
CPPUNIT_ASSERT_EQUAL(uint64_t(documents.size()),
metrics.disks[0]->threads[0]->operations.getValue()
+ metrics.disks[0]->threads[1]->operations.getValue());
- CPPUNIT_ASSERT(metrics.disks[0]->threads[0]->operations.getValue() <= 13);
- // Closing file stor handler before threads are deleted, such that
- // file stor threads getNextMessage calls returns.
+ // Closing file stor handler before threads are deleted, such that
+ // file stor threads getNextMessage calls returns.
filestorHandler.close();
}
@@ -1363,11 +1116,10 @@ FileStorManagerTest::testSplit1()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -1532,10 +1284,8 @@ FileStorManagerTest::testSplitSingleGroup()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
- spi::Context context(defaultLoadType, spi::Priority(0),
- spi::Trace::TraceLevel(0));
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
+ spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
for (uint32_t j=0; j<1; ++j) {
// Test this twice, once where all the data ends up in file with
// splitbit set, and once where all the data ends up in file with
@@ -1544,7 +1294,7 @@ FileStorManagerTest::testSplitSingleGroup()
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -1673,11 +1423,10 @@ FileStorManagerTest::testSplitEmptyTargetWithRemappedOps()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
document::BucketId source(16, 0x10001);
@@ -1751,11 +1500,10 @@ FileStorManagerTest::testNotifyOnSplitSourceOwnershipChanged()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
document::BucketId source(getFirstBucketNotOwnedByDistributor(0));
createBucket(source, 0);
@@ -1801,21 +1549,18 @@ FileStorManagerTest::testJoin()
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0], 0, 255));
+ filestorHandler, *metrics.disks[0]->threads[0], 0));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
- uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)
- << ":mydoc-" << i;
- Document::SP doc(createDocument(
- content, uri.str()).release());
+ uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) << ":mydoc-" << i;
+ Document::SP doc(createDocument(content, uri.str()).release());
documents.push_back(doc);
}
document::BucketIdFactory factory;
@@ -1826,36 +1571,26 @@ FileStorManagerTest::testJoin()
{
// Populate bucket with the given data
for (uint32_t i=0; i<documents.size(); ++i) {
- document::BucketId bucket(17, factory.getBucketId(
- documents[i]->getId()).getRawId());
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ document::BucketId bucket(17, factory.getBucketId(documents[i]->getId()).getRawId());
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i);
+ auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
filestorHandler.schedule(cmd, 0);
filestorHandler.flush(true);
CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies());
- std::shared_ptr<api::PutReply> reply(
- std::dynamic_pointer_cast<api::PutReply>(
- top.getReply(0)));
+ auto reply = std::dynamic_pointer_cast<api::PutReply>(top.getReply(0));
CPPUNIT_ASSERT(reply.get());
- CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK),
- reply->getResult());
+ CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), reply->getResult());
top.reset();
// Delete every 5th document to have delete entries in file too
if (i % 5 == 0) {
- std::shared_ptr<api::RemoveCommand> rcmd(
- new api::RemoveCommand(
- makeDocumentBucket(bucket), documents[i]->getId(), 1000000 + 100 + i));
+ auto rcmd = std::make_shared<api::RemoveCommand>(makeDocumentBucket(bucket),
+ documents[i]->getId(), 1000000 + 100 + i);
rcmd->setAddress(*address);
filestorHandler.schedule(rcmd, 0);
filestorHandler.flush(true);
CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies());
- std::shared_ptr<api::RemoveReply> rreply(
- std::dynamic_pointer_cast<api::RemoveReply>(
- top.getReply(0)));
+ auto rreply = std::dynamic_pointer_cast<api::RemoveReply>(top.getReply(0));
CPPUNIT_ASSERT_MSG(top.getReply(0)->getType().toString(),
rreply.get());
CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK),
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 95642ac6215..64ef48b5719 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -43,10 +43,8 @@ public:
_deleteBucketInvocations(0)
{}
- spi::Result put(const spi::Bucket& bucket,
- spi::Timestamp timestamp,
- const document::Document::SP& doc,
- spi::Context& context) override
+ spi::Result put(const spi::Bucket& bucket, spi::Timestamp timestamp,
+ const document::Document::SP& doc, spi::Context& context) override
{
(void) bucket;
(void) timestamp;
@@ -90,26 +88,20 @@ public:
void setupDisks(uint32_t diskCount, uint32_t queueBarrierThreads) {
FileStorTestFixture::setupDisks(diskCount);
- _dummyProvider.reset(new spi::dummy::DummyPersistence(
- _node->getTypeRepo(), diskCount));
+ _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), diskCount));
_queueBarrier.reset(new vespalib::Barrier(queueBarrierThreads));
_completionBarrier.reset(new vespalib::Barrier(2));
- _blockingProvider = new BlockingMockProvider(*_dummyProvider,
- *_queueBarrier, *_completionBarrier);
-
- _node->setPersistenceProvider(
- spi::PersistenceProvider::UP(_blockingProvider));
+ _blockingProvider = new BlockingMockProvider(*_dummyProvider, *_queueBarrier, *_completionBarrier);
+ _node->setPersistenceProvider(spi::PersistenceProvider::UP(_blockingProvider));
}
- void validateReplies(DummyStorageLink& link,
- size_t repliesTotal,
+ void validateReplies(DummyStorageLink& link, size_t repliesTotal,
const std::vector<document::BucketId>& okReplies,
const std::vector<document::BucketId>& abortedGetDiffs);
- void doTestSpecificOperationsNotAborted(
- const char* testName,
- const std::vector<api::StorageMessage::SP>& msgs,
- bool shouldCreateBucketInitially);
+ void doTestSpecificOperationsNotAborted(const char* testName,
+ const std::vector<api::StorageMessage::SP>& msgs,
+ bool shouldCreateBucketInitially);
api::BucketInfo getBucketInfoFromDB(const document::BucketId&) const;
@@ -138,8 +130,7 @@ namespace {
template <typename T, typename Collection>
bool
existsIn(const T& elem, const Collection& collection) {
- return (std::find(collection.begin(), collection.end(), elem)
- != collection.end());
+ return (std::find(collection.begin(), collection.end(), elem) != collection.end());
}
}
@@ -150,18 +141,15 @@ OperationAbortingTest::setUp()
}
void
-OperationAbortingTest::validateReplies(
- DummyStorageLink& link,
- size_t repliesTotal,
- const std::vector<document::BucketId>& okReplies,
- const std::vector<document::BucketId>& abortedGetDiffs)
+OperationAbortingTest::validateReplies(DummyStorageLink& link, size_t repliesTotal,
+ const std::vector<document::BucketId>& okReplies,
+ const std::vector<document::BucketId>& abortedGetDiffs)
{
link.waitForMessages(repliesTotal, MSG_WAIT_TIME);
CPPUNIT_ASSERT_EQUAL(repliesTotal, link.getNumReplies());
for (uint32_t i = 0; i < repliesTotal; ++i) {
- api::StorageReply& reply(
- dynamic_cast<api::StorageReply&>(*link.getReply(i)));
+ api::StorageReply& reply(dynamic_cast<api::StorageReply&>(*link.getReply(i)));
LOG(info, "Checking reply %s", reply.toString(true).c_str());
switch (static_cast<uint32_t>(reply.getType().getId())) {
case api::MessageType::PUT_REPLY_ID:
@@ -222,11 +210,8 @@ template <typename Container>
AbortBucketOperationsCommand::SP
makeAbortCmd(const Container& buckets)
{
- std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate> pred(
- new ExplicitBucketSetPredicate(buckets.begin(), buckets.end()));
- AbortBucketOperationsCommand::SP cmd(
- new AbortBucketOperationsCommand(std::move(pred)));
- return cmd;
+ auto pred = std::make_unique<ExplicitBucketSetPredicate>(buckets.begin(), buckets.end());
+ return std::make_shared<AbortBucketOperationsCommand>(std::move(pred));
}
}
@@ -320,8 +305,7 @@ public:
void
OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket()
{
- uint32_t queueBarrierThreads = 3;
- setupDisks(1, queueBarrierThreads);
+ setupDisks(1, 3);
TestFileStorComponents c(*this, "testWaitForCurrentOperationCompletionForAbortedBucket");
document::BucketId bucket(16, 1);
@@ -350,10 +334,8 @@ OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket()
// reply, as it must finish processing fully before the abort returns.
c.top.waitForMessages(2, MSG_WAIT_TIME);
CPPUNIT_ASSERT_EQUAL(size_t(2), c.top.getNumReplies());
- CPPUNIT_ASSERT_EQUAL(api::MessageType::PUT_REPLY,
- c.top.getReply(0)->getType());
- CPPUNIT_ASSERT_EQUAL(api::MessageType::INTERNAL_REPLY,
- c.top.getReply(1)->getType());
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::PUT_REPLY, c.top.getReply(0)->getType());
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::INTERNAL_REPLY, c.top.getReply(1)->getType());
}
void
@@ -364,10 +346,7 @@ OperationAbortingTest::testDoNotAbortCreateBucketCommands()
msgs.push_back(api::StorageMessage::SP(new api::CreateBucketCommand(makeDocumentBucket(bucket))));
bool shouldCreateBucketInitially(false);
- doTestSpecificOperationsNotAborted(
- "testDoNotAbortCreateBucketCommands",
- msgs,
- shouldCreateBucketInitially);
+ doTestSpecificOperationsNotAborted("testDoNotAbortCreateBucketCommands", msgs, shouldCreateBucketInitially);
}
void
@@ -378,18 +357,14 @@ OperationAbortingTest::testDoNotAbortRecheckBucketCommands()
msgs.push_back(api::StorageMessage::SP(new RecheckBucketInfoCommand(makeDocumentBucket(bucket))));
bool shouldCreateBucketInitially(true);
- doTestSpecificOperationsNotAborted(
- "testDoNotAbortRecheckBucketCommands",
- msgs,
- shouldCreateBucketInitially);
+ doTestSpecificOperationsNotAborted("testDoNotAbortRecheckBucketCommands", msgs, shouldCreateBucketInitially);
}
api::BucketInfo
OperationAbortingTest::getBucketInfoFromDB(const document::BucketId& id) const
{
StorBucketDatabase::WrappedEntry entry(
- _node->getStorageBucketDatabase().get(id, "foo",
- StorBucketDatabase::CREATE_IF_NONEXISTING));
+ _node->getStorageBucketDatabase().get(id, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING));
CPPUNIT_ASSERT(entry.exist());
return entry->info;
}
@@ -403,20 +378,15 @@ OperationAbortingTest::testDoNotAbortDeleteBucketCommands()
msgs.push_back(cmd);
bool shouldCreateBucketInitially(true);
- doTestSpecificOperationsNotAborted(
- "testDoNotAbortRecheckBucketCommands",
- msgs,
- shouldCreateBucketInitially);
+ doTestSpecificOperationsNotAborted("testDoNotAbortRecheckBucketCommands", msgs, shouldCreateBucketInitially);
}
void
-OperationAbortingTest::doTestSpecificOperationsNotAborted(
- const char* testName,
- const std::vector<api::StorageMessage::SP>& msgs,
- bool shouldCreateBucketInitially)
+OperationAbortingTest::doTestSpecificOperationsNotAborted(const char* testName,
+ const std::vector<api::StorageMessage::SP>& msgs,
+ bool shouldCreateBucketInitially)
{
- uint32_t queueBarrierThreads = 2;
- setupDisks(1, queueBarrierThreads);
+ setupDisks(1, 2);
TestFileStorComponents c(*this, testName);
document::BucketId bucket(16, 1);
document::BucketId blockerBucket(16, 2);
@@ -443,8 +413,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(
break;
case api::MessageType::DELETEBUCKET_ID:
{
- api::DeleteBucketCommand& delCmd(
- dynamic_cast<api::DeleteBucketCommand&>(*msgs[i]));
+ api::DeleteBucketCommand& delCmd(dynamic_cast<api::DeleteBucketCommand&>(*msgs[i]));
delCmd.setBucketInfo(getBucketInfoFromDB(delCmd.getBucketId()));
}
++expectedDeleteBuckets;
@@ -473,8 +442,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(
c.sendDummyGet(blockerBucket);
// put+abort+get + any other creates/deletes/rechecks
- size_t expectedMsgs(3 + expectedCreateBuckets + expectedDeleteBuckets
- + expectedRecheckReplies);
+ size_t expectedMsgs(3 + expectedCreateBuckets + expectedDeleteBuckets + expectedRecheckReplies);
LOG(info, "barrier passed, waiting for %zu replies", expectedMsgs);
std::vector<document::BucketId> okReplies;
@@ -483,13 +451,10 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(
std::vector<document::BucketId> abortedGetDiffs;
validateReplies(c.top, expectedMsgs, okReplies, abortedGetDiffs);
- CPPUNIT_ASSERT_EQUAL(expectedBucketInfoInvocations,
- _blockingProvider->_bucketInfoInvocations);
+ CPPUNIT_ASSERT_EQUAL(expectedBucketInfoInvocations, _blockingProvider->_bucketInfoInvocations);
CPPUNIT_ASSERT_EQUAL(expectedCreateBuckets + (shouldCreateBucketInitially ? 2 : 1),
_blockingProvider->_createBucketInvocations);
- CPPUNIT_ASSERT_EQUAL(expectedDeleteBuckets,
- _blockingProvider->_deleteBucketInvocations);
+ CPPUNIT_ASSERT_EQUAL(expectedDeleteBuckets, _blockingProvider->_deleteBucketInvocations);
}
-
-
+
} // storage
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 77e0ed6b71c..f5f190ad718 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -77,9 +77,7 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
metrics.initDiskMetrics(_node->getPartitions().size(),
loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics,
- _node->getPartitions(),
- _node->getComponentRegister(), 255, 0);
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
// Send 2 puts, 2 to the first bucket, 1 to the second. Calling
// getNextMessage 2 times should then return a lock on the first bucket,
@@ -89,17 +87,15 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
filestorHandler.schedule(createPut(1234, 1), 0);
filestorHandler.schedule(createPut(5432, 0), 0);
- auto lock0 = filestorHandler.getNextMessage(0, 255);
+ auto lock0 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT(lock0.first.get());
- CPPUNIT_ASSERT_EQUAL(
- document::BucketId(16, 1234),
- dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234),
+ dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
- auto lock1 = filestorHandler.getNextMessage(0, 255);
+ auto lock1 = filestorHandler.getNextMessage(0);
CPPUNIT_ASSERT(lock1.first.get());
- CPPUNIT_ASSERT_EQUAL(
- document::BucketId(16, 5432),
- dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432),
+ dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
}
} // namespace storage
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 9d3a3d5f008..d46708b3aaa 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -52,21 +52,13 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const
_node.setupDummyPersistence();
_metrics.initDiskMetrics(
numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1);
- _handler.reset(new FileStorHandler(
- _messageKeeper, _metrics,
- _node.getPersistenceProvider().getPartitionStates().getList(),
- _node.getComponentRegister(), 255, 0));
+ _handler.reset(new FileStorHandler(_messageKeeper, _metrics,
+ _node.getPersistenceProvider().getPartitionStates().getList(),
+ _node.getComponentRegister()));
for (uint32_t i = 0; i < numDisks; i++) {
_diskEnvs.push_back(
- std::unique_ptr<PersistenceUtil>(
- new PersistenceUtil(
- _config.getConfigId(),
- _node.getComponentRegister(),
- *_handler,
- *_metrics.disks[i]->threads[0],
- i,
- 255,
- _node.getPersistenceProvider())));
+ std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler,
+ *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider()));
}
}
@@ -79,8 +71,7 @@ PersistenceTestUtils::~PersistenceTestUtils()
}
std::string
-PersistenceTestUtils::dumpBucket(const document::BucketId& bid,
- uint16_t disk) {
+PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) {
return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(makeSpiBucket(bid, spi::PartitionId(disk)));
}
@@ -92,14 +83,9 @@ PersistenceTestUtils::setupDisks(uint32_t numDisks) {
std::unique_ptr<PersistenceThread>
PersistenceTestUtils::createPersistenceThread(uint32_t disk)
{
- return std::unique_ptr<PersistenceThread>(
- new PersistenceThread(_env->_node.getComponentRegister(),
- _env->_config.getConfigId(),
- getPersistenceProvider(),
- getEnv()._fileStorHandler,
- getEnv()._metrics,
- disk,
- 255));
+ return std::make_unique<PersistenceThread>(_env->_node.getComponentRegister(), _env->_config.getConfigId(),
+ getPersistenceProvider(), getEnv()._fileStorHandler,
+ getEnv()._metrics, disk);
}
document::Document::SP
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index 8636628c7cc..b53c8c270f2 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -14,21 +14,16 @@ using namespace storage::api;
namespace storage {
-StorageLink::~StorageLink()
-{
-}
+StorageLink::~StorageLink() = default;
void StorageLink::push_back(StorageLink::UP link)
{
if (_state != CREATED) {
- LOG(error, "Attempted to alter chain by adding link %s after link %s "
- "while state is %s",
- link->toString().c_str(),
- toString().c_str(),
- stateToString(_state));
+ LOG(error, "Attempted to alter chain by adding link %s after link %s while state is %s",
+ link->toString().c_str(), toString().c_str(), stateToString(_state));
assert(false);
}
- assert(link.get());
+ assert(link);
if (isBottom()) {
link->_up = this;
_down = std::move(link);
@@ -39,26 +34,24 @@ void StorageLink::push_back(StorageLink::UP link)
void StorageLink::open()
{
- // First tag all states as opened, as components are allowed to send
- // messages both ways in onOpen call, in case any component send message
- // up, the link receiving them should have their state as opened.
+ // First tag all states as opened, as components are allowed to send
+ // messages both ways in onOpen call, in case any component send message
+ // up, the link receiving them should have their state as opened.
StorageLink* link = this;
while (true) {
if (link->_state != CREATED) {
- LOG(error, "During open(), link %s should be in CREATED state, "
- "not in state %s.",
- toString().c_str(),
- stateToString(link->_state));
+ LOG(error, "During open(), link %s should be in CREATED state, not in state %s.",
+ toString().c_str(), stateToString(link->_state));
assert(false);
}
link->_state = OPENED;
if (link->_down.get() == 0) break;
link = link->_down.get();
}
- // When give all links an onOpen call, bottoms up. Do it bottoms up, as
- // links are more likely to send messages down in their onOpen() call
- // than up. Thus, chances are best that the component is ready to
- // receive messages sent during onOpen().
+ // When give all links an onOpen call, bottoms up. Do it bottoms up, as
+ // links are more likely to send messages down in their onOpen() call
+ // than up. Thus, chances are best that the component is ready to
+ // receive messages sent during onOpen().
while (link != 0) {
link->onOpen();
link = link->_up;
@@ -91,34 +84,31 @@ void StorageLink::closeNextLink() {
void StorageLink::flush()
{
if (_state != CLOSING) {
- LOG(error, "During flush(), link %s should be in CLOSING state, "
- "not in state %s.",
- toString().c_str(),
- stateToString(_state));
+ LOG(error, "During flush(), link %s should be in CLOSING state, not in state %s.",
+ toString().c_str(), stateToString(_state));
assert(false);
}
- // First flush down to get all requests out of the system.
+ // First flush down to get all requests out of the system.
_state = FLUSHINGDOWN;
LOG(debug, "Flushing link %s on the way down.", toString().c_str());
onFlush(true);
LOG(debug, "Flushed link %s on the way down.", toString().c_str());
if (!isBottom()) {
_down->flush();
- // Then flush up to get replies out of the system
+ // Then flush up to get replies out of the system
LOG(debug, "Flushing link %s on the way back up.", toString().c_str());
_state = FLUSHINGUP;
onFlush(false);
LOG(debug, "Flushed link %s on the way back up.", toString().c_str());
} else {
- // Then flush up to get replies out of the system
+ // Then flush up to get replies out of the system
LOG(debug, "Flushing link %s on the way back up.", toString().c_str());
_state = FLUSHINGUP;
onFlush(false);
LOG(debug, "Flushed link %s on the way back up.", toString().c_str());
}
_state = CLOSED;
- LOG(debug, "Link %s is now closed and should do nothing more.",
- toString().c_str());
+ LOG(debug, "Link %s is now closed and should do nothing more.", toString().c_str());
}
void StorageLink::sendDown(const StorageMessage::SP& msg)
@@ -130,58 +120,31 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
case FLUSHINGDOWN:
break;
default:
- LOG(error,
- "Link %s trying to send %s down while in state %s",
- toString().c_str(),
- msg->toString().c_str(),
- stateToString(_state));
+ LOG(error, "Link %s trying to send %s down while in state %s",
+ toString().c_str(), msg->toString().c_str(), stateToString(_state));
assert(false);
}
assert(msg.get());
- LOG(spam, "Storage Link %s to handle %s",
- toString().c_str(), msg->toString().c_str());
+ LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str());
if (isBottom()) {
- LOG(spam, "Storage link %s at bottom of chain got message %s.",
- toString().c_str(), msg->toString().c_str());
- /*
- if (isFlush(msg)) {
+ LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str());
+ ostringstream ost;
+ ost << "Unhandled message at bottom of chain " << *msg << " (message type "
+ << msg->getType().getName() << "). " << vespalib::getStackTrace(0);
+ if (!msg->getType().isReply()) {
+ LOGBP(warning, "%s", ost.str().c_str());
StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
shared_ptr<StorageReply> reply(cmd.makeReply().release());
if (reply.get()) {
+ reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName()));
sendUp(reply);
}
} else {
- */
- ostringstream ost;
- ost << "Unhandled message at bottom of chain "
- << *msg << " (message type "
- << msg->getType().getName()
- << "). "
- << vespalib::getStackTrace(0);
- if (!msg->getType().isReply()) {
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
- shared_ptr<StorageReply> reply(cmd.makeReply().release());
-
- if (reply.get()) {
- reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED,
- msg->getType().getName()));
- sendUp(reply);
- }
- } else {
- ost << " Return code: "
- << static_cast<StorageReply&>(*msg).getResult();
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- }
- //}
+ ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult();
+ LOGBP(warning, "%s", ost.str().c_str());
+ }
} else if (!_down->onDown(msg)) {
- //LOG(spam, "Storage link %s forwarding message %s.",
- // toString().c_str(), msg->toString().c_str());
_down->sendDown(msg);
} else {
LOG(spam, "Storage link %s handled message %s.",
@@ -191,7 +154,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
{
- // Verify acceptable state to send messages up
+ // Verify acceptable state to send messages up
switch(_state) {
case OPENED:
case CLOSING:
@@ -199,48 +162,28 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
case FLUSHINGUP:
break;
default:
- LOG(error,
- "Link %s trying to send %s up while in state %s",
- toString().c_str(),
- msg->toString(true).c_str(),
- stateToString(_state));
+ LOG(error, "Link %s trying to send %s up while in state %s",
+ toString().c_str(), msg->toString(true).c_str(), stateToString(_state));
assert(false);
}
assert(msg.get());
if (isTop()) {
- /*
- if (isFlush(msg)) {
+ ostringstream ost;
+ ost << "Unhandled message at top of chain " << *msg << ".";
+ ost << vespalib::getStackTrace(0);
+ if (!msg->getType().isReply()) {
+ LOGBP(warning, "%s", ost.str().c_str());
StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
shared_ptr<StorageReply> reply(cmd.makeReply().release());
if (reply.get()) {
+ reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName()));
sendDown(reply);
}
} else {
- */
- ostringstream ost;
- ost << "Unhandled message at top of chain " << *msg << ".";
- ost << vespalib::getStackTrace(0);
- if (!msg->getType().isReply()) {
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
- shared_ptr<StorageReply> reply(cmd.makeReply().release());
-
- if (reply.get()) {
- reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED,
- msg->getType().getName()));
- sendDown(reply);
- }
- } else {
- ost << " Return code: "
- << static_cast<StorageReply&>(*msg).getResult();
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- }
- //}
+ ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult();
+ LOGBP(warning, "%s", ost.str().c_str());
+ }
} else if (!_up->onUp(msg)) {
_up->sendUp(msg);
}
@@ -261,19 +204,7 @@ void StorageLink::printChain(std::ostream& out, std::string indent) const {
bool StorageLink::onDown(const shared_ptr<StorageMessage> & msg)
{
- //LOG(spam, "Checking if storage link %s handles %s.",
- // toString().c_str(), msg->toString().c_str());
- bool result = msg->callHandler(*this, msg);
- /*
- if (result) {
- LOG(spam, "Storage link %s handled message %s.",
- toString().c_str(), msg->toString().c_str());
- } else {
- LOG(spam, "Storage link %s did not handle message %s.",
- toString().c_str(), msg->toString().c_str());
- }
- */
- return result;
+ return msg->callHandler(*this, msg);
}
bool StorageLink::onUp(const shared_ptr<StorageMessage> & msg)
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index e23bfda192c..949ceb901e1 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -7,12 +7,8 @@ namespace storage {
FileStorHandler::FileStorHandler(MessageSender& sender,
FileStorMetrics& metrics,
const spi::PartitionStateList& partitions,
- ServiceLayerComponentRegister& compReg,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking)
- : _impl(new FileStorHandlerImpl(
- sender, metrics, partitions, compReg,
- maxPriorityToBlock, minPriorityToBeBlocking))
+ ServiceLayerComponentRegister& compReg)
+ : _impl(new FileStorHandlerImpl(sender, metrics, partitions, compReg))
{
}
@@ -57,23 +53,16 @@ FileStorHandler::schedule(const api::StorageMessage::SP& msg, uint16_t thread)
return _impl->schedule(msg, thread);
}
-void
-FileStorHandler::pause(uint16_t disk, uint8_t priority) const {
- return _impl->pause(disk, priority);
-}
-
FileStorHandler::LockedMessage
-FileStorHandler::getNextMessage(uint16_t thread, uint8_t lowestPriority)
+FileStorHandler::getNextMessage(uint16_t thread)
{
- return _impl->getNextMessage(thread, lowestPriority);
+ return _impl->getNextMessage(thread);
}
FileStorHandler::LockedMessage &
-FileStorHandler::getNextMessage(uint16_t thread,
- LockedMessage& lck,
- uint8_t lowestPriority)
+FileStorHandler::getNextMessage(uint16_t thread, LockedMessage& lck)
{
- return _impl->getNextMessage(thread, lck, lowestPriority);
+ return _impl->getNextMessage(thread, lck);
}
FileStorHandler::BucketLockInterface::SP
@@ -89,30 +78,23 @@ FileStorHandler::remapQueueAfterDiskMove(
{
RemapInfo target(bucket, targetDisk);
- _impl->remapQueue(RemapInfo(bucket, sourceDisk), target,
- FileStorHandlerImpl::MOVE);
+ _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, FileStorHandlerImpl::MOVE);
}
void
-FileStorHandler::remapQueueAfterJoin(
- const RemapInfo& source,
- RemapInfo& target)
+FileStorHandler::remapQueueAfterJoin(const RemapInfo& source,RemapInfo& target)
{
_impl->remapQueue(source, target, FileStorHandlerImpl::JOIN);
}
void
-FileStorHandler::remapQueueAfterSplit(
- const RemapInfo& source,
- RemapInfo& target1,
- RemapInfo& target2)
+FileStorHandler::remapQueueAfterSplit(const RemapInfo& source,RemapInfo& target1, RemapInfo& target2)
{
_impl->remapQueue(source, target1, target2, FileStorHandlerImpl::SPLIT);
}
void
-FileStorHandler::failOperations(const document::Bucket &bucket,
- uint16_t fromDisk, const api::ReturnCode& err)
+FileStorHandler::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err)
{
_impl->failOperations(bucket, fromDisk, err);
}
@@ -130,8 +112,7 @@ FileStorHandler::sendReply(const api::StorageReply::SP& msg)
}
void
-FileStorHandler::getStatus(std::ostream& out,
- const framework::HttpUrlPath& path) const
+FileStorHandler::getStatus(std::ostream& out, const framework::HttpUrlPath& path) const
{
_impl->getStatus(out, path);
}
@@ -149,8 +130,7 @@ FileStorHandler::getQueueSize(uint16_t disk) const
}
void
-FileStorHandler::addMergeStatus(const document::Bucket& bucket,
- MergeStatus::SP ms)
+FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms)
{
return _impl->addMergeStatus(bucket, ms);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 6e2d6a0fc07..2cfc97ca71b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -15,7 +15,6 @@
#include "mergestatus.h"
#include <vespa/document/bucket/bucket.h>
-#include <ostream>
#include <vespa/storage/storageutil/resumeguard.h>
#include <vespa/storage/common/messagesender.h>
@@ -72,12 +71,7 @@ public:
CLOSED
};
- FileStorHandler(MessageSender&,
- FileStorMetrics&,
- const spi::PartitionStateList&,
- ServiceLayerComponentRegister&,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking);
+ FileStorHandler(MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&);
~FileStorHandler();
// Commands used by file stor manager
@@ -115,32 +109,19 @@ public:
* Schedule a storage message to be processed by the given disk
* @return True if we maanged to schedule operation. False if not
*/
- bool schedule(const std::shared_ptr<api::StorageMessage>&,
- uint16_t disk);
-
- // Commands used by file stor threads
-
- /**
- * When called, checks if any running operations have "preempting"
- * priority. If so, and the given priority is less than that, this call
- * will hang until the other operation is done.
- */
- void pause(uint16_t disk, uint8_t priority) const;
+ bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
/**
* Used by file stor threads to get their next message to process.
*
* @param disk The disk to get messages for
- * @param lowestPriority The lowest priority of operation we should return
*/
- LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority);
+ LockedMessage getNextMessage(uint16_t disk);
/**
* Returns the next message for the same bucket.
*/
- LockedMessage & getNextMessage(uint16_t disk,
- LockedMessage& lock,
- uint8_t lowestPriority);
+ LockedMessage & getNextMessage(uint16_t disk, LockedMessage& lock);
/**
* Lock a bucket. By default, each file stor thread has the locks of all
@@ -219,8 +200,7 @@ public:
* Fail all operations towards a single bucket currently queued to the
* given thread with the given error code.
*/
- void failOperations(const document::Bucket&, uint16_t fromDisk,
- const api::ReturnCode&);
+ void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&);
/**
* Add a new merge state to the registry.
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index c8b5c71ee2e..5eb168a9a42 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -23,20 +23,14 @@ using document::BucketSpace;
namespace storage {
-FileStorHandlerImpl::FileStorHandlerImpl(
- MessageSender& sender,
- FileStorMetrics& metrics,
- const spi::PartitionStateList& partitions,
- ServiceLayerComponentRegister& compReg,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking)
+FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
+ const spi::PartitionStateList& partitions,
+ ServiceLayerComponentRegister& compReg)
: _partitions(partitions),
_component(compReg, "filestorhandlerimpl"),
_diskInfo(_component.getDiskCount()),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
- _maxPriorityToBlock(maxPriorityToBlock),
- _minPriorityToBeBlocking(minPriorityToBeBlocking),
_getNextMessageTimeout(100),
_paused(false)
{
@@ -46,23 +40,21 @@ FileStorHandlerImpl::FileStorHandlerImpl(
}
if (_diskInfo.size() == 0) {
- throw vespalib::IllegalArgumentException(
- "No disks configured", VESPA_STRLOC);
+ throw vespalib::IllegalArgumentException("No disks configured", VESPA_STRLOC);
}
// Add update hook, so we will get callbacks each 5 seconds to update
// metrics.
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
}
-FileStorHandlerImpl::~FileStorHandlerImpl() { }
+FileStorHandlerImpl::~FileStorHandlerImpl() = default;
void
FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status)
{
vespalib::LockGuard mlock(_mergeStatesLock);
if (_mergeStates.find(bucket) != _mergeStates.end()) {;
- LOG(warning, "A merge status already existed for %s. Overwriting it.",
- bucket.toString().c_str());
+ LOG(warning, "A merge status already existed for %s. Overwriting it.", bucket.toString().c_str());
}
_mergeStates[bucket] = status;
}
@@ -73,8 +65,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
vespalib::LockGuard mlock(_mergeStatesLock);
MergeStatus::SP status = _mergeStates[bucket];
if (status.get() == 0) {
- throw vespalib::IllegalStateException(
- "No merge state exist for " + bucket.toString(), VESPA_STRLOC);
+ throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC);
}
return *status;
}
@@ -94,8 +85,7 @@ FileStorHandlerImpl::getNumActiveMerges() const
}
void
-FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket,
- const api::ReturnCode* code)
+FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode* code)
{
vespalib::LockGuard mlock(_mergeStatesLock);
auto it = _mergeStates.find(bucket);
@@ -153,10 +143,9 @@ FileStorHandlerImpl::flush(bool killPendingMerges)
if (killPendingMerges) {
api::ReturnCode code(api::ReturnCode::ABORTED,
"Storage node is shutting down");
- for (std::map<document::Bucket, MergeStatus::SP>::iterator it
- = _mergeStates.begin(); it != _mergeStates.end(); ++it)
+ for (auto & entry : _mergeStates)
{
- MergeStatus& s(*it->second);
+ MergeStatus& s(*entry.second);
if (s.pendingGetDiff.get() != 0) {
s.pendingGetDiff->setResult(code);
_messageSender.sendReply(s.pendingGetDiff);
@@ -182,11 +171,9 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg,
std::shared_ptr<api::StorageReply> rep(
static_cast<api::StorageCommand&>(msg).makeReply().release());
if (state == FileStorHandler::DISABLED) {
- rep->setResult(api::ReturnCode(
- api::ReturnCode::DISK_FAILURE, "Disk disabled"));
+ rep->setResult(api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled"));
} else {
- rep->setResult(api::ReturnCode(
- api::ReturnCode::ABORTED, "Shutting down storage node."));
+ rep->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."));
}
_messageSender.sendReply(rep);
}
@@ -270,29 +257,6 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg,
return true;
}
-void
-FileStorHandlerImpl::pause(uint16_t disk, uint8_t priority) const {
- if (priority < _maxPriorityToBlock) {
- return;
- }
-
- assert(disk < _diskInfo.size());
- const Disk& t(_diskInfo[disk]);
- vespalib::MonitorGuard lockGuard(t.lock);
-
- bool paused = true;
- while (paused) {
- paused = false;
- for (auto& lockedBucket : t.lockedBuckets) {
- if (lockedBucket.second.priority <= _minPriorityToBeBlocking) {
- paused = true;
- lockGuard.wait();
- break;
- }
- }
- }
-}
-
bool
FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const
{
@@ -395,18 +359,6 @@ FileStorHandlerImpl::abortQueuedOperations(
}
}
-bool
-FileStorHandlerImpl::hasBlockingOperations(const Disk& t) const
-{
- for (auto& lockedBucket : t.lockedBuckets) {
- if (lockedBucket.second.priority <= _minPriorityToBeBlocking) {
- return true;
- }
- }
-
- return false;
-}
-
void
FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
{
@@ -419,16 +371,11 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
}
FileStorHandler::LockedMessage &
-FileStorHandlerImpl::getNextMessage(uint16_t disk,
- FileStorHandler::LockedMessage& lck,
- uint8_t maxPriority)
+FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lck)
{
document::Bucket bucket(lck.first->getBucket());
- LOG(spam,
- "Disk %d retrieving message for buffered bucket %s",
- disk,
- bucket.getBucketId().toString().c_str());
+ LOG(spam, "Disk %d retrieving message for buffered bucket %s", disk, bucket.getBucketId().toString().c_str());
assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
@@ -451,22 +398,12 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
api::StorageMessage & m(*range.first->_command);
mbus::Trace& trace = m.getTrace();
- // Priority is too low, not buffering any more.
- if (m.getPriority() > maxPriority || m.getPriority() >= _maxPriorityToBlock) {
- lck.second.reset();
- return lck;
- }
+ MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread looking for more requests to active bucket.");
- MBUS_TRACE(trace, 9,
- "FileStorHandler: Message identified by disk thread looking for "
- "more requests to active bucket.");
+ uint64_t waitTime(const_cast<metrics::MetricTimer&>(range.first->_timer).stop(
+ t.metrics->averageQueueWaitingTime[m.getLoadType()]));
- uint64_t waitTime(
- const_cast<metrics::MetricTimer&>(range.first->_timer).stop(
- t.metrics->averageQueueWaitingTime[m.getLoadType()]));
-
- LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), "
- "timeout %d",
+ LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), timeout %d",
m.toString().c_str(), waitTime, bucket.getBucketId().toString().c_str(),
static_cast<api::StorageCommand&>(m).getTimeout());
@@ -480,15 +417,11 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
lockGuard.unlock();
return lck;
} else {
- std::shared_ptr<api::StorageReply> msgReply(
- static_cast<api::StorageCommand&>(m)
- .makeReply().release());
+ std::shared_ptr<api::StorageReply> msgReply(static_cast<api::StorageCommand&>(m).makeReply().release());
idx.erase(range.first);
lockGuard.broadcast();
lockGuard.unlock();
- msgReply->setResult(api::ReturnCode(
- api::ReturnCode::TIMEOUT,
- "Message waited too long in storage queue"));
+ msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue"));
_messageSender.sendReply(msgReply);
lck.second.reset();
@@ -517,23 +450,13 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const
}
bool
-FileStorHandlerImpl::operationBlockedByHigherPriorityThread(
- const api::StorageMessage& msg,
- const Disk& disk) const
-{
- return ((msg.getPriority() >= _maxPriorityToBlock)
- && hasBlockingOperations(disk));
-}
-
-bool
FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg,
uint64_t waitTime) const
{
if (msg.getType().isReply()) {
return false; // Replies must always be processed and cannot time out.
}
- return (waitTime >= static_cast<const api::StorageCommand&>(
- msg).getTimeout());
+ return (waitTime >= static_cast<const api::StorageCommand&>(msg).getTimeout());
}
std::unique_ptr<FileStorHandler::BucketLockInterface>
@@ -543,8 +466,7 @@ FileStorHandlerImpl::takeDiskBucketLockOwnership(
const document::Bucket &bucket,
const api::StorageMessage& msg)
{
- return std::unique_ptr<FileStorHandler::BucketLockInterface>(
- new BucketLock(guard, disk, bucket, msg.getPriority(), msg.getSummary()));
+ return std::make_unique<BucketLock>(guard, disk, bucket, msg.getPriority(), msg.getSummary());
}
std::unique_ptr<api::StorageReply>
@@ -564,23 +486,10 @@ namespace {
bucketIsLockedOnDisk(const document::Bucket &id, const FileStorHandlerImpl::Disk &t) {
return (id.getBucketId().getRawId() != 0 && t.isLocked(id));
}
-
- /**
- * Return whether msg has sufficiently high priority that a thread with
- * a configured priority threshold of maxPriority can even run in.
- * Often, operations such as streaming searches will have dedicated threads
- * that refuse lower priority operations such as Puts etc.
- */
- bool
- operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority)
- {
- // NOTE: priority integral value 0 is considered highest pri.
- return (msg.getPriority() <= maxPriority);
- }
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
+FileStorHandlerImpl::getNextMessage(uint16_t disk)
{
assert(disk < _diskInfo.size());
if (!tryHandlePause(disk)) {
@@ -602,12 +511,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
iter++;
}
if (iter != end) {
- api::StorageMessage &m(*iter->_command);
-
- if (operationHasHighEnoughPriorityToBeRun(m, maxPriority)
- && ! operationBlockedByHigherPriorityThread(m, t)
- && ! isPaused())
- {
+ if (! isPaused()) {
return getMessage(lockGuard, t, idx, iter);
}
}
@@ -655,22 +559,16 @@ FileStorHandlerImpl::lock(const document::Bucket &bucket, uint16_t disk)
assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
- LOG(spam,
- "Acquiring filestor lock for %s on disk %d",
- bucket.getBucketId().toString().c_str(),
- disk);
+ LOG(spam, "Acquiring filestor lock for %s on disk %d", bucket.getBucketId().toString().c_str(), disk);
vespalib::MonitorGuard lockGuard(t.lock);
while (bucket.getBucketId().getRawId() != 0 && t.isLocked(bucket)) {
- LOG(spam,
- "Contending for filestor lock for %s",
- bucket.getBucketId().toString().c_str());
+ LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str());
lockGuard.wait(100);
}
- std::shared_ptr<FileStorHandler::BucketLockInterface> locker(
- new BucketLock(lockGuard, t, bucket, 255, "External lock"));
+ auto locker = std::make_shared<BucketLock>(lockGuard, t, bucket, 255, "External lock");
lockGuard.broadcast();
return locker;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 56ac9ea0577..6b6d154e149 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -142,12 +142,8 @@ public:
document::Bucket _bucket;
};
- FileStorHandlerImpl(MessageSender&,
- FileStorMetrics&,
- const spi::PartitionStateList&,
- ServiceLayerComponentRegister&,
- uint8_t maxPriorityToBlock,
- uint8_t minPriorityToBeBlocking);
+ FileStorHandlerImpl(MessageSender&, FileStorMetrics&,
+ const spi::PartitionStateList&, ServiceLayerComponentRegister&);
~FileStorHandlerImpl();
void setGetNextMessageTimeout(uint32_t timeout) { _getNextMessageTimeout = timeout; }
@@ -158,12 +154,10 @@ public:
void close();
bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
- void pause(uint16_t disk, uint8_t priority) const;
- FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority);
+ FileStorHandler::LockedMessage getNextMessage(uint16_t disk);
FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter);
- FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock,
- uint8_t lowestPriority);
+ FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock);
enum Operation { MOVE, SPLIT, JOIN };
void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op);
@@ -204,8 +198,6 @@ private:
std::map<document::Bucket, MergeStatus::SP> _mergeStates;
- uint8_t _maxPriorityToBlock;
- uint8_t _minPriorityToBeBlocking;
uint32_t _getNextMessageTimeout;
vespalib::Monitor _pauseMonitor;
@@ -237,12 +229,6 @@ private:
bool diskIsClosed(uint16_t disk) const;
/**
- * Return whether an already running high priority operation pre-empts
- * (blocks) the operation in msg from even starting in the current thread.
- */
- bool operationBlockedByHigherPriorityThread(const api::StorageMessage& msg, const Disk& disk) const;
-
- /**
* Return whether msg has timed out based on waitTime and the message's
* specified timeout.
*/
@@ -262,7 +248,6 @@ private:
*/
std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const;
bool messageMayBeAborted(const api::StorageMessage& msg) const;
- bool hasBlockingOperations(const Disk& t) const;
void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
bool diskHasActiveOperationForAbortedBucket(const Disk& disk, const AbortBucketOperationsCommand& cmd) const;
void waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
@@ -287,4 +272,3 @@ private:
};
} // storage
-
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 6644d09da7f..b4735c2961a 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -29,10 +29,8 @@ using document::BucketSpace;
namespace storage {
FileStorManager::
-FileStorManager(const config::ConfigUri & configUri,
- const spi::PartitionStateList& partitions,
- spi::PersistenceProvider& provider,
- ServiceLayerComponentRegister& compReg)
+FileStorManager(const config::ConfigUri & configUri, const spi::PartitionStateList& partitions,
+ spi::PersistenceProvider& provider, ServiceLayerComponentRegister& compReg)
: StorageLinkQueued("File store manager", compReg),
framework::HtmlStatusReporter("filestorman", "File store manager"),
_compReg(compReg),
@@ -83,8 +81,7 @@ FileStorManager::~FileStorManager()
}
}
}
- LOG(debug, "Closing all filestor queues, answering queued messages. "
- "New messages will be refused.");
+ LOG(debug, "Closing all filestor queues, answering queued messages. New messages will be refused.");
_filestorHandler->close();
LOG(debug, "Deleting filestor threads. Waiting for their current operation "
"to finish. Stop their threads and delete objects.");
@@ -116,38 +113,16 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
_disks.resize(_component.getDiskCount());
- _metrics->initDiskMetrics(
- _disks.size(),
- _component.getLoadTypes()->getMetricLoadTypes(),
- (_config->threads.size() > 0) ? (_config->threads.size()) : 6);
+ size_t numThreads = _config->numThreads;
+ _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numThreads);
- _filestorHandler.reset(new FileStorHandler(
- *this, *_metrics, _partitions, _compReg,
- _config->maxPriorityToBlock, _config->minPriorityToBeBlocking));
+ _filestorHandler.reset(new FileStorHandler(*this, *_metrics, _partitions, _compReg));
for (uint32_t i=0; i<_component.getDiskCount(); ++i) {
if (_partitions[i].isUp()) {
- if (_config->threads.size() == 0) {
- LOG(spam, "Setting up disk %u", i);
- for (uint32_t j = 0; j < 4; j++) {
- _disks[i].push_back(DiskThread::SP(
- new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i, 255)));
-
- }
- for (uint32_t j = 4; j < 6; j++) {
- _disks[i].push_back(DiskThread::SP(
- new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i, 100)));
- }
- }
-
- for (uint16_t j = 0; j < _config->threads.size(); j++) {
- LOG(spam, "Setting up disk %u, thread %u with priority %d",
- i, j, _config->threads[j].lowestpri);
- _disks[i].push_back(DiskThread::SP(
- new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i, _config->threads[j].lowestpri)));
-
+ LOG(spam, "Setting up disk %u", i);
+ for (uint32_t j = 0; j < numThreads; j++) {
+ _disks[i].push_back(std::make_shared<PersistenceThread>(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i));
}
} else {
_filestorHandler->disable(i);
@@ -157,36 +132,28 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
}
void
-FileStorManager::replyDroppedOperation(api::StorageMessage& msg,
- const document::Bucket& bucket,
- api::ReturnCode::Result returnCode,
- vespalib::stringref reason)
+FileStorManager::replyDroppedOperation(api::StorageMessage& msg, const document::Bucket& bucket,
+ api::ReturnCode::Result returnCode, vespalib::stringref reason)
{
std::ostringstream error;
error << "Dropping " << msg.getType() << " to bucket "
<< bucket.toString() << ". Reason: " << reason;
LOGBT(debug, bucket.toString(), "%s", error.str().c_str());
if (!msg.getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(msg).makeReply();
reply->setResult(api::ReturnCode(returnCode, error.str()));
sendUp(reply);
}
}
void
-FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg,
- const document::Bucket& bucket)
+FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg, const document::Bucket& bucket)
{
- replyDroppedOperation(msg,
- bucket,
- api::ReturnCode::BUCKET_NOT_FOUND,
- "bucket does not exist");
+ replyDroppedOperation(msg, bucket, api::ReturnCode::BUCKET_NOT_FOUND, "bucket does not exist");
}
StorBucketDatabase::WrappedEntry
-FileStorManager::mapOperationToDisk(api::StorageMessage& msg,
- const document::Bucket& bucket)
+FileStorManager::mapOperationToDisk(api::StorageMessage& msg, const document::Bucket& bucket)
{
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "FileStorManager::mapOperationToDisk"));
@@ -197,8 +164,7 @@ FileStorManager::mapOperationToDisk(api::StorageMessage& msg,
}
StorBucketDatabase::WrappedEntry
-FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
- const document::DocumentId* docId)
+FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const document::DocumentId* docId)
{
StorBucketDatabase &database = _component.getBucketDatabase(cmd.getBucket().getBucketSpace());
StorBucketDatabase::WrappedEntry entry(database.get(
@@ -208,17 +174,12 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
if (docId) {
specific = _bucketIdFactory.getBucketId(*docId);
}
- typedef std::map<document::BucketId,
- StorBucketDatabase::WrappedEntry> BucketMap;
+ typedef std::map<document::BucketId, StorBucketDatabase::WrappedEntry> BucketMap;
std::shared_ptr<api::StorageReply> reply;
{
- BucketMap results(
- database.getContained(
- specific, "FileStorManager::mapOperationToBucketAndDisk-2"));
+ BucketMap results( database.getContained( specific, "FileStorManager::mapOperationToBucketAndDisk-2"));
if (results.size() == 1) {
- LOG(debug,
- "Remapping %s operation to specific %s versus "
- "non-existing %s to %s.",
+ LOG(debug, "Remapping %s operation to specific %s versus non-existing %s to %s.",
cmd.toString().c_str(), specific.toString().c_str(),
cmd.getBucketId().toString().c_str(),
results.begin()->first.toString().c_str());
@@ -243,10 +204,8 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
}
LOGBT(debug, cmd.getBucketId().toString(), "%s", error.str().c_str());
- reply.reset(static_cast<api::StorageCommand&>(cmd).makeReply().release());
- reply->setResult(
- api::ReturnCode(
- api::ReturnCode::BUCKET_NOT_FOUND, error.str()));
+ reply = static_cast<api::StorageCommand&>(cmd).makeReply();
+ reply->setResult( api::ReturnCode( api::ReturnCode::BUCKET_NOT_FOUND, error.str()));
}
sendUp(reply);
}
@@ -254,18 +213,15 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
}
bool
-FileStorManager::handlePersistenceMessage(
- const shared_ptr<api::StorageMessage>& msg, uint16_t disk)
+FileStorManager::handlePersistenceMessage( const shared_ptr<api::StorageMessage>& msg, uint16_t disk)
{
api::ReturnCode errorCode(api::ReturnCode::OK);
do {
- LOG(spam, "Received %s. Attempting to queue it to disk %u.",
- msg->getType().getName().c_str(), disk);
+ LOG(spam, "Received %s. Attempting to queue it to disk %u.", msg->getType().getName().c_str(), disk);
LOG_BUCKET_OPERATION_NO_LOCK(
getStorageMessageBucket(*msg).getBucketId(),
- vespalib::make_string("Attempting to queue %s to disk %u",
- msg->toString().c_str(), disk));
+ vespalib::make_string("Attempting to queue %s to disk %u", msg->toString().c_str(), disk));
if (_filestorHandler->schedule(msg, disk)) {
@@ -275,12 +231,10 @@ FileStorManager::handlePersistenceMessage(
}
switch (_filestorHandler->getDiskState(disk)) {
case FileStorHandler::DISABLED:
- errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE,
- "Disk disabled");
+ errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled");
break;
case FileStorHandler::CLOSED:
- errorCode = api::ReturnCode(api::ReturnCode::ABORTED,
- "Shutting down storage node.");
+ errorCode = api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.");
break;
case FileStorHandler::AVAILABLE:
assert(false);
@@ -289,8 +243,7 @@ FileStorManager::handlePersistenceMessage(
// If we get here, we failed to schedule message. errorCode says why
// We need to reply to message (while not having bucket lock)
if (!msg->getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(*msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(*msg).makeReply();
reply->setResult(errorCode);
LOG(spam, "Received persistence message %s. Returning reply: %s",
msg->getType().getName().c_str(), errorCode.toString().c_str());
@@ -303,7 +256,7 @@ bool
FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd)
{
if (cmd->getTimestamp() == 0) {
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ shared_ptr<api::StorageReply> reply = cmd->makeReply();
std::string msg("Put command received without timestamp set. "
"Distributor need to set timestamp to ensure equal "
"timestamps between storage nodes. Rejecting.");
@@ -311,8 +264,7 @@ FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd)
sendUp(reply);
return true;
}
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -323,7 +275,7 @@ bool
FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd)
{
if (cmd->getTimestamp() == 0) {
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ shared_ptr<api::StorageReply> reply = cmd->makeReply();
std::string msg("Update command received without timestamp set. "
"Distributor need to set timestamp to ensure equal "
"timestamps between storage nodes. Rejecting.");
@@ -331,8 +283,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd)
sendUp(reply);
return true;
}
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -342,8 +293,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd)
bool
FileStorManager::onGet(const shared_ptr<api::GetCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -354,7 +304,7 @@ bool
FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd)
{
if (cmd->getTimestamp() == 0) {
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ shared_ptr<api::StorageReply> reply = cmd->makeReply();
std::string msg("Remove command received without timestamp set. "
"Distributor need to set timestamp to ensure equal "
"timestamps between storage nodes. Rejecting.");
@@ -362,8 +312,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd)
sendUp(reply);
return true;
}
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -373,8 +322,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd)
bool
FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, 0));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -423,12 +371,10 @@ FileStorManager::onCreateBucket(
bucket.getBucketId(), "FileStorManager::onCreateBucket",
StorBucketDatabase::CREATE_IF_NONEXISTING));
if (entry.preExisted()) {
- LOG(debug,
- "Got create bucket request for %s which already exists: %s",
+ LOG(debug, "Got create bucket request for %s which already exists: %s",
cmd->getBucketId().toString().c_str(),
entry->getBucketInfo().toString().c_str());
- code = api::ReturnCode(api::ReturnCode::EXISTS,
- "Bucket already exist");
+ code = api::ReturnCode(api::ReturnCode::EXISTS, "Bucket already exist");
} else {
entry->disk = _component.getIdealPartition(cmd->getBucket());
if (_partitions[entry->disk].isUp()) {
@@ -445,12 +391,9 @@ FileStorManager::onCreateBucket(
return true;
} else {
entry.remove();
- code = api::ReturnCode(
- api::ReturnCode::IO_FAILURE,
- vespalib::make_string(
- "Trying to create bucket %s on disabled disk %d",
- cmd->getBucketId().toString().c_str(),
- entry->disk));
+ code = api::ReturnCode(api::ReturnCode::IO_FAILURE,
+ vespalib::make_string("Trying to create bucket %s on disabled disk %d",
+ cmd->getBucketId().toString().c_str(), entry->disk));
}
}
}
@@ -471,7 +414,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
"FileStorManager::onDeleteBucket"));
if (!entry.exist()) {
LOG(debug, "%s was already deleted", cmd->getBucketId().toString().c_str());
- std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = cmd->makeReply();
sendUp(reply);
return true;
}
@@ -493,7 +436,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
<< entry->getBucketInfo().toString();
LOG(debug, "Rejecting bucket delete: %s", ost.str().c_str());
- std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = cmd->makeReply();
static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(entry->getBucketInfo());
reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str()));
entry.unlock();
@@ -543,8 +486,7 @@ FileStorManager::ensureConsistentBucket(
bool
FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd,
- "FileStorManager::onMergeBucket"));
+ StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onMergeBucket"));
if (!entry.exist()) {
return true;
}
@@ -567,7 +509,7 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
cmd->toString().c_str(), entry->disk, cmd->getClusterStateVersion(),
_component.getStateUpdater().getClusterStateBundle()->getVersion()));
LOGBT(debug, cmd->getBucketId().toString(), "%s", code.getMessage().c_str());
- api::MergeBucketReply::SP reply(new api::MergeBucketReply(*cmd));
+ auto reply = std::make_shared<api::MergeBucketReply>(*cmd);
reply->setResult(code);
sendUp(reply);
return true;
@@ -582,39 +524,30 @@ bool
FileStorManager::onGetBucketDiff(
const shared_ptr<api::GetBucketDiffCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(
- ensureConsistentBucket(cmd->getBucket(),
- *cmd,
- "FileStorManager::onGetBucketDiff"));
+ StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onGetBucketDiff"));
if (!entry.exist()) {
return true;
}
if (!entry.preExisted()) {
entry->disk = _component.getIdealPartition(cmd->getBucket());
if (_partitions[entry->disk].isUp()) {
- LOG(debug, "Created bucket %s on disk %d (node index is %d) due "
- "to get bucket diff being received.",
- cmd->getBucketId().toString().c_str(),
- entry->disk, _component.getIndex());
+ LOG(debug, "Created bucket %s on disk %d (node index is %d) due to get bucket diff being received.",
+ cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex());
entry->info.setTotalDocumentSize(0);
entry->info.setUsedFileSize(0);
entry->info.setReady(true);
- // Call before writing bucket entry as we need to have bucket
- // lock while calling
+ // Call before writing bucket entry as we need to have bucket
+ // lock while calling
handlePersistenceMessage(cmd, entry->disk);
entry.write();
} else {
entry.remove();
api::ReturnCode code(api::ReturnCode::IO_FAILURE,
vespalib::make_string(
- "Trying to merge non-existing bucket %s, which "
- "can't be created because target disk %d is down",
- cmd->getBucketId().toString().c_str(),
- entry->disk));
- LOGBT(warning, cmd->getBucketId().toString(),
- "%s", code.getMessage().c_str());
- api::GetBucketDiffReply::SP reply(
- new api::GetBucketDiffReply(*cmd));
+ "Trying to merge non-existing bucket %s, which can't be created because target disk %d is down",
+ cmd->getBucketId().toString().c_str(), entry->disk));
+ LOGBT(warning, cmd->getBucketId().toString(), "%s", code.getMessage().c_str());
+ auto reply = std::make_shared<api::GetBucketDiffReply>(*cmd);
reply->setResult(code);
sendUp(reply);
return true;
@@ -634,8 +567,7 @@ FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const
BucketSpace bucketSpace(msg.getBucket().getBucketSpace());
if (!_component.getBucketDatabase(bucketSpace).isConsistent(entry)) {
document::Bucket bucket(bucketSpace, entry.getBucketId());
- replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED,
- "bucket became inconsistent during merging");
+ replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket became inconsistent during merging");
return false;
}
return true;
@@ -715,8 +647,7 @@ FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& c
}
bool
-FileStorManager::onSetBucketState(
- const std::shared_ptr<api::SetBucketStateCommand>& cmd)
+FileStorManager::onSetBucketState(const std::shared_ptr<api::SetBucketStateCommand>& cmd)
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
@@ -821,7 +752,7 @@ void
FileStorManager::handleAbortBucketOperations(const shared_ptr<AbortBucketOperationsCommand>& cmd)
{
_filestorHandler->abortQueuedOperations(*cmd);
- sendReply(api::StorageReply::SP(cmd->makeReply().release()));
+ sendReply(api::StorageReply::SP(cmd->makeReply()));
}
bool
@@ -913,8 +844,7 @@ void FileStorManager::onFlush(bool downwards)
}
void
-FileStorManager::reportHtmlStatus(std::ostream& out,
- const framework::HttpUrlPath& path) const
+FileStorManager::reportHtmlStatus(std::ostream& out, const framework::HttpUrlPath& path) const
{
bool showStatus = !path.hasAttribute("thread");
bool verbose = path.hasAttribute("verbose");
@@ -962,7 +892,7 @@ FileStorManager::updateState()
bool nodeUp = state->getNodeState(node).getState().oneOf("uir");
LOG(debug, "FileStorManager received cluster state '%s'", state->toString().c_str());
- // If edge where we go down
+ // If edge where we go down
if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) {
LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database");
Deactivator deactivator;
diff --git a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h b/storage/src/vespa/storage/persistence/filestorage/pausehandler.h
deleted file mode 100644
index 59c543b0067..00000000000
--- a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * @class PauseHandler
- * @ingroup persistence
- *
- * @brief Object that can be used to possibly pause running operation
- */
-#pragma once
-
-#include <vespa/storage/persistence/filestorage/filestorhandler.h>
-
-namespace storage {
-
-class PauseHandler {
- FileStorHandler* _handler;
- uint16_t _disk;
- uint8_t _priority;
-
-public:
- PauseHandler() : _handler(0), _disk(0), _priority(0) {}
- PauseHandler(FileStorHandler& handler, uint16_t disk)
- : _handler(&handler),
- _disk(disk),
- _priority(0)
- {
- }
-
- void setPriority(uint8_t priority) { _priority = priority; }
-
- void pause() const { if (_handler != 0) _handler->pause(_disk, _priority); }
-};
-
-} // storage
-
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 6c4cd7b64d2..be6f9577642 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -9,7 +9,6 @@
#include <vespa/document/fieldset/fieldsetrepo.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/exceptions.h>
-#include <algorithm>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".persistence.thread");
@@ -21,9 +20,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
- uint16_t deviceIndex,
- uint8_t lowestPriority)
- : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, lowestPriority, provider),
+ uint16_t deviceIndex)
+ : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider),
_warnOnSlowOperations(5000),
_spi(provider),
_processAllHandler(_env, provider),
@@ -106,19 +104,14 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd,
MessageTracker::UP
PersistenceThread::handlePut(api::PutCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.put[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.put[cmd.getLoadType()],_env._component.getClock());
if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
return tracker;
}
- spi::Result response =
- _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()),
- cmd.getDocument(),
- _context);
+ spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
+ spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), _context);
checkForError(response, *tracker);
return tracker;
}
@@ -126,22 +119,16 @@ PersistenceThread::handlePut(api::PutCommand& cmd)
MessageTracker::UP
PersistenceThread::handleRemove(api::RemoveCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.remove[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.remove[cmd.getLoadType()],_env._component.getClock());
if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
return tracker;
}
- spi::RemoveResult response =
- _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()),
- cmd.getDocumentId(), _context);
+ spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
+ spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), _context);
if (checkForError(response, *tracker)) {
- api::RemoveReply* reply(new api::RemoveReply(
- cmd, response.wasFound() ? cmd.getTimestamp() : 0));
- tracker->setReply(api::StorageReply::SP(reply));
+ tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
}
if (!response.wasFound()) {
++_env._metrics.remove[cmd.getLoadType()].notFound;
@@ -152,22 +139,18 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd)
MessageTracker::UP
PersistenceThread::handleUpdate(api::UpdateCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.update[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.update[cmd.getLoadType()],_env._component.getClock());
if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
return tracker;
}
- spi::UpdateResult response =
- _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()),
- cmd.getUpdate(), _context);
+ spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
+ spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), _context);
if (checkForError(response, *tracker)) {
- api::UpdateReply* reply = new api::UpdateReply(cmd);
+ auto reply = std::make_shared<api::UpdateReply>(cmd);
reply->setOldTimestamp(response.getExistingTimestamp());
- tracker->setReply(api::StorageReply::SP(reply));
+ tracker->setReply(std::move(reply));
}
return tracker;
}
@@ -175,30 +158,18 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd)
MessageTracker::UP
PersistenceThread::handleGet(api::GetCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.get[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.get[cmd.getLoadType()],_env._component.getClock());
document::FieldSetRepo repo;
- document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(),
- cmd.getFieldSet());
+ document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet());
spi::GetResult result =
- _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- *fieldSet,
- cmd.getDocumentId(),
- _context);
+ _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context);
if (checkForError(result, *tracker)) {
if (!result.hasDocument()) {
++_env._metrics.get[cmd.getLoadType()].notFound;
}
-
- api::GetReply::UP reply(
- new api::GetReply(cmd,
- Document::SP(result.getDocumentPtr()),
- result.getTimestamp()));
-
- tracker->setReply(api::StorageReply::SP(reply.release()));
+ tracker->setReply(std::make_shared<api::GetReply>(cmd, result.getDocumentPtr(), result.getTimestamp()));
}
return tracker;
@@ -207,19 +178,14 @@ PersistenceThread::handleGet(api::GetCommand& cmd)
MessageTracker::UP
PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.repairs,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.repairs,_env._component.getClock());
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
- LOG(debug, "Repair(%s): %s",
- cmd.getBucketId().toString().c_str(),
+ LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(),
(cmd.verifyBody() ? "Verifying body" : "Not verifying body"));
api::BucketInfo before = _env.getBucketInfo(cmd.getBucket());
spi::Result result =
- _spi.maintain(spi::Bucket(cmd.getBucket(),
- spi::PartitionId(_env._partition)),
- cmd.verifyBody() ?
- spi::HIGH : spi::LOW);
+ _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
+ cmd.verifyBody() ? spi::HIGH : spi::LOW);
if (checkForError(result, *tracker)) {
api::BucketInfo after = _env.getBucketInfo(cmd.getBucket());
@@ -239,15 +205,11 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
MessageTracker::UP
PersistenceThread::handleRevert(api::RevertCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.revert[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock());
spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- const std::vector<api::Timestamp> tokens = cmd.getRevertTokens();
- for (uint32_t i = 0; i < tokens.size(); ++i) {
- spi::Result result = _spi.removeEntry(b,
- spi::Timestamp(tokens[i]),
- _context);
+ const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens();
+ for (const api::Timestamp & token : tokens) {
+ spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), _context);
}
return tracker;
}
@@ -255,9 +217,7 @@ PersistenceThread::handleRevert(api::RevertCommand& cmd)
MessageTracker::UP
PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.createBuckets,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock());
LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
LOG(warning, "Bucket %s was merging at create time. Unexpected.",
@@ -299,8 +259,7 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket,
result.getErrorMessage().c_str());
return false;
}
- api::BucketInfo providerInfo(
- _env.convertBucketInfo(result.getBucketInfo()));
+ api::BucketInfo providerInfo(_env.convertBucketInfo(result.getBucketInfo()));
// Don't check meta fields or active/ready fields since these are not
// that important and ready may change under the hood in a race with
// getModifiedBuckets(). If bucket is empty it means it has already
@@ -323,15 +282,12 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket,
MessageTracker::UP
PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.deleteBuckets,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock());
LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()");
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
_env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
- api::ReturnCode(api::ReturnCode::ABORTED,
- "Bucket was deleted during the merge"));
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
}
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
@@ -340,8 +296,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
_spi.deleteBucket(bucket, _context);
StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
{
- StorBucketDatabase::WrappedEntry entry(db.get(
- cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
+ StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
if (entry.exist() && entry->getMetaCount() > 0) {
LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
"active operation when delete bucket was queued. "
@@ -365,11 +320,8 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
MessageTracker::UP
PersistenceThread::handleGetIter(GetIterCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.visit[cmd.getLoadType()],
- _env._component.getClock()));
- spi::IterateResult result(_spi.iterate(cmd.getIteratorId(),
- cmd.getMaxByteSize(), _context));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock());
+ spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), _context));
if (checkForError(result, *tracker)) {
GetIterReply::SP reply(new GetIterReply(cmd));
reply->getEntries() = result.steal_entries();
@@ -386,13 +338,11 @@ PersistenceThread::handleGetIter(GetIterCommand& cmd)
MessageTracker::UP
PersistenceThread::handleReadBucketList(ReadBucketList& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.readBucketList,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketList,_env._component.getClock());
spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace(), cmd.getPartition()));
if (checkForError(result, *tracker)) {
- ReadBucketListReply::SP reply(new ReadBucketListReply(cmd));
+ auto reply = std::make_shared<ReadBucketListReply>(cmd);
result.getList().swap(reply->getBuckets());
tracker->setReply(reply);
}
@@ -403,36 +353,24 @@ PersistenceThread::handleReadBucketList(ReadBucketList& cmd)
MessageTracker::UP
PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.readBucketInfo,
- _env._component.getClock()));
-
- _env.updateBucketDatabase(cmd.getBucket(),
- _env.getBucketInfo(cmd.getBucket()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketInfo,_env._component.getClock());
+ _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket()));
return tracker;
}
MessageTracker::UP
PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.createIterator,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock());
document::FieldSetRepo repo;
- document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(),
- cmd.getFields());
+ document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields());
// _context is reset per command, so it's safe to modify it like this.
_context.setReadConsistency(cmd.getReadConsistency());
spi::CreateIteratorResult result(_spi.createIterator(
spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
- *fieldSet,
- cmd.getSelection(),
- cmd.getIncludedVersions(),
- _context));
+ *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), _context));
if (checkForError(result, *tracker)) {
- tracker->setReply(CreateIteratorReply::SP(
- new CreateIteratorReply(
- cmd, spi::IteratorId(result.getIteratorId()))));
+ tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId())));
}
return tracker;
}
@@ -447,10 +385,8 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
// Calculate the various bucket ids involved.
if (cmd.getBucketId().getUsedBits() >= 58) {
- tracker->fail(
- api::ReturnCode::ILLEGAL_PARAMETERS,
- "Can't split anymore since maximum split bits "
- "is already reached");
+ tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS,
+ "Can't split anymore since maximum split bits is already reached");
return tracker;
}
if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) {
@@ -470,13 +406,11 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) {
document::BucketId src(cmd.getBucketId());
document::BucketId target1(src.getUsedBits() + 1, src.getId());
- document::BucketId target2(src.getUsedBits() + 1, src.getId()
- | (uint64_t(1) << src.getUsedBits()));
+ document::BucketId target2(src.getUsedBits() + 1, src.getId() | (uint64_t(1) << src.getUsedBits()));
targetInfo = SplitBitDetector::Result(target1, target2, false);
}
if (targetInfo.failed()) {
- tracker->fail(api::ReturnCode::INTERNAL_FAILURE,
- targetInfo.getReason());
+ tracker->fail(api::ReturnCode::INTERNAL_FAILURE, targetInfo.getReason());
return tracker;
}
// If we get here, we're splitting data in two.
@@ -519,8 +453,9 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
// Ensure to take them in rising order.
StorBucketDatabase::WrappedEntry sourceEntry(_env.getBucketDatabase(spiBucket.getBucket().getBucketSpace()).get(
cmd.getBucketId(), "PersistenceThread::handleSplitBucket-source"));
- api::SplitBucketReply* splitReply(new api::SplitBucketReply(cmd));
- tracker->setReply(api::StorageReply::SP(splitReply));
+ auto reply = std::make_shared<api::SplitBucketReply>(cmd);
+ api::SplitBucketReply & splitReply = *reply;
+ tracker->setReply(std::move(reply));
typedef std::pair<StorBucketDatabase::WrappedEntry,
FileStorHandler::RemapInfo> TargetInfo;
@@ -581,7 +516,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
createTarget.toString().c_str());
_spi.createBucket(createTarget, _context);
}
- splitReply->getSplitInfo().push_back(
+ splitReply.getSplitInfo().push_back(
api::SplitBucketReply::Entry(
targets[i].second.bucket.getBucketId(),
targets[i].first->getBucketInfo()));
@@ -953,28 +888,23 @@ PersistenceThread::processMessage(api::StorageMessage& msg)
++_env._metrics.operations;
if (msg.getType().isReply()) {
try{
- _env._pauseHandler.setPriority(msg.getPriority());
LOG(debug, "Handling reply: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
handleReply(static_cast<api::StorageReply&>(msg));
} catch (std::exception& e) {
// It's a reply, so nothing we can do.
- LOG(debug, "Caught exception for %s: %s",
- msg.toString().c_str(),
- e.what());
+ LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
}
} else {
api::StorageCommand& initiatingCommand =
static_cast<api::StorageCommand&>(msg);
try {
- int64_t startTime(
- _component->getClock().getTimeInMillis().getTime());
+ int64_t startTime(_component->getClock().getTimeInMillis().getTime());
LOG(debug, "Handling command: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
- std::unique_ptr<MessageTracker> tracker(
- handleCommand(initiatingCommand));
+ auto tracker(handleCommand(initiatingCommand));
if (!tracker.get()) {
LOG(debug, "Received unsupported command %s",
msg.getType().getName().c_str());
@@ -1080,23 +1010,18 @@ PersistenceThread::flushAllReplies(
if (errorCode != 0) {
for (uint32_t i = 0; i < replies.size(); ++i) {
replies[i]->getReply()->setResult(
- api::ReturnCode(
- (api::ReturnCode::Result)errorCode,
- result.getErrorMessage()));
+ api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage()));
}
}
} catch (std::exception& e) {
for (uint32_t i = 0; i < replies.size(); ++i) {
- replies[i]->getReply()->setResult(api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE, e.what()));
+ replies[i]->getReply()->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what()));
}
}
for (uint32_t i = 0; i < replies.size(); ++i) {
- LOG(spam,
- "Sending reply up (batched): %s %zu",
- replies[i]->getReply()->toString().c_str(),
- replies[i]->getReply()->getMsgId());
+ LOG(spam, "Sending reply up (batched): %s %zu",
+ replies[i]->getReply()->toString().c_str(), replies[i]->getReply()->getMsgId());
_env._fileStorHandler.sendReply(replies[i]->getReply());
}
@@ -1108,7 +1033,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
std::vector<MessageTracker::UP> trackers;
document::Bucket bucket = lock.first->getBucket();
- while (lock.second.get() != 0) {
+ while (lock.second) {
LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p",
_env._partition, _env._nodeIndex, lock.second.get());
std::shared_ptr<api::StorageMessage> msg(lock.second);
@@ -1121,7 +1046,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
}
std::unique_ptr<MessageTracker> tracker = processMessage(*msg);
- if (!tracker.get() || !tracker->getReply().get()) {
+ if (!tracker || !tracker->getReply()) {
// Was a reply
break;
}
@@ -1133,24 +1058,18 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
}
if (batchable) {
LOG(spam, "Adding reply %s to batch for bucket %s",
- tracker->getReply()->toString().c_str(),
- bucket.getBucketId().toString().c_str());
+ tracker->getReply()->toString().c_str(), bucket.getBucketId().toString().c_str());
trackers.push_back(std::move(tracker));
if (trackers.back()->getReply()->getResult().success()) {
- _env._fileStorHandler.getNextMessage(
- _env._partition,
- lock,
- _env._lowestPriority);
+ _env._fileStorHandler.getNextMessage(_env._partition, lock);
} else {
break;
}
} else {
- LOG(spam,
- "Sending reply up: %s %zu",
- tracker->getReply()->toString().c_str(),
- tracker->getReply()->getMsgId());
+ LOG(spam, "Sending reply up: %s %zu",
+ tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId());
_env._fileStorHandler.sendReply(tracker->getReply());
break;
@@ -1165,16 +1084,12 @@ PersistenceThread::run(framework::ThreadHandle& thread)
{
LOG(debug, "Started persistence thread with pid %d", getpid());
- while (!thread.interrupted()
- && !_env._fileStorHandler.closed(_env._partition))
- {
+ while (!thread.interrupted() && !_env._fileStorHandler.closed(_env._partition)) {
thread.registerTick();
- FileStorHandler::LockedMessage lock(
- _env._fileStorHandler.getNextMessage(
- _env._partition, _env._lowestPriority));
+ FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition));
- if (lock.first.get()) {
+ if (lock.first) {
processMessages(lock);
}
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 07fadb875cc..640614f7edf 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -21,7 +21,7 @@ class PersistenceThread final : public DiskThread, public Types
public:
PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri,
spi::PersistenceProvider& provider, FileStorHandler& filestorHandler,
- FileStorThreadMetrics& metrics, uint16_t deviceIndex, uint8_t lowestPriority);
+ FileStorThreadMetrics& metrics, uint16_t deviceIndex);
~PersistenceThread();
/** Waits for current operation to be finished. */
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 8e96c05a66a..c2dcb8e2a29 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -66,7 +66,6 @@ PersistenceUtil::PersistenceUtil(
FileStorHandler& fileStorHandler,
FileStorThreadMetrics& metrics,
uint16_t partition,
- uint8_t lowestPriority,
spi::PersistenceProvider& provider)
: _config(*config::ConfigGetter<vespa::config::content::StorFilestorConfig>::getConfig(configUri.getConfigId(), configUri.getContext())),
_compReg(compReg),
@@ -77,24 +76,18 @@ PersistenceUtil::PersistenceUtil(
_metrics(metrics),
_bucketFactory(_component.getBucketIdFactory()),
_repo(_component.getTypeRepo()),
- _lowestPriority(lowestPriority),
- _pauseHandler(),
_spi(provider)
{
}
-PersistenceUtil::~PersistenceUtil()
-{
-}
+PersistenceUtil::~PersistenceUtil() { }
void
-PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket,
- const api::BucketInfo& i)
+PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i)
{
// Update bucket database
- StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(
- bucket.getBucketId(),
- "env::updatebucketdb"));
+ StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(),
+ "env::updatebucketdb"));
if (entry.exist()) {
api::BucketInfo info = i;
@@ -106,9 +99,7 @@ PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket,
entry->setBucketInfo(info);
entry.write();
} else {
- LOG(debug,
- "Bucket(%s).getBucketInfo: Bucket does not exist.",
- bucket.getBucketId().toString().c_str());
+ LOG(debug, "Bucket(%s).getBucketInfo: Bucket does not exist.", bucket.getBucketId().toString().c_str());
}
}
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 5126931bab6..bf347ccb10a 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -1,12 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "types.h"
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/storage/common/servicelayercomponent.h>
#include <vespa/storage/persistence/filestorage/filestorhandler.h>
#include <vespa/storage/persistence/filestorage/filestormetrics.h>
-#include <vespa/storage/persistence/filestorage/pausehandler.h>
-#include <vespa/storage/persistence/types.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/storage/storageutil/utils.h>
#include <vespa/config-stor-filestor.h>
@@ -70,8 +69,6 @@ struct PersistenceUtil {
FileStorThreadMetrics& _metrics;
const document::BucketIdFactory& _bucketFactory;
const std::shared_ptr<document::DocumentTypeRepo> _repo;
- uint8_t _lowestPriority;
- PauseHandler _pauseHandler;
spi::PersistenceProvider& _spi;
PersistenceUtil(
@@ -80,7 +77,6 @@ struct PersistenceUtil {
FileStorHandler& fileStorHandler,
FileStorThreadMetrics& metrics,
uint16_t partition,
- uint8_t lowestPriority,
spi::PersistenceProvider& provider);
~PersistenceUtil();
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 38a48ac2eed..c8739442c13 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -115,20 +115,20 @@ private:
bool _attemptedStopped;
vespalib::string _pidFile;
- // First components that doesn't depend on others
+ // First components that doesn't depend on others
std::unique_ptr<StatusWebServer> _statusWebServer;
std::shared_ptr<StorageMetricSet> _metrics;
std::unique_ptr<metrics::MetricManager> _metricManager;
- // Depends on bucket databases and stop() functionality
+ // Depends on bucket databases and stop() functionality
std::unique_ptr<DeadLockDetector> _deadLockDetector;
- // Depends on metric manager
+ // Depends on metric manager
std::unique_ptr<StatusMetricConsumer> _statusMetrics;
- // Depends on metric manager
+ // Depends on metric manager
std::unique_ptr<StateReporter> _stateReporter;
std::unique_ptr<StateManager> _stateManager;
- // The storage chain can depend on anything.
+ // The storage chain can depend on anything.
std::unique_ptr<StorageLink> _chain;
/** Implementation of config callbacks. */