aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-15 13:28:07 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-15 13:28:07 +0000
commite4e61c963812479c90c576f0291043a0d16d8e5d (patch)
tree0acdcc8e89eac4df1a6463ead0e4973b668849fc /storage/src
parente58457d582ccb5f3b5061d5c8028312310d7b08f (diff)
There will always be a sequencer.
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/common/teststorageapp.cpp6
-rw-r--r--storage/src/tests/common/teststorageapp.h3
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp13
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp77
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h4
7 files changed, 49 insertions, 65 deletions
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index 1c69e87e38e..edea33491af 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -139,7 +139,8 @@ TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId)
: TestStorageApp(std::make_unique<ServiceLayerComponentRegisterImpl>(true), // TODO remove B-tree flag once default
lib::NodeType::STORAGE, getIndexFromConfig(configId), configId),
_compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())),
- _persistenceProvider()
+ _persistenceProvider(),
+ _executor(vespalib::SequencedTaskExecutor::create(1))
{
_compReg.setDiskCount(1);
lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState());
@@ -152,7 +153,8 @@ TestServiceLayerApp::TestServiceLayerApp(NodeIndex index,
: TestStorageApp(std::make_unique<ServiceLayerComponentRegisterImpl>(true), // TODO remove B-tree flag once default
lib::NodeType::STORAGE, index, configId),
_compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())),
- _persistenceProvider()
+ _persistenceProvider(),
+ _executor(vespalib::SequencedTaskExecutor::create(1))
{
_compReg.setDiskCount(1);
lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState());
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h
index 37c9e3c7ffe..54668cac515 100644
--- a/storage/src/tests/common/teststorageapp.h
+++ b/storage/src/tests/common/teststorageapp.h
@@ -29,6 +29,7 @@
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/base/testdocman.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <atomic>
namespace storage {
@@ -110,6 +111,7 @@ class TestServiceLayerApp : public TestStorageApp
using PersistenceProviderUP = std::unique_ptr<spi::PersistenceProvider>;
ServiceLayerComponentRegisterImpl& _compReg;
PersistenceProviderUP _persistenceProvider;
+ std::unique_ptr<vespalib::ISequencedTaskExecutor> _executor;
public:
TestServiceLayerApp(vespalib::stringref configId);
@@ -126,6 +128,7 @@ public:
StorBucketDatabase& getStorageBucketDatabase() override {
return _compReg.getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).bucketDatabase();
}
+ vespalib::ISequencedTaskExecutor & executor() { return *_executor; }
private:
// For storage server interface implementation we'll get rid of soon.
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 3525563eb7a..f61a31e5898 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -192,14 +192,15 @@ bool fileExistsWithin(const std::string& path, const std::string& file) {
return !(findFile(path, file) == "");
}
-std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config,
- TestServiceLayerApp& node,
- spi::PersistenceProvider& provider,
- FileStorHandler& filestorHandler,
- FileStorThreadMetrics& metrics)
+std::unique_ptr<DiskThread>
+createThread(vdstestlib::DirConfig& config,
+ TestServiceLayerApp& node,
+ spi::PersistenceProvider& provider,
+ FileStorHandler& filestorHandler,
+ FileStorThreadMetrics& metrics)
{
(void) config;
- return std::make_unique<PersistenceThread>(nullptr,node.getComponentRegister(), config.getConfigId(),
+ return std::make_unique<PersistenceThread>(node.executor(), node.getComponentRegister(), config.getConfigId(),
provider, filestorHandler, metrics);
}
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 76e805639b7..67d148fb317 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -88,7 +88,7 @@ PersistenceTestUtils::setupExecutor(uint32_t numThreads) {
std::unique_ptr<PersistenceThread>
PersistenceTestUtils::createPersistenceThread()
{
- return std::make_unique<PersistenceThread>(_sequenceTaskExecutor.get(), _env->_node.getComponentRegister(),
+ return std::make_unique<PersistenceThread>(*_sequenceTaskExecutor, _env->_node.getComponentRegister(),
_env->_config.getConfigId(),getPersistenceProvider(),
getEnv()._fileStorHandler, getEnv()._metrics);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 88bfb18841c..3b60e3c61ee 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -86,7 +86,7 @@ FileStorManager::print(std::ostream& out, bool verbose, const std::string& inden
namespace {
uint32_t computeNumResponseThreads(int configured) {
- return (configured < 0) ? std::max(1u, std::thread::hardware_concurrency()/4) : configured;
+ return (configured <= 0) ? std::max(1u, std::thread::hardware_concurrency()/4) : configured;
}
vespalib::Executor::OptimizeFor
@@ -125,12 +125,11 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
_filestorHandler = std::make_unique<FileStorHandler>(numThreads, numStripes, *this, *_metrics, _compReg);
uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads);
- if (numResponseThreads > 0) {
- _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType));
- }
+ _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType));
+ assert(_sequencedExecutor);
LOG(spam, "Setting up the disk");
for (uint32_t j = 0; j < numThreads; j++) {
- _threads.push_back(std::make_shared<PersistenceThread>(_sequencedExecutor.get(), _compReg, _configUri, *_provider,
+ _threads.push_back(std::make_shared<PersistenceThread>(*_sequencedExecutor, _compReg, _configUri, *_provider,
*_filestorHandler, *_metrics->disks[0]->threads[j]));
}
}
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 1e0bcac28fa..fdafa92ed70 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -94,7 +94,7 @@ private:
}
-PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequencedExecutor,
+PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequencedExecutor,
ServiceLayerComponentRegister& compReg,
const config::ConfigUri & configUri,
spi::PersistenceProvider& provider,
@@ -180,17 +180,13 @@ PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP)
}
spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
- if (_sequencedExecutor == nullptr) {
- spi::Result response = _spi.put(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()),tracker.context());
- tracker.checkForError(response);
- } else {
- auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
- tracker->checkForError(*response);
- tracker->sendReply();
- });
- _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(),
- std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
- }
+ auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
+ tracker->checkForError(*response);
+ tracker->sendReply();
+ });
+ _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+
return trackerUP;
}
@@ -208,29 +204,20 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trac
}
spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
- if (_sequencedExecutor == nullptr) {
- spi::RemoveResult response = _spi.removeIfFound(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(),tracker.context());
- if (tracker.checkForError(response)) {
- tracker.setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
+
+ // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker
+ auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
+ auto & response = dynamic_cast<const spi::RemoveResult &>(*responseUP);
+ if (tracker->checkForError(response)) {
+ tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
}
if (!response.wasFound()) {
metrics.notFound.inc();
}
- } else {
- // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker
- auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
- auto & response = dynamic_cast<const spi::RemoveResult &>(*responseUP);
- if (tracker->checkForError(response)) {
- tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
- }
- if (!response.wasFound()) {
- metrics.notFound.inc();
- }
- tracker->sendReply();
- });
- _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(),
- std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
- }
+ tracker->sendReply();
+ });
+ _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
return trackerUP;
}
@@ -248,27 +235,19 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trac
}
spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
- if (_sequencedExecutor == nullptr) {
- spi::UpdateResult response = _spi.update(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()),tracker.context());
- if (tracker.checkForError(response)) {
+
+ // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker
+ auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
+ auto & response = dynamic_cast<const spi::UpdateResult &>(*responseUP);
+ if (tracker->checkForError(response)) {
auto reply = std::make_shared<api::UpdateReply>(cmd);
reply->setOldTimestamp(response.getExistingTimestamp());
- tracker.setReply(std::move(reply));
+ tracker->setReply(std::move(reply));
}
- } else {
- // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker
- auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
- auto & response = dynamic_cast<const spi::UpdateResult &>(*responseUP);
- if (tracker->checkForError(response)) {
- auto reply = std::make_shared<api::UpdateReply>(cmd);
- reply->setOldTimestamp(response.getExistingTimestamp());
- tracker->setReply(std::move(reply));
- }
- tracker->sendReply();
- });
- _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(),
- std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
- }
+ tracker->sendReply();
+ });
+ _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
return trackerUP;
}
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index d2216d6fb5e..6643345353c 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -20,7 +20,7 @@ class TestAndSetHelper;
class PersistenceThread final : public DiskThread, public Types
{
public:
- PersistenceThread(vespalib::ISequencedTaskExecutor *, ServiceLayerComponentRegister&,
+ PersistenceThread(vespalib::ISequencedTaskExecutor &, ServiceLayerComponentRegister&,
const config::ConfigUri & configUri, spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics);
~PersistenceThread() override;
@@ -49,7 +49,7 @@ public:
private:
uint32_t _stripeId;
PersistenceUtil _env;
- vespalib::ISequencedTaskExecutor * _sequencedExecutor;
+ vespalib::ISequencedTaskExecutor & _sequencedExecutor;
spi::PersistenceProvider& _spi;
ProcessAllHandler _processAllHandler;
MergeHandler _mergeHandler;