diff options
author | Ola Aunrønning <olaa@yahooinc.com> | 2023-06-08 10:58:17 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-08 10:58:17 +0200 |
commit | 79de2d78433c11932357d0c244704b16fc87da21 (patch) | |
tree | 382a06ce4e1217e395fbba25c1ce74a1afaadbe5 /storage | |
parent | 96d3814b80a693ee46640ae89a88fdb2d78dcd40 (diff) | |
parent | d7568f0d450df2287657ac18c37955a1867496f5 (diff) |
Merge branch 'master' into olaa/dataplane-proxy-config
Diffstat (limited to 'storage')
22 files changed, 260 insertions, 156 deletions
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp index 1b5cede8af6..757a9329ea6 100644 --- a/storage/src/tests/distributor/check_condition_test.cpp +++ b/storage/src/tests/distributor/check_condition_test.cpp @@ -242,6 +242,32 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_ }); } +TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_pending_transition_case) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0)); + simulate_set_pending_cluster_state("version:2 storage:1 distributor:1 .0.s:d"); // technically, no distributors own anything + cond.handle_reply(_sender, make_matched_reply(1)); + }, [&](auto& outcome) { + EXPECT_FALSE(outcome.matched_condition()); + EXPECT_FALSE(outcome.not_found()); + EXPECT_TRUE(outcome.failed()); + EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::BUCKET_NOT_FOUND); + }); +} + +TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_completed_transition_case) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0)); + enable_cluster_state("version:2 storage:1 distributor:1 .0.s:d"); // technically, no distributors own anything + cond.handle_reply(_sender, make_matched_reply(1)); + }, [&](auto& outcome) { + EXPECT_FALSE(outcome.matched_condition()); + EXPECT_FALSE(outcome.not_found()); + EXPECT_TRUE(outcome.failed()); + EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::BUCKET_NOT_FOUND); + }); +} + TEST_F(CheckConditionTest, nested_get_traces_are_propagated_to_outcome) { test_cond_with_2_gets_sent([&](auto& cond) { cond.handle_reply(_sender, make_trace_reply(0, 100, "hello")); @@ -253,4 +279,20 @@ TEST_F(CheckConditionTest, nested_get_traces_are_propagated_to_outcome) { }); } +TEST_F(CheckConditionTest, condition_evaluation_increments_probe_latency_metrics) { + getClock().setAbsoluteTimeInSeconds(1); + EXPECT_EQ(_metrics.latency.getLongValue("count"), 0); + EXPECT_EQ(_metrics.ok.getLongValue("last"), 0); + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0)); + getClock().setAbsoluteTimeInSeconds(3); + cond.handle_reply(_sender, make_matched_reply(1)); + }, [&](auto& outcome) noexcept { + (void)outcome; + }); + EXPECT_EQ(_metrics.latency.getLongValue("count"), 1); + EXPECT_EQ(_metrics.ok.getLongValue("last"), 1); + EXPECT_DOUBLE_EQ(_metrics.latency.getLast(), 2'000.0); // in millis +} + } diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index ff375e5b902..76b6741442e 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -73,7 +73,8 @@ public: operation_context(), getDistributorBucketSpace(), msg, - metrics().puts); + metrics().puts, + metrics().put_condition_probes); op->start(_sender); } diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index d352d23bb8c..d169c80a95d 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -41,7 +41,8 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil { operation_context(), getDistributorBucketSpace(), msg, - metrics().removes); + metrics().removes, + metrics().remove_condition_probes); op->start(_sender); } diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp index 4c83dde30da..ae2385a36d8 100644 --- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -40,15 +40,11 @@ struct ChangedBucketOwnershipHandlerTest : Test { uint16_t wantedOwner, const lib::ClusterState& state); - std::shared_ptr<api::SetSystemStateCommand> createStateCmd( - const lib::ClusterState& state) const - { + std::shared_ptr<api::SetSystemStateCommand> createStateCmd(const lib::ClusterState& state) const { return std::make_shared<api::SetSystemStateCommand>(state); } - std::shared_ptr<api::SetSystemStateCommand> createStateCmd( - const std::string& stateStr) const - { + std::shared_ptr<api::SetSystemStateCommand> createStateCmd(const std::string& stateStr) const { return createStateCmd(lib::ClusterState(stateStr)); } @@ -71,11 +67,17 @@ struct ChangedBucketOwnershipHandlerTest : Test { template <typename MsgType, typename... MsgParams> void expectDownAbortsMessage(bool expected, MsgParams&& ... params); - lib::ClusterState getDefaultTestClusterState() const { + std::shared_ptr<AbortBucketOperationsCommand> fetch_dispatched_abort_operations_command() { + _bottom->waitForMessages(2, 60); // abort cmd + set cluster state cmd + EXPECT_EQ(2, _bottom->getNumCommands()); + return std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0)); + } + + static lib::ClusterState getDefaultTestClusterState() { return lib::ClusterState("distributor:4 storage:1"); } - lib::ClusterState getStorageDownTestClusterState() const { + static lib::ClusterState getStorageDownTestClusterState() { return lib::ClusterState("distributor:4 storage:1 .0.s:d"); } @@ -173,29 +175,26 @@ hasAbortedNoneOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v) bool hasOnlySetSystemStateCmdQueued(DummyStorageLink& link) { + link.waitForMessages(1, 60); if (link.getNumCommands() != 1) { std::cerr << "expected 1 command, found" << link.getNumCommands() << "\n"; } - api::SetSystemStateCommand::SP cmd( - std::dynamic_pointer_cast<api::SetSystemStateCommand>( - link.getCommand(0))); - return (cmd.get() != 0); + auto cmd = std::dynamic_pointer_cast<api::SetSystemStateCommand>(link.getCommand(0)); + return static_cast<bool>(cmd); } } void -ChangedBucketOwnershipHandlerTest::applyDistribution( - Redundancy redundancy, NodeCount nodeCount) +ChangedBucketOwnershipHandlerTest::applyDistribution(Redundancy redundancy, NodeCount nodeCount) { _app->setDistribution(redundancy, nodeCount); _handler->storageDistributionChanged(); } void -ChangedBucketOwnershipHandlerTest::applyClusterState( - const lib::ClusterState& state) +ChangedBucketOwnershipHandlerTest::applyClusterState(const lib::ClusterState& state) { _app->setClusterState(state); _handler->reloadClusterState(); @@ -212,10 +211,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed auto node2Buckets(insertBuckets(2, 2, stateBefore)); _top->sendDown(createStateCmd("distributor:4 .1.s:d .3.s:d storage:1")); - // TODO: refactor into own function - ASSERT_EQ(2, _bottom->getNumCommands()); - auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0)); - ASSERT_TRUE(cmd.get() != 0); + auto cmd = fetch_dispatched_abort_operations_command(); + ASSERT_TRUE(cmd); EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets)); EXPECT_TRUE(hasAbortedAllOf(cmd, node3Buckets)); @@ -280,10 +277,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, down_edge_to_no_available_distributors lib::ClusterState downState("distributor:3 .0.s:d .1.s:s .2.s:s storage:1"); _top->sendDown(createStateCmd(downState)); - // TODO: refactor into own function - ASSERT_EQ(2, _bottom->getNumCommands()); - auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0)); - ASSERT_TRUE(cmd.get() != 0); + auto cmd = fetch_dispatched_abort_operations_command(); + ASSERT_TRUE(cmd); EXPECT_TRUE(hasAbortedAllOf(cmd, node0Buckets)); EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets)); @@ -304,10 +299,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed auto node2Buckets(insertBuckets(2, 2, stateAfter)); _top->sendDown(createStateCmd(stateAfter)); - // TODO: refactor into own function - ASSERT_EQ(2, _bottom->getNumCommands()); - auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0)); - ASSERT_TRUE(cmd.get() != 0); + auto cmd = fetch_dispatched_abort_operations_command(); + ASSERT_TRUE(cmd); EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets)); EXPECT_TRUE(hasAbortedNoneOf(cmd, node0Buckets)); @@ -319,8 +312,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed } void -ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket( - uint16_t fromDistributorIndex) +ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket(uint16_t fromDistributorIndex) { document::BucketId bucket(16, 6786); auto msg = std::make_shared<api::CreateBucketCommand>(makeDocumentBucket(bucket)); @@ -350,7 +342,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_change_updates_own /** * Generate and dispatch a message of the given type with the provided - * aruments as if that message was sent from distributor 1. Messages will + * arguments as if that message was sent from distributor 1. Messages will * be checked as if the state contains 4 distributors in Up state. This * means that it suffices to send in a message with a bucket that is not * owned by distributor 1 in this state to trigger an abort. @@ -382,7 +374,7 @@ ChangedBucketOwnershipHandlerTest::expectChangeAbortsMessage(bool expected, MsgP /** * Generate and dispatch a message of the given type with the provided - * aruments as if that message was sent from distributor 1. Messages will + * arguments as if that message was sent from distributor 1. Messages will * be checked as if the state contains 4 distributors in Up state and storage * node is down. This means that any abortable message will trigger an abort. */ @@ -394,6 +386,7 @@ ChangedBucketOwnershipHandlerTest::expectDownAbortsMessage(bool expected, MsgPar (void) _bottom->getCommandsOnce(); ASSERT_NO_FATAL_FAILURE((expectChangeAbortsMessage<MsgType, MsgParams...>(false, std::forward<MsgParams>(params)...))); _top->sendDown(createStateCmd(getStorageDownTestClusterState())); + _bottom->waitForMessages(3, 60); ASSERT_EQ(_bottom->getNumCommands(), 3); auto setSystemStateCommand = std::dynamic_pointer_cast<api::SetSystemStateCommand>(_bottom->getCommand(2)); ASSERT_TRUE(setSystemStateCommand); diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp index b47e0697a91..9a5fd595b1d 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp @@ -166,7 +166,10 @@ DistributorStripeComponent::update_bucket_database( } } - UpdateBucketDatabaseProcessor processor(getClock(), found_down_node ? up_nodes : changed_nodes, bucketSpace.get_ideal_service_layer_nodes_bundle(bucket.getBucketId()).get_available_nodes(), (update_flags & DatabaseUpdate::RESET_TRUSTED) != 0); + UpdateBucketDatabaseProcessor processor(getClock(), + found_down_node ? up_nodes : changed_nodes, + bucketSpace.get_ideal_service_layer_nodes_bundle(bucket.getBucketId()).get_available_nodes(), + (update_flags & DatabaseUpdate::RESET_TRUSTED) != 0); bucketSpace.getBucketDatabase().process_update(bucket.getBucketId(), processor, (update_flags & DatabaseUpdate::CREATE_IF_NONEXISTING) != 0); } diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.cpp b/storage/src/vespa/storage/distributor/distributormetricsset.cpp index fad44782dd4..cbc0e6f6eef 100644 --- a/storage/src/vespa/storage/distributor/distributormetricsset.cpp +++ b/storage/src/vespa/storage/distributor/distributormetricsset.cpp @@ -16,11 +16,13 @@ BucketDbMetrics::~BucketDbMetrics() = default; DistributorMetricSet::DistributorMetricSet() : MetricSet("distributor", {{"distributor"}}, ""), puts("puts", this), + put_condition_probes("put_condition_probes", this), updates(this), update_puts("update_puts", this), update_gets("update_gets", this), update_metadata_gets("update_metadata_gets", this), removes("removes", this), + remove_condition_probes("remove_condition_probes", this), removelocations("removelocations", this), gets("gets", this), stats("stats", this), diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.h b/storage/src/vespa/storage/distributor/distributormetricsset.h index ac140b85282..739e84759f1 100644 --- a/storage/src/vespa/storage/distributor/distributormetricsset.h +++ b/storage/src/vespa/storage/distributor/distributormetricsset.h @@ -20,24 +20,26 @@ struct BucketDbMetrics : metrics::MetricSet { class DistributorMetricSet : public metrics::MetricSet { public: PersistenceOperationMetricSet puts; - UpdateMetricSet updates; + PersistenceOperationMetricSet put_condition_probes; + UpdateMetricSet updates; PersistenceOperationMetricSet update_puts; PersistenceOperationMetricSet update_gets; PersistenceOperationMetricSet update_metadata_gets; PersistenceOperationMetricSet removes; + PersistenceOperationMetricSet remove_condition_probes; PersistenceOperationMetricSet removelocations; PersistenceOperationMetricSet gets; PersistenceOperationMetricSet stats; PersistenceOperationMetricSet getbucketlists; - VisitorMetricSet visits; - metrics::DoubleAverageMetric stateTransitionTime; - metrics::DoubleAverageMetric set_cluster_state_processing_time; - metrics::DoubleAverageMetric activate_cluster_state_processing_time; - metrics::DoubleAverageMetric recoveryModeTime; - metrics::LongValueMetric docsStored; - metrics::LongValueMetric bytesStored; - BucketDbMetrics mutable_dbs; - BucketDbMetrics read_only_dbs; + VisitorMetricSet visits; + metrics::DoubleAverageMetric stateTransitionTime; + metrics::DoubleAverageMetric set_cluster_state_processing_time; + metrics::DoubleAverageMetric activate_cluster_state_processing_time; + metrics::DoubleAverageMetric recoveryModeTime; + metrics::LongValueMetric docsStored; + metrics::LongValueMetric bytesStored; + BucketDbMetrics mutable_dbs; + BucketDbMetrics read_only_dbs; explicit DistributorMetricSet(); ~DistributorMetricSet() override; diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 6cb404aaa0a..d6bb5562a07 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -332,7 +332,9 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd if (allow) { _op = std::make_shared<PutOperation>(_node_ctx, _op_ctx, _op_ctx.bucket_space_repo().get(bucket_space), - std::move(cmd), getMetrics().puts, std::move(handle)); + std::move(cmd), + getMetrics().puts, getMetrics().put_condition_probes, + std::move(handle)); } else { _msg_sender.sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); } @@ -386,7 +388,8 @@ bool ExternalOperationHandler::onRemove(const std::shared_ptr<api::RemoveCommand auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket_space)); _op = std::make_shared<RemoveOperation>(_node_ctx, _op_ctx, distributorBucketSpace, std::move(cmd), - getMetrics().removes, std::move(handle)); + getMetrics().removes, getMetrics().remove_condition_probes, + std::move(handle)); } else { _msg_sender.sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); } diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp index 9f7dbcaa132..0e12e3e3019 100644 --- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp @@ -58,7 +58,7 @@ CheckCondition::CheckCondition(const document::Bucket& bucket, const documentapi::TestAndSetCondition& tas_condition, const DistributorBucketSpace& bucket_space, const DistributorNodeContext& node_ctx, - PersistenceOperationMetricSet& metric, + PersistenceOperationMetricSet& condition_probe_metrics, uint32_t trace_level, private_ctor_tag) : _doc_id_bucket(bucket), @@ -66,7 +66,8 @@ CheckCondition::CheckCondition(const document::Bucket& bucket, _node_ctx(node_ctx), _cluster_state_version_at_creation_time(_bucket_space.getClusterState().getVersion()), _cond_get_op(), - _sent_message_map() + _sent_message_map(), + _outcome() { // Condition checks only return metadata back to the distributor and thus have an empty fieldset. // Side note: the BucketId provided to the GetCommand is ignored; GetOperation computes explicitly from the doc ID. @@ -75,8 +76,8 @@ CheckCondition::CheckCondition(const document::Bucket& bucket, get_cmd->getTrace().setLevel(trace_level); _cond_get_op = std::make_shared<GetOperation>(_node_ctx, _bucket_space, _bucket_space.getBucketDatabase().acquire_read_guard(), - std::move(get_cmd), - metric, api::InternalReadConsistency::Strong); + std::move(get_cmd), condition_probe_metrics, + api::InternalReadConsistency::Strong); } CheckCondition::~CheckCondition() = default; @@ -126,6 +127,10 @@ bool CheckCondition::replica_set_changed_after_get_operation() const { return (replicas_in_db_now != _cond_get_op->replicas_in_db()); } +bool CheckCondition::distributor_no_longer_owns_bucket() const { + return !_bucket_space.check_ownership_in_pending_and_current_state(_doc_id_bucket.getBucketId()).isOwned(); +} + CheckCondition::Outcome::Result CheckCondition::newest_replica_to_outcome(const std::optional<NewestReplica>& newest) noexcept { if (!newest) { @@ -158,9 +163,13 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St reply->steal_trace()); return; } - const auto state_version_now = _bucket_space.getClusterState().getVersion(); + auto state_version_now = _bucket_space.getClusterState().getVersion(); + if (_bucket_space.has_pending_cluster_state()) { + state_version_now = _bucket_space.get_pending_cluster_state().getVersion(); + } if ((state_version_now != _cluster_state_version_at_creation_time) - && replica_set_changed_after_get_operation()) + && (replica_set_changed_after_get_operation() + || distributor_no_longer_owns_bucket())) { // BUCKET_NOT_FOUND is semantically (usually) inaccurate here, but it's what we use for this purpose // in existing operations. Checking the replica set will implicitly check for ownership changes, @@ -220,7 +229,7 @@ CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket, const documentapi::TestAndSetCondition& tas_condition, const DistributorNodeContext& node_ctx, const DistributorStripeOperationContext& op_ctx, - PersistenceOperationMetricSet& metric, + PersistenceOperationMetricSet& condition_probe_metrics, uint32_t trace_level) { // TODO move this check to the caller? @@ -237,8 +246,8 @@ CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket, if (!all_nodes_support_document_condition_probe(entries, op_ctx)) { return {}; // Want write-repair, but one or more nodes are too old to use the feature } - return std::make_shared<CheckCondition>(bucket, doc_id, tas_condition, bucket_space, - node_ctx, metric, trace_level, private_ctor_tag{}); + return std::make_shared<CheckCondition>(bucket, doc_id, tas_condition, bucket_space, node_ctx, + condition_probe_metrics, trace_level, private_ctor_tag{}); } } diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h index 062c9bb831d..999b79adc3d 100644 --- a/storage/src/vespa/storage/distributor/operations/external/check_condition.h +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h @@ -114,7 +114,7 @@ public: const documentapi::TestAndSetCondition& tas_condition, const DistributorBucketSpace& bucket_space, const DistributorNodeContext& node_ctx, - PersistenceOperationMetricSet& metric, + PersistenceOperationMetricSet& condition_probe_metrics, uint32_t trace_level, private_ctor_tag); ~CheckCondition(); @@ -135,10 +135,11 @@ public: const documentapi::TestAndSetCondition& tas_condition, const DistributorNodeContext& node_ctx, const DistributorStripeOperationContext& op_ctx, - PersistenceOperationMetricSet& metric, + PersistenceOperationMetricSet& condition_probe_metrics, uint32_t trace_level); private: [[nodiscard]] bool replica_set_changed_after_get_operation() const; + [[nodiscard]] bool distributor_no_longer_owns_bucket() const; void handle_internal_get_operation_reply(std::shared_ptr<api::StorageReply> reply); diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 952aeff0800..8c6fdb314f3 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -26,6 +26,7 @@ PutOperation::PutOperation(const DistributorNodeContext& node_ctx, DistributorBucketSpace& bucket_space, std::shared_ptr<api::PutCommand> msg, PersistenceOperationMetricSet& metric, + PersistenceOperationMetricSet& condition_probe_metrics, SequencingHandle sequencing_handle) : SequencedOperation(std::move(sequencing_handle)), _tracker_instance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()), @@ -34,7 +35,7 @@ PutOperation::PutOperation(const DistributorNodeContext& node_ctx, _doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())), _node_ctx(node_ctx), _op_ctx(op_ctx), - _temp_metric(metric), // TODO + _condition_probe_metrics(condition_probe_metrics), _bucket_space(bucket_space) { } @@ -156,7 +157,7 @@ void PutOperation::start_conditional_put(DistributorStripeMessageSender& sender) document::Bucket bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id); _check_condition = CheckCondition::create_if_inconsistent_replicas(bucket, _bucket_space, _msg->getDocumentId(), _msg->getCondition(), _node_ctx, _op_ctx, - _temp_metric, _msg->getTrace().getLevel()); + _condition_probe_metrics, _msg->getTrace().getLevel()); if (!_check_condition) { start_direct_put_dispatch(sender); } else { diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index 6befb8d3e38..635accc1865 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -28,6 +28,7 @@ public: DistributorBucketSpace& bucketSpace, std::shared_ptr<api::PutCommand> msg, PersistenceOperationMetricSet& metric, + PersistenceOperationMetricSet& condition_probe_metrics, SequencingHandle sequencingHandle = SequencingHandle()); ~PutOperation() override; @@ -44,7 +45,7 @@ private: document::BucketId _doc_id_bucket_id; const DistributorNodeContext& _node_ctx; DistributorStripeOperationContext& _op_ctx; - PersistenceOperationMetricSet& _temp_metric; + PersistenceOperationMetricSet& _condition_probe_metrics; DistributorBucketSpace& _bucket_space; std::shared_ptr<CheckCondition> _check_condition; diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp index 59ae4120fd6..96182b0744f 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp @@ -16,6 +16,7 @@ RemoveOperation::RemoveOperation(const DistributorNodeContext& node_ctx, DistributorBucketSpace& bucketSpace, std::shared_ptr<api::RemoveCommand> msg, PersistenceOperationMetricSet& metric, + PersistenceOperationMetricSet& condition_probe_metrics, SequencingHandle sequencingHandle) : SequencedOperation(std::move(sequencingHandle)), _tracker_instance(metric, @@ -26,7 +27,7 @@ RemoveOperation::RemoveOperation(const DistributorNodeContext& node_ctx, _doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())), _node_ctx(node_ctx), _op_ctx(op_ctx), - _temp_metric(metric), // TODO + _condition_probe_metrics(condition_probe_metrics), _bucket_space(bucketSpace), _check_condition() { @@ -48,7 +49,7 @@ void RemoveOperation::start_conditional_remove(DistributorStripeMessageSender& s document::Bucket bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id); _check_condition = CheckCondition::create_if_inconsistent_replicas(bucket, _bucket_space, _msg->getDocumentId(), _msg->getCondition(), _node_ctx, _op_ctx, - _temp_metric, _msg->getTrace().getLevel()); + _condition_probe_metrics, _msg->getTrace().getLevel()); if (!_check_condition) { start_direct_remove_dispatch(sender); } else { diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h index 349a6182937..9f3a98294ea 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h @@ -19,6 +19,7 @@ public: DistributorBucketSpace& bucketSpace, std::shared_ptr<api::RemoveCommand> msg, PersistenceOperationMetricSet& metric, + PersistenceOperationMetricSet& condition_probe_metrics, SequencingHandle sequencingHandle = SequencingHandle()); ~RemoveOperation() override; @@ -36,7 +37,7 @@ private: document::BucketId _doc_id_bucket_id; const DistributorNodeContext& _node_ctx; DistributorStripeOperationContext& _op_ctx; - PersistenceOperationMetricSet& _temp_metric; + PersistenceOperationMetricSet& _condition_probe_metrics; DistributorBucketSpace& _bucket_space; std::shared_ptr<CheckCondition> _check_condition; diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 0cb4b223c11..73c65f54b21 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -34,6 +34,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( : SequencedOperation(std::move(sequencingHandle)), _updateMetric(metrics.updates), _putMetric(metrics.update_puts), + _put_condition_probe_metrics(metrics.put_condition_probes), // Updates never trigger put write repair, so we sneakily use a ref to someone else _getMetric(metrics.update_gets), _metadata_get_metrics(metrics.update_metadata_gets), _updateCmd(std::move(msg)), @@ -263,7 +264,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0)); auto put = std::make_shared<api::PutCommand>(bucket, doc, putTimestamp); copyMessageSettings(*_updateCmd, *put); - auto putOperation = std::make_shared<PutOperation>(_node_ctx, _op_ctx, _bucketSpace, std::move(put), _putMetric); + auto putOperation = std::make_shared<PutOperation>(_node_ctx, _op_ctx, _bucketSpace, std::move(put), _putMetric, _put_condition_probe_metrics); PutOperation & op = *putOperation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(putOperation), sender); op.start(intermediate, _node_ctx.clock().getSystemTime()); diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index 486ed766510..d2ad5359fa6 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -139,6 +139,7 @@ private: UpdateMetricSet& _updateMetric; PersistenceOperationMetricSet& _putMetric; + PersistenceOperationMetricSet& _put_condition_probe_metrics; PersistenceOperationMetricSet& _getMetric; PersistenceOperationMetricSet& _metadata_get_metrics; std::shared_ptr<api::UpdateCommand> _updateCmd; diff --git a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.cpp b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.cpp index 944b4bafa0a..e66884c4060 100644 --- a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.cpp +++ b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.cpp @@ -58,8 +58,8 @@ PersistenceFailuresMetricSet::clone(std::vector<Metric::UP>& ownerList, CopyType if (copyType == INACTIVE) { return MetricSet::clone(ownerList, INACTIVE, owner, includeUnused); } - return (PersistenceFailuresMetricSet*) - (new PersistenceFailuresMetricSet(owner))->assignValues(*this); + return dynamic_cast<PersistenceFailuresMetricSet*>( + (new PersistenceFailuresMetricSet(owner))->assignValues(*this)); } PersistenceOperationMetricSet::PersistenceOperationMetricSet(const std::string& name, MetricSet* owner) @@ -69,6 +69,11 @@ PersistenceOperationMetricSet::PersistenceOperationMetricSet(const std::string& failures(this) { } +PersistenceOperationMetricSet::PersistenceOperationMetricSet(const std::string& name) + : PersistenceOperationMetricSet(name, nullptr) +{ +} + PersistenceOperationMetricSet::~PersistenceOperationMetricSet() = default; MetricSet * @@ -78,9 +83,8 @@ PersistenceOperationMetricSet::clone(std::vector<Metric::UP>& ownerList, CopyTyp if (copyType == INACTIVE) { return MetricSet::clone(ownerList, INACTIVE, owner, includeUnused); } - return (PersistenceOperationMetricSet*) - (new PersistenceOperationMetricSet(getName(), owner)) - ->assignValues(*this); + return dynamic_cast<PersistenceOperationMetricSet*>( + (new PersistenceOperationMetricSet(getName(), owner))->assignValues(*this)); } void diff --git a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h index b818d1bdd9f..eb1c3f57252 100644 --- a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h +++ b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h @@ -40,10 +40,11 @@ class PersistenceOperationMetricSet : public metrics::MetricSet mutable std::mutex _mutex; public: metrics::DoubleAverageMetric latency; - metrics::LongCountMetric ok; + metrics::LongCountMetric ok; PersistenceFailuresMetricSet failures; - PersistenceOperationMetricSet(const std::string& name, metrics::MetricSet* owner = nullptr); + PersistenceOperationMetricSet(const std::string& name, metrics::MetricSet* owner); + explicit PersistenceOperationMetricSet(const std::string& name); ~PersistenceOperationMetricSet() override; MetricSet * clone(std::vector<Metric::UP>& ownerList, CopyType copyType, @@ -57,7 +58,6 @@ public: */ void updateFromResult(const api::ReturnCode& result); - friend class LockWrapper; class LockWrapper { std::unique_lock<std::mutex> _lock; PersistenceOperationMetricSet& _self; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index c0f4041e284..f6b7c7e5f0b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -184,7 +184,8 @@ computeAllPossibleHandlerThreads(const vespa::config::content::StorFilestorConfi return cfg.numThreads + computeNumResponseThreads(cfg.numResponseThreads) + cfg.numNetworkThreads + - cfg.numVisitorThreads; + cfg.numVisitorThreads + + 1; // Async cluster state processing thread (might be a pessimization to include here...) } } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 77e8532f0d2..e8534e3f299 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -36,6 +36,7 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, ThrottleToken throttle_token) : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token)) {} + MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, @@ -90,7 +91,7 @@ MessageTracker::sendReply() { if (count_result_as_failure()) { _env._metrics.failedOperations.inc(); } - vespalib::duration duration = vespalib::from_s(_timer.getElapsedTimeAsDouble()/1000.0); + vespalib::duration duration = _timer.getElapsedTime(); if (duration >= WARN_ON_SLOW_OPERATIONS) { LOGBT(warning, _msg->getType().toString(), "Slow processing of message %s. Processing time: %1.1f s (>=%1.1f s)", diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp index 9d7dd95d922..3b97ff6c018 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp @@ -28,6 +28,7 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler( _component(compReg, "changedbucketownershiphandler"), _metrics(), _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), + _state_sync_executor(1), // single thread for sequential task execution _stateLock(), _currentState(), // Not set yet, so ownership will not be valid _currentOwnership(std::make_shared<OwnershipState>( @@ -98,7 +99,7 @@ ChangedBucketOwnershipHandler::Metrics::Metrics(metrics::MetricSet* owner) idealStateOpsAborted("ideal_state_ops_aborted", {}, "Number of outdated ideal state operations aborted", this), externalLoadOpsAborted("external_load_ops_aborted", {}, "Number of outdated external load operations aborted", this) {} -ChangedBucketOwnershipHandler::Metrics::~Metrics() { } +ChangedBucketOwnershipHandler::Metrics::~Metrics() = default; ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo, std::shared_ptr<const lib::ClusterStateBundle> state) @@ -114,7 +115,7 @@ ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucke } -ChangedBucketOwnershipHandler::OwnershipState::~OwnershipState() {} +ChangedBucketOwnershipHandler::OwnershipState::~OwnershipState() = default; const lib::ClusterState& @@ -235,18 +236,79 @@ ChangedBucketOwnershipHandler::makeLazyAbortPredicate( _component.getIndex())); } -/* - * If we go from: - * 1) Not all down -> all distributors down - * - abort ops for _all_ buckets - * 2) All distributors down -> not down - * - no-op, since down edge must have been handled first - * 3) All down -> all down - * - no-op - * 4) Some nodes down or up - * - abort ops for buckets that have changed ownership between - * current and new cluster state. - */ +class ChangedBucketOwnershipHandler::ClusterStateSyncAndApplyTask + : public vespalib::Executor::Task +{ + ChangedBucketOwnershipHandler& _owner; + std::shared_ptr<api::SetSystemStateCommand> _command; +public: + ClusterStateSyncAndApplyTask(ChangedBucketOwnershipHandler& owner, + std::shared_ptr<api::SetSystemStateCommand> command) noexcept + : _owner(owner), + _command(std::move(command)) + {} + + /* + * If we go from: + * 1) Not all down -> all distributors down + * - abort ops for _all_ buckets + * 2) All distributors down -> not down + * - no-op, since down edge must have been handled first + * 3) All down -> all down + * - no-op + * 4) Some nodes down or up + * - abort ops for buckets that have changed ownership between + * current and new cluster state. + */ + void run() override { + OwnershipState::CSP old_ownership; + OwnershipState::CSP new_ownership; + // Update the ownership state inspected by all bucket-mutating operations passing through + // this component so that messages from outdated distributors will be rejected. Note that + // this is best-effort; with our current multitude of RPC threads directly dispatching + // operations into the persistence provider, it's possible for a thread carrying an outdated + // operation to have already passed the barrier, but be preempted so that it will apply the + // op _after_ the abort step has completed. + { + std::lock_guard guard(_owner._stateLock); + old_ownership = _owner._currentOwnership; + _owner.setCurrentOwnershipWithStateNoLock(_command->getClusterStateBundle()); + new_ownership = _owner._currentOwnership; + } + assert(new_ownership->valid()); + // If we're going from not having a state to having a state, we per + // definition cannot possibly have gotten any load that needs aborting, + // as no such load is allowed through this component when this is the + // case. + if (!old_ownership->valid()) { + return _owner.sendDown(_command); + } + + if (allDistributorsDownInState(old_ownership->getBaselineState())) { + LOG(debug, "No need to send aborts on transition '%s' -> '%s'", + old_ownership->getBaselineState().toString().c_str(), + new_ownership->getBaselineState().toString().c_str()); + return _owner.sendDown(_command);; + } + _owner.logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState()); + + metrics::MetricTimer duration_timer; + auto predicate = _owner.makeLazyAbortPredicate(old_ownership, new_ownership); + auto abort_cmd = std::make_shared<AbortBucketOperationsCommand>(std::move(predicate)); + + // Will not return until all operation aborts have been performed + // on the lower level links, at which point it is safe to send down + // the SetSystemStateCommand. + _owner.sendDown(abort_cmd); + duration_timer.stop(_owner._metrics.averageAbortProcessingTime); + + // Conflicting operations have been aborted and incoming conflicting operations + // are aborted inline; send down the state command actually making the state change + // visible on the content node. + _owner.sendDown(_command); + } +}; + bool ChangedBucketOwnershipHandler::onSetSystemState( const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) @@ -255,47 +317,13 @@ ChangedBucketOwnershipHandler::onSetSystemState( LOG(debug, "Operation aborting is config-disabled"); return false; // Early out. } - OwnershipState::CSP oldOwnership; - OwnershipState::CSP newOwnership; - // Get old state and update own current cluster state _before_ it is - // applied to the rest of the system. This helps ensure that no message - // can get through in the off-case that the lower level storage links - // don't apply the state immediately for some reason. - { - std::lock_guard guard(_stateLock); - oldOwnership = _currentOwnership; - setCurrentOwnershipWithStateNoLock(stateCmd->getClusterStateBundle()); - newOwnership = _currentOwnership; - } - assert(newOwnership->valid()); - // If we're going from not having a state to having a state, we per - // definition cannot possibly have gotten any load that needs aborting, - // as no such load is allowed through this component when this is the - // case. - if (!oldOwnership->valid()) { - return false; - } - - if (allDistributorsDownInState(oldOwnership->getBaselineState())) { - LOG(debug, "No need to send aborts on transition '%s' -> '%s'", - oldOwnership->getBaselineState().toString().c_str(), - newOwnership->getBaselineState().toString().c_str()); - return false; - } - logTransition(oldOwnership->getBaselineState(), newOwnership->getBaselineState()); - - metrics::MetricTimer durationTimer; - auto predicate(makeLazyAbortPredicate(oldOwnership, newOwnership)); - AbortBucketOperationsCommand::SP cmd( - new AbortBucketOperationsCommand(std::move(predicate))); - - // Will not return until all operation aborts have been performed - // on the lower level links, at which point it is safe to send down - // the SetSystemStateCommand. - sendDown(cmd); - - durationTimer.stop(_metrics.averageAbortProcessingTime); - return false; + // Dispatch to background worker. This indirection is because operations such as lid-space compaction + // may cause the implicit operation abort waiting step to block the caller for a relatively long time. + // It is very important that the executor only has 1 thread, which means this has FIFO behavior. + [[maybe_unused]] auto rejected_task = _state_sync_executor.execute(std::make_unique<ClusterStateSyncAndApplyTask>(*this, stateCmd)); + // If this fails, we have processed a message _after_ onClose has been called, which should not happen. + assert(!rejected_task); + return true; } /** @@ -411,8 +439,7 @@ ChangedBucketOwnershipHandler::onDown( const std::shared_ptr<api::StorageMessage>& msg) { if (msg->getType() == api::MessageType::SETSYSTEMSTATE) { - return onSetSystemState( - std::static_pointer_cast<api::SetSystemStateCommand>(msg)); + return onSetSystemState(std::static_pointer_cast<api::SetSystemStateCommand>(msg)); } if (!isMutatingCommandAndNeedsChecking(*msg)) { return false; @@ -451,4 +478,10 @@ ChangedBucketOwnershipHandler::onInternalReply( return (reply->getType() == AbortBucketOperationsReply::ID); } +void +ChangedBucketOwnershipHandler::onClose() +{ + _state_sync_executor.shutdown().sync(); +} + } diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h index e753d96871e..8798d109955 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h @@ -10,6 +10,7 @@ #include <vespa/metrics/valuemetric.h> #include <vespa/metrics/countmetric.h> #include <vespa/metrics/metricset.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <atomic> #include <vector> #include <unordered_map> @@ -60,26 +61,24 @@ class ChangedBucketOwnershipHandler private config::IFetcherCallback<vespa::config::content::PersistenceConfig> { public: - class Metrics : public metrics::MetricSet - { + class Metrics : public metrics::MetricSet { public: metrics::LongAverageMetric averageAbortProcessingTime; metrics::LongCountMetric idealStateOpsAborted; metrics::LongCountMetric externalLoadOpsAborted; - Metrics(metrics::MetricSet* owner = 0); - ~Metrics(); + explicit Metrics(metrics::MetricSet* owner = nullptr); + ~Metrics() override; }; /** * Wrapper around the distribution & state pairs that decides how to * compute the owner distributor for a bucket. It's possible to have * an ownership state with a nullptr cluster state when the node - * initially starts up, which is why no owership state must be used unless + * initially starts up, which is why no ownership state must be used unless * invoking valid() on it returns true. */ - class OwnershipState - { + class OwnershipState { using BucketSpace = document::BucketSpace; std::unordered_map<BucketSpace, std::shared_ptr<const lib::Distribution>, BucketSpace::hash> _distributions; std::shared_ptr<const lib::ClusterStateBundle> _state; @@ -93,7 +92,7 @@ public: static const uint16_t FAILED_TO_RESOLVE = 0xffff; - bool valid() const { + [[nodiscard]] bool valid() const noexcept { return (!_distributions.empty() && _state); } @@ -114,16 +113,21 @@ public: void reloadClusterState(); private: - ServiceLayerComponent _component; - Metrics _metrics; - std::unique_ptr<config::ConfigFetcher> _configFetcher; - mutable std::mutex _stateLock; - std::shared_ptr<const lib::ClusterStateBundle> _currentState; - OwnershipState::CSP _currentOwnership; - - std::atomic<bool> _abortQueuedAndPendingOnStateChange; - std::atomic<bool> _abortMutatingIdealStateOps; - std::atomic<bool> _abortMutatingExternalLoadOps; + class ClusterStateSyncAndApplyTask; + + using ConfigFetcherUP = std::unique_ptr<config::ConfigFetcher>; + using ClusterStateBundleCSP = std::shared_ptr<const lib::ClusterStateBundle>; + + ServiceLayerComponent _component; + Metrics _metrics; + ConfigFetcherUP _configFetcher; + vespalib::ThreadStackExecutor _state_sync_executor; + mutable std::mutex _stateLock; + ClusterStateBundleCSP _currentState; + OwnershipState::CSP _currentOwnership; + std::atomic<bool> _abortQueuedAndPendingOnStateChange; + std::atomic<bool> _abortMutatingIdealStateOps; + std::atomic<bool> _abortMutatingExternalLoadOps; std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate> makeLazyAbortPredicate( @@ -183,14 +187,12 @@ private: public: ChangedBucketOwnershipHandler(const config::ConfigUri& configUri, ServiceLayerComponentRegister& compReg); - ~ChangedBucketOwnershipHandler(); + ~ChangedBucketOwnershipHandler() override; - bool onSetSystemState( - const std::shared_ptr<api::SetSystemStateCommand>&) override; + bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override; bool onDown(const std::shared_ptr<api::StorageMessage>&) override; - - bool onInternalReply( - const std::shared_ptr<api::InternalReply>& reply) override; + bool onInternalReply(const std::shared_ptr<api::InternalReply>& reply) override; + void onClose() override; void configure(std::unique_ptr<vespa::config::content::PersistenceConfig>) override; |