aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-18 19:21:33 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-19 11:49:07 +0000
commit7dff0a9879800c9da8643d1f5c7d4f39fc910467 (patch)
tree1d4ab8c864c73b1d13f2fdf8e9373eb5b56b086f
parent779aeaf46753f97c8cc7221774a1c7c91a797c80 (diff)
Split the persistence thread and the message handler.
- Let FileStorManager own and control the Component and PersistenceHandler separately from the Persistence thread. - Let FileStorManager allocate and control stripe assignment.
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp85
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp5
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp16
-rw-r--r--storage/src/tests/persistence/persistencetestutils.h7
-rw-r--r--storage/src/tests/persistence/persistencethread_splittest.cpp3
-rw-r--r--storage/src/tests/persistence/testandsettest.cpp37
-rw-r--r--storage/src/vespa/storage/persistence/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h3
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp21
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h5
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp25
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h22
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp162
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.h50
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp174
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h46
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h8
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.h4
22 files changed, 355 insertions, 334 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 761888e9f9b..3f60189204b 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -194,17 +194,11 @@ bool fileExistsWithin(const std::string& path, const std::string& file) {
}
std::unique_ptr<DiskThread>
-createThread(vdstestlib::DirConfig& config,
- TestServiceLayerApp& node,
- spi::PersistenceProvider& provider,
+createThread(PersistenceHandler & persistenceHandler,
FileStorHandler& filestorHandler,
- BucketOwnershipNotifier & notifier,
- FileStorThreadMetrics& metrics)
+ framework::Component & component)
{
- (void) config;
- vespa::config::content::StorFilestorConfig cfg;
- return std::make_unique<PersistenceThread>(node.executor(), node.getComponentRegister(), cfg,
- provider, filestorHandler, notifier, metrics);
+ return std::make_unique<PersistenceThread>(persistenceHandler, filestorHandler, 0, component);
}
namespace {
@@ -402,8 +396,7 @@ TEST_F(FileStorManagerTest, handler_priority) {
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
- uint32_t stripeId = filestorHandler.getNextStripeId();
- ASSERT_EQ(0u, stripeId);
+ uint32_t stripeId = 0;
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
@@ -469,7 +462,7 @@ public:
std::atomic<bool> _threadDone;
explicit MessageFetchingThread(FileStorHandler& handler)
- : _threadId(handler.getNextStripeId()), _handler(handler), _config(0), _fetchedCount(0), _done(false),
+ : _threadId(0), _handler(handler), _config(0), _fetchedCount(0), _done(false),
_failed(false), _threadDone(false)
{}
@@ -556,7 +549,7 @@ TEST_F(FileStorManagerTest, handler_pause) {
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
- uint32_t stripeId = filestorHandler.getNextStripeId();
+ uint32_t stripeId = 0;
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
@@ -660,7 +653,7 @@ TEST_F(FileStorManagerTest, handler_timeout) {
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
- uint32_t stripeId = filestorHandler.getNextStripeId();
+ uint32_t stripeId = 0;
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
@@ -721,12 +714,14 @@ TEST_F(FileStorManagerTest, priority) {
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
ServiceLayerComponent component(_node->getComponentRegister(), "test");
BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
- std::unique_ptr<DiskThread> thread(createThread(
- *config, *_node, _node->getPersistenceProvider(),
- filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
- std::unique_ptr<DiskThread> thread2(createThread(
- *config, *_node, _node->getPersistenceProvider(),
- filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[1]));
+ vespa::config::content::StorFilestorConfig cfg;
+ PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(),
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]);
+ std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component));
+
+ PersistenceHandler persistenceHandler2(_node->executor(), component, cfg, _node->getPersistenceProvider(),
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[1]);
+ std::unique_ptr<DiskThread> thread2(createThread(persistenceHandler2, filestorHandler, component));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
@@ -803,9 +798,10 @@ TEST_F(FileStorManagerTest, split1) {
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
ServiceLayerComponent component(_node->getComponentRegister(), "test");
BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
- std::unique_ptr<DiskThread> thread(createThread(
- *config, *_node, _node->getPersistenceProvider(),
- filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
+ vespa::config::content::StorFilestorConfig cfg;
+ PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(),
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]);
+ std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -814,8 +810,7 @@ TEST_F(FileStorManagerTest, split1) {
uri << "id:footype:testdoctype1:n=" << (i % 3 == 0 ? 0x10001 : 0x0100001)
<< ":mydoc-" << i;
- Document::SP doc(createDocument(
- content, uri.str()).release());
+ Document::SP doc(createDocument(content, uri.str()).release());
documents.push_back(doc);
}
document::BucketIdFactory factory;
@@ -824,11 +819,9 @@ TEST_F(FileStorManagerTest, split1) {
{
// Populate bucket with the given data
for (uint32_t i=0; i<documents.size(); ++i) {
- document::BucketId bucket(16, factory.getBucketId(
- documents[i]->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId());
- _node->getPersistenceProvider().createBucket(
- makeSpiBucket(bucket), context);
+ _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context);
auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i);
auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
@@ -946,15 +939,16 @@ TEST_F(FileStorManagerTest, split_single_group) {
ServiceLayerComponent component(_node->getComponentRegister(), "test");
BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
+ vespa::config::content::StorFilestorConfig cfg;
+ PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(),
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]);
for (uint32_t j=0; j<1; ++j) {
// Test this twice, once where all the data ends up in file with
// splitbit set, and once where all the data ends up in file with
// splitbit unset
bool state = (j == 0);
- std::unique_ptr<DiskThread> thread(createThread(
- *config, *_node, _node->getPersistenceProvider(),
- filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
+ std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP> documents;
for (uint32_t i=0; i<20; ++i) {
@@ -1059,9 +1053,10 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) {
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
ServiceLayerComponent component(_node->getComponentRegister(), "test");
BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
- std::unique_ptr<DiskThread> thread(createThread(
- *config, *_node, _node->getPersistenceProvider(),
- filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
+ vespa::config::content::StorFilestorConfig cfg;
+ PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(),
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]);
+ std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component));
document::BucketId source(16, 0x10001);
@@ -1126,9 +1121,10 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) {
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
ServiceLayerComponent component(_node->getComponentRegister(), "test");
BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
- std::unique_ptr<DiskThread> thread(createThread(
- *config, *_node, _node->getPersistenceProvider(),
- filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
+ vespa::config::content::StorFilestorConfig cfg;
+ PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(),
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]);
+ std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component));
document::BucketId source(getFirstBucketNotOwnedByDistributor(0));
createBucket(source, 0);
@@ -1158,8 +1154,7 @@ TEST_F(FileStorManagerTest, join) {
// Setup a filestorthread to test
DummyStorageLink top;
DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
+ top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
top.open();
ForwardingMessageSender messageSender(*dummyManager);
@@ -1169,9 +1164,10 @@ TEST_F(FileStorManagerTest, join) {
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
ServiceLayerComponent component(_node->getComponentRegister(), "test");
BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
- std::unique_ptr<DiskThread> thread(createThread(
- *config, *_node, _node->getPersistenceProvider(),
- filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
+ vespa::config::content::StorFilestorConfig cfg;
+ PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(),
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]);
+ std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -1259,8 +1255,7 @@ createIterator(DummyStorageLink& link,
{
spi::Bucket bucket(makeSpiBucket(bucketId));
- spi::Selection selection =
- spi::Selection(spi::DocumentSelection(docSel));
+ spi::Selection selection = spi::Selection(spi::DocumentSelection(docSel));
selection.setFromTimestamp(spi::Timestamp(fromTime.getTime()));
selection.setToTimestamp(spi::Timestamp(toTime.getTime()));
auto createIterCmd = std::make_shared<CreateIteratorCommand>(
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 4737809a926..a19d060474b 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -42,7 +42,8 @@ PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_)
dummyManager(std::make_unique<DummyStorageLink>()),
messageSender(*dummyManager),
loadTypes("raw:"),
- metrics(loadTypes.getMetricLoadTypes())
+ metrics(loadTypes.getMetricLoadTypes()),
+ stripeId(0)
{
top.push_back(std::move(dummyManager));
top.open();
@@ -55,8 +56,6 @@ PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_)
// that is large enough to fail tests with high probability if this is not the case,
// and small enough to not slow down testing too much.
filestorHandler->setGetNextMessageTimeout(20ms);
-
- stripeId = filestorHandler->getNextStripeId();
}
PersistenceQueueTest::Fixture::~Fixture() = default;
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index e2d61b9db2c..80ba4c19384 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -70,9 +70,14 @@ PersistenceTestEnvironment::~PersistenceTestEnvironment() {
PersistenceTestUtils::PersistenceTestUtils()
: _env(std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils")),
_replySender(),
- _bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler)
+ _bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler),
+ _persistenceHandler()
{
setupExecutor(1);
+ vespa::config::content::StorFilestorConfig cfg;
+ _persistenceHandler = std::make_unique<PersistenceHandler>(*_sequenceTaskExecutor, _env->_component, cfg,
+ getPersistenceProvider(), getEnv()._fileStorHandler,
+ _bucketOwnershipNotifier, getEnv()._metrics);
}
PersistenceTestUtils::~PersistenceTestUtils() = default;
@@ -86,15 +91,6 @@ PersistenceTestUtils::setupExecutor(uint32_t numThreads) {
_sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE);
}
-std::unique_ptr<PersistenceThread>
-PersistenceTestUtils::createPersistenceThread()
-{
- vespa::config::content::StorFilestorConfig cfg;
- return std::make_unique<PersistenceThread>(*_sequenceTaskExecutor, _env->_node.getComponentRegister(),
- cfg, getPersistenceProvider(),
- getEnv()._fileStorHandler, _bucketOwnershipNotifier, getEnv()._metrics);
-}
-
document::Document::SP
PersistenceTestUtils::schedulePut(
uint32_t location,
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index 29a18db413b..332a30393b1 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -73,7 +73,7 @@ public:
std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequenceTaskExecutor;
ReplySender _replySender;
BucketOwnershipNotifier _bucketOwnershipNotifier;
-
+ std::unique_ptr<PersistenceHandler> _persistenceHandler;
PersistenceTestUtils();
~PersistenceTestUtils() override;
@@ -225,11 +225,6 @@ public:
void createTestBucket(const document::Bucket&);
/**
- * Create a new persistence thread.
- */
- std::unique_ptr<PersistenceThread> createPersistenceThread();
-
- /**
* In-place modify doc so that it has no more body fields.
*/
void clearBody(document::Document& doc);
diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp
index e266d367eab..9ac0c7bbfc8 100644
--- a/storage/src/tests/persistence/persistencethread_splittest.cpp
+++ b/storage/src/tests/persistence/persistencethread_splittest.cpp
@@ -204,7 +204,6 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase)
spi.put(bucket, spi::Timestamp(1000 + i), std::move(doc), context);
}
- std::unique_ptr<PersistenceThread> thread(createPersistenceThread());
getNode().getStateUpdater().setClusterState(
std::make_shared<lib::ClusterState>("distributor:1 storage:1"));
document::Bucket docBucket = makeDocumentBucket(document::BucketId(currentSplitLevel, 1));
@@ -214,7 +213,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase)
cmd->setMinByteSize(maxSize);
cmd->setMinDocCount(maxCount);
cmd->setSourceIndex(0);
- MessageTracker::UP result = thread->splitjoinHandler().handleSplitBucket(*cmd, createTracker(cmd, docBucket));
+ MessageTracker::UP result = _persistenceHandler->splitjoinHandler().handleSplitBucket(*cmd, createTracker(cmd, docBucket));
api::ReturnCode code(result->getResult());
EXPECT_EQ(error, code);
if (!code.success()) {
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index 3d0dd183232..228ea29ab42 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -32,13 +32,16 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
const document::StringFieldValue NEW_CONTENT{"Freshly pressed and squeezed content"};
const document::Bucket BUCKET = makeDocumentBucket(BUCKET_ID);
- unique_ptr<PersistenceThread> thread;
+ unique_ptr<PersistenceHandler> persistenceHandler;
+ const AsyncHandler * asyncHandler;
shared_ptr<document::Document> testDoc;
document::DocumentId testDocId;
spi::Context context;
TestAndSetTest()
- : context(spi::LoadType(0, "default"), 0, 0)
+ : persistenceHandler(),
+ asyncHandler(nullptr),
+ context(spi::LoadType(0, "default"), 0, 0)
{}
void SetUp() override {
@@ -47,14 +50,12 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
createBucket(BUCKET_ID);
getPersistenceProvider().createBucket(makeSpiBucket(BUCKET_ID),context);
- thread = createPersistenceThread();
testDoc = createTestDocument();
testDocId = testDoc->getId();
+ asyncHandler = &_persistenceHandler->asyncHandler();
}
void TearDown() override {
- thread->flush();
- thread.reset();
SingleDiskPersistenceTestUtils::TearDown();
}
@@ -86,7 +87,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) {
auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo);
setTestCondition(*putTwo);
- ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(),
+ ASSERT_EQ(fetchResult(asyncHandler->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
}
@@ -106,7 +107,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) {
auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo);
setTestCondition(*putTwo);
- ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(asyncHandler->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -126,7 +127,7 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) {
auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo);
setTestCondition(*remove);
- ASSERT_EQ(fetchResult(thread->asyncHandler().handleRemove(*remove, createTracker(remove, BUCKET))).getResult(),
+ ASSERT_EQ(fetchResult(asyncHandler->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -146,7 +147,7 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) {
auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo);
setTestCondition(*remove);
- ASSERT_EQ(fetchResult(thread->asyncHandler().handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(asyncHandler->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY),
dumpBucket(BUCKET_ID));
@@ -172,7 +173,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) {
putTestDocument(false, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
+ ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -185,7 +186,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) {
putTestDocument(true, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -197,7 +198,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(false, updateTimestamp);
- ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
+ ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -206,7 +207,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(true, updateTimestamp);
- ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID));
assertTestDocumentFoundAndMatchesContent(NEW_CONTENT);
}
@@ -218,7 +219,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) {
auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
put->setCondition(documentapi::TestAndSetCondition("bjarne"));
- ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
+ ASSERT_EQ(fetchResult(asyncHandler->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -228,9 +229,9 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) {
api::Timestamp timestamp = 0;
auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
setTestCondition(*put);
- thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET));
+ asyncHandler->handlePut(*put, createTracker(put, BUCKET));
- ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))).getResult(),
+ ASSERT_EQ(fetchResult(asyncHandler->handlePut(*put, createTracker(put, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -255,7 +256,7 @@ document::Document::SP
TestAndSetTest::retrieveTestDocument()
{
auto get = std::make_shared<api::GetCommand>(BUCKET, testDocId, document::AllFields::NAME);
- auto tracker = thread->simpleMessageHandler().handleGet(*get, createTracker(get, BUCKET));
+ auto tracker = _persistenceHandler->simpleMessageHandler().handleGet(*get, createTracker(get, BUCKET));
assert(tracker->getResult() == api::ReturnCode::Result::OK);
auto & reply = static_cast<api::GetReply &>(tracker->getReply());
@@ -275,7 +276,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta
}
auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
- fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET)));
+ fetchResult(asyncHandler->handlePut(*put, createTracker(put, BUCKET)));
}
void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value)
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index aa22a67f747..ff8d29f7f45 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -7,6 +7,7 @@ vespa_add_library(storage_spersistence OBJECT
fieldvisitor.cpp
mergehandler.cpp
messages.cpp
+ persistencehandler.cpp
persistencethread.cpp
persistenceutil.cpp
processallhandler.cpp
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 4427a2f45e8..44e768c9db7 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -215,9 +215,6 @@ public:
*/
virtual uint32_t getNumActiveMerges() const = 0;
- /// Provides the next stripe id for a certain disk.
- virtual uint32_t getNextStripeId() = 0;
-
/** Removes the merge status for the given bucket. */
virtual void clearMergeStatus(const document::Bucket&) = 0;
virtual void clearMergeStatus(const document::Bucket&, const api::ReturnCode&) = 0;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 31cf06dfda4..57d09818ae6 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -48,7 +48,6 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe
ServiceLayerComponentRegister& compReg)
: _component(compReg, "filestorhandlerimpl"),
_state(FileStorHandler::AVAILABLE),
- _nextStripeId(0),
_metrics(nullptr),
_stripes(),
_messageSender(sender),
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index b4fd18bd2e2..6aac8b0474b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -208,9 +208,6 @@ public:
void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const override;
uint32_t getQueueSize() const override;
- uint32_t getNextStripeId() override {
- return (_nextStripeId++) % _stripes.size();
- }
std::shared_ptr<FileStorHandler::BucketLockInterface>
lock(const document::Bucket & bucket, api::LockingRequirements lockReq) override {
@@ -236,7 +233,6 @@ public:
private:
ServiceLayerComponent _component;
std::atomic<DiskState> _state;
- uint32_t _nextStripeId;
FileStorDiskMetrics * _metrics;
std::vector<Stripe> _stripes;
MessageSender& _messageSender;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 98b95c16b78..2c7aada01b1 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -25,6 +25,7 @@ LOG_SETUP(".persistence.filestor.manager");
using std::shared_ptr;
using document::BucketSpace;
+using vespalib::make_string_short::fmt;
namespace storage {
@@ -41,6 +42,7 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p
_init_handler(init_handler),
_bucketIdFactory(_component.getBucketIdFactory()),
_configUri(configUri),
+ _persistenceHandlers(),
_threads(),
_bucketOwnershipNotifier(std::make_unique<BucketOwnershipNotifier>(_component, *this)),
_configFetcher(_configUri.getContext()),
@@ -106,6 +108,11 @@ selectSequencer(vespa::config::content::StorFilestorConfig::ResponseSequencerTyp
}
}
+vespalib::string
+createThreadName(size_t stripeId) {
+ return fmt("PersistenceThread-%zu", stripeId);
+}
+
}
/**
* If live configuration, assuming storageserver makes sure no messages are
@@ -133,8 +140,14 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
assert(_sequencedExecutor);
LOG(spam, "Setting up the disk");
for (uint32_t j = 0; j < numThreads; j++) {
- _threads.push_back(std::make_shared<PersistenceThread>(*_sequencedExecutor, _compReg, *_config, *_provider,
- *_filestorHandler, *_bucketOwnershipNotifier, *_metrics->disks[0]->threads[j]));
+ _persistenceComponents.push_back(std::make_unique<ServiceLayerComponent>(_compReg, createThreadName(j)));
+ _persistenceHandlers.push_back(
+ std::make_unique<PersistenceHandler>(*_sequencedExecutor,
+ *_persistenceComponents.back(),
+ *_config, *_provider, *_filestorHandler,
+ *_bucketOwnershipNotifier, *_metrics->disks[0]->threads[j]));
+ _threads.push_back(std::make_unique<PersistenceThread>(*_persistenceHandlers.back(), *_filestorHandler,
+ j % numStripes, _component));
}
}
}
@@ -436,8 +449,8 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
}
_filestorHandler->failOperations(cmd->getBucket(),
api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
- vespalib::make_string("Bucket %s about to be deleted anyway",
- cmd->getBucketId().toString().c_str())));
+ fmt("Bucket %s about to be deleted anyway",
+ cmd->getBucketId().toString().c_str())));
return true;
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 85cbbe57d21..aa9e7860a22 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -41,6 +41,7 @@ class ReadBucketList;
class BucketOwnershipNotifier;
class AbortBucketOperationsCommand;
struct DoneInitializeHandler;
+class PersistenceHandler;
class FileStorManager : public StorageLinkQueued,
public framework::HtmlStatusReporter,
@@ -58,7 +59,9 @@ class FileStorManager : public StorageLinkQueued,
const document::BucketIdFactory& _bucketIdFactory;
config::ConfigUri _configUri;
- std::vector<DiskThread::SP> _threads;
+ std::vector<std::unique_ptr<ServiceLayerComponent>> _persistenceComponents;
+ std::vector<std::unique_ptr<PersistenceHandler>> _persistenceHandlers;
+ std::vector<std::unique_ptr<DiskThread>> _threads;
std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier;
std::unique_ptr<vespa::config::content::StorFilestorConfig> _config;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 4eb002833da..ec71aee7eed 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -111,7 +111,7 @@ MergeHandler::populateMetaData(
const spi::Bucket& bucket,
Timestamp maxTimestamp,
std::vector<spi::DocEntry::UP>& entries,
- spi::Context& context)
+ spi::Context& context) const
{
spi::DocumentSelection docSel("");
@@ -162,7 +162,7 @@ MergeHandler::buildBucketInfoList(
Timestamp maxTimestamp,
uint8_t myNodeIndex,
std::vector<api::GetBucketDiffCommand::Entry>& output,
- spi::Context& context)
+ spi::Context& context) const
{
assert(output.size() == 0);
assert(myNodeIndex < 16);
@@ -336,7 +336,7 @@ MergeHandler::fetchLocalData(
const documentapi::LoadType& /*loadType*/,
std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
uint8_t nodeIndex,
- spi::Context& context)
+ spi::Context& context) const
{
uint32_t nodeMask = 1 << nodeIndex;
// Preload documents in memory
@@ -497,7 +497,7 @@ void
MergeHandler::applyDiffEntry(const spi::Bucket& bucket,
const api::ApplyBucketDiffCommand::Entry& e,
spi::Context& context,
- const document::DocumentTypeRepo& repo)
+ const document::DocumentTypeRepo& repo) const
{
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
@@ -524,7 +524,7 @@ MergeHandler::applyDiffLocally(
const documentapi::LoadType& /*loadType*/,
std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
uint8_t nodeIndex,
- spi::Context& context)
+ spi::Context& context) const
{
// Sort the data to apply by which file they should be added to
LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries",
@@ -683,7 +683,7 @@ namespace {
api::StorageReply::SP
MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
- MessageSender& sender, spi::Context& context)
+ MessageSender& sender, spi::Context& context) const
{
// If last action failed, fail the whole merge
if (status.reply->getResult().failed()) {
@@ -845,7 +845,7 @@ public:
};
MessageTracker::UP
-MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker)
+MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.mergeBuckets);
@@ -1056,7 +1056,7 @@ namespace {
}
MessageTracker::UP
-MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker)
+MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.getBucketDiff);
spi::Bucket bucket(cmd.getBucket());
@@ -1167,7 +1167,7 @@ namespace {
} // End of anonymous namespace
void
-MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSender& sender)
+MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSender& sender) const
{
_env._metrics.getBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
@@ -1240,7 +1240,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
}
MessageTracker::UP
-MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTracker::UP tracker)
+MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.applyBucketDiff);
@@ -1269,8 +1269,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
framework::MilliSecTimer startTime(_clock);
- (void) applyDiffLocally(bucket, cmd.getLoadType(),
- cmd.getDiff(), index, tracker->context());
+ (void) applyDiffLocally(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context());
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(
startTime.getElapsedTimeAsDouble());
} else {
@@ -1328,7 +1327,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
void
-MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender)
+MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) const
{
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 9f74a6b93df..af2f765aed5 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -45,24 +45,24 @@ public:
Timestamp maxTimestamp,
uint8_t myNodeIndex,
std::vector<api::GetBucketDiffCommand::Entry>& output,
- spi::Context& context);
+ spi::Context& context) const;
void fetchLocalData(const spi::Bucket& bucket,
const documentapi::LoadType&,
std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
uint8_t nodeIndex,
- spi::Context& context);
+ spi::Context& context) const;
api::BucketInfo applyDiffLocally(
const spi::Bucket& bucket,
const documentapi::LoadType&,
std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
uint8_t nodeIndex,
- spi::Context& context);
+ spi::Context& context) const;
- MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP);
- MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP);
- void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&);
- MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP);
- void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&);
+ MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const;
+ MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;
+ void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const;
+ MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
+ void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&) const;
private:
const framework::Clock &_clock;
@@ -77,7 +77,7 @@ private:
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
MergeStatus& status,
MessageSender& sender,
- spi::Context& context);
+ spi::Context& context) const;
/**
* Invoke either put, remove or unrevertable remove on the SPI
@@ -86,7 +86,7 @@ private:
void applyDiffEntry(const spi::Bucket&,
const api::ApplyBucketDiffCommand::Entry&,
spi::Context& context,
- const document::DocumentTypeRepo& repo);
+ const document::DocumentTypeRepo& repo) const;
/**
* Fill entries-vector with metadata for bucket up to maxTimestamp,
@@ -96,7 +96,7 @@ private:
void populateMetaData(const spi::Bucket&,
Timestamp maxTimestamp,
std::vector<spi::DocEntry::UP>& entries,
- spi::Context& context);
+ spi::Context& context) const;
Document::UP deserializeDiffDocument(
const api::ApplyBucketDiffCommand::Entry& e,
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
new file mode 100644
index 00000000000..090dda9f408
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -0,0 +1,162 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "persistencehandler.h"
+
+#include <vespa/storage/common/bucketoperationlogger.h>
+#include <vespa/document/fieldset/fieldsetrepo.h>
+#include <vespa/document/base/exceptions.h>
+#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+#include <thread>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".persistence.thread");
+
+using vespalib::make_string_short::fmt;
+using to_str = vespalib::string;
+
+namespace storage {
+
+PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequencedExecutor,
+ ServiceLayerComponent & component,
+ const vespa::config::content::StorFilestorConfig & cfg,
+ spi::PersistenceProvider& provider,
+ FileStorHandler& filestorHandler,
+ BucketOwnershipNotifier & bucketOwnershipNotifier,
+ FileStorThreadMetrics& metrics)
+ :
+ _env(component, filestorHandler, metrics, provider),
+ _spi(provider),
+ _processAllHandler(_env, provider),
+ _mergeHandler(_env, _spi, cfg.bucketMergeChunkSize,
+ cfg.enableMergeLocalNodeChooseDocsOptimalization,
+ cfg.commonMergeChainOptimalizationMinimumSize),
+ _asyncHandler(_env, _spi, sequencedExecutor),
+ _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
+ _simpleHandler(_env, provider)
+{
+}
+
+PersistenceHandler::~PersistenceHandler() = default;
+
+MessageTracker::UP
+PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) const
+{
+ switch (msg.getType().getId()) {
+ case api::MessageType::GET_ID:
+ return _simpleHandler.handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker));
+ case api::MessageType::PUT_ID:
+ return _asyncHandler.handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker));
+ case api::MessageType::REMOVE_ID:
+ return _asyncHandler.handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker));
+ case api::MessageType::UPDATE_ID:
+ return _asyncHandler.handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker));
+ case api::MessageType::REVERT_ID:
+ return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
+ case api::MessageType::CREATEBUCKET_ID:
+ return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
+ case api::MessageType::DELETEBUCKET_ID:
+ return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
+ case api::MessageType::JOINBUCKETS_ID:
+ return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
+ case api::MessageType::SPLITBUCKET_ID:
+ return _splitJoinHandler.handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker));
+ // Depends on iterators
+ case api::MessageType::STATBUCKET_ID:
+ return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker));
+ case api::MessageType::REMOVELOCATION_ID:
+ return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker));
+ case api::MessageType::MERGEBUCKET_ID:
+ return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), std::move(tracker));
+ case api::MessageType::GETBUCKETDIFF_ID:
+ return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), std::move(tracker));
+ case api::MessageType::APPLYBUCKETDIFF_ID:
+ return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker));
+ case api::MessageType::SETBUCKETSTATE_ID:
+ return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
+ case api::MessageType::INTERNAL_ID:
+ switch(static_cast<api::InternalCommand&>(msg).getType()) {
+ case GetIterCommand::ID:
+ return _simpleHandler.handleGetIter(static_cast<GetIterCommand&>(msg), std::move(tracker));
+ case CreateIteratorCommand::ID:
+ return _simpleHandler.handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), std::move(tracker));
+ case ReadBucketList::ID:
+ return _simpleHandler.handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker));
+ case ReadBucketInfo::ID:
+ return _simpleHandler.handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker));
+ case InternalBucketJoinCommand::ID:
+ return _splitJoinHandler.handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker));
+ case RecheckBucketInfoCommand::ID:
+ return _splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker));
+ default:
+ LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str());
+ break;
+ }
+ default:
+ break;
+ }
+ return MessageTracker::UP();
+}
+
+void
+PersistenceHandler::handleReply(api::StorageReply& reply) const
+{
+ switch (reply.getType().getId()) {
+ case api::MessageType::GETBUCKETDIFF_REPLY_ID:
+ _mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler);
+ break;
+ case api::MessageType::APPLYBUCKETDIFF_REPLY_ID:
+ _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler);
+ break;
+ default:
+ break;
+ }
+}
+
+MessageTracker::UP
+PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const
+{
+ MBUS_TRACE(msg.getTrace(), 5, "PersistenceHandler: Processing message in persistence layer");
+
+ _env._metrics.operations.inc();
+ if (msg.getType().isReply()) {
+ try{
+ LOG(debug, "Handling reply: %s", msg.toString().c_str());
+ LOG(spam, "Message content: %s", msg.toString(true).c_str());
+ handleReply(static_cast<api::StorageReply&>(msg));
+ } catch (std::exception& e) {
+ // It's a reply, so nothing we can do.
+ LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
+ }
+ } else {
+ auto & initiatingCommand = static_cast<api::StorageCommand&>(msg);
+ try {
+ LOG(debug, "Handling command: %s", msg.toString().c_str());
+ LOG(spam, "Message content: %s", msg.toString(true).c_str());
+ return handleCommandSplitByType(initiatingCommand, std::move(tracker));
+ } catch (std::exception& e) {
+ LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
+ api::StorageReply::SP reply(initiatingCommand.makeReply());
+ reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what()));
+ _env._fileStorHandler.sendReply(reply);
+ }
+ }
+
+ return tracker;
+}
+
+void
+PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const {
+ LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get());
+ api::StorageMessage & msg(*lock.second);
+
+ // Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains
+ // valid even if the tracker is destroyed by an exception in processMessage().
+ auto tracker = std::make_unique<MessageTracker>(_env, _env._fileStorHandler, std::move(lock.first), lock.second);
+ tracker = processMessage(msg, std::move(tracker));
+ if (tracker) {
+ tracker->sendReply();
+ }
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
new file mode 100644
index 00000000000..7453be1173a
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -0,0 +1,50 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "processallhandler.h"
+#include "mergehandler.h"
+#include "asynchandler.h"
+#include "persistenceutil.h"
+#include "provider_error_wrapper.h"
+#include "splitjoinhandler.h"
+#include "simplemessagehandler.h"
+#include <vespa/storage/common/storagecomponent.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+#include <vespa/config-stor-filestor.h>
+
+namespace storage {
+
+class BucketOwnershipNotifier;
+
+class PersistenceHandler : public Types
+{
+public:
+ PersistenceHandler(vespalib::ISequencedTaskExecutor &, ServiceLayerComponent & component,
+ const vespa::config::content::StorFilestorConfig &, spi::PersistenceProvider &,
+ FileStorHandler &, BucketOwnershipNotifier &, FileStorThreadMetrics&);
+ ~PersistenceHandler();
+
+ void processLockedMessage(FileStorHandler::LockedMessage lock) const;
+
+ //TODO Rewrite tests to avoid this api leak
+ const AsyncHandler & asyncHandler() const { return _asyncHandler; }
+ const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
+ const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
+private:
+ // Message handling functions
+ MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;
+ void handleReply(api::StorageReply&) const;
+
+ MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const;
+
+ PersistenceUtil _env;
+ spi::PersistenceProvider& _spi;
+ ProcessAllHandler _processAllHandler;
+ MergeHandler _mergeHandler;
+ AsyncHandler _asyncHandler;
+ SplitJoinHandler _splitJoinHandler;
+ SimpleMessageHandler _simpleHandler;
+};
+
+} // storage
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 719c2130e1a..623cc8a4372 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -1,55 +1,21 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "persistencethread.h"
-#include "splitbitdetector.h"
-#include "bucketownershipnotifier.h"
-#include "testandsethelper.h"
-#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/storage/common/bucketoperationlogger.h>
-#include <vespa/document/fieldset/fieldsetrepo.h>
-#include <vespa/document/base/exceptions.h>
-#include <vespa/vespalib/util/exceptions.h>
-#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".persistence.thread");
-using vespalib::make_string_short::fmt;
-using to_str = vespalib::string;
-
namespace storage {
-namespace {
-
-vespalib::string
-createThreadName(size_t stripeId) {
- return fmt("PersistenceThread-%zu", stripeId);
-}
-
-}
-
-PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequencedExecutor,
- ServiceLayerComponentRegister& compReg,
- const vespa::config::content::StorFilestorConfig & cfg,
- spi::PersistenceProvider& provider,
- FileStorHandler& filestorHandler,
- BucketOwnershipNotifier & bucketOwnershipNotifier,
- FileStorThreadMetrics& metrics)
- : _stripeId(filestorHandler.getNextStripeId()),
- _component(std::make_unique<ServiceLayerComponent>(compReg, createThreadName(_stripeId))),
- _env(*_component, filestorHandler, metrics, provider),
- _spi(provider),
- _processAllHandler(_env, provider),
- _mergeHandler(_env, _spi, cfg.bucketMergeChunkSize,
- cfg.enableMergeLocalNodeChooseDocsOptimalization,
- cfg.commonMergeChainOptimalizationMinimumSize),
- _asyncHandler(_env, _spi, sequencedExecutor),
- _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
- _simpleHandler(_env, provider),
+PersistenceThread::PersistenceThread(PersistenceHandler & persistenceHandler, FileStorHandler & fileStorHandler,
+ uint32_t stripeId, framework::Component & component)
+ : _persistenceHandler(persistenceHandler),
+ _fileStorHandler(fileStorHandler),
+ _stripeId(stripeId),
_thread()
{
- _thread = _component->startThread(*this, 60s, 1s);
+ _thread = component.startThread(*this, 60s, 1s);
}
PersistenceThread::~PersistenceThread()
@@ -61,138 +27,18 @@ PersistenceThread::~PersistenceThread()
LOG(debug, "Persistence thread done with destruction");
}
-MessageTracker::UP
-PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker)
-{
- switch (msg.getType().getId()) {
- case api::MessageType::GET_ID:
- return _simpleHandler.handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker));
- case api::MessageType::PUT_ID:
- return _asyncHandler.handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker));
- case api::MessageType::REMOVE_ID:
- return _asyncHandler.handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker));
- case api::MessageType::UPDATE_ID:
- return _asyncHandler.handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker));
- case api::MessageType::REVERT_ID:
- return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
- case api::MessageType::CREATEBUCKET_ID:
- return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
- case api::MessageType::DELETEBUCKET_ID:
- return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
- case api::MessageType::JOINBUCKETS_ID:
- return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
- case api::MessageType::SPLITBUCKET_ID:
- return _splitJoinHandler.handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker));
- // Depends on iterators
- case api::MessageType::STATBUCKET_ID:
- return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker));
- case api::MessageType::REMOVELOCATION_ID:
- return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker));
- case api::MessageType::MERGEBUCKET_ID:
- return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), std::move(tracker));
- case api::MessageType::GETBUCKETDIFF_ID:
- return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), std::move(tracker));
- case api::MessageType::APPLYBUCKETDIFF_ID:
- return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker));
- case api::MessageType::SETBUCKETSTATE_ID:
- return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
- case api::MessageType::INTERNAL_ID:
- switch(static_cast<api::InternalCommand&>(msg).getType()) {
- case GetIterCommand::ID:
- return _simpleHandler.handleGetIter(static_cast<GetIterCommand&>(msg), std::move(tracker));
- case CreateIteratorCommand::ID:
- return _simpleHandler.handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), std::move(tracker));
- case ReadBucketList::ID:
- return _simpleHandler.handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker));
- case ReadBucketInfo::ID:
- return _simpleHandler.handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker));
- case InternalBucketJoinCommand::ID:
- return _splitJoinHandler.handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker));
- case RecheckBucketInfoCommand::ID:
- return _splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker));
- default:
- LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str());
- break;
- }
- default:
- break;
- }
- return MessageTracker::UP();
-}
-
-void
-PersistenceThread::handleReply(api::StorageReply& reply)
-{
- switch (reply.getType().getId()) {
- case api::MessageType::GETBUCKETDIFF_REPLY_ID:
- _mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler);
- break;
- case api::MessageType::APPLYBUCKETDIFF_REPLY_ID:
- _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler);
- break;
- default:
- break;
- }
-}
-
-MessageTracker::UP
-PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP tracker)
-{
- MBUS_TRACE(msg.getTrace(), 5, "PersistenceThread: Processing message in persistence layer");
-
- _env._metrics.operations.inc();
- if (msg.getType().isReply()) {
- try{
- LOG(debug, "Handling reply: %s", msg.toString().c_str());
- LOG(spam, "Message content: %s", msg.toString(true).c_str());
- handleReply(static_cast<api::StorageReply&>(msg));
- } catch (std::exception& e) {
- // It's a reply, so nothing we can do.
- LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
- }
- } else {
- auto & initiatingCommand = static_cast<api::StorageCommand&>(msg);
- try {
- LOG(debug, "Handling command: %s", msg.toString().c_str());
- LOG(spam, "Message content: %s", msg.toString(true).c_str());
- return handleCommandSplitByType(initiatingCommand, std::move(tracker));
- } catch (std::exception& e) {
- LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
- api::StorageReply::SP reply(initiatingCommand.makeReply());
- reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what()));
- _env._fileStorHandler.sendReply(reply);
- }
- }
-
- return tracker;
-}
-
-void
-PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) {
- LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get());
- api::StorageMessage & msg(*lock.second);
-
- // Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains
- // valid even if the tracker is destroyed by an exception in processMessage().
- auto tracker = std::make_unique<MessageTracker>(_env, _env._fileStorHandler, std::move(lock.first), lock.second);
- tracker = processMessage(msg, std::move(tracker));
- if (tracker) {
- tracker->sendReply();
- }
-}
-
void
PersistenceThread::run(framework::ThreadHandle& thread)
{
LOG(debug, "Started persistence thread");
- while (!thread.interrupted() && !_env._fileStorHandler.closed()) {
+ while (!thread.interrupted() && !_fileStorHandler.closed()) {
thread.registerTick();
- FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_stripeId));
+ FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId));
if (lock.first) {
- processLockedMessage(std::move(lock));
+ _persistenceHandler.processLockedMessage(std::move(lock));
}
}
LOG(debug, "Closing down persistence thread");
@@ -202,7 +48,7 @@ void
PersistenceThread::flush()
{
//TODO Only need to check for this stripe.
- while (_env._fileStorHandler.getQueueSize() != 0) {
+ while (_fileStorHandler.getQueueSize() != 0) {
std::this_thread::sleep_for(1ms);
}
}
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 917812868a7..19eb0811351 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -3,18 +3,7 @@
#pragma once
#include "diskthread.h"
-#include "processallhandler.h"
-#include "mergehandler.h"
-#include "asynchandler.h"
-#include "persistenceutil.h"
-#include "provider_error_wrapper.h"
-#include "splitjoinhandler.h"
-#include "simplemessagehandler.h"
-#include <vespa/storage/common/bucketmessages.h>
-#include <vespa/storage/common/storagecomponent.h>
-#include <vespa/storage/common/statusmessages.h>
-#include <vespa/vespalib/util/isequencedtaskexecutor.h>
-#include <vespa/config-stor-filestor.h>
+#include "persistencehandler.h"
namespace storage {
@@ -23,39 +12,20 @@ class BucketOwnershipNotifier;
class PersistenceThread final : public DiskThread, public Types
{
public:
- PersistenceThread(vespalib::ISequencedTaskExecutor &, ServiceLayerComponentRegister &,
- const vespa::config::content::StorFilestorConfig &, spi::PersistenceProvider &,
- FileStorHandler &, BucketOwnershipNotifier &, FileStorThreadMetrics&);
+ PersistenceThread(PersistenceHandler & handler, FileStorHandler & fileStorHandler,
+ uint32_t stripeId, framework::Component & component);
~PersistenceThread() override;
/** Waits for current operation to be finished. */
void flush() override;
framework::Thread& getThread() override { return *_thread; }
- //TODO Rewrite tests to avoid this api leak
- const AsyncHandler & asyncHandler() const { return _asyncHandler; }
- const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
- const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
private:
- uint32_t _stripeId;
- ServiceLayerComponent::UP _component;
- PersistenceUtil _env;
- spi::PersistenceProvider& _spi;
- ProcessAllHandler _processAllHandler;
- MergeHandler _mergeHandler;
- AsyncHandler _asyncHandler;
- SplitJoinHandler _splitJoinHandler;
- SimpleMessageHandler _simpleHandler;
- framework::Thread::UP _thread;
-
- // Message handling functions
- MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker);
- void handleReply(api::StorageReply&);
-
- MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker);
- void processLockedMessage(FileStorHandler::LockedMessage lock);
-
- // Thread main loop
+ PersistenceHandler & _persistenceHandler;
+ FileStorHandler & _fileStorHandler;
+ uint32_t _stripeId;
+ framework::Thread::UP _thread;
+
void run(framework::ThreadHandle&) override;
};
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 35842297ef9..4ab3cdc5d01 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -25,13 +25,13 @@ namespace {
const vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s;
}
-MessageTracker::MessageTracker(PersistenceUtil & env,
+MessageTracker::MessageTracker(const PersistenceUtil & env,
MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock,
api::StorageMessage::SP msg)
: MessageTracker(env, replySender, true, std::move(bucketLock), std::move(msg))
{}
-MessageTracker::MessageTracker(PersistenceUtil & env,
+MessageTracker::MessageTracker(const PersistenceUtil & env,
MessageSender & replySender,
bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock,
@@ -225,7 +225,7 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket,
}
void
-PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket)
+PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket) const
{
api::BucketInfo info = getBucketInfo(bucket);
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index c90eee4b7ae..ffce25b1e49 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -17,7 +17,7 @@ class MessageTracker : protected Types {
public:
typedef std::unique_ptr<MessageTracker> UP;
- MessageTracker(PersistenceUtil & env, MessageSender & replySender,
+ MessageTracker(const PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
~MessageTracker();
@@ -74,7 +74,7 @@ public:
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
private:
- MessageTracker(PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
+ MessageTracker(const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
[[nodiscard]] bool count_result_as_failure() const noexcept;
@@ -84,7 +84,7 @@ private:
FileStorHandler::BucketLockInterface::SP _bucketLock;
api::StorageMessage::SP _msg;
spi::Context _context;
- PersistenceUtil &_env;
+ const PersistenceUtil &_env;
MessageSender &_replySender;
FileStorThreadMetrics::Op *_metric; // needs a better and thread safe solution
api::StorageReply::SP _reply;
@@ -132,7 +132,7 @@ struct PersistenceUtil {
static api::BucketInfo convertBucketInfo(const spi::BucketInfo&);
- void setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket);
+ void setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket) const;
spi::Bucket getBucket(const document::DocumentId& id, const document::Bucket &bucket) const;
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index 0a1141a9ab3..a9c1aafd4d9 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -74,7 +74,7 @@ public:
}
MessageTracker::UP
-ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker)
+ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.removeLocation[cmd.getLoadType()]);
@@ -93,7 +93,7 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, Message
}
MessageTracker::UP
-ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker)
+ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.statBucket[cmd.getLoadType()]);
std::ostringstream ost;
diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h
index 9c0f8905744..14b6bced8a7 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.h
+++ b/storage/src/vespa/storage/persistence/processallhandler.h
@@ -14,8 +14,8 @@ struct PersistenceUtil;
class ProcessAllHandler : public Types {
public:
ProcessAllHandler(const PersistenceUtil&, spi::PersistenceProvider&);
- MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker);
- MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker);
+ MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker) const;
private:
const PersistenceUtil& _env;
spi::PersistenceProvider& _spi;