summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2021-05-10 16:45:07 +0200
committer GitHub <noreply@github.com> 2021-05-10 16:45:07 +0200
commit 22b4ff81bd1cb4ce254fcf0150088e628d281688 (patch)
tree 433edc77ff9b09c4b40b28dd266257f124c69167
parent 87117615807b157cbbad5deadbd45028bff66be3 (diff)
parent 3adf24a4a850035770f2d902756bf356ae768928 (diff)
Merge pull request #17802 from vespa-engine/vekterli/batch-explicit-host-info-sends-from-stripes
Add timed batching of explicit host info sends triggered by stripes [run-systemtest]
-rw-r--r-- storage/src/tests/distributor/distributortest.cpp 57
-rw-r--r-- storage/src/tests/distributor/distributortestutil.cpp 5
-rw-r--r-- storage/src/tests/distributor/distributortestutil.h 6
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.cpp 79
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.h 21
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.cpp 10
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.h 3
-rw-r--r-- storage/src/vespa/storage/distributor/stripe_host_info_notifier.h 24
8 files changed, 193 insertions, 12 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 2944348e639..5a5ebb8c823 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -1254,4 +1254,61 @@ TEST_F(DistributorTest, wanted_split_bit_count_is_lower_bounded) {
EXPECT_EQ(getConfig().getMinimalBucketSplit(), 8);
}
+TEST_F(DistributorTest, host_info_sent_immediately_once_all_stripes_first_reported) {
+ set_num_distributor_stripes(3);
+ createLinks();
+ getClock().setAbsoluteTimeInSeconds(1000);
+ // TODO STRIPE can't call this currently since it touches the bucket DB updater directly:
+ // setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
+
+ tickDistributorNTimes(1);
+ EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); // Nothing yet
+ getDistributor().notify_stripe_wants_to_send_host_info(1);
+ getDistributor().notify_stripe_wants_to_send_host_info(2);
+
+ tickDistributorNTimes(1);
+ // Still nothing. Missing initial report from stripe 0
+ EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
+
+ getDistributor().notify_stripe_wants_to_send_host_info(0);
+ tickDistributorNTimes(1);
+ // All stripes have reported in, it's time to party!
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+ // No further sends if stripes haven't requested it yet.
+ getClock().setAbsoluteTimeInSeconds(2000);
+ tickDistributorNTimes(10);
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+}
+
+// TODO STRIPE make delay configurable instead of hardcoded
+TEST_F(DistributorTest, non_bootstrap_host_info_send_request_delays_sending) {
+ set_num_distributor_stripes(3);
+ createLinks();
+ getClock().setAbsoluteTimeInSeconds(1000);
+
+ for (uint16_t i = 0; i < 3; ++i) {
+ getDistributor().notify_stripe_wants_to_send_host_info(i);
+ }
+ tickDistributorNTimes(1);
+ // Bootstrap case
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+ // Stripe 1 suddenly really wants to tell the cluster controller something again
+ getDistributor().notify_stripe_wants_to_send_host_info(1);
+ tickDistributorNTimes(1);
+ // But its cry for attention is not yet honored since the delay hasn't passed.
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+ getClock().addMilliSecondsToTime(999);
+ tickDistributorNTimes(1);
+ // 1 sec delay has still not passed
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+ getClock().addMilliSecondsToTime(1);
+ tickDistributorNTimes(1);
+ // But now it has
+ EXPECT_EQ(2, explicit_node_state_reply_send_invocations());
+}
+
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 5d204693971..be123d6281c 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -16,7 +16,8 @@ using document::test::makeDocumentBucket;
namespace storage::distributor {
DistributorTestUtil::DistributorTestUtil()
- : _messageSender(_sender, _senderDown)
+ : _messageSender(_sender, _senderDown),
+ _num_distributor_stripes(0) // TODO STRIPE change default
{
_config = getStandardConfig(false);
}
@@ -32,7 +33,7 @@ DistributorTestUtil::createLinks()
_node->node_identity(),
*_threadPool,
*this,
- 0,
+ _num_distributor_stripes,
_hostInfo,
&_messageSender));
_component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil"));
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 0ed2498a0a2..de46905c870 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -197,6 +197,11 @@ public:
const DistributorMessageSenderStub& sender() const noexcept { return _sender; }
void setSystemState(const lib::ClusterState& systemState);
+
+ // Must be called prior to createLinks() to have any effect
+ void set_num_distributor_stripes(uint32_t n_stripes) noexcept {
+ _num_distributor_stripes = n_stripes;
+ }
protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
@@ -221,6 +226,7 @@ protected:
}
};
MessageSenderImpl _messageSender;
+ uint32_t _num_distributor_stripes;
void enableDistributorClusterState(vespalib::stringref state);
void enable_distributor_cluster_state(const lib::ClusterStateBundle& state);
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 17fec5d329d..f7ecd324a51 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -25,6 +25,7 @@
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/memoryusage.h>
+#include <vespa/vespalib/util/time.h>
#include <algorithm>
#include <vespa/log/log.h>
@@ -58,7 +59,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_messageSender(messageSender),
_use_legacy_mode(num_distributor_stripes == 0),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
- doneInitHandler, *this, _use_legacy_mode)),
+ doneInitHandler, *this, *this, _use_legacy_mode)),
_stripe_pool(),
_stripes(),
_stripe_accessor(),
@@ -68,7 +69,14 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_total_config(_component.total_distributor_config_sp()),
_bucket_db_updater(),
_distributorStatusDelegate(compReg, *this, *this),
+ _bucket_db_status_delegate(),
_threadPool(threadPool),
+ _status_to_do(),
+ _fetched_status_requests(),
+ _stripe_scan_notify_mutex(),
+ _stripe_scan_stats(),
+ _last_host_info_send_time(),
+ _host_info_send_delay(1000ms),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
_metricUpdateHook(*this),
_hostInfoReporter(*this, *this),
@@ -87,6 +95,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_component.getDistribution(),
*_stripe_accessor);
_stripes.emplace_back(std::move(_stripe));
+ _stripe_scan_stats.resize(num_distributor_stripes);
_distributorStatusDelegate.registerStatusPage();
_bucket_db_status_delegate = std::make_unique<StatusReporterDelegate>(compReg, *this, *_bucket_db_updater);
_bucket_db_status_delegate->registerStatusPage();
@@ -258,13 +267,18 @@ void Distributor::onClose() {
if (_use_legacy_mode) {
_stripe->flush_and_close();
} else {
- {
- auto guard = _stripe_accessor->rendezvous_and_hold_all();
- guard->flush_and_close();
+ // Tests may run with multiple stripes but without threads (for determinism's sake),
+ // so only try to flush stripes if a pool is running.
+ // TODO STRIPE probably also need to flush when running tests to handle any explicit close-tests.
+ if (_stripe_pool->stripe_count() > 0){
+ {
+ auto guard = _stripe_accessor->rendezvous_and_hold_all();
+ guard->flush_and_close();
+ }
+ // TODO STRIPE must ensure no incoming requests can be posted on stripes between close
+ // and pool stop+join!
+ _stripe_pool->stop_and_join();
}
- // TODO STRIPE must ensure no incoming requests can be posted on stripes between close
- // and pool stop+join!
- _stripe_pool->stop_and_join();
assert(_bucket_db_updater);
_bucket_db_updater->flush();
}
@@ -548,6 +562,7 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx)
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
handle_status_requests();
process_fetched_external_messages();
+ send_host_info_if_appropriate();
_bucket_db_updater->resend_delayed_messages();
}
return _tickResult;
@@ -575,6 +590,56 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c
}
}
+
+void
+Distributor::notify_stripe_wants_to_send_host_info(uint16_t stripe_index)
+{
+ LOG(debug, "Stripe %u has signalled an intent to send host info out-of-band", stripe_index);
+ std::lock_guard lock(_stripe_scan_notify_mutex);
+ assert(!_use_legacy_mode);
+ assert(stripe_index < _stripe_scan_stats.size());
+ auto& stats = _stripe_scan_stats[stripe_index];
+ stats.wants_to_send_host_info = true;
+ stats.has_reported_in_at_least_once = true;
+ // TODO STRIPE consider if we want to wake up distributor thread here. Will be rechecked
+ // every nth millisecond anyway. Not really an issue for out-of-band CC notifications.
+}
+
+bool
+Distributor::may_send_host_info_on_behalf_of_stripes([[maybe_unused]] std::lock_guard<std::mutex>& held_lock) noexcept
+{
+ bool any_stripe_wants_to_send = false;
+ for (const auto& stats : _stripe_scan_stats) {
+ if (!stats.has_reported_in_at_least_once) {
+ // If not all stripes have reported in at least once, they have not all completed their
+ // first recovery mode pass through their DBs. To avoid sending partial stats to the cluster
+ // controller, we wait with sending the first out-of-band host info reply until they have all
+ // reported in.
+ return false;
+ }
+ any_stripe_wants_to_send |= stats.wants_to_send_host_info;
+ }
+ return any_stripe_wants_to_send;
+}
+
+void
+Distributor::send_host_info_if_appropriate()
+{
+ const auto now = _component.getClock().getMonotonicTime();
+ std::lock_guard lock(_stripe_scan_notify_mutex);
+
+ if (may_send_host_info_on_behalf_of_stripes(lock)) {
+ if ((now - _last_host_info_send_time) >= _host_info_send_delay) {
+ LOG(debug, "Sending GetNodeState replies to cluster controllers on behalf of stripes");
+ _component.getStateUpdater().immediately_send_get_node_state_replies();
+ _last_host_info_send_time = now;
+ for (auto& stats : _stripe_scan_stats) {
+ stats.wants_to_send_host_info = false;
+ }
+ }
+ }
+}
+
void
Distributor::fetch_status_requests()
{
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 4257657816a..b6dbc4432eb 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -14,6 +14,7 @@
#include "pendingmessagetracker.h"
#include "statusreporterdelegate.h"
#include "stripe_bucket_db_updater.h" // TODO this is temporary
+#include "stripe_host_info_notifier.h"
#include <vespa/config/config.h>
#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storage/common/doneinitializehandler.h>
@@ -23,6 +24,7 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageframework/generic/thread/tickingthread.h>
+#include <chrono>
#include <queue>
#include <unordered_map>
@@ -54,7 +56,8 @@ class Distributor final
public framework::StatusReporter,
public framework::TickingThread,
public MinReplicaProvider,
- public BucketSpacesStatsProvider
+ public BucketSpacesStatsProvider,
+ public StripeHostInfoNotifier
{
public:
Distributor(DistributorComponentRegister&,
@@ -97,6 +100,10 @@ public:
virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override;
virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override;
+ // Called by DistributorStripe threads when they want to notify the cluster controller of changed stats.
+ // Thread safe.
+ void notify_stripe_wants_to_send_host_info(uint16_t stripe_index) override;
+
class MetricUpdateHook : public framework::MetricUpdateHook
{
public:
@@ -177,6 +184,14 @@ private:
void dispatch_to_main_distributor_thread_queue(const std::shared_ptr<api::StorageMessage>& msg);
void fetch_external_messages();
void process_fetched_external_messages();
+ void send_host_info_if_appropriate();
+ // Precondition: _stripe_scan_notify_mutex is held
+ [[nodiscard]] bool may_send_host_info_on_behalf_of_stripes(std::lock_guard<std::mutex>& held_lock) noexcept;
+
+ struct StripeScanStats {
+ bool wants_to_send_host_info = false;
+ bool has_reported_in_at_least_once = false;
+ };
using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;
@@ -199,6 +214,10 @@ private:
framework::TickingThreadPool& _threadPool;
mutable std::vector<std::shared_ptr<DistributorStatus>> _status_to_do;
mutable std::vector<std::shared_ptr<DistributorStatus>> _fetched_status_requests;
+ mutable std::mutex _stripe_scan_notify_mutex;
+ std::vector<StripeScanStats> _stripe_scan_stats; // Indices are 1-1 with _stripes entries
+ std::chrono::steady_clock::time_point _last_host_info_send_time;
+ std::chrono::milliseconds _host_info_send_delay;
framework::ThreadWaitInfo _tickResult;
MetricUpdateHook _metricUpdateHook;
DistributorHostInfoReporter _hostInfoReporter;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 1bd5fe20074..668930e65ee 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -6,6 +6,7 @@
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
+#include "stripe_host_info_notifier.h"
#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
#include "throttlingoperationstarter.h"
@@ -39,6 +40,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
framework::TickingThreadPool& threadPool,
DoneInitializeHandler& doneInitHandler,
ChainedMessageSender& messageSender,
+ StripeHostInfoNotifier& stripe_host_info_notifier,
bool use_legacy_mode)
: DistributorStripeInterface(),
framework::StatusReporter("distributor", "Distributor"),
@@ -57,6 +59,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
_idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg),
_messageSender(messageSender),
+ _stripe_host_info_notifier(stripe_host_info_notifier),
_externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
*_operation_sequencer, *this, _component,
_idealStateManager, _operationOwner),
@@ -736,8 +739,11 @@ DistributorStripe::scanNextBucket()
void DistributorStripe::send_updated_host_info_if_required() {
if (_must_send_updated_host_info) {
- // TODO STRIPE how to handle with multiple stripes?
- _component.getStateUpdater().immediately_send_get_node_state_replies();
+ if (_use_legacy_mode) {
+ _component.getStateUpdater().immediately_send_get_node_state_replies();
+ } else {
+ _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(0); // TODO STRIPE correct stripe index!
+ }
_must_send_updated_host_info = false;
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index efded7d29d5..475defbf105 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -41,6 +41,7 @@ class DistributorBucketSpaceRepo;
class OperationSequencer;
class OwnershipTransferSafeTimePointCalculator;
class SimpleMaintenanceScanner;
+class StripeHostInfoNotifier;
class ThrottlingOperationStarter;
/**
@@ -62,6 +63,7 @@ public:
framework::TickingThreadPool&,
DoneInitializeHandler&,
ChainedMessageSender& messageSender,
+ StripeHostInfoNotifier& stripe_host_info_notifier,
bool use_legacy_mode);
~DistributorStripe() override;
@@ -283,6 +285,7 @@ private:
StatusReporterDelegate _bucketDBStatusDelegate;
IdealStateManager _idealStateManager;
ChainedMessageSender& _messageSender;
+ StripeHostInfoNotifier& _stripe_host_info_notifier;
ExternalOperationHandler _externalOperationHandler;
std::shared_ptr<lib::Distribution> _distribution;
diff --git a/storage/src/vespa/storage/distributor/stripe_host_info_notifier.h b/storage/src/vespa/storage/distributor/stripe_host_info_notifier.h
new file mode 100644
index 00000000000..3f10188827a
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/stripe_host_info_notifier.h
@@ -0,0 +1,24 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <cstdint>
+
+namespace storage::distributor {
+
+/**
+ * Used by stripes to signal that the distributor node should immediately respond to
+ * any pending GetNodeState long-poll RPCs from the cluster controller. This is generally
+ * done when a stripe has completed initializing or if all merging has completed for
+ * a bucket space.
+ *
+ * Implementations of this interface may batch and/or throttle actual host info sends,
+ * but shall attempt to send new host info within a reasonable amount of time (on the
+ * order of seconds).
+ */
+class StripeHostInfoNotifier {
+public:
+ virtual ~StripeHostInfoNotifier() = default;
+ virtual void notify_stripe_wants_to_send_host_info(uint16_t stripe_index) = 0;
+};
+
+}