aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-04-29 11:21:30 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-04-29 13:27:59 +0000
commitb5ffe5a1927c93c0c3eccc66c76be87f6361841e (patch)
tree60d56413786f397599cc6ba3ea88e34cb9dc6bfe /storage
parent126d4f78c4464c79f3365c433890119306693102 (diff)
Make the top-level BucketDBUpdater less dependant on the single existing distributor stripe.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp51
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h19
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp18
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h10
-rw-r--r--storage/src/vespa/storage/distributor/distributor_component.cpp27
-rw-r--r--storage/src/vespa/storage/distributor/distributor_component.h71
-rw-r--r--storage/src/vespa/storage/distributor/distributor_interface.h20
-rw-r--r--storage/src/vespa/storage/distributor/distributor_operation_context.h32
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h17
10 files changed, 209 insertions, 57 deletions
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 57d6a23c79f..4cf65ddb63a 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -14,6 +14,7 @@ vespa_add_library(storage_distributor
distributor.cpp
distributor_bucket_space.cpp
distributor_bucket_space_repo.cpp
+ distributor_component.cpp
distributor_host_info_reporter.cpp
distributor_status.cpp
distributor_stripe.cpp
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 90d3d24c240..44df64e467e 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -27,36 +27,37 @@ using document::BucketSpace;
namespace storage::distributor {
-BucketDBUpdater::BucketDBUpdater(DistributorStripeInterface& owner, // FIXME STRIPE!
+BucketDBUpdater::BucketDBUpdater(const DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
+ DistributorInterface& distributor_interface,
DistributorMessageSender& sender,
- DistributorComponentRegister& comp_reg,
+ ChainedMessageSender& chained_sender,
+ std::shared_ptr<const lib::Distribution> bootstrap_distribution,
StripeAccessor& stripe_accessor)
: framework::StatusReporter("temp_bucketdb", "Bucket DB Updater"), // TODO STRIPE rename once duplication is removed
_stripe_accessor(stripe_accessor),
_active_state_bundle(lib::ClusterState()),
- _dummy_mutable_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(owner.getDistributorIndex())),
- _dummy_read_only_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(owner.getDistributorIndex())),
- _distributor_component(owner, *_dummy_mutable_bucket_space_repo, *_dummy_read_only_bucket_space_repo, comp_reg, "Bucket DB Updater"),
- _node_ctx(_distributor_component),
- _op_ctx(_distributor_component),
- _distributor_interface(_distributor_component.getDistributor()),
+ _node_ctx(node_ctx),
+ _op_ctx(op_ctx),
+ _distributor_interface(distributor_interface),
_pending_cluster_state(),
_history(),
_sender(sender),
+ _chained_sender(chained_sender),
_outdated_nodes_map(),
_transition_timer(_node_ctx.clock()),
_stale_reads_enabled(false)
{
// FIXME STRIPE top-level Distributor needs a proper way to track the current cluster state bundle!
propagate_active_state_bundle_internally();
- bootstrap_distribution_config(_distributor_component.getDistribution());
+ bootstrap_distribution_config(bootstrap_distribution);
}
BucketDBUpdater::~BucketDBUpdater() = default;
void
BucketDBUpdater::propagate_active_state_bundle_internally() {
- for (auto* repo : {_dummy_mutable_bucket_space_repo.get(), _dummy_read_only_bucket_space_repo.get()}) {
+ for (auto* repo : {&_op_ctx.bucket_space_repo(), &_op_ctx.read_only_bucket_space_repo()}) {
for (auto& iter : *repo) {
iter.second->setClusterState(_active_state_bundle.getDerivedClusterState(iter.first));
}
@@ -66,7 +67,7 @@ BucketDBUpdater::propagate_active_state_bundle_internally() {
void
BucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib::Distribution> distribution) {
auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
- for (auto* repo : {_dummy_mutable_bucket_space_repo.get(), _dummy_read_only_bucket_space_repo.get()}) {
+ for (auto* repo : {&_op_ctx.bucket_space_repo(), &_op_ctx.read_only_bucket_space_repo()}) {
repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
}
@@ -172,7 +173,7 @@ BucketDBUpdater::ensure_transition_timer_started()
void
BucketDBUpdater::complete_transition_timer()
{
- _distributor_interface.getMetrics()
+ _distributor_interface.metrics()
.stateTransitionTime.addValue(_transition_timer.getElapsedTimeAsDouble());
}
@@ -184,11 +185,11 @@ BucketDBUpdater::storage_distribution_changed(const BucketSpaceDistributionConfi
auto guard = _stripe_accessor.rendezvous_and_hold_all();
// FIXME STRIPE might this cause a mismatch with the component stuff's own distribution config..?!
guard->update_distribution_config(configs);
- remove_superfluous_buckets(*guard, _op_ctx.cluster_state_bundle(), true);
+ remove_superfluous_buckets(*guard, _active_state_bundle, true);
auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
_node_ctx.node_index(),
- _op_ctx.cluster_state_bundle(),
+ _active_state_bundle,
_op_ctx.storage_node_up_states());
_pending_cluster_state = PendingClusterState::createForDistributionChange(
_node_ctx.clock(),
@@ -205,7 +206,7 @@ void
BucketDBUpdater::reply_to_previous_pending_cluster_state_if_any()
{
if (_pending_cluster_state.get() && _pending_cluster_state->hasCommand()) {
- _distributor_interface.getMessageSender().sendUp(
+ _chained_sender.sendUp(
std::make_shared<api::SetSystemStateReply>(*_pending_cluster_state->getCommand()));
}
}
@@ -217,7 +218,7 @@ BucketDBUpdater::reply_to_activation_with_actual_version(
{
auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd);
reply->setActualVersion(actualVersion);
- _distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues
+ _chained_sender.sendUp(reply); // TODO let API accept rvalues
}
bool
@@ -245,7 +246,7 @@ BucketDBUpdater::onSetSystemState(
auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
_node_ctx.node_index(),
- _op_ctx.cluster_state_bundle(),
+ _active_state_bundle,
_op_ctx.storage_node_up_states());
_pending_cluster_state = PendingClusterState::createForClusterStateChange(
_node_ctx.clock(),
@@ -257,7 +258,7 @@ BucketDBUpdater::onSetSystemState(
_op_ctx.generate_unique_timestamp()); // FIXME STRIPE must be atomic across all threads
_outdated_nodes_map = _pending_cluster_state->getOutdatedNodesMap();
- _distributor_interface.getMetrics().set_cluster_state_processing_time.addValue(
+ _distributor_interface.metrics().set_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
guard->set_pending_cluster_state_bundle(_pending_cluster_state->getNewClusterStateBundle());
@@ -358,7 +359,7 @@ BucketDBUpdater::process_completed_pending_cluster_state(StripeAccessGuard& guar
// taken effect via activation. External operation handler will keep operations from
// actually being scheduled until state has been activated. The external operation handler
// needs to be explicitly aware of the case where no state has yet to be activated.
- _distributor_interface.getMessageSender().sendDown(_pending_cluster_state->getCommand());
+ _chained_sender.sendDown(_pending_cluster_state->getCommand());
_pending_cluster_state->clearCommand();
return;
}
@@ -379,14 +380,13 @@ BucketDBUpdater::activate_pending_cluster_state(StripeAccessGuard& guard)
LOG(debug, "Activating pending cluster state version %u", _pending_cluster_state->clusterStateVersion());
enable_current_cluster_state_bundle_in_distributor_and_stripes(guard);
if (_pending_cluster_state->hasCommand()) {
- _distributor_interface.getMessageSender().sendDown(_pending_cluster_state->getCommand());
+ _chained_sender.sendDown(_pending_cluster_state->getCommand());
}
add_current_state_to_cluster_state_history();
} else {
LOG(debug, "Activating pending distribution config");
// TODO distribution changes cannot currently be deferred as they are not
// initiated by the cluster controller!
- _distributor_interface.notifyDistributionChangeEnabled(); // TODO factor these two out into one func?
guard.notify_distribution_change_enabled();
}
@@ -397,7 +397,7 @@ BucketDBUpdater::activate_pending_cluster_state(StripeAccessGuard& guard)
complete_transition_timer();
guard.clear_read_only_bucket_repo_databases();
- _distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue(
+ _distributor_interface.metrics().activate_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
}
@@ -412,16 +412,11 @@ BucketDBUpdater::enable_current_cluster_state_bundle_in_distributor_and_stripes(
LOG(debug, "BucketDBUpdater finished processing state %s",
state.getBaselineClusterState()->toString().c_str());
- // First enable the cluster state for the _top-level_ distributor component.
- _distributor_interface.enableClusterStateBundle(state);
- // And then subsequently for all underlying stripes. Technically the order doesn't matter
- // since all threads are blocked at this point.
guard.enable_cluster_state_bundle(state);
}
void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) {
auto guard = _stripe_accessor.rendezvous_and_hold_all();
- _distributor_interface.enableClusterStateBundle(activated_state);
guard->enable_cluster_state_bundle(activated_state);
_active_state_bundle = activated_state;
@@ -476,7 +471,7 @@ BucketDBUpdater::report_xml_status(vespalib::xml::XmlOutputStream& xos,
using namespace vespalib::xml;
xos << XmlTag("bucketdb")
<< XmlTag("systemstate_active")
- << XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString())
+ << XmlContent(_active_state_bundle.getBaselineClusterState()->toString())
<< XmlEndTag();
if (_pending_cluster_state) {
xos << *_pending_cluster_state;
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 04962e3af9b..cd78e0ac5b2 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -28,7 +28,7 @@ namespace storage::distributor {
struct BucketSpaceDistributionConfigs;
class BucketSpaceDistributionContext;
-class DistributorStripeInterface;
+class DistributorInterface;
class StripeAccessor;
class StripeAccessGuard;
@@ -37,9 +37,12 @@ class BucketDBUpdater : public framework::StatusReporter,
{
public:
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
- BucketDBUpdater(DistributorStripeInterface& owner,
+ BucketDBUpdater(const DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
+ DistributorInterface& distributor_interface,
DistributorMessageSender& sender,
- DistributorComponentRegister& comp_reg,
+ ChainedMessageSender& chained_sender,
+ std::shared_ptr<const lib::Distribution> bootstrap_distribution,
StripeAccessor& stripe_accessor);
~BucketDBUpdater() override;
@@ -59,8 +62,6 @@ public:
vespalib::string report_xml_status(vespalib::xml::XmlOutputStream& xos, const framework::HttpUrlPath&) const;
void print(std::ostream& out, bool verbose, const std::string& indent) const;
- const DistributorNodeContext& node_context() const { return _node_ctx; }
- DistributorStripeOperationContext& operation_context() { return _op_ctx; }
void set_stale_reads_enabled(bool enabled) noexcept {
_stale_reads_enabled.store(enabled, std::memory_order_relaxed);
@@ -106,16 +107,14 @@ private:
// TODO STRIPE remove once distributor component dependencies have been pruned
StripeAccessor& _stripe_accessor;
lib::ClusterStateBundle _active_state_bundle;
- std::unique_ptr<DistributorBucketSpaceRepo> _dummy_mutable_bucket_space_repo;
- std::unique_ptr<DistributorBucketSpaceRepo> _dummy_read_only_bucket_space_repo;
- DistributorStripeComponent _distributor_component;
const DistributorNodeContext& _node_ctx;
- DistributorStripeOperationContext& _op_ctx;
- DistributorStripeInterface& _distributor_interface;
+ DistributorOperationContext& _op_ctx;
+ DistributorInterface& _distributor_interface;
std::unique_ptr<PendingClusterState> _pending_cluster_state;
std::list<PendingClusterState::Summary> _history;
DistributorMessageSender& _sender;
+ ChainedMessageSender& _chained_sender;
OutdatedNodesMap _outdated_nodes_map;
framework::MilliSecTimer _transition_timer;
std::atomic<bool> _stale_reads_enabled;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 8dd414e8def..d49cecf76b6 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -18,6 +18,7 @@
#include <vespa/storage/common/hostreporter/hostinfo.h>
#include <vespa/storage/common/node_identity.h>
#include <vespa/storage/common/nodestateupdater.h>
+#include <vespa/storage/config/distributorconfiguration.h>
#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <vespa/vdslib/distribution/distribution.h>
@@ -56,7 +57,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
doneInitHandler, *this, (num_distributor_stripes == 0))),
_stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)),
- _component(compReg, "distributor"),
+ _component(*this, compReg, "distributor"),
+ _total_config(_component.total_distributor_config_sp()),
_bucket_db_updater(),
_distributorStatusDelegate(compReg, *this, *this),
_threadPool(threadPool),
@@ -71,8 +73,11 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
if (num_distributor_stripes > 0) {
LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone
- // FIXME STRIPE using the singular stripe here is a temporary Hack McHack Deluxe 3000!
- _bucket_db_updater = std::make_unique<BucketDBUpdater>(*_stripe, *_stripe, _comp_reg, *_stripe_accessor);
+ // TODO STRIPE: Avoid passing the singular stripe as DistributorMessageSender.
+ _bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component,
+ *this, *_stripe,
+ *this, _component.getDistribution(),
+ *_stripe_accessor);
}
_hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
_distributorStatusDelegate.registerStatusPage();
@@ -274,6 +279,12 @@ Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg)
return _stripe->handleMessage(msg);
}
+const DistributorConfiguration&
+Distributor::config() const
+{
+ return *_total_config;
+}
+
const lib::ClusterStateBundle&
Distributor::getClusterStateBundle() const
{
@@ -396,6 +407,7 @@ Distributor::enableNextConfig()
// Only lazily trigger a config propagation and internal update if something has _actually changed_.
if (_component.internal_config_generation() != _current_internal_config_generation) {
if (_bucket_db_updater) {
+ _total_config = _component.total_distributor_config_sp();
auto guard = _stripe_accessor->rendezvous_and_hold_all();
guard->update_total_distributor_config(_component.total_distributor_config_sp());
} else {
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 074f5fe27d4..db9656eb09b 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -4,7 +4,9 @@
#include "bucket_spaces_stats_provider.h"
#include "bucketdbupdater.h"
+#include "distributor_component.h"
#include "distributor_host_info_reporter.h"
+#include "distributor_interface.h"
#include "distributor_stripe_interface.h"
#include "externaloperationhandler.h"
#include "idealstatemanager.h"
@@ -46,6 +48,7 @@ class ThrottlingOperationStarter;
class Distributor final
: public StorageLink,
+ public DistributorInterface,
public StatusDelegator,
public framework::StatusReporter,
public framework::TickingThread,
@@ -74,6 +77,10 @@ public:
DistributorMetricSet& getMetrics() { return *_metrics; }
+ // Implements DistributorInterface,
+ DistributorMetricSet& metrics() override { return getMetrics(); }
+ const DistributorConfiguration& config() const override;
+
/**
* Enables a new cluster state. Called after the bucket db updater has
* retrieved all bucket info related to the change.
@@ -171,7 +178,8 @@ private:
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
std::unique_ptr<DistributorStripe> _stripe;
std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor;
- storage::DistributorComponent _component;
+ distributor::DistributorComponent _component;
+ std::shared_ptr<const DistributorConfiguration> _total_config;
std::unique_ptr<BucketDBUpdater> _bucket_db_updater;
StatusReporterDelegate _distributorStatusDelegate;
framework::TickingThreadPool& _threadPool;
diff --git a/storage/src/vespa/storage/distributor/distributor_component.cpp b/storage/src/vespa/storage/distributor/distributor_component.cpp
new file mode 100644
index 00000000000..05a605fbcae
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_component.cpp
@@ -0,0 +1,27 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "distributor_bucket_space.h"
+#include "distributor_bucket_space_repo.h"
+#include "distributor_component.h"
+
+namespace storage::distributor {
+
+DistributorComponent::DistributorComponent(DistributorInterface& distributor,
+ DistributorComponentRegister& comp_reg,
+ const std::string& name)
+ : storage::DistributorComponent(comp_reg, name),
+ _distributor(distributor),
+ _bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(node_index())),
+ _read_only_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(node_index()))
+{
+}
+
+DistributorComponent::~DistributorComponent() = default;
+
+api::StorageMessageAddress
+DistributorComponent::node_address(uint16_t node_index) const noexcept
+{
+ return api::StorageMessageAddress::create(cluster_name_ptr(), lib::NodeType::STORAGE, node_index);
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_component.h b/storage/src/vespa/storage/distributor/distributor_component.h
new file mode 100644
index 00000000000..8db70f490ba
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_component.h
@@ -0,0 +1,71 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "distributor_interface.h"
+#include "distributor_node_context.h"
+#include "distributor_operation_context.h"
+#include <vespa/storage/common/distributorcomponent.h>
+
+namespace storage::distributor {
+
+class DistributorBucketSpaceRepo;
+
+/**
+ * The framework component for the top-level distributor.
+ *
+ * This class should be used directly as little as possible.
+ * Instead the interfaces DistributorNodeContext and DistributorOperationContext should be used where possible.
+ */
+class DistributorComponent : public storage::DistributorComponent,
+ public DistributorNodeContext,
+ public DistributorOperationContext {
+private:
+ DistributorInterface& _distributor;
+ // TODO STRIPE: These bucket space repos are only temporary until we get an interface
+ // to look at state per bucket space.
+ std::unique_ptr<DistributorBucketSpaceRepo> _bucket_space_repo;
+ std::unique_ptr<DistributorBucketSpaceRepo> _read_only_bucket_space_repo;
+
+public:
+ DistributorComponent(DistributorInterface& distributor,
+ DistributorComponentRegister& comp_reg,
+ const std::string& name);
+
+ ~DistributorComponent() override;
+
+ // TODO STRIPE: Unify implementation of this interface between DistributorComponent and DistributorStripeComponent?
+ // Implements DistributorNodeContext
+ const framework::Clock& clock() const noexcept override { return getClock(); }
+ const vespalib::string* cluster_name_ptr() const noexcept override { return cluster_context().cluster_name_ptr(); }
+ const document::BucketIdFactory& bucket_id_factory() const noexcept override { return getBucketIdFactory(); }
+ uint16_t node_index() const noexcept override { return getIndex(); }
+ api::StorageMessageAddress node_address(uint16_t node_index) const noexcept override;
+
+ // Implements DistributorOperationContext
+ api::Timestamp generate_unique_timestamp() override {
+ return getUniqueTimestamp();
+ }
+ const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override {
+ return *_bucket_space_repo;
+ }
+ DistributorBucketSpaceRepo& bucket_space_repo() noexcept override {
+ return *_bucket_space_repo;
+ }
+ const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override {
+ return *_read_only_bucket_space_repo;
+ }
+ DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override {
+ return *_read_only_bucket_space_repo;
+ }
+ const storage::DistributorConfiguration& distributor_config() const noexcept override {
+ return _distributor.config();
+ }
+ const char* storage_node_up_states() const override {
+ // TODO STRIPE: Move to a common place.
+ return "uri";
+ }
+
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_interface.h b/storage/src/vespa/storage/distributor/distributor_interface.h
new file mode 100644
index 00000000000..38f212ba911
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_interface.h
@@ -0,0 +1,20 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+namespace storage { class DistributorConfiguration; }
+
+namespace storage::distributor {
+
+class DistributorMetricSet;
+
+/**
+ * Simple interface to access metrics and config for the top-level distributor.
+ */
+class DistributorInterface {
+public:
+ virtual ~DistributorInterface() {}
+ virtual DistributorMetricSet& metrics() = 0;
+ virtual const DistributorConfiguration& config() const = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
new file mode 100644
index 00000000000..470a2f0f788
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -0,0 +1,32 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/storageapi/defs.h>
+
+namespace storage { class DistributorConfiguration; }
+namespace storage::lib { class ClusterStateBundle; }
+
+namespace storage::distributor {
+
+class DistributorBucketSpaceRepo;
+
+/**
+ * Interface with functionality that is used when handling top-level distributor operations.
+ */
+class DistributorOperationContext {
+public:
+ virtual ~DistributorOperationContext() {}
+ virtual api::Timestamp generate_unique_timestamp() = 0;
+ // TODO STRIPE: Access to bucket space repos is only temporary at this level.
+ virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept= 0;
+ virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0;
+ virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0;
+ virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0;
+ virtual const DistributorConfiguration& distributor_config() const noexcept = 0;
+
+ // TODO STRIPE: Move to a common place.
+ virtual const char* storage_node_up_states() const = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
index 8a62a102cc0..e4086374ad9 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
@@ -3,6 +3,7 @@
#pragma once
#include "bucketownership.h"
+#include "distributor_operation_context.h"
#include "operation_routing_snapshot.h"
#include <vespa/document/bucket/bucketspace.h>
#include <vespa/storage/bucketdb/bucketdatabase.h>
@@ -11,21 +12,16 @@
namespace document { class Bucket; }
-namespace storage { class DistributorConfiguration; }
-namespace storage::lib { class ClusterStateBundle; }
-
namespace storage::distributor {
-class DistributorBucketSpaceRepo;
class PendingMessageTracker;
/**
* Interface with functionality that is used when handling distributor stripe operations.
*/
-class DistributorStripeOperationContext {
+class DistributorStripeOperationContext : public DistributorOperationContext {
public:
virtual ~DistributorStripeOperationContext() {}
- virtual api::Timestamp generate_unique_timestamp() = 0;
virtual void update_bucket_database(const document::Bucket& bucket,
const BucketCopy& changed_node,
uint32_t update_flags = 0) = 0;
@@ -35,15 +31,10 @@ public:
virtual void remove_node_from_bucket_database(const document::Bucket& bucket, uint16_t node_index) = 0;
virtual void remove_nodes_from_bucket_database(const document::Bucket& bucket,
const std::vector<uint16_t>& nodes) = 0;
- virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept= 0;
- virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0;
- virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0;
- virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0;
virtual document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId& docId) const = 0;
virtual void recheck_bucket_info(uint16_t node_index, const document::Bucket& bucket) = 0;
virtual document::BucketId get_sibling(const document::BucketId& bid) const = 0;
- virtual const DistributorConfiguration& distributor_config() const noexcept = 0;
virtual void send_inline_split_if_bucket_too_large(document::BucketSpace bucket_space,
const BucketDatabase::Entry& entry,
uint8_t pri) = 0;
@@ -55,10 +46,6 @@ public:
virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0;
virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0;
virtual bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const = 0;
-
- // TODO: Move to being a free function instead.
- virtual const char* storage_node_up_states() const = 0;
-
};
}