aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-20 10:54:32 +0200
committerGitHub <noreply@github.com>2020-10-20 10:54:32 +0200
commit45c15e502a9655355071a59f529b0acf93af6a1c (patch)
tree901c3ee62359187d056251a88380cdf669b01b20
parente698e639b67d89f374c7457d87c05012a242de75 (diff)
parent026648d68064a6158c6a4117fe84478912e6b5e6 (diff)
Merge pull request #14959 from vespa-engine/balder/reduce-persistenutils-tentackles
Balder/reduce persistenutils tentackles
-rw-r--r--storage/src/tests/common/teststorageapp.cpp3
-rw-r--r--storage/src/tests/common/teststorageapp.h6
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp46
-rw-r--r--storage/src/tests/persistence/persistencetestutils.h3
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp6
-rw-r--r--storage/src/vespa/storage/common/servicelayercomponent.cpp16
-rw-r--r--storage/src/vespa/storage/common/servicelayercomponent.h7
-rw-r--r--storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp17
-rw-r--r--storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h3
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h5
-rw-r--r--storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp23
-rw-r--r--storage/src/vespa/storage/persistence/bucketownershipnotifier.h51
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp22
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h1
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h1
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp11
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.h3
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp46
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h62
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.h4
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.h4
-rw-r--r--storage/src/vespa/storage/persistence/testandsethelper.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/testandsethelper.h9
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp1
26 files changed, 154 insertions, 220 deletions
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index a2a0cbd1e68..2fc33bc1360 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -141,7 +141,6 @@ TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId)
_persistenceProvider(),
_executor(vespalib::SequencedTaskExecutor::create(1))
{
- _compReg.setDiskCount(1);
lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState());
ns.setDiskCount(1);
_nodeStateUpdater.setReportedNodeState(ns);
@@ -155,7 +154,6 @@ TestServiceLayerApp::TestServiceLayerApp(NodeIndex index,
_persistenceProvider(),
_executor(vespalib::SequencedTaskExecutor::create(1))
{
- _compReg.setDiskCount(1);
lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState());
ns.setDiskCount(1);
_nodeStateUpdater.setReportedNodeState(ns);
@@ -166,7 +164,6 @@ TestServiceLayerApp::~TestServiceLayerApp() = default;
void
TestServiceLayerApp::setupDummyPersistence()
{
- assert(_compReg.getDiskCount() == 1u);
auto provider = std::make_unique<spi::dummy::DummyPersistence>(getTypeRepo());
provider->initialize();
setPersistenceProvider(std::move(provider));
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h
index ffb16f3646a..0a104b1655c 100644
--- a/storage/src/tests/common/teststorageapp.h
+++ b/storage/src/tests/common/teststorageapp.h
@@ -104,7 +104,6 @@ private:
[[nodiscard]] virtual StorBucketDatabase& content_bucket_db(document::BucketSpace) { abort(); }
virtual StorBucketDatabase& getStorageBucketDatabase() { abort(); }
virtual BucketDatabase& getBucketDatabase() { abort(); }
- virtual uint16_t getDiskCount() const { abort(); }
};
class TestServiceLayerApp : public TestStorageApp
@@ -134,11 +133,6 @@ public:
return _compReg.getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).bucketDatabase();
}
vespalib::ISequencedTaskExecutor & executor() { return *_executor; }
-
-private:
- // For storage server interface implementation we'll get rid of soon.
- // Use getPartitions().size() instead.
- uint16_t getDiskCount() const override { return _compReg.getDiskCount(); }
};
class TestDistributorApp : public TestStorageApp,
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 40b0d8eb2ba..8cf9a7ac661 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -154,6 +154,15 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
PersistenceProviderWrapper& providerWrapper,
HandlerInvoker& invoker,
const ExpectedExceptionSpec& spec);
+
+ MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
+ return MergeHandler(getEnv(), getPersistenceProvider(),
+ getEnv()._component.getClusterName(), getEnv()._component.getClock(), maxChunkSize);
+ }
+ MergeHandler createHandler(spi::PersistenceProvider & spi) {
+ return MergeHandler(getEnv(), spi,
+ getEnv()._component.getClusterName(), getEnv()._component.getClock());
+ }
};
MergeHandlerTest::HandleGetBucketDiffReplyInvoker::HandleGetBucketDiffReplyInvoker() = default;
@@ -199,7 +208,8 @@ MergeHandlerTest::setUpChain(ChainPos pos) {
// Test a regular merge bucket command fetching data, including
// puts, removes, unrevertable removes & duplicates.
TEST_F(MergeHandlerTest, merge_bucket_command) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler(getEnv(), getPersistenceProvider(),
+ getEnv()._component.getClusterName(), getEnv()._component.getClock());
LOG(debug, "Handle a merge bucket command");
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -224,7 +234,7 @@ void
MergeHandlerTest::testGetBucketDiffChain(bool midChain)
{
setUpChain(midChain ? MIDDLE : BACK);
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
LOG(debug, "Verifying that get bucket diff is sent on");
auto cmd = std::make_shared<api::GetBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
@@ -273,7 +283,7 @@ void
MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
{
setUpChain(midChain ? MIDDLE : BACK);
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
LOG(debug, "Verifying that apply bucket diff is sent on");
auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
@@ -320,7 +330,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
// Test that a simplistic merge with one thing to actually merge,
// sends correct commands and finish.
TEST_F(MergeHandlerTest, master_message_flow) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -421,7 +431,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
doPut(1234, spi::Timestamp(4000 + i), docSize, docSize);
}
- MergeHandler handler(getEnv(), getPersistenceProvider(), maxChunkSize);
+ MergeHandler handler = createHandler(maxChunkSize);
LOG(debug, "Handle a merge bucket command");
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -504,7 +514,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize);
applyBucketDiffCmd->getDiff() = applyDiff;
- MergeHandler handler(getEnv(), getPersistenceProvider(), maxChunkSize);
+ MergeHandler handler = createHandler(maxChunkSize);
handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
auto fwdDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
@@ -516,7 +526,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
TEST_F(MergeHandlerTest, max_timestamp) {
doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024);
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -624,7 +634,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset,
TEST_F(MergeHandlerTest, spi_flush_guard) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -644,7 +654,7 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
}
TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
// Send merge for unknown bucket
auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -652,7 +662,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
}
TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -675,7 +685,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
}
TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
_nodes.clear();
_nodes.emplace_back(0, false);
_nodes.emplace_back(1, false);
@@ -707,7 +717,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
}
TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
api::ApplyBucketDiffCommand::Entry e;
@@ -815,7 +825,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
@@ -847,7 +857,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
@@ -880,7 +890,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
@@ -945,7 +955,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke(
TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
HandleGetBucketDiffReplyInvoker invoker;
@@ -1036,7 +1046,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
ChainPos pos(i == 0 ? FRONT : MIDDLE);
setUpChain(pos);
invoker.setChainPos(pos);
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -1128,7 +1138,7 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
spi::Timestamp ts(10111);
doPut(doc, ts);
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
api::ApplyBucketDiffCommand::Entry e;
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index 332a30393b1..29f77048b51 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -110,7 +110,8 @@ public:
MessageTracker::UP
createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) {
- return MessageTracker::createForTesting(getEnv(), _replySender, NoBucketLock::make(bucket), std::move(cmd));
+ return MessageTracker::createForTesting(framework::MilliSecTimer(getEnv()._component.getClock()), getEnv(),
+ _replySender, NoBucketLock::make(bucket), std::move(cmd));
}
api::ReturnCode
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index ab22d8440fa..a63067ca156 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -50,7 +50,7 @@ BucketManager::BucketManager(const config::ConfigUri & configUri,
_metrics(std::make_shared<BucketManagerMetrics>(_component.getBucketSpaceRepo())),
_simulated_processing_delay(0)
{
- _metrics->setDisks(_component.getDiskCount());
+ _metrics->setDisks(1);
_component.registerStatusPage(*this);
_component.registerMetric(*_metrics);
_component.registerMetricUpdateHook(*this, framework::SecondTime(300));
@@ -231,7 +231,7 @@ BucketManager::updateMetrics(bool updateDocCount)
updateDocCount ? "" : ", minusedbits only",
_doneInitialized ? "" : ", server is not done initializing");
- uint16_t diskCount = _component.getDiskCount();
+ const uint16_t diskCount = 1;
assert(diskCount >= 1);
if (!updateDocCount || _doneInitialized) {
MetricsUpdater total(diskCount);
@@ -275,7 +275,7 @@ void BucketManager::update_bucket_db_memory_usage_metrics() {
void BucketManager::updateMinUsedBits()
{
- MetricsUpdater m(_component.getDiskCount());
+ MetricsUpdater m(1);
_component.getBucketSpaceRepo().for_each_bucket(std::ref(m));
// When going through to get sizes, we also record min bits
MinimumUsedBitsTracker& bitTracker(_component.getMinUsedBitsTracker());
diff --git a/storage/src/vespa/storage/common/servicelayercomponent.cpp b/storage/src/vespa/storage/common/servicelayercomponent.cpp
index 14e67679e24..b6cd9bc4aae 100644
--- a/storage/src/vespa/storage/common/servicelayercomponent.cpp
+++ b/storage/src/vespa/storage/common/servicelayercomponent.cpp
@@ -24,20 +24,4 @@ ServiceLayerComponent::getBucketDatabase(BucketSpace bucketSpace) const
return _bucketSpaceRepo->get(bucketSpace).bucketDatabase();
}
-uint16_t
-ServiceLayerComponent::getIdealPartition(const document::Bucket& bucket) const
-{
- return _bucketSpaceRepo->get(bucket.getBucketSpace()).getDistribution()->getIdealDisk(
- *getStateUpdater().getReportedNodeState(), getIndex(), bucket.getBucketId(),
- lib::Distribution::IDEAL_DISK_EVEN_IF_DOWN);
-}
-
-uint16_t
-ServiceLayerComponent::getPreferredAvailablePartition(
- const document::Bucket& bucket) const
-{
- return _bucketSpaceRepo->get(bucket.getBucketSpace()).getDistribution()->getPreferredAvailableDisk(
- *getStateUpdater().getReportedNodeState(), getIndex(), bucket.getBucketId());
-}
-
} // storage
diff --git a/storage/src/vespa/storage/common/servicelayercomponent.h b/storage/src/vespa/storage/common/servicelayercomponent.h
index a75dacde1d8..1f1e26dbee4 100644
--- a/storage/src/vespa/storage/common/servicelayercomponent.h
+++ b/storage/src/vespa/storage/common/servicelayercomponent.h
@@ -38,7 +38,6 @@ struct ServiceLayerManagedComponent
{
virtual ~ServiceLayerManagedComponent() = default;
- virtual void setDiskCount(uint16_t count) = 0;
virtual void setBucketSpaceRepo(ContentBucketSpaceRepo&) = 0;
virtual void setMinUsedBitsTracker(MinimumUsedBitsTracker&) = 0;
};
@@ -51,12 +50,10 @@ struct ServiceLayerComponentRegister : public virtual StorageComponentRegister
class ServiceLayerComponent : public StorageComponent,
private ServiceLayerManagedComponent
{
- uint16_t _diskCount;
ContentBucketSpaceRepo* _bucketSpaceRepo;
MinimumUsedBitsTracker* _minUsedBitsTracker;
// ServiceLayerManagedComponent implementation
- void setDiskCount(uint16_t count) override { _diskCount = count; }
void setBucketSpaceRepo(ContentBucketSpaceRepo& repo) override { _bucketSpaceRepo = &repo; }
void setMinUsedBitsTracker(MinimumUsedBitsTracker& tracker) override {
_minUsedBitsTracker = &tracker;
@@ -67,14 +64,12 @@ public:
ServiceLayerComponent(ServiceLayerComponentRegister& compReg,
vespalib::stringref name)
: StorageComponent(compReg, name),
- _diskCount(0),
_bucketSpaceRepo(nullptr),
_minUsedBitsTracker(nullptr)
{
compReg.registerServiceLayerComponent(*this);
}
- uint16_t getDiskCount() const { return _diskCount; }
const ContentBucketSpaceRepo &getBucketSpaceRepo() const;
StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) const;
MinimumUsedBitsTracker& getMinUsedBitsTracker() {
@@ -83,8 +78,6 @@ public:
const MinimumUsedBitsTracker& getMinUsedBitsTracker() const {
return *_minUsedBitsTracker;
}
- uint16_t getIdealPartition(const document::Bucket&) const;
- uint16_t getPreferredAvailablePartition(const document::Bucket&) const;
};
} // storage
diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
index 66ec262250c..7a342c1df25 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
+++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
@@ -10,8 +10,7 @@ namespace storage {
using vespalib::IllegalStateException;
ServiceLayerComponentRegisterImpl::ServiceLayerComponentRegisterImpl(bool use_btree_db)
- : _diskCount(0),
- _bucketSpaceRepo(use_btree_db)
+ : _bucketSpaceRepo(use_btree_db)
{ }
void
@@ -19,25 +18,11 @@ ServiceLayerComponentRegisterImpl::registerServiceLayerComponent(ServiceLayerMan
{
std::lock_guard lock(_componentLock);
_components.push_back(&smc);
- smc.setDiskCount(_diskCount);
smc.setBucketSpaceRepo(_bucketSpaceRepo);
smc.setMinUsedBitsTracker(_minUsedBitsTracker);
}
void
-ServiceLayerComponentRegisterImpl::setDiskCount(uint16_t count)
-{
- std::lock_guard lock(_componentLock);
- if (_diskCount != 0) {
- throw IllegalStateException("Disk count already set. Cannot be updated live", VESPA_STRLOC);
- }
- _diskCount = count;
- for (uint32_t i=0; i<_components.size(); ++i) {
- _components[i]->setDiskCount(count);
- }
-}
-
-void
ServiceLayerComponentRegisterImpl::setDistribution(lib::Distribution::SP distribution)
{
_bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h
index e722110d770..967adf8f787 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h
+++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h
@@ -20,7 +20,6 @@ class ServiceLayerComponentRegisterImpl
{
std::mutex _componentLock;
std::vector<ServiceLayerManagedComponent*> _components;
- uint16_t _diskCount;
ContentBucketSpaceRepo _bucketSpaceRepo;
MinimumUsedBitsTracker _minUsedBitsTracker;
@@ -29,14 +28,12 @@ public:
ServiceLayerComponentRegisterImpl(bool use_btree_db = false);
- uint16_t getDiskCount() const { return _diskCount; }
ContentBucketSpaceRepo& getBucketSpaceRepo() { return _bucketSpaceRepo; }
MinimumUsedBitsTracker& getMinUsedBitsTracker() {
return _minUsedBitsTracker;
}
void registerServiceLayerComponent(ServiceLayerManagedComponent&) override;
- void setDiskCount(uint16_t count);
void setDistribution(lib::Distribution::SP distribution) override;
};
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 418eeb40ffc..1d1f5caf673 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -86,10 +86,12 @@ private:
}
AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi,
- vespalib::ISequencedTaskExecutor & executor)
+ vespalib::ISequencedTaskExecutor & executor,
+ const document::BucketIdFactory & bucketIdFactory)
: _env(env),
_spi(spi),
- _sequencedExecutor(executor)
+ _sequencedExecutor(executor),
+ _bucketIdFactory(bucketIdFactory)
{}
MessageTracker::UP
@@ -188,7 +190,7 @@ bool
AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
spi::Context & context, bool missingDocumentImpliesMatch) const {
try {
- TestAndSetHelper helper(_env, _spi, cmd, missingDocumentImpliesMatch);
+ TestAndSetHelper helper(_env, _spi, _bucketIdFactory, cmd, missingDocumentImpliesMatch);
auto code = helper.retrieveAndMatch(context);
if (code.failed()) {
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index d7e068091c7..1719dcadeb3 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -4,6 +4,7 @@
#include "types.h"
#include <vespa/storageapi/message/persistence.h>
+namespace document { class BucketIdFactory; }
namespace vespalib { class ISequencedTaskExecutor; }
namespace storage {
@@ -19,7 +20,8 @@ struct PersistenceUtil;
*/
class AsyncHandler : public Types {
public:
- AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor);
+ AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor,
+ const document::BucketIdFactory & bucketIdFactory);
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
@@ -30,6 +32,7 @@ private:
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
vespalib::ISequencedTaskExecutor & _sequencedExecutor;
+ const document::BucketIdFactory & _bucketIdFactory;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp
index a0f05a70f4e..76547dc83a8 100644
--- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp
+++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp
@@ -17,8 +17,7 @@ using document::BucketSpace;
namespace storage {
uint16_t
-BucketOwnershipNotifier::getOwnerDistributorForBucket(
- const document::Bucket &bucket) const
+BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket &bucket) const
{
try {
auto distribution(_component.getBucketSpaceRepo().get(bucket.getBucketSpace()).getDistribution());
@@ -28,24 +27,19 @@ BucketOwnershipNotifier::getOwnerDistributorForBucket(
// If we get exceptions there aren't any distributors, so they'll have
// to explicitly fetch all bucket info eventually anyway.
} catch (lib::TooFewBucketBitsInUseException& e) {
- LOGBP(debug, "Too few bucket bits used for %s to be assigned "
- "to a distributor. Not notifying any distributor of "
- "bucket change.",
- bucket.toString().c_str());
+ LOGBP(debug, "Too few bucket bits used for %s to be assigned to a distributor."
+ " Not notifying any distributor of bucket change.",
+ bucket.toString().c_str());
} catch (lib::NoDistributorsAvailableException& e) {
- LOGBP(debug, "No distributors available. Not notifying any "
- "distributor of bucket change.");
+ LOGBP(debug, "No distributors available. Not notifying any distributor of bucket change.");
} catch (const std::exception& e) {
- LOG(error,
- "Got unknown exception while resolving distributor: %s",
- e.what());
+ LOG(error, "Got unknown exception while resolving distributor: %s", e.what());
}
return FAILED_TO_RESOLVE;
}
bool
-BucketOwnershipNotifier::distributorOwns(uint16_t distributor,
- const document::Bucket &bucket) const
+BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::Bucket &bucket) const
{
return (distributor == getOwnerDistributorForBucket(bucket));
}
@@ -64,8 +58,7 @@ BucketOwnershipNotifier::sendNotifyBucketToDistributor(
vespalib::getStackTrace(0).c_str());
return;
}
- api::NotifyBucketChangeCommand::SP notifyCmd(
- new api::NotifyBucketChangeCommand(bucket, infoToSend));
+ auto notifyCmd = std::make_shared<api::NotifyBucketChangeCommand>(bucket, infoToSend);
notifyCmd->setAddress(api::StorageMessageAddress(
_component.getClusterName(),
diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.h b/storage/src/vespa/storage/persistence/bucketownershipnotifier.h
index 06faa2ed583..948fc54726f 100644
--- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.h
+++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.h
@@ -12,40 +12,30 @@ namespace storage {
class BucketOwnershipNotifier
{
- ServiceLayerComponent& _component;
- MessageSender& _sender;
+ const ServiceLayerComponent & _component;
+ MessageSender & _sender;
public:
- BucketOwnershipNotifier(ServiceLayerComponent& component,
- MessageSender& sender)
+ BucketOwnershipNotifier(const ServiceLayerComponent& component, MessageSender& sender)
: _component(component),
_sender(sender)
{}
- bool distributorOwns(uint16_t distributor,
- const document::Bucket &bucket) const;
-
- void notifyIfOwnershipChanged(const document::Bucket &bucket,
- uint16_t sourceIndex,
- const api::BucketInfo& infoToSend);
-
- void sendNotifyBucketToCurrentOwner(const document::Bucket &bucket,
- const api::BucketInfo& infoToSend);
+ bool distributorOwns(uint16_t distributor, const document::Bucket &bucket) const;
+ void notifyIfOwnershipChanged(const document::Bucket &bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend);
+ void sendNotifyBucketToCurrentOwner(const document::Bucket &bucket, const api::BucketInfo& infoToSend);
private:
enum IndexMeta {
FAILED_TO_RESOLVE = 0xffff
};
- void sendNotifyBucketToDistributor(uint16_t distributorIndex,
- const document::Bucket &bucket,
+ void sendNotifyBucketToDistributor(uint16_t distributorIndex, const document::Bucket &bucket,
const api::BucketInfo& infoToSend);
// Returns either index or FAILED_TO_RESOLVE
uint16_t getOwnerDistributorForBucket(const document::Bucket &bucket) const;
- void logNotification(const document::Bucket &bucket,
- uint16_t sourceIndex,
- uint16_t currentOwnerIndex,
- const api::BucketInfo& newInfo);
+ void logNotification(const document::Bucket &bucket, uint16_t sourceIndex,
+ uint16_t currentOwnerIndex, const api::BucketInfo& newInfo);
};
/**
@@ -56,9 +46,7 @@ class NotificationGuard
{
struct BucketToCheck
{
- BucketToCheck(const document::Bucket& _bucket,
- uint16_t _sourceIndex,
- const api::BucketInfo& _info)
+ BucketToCheck(const document::Bucket& _bucket, uint16_t _sourceIndex, const api::BucketInfo& _info)
: bucket(_bucket),
info(_info),
sourceIndex(_sourceIndex),
@@ -66,29 +54,24 @@ class NotificationGuard
{}
document::Bucket bucket;
- api::BucketInfo info;
- uint16_t sourceIndex;
- bool alwaysSend;
+ api::BucketInfo info;
+ uint16_t sourceIndex;
+ bool alwaysSend;
};
BucketOwnershipNotifier& _notifier;
std::vector<BucketToCheck> _bucketsToCheck;
-
- NotificationGuard(const NotificationGuard&);
- NotificationGuard& operator=(const NotificationGuard&);
public:
NotificationGuard(BucketOwnershipNotifier& notifier)
: _notifier(notifier),
_bucketsToCheck()
{}
+ NotificationGuard(const NotificationGuard&) = delete;
+ NotificationGuard& operator=(const NotificationGuard&) = delete;
~NotificationGuard();
- void notifyIfOwnershipChanged(const document::Bucket &bucket,
- uint16_t sourceIndex,
- const api::BucketInfo& infoToSend);
-
- void notifyAlways(const document::Bucket &bucket,
- const api::BucketInfo& infoToSend);
+ void notifyIfOwnershipChanged(const document::Bucket &bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend);
+ void notifyAlways(const document::Bucket &bucket, const api::BucketInfo& infoToSend);
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 832e07eaf95..c8789f765c7 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -115,6 +115,18 @@ createThreadName(size_t stripeId) {
}
}
+
+PersistenceHandler &
+FileStorManager::createRegisteredHandler(ServiceLayerComponent & component)
+{
+ size_t index = _persistenceHandlers.size();
+ assert(index < _metrics->disks[0]->threads.size());
+ _persistenceHandlers.push_back(
+ std::make_unique<PersistenceHandler>(*_sequencedExecutor, component,
+ *_config, *_provider, *_filestorHandler,
+ *_bucketOwnershipNotifier, *_metrics->disks[0]->threads[index]));
+ return *_persistenceHandlers.back();
+}
/**
* If live configuration, assuming storageserver makes sure no messages are
* incoming during reconfiguration
@@ -130,7 +142,6 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
if (!liveUpdate) {
_config = std::move(config);
- assert(_component.getDiskCount() == 1);
size_t numThreads = _config->numThreads;
size_t numStripes = std::max(size_t(1u), numThreads / 2);
_metrics->initDiskMetrics(1, _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads);
@@ -142,13 +153,8 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
LOG(spam, "Setting up the disk");
for (uint32_t i = 0; i < numThreads; i++) {
_persistenceComponents.push_back(std::make_unique<ServiceLayerComponent>(_compReg, createThreadName(i)));
- _persistenceHandlers.push_back(
- std::make_unique<PersistenceHandler>(*_sequencedExecutor,
- *_persistenceComponents.back(),
- *_config, *_provider, *_filestorHandler,
- *_bucketOwnershipNotifier, *_metrics->disks[0]->threads[i]));
- _threads.push_back(std::make_unique<PersistenceThread>(*_persistenceHandlers.back(), *_filestorHandler,
- i % numStripes, _component));
+ _threads.push_back(std::make_unique<PersistenceThread>(createRegisteredHandler(*_persistenceComponents.back()),
+ *_filestorHandler, i % numStripes, _component));
}
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index aa9e7860a22..691470478cf 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -109,6 +109,7 @@ public:
private:
void configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config) override;
+ PersistenceHandler & createRegisteredHandler(ServiceLayerComponent & component);
void replyWithBucketNotFound(api::StorageMessage&, const document::Bucket&);
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index ec71aee7eed..16a23b5f5a7 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -14,11 +14,13 @@ LOG_SETUP(".persistence.mergehandler");
namespace storage {
-MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, uint32_t maxChunkSize,
+MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
+ const vespalib::string & clusterName, const framework::Clock & clock,
+ uint32_t maxChunkSize,
bool enableMergeLocalNodeChooseDocsOptimalization,
uint32_t commonMergeChainOptimalizationMinimumSize)
- : _clock(env._component.getClock()),
- _clusterName(env._component.getClusterName()),
+ : _clock(clock),
+ _clusterName(clusterName),
_env(env),
_spi(spi),
_maxChunkSize(maxChunkSize),
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index af2f765aed5..830fb20c8d9 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -35,6 +35,7 @@ public:
};
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
+ const vespalib::string & clusterName, const framework::Clock & clock,
uint32_t maxChunkSize = 4190208,
bool enableMergeLocalNodeChooseDocsOptimalization = true,
uint32_t commonMergeChainOptimalizationMinimumSize = 64);
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 13ff1df340b..b9d5d9affa0 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -8,19 +8,20 @@ LOG_SETUP(".persistence.persistencehandler");
namespace storage {
PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequencedExecutor,
- ServiceLayerComponent & component,
+ const ServiceLayerComponent & component,
const vespa::config::content::StorFilestorConfig & cfg,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
BucketOwnershipNotifier & bucketOwnershipNotifier,
FileStorThreadMetrics& metrics)
- :
+ : _clock(component.getClock()),
_env(component, filestorHandler, metrics, provider),
_processAllHandler(_env, provider),
- _mergeHandler(_env, provider, cfg.bucketMergeChunkSize,
+ _mergeHandler(_env, provider, component.getClusterName(), _clock,
+ cfg.bucketMergeChunkSize,
cfg.enableMergeLocalNodeChooseDocsOptimalization,
cfg.commonMergeChainOptimalizationMinimumSize),
- _asyncHandler(_env, provider, sequencedExecutor),
+ _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
_simpleHandler(_env, provider)
{
@@ -141,7 +142,7 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co
// Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains
// valid even if the tracker is destroyed by an exception in processMessage().
- auto tracker = std::make_unique<MessageTracker>(_env, _env._fileStorHandler, std::move(lock.first), lock.second);
+ auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler, std::move(lock.first), lock.second);
tracker = processMessage(msg, std::move(tracker));
if (tracker) {
tracker->sendReply();
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index 2cfac865484..29f09c47fc4 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -25,7 +25,7 @@ class BucketOwnershipNotifier;
class PersistenceHandler : public Types
{
public:
- PersistenceHandler(vespalib::ISequencedTaskExecutor &, ServiceLayerComponent & component,
+ PersistenceHandler(vespalib::ISequencedTaskExecutor &, const ServiceLayerComponent & component,
const vespa::config::content::StorFilestorConfig &, spi::PersistenceProvider &,
FileStorHandler &, BucketOwnershipNotifier &, FileStorThreadMetrics&);
~PersistenceHandler();
@@ -43,6 +43,7 @@ private:
MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const;
+ const framework::Clock & _clock;
PersistenceUtil _env;
ProcessAllHandler _processAllHandler;
MergeHandler _mergeHandler;
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 4ab3cdc5d01..a8b8cfa1f8c 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -25,13 +25,15 @@ namespace {
const vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s;
}
-MessageTracker::MessageTracker(const PersistenceUtil & env,
+MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
+ const PersistenceUtil & env,
MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock,
api::StorageMessage::SP msg)
- : MessageTracker(env, replySender, true, std::move(bucketLock), std::move(msg))
+ : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg))
{}
-MessageTracker::MessageTracker(const PersistenceUtil & env,
+MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
+ const PersistenceUtil & env,
MessageSender & replySender,
bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock,
@@ -45,12 +47,14 @@ MessageTracker::MessageTracker(const PersistenceUtil & env,
_replySender(replySender),
_metric(nullptr),
_result(api::ReturnCode::OK),
- _timer(env._component.getClock())
+ _timer(timer)
{ }
MessageTracker::UP
-MessageTracker::createForTesting(PersistenceUtil &env, MessageSender &replySender, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) {
- return MessageTracker::UP(new MessageTracker(env, replySender, false, std::move(bucketLock), std::move(msg)));
+MessageTracker::createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil &env, MessageSender &replySender,
+ FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg)
+{
+ return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), std::move(msg)));
}
void
@@ -154,16 +158,13 @@ MessageTracker::generateReply(api::StorageCommand& cmd)
}
}
-PersistenceUtil::PersistenceUtil(
- ServiceLayerComponent& component,
- FileStorHandler& fileStorHandler,
- FileStorThreadMetrics& metrics,
- spi::PersistenceProvider& provider)
+PersistenceUtil::PersistenceUtil(const ServiceLayerComponent& component, FileStorHandler& fileStorHandler,
+ FileStorThreadMetrics& metrics, spi::PersistenceProvider& provider)
: _component(component),
_fileStorHandler(fileStorHandler),
- _nodeIndex(component.getIndex()),
_metrics(metrics),
- _bucketFactory(component.getBucketIdFactory()),
+ _nodeIndex(component.getIndex()),
+ _bucketIdFactory(component.getBucketIdFactory()),
_spi(provider)
{
}
@@ -191,15 +192,8 @@ PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api:
}
}
-uint16_t
-PersistenceUtil::getPreferredAvailableDisk(const document::Bucket &bucket) const
-{
- return _component.getPreferredAvailablePartition(bucket);
-}
-
PersistenceUtil::LockResult
-PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket,
- StorBucketDatabase::Flag flags)
+PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket, StorBucketDatabase::Flag flags)
{
// To lock the bucket, we need to ensure that we don't conflict with
// bucket disk move command. First we fetch current disk index from
@@ -275,19 +269,13 @@ PersistenceUtil::convertErrorCode(const spi::Result& response)
return 0;
}
-void
-PersistenceUtil::shutdown(const std::string& reason)
-{
- _component.requestShutdown(reason);
-}
-
spi::Bucket
PersistenceUtil::getBucket(const document::DocumentId& id, const document::Bucket &bucket) const
{
- document::BucketId docBucket(_bucketFactory.getBucketId(id));
+ document::BucketId docBucket(_bucketIdFactory.getBucketId(id));
docBucket.setUsedBits(bucket.getBucketId().getUsedBits());
if (bucket.getBucketId() != docBucket) {
- docBucket = _bucketFactory.getBucketId(id);
+ docBucket = _bucketIdFactory.getBucketId(id);
throw vespalib::IllegalStateException("Document " + id.toString()
+ " (bucket " + docBucket.toString() + ") does not belong in "
+ "bucket " + bucket.getBucketId().toString() + ".", VESPA_STRLOC);
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index ffce25b1e49..2dbd7b2a263 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -11,13 +11,13 @@
namespace storage {
-struct PersistenceUtil;
+class PersistenceUtil;
class MessageTracker : protected Types {
public:
typedef std::unique_ptr<MessageTracker> UP;
- MessageTracker(const PersistenceUtil & env, MessageSender & replySender,
+ MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
~MessageTracker();
@@ -70,11 +70,11 @@ public:
bool checkForError(const spi::Result& response);
static MessageTracker::UP
- createForTesting(PersistenceUtil & env, MessageSender & replySender,
+ createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
private:
- MessageTracker(const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
+ MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
[[nodiscard]] bool count_result_as_failure() const noexcept;
@@ -92,30 +92,8 @@ private:
framework::MilliSecTimer _timer;
};
-struct PersistenceUtil {
- ServiceLayerComponent &_component;
- FileStorHandler &_fileStorHandler;
- uint16_t _nodeIndex;
- FileStorThreadMetrics &_metrics; // Needs a better solution for speed and thread safety
- const document::BucketIdFactory &_bucketFactory;
- spi::PersistenceProvider &_spi;
-
- PersistenceUtil(
- ServiceLayerComponent&,
- FileStorHandler& fileStorHandler,
- FileStorThreadMetrics& metrics,
- spi::PersistenceProvider& provider);
-
- ~PersistenceUtil();
-
- StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) const {
- return _component.getBucketDatabase(bucketSpace);
- }
-
- void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info) const;
-
- uint16_t getPreferredAvailableDisk(const document::Bucket &bucket) const;
-
+class PersistenceUtil {
+public:
/** Lock the given bucket in the file stor handler. */
struct LockResult {
std::shared_ptr<FileStorHandler::BucketLockInterface> lock;
@@ -124,21 +102,29 @@ struct PersistenceUtil {
bool bucketExisted() const { return bool(lock); }
};
- LockResult lockAndGetDisk(
- const document::Bucket &bucket,
- StorBucketDatabase::Flag flags = StorBucketDatabase::NONE);
+ PersistenceUtil(const ServiceLayerComponent&, FileStorHandler& fileStorHandler,
+ FileStorThreadMetrics& metrics, spi::PersistenceProvider& provider);
+ ~PersistenceUtil();
+ StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) const {
+ return _component.getBucketDatabase(bucketSpace);
+ }
+ spi::Bucket getBucket(const document::DocumentId& id, const document::Bucket &bucket) const;
+ void setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket) const;
+ void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info) const;
+ LockResult lockAndGetDisk(const document::Bucket &bucket, StorBucketDatabase::Flag flags = StorBucketDatabase::NONE);
api::BucketInfo getBucketInfo(const document::Bucket &bucket) const;
static api::BucketInfo convertBucketInfo(const spi::BucketInfo&);
-
- void setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket) const;
-
- spi::Bucket getBucket(const document::DocumentId& id, const document::Bucket &bucket) const;
-
static uint32_t convertErrorCode(const spi::Result& response);
-
- void shutdown(const std::string& reason);
+public:
+ const ServiceLayerComponent &_component;
+ FileStorHandler &_fileStorHandler;
+ FileStorThreadMetrics &_metrics; // Needs a better solution for speed and thread safety
+ uint16_t _nodeIndex;
+private:
+ const document::BucketIdFactory &_bucketIdFactory;
+ spi::PersistenceProvider &_spi;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h
index 14b6bced8a7..44a3e631e0b 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.h
+++ b/storage/src/vespa/storage/persistence/processallhandler.h
@@ -17,8 +17,8 @@ public:
MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker) const;
MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker) const;
private:
- const PersistenceUtil& _env;
- spi::PersistenceProvider& _spi;
+ const PersistenceUtil & _env;
+ spi::PersistenceProvider & _spi;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index f7c72859f78..79828679731 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -30,8 +30,8 @@ public:
MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const;
private:
bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
- const PersistenceUtil& _env;
- spi::PersistenceProvider& _spi;
+ const PersistenceUtil & _env;
+ spi::PersistenceProvider & _spi;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/testandsethelper.cpp b/storage/src/vespa/storage/persistence/testandsethelper.cpp
index fa0cfd7ea7e..77eddd3bd0d 100644
--- a/storage/src/vespa/storage/persistence/testandsethelper.cpp
+++ b/storage/src/vespa/storage/persistence/testandsethelper.cpp
@@ -24,8 +24,9 @@ void TestAndSetHelper::resolveDocumentType(const document::DocumentTypeRepo & do
}
}
-void TestAndSetHelper::parseDocumentSelection(const document::DocumentTypeRepo & documentTypeRepo) {
- document::select::Parser parser(documentTypeRepo, _env._bucketFactory);
+void TestAndSetHelper::parseDocumentSelection(const document::DocumentTypeRepo & documentTypeRepo,
+ const document::BucketIdFactory & bucketIdFactory) {
+ document::select::Parser parser(documentTypeRepo, bucketIdFactory);
try {
_docSelectionUp = parser.parse(_cmd.getCondition().getSelection());
@@ -39,6 +40,7 @@ spi::GetResult TestAndSetHelper::retrieveDocument(const document::FieldSet & fie
}
TestAndSetHelper::TestAndSetHelper(const PersistenceUtil & env, const spi::PersistenceProvider & spi,
+ const document::BucketIdFactory & bucketFactory,
const api::TestAndSetCommand & cmd, bool missingDocumentImpliesMatch)
: _env(env),
_spi(spi),
@@ -49,7 +51,7 @@ TestAndSetHelper::TestAndSetHelper(const PersistenceUtil & env, const spi::Persi
{
const auto _repo = _env._component.getTypeRepo()->documentTypeRepo;
resolveDocumentType(*_repo);
- parseDocumentSelection(*_repo);
+ parseDocumentSelection(*_repo, bucketFactory);
}
TestAndSetHelper::~TestAndSetHelper() = default;
diff --git a/storage/src/vespa/storage/persistence/testandsethelper.h b/storage/src/vespa/storage/persistence/testandsethelper.h
index 44ecee88964..ca88f8b5337 100644
--- a/storage/src/vespa/storage/persistence/testandsethelper.h
+++ b/storage/src/vespa/storage/persistence/testandsethelper.h
@@ -8,7 +8,10 @@
#include <stdexcept>
namespace document::select { class Node; }
-namespace document { class FieldSet; }
+namespace document {
+ class FieldSet;
+ class BucketIdFactory;
+}
namespace storage {
@@ -42,11 +45,13 @@ class TestAndSetHelper {
bool _missingDocumentImpliesMatch;
void resolveDocumentType(const document::DocumentTypeRepo & documentTypeRepo);
- void parseDocumentSelection(const document::DocumentTypeRepo & documentTypeRepo);
+ void parseDocumentSelection(const document::DocumentTypeRepo & documentTypeRepo,
+ const document::BucketIdFactory & bucketIdFactory);
spi::GetResult retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context);
public:
TestAndSetHelper(const PersistenceUtil & env, const spi::PersistenceProvider & _spi,
+ const document::BucketIdFactory & bucketIdFactory,
const api::TestAndSetCommand & cmd, bool missingDocumentImpliesMatch = false);
~TestAndSetHelper();
api::ReturnCode retrieveAndMatch(spi::Context & context);
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index e7ce6737ac8..1c8fdd4178d 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -88,7 +88,6 @@ ServiceLayerNode::subscribeToConfigs()
<< " disks but persistence provider states it has 1 disk.";
throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
}
- _context.getComponentRegister().setDiskCount(1u);
}
void