diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-15 13:28:07 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-15 13:28:07 +0000 |
commit | e4e61c963812479c90c576f0291043a0d16d8e5d (patch) | |
tree | 0acdcc8e89eac4df1a6463ead0e4973b668849fc /storage/src | |
parent | e58457d582ccb5f3b5061d5c8028312310d7b08f (diff) |
There will always be a sequencer.
Diffstat (limited to 'storage/src')
7 files changed, 49 insertions, 65 deletions
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index 1c69e87e38e..edea33491af 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -139,7 +139,8 @@ TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId) : TestStorageApp(std::make_unique<ServiceLayerComponentRegisterImpl>(true), // TODO remove B-tree flag once default lib::NodeType::STORAGE, getIndexFromConfig(configId), configId), _compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())), - _persistenceProvider() + _persistenceProvider(), + _executor(vespalib::SequencedTaskExecutor::create(1)) { _compReg.setDiskCount(1); lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState()); @@ -152,7 +153,8 @@ TestServiceLayerApp::TestServiceLayerApp(NodeIndex index, : TestStorageApp(std::make_unique<ServiceLayerComponentRegisterImpl>(true), // TODO remove B-tree flag once default lib::NodeType::STORAGE, index, configId), _compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())), - _persistenceProvider() + _persistenceProvider(), + _executor(vespalib::SequencedTaskExecutor::create(1)) { _compReg.setDiskCount(1); lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState()); diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index 37c9e3c7ffe..54668cac515 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -29,6 +29,7 @@ #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/base/testdocman.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <atomic> namespace storage { @@ -110,6 +111,7 @@ class TestServiceLayerApp : public TestStorageApp using PersistenceProviderUP = std::unique_ptr<spi::PersistenceProvider>; ServiceLayerComponentRegisterImpl& _compReg; PersistenceProviderUP _persistenceProvider; + std::unique_ptr<vespalib::ISequencedTaskExecutor> _executor; public: TestServiceLayerApp(vespalib::stringref configId); @@ -126,6 +128,7 @@ public: StorBucketDatabase& getStorageBucketDatabase() override { return _compReg.getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).bucketDatabase(); } + vespalib::ISequencedTaskExecutor & executor() { return *_executor; } private: // For storage server interface implementation we'll get rid of soon. diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 3525563eb7a..f61a31e5898 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -192,14 +192,15 @@ bool fileExistsWithin(const std::string& path, const std::string& file) { return !(findFile(path, file) == ""); } -std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config, - TestServiceLayerApp& node, - spi::PersistenceProvider& provider, - FileStorHandler& filestorHandler, - FileStorThreadMetrics& metrics) +std::unique_ptr<DiskThread> +createThread(vdstestlib::DirConfig& config, + TestServiceLayerApp& node, + spi::PersistenceProvider& provider, + FileStorHandler& filestorHandler, + FileStorThreadMetrics& metrics) { (void) config; - return std::make_unique<PersistenceThread>(nullptr,node.getComponentRegister(), config.getConfigId(), + return std::make_unique<PersistenceThread>(node.executor(), node.getComponentRegister(), config.getConfigId(), provider, filestorHandler, metrics); } diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 76e805639b7..67d148fb317 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -88,7 +88,7 @@ PersistenceTestUtils::setupExecutor(uint32_t numThreads) { std::unique_ptr<PersistenceThread> PersistenceTestUtils::createPersistenceThread() { - return std::make_unique<PersistenceThread>(_sequenceTaskExecutor.get(), _env->_node.getComponentRegister(), + return std::make_unique<PersistenceThread>(*_sequenceTaskExecutor, _env->_node.getComponentRegister(), _env->_config.getConfigId(),getPersistenceProvider(), getEnv()._fileStorHandler, getEnv()._metrics); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 88bfb18841c..3b60e3c61ee 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -86,7 +86,7 @@ FileStorManager::print(std::ostream& out, bool verbose, const std::string& inden namespace { uint32_t computeNumResponseThreads(int configured) { - return (configured < 0) ? std::max(1u, std::thread::hardware_concurrency()/4) : configured; + return (configured <= 0) ? std::max(1u, std::thread::hardware_concurrency()/4) : configured; } vespalib::Executor::OptimizeFor @@ -125,12 +125,11 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _filestorHandler = std::make_unique<FileStorHandler>(numThreads, numStripes, *this, *_metrics, _compReg); uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads); - if (numResponseThreads > 0) { - _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType)); - } + _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType)); + assert(_sequencedExecutor); LOG(spam, "Setting up the disk"); for (uint32_t j = 0; j < numThreads; j++) { - _threads.push_back(std::make_shared<PersistenceThread>(_sequencedExecutor.get(), _compReg, _configUri, *_provider, + _threads.push_back(std::make_shared<PersistenceThread>(*_sequencedExecutor, _compReg, _configUri, *_provider, *_filestorHandler, *_metrics->disks[0]->threads[j])); } } diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 1e0bcac28fa..fdafa92ed70 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -94,7 +94,7 @@ private: } -PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequencedExecutor, +PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequencedExecutor, ServiceLayerComponentRegister& compReg, const config::ConfigUri & configUri, spi::PersistenceProvider& provider, @@ -180,17 +180,13 @@ PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) } spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); - if (_sequencedExecutor == nullptr) { - spi::Result response = _spi.put(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()),tracker.context()); - tracker.checkForError(response); - } else { - auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { - tracker->checkForError(*response); - tracker->sendReply(); - }); - _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(), - std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); - } + auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { + tracker->checkForError(*response); + tracker->sendReply(); + }); + _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(), + std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return trackerUP; } @@ -208,29 +204,20 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trac } spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); - if (_sequencedExecutor == nullptr) { - spi::RemoveResult response = _spi.removeIfFound(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(),tracker.context()); - if (tracker.checkForError(response)) { - tracker.setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); + + // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker + auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { + auto & response = dynamic_cast<const spi::RemoveResult &>(*responseUP); + if (tracker->checkForError(response)) { + tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); } if (!response.wasFound()) { metrics.notFound.inc(); } - } else { - // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker - auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { - auto & response = dynamic_cast<const spi::RemoveResult &>(*responseUP); - if (tracker->checkForError(response)) { - tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); - } - if (!response.wasFound()) { - metrics.notFound.inc(); - } - tracker->sendReply(); - }); - _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(), - std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); - } + tracker->sendReply(); + }); + _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(), + std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return trackerUP; } @@ -248,27 +235,19 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trac } spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); - if (_sequencedExecutor == nullptr) { - spi::UpdateResult response = _spi.update(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()),tracker.context()); - if (tracker.checkForError(response)) { + + // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker + auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { + auto & response = dynamic_cast<const spi::UpdateResult &>(*responseUP); + if (tracker->checkForError(response)) { auto reply = std::make_shared<api::UpdateReply>(cmd); reply->setOldTimestamp(response.getExistingTimestamp()); - tracker.setReply(std::move(reply)); + tracker->setReply(std::move(reply)); } - } else { - // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker - auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { - auto & response = dynamic_cast<const spi::UpdateResult &>(*responseUP); - if (tracker->checkForError(response)) { - auto reply = std::make_shared<api::UpdateReply>(cmd); - reply->setOldTimestamp(response.getExistingTimestamp()); - tracker->setReply(std::move(reply)); - } - tracker->sendReply(); - }); - _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(), - std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); - } + tracker->sendReply(); + }); + _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(), + std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return trackerUP; } diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index d2216d6fb5e..6643345353c 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -20,7 +20,7 @@ class TestAndSetHelper; class PersistenceThread final : public DiskThread, public Types { public: - PersistenceThread(vespalib::ISequencedTaskExecutor *, ServiceLayerComponentRegister&, + PersistenceThread(vespalib::ISequencedTaskExecutor &, ServiceLayerComponentRegister&, const config::ConfigUri & configUri, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics); ~PersistenceThread() override; @@ -49,7 +49,7 @@ public: private: uint32_t _stripeId; PersistenceUtil _env; - vespalib::ISequencedTaskExecutor * _sequencedExecutor; + vespalib::ISequencedTaskExecutor & _sequencedExecutor; spi::PersistenceProvider& _spi; ProcessAllHandler _processAllHandler; MergeHandler _mergeHandler; |