summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2016-11-16 14:23:53 +0100
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2016-11-22 16:48:32 +0100
commite665f71c6c60291ed9954529803fd598feafe6b9 (patch)
tree0c857900fd0a9e3a70ba7b102dbd6f64dfe2f155 /storage
parent2e747ea9504fdee687ce6641f0203a2604290fdb (diff)
Directly associate components with an explicit bucket space
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/common/teststorageapp.h7
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp26
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp48
-rw-r--r--storage/src/tests/distributor/distributortestutil.h15
-rw-r--r--storage/src/tests/distributor/statecheckerstest.cpp2
-rw-r--r--storage/src/vespa/storage/common/distributorcomponent.cpp2
-rw-r--r--storage/src/vespa/storage/common/distributorcomponent.h16
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt3
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space.cpp16
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space.h44
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_component.cpp20
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_component.h37
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_repo.cpp27
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_repo.h31
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h6
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp43
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h14
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.h13
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h4
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.h7
-rw-r--r--storage/src/vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.cpp27
-rw-r--r--storage/src/vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h8
-rw-r--r--storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp39
-rw-r--r--storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h2
27 files changed, 309 insertions, 158 deletions
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h
index 14f9aad3ab0..e535fbfbc6e 100644
--- a/storage/src/tests/common/teststorageapp.h
+++ b/storage/src/tests/common/teststorageapp.h
@@ -148,10 +148,9 @@ public:
TestDistributorApp(vespalib::stringref configId = "");
TestDistributorApp(NodeIndex index, vespalib::stringref configId = "");
- DistributorComponentRegisterImpl& getComponentRegister()
- { return _compReg; }
- virtual BucketDatabase& getBucketDatabase()
- { return _compReg.getBucketDatabase(); }
+ DistributorComponentRegisterImpl& getComponentRegister() {
+ return _compReg;
+ }
virtual api::Timestamp getUniqueTimestamp();
};
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 6c6c4d67acd..cd3658fc163 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -423,9 +423,8 @@ public:
}
void setDistribution(const std::string& distConfig) {
- lib::Distribution* distribution = new lib::Distribution(distConfig);
- _node->getComponentRegister().setDistribution(
- lib::Distribution::SP(distribution));
+ auto distribution = std::make_shared<lib::Distribution>(distConfig);
+ triggerDistributionChange(std::move(distribution));
}
std::string getDistConfig6Nodes3Groups() const {
@@ -626,7 +625,6 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping()
{
std::string distConfig(getDistConfig6Nodes3Groups());
setDistribution(distConfig);
- _distributor->enableNextDistribution();
int numBuckets = 100;
setSystemState(lib::ClusterState("distributor:6 storage:6"));
@@ -651,14 +649,10 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping()
// Unchanged grouping cause no change.
setDistribution(distConfig);
- _distributor->storageDistributionChanged();
- _distributor->enableNextDistribution();
CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size());
// Changed grouping cause change
setDistribution(getDistConfig6Nodes4Groups());
- _distributor->storageDistributionChanged();
- _distributor->enableNextDistribution();
CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size());
}
@@ -1992,12 +1986,7 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP
}
_sender.clear();
std::string distConfig(getDistConfig6Nodes3Groups());
- {
- _node->getComponentRegister().setDistribution(
- std::make_shared<lib::Distribution>(distConfig));
- _distributor->storageDistributionChanged();
- _distributor->enableNextDistribution();
- }
+ triggerDistributionChange(std::make_shared<lib::Distribution>(distConfig));
sortSentMessagesByIndex(_sender);
CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size());
// Suddenly, a wild cluster state change appears! Even though this state
@@ -2043,10 +2032,7 @@ BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode()
CPPUNIT_ASSERT(!_distributor->isInRecoveryMode());
std::string distConfig(getDistConfig6Nodes4Groups());
- _node->getComponentRegister().setDistribution(
- std::make_shared<lib::Distribution>(distConfig));
- _distributor->storageDistributionChanged();
- _distributor->enableNextDistribution();
+ triggerDistributionChange(std::make_shared<lib::Distribution>(distConfig));
sortSentMessagesByIndex(_sender);
// No replies received yet, still no recovery mode.
CPPUNIT_ASSERT(!_distributor->isInRecoveryMode());
@@ -2271,8 +2257,6 @@ BucketDBUpdaterTest::clusterConfigDownsizeOnlySendsToAvailableNodes()
std::string distConfig(getDistConfig3Nodes1Group());
_node->getComponentRegister().setDistribution(
std::make_shared<lib::Distribution>(distConfig));
- _distributor->storageDistributionChanged();
- _distributor->enableNextDistribution();
sortSentMessagesByIndex(_sender);
CPPUNIT_ASSERT_EQUAL((nodeVec{0, 1, 2}), getSendSet());
@@ -2323,8 +2307,6 @@ BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer()
_node->getComponentRegister().setDistribution(
std::make_shared<lib::Distribution>(downsizeCfg));
- _distributor->storageDistributionChanged();
- _distributor->enableNextDistribution();
sortSentMessagesByIndex(_sender);
_sender.clear();
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index c2d878a253d..24af9f46f97 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -38,20 +38,40 @@ DistributorTestUtil::setupDistributor(int redundancy,
config.redundancy = redundancy;
config.initialRedundancy = earlyReturn;
config.ensurePrimaryPersisted = requirePrimaryToBeWritten;
- lib::Distribution* distribution = new lib::Distribution(config);
- _node->getComponentRegister().setDistribution(
- lib::Distribution::SP(distribution));
+ auto distribution = std::make_shared<lib::Distribution>(config);
+ _node->getComponentRegister().setDistribution(distribution);
_distributor->enableClusterState(lib::ClusterState(systemState));
+ // This is for all intents and purposes a hack to avoid having the
+ // distributor treat setting the distribution explicitly as a signal that
+ // it should send RequestBucketInfo to all configured nodes.
+ // If we called storageDistributionChanged followed by enableDistribution
+ // explicitly (which is what happens in "real life"), that is what would
+ // take place.
+ // The inverse case of this can be explicitly accomplished by calling
+ // triggerDistributionChange().
+ // This isn't pretty, folks, but it avoids breaking the world for now,
+ // as many tests have implicit assumptions about this being the behavior.
+ _distributor->propagateDefaultDistribution(distribution);
}
void
DistributorTestUtil::setRedundancy(uint32_t redundancy)
{
- _node->getComponentRegister().setDistribution(lib::Distribution::SP(
- new lib::Distribution(
- lib::Distribution::getDefaultDistributionConfig(
- redundancy, 100))));
+ auto distribution = std::make_shared<lib::Distribution>(
+ lib::Distribution::getDefaultDistributionConfig(
+ redundancy, 100));
+ // Same rationale for not triggering a full distribution change as
+ // in setupDistributor()
+ _node->getComponentRegister().setDistribution(distribution);
+ _distributor->propagateDefaultDistribution(std::move(distribution));
+}
+
+void
+DistributorTestUtil::triggerDistributionChange(lib::Distribution::SP distr)
+{
+ _node->getComponentRegister().setDistribution(std::move(distr));
_distributor->storageDistributionChanged();
+ _distributor->enableNextDistribution();
}
void
@@ -110,10 +130,8 @@ DistributorTestUtil::getIdealStr(document::BucketId id, const lib::ClusterState&
}
std::vector<uint16_t> nodes;
- _component->getDistribution()->getIdealNodes(lib::NodeType::STORAGE,
- state,
- id,
- nodes);
+ getDistribution().getIdealNodes(
+ lib::NodeType::STORAGE, state, id, nodes);
std::sort(nodes.begin(), nodes.end());
std::ostringstream ost;
ost << id << ": " << dumpVector(nodes);
@@ -132,10 +150,8 @@ DistributorTestUtil::addIdealNodes(const lib::ClusterState& state,
std::vector<uint16_t> res;
assert(_component.get());
- _component->getDistribution()->getIdealNodes(lib::NodeType::STORAGE,
- state,
- id,
- res);
+ getDistribution().getIdealNodes(
+ lib::NodeType::STORAGE, state, id, res);
for (uint32_t i = 0; i < res.size(); ++i) {
if (state.getNodeState(lib::Node(lib::NodeType::STORAGE, res[i])).getState() !=
@@ -280,7 +296,7 @@ DistributorTestUtil::sendReply(Operation& op,
BucketDatabase::Entry
DistributorTestUtil::getBucket(const document::BucketId& bId) const
{
- return _distributor->getBucketDatabase().get(bId);
+ return getBucketDatabase().get(bId);
}
void
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 43b56859d0d..fe7809e5ba6 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -135,7 +135,20 @@ public:
return _config;
}
- BucketDatabase& getBucketDatabase() { return _distributor->getBucketDatabase(); }
+ BucketDatabase& getBucketDatabase() {
+ return _distributor->getDefaultBucketSpace().getBucketDatabase();
+ }
+ const BucketDatabase& getBucketDatabase() const {
+ return _distributor->getDefaultBucketSpace().getBucketDatabase();
+ }
+
+ const lib::Distribution& getDistribution() const {
+ return _distributor->getDefaultBucketSpace().getDistribution();
+ }
+ // "End to end" distribution change trigger, which will invoke the bucket
+ // DB updater as expected based on the previous and new cluster state
+ // and config.
+ void triggerDistributionChange(lib::Distribution::SP distr);
framework::defaultimplementation::FakeClock& getClock() { return _node->getClock(); }
DistributorComponentRegister& getComponentRegister() { return _node->getComponentRegister(); }
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index da444b9d22a..5bf66dbb128 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -1356,7 +1356,7 @@ StateCheckersTest::testBucketStatePerGroup()
config.group[3].nodes[1].index = 10;
config.group[3].nodes[2].index = 11;
lib::Distribution::SP distr(new lib::Distribution(config));
- _node->getComponentRegister().setDistribution(distr);
+ triggerDistributionChange(std::move(distr));
{
DistributorConfiguration::MaintenancePriorities mp;
diff --git a/storage/src/vespa/storage/common/distributorcomponent.cpp b/storage/src/vespa/storage/common/distributorcomponent.cpp
index 2d40b728b88..c64429f0881 100644
--- a/storage/src/vespa/storage/common/distributorcomponent.cpp
+++ b/storage/src/vespa/storage/common/distributorcomponent.cpp
@@ -7,7 +7,7 @@ namespace storage {
DistributorComponent::DistributorComponent(DistributorComponentRegister& compReg,
vespalib::stringref name)
: StorageComponent(compReg, name),
- _bucketDatabase(0), _timeCalculator(0),
+ _timeCalculator(0),
_totalConfig(*this)
{
compReg.registerDistributorComponent(*this);
diff --git a/storage/src/vespa/storage/common/distributorcomponent.h b/storage/src/vespa/storage/common/distributorcomponent.h
index 14d864151de..8b257d82222 100644
--- a/storage/src/vespa/storage/common/distributorcomponent.h
+++ b/storage/src/vespa/storage/common/distributorcomponent.h
@@ -53,9 +53,7 @@ struct DistributorManagedComponent
{
virtual ~DistributorManagedComponent() {}
- virtual void setIdealNodeCalculator(lib::IdealNodeCalculator&) = 0;
virtual void setTimeCalculator(UniqueTimeCalculator&) = 0;
- virtual void setBucketDatabase(BucketDatabase&) = 0;
virtual void setDistributorConfig(const DistributorConfig&)= 0;
virtual void setVisitorConfig(const VisitorConfig&) = 0;
};
@@ -69,18 +67,12 @@ struct DistributorComponentRegister : public virtual StorageComponentRegister
class DistributorComponent : public StorageComponent,
private DistributorManagedComponent
{
- lib::IdealNodeCalculator* _idealNodeCalculator;
- BucketDatabase* _bucketDatabase;
mutable UniqueTimeCalculator* _timeCalculator;
DistributorConfig _distributorConfig;
VisitorConfig _visitorConfig;
DistributorConfiguration _totalConfig;
// DistributorManagedComponent implementation
- virtual void setBucketDatabase(BucketDatabase& db)
- { _bucketDatabase = &db; }
- virtual void setIdealNodeCalculator(lib::IdealNodeCalculator& c)
- { _idealNodeCalculator = &c; }
virtual void setTimeCalculator(UniqueTimeCalculator& utc)
{ _timeCalculator = &utc; }
virtual void setDistributorConfig(const DistributorConfig& c)
@@ -92,7 +84,7 @@ public:
typedef std::unique_ptr<DistributorComponent> UP;
DistributorComponent(DistributorComponentRegister& compReg,
- vespalib::stringref name);
+ vespalib::stringref name);
~DistributorComponent();
api::Timestamp getUniqueTimestamp() const {
@@ -108,12 +100,6 @@ public:
getTotalDistributorConfig() const {
return _totalConfig;
}
- BucketDatabase& getBucketDatabase() {
- assert(_bucketDatabase); return *_bucketDatabase;
- }
- lib::IdealNodeCalculator& getIdealNodeCalculator() const {
- assert(_idealNodeCalculator); return *_idealNodeCalculator;
- }
};
} // storage
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 5e8e0b106c8..ac4286458bb 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -30,6 +30,9 @@ vespa_add_library(storage_distributor
distributor_host_info_reporter.cpp
latency_statistics_provider.cpp
ownership_transfer_safe_time_point_calculator.cpp
+ bucket_space.cpp
+ bucket_space_component.cpp
+ bucket_space_repo.cpp
$<TARGET_OBJECTS:storage_distributoroperation>
$<TARGET_OBJECTS:storage_distributoroperationexternal>
$<TARGET_OBJECTS:storage_distributoroperationidealstate>
diff --git a/storage/src/vespa/storage/distributor/bucket_space.cpp b/storage/src/vespa/storage/distributor/bucket_space.cpp
new file mode 100644
index 00000000000..2e0d6f55873
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space.cpp
@@ -0,0 +1,16 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bucket_space.h"
+#include <vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h>
+
+namespace storage {
+namespace distributor {
+
+BucketSpace::BucketSpace() {
+}
+
+BucketSpace::~BucketSpace() {
+}
+
+}
+}
diff --git a/storage/src/vespa/storage/distributor/bucket_space.h b/storage/src/vespa/storage/distributor/bucket_space.h
new file mode 100644
index 00000000000..22edc2b75b3
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space.h
@@ -0,0 +1,44 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/bucketdb/mapbucketdatabase.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <memory>
+
+namespace storage {
+namespace distributor {
+
+/**
+ * A managed bucket space holds specific state and information required for a
+ * keeping track of and computing operations for a single bucket space:
+ * - Bucket database instance
+ * - Distribution config
+ * - Cluster state
+ */
+class BucketSpace {
+ MapBucketDatabase _bucketDatabase;
+ lib::Distribution::SP _distribution;
+public:
+ BucketSpace();
+ ~BucketSpace();
+
+ MapBucketDatabase& getBucketDatabase() noexcept {
+ return _bucketDatabase;
+ }
+ const MapBucketDatabase& getBucketDatabase() const noexcept {
+ return _bucketDatabase;
+ }
+
+ void setDistribution(lib::Distribution::SP distribution) {
+ _distribution = std::move(distribution);
+ }
+
+ // Precondition: setDistribution has been called at least once prior.
+ const lib::Distribution& getDistribution() const noexcept {
+ return *_distribution;
+ }
+
+};
+
+}
+}
diff --git a/storage/src/vespa/storage/distributor/bucket_space_component.cpp b/storage/src/vespa/storage/distributor/bucket_space_component.cpp
new file mode 100644
index 00000000000..840bbf8985c
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space_component.cpp
@@ -0,0 +1,20 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/storage/distributor/bucket_space_component.h>
+
+namespace storage {
+namespace distributor {
+
+BucketSpaceComponent::BucketSpaceComponent(
+ DistributorInterface& distributor,
+ BucketSpace& bucketSpace,
+ DistributorComponentRegister& compReg,
+ const std::string& name)
+ : DistributorComponent(distributor, compReg, name),
+ _bucketSpace(bucketSpace)
+{
+}
+
+} // distributor
+} // storage
diff --git a/storage/src/vespa/storage/distributor/bucket_space_component.h b/storage/src/vespa/storage/distributor/bucket_space_component.h
new file mode 100644
index 00000000000..f939220926d
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space_component.h
@@ -0,0 +1,37 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "distributorcomponent.h"
+#include "bucket_space.h"
+
+namespace storage {
+namespace distributor {
+
+/**
+ * Component bound to a specific bucket space, with utility operations to
+ * operate on buckets in this space.
+ */
+class BucketSpaceComponent : public DistributorComponent {
+ BucketSpace& _bucketSpace;
+public:
+ BucketSpaceComponent(DistributorInterface& distributor,
+ BucketSpace& bucketSpace,
+ DistributorComponentRegister& compReg,
+ const std::string& name);
+
+ BucketDatabase& getBucketDatabase() override {
+ return _bucketSpace.getBucketDatabase();
+ }
+
+ const BucketDatabase& getBucketDatabase() const override {
+ return _bucketSpace.getBucketDatabase();
+ }
+
+ const lib::Distribution& getDistribution() const override {
+ return _bucketSpace.getDistribution();
+ }
+
+};
+
+}
+}
diff --git a/storage/src/vespa/storage/distributor/bucket_space_repo.cpp b/storage/src/vespa/storage/distributor/bucket_space_repo.cpp
new file mode 100644
index 00000000000..9dd60b997dd
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space_repo.cpp
@@ -0,0 +1,27 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bucket_space_repo.h"
+#include <vespa/vdslib/distribution/distribution.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".distributor.bucket_space_repo");
+
+namespace storage {
+namespace distributor {
+
+BucketSpaceRepo::BucketSpaceRepo() {
+}
+
+BucketSpaceRepo::~BucketSpaceRepo() {
+}
+
+void BucketSpaceRepo::setDefaultDistribution(
+ std::shared_ptr<lib::Distribution> distr)
+{
+ LOG(debug, "Got new distribution '%s'", distr->toString().c_str());
+ // TODO all spaces, per-space config transforms
+ _defaultSpace.setDistribution(std::move(distr));
+}
+
+}
+}
diff --git a/storage/src/vespa/storage/distributor/bucket_space_repo.h b/storage/src/vespa/storage/distributor/bucket_space_repo.h
new file mode 100644
index 00000000000..1cf8207cc75
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_space_repo.h
@@ -0,0 +1,31 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "bucket_space.h"
+#include <memory>
+
+namespace storage {
+
+namespace lib {
+class Distribution;
+}
+
+namespace distributor {
+
+class BucketSpaceRepo {
+ // TODO: multiple spaces. This is just to start re-wiring things.
+ BucketSpace _defaultSpace;
+public:
+ BucketSpaceRepo();
+ ~BucketSpaceRepo();
+
+ BucketSpace& getDefaultSpace() noexcept { return _defaultSpace; }
+ const BucketSpace& getDefaultSpace() const noexcept {
+ return _defaultSpace;
+ }
+
+ void setDefaultDistribution(std::shared_ptr<lib::Distribution> distr);
+};
+
+}
+}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index b7235c4f13d..abb079cdea5 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -23,10 +23,11 @@ namespace storage {
namespace distributor {
BucketDBUpdater::BucketDBUpdater(Distributor& owner,
+ BucketSpace& bucketSpace,
DistributorMessageSender& sender,
DistributorComponentRegister& compReg)
: framework::StatusReporter("bucketdb", "Bucket DB Updater"),
- _distributorComponent(owner, compReg, "Bucket DB Updater"),
+ _distributorComponent(owner, bucketSpace, compReg, "Bucket DB Updater"),
_sender(sender)
{
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 3f639c1b28d..42afb6b3d27 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -14,6 +14,7 @@
#include <vespa/storage/distributor/distributorcomponent.h>
#include <vespa/storage/distributor/distributormessagesender.h>
#include <vespa/storage/distributor/pendingclusterstate.h>
+#include <vespa/storage/distributor/bucket_space_component.h>
#include <vespa/storageframework/generic/memory/memorymanagerinterface.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
@@ -29,7 +30,10 @@ class BucketDBUpdater : public framework::StatusReporter,
public api::MessageHandler
{
public:
+ // TODO take in BucketSpaceRepo instead, this class needs access to all
+ // bucket spaces.
BucketDBUpdater(Distributor& owner,
+ BucketSpace& bucketSpace,
DistributorMessageSender& sender,
DistributorComponentRegister& compReg);
~BucketDBUpdater();
@@ -77,7 +81,7 @@ public:
}
private:
- DistributorComponent _distributorComponent;
+ BucketSpaceComponent _distributorComponent;
class MergeReplyGuard {
public:
MergeReplyGuard(BucketDBUpdater& updater,
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index a6b7b7b2411..0dbde475402 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -9,6 +9,7 @@
#include <vespa/storage/distributor/throttlingoperationstarter.h>
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/ownership_transfer_safe_time_point_calculator.h>
+#include <vespa/storage/distributor/bucket_space_repo.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/common/hostreporter/hostinfo.h>
@@ -63,26 +64,28 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
framework::StatusReporter("distributor", "Distributor"),
_compReg(compReg),
_component(compReg, "distributor"),
+ _bucketSpaceRepo(std::make_unique<BucketSpaceRepo>()),
_metrics(new DistributorMetricSet(
_component.getLoadTypes()->getMetricLoadTypes())),
_operationOwner(*this, _component.getClock()),
_maintenanceOperationOwner(*this, _component.getClock()),
_pendingMessageTracker(compReg),
- _bucketDBUpdater(*this, *this, compReg),
+ _bucketDBUpdater(*this, getDefaultBucketSpace(), *this, compReg),
_distributorStatusDelegate(compReg, *this, *this),
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
- _idealStateManager(*this, compReg,
+ _idealStateManager(*this, getDefaultBucketSpace(), compReg,
manageActiveBucketCopies),
- _externalOperationHandler(*this, _idealStateManager, compReg),
+ _externalOperationHandler(*this, getDefaultBucketSpace(),
+ _idealStateManager, compReg),
_threadPool(threadPool),
_initializingIsUp(true),
_doneInitializeHandler(doneInitHandler),
_doneInitializing(false),
_messageSender(messageSender),
_bucketPriorityDb(new SimpleBucketPriorityDatabase()),
- _scanner(new SimpleMaintenanceScanner(*_bucketPriorityDb,
- _idealStateManager,
- getBucketDatabase())),
+ _scanner(new SimpleMaintenanceScanner(
+ *_bucketPriorityDb, _idealStateManager,
+ getDefaultBucketSpace().getBucketDatabase())),
_throttlingStarter(new ThrottlingOperationStarter(
_maintenanceOperationOwner)),
_blockingStarter(new BlockingOperationStarter(_pendingMessageTracker,
@@ -111,6 +114,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_distributorStatusDelegate.registerStatusPage();
_bucketDBStatusDelegate.registerStatusPage();
hostInfoReporterRegistrar.registerReporter(&_hostInfoReporter);
+ _bucketSpaceRepo->setDefaultDistribution(_component.getDistribution());
};
Distributor::~Distributor()
@@ -137,6 +141,22 @@ Distributor::getPendingMessageTracker() const
return _pendingMessageTracker;
}
+BucketSpace& Distributor::getDefaultBucketSpace() noexcept {
+ return _bucketSpaceRepo->getDefaultSpace();
+}
+
+const BucketSpace& Distributor::getDefaultBucketSpace() const noexcept {
+ return _bucketSpaceRepo->getDefaultSpace();
+}
+
+BucketDatabase& Distributor::getBucketDatabase() {
+ return getDefaultBucketSpace().getBucketDatabase();
+}
+
+const BucketDatabase& Distributor::getBucketDatabase() const {
+ return getDefaultBucketSpace().getBucketDatabase();
+}
+
BucketOwnership
Distributor::checkOwnershipInPendingState(const document::BucketId& b) const
{
@@ -424,6 +444,7 @@ Distributor::storageDistributionChanged()
"Distribution changed to %s, must refetch bucket information",
_component.getDistribution()->toString().c_str());
+ // FIXME this is not thread safe
_nextDistribution = _component.getDistribution();
} else {
LOG(debug,
@@ -517,6 +538,8 @@ Distributor::checkBucketForSplit(const BucketDatabase::Entry& e,
const lib::Distribution&
Distributor::getDistribution() const
{
+ // FIXME having _distribution be mutable for this is smelly. Is this only
+ // in place for the sake of tests?
if (!_distribution.get()) {
_distribution = _component.getDistribution();
}
@@ -529,12 +552,20 @@ Distributor::enableNextDistribution()
{
if (_nextDistribution.get()) {
_distribution = _nextDistribution;
+ propagateDefaultDistribution(_distribution);
_nextDistribution = std::shared_ptr<lib::Distribution>();
_bucketDBUpdater.storageDistributionChanged(getDistribution());
}
}
void
+Distributor::propagateDefaultDistribution(
+ std::shared_ptr<lib::Distribution> distribution)
+{
+ _bucketSpaceRepo->setDefaultDistribution(std::move(distribution));
+}
+
+void
Distributor::signalWorkWasDone()
{
_tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED;
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index cb44c1cbc8a..5bcf1f6d885 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -31,6 +31,7 @@ class HostInfo;
namespace distributor {
+class BucketSpaceRepo;
class SimpleMaintenanceScanner;
class BlockingOperationStarter;
class ThrottlingOperationStarter;
@@ -143,12 +144,8 @@ public:
return !_doneInitializing;
}
- BucketDatabase& getBucketDatabase() {
- return _component.getBucketDatabase();
- }
- const BucketDatabase& getBucketDatabase() const {
- return const_cast<Distributor&>(*this).getBucketDatabase();
- }
+ BucketDatabase& getBucketDatabase();
+ const BucketDatabase& getBucketDatabase() const;
const DistributorConfiguration& getConfig() const {
return _component.getTotalDistributorConfig();
@@ -172,6 +169,9 @@ public:
return *_bucketIdHasher;
}
+ BucketSpace& getDefaultBucketSpace() noexcept;
+ const BucketSpace& getDefaultBucketSpace() const noexcept;
+
private:
friend class Distributor_Test;
friend class BucketDBUpdaterTest;
@@ -241,11 +241,13 @@ private:
Operation::SP& operation);
void enableNextDistribution();
+ void propagateDefaultDistribution(std::shared_ptr<lib::Distribution>);
lib::ClusterState _clusterState;
DistributorComponentRegister& _compReg;
storage::DistributorComponent _component;
+ std::unique_ptr<BucketSpaceRepo> _bucketSpaceRepo;
std::shared_ptr<DistributorMetricSet> _metrics;
OperationOwner _operationOwner;
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index 9efa4c5186b..7fa366a9d02 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -158,14 +158,11 @@ public:
return _distributor;
}
- virtual BucketDatabase& getBucketDatabase()
- { return _distributor.getBucketDatabase(); }
-
- virtual const BucketDatabase& getBucketDatabase() const
- { return _distributor.getBucketDatabase(); }
-
- const lib::Distribution& getDistribution() const
- { return _distributor.getDistribution(); };
+ virtual BucketDatabase& getBucketDatabase() = 0;
+ virtual const BucketDatabase& getBucketDatabase() const = 0;
+ // FIXME this hides the StorageComponent::getDistribution method, which
+ // even has a different signature altogether...!
+ virtual const lib::Distribution& getDistribution() const = 0;
/**
* Finds a bucket that has the same direct parent as the given bucket
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 9fd1aff37e0..88edeea1225 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -39,9 +39,11 @@ namespace distributor {
ExternalOperationHandler::ExternalOperationHandler(
Distributor& owner,
+ BucketSpace& bucketSpace,
const MaintenanceOperationGenerator& gen,
DistributorComponentRegister& compReg)
- : DistributorComponent(owner, compReg, "Distributor manager"),
+ : BucketSpaceComponent(owner, bucketSpace, compReg,
+ "External operation handler"),
_visitorMetrics(getLoadTypes()->getMetricLoadTypes(),
*&VisitorMetricSet(NULL)),
_operationGenerator(gen),
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index e804118d6fe..f0daf169967 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -6,6 +6,7 @@
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/distributor/distributorcomponent.h>
#include <vespa/storage/distributor/visitormetricsset.h>
+#include <vespa/storage/distributor/bucket_space_component.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <vespa/storageframework/storageframework.h>
#include <chrono>
@@ -20,7 +21,7 @@ namespace distributor {
class Distributor;
class MaintenanceOperationGenerator;
-class ExternalOperationHandler : public DistributorComponent,
+class ExternalOperationHandler : public BucketSpaceComponent,
public api::MessageHandler
{
public:
@@ -38,6 +39,7 @@ public:
DEF_MSG_COMMAND_H(GetBucketList);
ExternalOperationHandler(Distributor& owner,
+ BucketSpace& bucketSpace,
const MaintenanceOperationGenerator&,
DistributorComponentRegister& compReg);
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
index 0b8c7abac23..af748993e1e 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
@@ -30,11 +30,12 @@ namespace distributor {
IdealStateManager::IdealStateManager(
Distributor& owner,
+ BucketSpace& bucketSpace,
DistributorComponentRegister& compReg,
bool manageActiveBucketCopies)
: HtmlStatusReporter("idealstateman", "Ideal state manager"),
_metrics(new IdealStateMetricSet),
- _distributorComponent(owner, compReg, "Ideal state manager")
+ _distributorComponent(owner, bucketSpace, compReg, "Ideal state manager")
{
_distributorComponent.registerStatusPage(*this);
_distributorComponent.registerMetric(*_metrics);
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h
index 0fc29cf834a..665db1caa5e 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.h
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.h
@@ -4,7 +4,7 @@
#include <deque>
#include <map>
#include <set>
-#include <vespa/storage/distributor/distributorcomponent.h>
+#include <vespa/storage/distributor/bucket_space_component.h>
#include <vespa/storage/distributor/statechecker.h>
#include <vespa/storage/distributor/maintenance/maintenanceprioritygenerator.h>
#include <vespa/storage/distributor/maintenance/maintenanceoperationgenerator.h>
@@ -41,7 +41,8 @@ class IdealStateManager : public framework::HtmlStatusReporter,
{
public:
- IdealStateManager(Distributor& owner,
+ IdealStateManager(Distributor& owner,
+ BucketSpace& bucketSpace,
DistributorComponentRegister& compReg,
bool manageActiveBucketCopies);
@@ -112,7 +113,7 @@ private:
std::vector<StateChecker::SP> _stateCheckers;
SplitBucketStateChecker* _splitBucketStateChecker;
- DistributorComponent _distributorComponent;
+ BucketSpaceComponent _distributorComponent;
std::vector<IdealStateOperation::SP> generateOperationsForBucket(
StateChecker::Context& c) const;
diff --git a/storage/src/vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.cpp
index d412a5f842a..5bbc26aa6e5 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.cpp
+++ b/storage/src/vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.cpp
@@ -11,11 +11,11 @@ LOG_SETUP(".storage.component.register.distributor");
namespace storage {
DistributorComponentRegisterImpl::DistributorComponentRegisterImpl()
- : _timeCalculator(0),
- _bucketDatabase(),
- _idealNodeCalculator(new lib::IdealNodeCalculatorImpl)
+ : _timeCalculator(0)
{
- _idealNodeCalculator->setClusterState(_clusterState);
+}
+
+DistributorComponentRegisterImpl::~DistributorComponentRegisterImpl() {
}
void
@@ -33,20 +33,8 @@ DistributorComponentRegisterImpl::registerDistributorComponent(
if (_timeCalculator != 0) {
smc.setTimeCalculator(*_timeCalculator);
}
- smc.setBucketDatabase(_bucketDatabase);
smc.setDistributorConfig(_distributorConfig);
smc.setVisitorConfig(_visitorConfig);
- smc.setIdealNodeCalculator(*_idealNodeCalculator);
-}
-
-void
-DistributorComponentRegisterImpl::setIdealNodeCalculator(
- std::unique_ptr<lib::IdealNodeCalculatorConfigurable> calc)
-{
- _idealNodeCalculator = std::move(calc);
- for (uint32_t i=0; i<_components.size(); ++i) {
- _components[i]->setIdealNodeCalculator(*_idealNodeCalculator);
- }
}
void
@@ -86,13 +74,6 @@ DistributorComponentRegisterImpl::setVisitorConfig(const VisitorConfig& c)
}
void
-DistributorComponentRegisterImpl::setDistribution(lib::Distribution::SP d)
-{
- StorageComponentRegisterImpl::setDistribution(d);
- _idealNodeCalculator->setDistribution(*d);
-}
-
-void
DistributorComponentRegisterImpl::setNodeStateUpdater(NodeStateUpdater& updater)
{
StorageComponentRegisterImpl::setNodeStateUpdater(updater);
diff --git a/storage/src/vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h b/storage/src/vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h
index d70d69f6022..f58303519c5 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h
+++ b/storage/src/vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h
@@ -26,27 +26,21 @@ class DistributorComponentRegisterImpl
std::vector<DistributorManagedComponent*> _components;
UniqueTimeCalculator* _timeCalculator;
- MapBucketDatabase _bucketDatabase;
DistributorConfig _distributorConfig;
VisitorConfig _visitorConfig;
lib::ClusterState _clusterState;
- std::unique_ptr<lib::IdealNodeCalculatorConfigurable> _idealNodeCalculator;
public:
typedef std::unique_ptr<DistributorComponentRegisterImpl> UP;
DistributorComponentRegisterImpl();
-
- BucketDatabase& getBucketDatabase() { return _bucketDatabase; }
+ ~DistributorComponentRegisterImpl();
virtual void registerDistributorComponent(DistributorManagedComponent&);
void setTimeCalculator(UniqueTimeCalculator& calc);
void setDistributorConfig(const DistributorConfig&);
void setVisitorConfig(const VisitorConfig&);
- void setDistribution(lib::Distribution::SP);
- void setIdealNodeCalculator(
- std::unique_ptr<lib::IdealNodeCalculatorConfigurable>);
private:
virtual void handleNewState();
diff --git a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp
index 99b481877ca..35c427b26ca 100644
--- a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp
+++ b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp
@@ -23,8 +23,7 @@ DeadLockDetector::DeadLockDetector(StorageComponentRegister& compReg,
_enableWarning(true),
_enableShutdown(false),
_processSlackMs(30 * 1000),
- _waitSlackMs(5 * 1000),
- _reportedBucketDBLocksAtState(OK)
+ _waitSlackMs(5 * 1000)
{
DistributorComponentRegister* dComp(
dynamic_cast<DistributorComponentRegister*>(&compReg));
@@ -125,23 +124,6 @@ DeadLockDetector::isAboveWarnThreshold(
return (tick._lastTickMs + tp.getMaxCycleTime() + slack / 4 < time.getTime());
}
-vespalib::string
-DeadLockDetector::getBucketLockInfo() const
-{
- vespalib::asciistream ost;
- if (_dComponent.get()) {
- if (_dComponent->getBucketDatabase().size() > 0) {
- //_dComponent->getBucketDatabase().showLockClients(ost);
- ost << "No bucket lock information available for distributor\n";
- }
- } else {
- if (_slComponent->getBucketDatabase().size() > 0) {
- _slComponent->getBucketDatabase().showLockClients(ost);
- }
- }
- return ost.str();
-}
-
namespace {
struct ThreadChecker : public DeadLockDetector::ThreadVisitor
{
@@ -202,12 +184,6 @@ DeadLockDetector::handleDeadlock(const framework::MilliSecTime& currentTime,
if (_enableWarning) {
LOGBT(warning, "deadlockw-" + id, "%s",
error.str().c_str());
- if (_reportedBucketDBLocksAtState != WARNED) {
- _reportedBucketDBLocksAtState = WARNED;
- LOG(info, "Locks in bucket database at deadlock time:"
- "\n%s",
- getBucketLockInfo().c_str());
- }
}
return;
} else {
@@ -217,11 +193,6 @@ DeadLockDetector::handleDeadlock(const framework::MilliSecTime& currentTime,
}
}
if (!_enableShutdown) return;
- if (_reportedBucketDBLocksAtState != HALTED) {
- _reportedBucketDBLocksAtState = HALTED;
- LOG(info, "Locks in bucket database at deadlock time:"
- "\n%s", getBucketLockInfo().c_str());
- }
if (_enableShutdown) {
_killer->kill();
}
@@ -326,14 +297,6 @@ DeadLockDetector::reportHtmlStatus(std::ostream& os,
out << "<p>The deadlock detector is disabled and will only monitor "
<< "tick times.</p>\n";
}
- out << "<h2>Current locks in the bucket database</h2>\n"
- << "<p>In case of a software bug causing a deadlock in the code, bucket"
- << " database locks are a likely reason. Thus, we list current locks "
- << "here in hopes that it will simplify debugging.</p>\n"
- << "<p>Bucket database</p>\n"
- << "<pre>\n"
- << getBucketLockInfo()
- << "</pre>\n";
os << out.str();
}
diff --git a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h
index 61ce0b26757..87ea20a02db 100644
--- a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h
+++ b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h
@@ -82,7 +82,6 @@ private:
bool _enableShutdown;
std::atomic<uint64_t> _processSlackMs;
std::atomic<uint64_t> _waitSlackMs;
- State _reportedBucketDBLocksAtState;
DistributorComponent::UP _dComponent;
ServiceLayerComponent::UP _slComponent;
StorageComponent* _component;
@@ -93,7 +92,6 @@ private:
// Status implementation
virtual void reportHtmlStatus(std::ostream& out,
const framework::HttpUrlPath&) const;
- vespalib::string getBucketLockInfo() const;
};
}