diff options
Diffstat (limited to 'storage')
8 files changed, 193 insertions, 12 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 2944348e639..5a5ebb8c823 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -1254,4 +1254,61 @@ TEST_F(DistributorTest, wanted_split_bit_count_is_lower_bounded) {
     EXPECT_EQ(getConfig().getMinimalBucketSplit(), 8);
 }
 
+TEST_F(DistributorTest, host_info_sent_immediately_once_all_stripes_first_reported) {
+    set_num_distributor_stripes(3);
+    createLinks();
+    getClock().setAbsoluteTimeInSeconds(1000);
+    // TODO STRIPE can't call this currently since it touches the bucket DB updater directly:
+    // setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
+
+    tickDistributorNTimes(1);
+    EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); // Nothing yet
+    getDistributor().notify_stripe_wants_to_send_host_info(1);
+    getDistributor().notify_stripe_wants_to_send_host_info(2);
+
+    tickDistributorNTimes(1);
+    // Still nothing. Missing initial report from stripe 0
+    EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
+
+    getDistributor().notify_stripe_wants_to_send_host_info(0);
+    tickDistributorNTimes(1);
+    // All stripes have reported in, it's time to party!
+    EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+    // No further sends if stripes haven't requested it yet.
+    getClock().setAbsoluteTimeInSeconds(2000);
+    tickDistributorNTimes(10);
+    EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+}
+
+// TODO STRIPE make delay configurable instead of hardcoded
+TEST_F(DistributorTest, non_bootstrap_host_info_send_request_delays_sending) {
+    set_num_distributor_stripes(3);
+    createLinks();
+    getClock().setAbsoluteTimeInSeconds(1000);
+
+    for (uint16_t i = 0; i < 3; ++i) {
+        getDistributor().notify_stripe_wants_to_send_host_info(i);
+    }
+    tickDistributorNTimes(1);
+    // Bootstrap case
+    EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+    // Stripe 1 suddenly really wants to tell the cluster controller something again
+    getDistributor().notify_stripe_wants_to_send_host_info(1);
+    tickDistributorNTimes(1);
+    // But its cry for attention is not yet honored since the delay hasn't passed.
+    EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+    getClock().addMilliSecondsToTime(999);
+    tickDistributorNTimes(1);
+    // 1 sec delay has still not passed
+    EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+    getClock().addMilliSecondsToTime(1);
+    tickDistributorNTimes(1);
+    // But now it has
+    EXPECT_EQ(2, explicit_node_state_reply_send_invocations());
+}
+
 }
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 5d204693971..be123d6281c 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -16,7 +16,8 @@ using document::test::makeDocumentBucket;
 namespace storage::distributor {
 
 DistributorTestUtil::DistributorTestUtil()
-    : _messageSender(_sender, _senderDown)
+    : _messageSender(_sender, _senderDown),
+      _num_distributor_stripes(0) // TODO STRIPE change default
 {
     _config = getStandardConfig(false);
 }
@@ -32,7 +33,7 @@ DistributorTestUtil::createLinks()
             _node->node_identity(),
             *_threadPool,
             *this,
-            0,
+            _num_distributor_stripes,
             _hostInfo,
             &_messageSender));
     _component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil"));
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 0ed2498a0a2..de46905c870 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -197,6 +197,11 @@ public:
     const DistributorMessageSenderStub& sender() const noexcept { return _sender; }
 
     void setSystemState(const lib::ClusterState& systemState);
+
+    // Must be called prior to createLinks() to have any effect
+    void set_num_distributor_stripes(uint32_t n_stripes) noexcept {
+        _num_distributor_stripes = n_stripes;
+    }
 protected:
     vdstestlib::DirConfig _config;
     std::unique_ptr<TestDistributorApp> _node;
@@ -221,6 +226,7 @@ protected:
         }
     };
     MessageSenderImpl _messageSender;
+    uint32_t _num_distributor_stripes;
 
     void enableDistributorClusterState(vespalib::stringref state);
     void enable_distributor_cluster_state(const lib::ClusterStateBundle& state);
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 17fec5d329d..67b264041a3 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -25,6 +25,7 @@
 #include <vespa/storageframework/generic/status/xmlstatusreporter.h>
 #include <vespa/vdslib/distribution/distribution.h>
 #include <vespa/vespalib/util/memoryusage.h>
+#include <vespa/vespalib/util/time.h>
 #include <algorithm>
 
 #include <vespa/log/log.h>
@@ -58,7 +59,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
       _messageSender(messageSender),
       _use_legacy_mode(num_distributor_stripes == 0),
       _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
-                                                  doneInitHandler, *this, _use_legacy_mode)),
+                                                  doneInitHandler, *this, *this, _use_legacy_mode)),
       _stripe_pool(),
       _stripes(),
       _stripe_accessor(),
@@ -68,7 +69,14 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
       _total_config(_component.total_distributor_config_sp()),
       _bucket_db_updater(),
       _distributorStatusDelegate(compReg, *this, *this),
+      _bucket_db_status_delegate(),
       _threadPool(threadPool),
+      _status_to_do(),
+      _fetched_status_requests(),
+      _stripe_scan_notify_mutex(),
+      _stripe_scan_stats(),
+      _last_host_info_send_time(),
+      _host_info_send_delay(1000ms),
       _tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
       _metricUpdateHook(*this),
       _hostInfoReporter(*this, *this),
@@ -87,6 +95,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
                                                                _component.getDistribution(),
                                                                *_stripe_accessor);
         _stripes.emplace_back(std::move(_stripe));
+        _stripe_scan_stats.resize(num_distributor_stripes);
         _distributorStatusDelegate.registerStatusPage();
         _bucket_db_status_delegate = std::make_unique<StatusReporterDelegate>(compReg, *this, *_bucket_db_updater);
         _bucket_db_status_delegate->registerStatusPage();
@@ -258,13 +267,18 @@ void Distributor::onClose() {
     if (_use_legacy_mode) {
         _stripe->flush_and_close();
     } else {
-        {
-            auto guard = _stripe_accessor->rendezvous_and_hold_all();
-            guard->flush_and_close();
+        // Tests may run with multiple stripes but without threads (for determinism's sake),
+        // so only try to flush stripes if a pool is running.
+        // TODO STRIPE probably also need to flush when running tests to handle any explicit close-tests.
+        if (_stripe_pool->stripe_count() > 0){
+            {
+                auto guard = _stripe_accessor->rendezvous_and_hold_all();
+                guard->flush_and_close();
+            }
+            // TODO STRIPE must ensure no incoming requests can be posted on stripes between close
+            // and pool stop+join!
+            _stripe_pool->stop_and_join();
         }
-        // TODO STRIPE must ensure no incoming requests can be posted on stripes between close
-        // and pool stop+join!
-        _stripe_pool->stop_and_join();
         assert(_bucket_db_updater);
         _bucket_db_updater->flush();
     }
@@ -548,6 +562,7 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx)
         _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
         handle_status_requests();
         process_fetched_external_messages();
+        send_host_info_if_appropriate();
         _bucket_db_updater->resend_delayed_messages();
     }
     return _tickResult;
@@ -575,6 +590,56 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c
     }
 }
 
+
+void
+Distributor::notify_stripe_wants_to_send_host_info(uint16_t stripe_index)
+{
+    LOG(debug, "Stripe %u has signalled an intent to send host info out-of-band", stripe_index);
+    std::lock_guard lock(_stripe_scan_notify_mutex);
+    assert(!_use_legacy_mode);
+    assert(stripe_index < _stripe_scan_stats.size());
+    auto& stats = _stripe_scan_stats[stripe_index];
+    stats.wants_to_send_host_info = true;
+    stats.has_reported_in_at_least_once = true;
+    // TODO STRIPE consider if we want to wake up distributor thread here. Will be rechecked
+    // every nth millisecond anyway. Not really an issue for out-of-band CC notifications.
+}
+
+bool
+Distributor::may_send_host_info_on_behalf_of_stripes() noexcept
+{
+    bool any_stripe_wants_to_send = false;
+    for (const auto& stats : _stripe_scan_stats) {
+        if (!stats.has_reported_in_at_least_once) {
+            // If not all stripes have reported in at least once, they have not all completed their
+            // first recovery mode pass through their DBs. To avoid sending partial stats to the cluster
+            // controller, we wait with sending the first out-of-band host info reply until they have all
+            // reported in.
+            return false;
+        }
+        any_stripe_wants_to_send |= stats.wants_to_send_host_info;
+    }
+    return any_stripe_wants_to_send;
+}
+
+void
+Distributor::send_host_info_if_appropriate()
+{
+    const auto now = _component.getClock().getMonotonicTime();
+    std::lock_guard lock(_stripe_scan_notify_mutex);
+
+    if (may_send_host_info_on_behalf_of_stripes()) {
+        if ((now - _last_host_info_send_time) >= _host_info_send_delay) {
+            LOG(debug, "Sending GetNodeState replies to cluster controllers on behalf of stripes");
+            _component.getStateUpdater().immediately_send_get_node_state_replies();
+            _last_host_info_send_time = now;
+            for (auto& stats : _stripe_scan_stats) {
+                stats.wants_to_send_host_info = false;
+            }
+        }
+    }
+}
+
 void
 Distributor::fetch_status_requests()
 {
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 4257657816a..7e5de38ffe8 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -14,6 +14,7 @@
 #include "pendingmessagetracker.h"
 #include "statusreporterdelegate.h"
 #include "stripe_bucket_db_updater.h" // TODO this is temporary
+#include "stripe_host_info_notifier.h"
 #include <vespa/config/config.h>
 #include <vespa/storage/common/distributorcomponent.h>
 #include <vespa/storage/common/doneinitializehandler.h>
@@ -23,6 +24,7 @@
 #include <vespa/storageapi/message/state.h>
 #include <vespa/storageframework/generic/metric/metricupdatehook.h>
 #include <vespa/storageframework/generic/thread/tickingthread.h>
+#include <chrono>
 #include <queue>
 #include <unordered_map>
 
@@ -54,7 +56,8 @@ class Distributor final
       public framework::StatusReporter,
       public framework::TickingThread,
       public MinReplicaProvider,
-      public BucketSpacesStatsProvider
+      public BucketSpacesStatsProvider,
+      public StripeHostInfoNotifier
 {
 public:
     Distributor(DistributorComponentRegister&,
@@ -97,6 +100,10 @@ public:
     virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override;
     virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override;
 
+    // Called by DistributorStripe threads when they want to notify the cluster controller of changed stats.
+    // Thread safe.
+    void notify_stripe_wants_to_send_host_info(uint16_t stripe_index) override;
+
     class MetricUpdateHook : public framework::MetricUpdateHook
     {
     public:
@@ -177,6 +184,14 @@ private:
     void dispatch_to_main_distributor_thread_queue(const std::shared_ptr<api::StorageMessage>& msg);
     void fetch_external_messages();
     void process_fetched_external_messages();
+    void send_host_info_if_appropriate();
+    // Precondition: _stripe_scan_notify_mutex is held
+    [[nodiscard]] bool may_send_host_info_on_behalf_of_stripes() noexcept;
+
+    struct StripeScanStats {
+        bool wants_to_send_host_info = false;
+        bool has_reported_in_at_least_once = false;
+    };
 
     using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;
 
@@ -199,6 +214,10 @@ private:
     framework::TickingThreadPool& _threadPool;
     mutable std::vector<std::shared_ptr<DistributorStatus>> _status_to_do;
     mutable std::vector<std::shared_ptr<DistributorStatus>> _fetched_status_requests;
+    mutable std::mutex _stripe_scan_notify_mutex;
+    std::vector<StripeScanStats> _stripe_scan_stats; // Indices are 1-1 with _stripes entries
+    std::chrono::steady_clock::time_point _last_host_info_send_time;
+    std::chrono::milliseconds _host_info_send_delay;
     framework::ThreadWaitInfo _tickResult;
     MetricUpdateHook _metricUpdateHook;
     DistributorHostInfoReporter _hostInfoReporter;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 1bd5fe20074..668930e65ee 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -6,6 +6,7 @@
 #include "distributor_bucket_space.h"
 #include "distributormetricsset.h"
 #include "idealstatemetricsset.h"
+#include "stripe_host_info_notifier.h"
 #include "operation_sequencer.h"
 #include "ownership_transfer_safe_time_point_calculator.h"
 #include "throttlingoperationstarter.h"
@@ -39,6 +40,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
                                      framework::TickingThreadPool& threadPool,
                                      DoneInitializeHandler& doneInitHandler,
                                      ChainedMessageSender& messageSender,
+                                     StripeHostInfoNotifier& stripe_host_info_notifier,
                                      bool use_legacy_mode)
     : DistributorStripeInterface(),
       framework::StatusReporter("distributor", "Distributor"),
@@ -57,6 +59,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
       _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
       _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg),
       _messageSender(messageSender),
+      _stripe_host_info_notifier(stripe_host_info_notifier),
       _externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
                                 *_operation_sequencer, *this, _component,
                                 _idealStateManager, _operationOwner),
@@ -736,8 +739,11 @@ DistributorStripe::scanNextBucket()
 
 void DistributorStripe::send_updated_host_info_if_required() {
     if (_must_send_updated_host_info) {
-        // TODO STRIPE how to handle with multiple stripes?
-        _component.getStateUpdater().immediately_send_get_node_state_replies();
+        if (_use_legacy_mode) {
+            _component.getStateUpdater().immediately_send_get_node_state_replies();
+        } else {
+            _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(0); // TODO STRIPE correct stripe index!
+        }
         _must_send_updated_host_info = false;
     }
 }
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index efded7d29d5..475defbf105 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -41,6 +41,7 @@ class DistributorBucketSpaceRepo;
 class OperationSequencer;
 class OwnershipTransferSafeTimePointCalculator;
 class SimpleMaintenanceScanner;
+class StripeHostInfoNotifier;
 class ThrottlingOperationStarter;
 
 /**
@@ -62,6 +63,7 @@ public:
                       framework::TickingThreadPool&,
                       DoneInitializeHandler&,
                       ChainedMessageSender& messageSender,
+                      StripeHostInfoNotifier& stripe_host_info_notifier,
                       bool use_legacy_mode);
 
     ~DistributorStripe() override;
@@ -283,6 +285,7 @@ private:
     StatusReporterDelegate _bucketDBStatusDelegate;
     IdealStateManager _idealStateManager;
     ChainedMessageSender& _messageSender;
+    StripeHostInfoNotifier& _stripe_host_info_notifier;
     ExternalOperationHandler _externalOperationHandler;
 
     std::shared_ptr<lib::Distribution> _distribution;
diff --git a/storage/src/vespa/storage/distributor/stripe_host_info_notifier.h b/storage/src/vespa/storage/distributor/stripe_host_info_notifier.h
new file mode 100644
index 00000000000..3f10188827a
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/stripe_host_info_notifier.h
@@ -0,0 +1,24 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <cstdint>
+
+namespace storage::distributor {
+
+/**
+ * Used by stripes to signal that the distributor node should immediately respond to
+ * any pending GetNodeState long-poll RPCs from the cluster controller. This is generally
+ * done when a stripe has completed initializing or if all merging has completed for
+ * a bucket space.
+ *
+ * Implementations of this interface may batch and/or throttle actual host info sends,
+ * but shall attempt to send new host info within a reasonable amount of time (on the
+ * order of seconds).
+ */
+class StripeHostInfoNotifier {
+public:
+    virtual ~StripeHostInfoNotifier() = default;
+    virtual void notify_stripe_wants_to_send_host_info(uint16_t stripe_index) = 0;
+};
+
+}