summary | refs | log | tree | commit | diff | stats
path: root/storage
diff options
context:
space:
mode:
author: Ola Aunrønning <olaa@yahooinc.com> 2023-06-08 10:58:17 +0200
committer: GitHub <noreply@github.com> 2023-06-08 10:58:17 +0200
commit: 79de2d78433c11932357d0c244704b16fc87da21 (patch)
tree: 382a06ce4e1217e395fbba25c1ce74a1afaadbe5 /storage
parent: 96d3814b80a693ee46640ae89a88fdb2d78dcd40 (diff)
parent: d7568f0d450df2287657ac18c37955a1867496f5 (diff)
Merge branch 'master' into olaa/dataplane-proxy-config
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/check_condition_test.cpp | 42
-rw-r--r-- storage/src/tests/distributor/putoperationtest.cpp | 3
-rw-r--r-- storage/src/tests/distributor/removeoperationtest.cpp | 3
-rw-r--r-- storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp | 57
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe_component.cpp | 5
-rw-r--r-- storage/src/vespa/storage/distributor/distributormetricsset.cpp | 2
-rw-r--r-- storage/src/vespa/storage/distributor/distributormetricsset.h | 22
-rw-r--r-- storage/src/vespa/storage/distributor/externaloperationhandler.cpp | 7
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/check_condition.cpp | 27
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/check_condition.h | 5
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/putoperation.cpp | 5
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/putoperation.h | 3
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp | 5
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/removeoperation.h | 3
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp | 3
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h | 1
-rw-r--r-- storage/src/vespa/storage/distributor/persistence_operation_metric_set.cpp | 14
-rw-r--r-- storage/src/vespa/storage/distributor/persistence_operation_metric_set.h | 6
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp | 3
-rw-r--r-- storage/src/vespa/storage/persistence/persistenceutil.cpp | 3
-rw-r--r-- storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp | 147
-rw-r--r-- storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h | 50
22 files changed, 260 insertions, 156 deletions
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp
index 1b5cede8af6..757a9329ea6 100644
--- a/storage/src/tests/distributor/check_condition_test.cpp
+++ b/storage/src/tests/distributor/check_condition_test.cpp
@@ -242,6 +242,32 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_
});
}
+TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_pending_transition_case) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0));
+ simulate_set_pending_cluster_state("version:2 storage:1 distributor:1 .0.s:d"); // technically, no distributors own anything
+ cond.handle_reply(_sender, make_matched_reply(1));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_FALSE(outcome.not_found());
+ EXPECT_TRUE(outcome.failed());
+ EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::BUCKET_NOT_FOUND);
+ });
+}
+
+TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_completed_transition_case) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0));
+ enable_cluster_state("version:2 storage:1 distributor:1 .0.s:d"); // technically, no distributors own anything
+ cond.handle_reply(_sender, make_matched_reply(1));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_FALSE(outcome.not_found());
+ EXPECT_TRUE(outcome.failed());
+ EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::BUCKET_NOT_FOUND);
+ });
+}
+
TEST_F(CheckConditionTest, nested_get_traces_are_propagated_to_outcome) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_trace_reply(0, 100, "hello"));
@@ -253,4 +279,20 @@ TEST_F(CheckConditionTest, nested_get_traces_are_propagated_to_outcome) {
});
}
+TEST_F(CheckConditionTest, condition_evaluation_increments_probe_latency_metrics) {
+ getClock().setAbsoluteTimeInSeconds(1);
+ EXPECT_EQ(_metrics.latency.getLongValue("count"), 0);
+ EXPECT_EQ(_metrics.ok.getLongValue("last"), 0);
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0));
+ getClock().setAbsoluteTimeInSeconds(3);
+ cond.handle_reply(_sender, make_matched_reply(1));
+ }, [&](auto& outcome) noexcept {
+ (void)outcome;
+ });
+ EXPECT_EQ(_metrics.latency.getLongValue("count"), 1);
+ EXPECT_EQ(_metrics.ok.getLongValue("last"), 1);
+ EXPECT_DOUBLE_EQ(_metrics.latency.getLast(), 2'000.0); // in millis
+}
+
}
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index ff375e5b902..76b6741442e 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -73,7 +73,8 @@ public:
operation_context(),
getDistributorBucketSpace(),
msg,
- metrics().puts);
+ metrics().puts,
+ metrics().put_condition_probes);
op->start(_sender);
}
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index d352d23bb8c..d169c80a95d 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -41,7 +41,8 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
operation_context(),
getDistributorBucketSpace(),
msg,
- metrics().removes);
+ metrics().removes,
+ metrics().remove_condition_probes);
op->start(_sender);
}
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
index 4c83dde30da..ae2385a36d8 100644
--- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
+++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
@@ -40,15 +40,11 @@ struct ChangedBucketOwnershipHandlerTest : Test {
uint16_t wantedOwner,
const lib::ClusterState& state);
- std::shared_ptr<api::SetSystemStateCommand> createStateCmd(
- const lib::ClusterState& state) const
- {
+ std::shared_ptr<api::SetSystemStateCommand> createStateCmd(const lib::ClusterState& state) const {
return std::make_shared<api::SetSystemStateCommand>(state);
}
- std::shared_ptr<api::SetSystemStateCommand> createStateCmd(
- const std::string& stateStr) const
- {
+ std::shared_ptr<api::SetSystemStateCommand> createStateCmd(const std::string& stateStr) const {
return createStateCmd(lib::ClusterState(stateStr));
}
@@ -71,11 +67,17 @@ struct ChangedBucketOwnershipHandlerTest : Test {
template <typename MsgType, typename... MsgParams>
void expectDownAbortsMessage(bool expected, MsgParams&& ... params);
- lib::ClusterState getDefaultTestClusterState() const {
+ std::shared_ptr<AbortBucketOperationsCommand> fetch_dispatched_abort_operations_command() {
+ _bottom->waitForMessages(2, 60); // abort cmd + set cluster state cmd
+ EXPECT_EQ(2, _bottom->getNumCommands());
+ return std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
+ }
+
+ static lib::ClusterState getDefaultTestClusterState() {
return lib::ClusterState("distributor:4 storage:1");
}
- lib::ClusterState getStorageDownTestClusterState() const {
+ static lib::ClusterState getStorageDownTestClusterState() {
return lib::ClusterState("distributor:4 storage:1 .0.s:d");
}
@@ -173,29 +175,26 @@ hasAbortedNoneOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v)
bool
hasOnlySetSystemStateCmdQueued(DummyStorageLink& link) {
+ link.waitForMessages(1, 60);
if (link.getNumCommands() != 1) {
std::cerr << "expected 1 command, found"
<< link.getNumCommands() << "\n";
}
- api::SetSystemStateCommand::SP cmd(
- std::dynamic_pointer_cast<api::SetSystemStateCommand>(
- link.getCommand(0)));
- return (cmd.get() != 0);
+ auto cmd = std::dynamic_pointer_cast<api::SetSystemStateCommand>(link.getCommand(0));
+ return static_cast<bool>(cmd);
}
}
void
-ChangedBucketOwnershipHandlerTest::applyDistribution(
- Redundancy redundancy, NodeCount nodeCount)
+ChangedBucketOwnershipHandlerTest::applyDistribution(Redundancy redundancy, NodeCount nodeCount)
{
_app->setDistribution(redundancy, nodeCount);
_handler->storageDistributionChanged();
}
void
-ChangedBucketOwnershipHandlerTest::applyClusterState(
- const lib::ClusterState& state)
+ChangedBucketOwnershipHandlerTest::applyClusterState(const lib::ClusterState& state)
{
_app->setClusterState(state);
_handler->reloadClusterState();
@@ -212,10 +211,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed
auto node2Buckets(insertBuckets(2, 2, stateBefore));
_top->sendDown(createStateCmd("distributor:4 .1.s:d .3.s:d storage:1"));
- // TODO: refactor into own function
- ASSERT_EQ(2, _bottom->getNumCommands());
- auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
- ASSERT_TRUE(cmd.get() != 0);
+ auto cmd = fetch_dispatched_abort_operations_command();
+ ASSERT_TRUE(cmd);
EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets));
EXPECT_TRUE(hasAbortedAllOf(cmd, node3Buckets));
@@ -280,10 +277,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, down_edge_to_no_available_distributors
lib::ClusterState downState("distributor:3 .0.s:d .1.s:s .2.s:s storage:1");
_top->sendDown(createStateCmd(downState));
- // TODO: refactor into own function
- ASSERT_EQ(2, _bottom->getNumCommands());
- auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
- ASSERT_TRUE(cmd.get() != 0);
+ auto cmd = fetch_dispatched_abort_operations_command();
+ ASSERT_TRUE(cmd);
EXPECT_TRUE(hasAbortedAllOf(cmd, node0Buckets));
EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets));
@@ -304,10 +299,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed
auto node2Buckets(insertBuckets(2, 2, stateAfter));
_top->sendDown(createStateCmd(stateAfter));
- // TODO: refactor into own function
- ASSERT_EQ(2, _bottom->getNumCommands());
- auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
- ASSERT_TRUE(cmd.get() != 0);
+ auto cmd = fetch_dispatched_abort_operations_command();
+ ASSERT_TRUE(cmd);
EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets));
EXPECT_TRUE(hasAbortedNoneOf(cmd, node0Buckets));
@@ -319,8 +312,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed
}
void
-ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket(
- uint16_t fromDistributorIndex)
+ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket(uint16_t fromDistributorIndex)
{
document::BucketId bucket(16, 6786);
auto msg = std::make_shared<api::CreateBucketCommand>(makeDocumentBucket(bucket));
@@ -350,7 +342,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_change_updates_own
/**
* Generate and dispatch a message of the given type with the provided
- * aruments as if that message was sent from distributor 1. Messages will
+ * arguments as if that message was sent from distributor 1. Messages will
* be checked as if the state contains 4 distributors in Up state. This
* means that it suffices to send in a message with a bucket that is not
* owned by distributor 1 in this state to trigger an abort.
@@ -382,7 +374,7 @@ ChangedBucketOwnershipHandlerTest::expectChangeAbortsMessage(bool expected, MsgP
/**
* Generate and dispatch a message of the given type with the provided
- * aruments as if that message was sent from distributor 1. Messages will
+ * arguments as if that message was sent from distributor 1. Messages will
* be checked as if the state contains 4 distributors in Up state and storage
* node is down. This means that any abortable message will trigger an abort.
*/
@@ -394,6 +386,7 @@ ChangedBucketOwnershipHandlerTest::expectDownAbortsMessage(bool expected, MsgPar
(void) _bottom->getCommandsOnce();
ASSERT_NO_FATAL_FAILURE((expectChangeAbortsMessage<MsgType, MsgParams...>(false, std::forward<MsgParams>(params)...)));
_top->sendDown(createStateCmd(getStorageDownTestClusterState()));
+ _bottom->waitForMessages(3, 60);
ASSERT_EQ(_bottom->getNumCommands(), 3);
auto setSystemStateCommand = std::dynamic_pointer_cast<api::SetSystemStateCommand>(_bottom->getCommand(2));
ASSERT_TRUE(setSystemStateCommand);
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
index b47e0697a91..9a5fd595b1d 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
@@ -166,7 +166,10 @@ DistributorStripeComponent::update_bucket_database(
}
}
- UpdateBucketDatabaseProcessor processor(getClock(), found_down_node ? up_nodes : changed_nodes, bucketSpace.get_ideal_service_layer_nodes_bundle(bucket.getBucketId()).get_available_nodes(), (update_flags & DatabaseUpdate::RESET_TRUSTED) != 0);
+ UpdateBucketDatabaseProcessor processor(getClock(),
+ found_down_node ? up_nodes : changed_nodes,
+ bucketSpace.get_ideal_service_layer_nodes_bundle(bucket.getBucketId()).get_available_nodes(),
+ (update_flags & DatabaseUpdate::RESET_TRUSTED) != 0);
bucketSpace.getBucketDatabase().process_update(bucket.getBucketId(), processor, (update_flags & DatabaseUpdate::CREATE_IF_NONEXISTING) != 0);
}
diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.cpp b/storage/src/vespa/storage/distributor/distributormetricsset.cpp
index fad44782dd4..cbc0e6f6eef 100644
--- a/storage/src/vespa/storage/distributor/distributormetricsset.cpp
+++ b/storage/src/vespa/storage/distributor/distributormetricsset.cpp
@@ -16,11 +16,13 @@ BucketDbMetrics::~BucketDbMetrics() = default;
DistributorMetricSet::DistributorMetricSet()
: MetricSet("distributor", {{"distributor"}}, ""),
puts("puts", this),
+ put_condition_probes("put_condition_probes", this),
updates(this),
update_puts("update_puts", this),
update_gets("update_gets", this),
update_metadata_gets("update_metadata_gets", this),
removes("removes", this),
+ remove_condition_probes("remove_condition_probes", this),
removelocations("removelocations", this),
gets("gets", this),
stats("stats", this),
diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.h b/storage/src/vespa/storage/distributor/distributormetricsset.h
index ac140b85282..739e84759f1 100644
--- a/storage/src/vespa/storage/distributor/distributormetricsset.h
+++ b/storage/src/vespa/storage/distributor/distributormetricsset.h
@@ -20,24 +20,26 @@ struct BucketDbMetrics : metrics::MetricSet {
class DistributorMetricSet : public metrics::MetricSet {
public:
PersistenceOperationMetricSet puts;
- UpdateMetricSet updates;
+ PersistenceOperationMetricSet put_condition_probes;
+ UpdateMetricSet updates;
PersistenceOperationMetricSet update_puts;
PersistenceOperationMetricSet update_gets;
PersistenceOperationMetricSet update_metadata_gets;
PersistenceOperationMetricSet removes;
+ PersistenceOperationMetricSet remove_condition_probes;
PersistenceOperationMetricSet removelocations;
PersistenceOperationMetricSet gets;
PersistenceOperationMetricSet stats;
PersistenceOperationMetricSet getbucketlists;
- VisitorMetricSet visits;
- metrics::DoubleAverageMetric stateTransitionTime;
- metrics::DoubleAverageMetric set_cluster_state_processing_time;
- metrics::DoubleAverageMetric activate_cluster_state_processing_time;
- metrics::DoubleAverageMetric recoveryModeTime;
- metrics::LongValueMetric docsStored;
- metrics::LongValueMetric bytesStored;
- BucketDbMetrics mutable_dbs;
- BucketDbMetrics read_only_dbs;
+ VisitorMetricSet visits;
+ metrics::DoubleAverageMetric stateTransitionTime;
+ metrics::DoubleAverageMetric set_cluster_state_processing_time;
+ metrics::DoubleAverageMetric activate_cluster_state_processing_time;
+ metrics::DoubleAverageMetric recoveryModeTime;
+ metrics::LongValueMetric docsStored;
+ metrics::LongValueMetric bytesStored;
+ BucketDbMetrics mutable_dbs;
+ BucketDbMetrics read_only_dbs;
explicit DistributorMetricSet();
~DistributorMetricSet() override;
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 6cb404aaa0a..d6bb5562a07 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -332,7 +332,9 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd
if (allow) {
_op = std::make_shared<PutOperation>(_node_ctx, _op_ctx,
_op_ctx.bucket_space_repo().get(bucket_space),
- std::move(cmd), getMetrics().puts, std::move(handle));
+ std::move(cmd),
+ getMetrics().puts, getMetrics().put_condition_probes,
+ std::move(handle));
} else {
_msg_sender.sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
}
@@ -386,7 +388,8 @@ bool ExternalOperationHandler::onRemove(const std::shared_ptr<api::RemoveCommand
auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket_space));
_op = std::make_shared<RemoveOperation>(_node_ctx, _op_ctx, distributorBucketSpace, std::move(cmd),
- getMetrics().removes, std::move(handle));
+ getMetrics().removes, getMetrics().remove_condition_probes,
+ std::move(handle));
} else {
_msg_sender.sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
index 9f7dbcaa132..0e12e3e3019 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
@@ -58,7 +58,7 @@ CheckCondition::CheckCondition(const document::Bucket& bucket,
const documentapi::TestAndSetCondition& tas_condition,
const DistributorBucketSpace& bucket_space,
const DistributorNodeContext& node_ctx,
- PersistenceOperationMetricSet& metric,
+ PersistenceOperationMetricSet& condition_probe_metrics,
uint32_t trace_level,
private_ctor_tag)
: _doc_id_bucket(bucket),
@@ -66,7 +66,8 @@ CheckCondition::CheckCondition(const document::Bucket& bucket,
_node_ctx(node_ctx),
_cluster_state_version_at_creation_time(_bucket_space.getClusterState().getVersion()),
_cond_get_op(),
- _sent_message_map()
+ _sent_message_map(),
+ _outcome()
{
// Condition checks only return metadata back to the distributor and thus have an empty fieldset.
// Side note: the BucketId provided to the GetCommand is ignored; GetOperation computes explicitly from the doc ID.
@@ -75,8 +76,8 @@ CheckCondition::CheckCondition(const document::Bucket& bucket,
get_cmd->getTrace().setLevel(trace_level);
_cond_get_op = std::make_shared<GetOperation>(_node_ctx, _bucket_space,
_bucket_space.getBucketDatabase().acquire_read_guard(),
- std::move(get_cmd),
- metric, api::InternalReadConsistency::Strong);
+ std::move(get_cmd), condition_probe_metrics,
+ api::InternalReadConsistency::Strong);
}
CheckCondition::~CheckCondition() = default;
@@ -126,6 +127,10 @@ bool CheckCondition::replica_set_changed_after_get_operation() const {
return (replicas_in_db_now != _cond_get_op->replicas_in_db());
}
+bool CheckCondition::distributor_no_longer_owns_bucket() const {
+ return !_bucket_space.check_ownership_in_pending_and_current_state(_doc_id_bucket.getBucketId()).isOwned();
+}
+
CheckCondition::Outcome::Result
CheckCondition::newest_replica_to_outcome(const std::optional<NewestReplica>& newest) noexcept {
if (!newest) {
@@ -158,9 +163,13 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St
reply->steal_trace());
return;
}
- const auto state_version_now = _bucket_space.getClusterState().getVersion();
+ auto state_version_now = _bucket_space.getClusterState().getVersion();
+ if (_bucket_space.has_pending_cluster_state()) {
+ state_version_now = _bucket_space.get_pending_cluster_state().getVersion();
+ }
if ((state_version_now != _cluster_state_version_at_creation_time)
- && replica_set_changed_after_get_operation())
+ && (replica_set_changed_after_get_operation()
+ || distributor_no_longer_owns_bucket()))
{
// BUCKET_NOT_FOUND is semantically (usually) inaccurate here, but it's what we use for this purpose
// in existing operations. Checking the replica set will implicitly check for ownership changes,
@@ -220,7 +229,7 @@ CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket,
const documentapi::TestAndSetCondition& tas_condition,
const DistributorNodeContext& node_ctx,
const DistributorStripeOperationContext& op_ctx,
- PersistenceOperationMetricSet& metric,
+ PersistenceOperationMetricSet& condition_probe_metrics,
uint32_t trace_level)
{
// TODO move this check to the caller?
@@ -237,8 +246,8 @@ CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket,
if (!all_nodes_support_document_condition_probe(entries, op_ctx)) {
return {}; // Want write-repair, but one or more nodes are too old to use the feature
}
- return std::make_shared<CheckCondition>(bucket, doc_id, tas_condition, bucket_space,
- node_ctx, metric, trace_level, private_ctor_tag{});
+ return std::make_shared<CheckCondition>(bucket, doc_id, tas_condition, bucket_space, node_ctx,
+ condition_probe_metrics, trace_level, private_ctor_tag{});
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
index 062c9bb831d..999b79adc3d 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.h
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
@@ -114,7 +114,7 @@ public:
const documentapi::TestAndSetCondition& tas_condition,
const DistributorBucketSpace& bucket_space,
const DistributorNodeContext& node_ctx,
- PersistenceOperationMetricSet& metric,
+ PersistenceOperationMetricSet& condition_probe_metrics,
uint32_t trace_level,
private_ctor_tag);
~CheckCondition();
@@ -135,10 +135,11 @@ public:
const documentapi::TestAndSetCondition& tas_condition,
const DistributorNodeContext& node_ctx,
const DistributorStripeOperationContext& op_ctx,
- PersistenceOperationMetricSet& metric,
+ PersistenceOperationMetricSet& condition_probe_metrics,
uint32_t trace_level);
private:
[[nodiscard]] bool replica_set_changed_after_get_operation() const;
+ [[nodiscard]] bool distributor_no_longer_owns_bucket() const;
void handle_internal_get_operation_reply(std::shared_ptr<api::StorageReply> reply);
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 952aeff0800..8c6fdb314f3 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -26,6 +26,7 @@ PutOperation::PutOperation(const DistributorNodeContext& node_ctx,
DistributorBucketSpace& bucket_space,
std::shared_ptr<api::PutCommand> msg,
PersistenceOperationMetricSet& metric,
+ PersistenceOperationMetricSet& condition_probe_metrics,
SequencingHandle sequencing_handle)
: SequencedOperation(std::move(sequencing_handle)),
_tracker_instance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()),
@@ -34,7 +35,7 @@ PutOperation::PutOperation(const DistributorNodeContext& node_ctx,
_doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())),
_node_ctx(node_ctx),
_op_ctx(op_ctx),
- _temp_metric(metric), // TODO
+ _condition_probe_metrics(condition_probe_metrics),
_bucket_space(bucket_space)
{
}
@@ -156,7 +157,7 @@ void PutOperation::start_conditional_put(DistributorStripeMessageSender& sender)
document::Bucket bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id);
_check_condition = CheckCondition::create_if_inconsistent_replicas(bucket, _bucket_space, _msg->getDocumentId(),
_msg->getCondition(), _node_ctx, _op_ctx,
- _temp_metric, _msg->getTrace().getLevel());
+ _condition_probe_metrics, _msg->getTrace().getLevel());
if (!_check_condition) {
start_direct_put_dispatch(sender);
} else {
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 6befb8d3e38..635accc1865 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -28,6 +28,7 @@ public:
DistributorBucketSpace& bucketSpace,
std::shared_ptr<api::PutCommand> msg,
PersistenceOperationMetricSet& metric,
+ PersistenceOperationMetricSet& condition_probe_metrics,
SequencingHandle sequencingHandle = SequencingHandle());
~PutOperation() override;
@@ -44,7 +45,7 @@ private:
document::BucketId _doc_id_bucket_id;
const DistributorNodeContext& _node_ctx;
DistributorStripeOperationContext& _op_ctx;
- PersistenceOperationMetricSet& _temp_metric;
+ PersistenceOperationMetricSet& _condition_probe_metrics;
DistributorBucketSpace& _bucket_space;
std::shared_ptr<CheckCondition> _check_condition;
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index 59ae4120fd6..96182b0744f 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -16,6 +16,7 @@ RemoveOperation::RemoveOperation(const DistributorNodeContext& node_ctx,
DistributorBucketSpace& bucketSpace,
std::shared_ptr<api::RemoveCommand> msg,
PersistenceOperationMetricSet& metric,
+ PersistenceOperationMetricSet& condition_probe_metrics,
SequencingHandle sequencingHandle)
: SequencedOperation(std::move(sequencingHandle)),
_tracker_instance(metric,
@@ -26,7 +27,7 @@ RemoveOperation::RemoveOperation(const DistributorNodeContext& node_ctx,
_doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())),
_node_ctx(node_ctx),
_op_ctx(op_ctx),
- _temp_metric(metric), // TODO
+ _condition_probe_metrics(condition_probe_metrics),
_bucket_space(bucketSpace),
_check_condition()
{
@@ -48,7 +49,7 @@ void RemoveOperation::start_conditional_remove(DistributorStripeMessageSender& s
document::Bucket bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id);
_check_condition = CheckCondition::create_if_inconsistent_replicas(bucket, _bucket_space, _msg->getDocumentId(),
_msg->getCondition(), _node_ctx, _op_ctx,
- _temp_metric, _msg->getTrace().getLevel());
+ _condition_probe_metrics, _msg->getTrace().getLevel());
if (!_check_condition) {
start_direct_remove_dispatch(sender);
} else {
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
index 349a6182937..9f3a98294ea 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
@@ -19,6 +19,7 @@ public:
DistributorBucketSpace& bucketSpace,
std::shared_ptr<api::RemoveCommand> msg,
PersistenceOperationMetricSet& metric,
+ PersistenceOperationMetricSet& condition_probe_metrics,
SequencingHandle sequencingHandle = SequencingHandle());
~RemoveOperation() override;
@@ -36,7 +37,7 @@ private:
document::BucketId _doc_id_bucket_id;
const DistributorNodeContext& _node_ctx;
DistributorStripeOperationContext& _op_ctx;
- PersistenceOperationMetricSet& _temp_metric;
+ PersistenceOperationMetricSet& _condition_probe_metrics;
DistributorBucketSpace& _bucket_space;
std::shared_ptr<CheckCondition> _check_condition;
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 0cb4b223c11..73c65f54b21 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -34,6 +34,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
: SequencedOperation(std::move(sequencingHandle)),
_updateMetric(metrics.updates),
_putMetric(metrics.update_puts),
+ _put_condition_probe_metrics(metrics.put_condition_probes), // Updates never trigger put write repair, so we sneakily use a ref to someone else
_getMetric(metrics.update_gets),
_metadata_get_metrics(metrics.update_metadata_gets),
_updateCmd(std::move(msg)),
@@ -263,7 +264,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen
document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0));
auto put = std::make_shared<api::PutCommand>(bucket, doc, putTimestamp);
copyMessageSettings(*_updateCmd, *put);
- auto putOperation = std::make_shared<PutOperation>(_node_ctx, _op_ctx, _bucketSpace, std::move(put), _putMetric);
+ auto putOperation = std::make_shared<PutOperation>(_node_ctx, _op_ctx, _bucketSpace, std::move(put), _putMetric, _put_condition_probe_metrics);
PutOperation & op = *putOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(putOperation), sender);
op.start(intermediate, _node_ctx.clock().getSystemTime());
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index 486ed766510..d2ad5359fa6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -139,6 +139,7 @@ private:
UpdateMetricSet& _updateMetric;
PersistenceOperationMetricSet& _putMetric;
+ PersistenceOperationMetricSet& _put_condition_probe_metrics;
PersistenceOperationMetricSet& _getMetric;
PersistenceOperationMetricSet& _metadata_get_metrics;
std::shared_ptr<api::UpdateCommand> _updateCmd;
diff --git a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.cpp b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.cpp
index 944b4bafa0a..e66884c4060 100644
--- a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.cpp
+++ b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.cpp
@@ -58,8 +58,8 @@ PersistenceFailuresMetricSet::clone(std::vector<Metric::UP>& ownerList, CopyType
if (copyType == INACTIVE) {
return MetricSet::clone(ownerList, INACTIVE, owner, includeUnused);
}
- return (PersistenceFailuresMetricSet*)
- (new PersistenceFailuresMetricSet(owner))->assignValues(*this);
+ return dynamic_cast<PersistenceFailuresMetricSet*>(
+ (new PersistenceFailuresMetricSet(owner))->assignValues(*this));
}
PersistenceOperationMetricSet::PersistenceOperationMetricSet(const std::string& name, MetricSet* owner)
@@ -69,6 +69,11 @@ PersistenceOperationMetricSet::PersistenceOperationMetricSet(const std::string&
failures(this)
{ }
+PersistenceOperationMetricSet::PersistenceOperationMetricSet(const std::string& name)
+ : PersistenceOperationMetricSet(name, nullptr)
+{
+}
+
PersistenceOperationMetricSet::~PersistenceOperationMetricSet() = default;
MetricSet *
@@ -78,9 +83,8 @@ PersistenceOperationMetricSet::clone(std::vector<Metric::UP>& ownerList, CopyTyp
if (copyType == INACTIVE) {
return MetricSet::clone(ownerList, INACTIVE, owner, includeUnused);
}
- return (PersistenceOperationMetricSet*)
- (new PersistenceOperationMetricSet(getName(), owner))
- ->assignValues(*this);
+ return dynamic_cast<PersistenceOperationMetricSet*>(
+ (new PersistenceOperationMetricSet(getName(), owner))->assignValues(*this));
}
void
diff --git a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h
index b818d1bdd9f..eb1c3f57252 100644
--- a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h
+++ b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h
@@ -40,10 +40,11 @@ class PersistenceOperationMetricSet : public metrics::MetricSet
mutable std::mutex _mutex;
public:
metrics::DoubleAverageMetric latency;
- metrics::LongCountMetric ok;
+ metrics::LongCountMetric ok;
PersistenceFailuresMetricSet failures;
- PersistenceOperationMetricSet(const std::string& name, metrics::MetricSet* owner = nullptr);
+ PersistenceOperationMetricSet(const std::string& name, metrics::MetricSet* owner);
+ explicit PersistenceOperationMetricSet(const std::string& name);
~PersistenceOperationMetricSet() override;
MetricSet * clone(std::vector<Metric::UP>& ownerList, CopyType copyType,
@@ -57,7 +58,6 @@ public:
*/
void updateFromResult(const api::ReturnCode& result);
- friend class LockWrapper;
class LockWrapper {
std::unique_lock<std::mutex> _lock;
PersistenceOperationMetricSet& _self;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index c0f4041e284..f6b7c7e5f0b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -184,7 +184,8 @@ computeAllPossibleHandlerThreads(const vespa::config::content::StorFilestorConfi
return cfg.numThreads +
computeNumResponseThreads(cfg.numResponseThreads) +
cfg.numNetworkThreads +
- cfg.numVisitorThreads;
+ cfg.numVisitorThreads +
+ 1; // Async cluster state processing thread (might be a pessimization to include here...)
}
}
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 77e8532f0d2..e8534e3f299 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -36,6 +36,7 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
ThrottleToken throttle_token)
: MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token))
{}
+
MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
@@ -90,7 +91,7 @@ MessageTracker::sendReply() {
if (count_result_as_failure()) {
_env._metrics.failedOperations.inc();
}
- vespalib::duration duration = vespalib::from_s(_timer.getElapsedTimeAsDouble()/1000.0);
+ vespalib::duration duration = _timer.getElapsedTime();
if (duration >= WARN_ON_SLOW_OPERATIONS) {
LOGBT(warning, _msg->getType().toString(),
"Slow processing of message %s. Processing time: %1.1f s (>=%1.1f s)",
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
index 9d7dd95d922..3b97ff6c018 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
@@ -28,6 +28,7 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler(
_component(compReg, "changedbucketownershiphandler"),
_metrics(),
_configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
+ _state_sync_executor(1), // single thread for sequential task execution
_stateLock(),
_currentState(), // Not set yet, so ownership will not be valid
_currentOwnership(std::make_shared<OwnershipState>(
@@ -98,7 +99,7 @@ ChangedBucketOwnershipHandler::Metrics::Metrics(metrics::MetricSet* owner)
idealStateOpsAborted("ideal_state_ops_aborted", {}, "Number of outdated ideal state operations aborted", this),
externalLoadOpsAborted("external_load_ops_aborted", {}, "Number of outdated external load operations aborted", this)
{}
-ChangedBucketOwnershipHandler::Metrics::~Metrics() { }
+ChangedBucketOwnershipHandler::Metrics::~Metrics() = default;
ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo,
std::shared_ptr<const lib::ClusterStateBundle> state)
@@ -114,7 +115,7 @@ ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucke
}
-ChangedBucketOwnershipHandler::OwnershipState::~OwnershipState() {}
+ChangedBucketOwnershipHandler::OwnershipState::~OwnershipState() = default;
const lib::ClusterState&
@@ -235,18 +236,79 @@ ChangedBucketOwnershipHandler::makeLazyAbortPredicate(
_component.getIndex()));
}
-/*
- * If we go from:
- * 1) Not all down -> all distributors down
- * - abort ops for _all_ buckets
- * 2) All distributors down -> not down
- * - no-op, since down edge must have been handled first
- * 3) All down -> all down
- * - no-op
- * 4) Some nodes down or up
- * - abort ops for buckets that have changed ownership between
- * current and new cluster state.
- */
+/**
+ * Executor task that applies a new cluster state on behalf of the owning
+ * ChangedBucketOwnershipHandler: it swaps in the new ownership state, sends
+ * down an abort command for operations invalidated by the transition (when
+ * one is needed) and finally sends down the SetSystemStateCommand itself.
+ * Every path through run() ends by sending the state command down.
+ *
+ * The owner is held by reference, so a task must never outlive it; the
+ * owner's onClose() shuts down and syncs the executor these tasks run on.
+ */
+class ChangedBucketOwnershipHandler::ClusterStateSyncAndApplyTask
+    : public vespalib::Executor::Task
+{
+    ChangedBucketOwnershipHandler&              _owner;
+    std::shared_ptr<api::SetSystemStateCommand> _command;
+public:
+    ClusterStateSyncAndApplyTask(ChangedBucketOwnershipHandler& owner,
+                                 std::shared_ptr<api::SetSystemStateCommand> command) noexcept
+        : _owner(owner),
+          _command(std::move(command))
+    {}
+
+    /*
+     * If we go from:
+     * 1) Not all down -> all distributors down
+     *      - abort ops for _all_ buckets
+     * 2) All distributors down -> not down
+     *      - no-op, since down edge must have been handled first
+     * 3) All down -> all down
+     *      - no-op
+     * 4) Some nodes down or up
+     *      - abort ops for buckets that have changed ownership between
+     *        current and new cluster state.
+     */
+    void run() override {
+        OwnershipState::CSP old_ownership;
+        OwnershipState::CSP new_ownership;
+        // Update the ownership state inspected by all bucket-mutating operations passing through
+        // this component so that messages from outdated distributors will be rejected. Note that
+        // this is best-effort; with our current multitude of RPC threads directly dispatching
+        // operations into the persistence provider, it's possible for a thread carrying an outdated
+        // operation to have already passed the barrier, but be preempted so that it will apply the
+        // op _after_ the abort step has completed.
+        {
+            std::lock_guard guard(_owner._stateLock);
+            old_ownership = _owner._currentOwnership;
+            _owner.setCurrentOwnershipWithStateNoLock(_command->getClusterStateBundle());
+            new_ownership = _owner._currentOwnership;
+        }
+        assert(new_ownership->valid());
+        // If we're going from not having a state to having a state, we per
+        // definition cannot possibly have gotten any load that needs aborting,
+        // as no such load is allowed through this component when this is the
+        // case.
+        if (!old_ownership->valid()) {
+            return _owner.sendDown(_command);
+        }
+
+        // Cases 2) and 3) above: the previous state already had all distributors
+        // down, so there is nothing to abort; just forward the state command.
+        if (allDistributorsDownInState(old_ownership->getBaselineState())) {
+            LOG(debug, "No need to send aborts on transition '%s' -> '%s'",
+                old_ownership->getBaselineState().toString().c_str(),
+                new_ownership->getBaselineState().toString().c_str());
+            return _owner.sendDown(_command);
+        }
+        _owner.logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState());
+
+        // Times the abort round-trip only; reported as averageAbortProcessingTime.
+        metrics::MetricTimer duration_timer;
+        auto predicate = _owner.makeLazyAbortPredicate(old_ownership, new_ownership);
+        auto abort_cmd = std::make_shared<AbortBucketOperationsCommand>(std::move(predicate));
+
+        // Will not return until all operation aborts have been performed
+        // on the lower level links, at which point it is safe to send down
+        // the SetSystemStateCommand.
+        _owner.sendDown(abort_cmd);
+        duration_timer.stop(_owner._metrics.averageAbortProcessingTime);
+
+        // Conflicting operations have been aborted and incoming conflicting operations
+        // are aborted inline; send down the state command actually making the state change
+        // visible on the content node.
+        _owner.sendDown(_command);
+    }
+};
+
bool
ChangedBucketOwnershipHandler::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& stateCmd)
@@ -255,47 +317,13 @@ ChangedBucketOwnershipHandler::onSetSystemState(
LOG(debug, "Operation aborting is config-disabled");
return false; // Early out.
}
- OwnershipState::CSP oldOwnership;
- OwnershipState::CSP newOwnership;
- // Get old state and update own current cluster state _before_ it is
- // applied to the rest of the system. This helps ensure that no message
- // can get through in the off-case that the lower level storage links
- // don't apply the state immediately for some reason.
- {
- std::lock_guard guard(_stateLock);
- oldOwnership = _currentOwnership;
- setCurrentOwnershipWithStateNoLock(stateCmd->getClusterStateBundle());
- newOwnership = _currentOwnership;
- }
- assert(newOwnership->valid());
- // If we're going from not having a state to having a state, we per
- // definition cannot possibly have gotten any load that needs aborting,
- // as no such load is allowed through this component when this is the
- // case.
- if (!oldOwnership->valid()) {
- return false;
- }
-
- if (allDistributorsDownInState(oldOwnership->getBaselineState())) {
- LOG(debug, "No need to send aborts on transition '%s' -> '%s'",
- oldOwnership->getBaselineState().toString().c_str(),
- newOwnership->getBaselineState().toString().c_str());
- return false;
- }
- logTransition(oldOwnership->getBaselineState(), newOwnership->getBaselineState());
-
- metrics::MetricTimer durationTimer;
- auto predicate(makeLazyAbortPredicate(oldOwnership, newOwnership));
- AbortBucketOperationsCommand::SP cmd(
- new AbortBucketOperationsCommand(std::move(predicate)));
-
- // Will not return until all operation aborts have been performed
- // on the lower level links, at which point it is safe to send down
- // the SetSystemStateCommand.
- sendDown(cmd);
-
- durationTimer.stop(_metrics.averageAbortProcessingTime);
- return false;
+ // Dispatch to background worker. This indirection is because operations such as lid-space compaction
+ // may cause the implicit operation abort waiting step to block the caller for a relatively long time.
+ // It is very important that the executor only has 1 thread, which means this has FIFO behavior.
+ [[maybe_unused]] auto rejected_task = _state_sync_executor.execute(std::make_unique<ClusterStateSyncAndApplyTask>(*this, stateCmd));
+ // If this fails, we have processed a message _after_ onClose has been called, which should not happen.
+ assert(!rejected_task);
+ return true;
}
/**
@@ -411,8 +439,7 @@ ChangedBucketOwnershipHandler::onDown(
const std::shared_ptr<api::StorageMessage>& msg)
{
if (msg->getType() == api::MessageType::SETSYSTEMSTATE) {
- return onSetSystemState(
- std::static_pointer_cast<api::SetSystemStateCommand>(msg));
+ return onSetSystemState(std::static_pointer_cast<api::SetSystemStateCommand>(msg));
}
if (!isMutatingCommandAndNeedsChecking(*msg)) {
return false;
@@ -451,4 +478,10 @@ ChangedBucketOwnershipHandler::onInternalReply(
return (reply->getType() == AbortBucketOperationsReply::ID);
}
+void
+ChangedBucketOwnershipHandler::onClose()
+{
+ _state_sync_executor.shutdown().sync(); // NOTE(review): presumably stops accepting new state tasks, then waits for already queued ones to finish - confirm against the executor API
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
index e753d96871e..8798d109955 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
@@ -10,6 +10,7 @@
#include <vespa/metrics/valuemetric.h>
#include <vespa/metrics/countmetric.h>
#include <vespa/metrics/metricset.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <atomic>
#include <vector>
#include <unordered_map>
@@ -60,26 +61,24 @@ class ChangedBucketOwnershipHandler
private config::IFetcherCallback<vespa::config::content::PersistenceConfig>
{
public:
- class Metrics : public metrics::MetricSet
- {
+ class Metrics : public metrics::MetricSet {
public:
metrics::LongAverageMetric averageAbortProcessingTime;
metrics::LongCountMetric idealStateOpsAborted;
metrics::LongCountMetric externalLoadOpsAborted;
- Metrics(metrics::MetricSet* owner = 0);
- ~Metrics();
+ explicit Metrics(metrics::MetricSet* owner = nullptr);
+ ~Metrics() override;
};
/**
* Wrapper around the distribution & state pairs that decides how to
* compute the owner distributor for a bucket. It's possible to have
* an ownership state with a nullptr cluster state when the node
- * initially starts up, which is why no owership state must be used unless
+ * initially starts up, which is why no ownership state must be used unless
* invoking valid() on it returns true.
*/
- class OwnershipState
- {
+ class OwnershipState {
using BucketSpace = document::BucketSpace;
std::unordered_map<BucketSpace, std::shared_ptr<const lib::Distribution>, BucketSpace::hash> _distributions;
std::shared_ptr<const lib::ClusterStateBundle> _state;
@@ -93,7 +92,7 @@ public:
static const uint16_t FAILED_TO_RESOLVE = 0xffff;
- bool valid() const {
+ [[nodiscard]] bool valid() const noexcept {
return (!_distributions.empty() && _state);
}
@@ -114,16 +113,21 @@ public:
void reloadClusterState();
private:
- ServiceLayerComponent _component;
- Metrics _metrics;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
- mutable std::mutex _stateLock;
- std::shared_ptr<const lib::ClusterStateBundle> _currentState;
- OwnershipState::CSP _currentOwnership;
-
- std::atomic<bool> _abortQueuedAndPendingOnStateChange;
- std::atomic<bool> _abortMutatingIdealStateOps;
- std::atomic<bool> _abortMutatingExternalLoadOps;
+ class ClusterStateSyncAndApplyTask;
+
+ using ConfigFetcherUP = std::unique_ptr<config::ConfigFetcher>;
+ using ClusterStateBundleCSP = std::shared_ptr<const lib::ClusterStateBundle>;
+
+ ServiceLayerComponent _component;
+ Metrics _metrics;
+ ConfigFetcherUP _configFetcher;
+ vespalib::ThreadStackExecutor _state_sync_executor;
+ mutable std::mutex _stateLock;
+ ClusterStateBundleCSP _currentState;
+ OwnershipState::CSP _currentOwnership;
+ std::atomic<bool> _abortQueuedAndPendingOnStateChange;
+ std::atomic<bool> _abortMutatingIdealStateOps;
+ std::atomic<bool> _abortMutatingExternalLoadOps;
std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate>
makeLazyAbortPredicate(
@@ -183,14 +187,12 @@ private:
public:
ChangedBucketOwnershipHandler(const config::ConfigUri& configUri,
ServiceLayerComponentRegister& compReg);
- ~ChangedBucketOwnershipHandler();
+ ~ChangedBucketOwnershipHandler() override;
- bool onSetSystemState(
- const std::shared_ptr<api::SetSystemStateCommand>&) override;
+ bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override;
bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
-
- bool onInternalReply(
- const std::shared_ptr<api::InternalReply>& reply) override;
+ bool onInternalReply(const std::shared_ptr<api::InternalReply>& reply) override;
+ void onClose() override;
void configure(std::unique_ptr<vespa::config::content::PersistenceConfig>) override;