author     Geir Storli <geirst@verizonmedia.com>    2021-03-17 23:14:53 +0100
committer  GitHub <noreply@github.com>              2021-03-17 23:14:53 +0100
commit     841d8fd3093b4794eaedb6fcfd29b2ab152f618a (patch)
tree       793da24d970690fc81aac13e32c81be68544a497
parent     c8ef45681f5fad722de427aba9368b81fd85f63d (diff)
parent     06b5a80a3419c12f00500a998558f5253b08e0aa (diff)
Merge pull request #17013 from vespa-engine/geirst/bucket-db-updater-refactoring
Bucket db updater refactoring
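This merge replaces BucketDBUpdater's direct reach-through of the full DistributorComponent with two narrower views it holds references to: DistributorNodeContext for node-level facts (clock, node index, node address) and DistributorOperationContext for bucket-DB and cluster-state operations, plus a DistributorInterface reference for callbacks into the distributor. A minimal, self-contained sketch of that pattern follows; the names are simplified stand-ins, not the real Vespa types.

// Hypothetical illustration of the "narrowed context" refactoring pattern
// applied in this PR; not Vespa code, just the shape of the dependency change.
#include <cstdint>
#include <iostream>

// Analogous to DistributorNodeContext: read-only, node-level facts.
struct NodeContext {
    virtual ~NodeContext() = default;
    virtual uint16_t node_index() const noexcept = 0;
};

// Analogous to DistributorOperationContext: state needed to run operations.
struct OperationContext {
    virtual ~OperationContext() = default;
    virtual uint64_t generate_unique_timestamp() = 0;
};

// One concrete component can implement both narrow interfaces, as
// DistributorComponent does in the real code.
class Component : public NodeContext, public OperationContext {
public:
    uint16_t node_index() const noexcept override { return 7; }
    uint64_t generate_unique_timestamp() override { return ++_ts; }
private:
    uint64_t _ts = 0;
};

// Analogous to BucketDBUpdater: depends only on the views it actually needs,
// which keeps call sites explicit and eases testing in isolation.
class Updater {
public:
    Updater(const NodeContext& node_ctx, OperationContext& op_ctx)
        : _node_ctx(node_ctx), _op_ctx(op_ctx) {}
    void run() {
        std::cout << "node " << _node_ctx.node_index()
                  << " ts " << _op_ctx.generate_unique_timestamp() << '\n';
    }
private:
    const NodeContext& _node_ctx;
    OperationContext&  _op_ctx;
};

int main() {
    Component component;
    Updater updater(component, component);
    updater.run(); // prints "node 7 ts 1"
    return 0;
}

After the change, BucketDBUpdater no longer exposes getDistributorComponent(); tests reach the same functionality through node_context() and operation_context(), as the test diff below shows.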
-rw-r--r--  storage/src/tests/distributor/bucketdbupdatertest.cpp                    4
-rw-r--r--  storage/src/vespa/storage/distributor/bucketdbupdater.cpp              146
-rw-r--r--  storage/src/vespa/storage/distributor/bucketdbupdater.h                 14
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_node_context.h         2
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_operation_context.h    1
-rw-r--r--  storage/src/vespa/storage/distributor/distributorcomponent.h             4
-rw-r--r--  storage/src/vespa/storage/distributor/pendingclusterstate.cpp            7
-rw-r--r--  storage/src/vespa/storage/distributor/pendingclusterstate.h              2
8 files changed, 99 insertions(+), 81 deletions(-)
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 22d9c945262..1b93e728a04 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -72,7 +72,7 @@ public:
lib::ClusterStateBundle clusterStateBundle(baselineClusterState);
ClusterInformation::CSP clusterInfo(
new SimpleClusterInformation(
- getBucketDBUpdater().getDistributorComponent().getIndex(),
+ getBucketDBUpdater().node_context().node_index(),
clusterStateBundle,
"ui"));
for (auto* repo : {&mutable_repo(), &read_only_repo()}) {
@@ -1966,7 +1966,7 @@ TEST_F(BucketDBUpdaterTest, newer_mutations_not_overwritten_by_earlier_bucket_fe
document::BucketId bucket(16, 1);
constexpr uint64_t insertionTimestamp = 1001ULL * 1000000;
api::BucketInfo wantedInfo(5, 6, 7);
- getBucketDBUpdater().getDistributorComponent().updateBucketDatabase(
+ getBucketDBUpdater().operation_context().update_bucket_database(
makeDocumentBucket(bucket),
BucketCopy(insertionTimestamp, 0, wantedInfo),
DatabaseUpdate::CREATE_IF_NONEXISTING);
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 17a9b4f5127..12fd14c260e 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -31,17 +31,26 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
DistributorComponentRegister& compReg)
: framework::StatusReporter("bucketdb", "Bucket DB Updater"),
_distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
+ _node_ctx(_distributorComponent),
+ _op_ctx(_distributorComponent),
+ _distributor_interface(_distributorComponent.getDistributor()),
+ _delayedRequests(),
+ _sentMessages(),
+ _pendingClusterState(),
+ _history(),
_sender(sender),
- _transitionTimer(_distributorComponent.getClock()),
+ _enqueuedRechecks(),
+ _outdatedNodesMap(),
+ _transitionTimer(_node_ctx.clock()),
_stale_reads_enabled(false),
_active_distribution_contexts(),
_explicit_transition_read_guard(),
_distribution_context_mutex()
{
- for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
_active_distribution_contexts.emplace(
elem.first,
- BucketSpaceDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex()));
+ BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index()));
_explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>());
}
}
@@ -62,8 +71,8 @@ OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const documen
return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
}
const auto& space_repo = bucket_present_in_mutable_db
- ? _distributorComponent.getBucketSpaceRepo()
- : _distributorComponent.getReadOnlyBucketSpaceRepo();
+ ? _op_ctx.bucket_space_repo()
+ : _op_ctx.read_only_bucket_space_repo();
auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space);
assert(existing_guard_iter != _explicit_transition_read_guard.cend());
auto db_guard = existing_guard_iter->second
@@ -117,15 +126,14 @@ BucketDBUpdater::sendRequestBucketInfo(
const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
{
- if (!_distributorComponent.storageNodeIsUp(bucket.getBucketSpace(), node)) {
+ if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) {
return;
}
std::vector<document::BucketId> buckets;
buckets.push_back(bucket.getBucketId());
- std::shared_ptr<api::RequestBucketInfoCommand> msg(
- new api::RequestBucketInfoCommand(bucket.getBucketSpace(), buckets));
+ auto msg = std::make_shared<api::RequestBucketInfoCommand>(bucket.getBucketSpace(), buckets);
LOG(debug,
"Sending request bucket info command %" PRIu64 " for "
@@ -135,10 +143,10 @@ BucketDBUpdater::sendRequestBucketInfo(
node);
msg->setPriority(50);
- msg->setAddress(_distributorComponent.nodeAddress(node));
+ msg->setAddress(_node_ctx.node_address(node));
_sentMessages[msg->getMsgId()] =
- BucketRequest(node, _distributorComponent.getUniqueTimestamp(),
+ BucketRequest(node, _op_ctx.generate_unique_timestamp(),
bucket, mergeReplyGuard);
_sender.sendCommand(msg);
}
@@ -203,8 +211,8 @@ BucketDBUpdater::removeSuperfluousBuckets(
bool is_distribution_config_change)
{
const bool move_to_read_only_db = shouldDeferStateEnabling();
- const char* up_states = _distributorComponent.getDistributor().getStorageNodeUpStates();
- for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ const char* up_states = _op_ctx.storage_node_up_states();
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
const auto& newDistribution(elem.second->getDistribution());
const auto& oldClusterState(elem.second->getClusterState());
const auto& new_cluster_state = newState.getDerivedClusterState(elem.first);
@@ -221,14 +229,14 @@ BucketDBUpdater::removeSuperfluousBuckets(
}
auto& bucketDb(elem.second->getBucketDatabase());
- auto& readOnlyDb(_distributorComponent.getReadOnlyBucketSpaceRepo().get(elem.first).getBucketDatabase());
+ auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase());
// Remove all buckets not belonging to this distributor, or
// being on storage nodes that are no longer up.
MergingNodeRemover proc(
oldClusterState,
*new_cluster_state,
- _distributorComponent.getIndex(),
+ _node_ctx.node_index(),
newDistribution,
up_states,
move_to_read_only_db);
@@ -254,12 +262,12 @@ void maybe_sleep_for(std::chrono::milliseconds ms) {
void
BucketDBUpdater::maybe_inject_simulated_db_pruning_delay() {
- maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_pruning_latency());
+ maybe_sleep_for(_op_ctx.distributor_config().simulated_db_pruning_latency());
}
void
BucketDBUpdater::maybe_inject_simulated_db_merging_delay() {
- maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_merging_latency());
+ maybe_sleep_for(_op_ctx.distributor_config().simulated_db_merging_latency());
}
void
@@ -269,21 +277,21 @@ BucketDBUpdater::ensureTransitionTimerStarted()
// that will make transition times appear artificially low.
if (!hasPendingClusterState()) {
_transitionTimer = framework::MilliSecTimer(
- _distributorComponent.getClock());
+ _node_ctx.clock());
}
}
void
BucketDBUpdater::completeTransitionTimer()
{
- _distributorComponent.getDistributor().getMetrics()
+ _distributor_interface.getMetrics()
.stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble());
}
void
BucketDBUpdater::clearReadOnlyBucketRepoDatabases()
{
- for (auto& space : _distributorComponent.getReadOnlyBucketSpaceRepo()) {
+ for (auto& space : _op_ctx.read_only_bucket_space_repo()) {
space.second->getBucketDatabase().clear();
}
}
@@ -293,27 +301,27 @@ BucketDBUpdater::storageDistributionChanged()
{
ensureTransitionTimerStarted();
- removeSuperfluousBuckets(_distributorComponent.getClusterStateBundle(), true);
+ removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true);
- ClusterInformation::CSP clusterInfo(new SimpleClusterInformation(
- _distributorComponent.getIndex(),
- _distributorComponent.getClusterStateBundle(),
- _distributorComponent.getDistributor().getStorageNodeUpStates()));
+ auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
+ _node_ctx.node_index(),
+ _op_ctx.cluster_state_bundle(),
+ _op_ctx.storage_node_up_states());
_pendingClusterState = PendingClusterState::createForDistributionChange(
- _distributorComponent.getClock(),
+ _node_ctx.clock(),
std::move(clusterInfo),
_sender,
- _distributorComponent.getBucketSpaceRepo(),
- _distributorComponent.getUniqueTimestamp());
+ _op_ctx.bucket_space_repo(),
+ _op_ctx.generate_unique_timestamp());
_outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
- _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+ _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
}
void
BucketDBUpdater::replyToPreviousPendingClusterStateIfAny()
{
if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) {
- _distributorComponent.sendUp(
+ _distributor_interface.getMessageSender().sendUp(
std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand()));
}
}
@@ -325,12 +333,12 @@ BucketDBUpdater::replyToActivationWithActualVersion(
{
auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd);
reply->setActualVersion(actualVersion);
- _distributorComponent.sendUp(reply); // TODO let API accept rvalues
+ _distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues
}
void BucketDBUpdater::update_read_snapshot_before_db_pruning() {
std::lock_guard lock(_distribution_context_mutex);
- for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
// At this point, we're still operating with a distribution context _without_ a
// pending state, i.e. anyone using the context will expect to find buckets
// in the DB that correspond to how the database looked like prior to pruning
@@ -347,9 +355,9 @@ void BucketDBUpdater::update_read_snapshot_before_db_pruning() {
void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
std::lock_guard lock(_distribution_context_mutex);
- const auto old_default_state = _distributorComponent.getBucketSpaceRepo().get(
+ const auto old_default_state = _op_ctx.bucket_space_repo().get(
document::FixedBucketSpaces::default_space()).cluster_state_sp();
- for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
auto new_distribution = elem.second->distribution_sp();
auto old_cluster_state = elem.second->cluster_state_sp();
auto new_cluster_state = new_state.getDerivedClusterState(elem.first);
@@ -360,7 +368,7 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt
old_default_state,
std::move(new_cluster_state),
std::move(new_distribution),
- _distributorComponent.getIndex()));
+ _node_ctx.node_index()));
// We can now remove the explicit mutable DB snapshot, as the buckets that have been
// pruned away are visible in the read-only DB.
_explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>();
@@ -370,7 +378,7 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt
void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
std::lock_guard lock(_distribution_context_mutex);
const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space());
- for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
auto new_distribution = elem.second->distribution_sp();
auto new_cluster_state = activated_state.getDerivedClusterState(elem.first);
_active_distribution_contexts.insert_or_assign(
@@ -379,7 +387,7 @@ void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterSt
std::move(new_cluster_state),
default_cluster_state,
std::move(new_distribution),
- _distributorComponent.getIndex()));
+ _node_ctx.node_index()));
}
}
@@ -391,7 +399,7 @@ BucketDBUpdater::onSetSystemState(
"Received new cluster state %s",
cmd->getSystemState().toString().c_str());
- const lib::ClusterStateBundle oldState = _distributorComponent.getClusterStateBundle();
+ const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle();
const lib::ClusterStateBundle& state = cmd->getClusterStateBundle();
if (state == oldState) {
@@ -399,33 +407,31 @@ BucketDBUpdater::onSetSystemState(
}
ensureTransitionTimerStarted();
// Separate timer since _transitionTimer might span multiple pending states.
- framework::MilliSecTimer process_timer(_distributorComponent.getClock());
+ framework::MilliSecTimer process_timer(_node_ctx.clock());
update_read_snapshot_before_db_pruning();
const auto& bundle = cmd->getClusterStateBundle();
removeSuperfluousBuckets(bundle, false);
update_read_snapshot_after_db_pruning(bundle);
replyToPreviousPendingClusterStateIfAny();
- ClusterInformation::CSP clusterInfo(
- new SimpleClusterInformation(
- _distributorComponent.getIndex(),
- _distributorComponent.getClusterStateBundle(),
- _distributorComponent.getDistributor()
- .getStorageNodeUpStates()));
+ auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
+ _node_ctx.node_index(),
+ _op_ctx.cluster_state_bundle(),
+ _op_ctx.storage_node_up_states());
_pendingClusterState = PendingClusterState::createForClusterStateChange(
- _distributorComponent.getClock(),
+ _node_ctx.clock(),
std::move(clusterInfo),
_sender,
- _distributorComponent.getBucketSpaceRepo(),
+ _op_ctx.bucket_space_repo(),
cmd,
_outdatedNodesMap,
- _distributorComponent.getUniqueTimestamp());
+ _op_ctx.generate_unique_timestamp());
_outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
- _distributorComponent.getDistributor().getMetrics().set_cluster_state_processing_time.addValue(
+ _distributor_interface.getMetrics().set_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
- _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+ _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
if (isPendingClusterStateCompleted()) {
processCompletedPendingClusterState();
}
@@ -468,7 +474,7 @@ BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::Activa
BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard()
{
if (_reply) {
- _updater.getDistributorComponent().getDistributor().handleCompletedMerge(_reply);
+ _distributor_interface.handleCompletedMerge(_reply);
}
}
@@ -476,8 +482,7 @@ bool
BucketDBUpdater::onMergeBucketReply(
const std::shared_ptr<api::MergeBucketReply>& reply)
{
- std::shared_ptr<MergeReplyGuard> replyGuard(
- new MergeReplyGuard(*this, reply));
+ auto replyGuard = std::make_shared<MergeReplyGuard>(_distributor_interface, reply);
// In case the merge was unsuccessful somehow, or some nodes weren't
// actually merged (source-only nodes?) we request the bucket info of the
@@ -523,8 +528,7 @@ BucketDBUpdater::onNotifyBucketChange(
const std::shared_ptr<api::NotifyBucketChangeCommand>& cmd)
{
// Immediately schedule reply to ensure it is sent.
- _sender.sendReply(std::shared_ptr<api::StorageReply>(
- new api::NotifyBucketChangeReply(*cmd)));
+ _sender.sendReply(std::make_shared<api::NotifyBucketChangeReply>(*cmd));
if (!cmd->getBucketInfo().valid()) {
LOG(error,
@@ -594,7 +598,7 @@ BucketDBUpdater::handleSingleBucketInfoFailure(
req.targetNode, repl->getResult().toString().c_str());
if (req.bucket.getBucketId() != document::BucketId(0)) {
- framework::MilliSecTime sendTime(_distributorComponent.getClock());
+ framework::MilliSecTime sendTime(_node_ctx.clock());
sendTime += framework::MilliSecTime(100);
_delayedRequests.emplace_back(sendTime, req);
}
@@ -606,8 +610,10 @@ BucketDBUpdater::resendDelayedMessages()
if (_pendingClusterState) {
_pendingClusterState->resendDelayedMessages();
}
- if (_delayedRequests.empty()) return; // Don't fetch time if not needed
- framework::MilliSecTime currentTime(_distributorComponent.getClock());
+ if (_delayedRequests.empty()) {
+ return; // Don't fetch time if not needed
+ }
+ framework::MilliSecTime currentTime(_node_ctx.clock());
while (!_delayedRequests.empty()
&& currentTime >= _delayedRequests.front().first)
{
@@ -662,7 +668,7 @@ BucketDBUpdater::processSingleBucketInfoReply(
BucketRequest req = iter->second;
_sentMessages.erase(iter);
- if (!_distributorComponent.storageNodeIsUp(req.bucket.getBucketSpace(), req.targetNode)) {
+ if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
// Ignore replies from nodes that are down.
return true;
}
@@ -690,7 +696,7 @@ void
BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
BucketListMerger::BucketList& existing)
{
- auto &distributorBucketSpace(_distributorComponent.getBucketSpaceRepo().get(bucket.getBucketSpace()));
+ auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace()));
std::vector<BucketDatabase::Entry> entries;
distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries);
@@ -704,12 +710,12 @@ BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node
{
for (const document::BucketId & bucketId : merger.getRemovedEntries()) {
document::Bucket bucket(bucketSpace, bucketId);
- _distributorComponent.removeNodeFromDB(bucket, node);
+ _op_ctx.remove_node_from_bucket_database(bucket, node);
}
for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) {
document::Bucket bucket(bucketSpace, entry.first);
- _distributorComponent.updateBucketDatabase(
+ _op_ctx.update_bucket_database(
bucket,
BucketCopy(merger.getTimestamp(), node, entry.second),
DatabaseUpdate::CREATE_IF_NONEXISTING);
@@ -737,7 +743,7 @@ BucketDBUpdater::processCompletedPendingClusterState()
// taken effect via activation. External operation handler will keep operations from
// actually being scheduled until state has been activated. The external operation handler
// needs to be explicitly aware of the case where no state has yet to be activated.
- _distributorComponent.getDistributor().getMessageSender().sendDown(
+ _distributor_interface.getMessageSender().sendDown(
_pendingClusterState->getCommand());
_pendingClusterState->clearCommand();
return;
@@ -750,7 +756,7 @@ BucketDBUpdater::processCompletedPendingClusterState()
void
BucketDBUpdater::activatePendingClusterState()
{
- framework::MilliSecTimer process_timer(_distributorComponent.getClock());
+ framework::MilliSecTimer process_timer(_node_ctx.clock());
_pendingClusterState->mergeIntoBucketDatabases();
maybe_inject_simulated_db_merging_delay();
@@ -759,7 +765,7 @@ BucketDBUpdater::activatePendingClusterState()
LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
enableCurrentClusterStateBundleInDistributor();
if (_pendingClusterState->hasCommand()) {
- _distributorComponent.getDistributor().getMessageSender().sendDown(
+ _distributor_interface.getMessageSender().sendDown(
_pendingClusterState->getCommand());
}
addCurrentStateToClusterStateHistory();
@@ -767,18 +773,18 @@ BucketDBUpdater::activatePendingClusterState()
LOG(debug, "Activating pending distribution config");
// TODO distribution changes cannot currently be deferred as they are not
// initiated by the cluster controller!
- _distributorComponent.getDistributor().notifyDistributionChangeEnabled();
+ _distributor_interface.notifyDistributionChangeEnabled();
}
update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
_pendingClusterState.reset();
_outdatedNodesMap.clear();
- _distributorComponent.getBucketSpaceRepo().clear_pending_cluster_state_bundle();
+ _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle();
sendAllQueuedBucketRechecks();
completeTransitionTimer();
clearReadOnlyBucketRepoDatabases();
- _distributorComponent.getDistributor().getMetrics().activate_cluster_state_processing_time.addValue(
+ _distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
}
@@ -792,12 +798,12 @@ BucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
"BucketDBUpdater finished processing state %s",
state.getBaselineClusterState()->toString().c_str());
- _distributorComponent.getDistributor().enableClusterStateBundle(state);
+ _distributor_interface.enableClusterStateBundle(state);
}
void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) {
update_read_snapshot_after_activation(activated_state);
- _distributorComponent.getDistributor().enableClusterStateBundle(activated_state);
+ _distributor_interface.enableClusterStateBundle(activated_state);
}
void
@@ -863,7 +869,7 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
using namespace vespalib::xml;
xos << XmlTag("bucketdb")
<< XmlTag("systemstate_active")
- << XmlContent(_distributorComponent.getClusterStateBundle().getBaselineClusterState()->toString())
+ << XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString())
<< XmlEndTag();
if (_pendingClusterState) {
xos << *_pendingClusterState;
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index d70a4fe3409..8fab76575e9 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -57,7 +57,8 @@ public:
vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;
void print(std::ostream& out, bool verbose, const std::string& indent) const;
- DistributorComponent& getDistributorComponent() { return _distributorComponent; }
+ const DistributorNodeContext& node_context() const { return _node_ctx; }
+ DistributorOperationContext& operation_context() { return _op_ctx; }
/**
* Returns whether the current PendingClusterState indicates that there has
@@ -78,11 +79,10 @@ public:
OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const;
private:
- DistributorComponent _distributorComponent;
class MergeReplyGuard {
public:
- MergeReplyGuard(BucketDBUpdater& updater, const std::shared_ptr<api::MergeBucketReply>& reply)
- : _updater(updater), _reply(reply) {}
+ MergeReplyGuard(DistributorInterface& distributor_interface, const std::shared_ptr<api::MergeBucketReply>& reply) noexcept
+ : _distributor_interface(distributor_interface), _reply(reply) {}
~MergeReplyGuard();
@@ -90,7 +90,7 @@ private:
// than send it down
void resetReply() { _reply.reset(); }
private:
- BucketDBUpdater& _updater;
+ DistributorInterface& _distributor_interface;
std::shared_ptr<api::MergeBucketReply> _reply;
};
@@ -239,6 +239,10 @@ private:
mutable bool _cachedOwned;
};
+ DistributorComponent _distributorComponent;
+ const DistributorNodeContext& _node_ctx;
+ DistributorOperationContext& _op_ctx;
+ DistributorInterface& _distributor_interface;
std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests;
std::map<uint64_t, BucketRequest> _sentMessages;
std::unique_ptr<PendingClusterState> _pendingClusterState;
diff --git a/storage/src/vespa/storage/distributor/distributor_node_context.h b/storage/src/vespa/storage/distributor/distributor_node_context.h
index 3cb0f509ea7..805e54342dc 100644
--- a/storage/src/vespa/storage/distributor/distributor_node_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_node_context.h
@@ -3,6 +3,7 @@
#pragma once
#include <vespa/storage/common/cluster_context.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
#include <cstdint>
namespace document { class BucketIdFactory; }
@@ -20,6 +21,7 @@ public:
virtual const framework::Clock& clock() const noexcept = 0;
virtual const document::BucketIdFactory& bucket_id_factory() const noexcept = 0;
virtual uint16_t node_index() const noexcept = 0;
+ virtual api::StorageMessageAddress node_address(uint16_t node_index) const noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
index 97a522a694a..0b47c71e2e1 100644
--- a/storage/src/vespa/storage/distributor/distributor_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -50,6 +50,7 @@ public:
uint32_t message_type) const = 0;
virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0;
virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0;
+ virtual bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const = 0;
// TODO: Move to being a free function instead.
virtual const char* storage_node_up_states() const = 0;
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index b4a0b00a2e1..ca953ed01ef 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -149,6 +149,7 @@ public:
const vespalib::string * cluster_name_ptr() const noexcept override { return cluster_context().cluster_name_ptr(); }
const document::BucketIdFactory& bucket_id_factory() const noexcept override { return getBucketIdFactory(); }
uint16_t node_index() const noexcept override { return getIndex(); }
+ api::StorageMessageAddress node_address(uint16_t node_index) const noexcept override { return nodeAddress(node_index); }
// Implements DistributorOperationContext
api::Timestamp generate_unique_timestamp() override { return getUniqueTimestamp(); }
@@ -203,6 +204,9 @@ public:
const lib::ClusterStateBundle& cluster_state_bundle() const override {
return getClusterStateBundle();
}
+ bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const override {
+ return storageNodeIsUp(bucket_space, node_index);
+ }
const char* storage_node_up_states() const override {
return getDistributor().getStorageNodeUpStates();
}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 175c7d27033..a7fd5a5af53 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -32,7 +32,9 @@ PendingClusterState::PendingClusterState(
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
: _cmd(newStateCmd),
+ _sentMessages(),
_requestedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)),
+ _delayedRequests(),
_prevClusterStateBundle(clusterInfo->getClusterStateBundle()),
_newClusterStateBundle(newStateCmd->getClusterStateBundle()),
_clock(clock),
@@ -222,12 +224,11 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
_newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace)->toString().c_str(),
distributionHash.c_str());
- std::shared_ptr<api::RequestBucketInfoCommand> cmd(
- new api::RequestBucketInfoCommand(
+ auto cmd = std::make_shared<api::RequestBucketInfoCommand>(
bucketSpaceAndNode.bucketSpace,
_sender.getDistributorIndex(),
*_newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace),
- distributionHash));
+ distributionHash);
cmd->setPriority(api::StorageMessage::HIGH);
cmd->setTimeout(vespalib::duration::max());
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index 6d558d05640..42b7bf0dcf2 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -160,7 +160,7 @@ private:
/**
* Creates a pending cluster state that represents
- * a set system state command from the fleet controller.
+ * a set system state command from the cluster controller.
*/
PendingClusterState(
const framework::Clock&,