diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-19 18:00:32 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-19 18:00:32 +0200 |
commit | b898bffea4055d8306cf5656957c8a6b4a0fd4c6 (patch) | |
tree | 4f16785b3ca9bdd2245faffef2dba472175eb69d /storage | |
parent | 98f3df8f0778a18a3995e4070828c28092c1edb3 (diff) | |
parent | 229b5b3d08ead5d2b8f022341bd26feb2d692329 (diff) |
Merge pull request #14951 from vespa-engine/balder/split-thread-and-handler
Split the persistence thread and the message handler.
Diffstat (limited to 'storage')
24 files changed, 367 insertions, 345 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 761888e9f9b..7876f19b3ba 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -18,8 +18,8 @@ #include <vespa/storage/persistence/bucketownershipnotifier.h> #include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> -#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> #include <vespa/storage/persistence/persistencethread.h> +#include <vespa/storage/persistence/persistencehandler.h> #include <vespa/storage/storageserver/statemanager.h> #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/vdslib/state/random.h> @@ -194,17 +194,11 @@ bool fileExistsWithin(const std::string& path, const std::string& file) { } std::unique_ptr<DiskThread> -createThread(vdstestlib::DirConfig& config, - TestServiceLayerApp& node, - spi::PersistenceProvider& provider, +createThread(PersistenceHandler & persistenceHandler, FileStorHandler& filestorHandler, - BucketOwnershipNotifier & notifier, - FileStorThreadMetrics& metrics) + framework::Component & component) { - (void) config; - vespa::config::content::StorFilestorConfig cfg; - return std::make_unique<PersistenceThread>(node.executor(), node.getComponentRegister(), cfg, - provider, filestorHandler, notifier, metrics); + return std::make_unique<PersistenceThread>(persistenceHandler, filestorHandler, 0, component); } namespace { @@ -402,8 +396,7 @@ TEST_F(FileStorManagerTest, handler_priority) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); - uint32_t stripeId = filestorHandler.getNextStripeId(); - ASSERT_EQ(0u, stripeId); + uint32_t stripeId = 0; std::string content("Here is some content which is in all documents"); std::ostringstream uri; @@ -469,7 +462,7 @@ public: std::atomic<bool> _threadDone; explicit MessageFetchingThread(FileStorHandler& handler) - : _threadId(handler.getNextStripeId()), _handler(handler), _config(0), _fetchedCount(0), _done(false), + : _threadId(0), _handler(handler), _config(0), _fetchedCount(0), _done(false), _failed(false), _threadDone(false) {} @@ -556,7 +549,7 @@ TEST_F(FileStorManagerTest, handler_pause) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); - uint32_t stripeId = filestorHandler.getNextStripeId(); + uint32_t stripeId = 0; std::string content("Here is some content which is in all documents"); std::ostringstream uri; @@ -660,7 +653,7 @@ TEST_F(FileStorManagerTest, handler_timeout) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); - uint32_t stripeId = filestorHandler.getNextStripeId(); + uint32_t stripeId = 0; std::string content("Here is some content which is in all documents"); std::ostringstream uri; @@ -721,12 +714,14 @@ TEST_F(FileStorManagerTest, priority) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); - std::unique_ptr<DiskThread> thread2(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[1])); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); + + PersistenceHandler persistenceHandler2(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[1]); + std::unique_ptr<DiskThread> thread2(createThread(persistenceHandler2, filestorHandler, component)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; @@ -803,9 +798,10 @@ TEST_F(FileStorManagerTest, split1) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -814,8 +810,7 @@ TEST_F(FileStorManagerTest, split1) { uri << "id:footype:testdoctype1:n=" << (i % 3 == 0 ? 0x10001 : 0x0100001) << ":mydoc-" << i; - Document::SP doc(createDocument( - content, uri.str()).release()); + Document::SP doc(createDocument(content, uri.str()).release()); documents.push_back(doc); } document::BucketIdFactory factory; @@ -824,11 +819,9 @@ TEST_F(FileStorManagerTest, split1) { { // Populate bucket with the given data for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(16, factory.getBucketId( - documents[i]->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId()); - _node->getPersistenceProvider().createBucket( - makeSpiBucket(bucket), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context); auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i); auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); @@ -946,15 +939,16 @@ TEST_F(FileStorManagerTest, split_single_group) { ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); for (uint32_t j=0; j<1; ++j) { // Test this twice, once where all the data ends up in file with // splitbit set, and once where all the data ends up in file with // splitbit unset bool state = (j == 0); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP> documents; for (uint32_t i=0; i<20; ++i) { @@ -1059,9 +1053,10 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); document::BucketId source(16, 0x10001); @@ -1126,9 +1121,10 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); document::BucketId source(getFirstBucketNotOwnedByDistributor(0)); createBucket(source, 0); @@ -1158,8 +1154,7 @@ TEST_F(FileStorManagerTest, join) { // Setup a filestorthread to test DummyStorageLink top; DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); + top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); top.open(); ForwardingMessageSender messageSender(*dummyManager); @@ -1169,9 +1164,10 @@ TEST_F(FileStorManagerTest, join) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -1259,8 +1255,7 @@ createIterator(DummyStorageLink& link, { spi::Bucket bucket(makeSpiBucket(bucketId)); - spi::Selection selection = - spi::Selection(spi::DocumentSelection(docSel)); + spi::Selection selection = spi::Selection(spi::DocumentSelection(docSel)); selection.setFromTimestamp(spi::Timestamp(fromTime.getTime())); selection.setToTimestamp(spi::Timestamp(toTime.getTime())); auto createIterCmd = std::make_shared<CreateIteratorCommand>( diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 4737809a926..a19d060474b 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -42,7 +42,8 @@ PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_) dummyManager(std::make_unique<DummyStorageLink>()), messageSender(*dummyManager), loadTypes("raw:"), - metrics(loadTypes.getMetricLoadTypes()) + metrics(loadTypes.getMetricLoadTypes()), + stripeId(0) { top.push_back(std::move(dummyManager)); top.open(); @@ -55,8 +56,6 @@ PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_) // that is large enough to fail tests with high probability if this is not the case, // and small enough to not slow down testing too much. filestorHandler->setGetNextMessageTimeout(20ms); - - stripeId = filestorHandler->getNextStripeId(); } PersistenceQueueTest::Fixture::~Fixture() = default; diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index e2d61b9db2c..b393c4a5f8d 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -9,11 +9,13 @@ #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/persistence/spi/test.h> #include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h> +#include <vespa/storage/persistence/persistencehandler.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> +#include <vespa/config-stor-filestor.h> #include <thread> using document::DocumentType; @@ -70,9 +72,14 @@ PersistenceTestEnvironment::~PersistenceTestEnvironment() { PersistenceTestUtils::PersistenceTestUtils() : _env(std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils")), _replySender(), - _bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler) + _bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler), + _persistenceHandler() { setupExecutor(1); + vespa::config::content::StorFilestorConfig cfg; + _persistenceHandler = std::make_unique<PersistenceHandler>(*_sequenceTaskExecutor, _env->_component, cfg, + getPersistenceProvider(), getEnv()._fileStorHandler, + _bucketOwnershipNotifier, getEnv()._metrics); } PersistenceTestUtils::~PersistenceTestUtils() = default; @@ -86,15 +93,6 @@ PersistenceTestUtils::setupExecutor(uint32_t numThreads) { _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE); } -std::unique_ptr<PersistenceThread> -PersistenceTestUtils::createPersistenceThread() -{ - vespa::config::content::StorFilestorConfig cfg; - return std::make_unique<PersistenceThread>(*_sequenceTaskExecutor, _env->_node.getComponentRegister(), - cfg, getPersistenceProvider(), - getEnv()._fileStorHandler, _bucketOwnershipNotifier, getEnv()._metrics); -} - document::Document::SP PersistenceTestUtils::schedulePut( uint32_t location, diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index 29a18db413b..332a30393b1 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -73,7 +73,7 @@ public: std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequenceTaskExecutor; ReplySender _replySender; BucketOwnershipNotifier _bucketOwnershipNotifier; - + std::unique_ptr<PersistenceHandler> _persistenceHandler; PersistenceTestUtils(); ~PersistenceTestUtils() override; @@ -225,11 +225,6 @@ public: void createTestBucket(const document::Bucket&); /** - * Create a new persistence thread. - */ - std::unique_ptr<PersistenceThread> createPersistenceThread(); - - /** * In-place modify doc so that it has no more body fields. */ void clearBody(document::Document& doc); diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index e266d367eab..3dd8075176d 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -1,6 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/storage/persistence/persistencethread.h> +#include <vespa/storage/persistence/persistencehandler.h> #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/persistence/spi/test.h> #include <tests/persistence/persistencetestutils.h> @@ -204,7 +204,6 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) spi.put(bucket, spi::Timestamp(1000 + i), std::move(doc), context); } - std::unique_ptr<PersistenceThread> thread(createPersistenceThread()); getNode().getStateUpdater().setClusterState( std::make_shared<lib::ClusterState>("distributor:1 storage:1")); document::Bucket docBucket = makeDocumentBucket(document::BucketId(currentSplitLevel, 1)); @@ -214,7 +213,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) cmd->setMinByteSize(maxSize); cmd->setMinDocCount(maxCount); cmd->setSourceIndex(0); - MessageTracker::UP result = thread->splitjoinHandler().handleSplitBucket(*cmd, createTracker(cmd, docBucket)); + MessageTracker::UP result = _persistenceHandler->splitjoinHandler().handleSplitBucket(*cmd, createTracker(cmd, docBucket)); api::ReturnCode code(result->getResult()); EXPECT_EQ(error, code); if (!code.success()) { diff --git a/storage/src/tests/persistence/provider_error_wrapper_test.cpp b/storage/src/tests/persistence/provider_error_wrapper_test.cpp index 2ba5218003b..d4d26a68844 100644 --- a/storage/src/tests/persistence/provider_error_wrapper_test.cpp +++ b/storage/src/tests/persistence/provider_error_wrapper_test.cpp @@ -3,6 +3,7 @@ #include <vespa/persistence/spi/test.h> #include <tests/persistence/persistencetestutils.h> #include <tests/persistence/common/persistenceproviderwrapper.h> +#include <vespa/storage/persistence/provider_error_wrapper.h> using storage::spi::test::makeSpiBucket; diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index 3d0dd183232..a952e227b70 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -1,6 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. // @author Vegard Sjonfjell -#include <vespa/storage/persistence/persistencethread.h> +#include <vespa/storage/persistence/persistencehandler.h> #include <tests/persistence/persistencetestutils.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/documentapi/messagebus/messages/testandsetcondition.h> @@ -32,13 +32,16 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { const document::StringFieldValue NEW_CONTENT{"Freshly pressed and squeezed content"}; const document::Bucket BUCKET = makeDocumentBucket(BUCKET_ID); - unique_ptr<PersistenceThread> thread; + unique_ptr<PersistenceHandler> persistenceHandler; + const AsyncHandler * asyncHandler; shared_ptr<document::Document> testDoc; document::DocumentId testDocId; spi::Context context; TestAndSetTest() - : context(spi::LoadType(0, "default"), 0, 0) + : persistenceHandler(), + asyncHandler(nullptr), + context(spi::LoadType(0, "default"), 0, 0) {} void SetUp() override { @@ -47,14 +50,12 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { createBucket(BUCKET_ID); getPersistenceProvider().createBucket(makeSpiBucket(BUCKET_ID),context); - thread = createPersistenceThread(); testDoc = createTestDocument(); testDocId = testDoc->getId(); + asyncHandler = &_persistenceHandler->asyncHandler(); } void TearDown() override { - thread->flush(); - thread.reset(); SingleDiskPersistenceTestUtils::TearDown(); } @@ -86,7 +87,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(asyncHandler->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); } @@ -106,7 +107,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(asyncHandler->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -126,7 +127,7 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) { auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); setTestCondition(*remove); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(asyncHandler->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -146,7 +147,7 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) { auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); setTestCondition(*remove); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(asyncHandler->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY), dumpBucket(BUCKET_ID)); @@ -172,7 +173,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) { putTestDocument(false, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -185,7 +186,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) { putTestDocument(true, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -197,7 +198,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(false, updateTimestamp); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -206,7 +207,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(true, updateTimestamp); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID)); assertTestDocumentFoundAndMatchesContent(NEW_CONTENT); } @@ -218,7 +219,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) { auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); put->setCondition(documentapi::TestAndSetCondition("bjarne")); - ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); + ASSERT_EQ(fetchResult(asyncHandler->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -228,9 +229,9 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) { api::Timestamp timestamp = 0; auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); setTestCondition(*put); - thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET)); + asyncHandler->handlePut(*put, createTracker(put, BUCKET)); - ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(asyncHandler->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -255,7 +256,7 @@ document::Document::SP TestAndSetTest::retrieveTestDocument() { auto get = std::make_shared<api::GetCommand>(BUCKET, testDocId, document::AllFields::NAME); - auto tracker = thread->simpleMessageHandler().handleGet(*get, createTracker(get, BUCKET)); + auto tracker = _persistenceHandler->simpleMessageHandler().handleGet(*get, createTracker(get, BUCKET)); assert(tracker->getResult() == api::ReturnCode::Result::OK); auto & reply = static_cast<api::GetReply &>(tracker->getReply()); @@ -275,7 +276,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta } auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); - fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))); + fetchResult(asyncHandler->handlePut(*put, createTracker(put, BUCKET))); } void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value) diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index aa22a67f747..ff8d29f7f45 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_library(storage_spersistence OBJECT fieldvisitor.cpp mergehandler.cpp messages.cpp + persistencehandler.cpp persistencethread.cpp persistenceutil.cpp processallhandler.cpp diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 4427a2f45e8..44e768c9db7 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -215,9 +215,6 @@ public: */ virtual uint32_t getNumActiveMerges() const = 0; - /// Provides the next stripe id for a certain disk. - virtual uint32_t getNextStripeId() = 0; - /** Removes the merge status for the given bucket. */ virtual void clearMergeStatus(const document::Bucket&) = 0; virtual void clearMergeStatus(const document::Bucket&, const api::ReturnCode&) = 0; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 31cf06dfda4..57d09818ae6 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -48,7 +48,6 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe ServiceLayerComponentRegister& compReg) : _component(compReg, "filestorhandlerimpl"), _state(FileStorHandler::AVAILABLE), - _nextStripeId(0), _metrics(nullptr), _stripes(), _messageSender(sender), diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index b4fd18bd2e2..6aac8b0474b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -208,9 +208,6 @@ public: void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const override; uint32_t getQueueSize() const override; - uint32_t getNextStripeId() override { - return (_nextStripeId++) % _stripes.size(); - } std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq) override { @@ -236,7 +233,6 @@ public: private: ServiceLayerComponent _component; std::atomic<DiskState> _state; - uint32_t _nextStripeId; FileStorDiskMetrics * _metrics; std::vector<Stripe> _stripes; MessageSender& _messageSender; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 98b95c16b78..832e07eaf95 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -2,11 +2,9 @@ #include "filestorhandlerimpl.h" #include "filestormanager.h" - #include <vespa/storage/bucketdb/lockablemap.hpp> #include <vespa/storage/bucketdb/minimumusedbitstracker.h> #include <vespa/storage/common/bucketmessages.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storage/common/content_bucket_space_repo.h> #include <vespa/storage/common/doneinitializehandler.h> #include <vespa/vdslib/state/cluster_state_bundle.h> @@ -14,9 +12,12 @@ #include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/persistence/bucketownershipnotifier.h> #include <vespa/storage/persistence/persistencethread.h> +#include <vespa/storage/persistence/persistencehandler.h> #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/state.h> -#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageapi/message/stat.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> @@ -25,6 +26,7 @@ LOG_SETUP(".persistence.filestor.manager"); using std::shared_ptr; using document::BucketSpace; +using vespalib::make_string_short::fmt; namespace storage { @@ -41,6 +43,7 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p _init_handler(init_handler), _bucketIdFactory(_component.getBucketIdFactory()), _configUri(configUri), + _persistenceHandlers(), _threads(), _bucketOwnershipNotifier(std::make_unique<BucketOwnershipNotifier>(_component, *this)), _configFetcher(_configUri.getContext()), @@ -106,6 +109,11 @@ selectSequencer(vespa::config::content::StorFilestorConfig::ResponseSequencerTyp } } +vespalib::string +createThreadName(size_t stripeId) { + return fmt("PersistenceThread-%zu", stripeId); +} + } /** * If live configuration, assuming storageserver makes sure no messages are @@ -132,9 +140,15 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType)); assert(_sequencedExecutor); LOG(spam, "Setting up the disk"); - for (uint32_t j = 0; j < numThreads; j++) { - _threads.push_back(std::make_shared<PersistenceThread>(*_sequencedExecutor, _compReg, *_config, *_provider, - *_filestorHandler, *_bucketOwnershipNotifier, *_metrics->disks[0]->threads[j])); + for (uint32_t i = 0; i < numThreads; i++) { + _persistenceComponents.push_back(std::make_unique<ServiceLayerComponent>(_compReg, createThreadName(i))); + _persistenceHandlers.push_back( + std::make_unique<PersistenceHandler>(*_sequencedExecutor, + *_persistenceComponents.back(), + *_config, *_provider, *_filestorHandler, + *_bucketOwnershipNotifier, *_metrics->disks[0]->threads[i])); + _threads.push_back(std::make_unique<PersistenceThread>(*_persistenceHandlers.back(), *_filestorHandler, + i % numStripes, _component)); } } } @@ -436,8 +450,8 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) } _filestorHandler->failOperations(cmd->getBucket(), api::ReturnCode(api::ReturnCode::BUCKET_DELETED, - vespalib::make_string("Bucket %s about to be deleted anyway", - cmd->getBucketId().toString().c_str()))); + fmt("Bucket %s about to be deleted anyway", + cmd->getBucketId().toString().c_str()))); return true; } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 85cbbe57d21..aa9e7860a22 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -41,6 +41,7 @@ class ReadBucketList; class BucketOwnershipNotifier; class AbortBucketOperationsCommand; struct DoneInitializeHandler; +class PersistenceHandler; class FileStorManager : public StorageLinkQueued, public framework::HtmlStatusReporter, @@ -58,7 +59,9 @@ class FileStorManager : public StorageLinkQueued, const document::BucketIdFactory& _bucketIdFactory; config::ConfigUri _configUri; - std::vector<DiskThread::SP> _threads; + std::vector<std::unique_ptr<ServiceLayerComponent>> _persistenceComponents; + std::vector<std::unique_ptr<PersistenceHandler>> _persistenceHandlers; + std::vector<std::unique_ptr<DiskThread>> _threads; std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier; std::unique_ptr<vespa::config::content::StorFilestorConfig> _config; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 4eb002833da..ec71aee7eed 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -111,7 +111,7 @@ MergeHandler::populateMetaData( const spi::Bucket& bucket, Timestamp maxTimestamp, std::vector<spi::DocEntry::UP>& entries, - spi::Context& context) + spi::Context& context) const { spi::DocumentSelection docSel(""); @@ -162,7 +162,7 @@ MergeHandler::buildBucketInfoList( Timestamp maxTimestamp, uint8_t myNodeIndex, std::vector<api::GetBucketDiffCommand::Entry>& output, - spi::Context& context) + spi::Context& context) const { assert(output.size() == 0); assert(myNodeIndex < 16); @@ -336,7 +336,7 @@ MergeHandler::fetchLocalData( const documentapi::LoadType& /*loadType*/, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, - spi::Context& context) + spi::Context& context) const { uint32_t nodeMask = 1 << nodeIndex; // Preload documents in memory @@ -497,7 +497,7 @@ void MergeHandler::applyDiffEntry(const spi::Bucket& bucket, const api::ApplyBucketDiffCommand::Entry& e, spi::Context& context, - const document::DocumentTypeRepo& repo) + const document::DocumentTypeRepo& repo) const { spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { @@ -524,7 +524,7 @@ MergeHandler::applyDiffLocally( const documentapi::LoadType& /*loadType*/, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, - spi::Context& context) + spi::Context& context) const { // Sort the data to apply by which file they should be added to LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries", @@ -683,7 +683,7 @@ namespace { api::StorageReply::SP MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, - MessageSender& sender, spi::Context& context) + MessageSender& sender, spi::Context& context) const { // If last action failed, fail the whole merge if (status.reply->getResult().failed()) { @@ -845,7 +845,7 @@ public: }; MessageTracker::UP -MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker) +MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.mergeBuckets); @@ -1056,7 +1056,7 @@ namespace { } MessageTracker::UP -MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) +MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.getBucketDiff); spi::Bucket bucket(cmd.getBucket()); @@ -1167,7 +1167,7 @@ namespace { } // End of anonymous namespace void -MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSender& sender) +MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSender& sender) const { _env._metrics.getBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); @@ -1240,7 +1240,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe } MessageTracker::UP -MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTracker::UP tracker) +MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.applyBucketDiff); @@ -1269,8 +1269,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_clock); - (void) applyDiffLocally(bucket, cmd.getLoadType(), - cmd.getDiff(), index, tracker->context()); + (void) applyDiffLocally(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context()); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); } else { @@ -1328,7 +1327,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } void -MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) +MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) const { _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 9f74a6b93df..af2f765aed5 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -45,24 +45,24 @@ public: Timestamp maxTimestamp, uint8_t myNodeIndex, std::vector<api::GetBucketDiffCommand::Entry>& output, - spi::Context& context); + spi::Context& context) const; void fetchLocalData(const spi::Bucket& bucket, const documentapi::LoadType&, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, - spi::Context& context); + spi::Context& context) const; api::BucketInfo applyDiffLocally( const spi::Bucket& bucket, const documentapi::LoadType&, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, - spi::Context& context); + spi::Context& context) const; - MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP); - MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP); - void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&); - MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP); - void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&); + MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const; + MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const; + void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const; + MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const; + void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&) const; private: const framework::Clock &_clock; @@ -77,7 +77,7 @@ private: api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender, - spi::Context& context); + spi::Context& context) const; /** * Invoke either put, remove or unrevertable remove on the SPI @@ -86,7 +86,7 @@ private: void applyDiffEntry(const spi::Bucket&, const api::ApplyBucketDiffCommand::Entry&, spi::Context& context, - const document::DocumentTypeRepo& repo); + const document::DocumentTypeRepo& repo) const; /** * Fill entries-vector with metadata for bucket up to maxTimestamp, @@ -96,7 +96,7 @@ private: void populateMetaData(const spi::Bucket&, Timestamp maxTimestamp, std::vector<spi::DocEntry::UP>& entries, - spi::Context& context); + spi::Context& context) const; Document::UP deserializeDiffDocument( const api::ApplyBucketDiffCommand::Entry& e, diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp new file mode 100644 index 00000000000..13ff1df340b --- /dev/null +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -0,0 +1,151 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "persistencehandler.h" + +#include <vespa/log/log.h> +LOG_SETUP(".persistence.persistencehandler"); + +namespace storage { + +PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequencedExecutor, + ServiceLayerComponent & component, + const vespa::config::content::StorFilestorConfig & cfg, + spi::PersistenceProvider& provider, + FileStorHandler& filestorHandler, + BucketOwnershipNotifier & bucketOwnershipNotifier, + FileStorThreadMetrics& metrics) + : + _env(component, filestorHandler, metrics, provider), + _processAllHandler(_env, provider), + _mergeHandler(_env, provider, cfg.bucketMergeChunkSize, + cfg.enableMergeLocalNodeChooseDocsOptimalization, + cfg.commonMergeChainOptimalizationMinimumSize), + _asyncHandler(_env, provider, sequencedExecutor), + _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), + _simpleHandler(_env, provider) +{ +} + +PersistenceHandler::~PersistenceHandler() = default; + +MessageTracker::UP +PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) const +{ + switch (msg.getType().getId()) { + case api::MessageType::GET_ID: + return _simpleHandler.handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker)); + case api::MessageType::PUT_ID: + return _asyncHandler.handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker)); + case api::MessageType::REMOVE_ID: + return _asyncHandler.handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker)); + case api::MessageType::UPDATE_ID: + return _asyncHandler.handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker)); + case api::MessageType::REVERT_ID: + return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker)); + case api::MessageType::CREATEBUCKET_ID: + return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); + case api::MessageType::DELETEBUCKET_ID: + return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); + case api::MessageType::JOINBUCKETS_ID: + return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); + case api::MessageType::SPLITBUCKET_ID: + return _splitJoinHandler.handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker)); + // Depends on iterators + case api::MessageType::STATBUCKET_ID: + return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker)); + case api::MessageType::REMOVELOCATION_ID: + return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker)); + case api::MessageType::MERGEBUCKET_ID: + return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), std::move(tracker)); + case api::MessageType::GETBUCKETDIFF_ID: + return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), std::move(tracker)); + case api::MessageType::APPLYBUCKETDIFF_ID: + return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker)); + case api::MessageType::SETBUCKETSTATE_ID: + return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); + case api::MessageType::INTERNAL_ID: + switch(static_cast<api::InternalCommand&>(msg).getType()) { + case GetIterCommand::ID: + return _simpleHandler.handleGetIter(static_cast<GetIterCommand&>(msg), std::move(tracker)); + case CreateIteratorCommand::ID: + return _simpleHandler.handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), std::move(tracker)); + case ReadBucketList::ID: + return _simpleHandler.handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker)); + case ReadBucketInfo::ID: + return _simpleHandler.handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker)); + case InternalBucketJoinCommand::ID: + return _splitJoinHandler.handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker)); + case RecheckBucketInfoCommand::ID: + return _splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker)); + default: + LOG(warning, "Persistence handler received unhandled internal command %s", msg.toString().c_str()); + break; + } + default: + break; + } + return MessageTracker::UP(); +} + +void +PersistenceHandler::handleReply(api::StorageReply& reply) const +{ + switch (reply.getType().getId()) { + case api::MessageType::GETBUCKETDIFF_REPLY_ID: + _mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler); + break; + case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: + _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler); + break; + default: + break; + } +} + +MessageTracker::UP +PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const +{ + MBUS_TRACE(msg.getTrace(), 5, "PersistenceHandler: Processing message in persistence layer"); + + _env._metrics.operations.inc(); + if (msg.getType().isReply()) { + try{ + LOG(debug, "Handling reply: %s", msg.toString().c_str()); + LOG(spam, "Message content: %s", msg.toString(true).c_str()); + handleReply(static_cast<api::StorageReply&>(msg)); + } catch (std::exception& e) { + // It's a reply, so nothing we can do. + LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); + } + } else { + auto & initiatingCommand = static_cast<api::StorageCommand&>(msg); + try { + LOG(debug, "Handling command: %s", msg.toString().c_str()); + LOG(spam, "Message content: %s", msg.toString(true).c_str()); + return handleCommandSplitByType(initiatingCommand, std::move(tracker)); + } catch (std::exception& e) { + LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); + api::StorageReply::SP reply(initiatingCommand.makeReply()); + reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); + _env._fileStorHandler.sendReply(reply); + } + } + + return tracker; +} + +void +PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const { + LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get()); + api::StorageMessage & msg(*lock.second); + + // Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains + // valid even if the tracker is destroyed by an exception in processMessage(). + auto tracker = std::make_unique<MessageTracker>(_env, _env._fileStorHandler, std::move(lock.first), lock.second); + tracker = processMessage(msg, std::move(tracker)); + if (tracker) { + tracker->sendReply(); + } +} + +} diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h new file mode 100644 index 00000000000..2cfac865484 --- /dev/null +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -0,0 +1,54 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "processallhandler.h" +#include "mergehandler.h" +#include "asynchandler.h" +#include "persistenceutil.h" +#include "provider_error_wrapper.h" +#include "splitjoinhandler.h" +#include "simplemessagehandler.h" +#include <vespa/storage/common/storagecomponent.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> +#include <vespa/config-stor-filestor.h> + +namespace storage { + +class BucketOwnershipNotifier; + +/** + * Handle all messages destined for the persistence layer. The detailed handling + * happens in other handlers, but is forked out after common setup has been done. + * Currently metrics are updated so each thread should have its own instance. + */ +class PersistenceHandler : public Types +{ +public: + PersistenceHandler(vespalib::ISequencedTaskExecutor &, ServiceLayerComponent & component, + const vespa::config::content::StorFilestorConfig &, spi::PersistenceProvider &, + FileStorHandler &, BucketOwnershipNotifier &, FileStorThreadMetrics&); + ~PersistenceHandler(); + + void processLockedMessage(FileStorHandler::LockedMessage lock) const; + + //TODO Rewrite tests to avoid this api leak + const AsyncHandler & asyncHandler() const { return _asyncHandler; } + const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; } + const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } +private: + // Message handling functions + MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; + void handleReply(api::StorageReply&) const; + + MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const; + + PersistenceUtil _env; + ProcessAllHandler _processAllHandler; + MergeHandler _mergeHandler; + AsyncHandler _asyncHandler; + SplitJoinHandler _splitJoinHandler; + SimpleMessageHandler _simpleHandler; +}; + +} // storage diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 719c2130e1a..5738e2257ae 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -1,55 +1,22 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "persistencethread.h" -#include "splitbitdetector.h" -#include "bucketownershipnotifier.h" -#include "testandsethelper.h" -#include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/storage/common/bucketoperationlogger.h> -#include <vespa/document/fieldset/fieldsetrepo.h> -#include <vespa/document/base/exceptions.h> -#include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/util/isequencedtaskexecutor.h> +#include "persistencehandler.h" #include <thread> #include <vespa/log/log.h> LOG_SETUP(".persistence.thread"); -using vespalib::make_string_short::fmt; -using to_str = vespalib::string; - namespace storage { -namespace { - -vespalib::string -createThreadName(size_t stripeId) { - return fmt("PersistenceThread-%zu", stripeId); -} - -} - -PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequencedExecutor, - ServiceLayerComponentRegister& compReg, - const vespa::config::content::StorFilestorConfig & cfg, - spi::PersistenceProvider& provider, - FileStorHandler& filestorHandler, - BucketOwnershipNotifier & bucketOwnershipNotifier, - FileStorThreadMetrics& metrics) - : _stripeId(filestorHandler.getNextStripeId()), - _component(std::make_unique<ServiceLayerComponent>(compReg, createThreadName(_stripeId))), - _env(*_component, filestorHandler, metrics, provider), - _spi(provider), - _processAllHandler(_env, provider), - _mergeHandler(_env, _spi, cfg.bucketMergeChunkSize, - cfg.enableMergeLocalNodeChooseDocsOptimalization, - cfg.commonMergeChainOptimalizationMinimumSize), - _asyncHandler(_env, _spi, sequencedExecutor), - _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), - _simpleHandler(_env, provider), +PersistenceThread::PersistenceThread(PersistenceHandler & persistenceHandler, FileStorHandler & fileStorHandler, + uint32_t stripeId, framework::Component & component) + : _persistenceHandler(persistenceHandler), + _fileStorHandler(fileStorHandler), + _stripeId(stripeId), _thread() { - _thread = _component->startThread(*this, 60s, 1s); + _thread = component.startThread(*this, 60s, 1s); } PersistenceThread::~PersistenceThread() @@ -61,138 +28,18 @@ PersistenceThread::~PersistenceThread() LOG(debug, "Persistence thread done with destruction"); } -MessageTracker::UP -PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) -{ - switch (msg.getType().getId()) { - case api::MessageType::GET_ID: - return _simpleHandler.handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker)); - case api::MessageType::PUT_ID: - return _asyncHandler.handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker)); - case api::MessageType::REMOVE_ID: - return _asyncHandler.handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker)); - case api::MessageType::UPDATE_ID: - return _asyncHandler.handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker)); - case api::MessageType::REVERT_ID: - return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker)); - case api::MessageType::CREATEBUCKET_ID: - return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); - case api::MessageType::DELETEBUCKET_ID: - return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); - case api::MessageType::JOINBUCKETS_ID: - return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); - case api::MessageType::SPLITBUCKET_ID: - return _splitJoinHandler.handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker)); - // Depends on iterators - case api::MessageType::STATBUCKET_ID: - return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker)); - case api::MessageType::REMOVELOCATION_ID: - return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker)); - case api::MessageType::MERGEBUCKET_ID: - return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), std::move(tracker)); - case api::MessageType::GETBUCKETDIFF_ID: - return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), std::move(tracker)); - case api::MessageType::APPLYBUCKETDIFF_ID: - return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker)); - case api::MessageType::SETBUCKETSTATE_ID: - return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); - case api::MessageType::INTERNAL_ID: - switch(static_cast<api::InternalCommand&>(msg).getType()) { - case GetIterCommand::ID: - return _simpleHandler.handleGetIter(static_cast<GetIterCommand&>(msg), std::move(tracker)); - case CreateIteratorCommand::ID: - return _simpleHandler.handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), std::move(tracker)); - case ReadBucketList::ID: - return _simpleHandler.handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker)); - case ReadBucketInfo::ID: - return _simpleHandler.handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker)); - case InternalBucketJoinCommand::ID: - return _splitJoinHandler.handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker)); - case RecheckBucketInfoCommand::ID: - return _splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker)); - default: - LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str()); - break; - } - default: - break; - } - return MessageTracker::UP(); -} - -void -PersistenceThread::handleReply(api::StorageReply& reply) -{ - switch (reply.getType().getId()) { - case api::MessageType::GETBUCKETDIFF_REPLY_ID: - _mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler); - break; - case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: - _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler); - break; - default: - break; - } -} - -MessageTracker::UP -PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) -{ - MBUS_TRACE(msg.getTrace(), 5, "PersistenceThread: Processing message in persistence layer"); - - _env._metrics.operations.inc(); - if (msg.getType().isReply()) { - try{ - LOG(debug, "Handling reply: %s", msg.toString().c_str()); - LOG(spam, "Message content: %s", msg.toString(true).c_str()); - handleReply(static_cast<api::StorageReply&>(msg)); - } catch (std::exception& e) { - // It's a reply, so nothing we can do. - LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); - } - } else { - auto & initiatingCommand = static_cast<api::StorageCommand&>(msg); - try { - LOG(debug, "Handling command: %s", msg.toString().c_str()); - LOG(spam, "Message content: %s", msg.toString(true).c_str()); - return handleCommandSplitByType(initiatingCommand, std::move(tracker)); - } catch (std::exception& e) { - LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); - api::StorageReply::SP reply(initiatingCommand.makeReply()); - reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); - _env._fileStorHandler.sendReply(reply); - } - } - - return tracker; -} - -void -PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) { - LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get()); - api::StorageMessage & msg(*lock.second); - - // Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains - // valid even if the tracker is destroyed by an exception in processMessage(). - auto tracker = std::make_unique<MessageTracker>(_env, _env._fileStorHandler, std::move(lock.first), lock.second); - tracker = processMessage(msg, std::move(tracker)); - if (tracker) { - tracker->sendReply(); - } -} - void PersistenceThread::run(framework::ThreadHandle& thread) { LOG(debug, "Started persistence thread"); - while (!thread.interrupted() && !_env._fileStorHandler.closed()) { + while (!thread.interrupted() && !_fileStorHandler.closed()) { thread.registerTick(); - FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_stripeId)); + FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId)); if (lock.first) { - processLockedMessage(std::move(lock)); + _persistenceHandler.processLockedMessage(std::move(lock)); } } LOG(debug, "Closing down persistence thread"); @@ -202,7 +49,7 @@ void PersistenceThread::flush() { //TODO Only need to check for this stripe. - while (_env._fileStorHandler.getQueueSize() != 0) { + while (_fileStorHandler.getQueueSize() != 0) { std::this_thread::sleep_for(1ms); } } diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 917812868a7..9022ab5a862 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -3,59 +3,35 @@ #pragma once #include "diskthread.h" -#include "processallhandler.h" -#include "mergehandler.h" -#include "asynchandler.h" -#include "persistenceutil.h" -#include "provider_error_wrapper.h" -#include "splitjoinhandler.h" -#include "simplemessagehandler.h" -#include <vespa/storage/common/bucketmessages.h> -#include <vespa/storage/common/storagecomponent.h> -#include <vespa/storage/common/statusmessages.h> -#include <vespa/vespalib/util/isequencedtaskexecutor.h> -#include <vespa/config-stor-filestor.h> +#include "types.h" namespace storage { -class BucketOwnershipNotifier; +namespace framework { + class Component; + class Thread; +} + +class PersistenceHandler; +class FileStorHandler; class PersistenceThread final : public DiskThread, public Types { public: - PersistenceThread(vespalib::ISequencedTaskExecutor &, ServiceLayerComponentRegister &, - const vespa::config::content::StorFilestorConfig &, spi::PersistenceProvider &, - FileStorHandler &, BucketOwnershipNotifier &, FileStorThreadMetrics&); + PersistenceThread(PersistenceHandler & handler, FileStorHandler & fileStorHandler, + uint32_t stripeId, framework::Component & component); ~PersistenceThread() override; /** Waits for current operation to be finished. */ void flush() override; framework::Thread& getThread() override { return *_thread; } - //TODO Rewrite tests to avoid this api leak - const AsyncHandler & asyncHandler() const { return _asyncHandler; } - const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; } - const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } private: - uint32_t _stripeId; - ServiceLayerComponent::UP _component; - PersistenceUtil _env; - spi::PersistenceProvider& _spi; - ProcessAllHandler _processAllHandler; - MergeHandler _mergeHandler; - AsyncHandler _asyncHandler; - SplitJoinHandler _splitJoinHandler; - SimpleMessageHandler _simpleHandler; - framework::Thread::UP _thread; - - // Message handling functions - MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker); - void handleReply(api::StorageReply&); - - MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker); - void processLockedMessage(FileStorHandler::LockedMessage lock); - - // Thread main loop + PersistenceHandler & _persistenceHandler; + FileStorHandler & _fileStorHandler; + uint32_t _stripeId; + std::unique_ptr<framework::Thread> _thread; + void run(framework::ThreadHandle&) override; }; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 35842297ef9..4ab3cdc5d01 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -25,13 +25,13 @@ namespace { const vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s; } -MessageTracker::MessageTracker(PersistenceUtil & env, +MessageTracker::MessageTracker(const PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) : MessageTracker(env, replySender, true, std::move(bucketLock), std::move(msg)) {} -MessageTracker::MessageTracker(PersistenceUtil & env, +MessageTracker::MessageTracker(const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, FileStorHandler::BucketLockInterface::SP bucketLock, @@ -225,7 +225,7 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket, } void -PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket) +PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket) const { api::BucketInfo info = getBucketInfo(bucket); diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index c90eee4b7ae..ffce25b1e49 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -17,7 +17,7 @@ class MessageTracker : protected Types { public: typedef std::unique_ptr<MessageTracker> UP; - MessageTracker(PersistenceUtil & env, MessageSender & replySender, + MessageTracker(const PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); ~MessageTracker(); @@ -74,7 +74,7 @@ public: FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); private: - MessageTracker(PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, + MessageTracker(const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); [[nodiscard]] bool count_result_as_failure() const noexcept; @@ -84,7 +84,7 @@ private: FileStorHandler::BucketLockInterface::SP _bucketLock; api::StorageMessage::SP _msg; spi::Context _context; - PersistenceUtil &_env; + const PersistenceUtil &_env; MessageSender &_replySender; FileStorThreadMetrics::Op *_metric; // needs a better and thread safe solution api::StorageReply::SP _reply; @@ -132,7 +132,7 @@ struct PersistenceUtil { static api::BucketInfo convertBucketInfo(const spi::BucketInfo&); - void setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket); + void setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket) const; spi::Bucket getBucket(const document::DocumentId& id, const document::Bucket &bucket) const; diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index 0a1141a9ab3..a9c1aafd4d9 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -74,7 +74,7 @@ public: } MessageTracker::UP -ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker) +ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.removeLocation[cmd.getLoadType()]); @@ -93,7 +93,7 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, Message } MessageTracker::UP -ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker) +ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.statBucket[cmd.getLoadType()]); std::ostringstream ost; diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h index 9c0f8905744..14b6bced8a7 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.h +++ b/storage/src/vespa/storage/persistence/processallhandler.h @@ -14,8 +14,8 @@ struct PersistenceUtil; class ProcessAllHandler : public Types { public: ProcessAllHandler(const PersistenceUtil&, spi::PersistenceProvider&); - MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker); - MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker); + MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker) const; + MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker) const; private: const PersistenceUtil& _env; spi::PersistenceProvider& _spi; diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index d4108e6322d..5e5682a8bb4 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -13,9 +13,6 @@ #pragma once #include <vespa/persistence/spi/persistenceprovider.h> -#include <vector> -#include <string> -#include <memory> #include <mutex> namespace storage { |