summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-10 12:20:43 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-10 12:47:32 +0000
commit2d494f501a4d342cacdb1e055e281a31e4b0d911 (patch)
treed741a9d25a5060acf6b64ea85b5c02dea8db0169
parentb296aacc11999b0ca156b7d023ec5a9f3ec2543b (diff)
Add timed batching of explicit host info sends triggered by stripes
Since distributor stripes may independently reach a conclusion that a `GetNodeState` reply containing new host info should be sent back to the cluster controller, implement basic rate limiting/batching of concurrent sends. Batching has two separate modes of operation: - If the node is initializing, host info will be sent immediately after _all_ stripes have reported in (they will always do this post-init). This is not timed, in order to minimize latency of bucket info being visible to the cluster controller. - If the node has already initialized, have a grace period of up to 1 second from the time the first stripe signals its intent to send host info until it's actually sent. This allows several stripes to complete their recovery mode and signal host info intents during this second. Batch time period is currently not configurable, may be done later if deemed useful or necessary.
-rw-r--r--storage/src/tests/distributor/distributortest.cpp57
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp5
-rw-r--r--storage/src/tests/distributor/distributortestutil.h6
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp79
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h21
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp10
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h3
-rw-r--r--storage/src/vespa/storage/distributor/stripe_host_info_notifier.h24
8 files changed, 193 insertions, 12 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 2944348e639..5a5ebb8c823 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -1254,4 +1254,61 @@ TEST_F(DistributorTest, wanted_split_bit_count_is_lower_bounded) {
EXPECT_EQ(getConfig().getMinimalBucketSplit(), 8);
}
+TEST_F(DistributorTest, host_info_sent_immediately_once_all_stripes_first_reported) {
+ set_num_distributor_stripes(3);
+ createLinks();
+ getClock().setAbsoluteTimeInSeconds(1000);
+ // TODO STRIPE can't call this currently since it touches the bucket DB updater directly:
+ // setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
+
+ tickDistributorNTimes(1);
+ EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); // Nothing yet
+ getDistributor().notify_stripe_wants_to_send_host_info(1);
+ getDistributor().notify_stripe_wants_to_send_host_info(2);
+
+ tickDistributorNTimes(1);
+ // Still nothing. Missing initial report from stripe 0
+ EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
+
+ getDistributor().notify_stripe_wants_to_send_host_info(0);
+ tickDistributorNTimes(1);
+ // All stripes have reported in, it's time to party!
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+ // No further sends if stripes haven't requested it yet.
+ getClock().setAbsoluteTimeInSeconds(2000);
+ tickDistributorNTimes(10);
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+}
+
+// TODO STRIPE make delay configurable instead of hardcoded
+TEST_F(DistributorTest, non_bootstrap_host_info_send_request_delays_sending) {
+ set_num_distributor_stripes(3);
+ createLinks();
+ getClock().setAbsoluteTimeInSeconds(1000);
+
+ for (uint16_t i = 0; i < 3; ++i) {
+ getDistributor().notify_stripe_wants_to_send_host_info(i);
+ }
+ tickDistributorNTimes(1);
+ // Bootstrap case
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+ // Stripe 1 suddenly really wants to tell the cluster controller something again
+ getDistributor().notify_stripe_wants_to_send_host_info(1);
+ tickDistributorNTimes(1);
+ // But its cry for attention is not yet honored since the delay hasn't passed.
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+ getClock().addMilliSecondsToTime(999);
+ tickDistributorNTimes(1);
+ // 1 sec delay has still not passed
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+
+ getClock().addMilliSecondsToTime(1);
+ tickDistributorNTimes(1);
+ // But now it has
+ EXPECT_EQ(2, explicit_node_state_reply_send_invocations());
+}
+
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 5d204693971..be123d6281c 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -16,7 +16,8 @@ using document::test::makeDocumentBucket;
namespace storage::distributor {
DistributorTestUtil::DistributorTestUtil()
- : _messageSender(_sender, _senderDown)
+ : _messageSender(_sender, _senderDown),
+ _num_distributor_stripes(0) // TODO STRIPE change default
{
_config = getStandardConfig(false);
}
@@ -32,7 +33,7 @@ DistributorTestUtil::createLinks()
_node->node_identity(),
*_threadPool,
*this,
- 0,
+ _num_distributor_stripes,
_hostInfo,
&_messageSender));
_component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil"));
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 0ed2498a0a2..de46905c870 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -197,6 +197,11 @@ public:
const DistributorMessageSenderStub& sender() const noexcept { return _sender; }
void setSystemState(const lib::ClusterState& systemState);
+
+ // Must be called prior to createLinks() to have any effect
+ void set_num_distributor_stripes(uint32_t n_stripes) noexcept {
+ _num_distributor_stripes = n_stripes;
+ }
protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
@@ -221,6 +226,7 @@ protected:
}
};
MessageSenderImpl _messageSender;
+ uint32_t _num_distributor_stripes;
void enableDistributorClusterState(vespalib::stringref state);
void enable_distributor_cluster_state(const lib::ClusterStateBundle& state);
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 17fec5d329d..67b264041a3 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -25,6 +25,7 @@
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/memoryusage.h>
+#include <vespa/vespalib/util/time.h>
#include <algorithm>
#include <vespa/log/log.h>
@@ -58,7 +59,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_messageSender(messageSender),
_use_legacy_mode(num_distributor_stripes == 0),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
- doneInitHandler, *this, _use_legacy_mode)),
+ doneInitHandler, *this, *this, _use_legacy_mode)),
_stripe_pool(),
_stripes(),
_stripe_accessor(),
@@ -68,7 +69,14 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_total_config(_component.total_distributor_config_sp()),
_bucket_db_updater(),
_distributorStatusDelegate(compReg, *this, *this),
+ _bucket_db_status_delegate(),
_threadPool(threadPool),
+ _status_to_do(),
+ _fetched_status_requests(),
+ _stripe_scan_notify_mutex(),
+ _stripe_scan_stats(),
+ _last_host_info_send_time(),
+ _host_info_send_delay(1000ms),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
_metricUpdateHook(*this),
_hostInfoReporter(*this, *this),
@@ -87,6 +95,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_component.getDistribution(),
*_stripe_accessor);
_stripes.emplace_back(std::move(_stripe));
+ _stripe_scan_stats.resize(num_distributor_stripes);
_distributorStatusDelegate.registerStatusPage();
_bucket_db_status_delegate = std::make_unique<StatusReporterDelegate>(compReg, *this, *_bucket_db_updater);
_bucket_db_status_delegate->registerStatusPage();
@@ -258,13 +267,18 @@ void Distributor::onClose() {
if (_use_legacy_mode) {
_stripe->flush_and_close();
} else {
- {
- auto guard = _stripe_accessor->rendezvous_and_hold_all();
- guard->flush_and_close();
+ // Tests may run with multiple stripes but without threads (for determinism's sake),
+ // so only try to flush stripes if a pool is running.
+ // TODO STRIPE probably also need to flush when running tests to handle any explicit close-tests.
+ if (_stripe_pool->stripe_count() > 0){
+ {
+ auto guard = _stripe_accessor->rendezvous_and_hold_all();
+ guard->flush_and_close();
+ }
+ // TODO STRIPE must ensure no incoming requests can be posted on stripes between close
+ // and pool stop+join!
+ _stripe_pool->stop_and_join();
}
- // TODO STRIPE must ensure no incoming requests can be posted on stripes between close
- // and pool stop+join!
- _stripe_pool->stop_and_join();
assert(_bucket_db_updater);
_bucket_db_updater->flush();
}
@@ -548,6 +562,7 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx)
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
handle_status_requests();
process_fetched_external_messages();
+ send_host_info_if_appropriate();
_bucket_db_updater->resend_delayed_messages();
}
return _tickResult;
@@ -575,6 +590,56 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c
}
}
+
+void
+Distributor::notify_stripe_wants_to_send_host_info(uint16_t stripe_index)
+{
+ LOG(debug, "Stripe %u has signalled an intent to send host info out-of-band", stripe_index);
+ std::lock_guard lock(_stripe_scan_notify_mutex);
+ assert(!_use_legacy_mode);
+ assert(stripe_index < _stripe_scan_stats.size());
+ auto& stats = _stripe_scan_stats[stripe_index];
+ stats.wants_to_send_host_info = true;
+ stats.has_reported_in_at_least_once = true;
+ // TODO STRIPE consider if we want to wake up distributor thread here. Will be rechecked
+ // every nth millisecond anyway. Not really an issue for out-of-band CC notifications.
+}
+
+bool
+Distributor::may_send_host_info_on_behalf_of_stripes() noexcept
+{
+ bool any_stripe_wants_to_send = false;
+ for (const auto& stats : _stripe_scan_stats) {
+ if (!stats.has_reported_in_at_least_once) {
+ // If not all stripes have reported in at least once, they have not all completed their
+ // first recovery mode pass through their DBs. To avoid sending partial stats to the cluster
+ // controller, we wait with sending the first out-of-band host info reply until they have all
+ // reported in.
+ return false;
+ }
+ any_stripe_wants_to_send |= stats.wants_to_send_host_info;
+ }
+ return any_stripe_wants_to_send;
+}
+
+void
+Distributor::send_host_info_if_appropriate()
+{
+ const auto now = _component.getClock().getMonotonicTime();
+ std::lock_guard lock(_stripe_scan_notify_mutex);
+
+ if (may_send_host_info_on_behalf_of_stripes()) {
+ if ((now - _last_host_info_send_time) >= _host_info_send_delay) {
+ LOG(debug, "Sending GetNodeState replies to cluster controllers on behalf of stripes");
+ _component.getStateUpdater().immediately_send_get_node_state_replies();
+ _last_host_info_send_time = now;
+ for (auto& stats : _stripe_scan_stats) {
+ stats.wants_to_send_host_info = false;
+ }
+ }
+ }
+}
+
void
Distributor::fetch_status_requests()
{
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 4257657816a..7e5de38ffe8 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -14,6 +14,7 @@
#include "pendingmessagetracker.h"
#include "statusreporterdelegate.h"
#include "stripe_bucket_db_updater.h" // TODO this is temporary
+#include "stripe_host_info_notifier.h"
#include <vespa/config/config.h>
#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storage/common/doneinitializehandler.h>
@@ -23,6 +24,7 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageframework/generic/thread/tickingthread.h>
+#include <chrono>
#include <queue>
#include <unordered_map>
@@ -54,7 +56,8 @@ class Distributor final
public framework::StatusReporter,
public framework::TickingThread,
public MinReplicaProvider,
- public BucketSpacesStatsProvider
+ public BucketSpacesStatsProvider,
+ public StripeHostInfoNotifier
{
public:
Distributor(DistributorComponentRegister&,
@@ -97,6 +100,10 @@ public:
virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override;
virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override;
+ // Called by DistributorStripe threads when they want to notify the cluster controller of changed stats.
+ // Thread safe.
+ void notify_stripe_wants_to_send_host_info(uint16_t stripe_index) override;
+
class MetricUpdateHook : public framework::MetricUpdateHook
{
public:
@@ -177,6 +184,14 @@ private:
void dispatch_to_main_distributor_thread_queue(const std::shared_ptr<api::StorageMessage>& msg);
void fetch_external_messages();
void process_fetched_external_messages();
+ void send_host_info_if_appropriate();
+ // Precondition: _stripe_scan_notify_mutex is held
+ [[nodiscard]] bool may_send_host_info_on_behalf_of_stripes() noexcept;
+
+ struct StripeScanStats {
+ bool wants_to_send_host_info = false;
+ bool has_reported_in_at_least_once = false;
+ };
using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;
@@ -199,6 +214,10 @@ private:
framework::TickingThreadPool& _threadPool;
mutable std::vector<std::shared_ptr<DistributorStatus>> _status_to_do;
mutable std::vector<std::shared_ptr<DistributorStatus>> _fetched_status_requests;
+ mutable std::mutex _stripe_scan_notify_mutex;
+ std::vector<StripeScanStats> _stripe_scan_stats; // Indices are 1-1 with _stripes entries
+ std::chrono::steady_clock::time_point _last_host_info_send_time;
+ std::chrono::milliseconds _host_info_send_delay;
framework::ThreadWaitInfo _tickResult;
MetricUpdateHook _metricUpdateHook;
DistributorHostInfoReporter _hostInfoReporter;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 1bd5fe20074..668930e65ee 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -6,6 +6,7 @@
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
+#include "stripe_host_info_notifier.h"
#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
#include "throttlingoperationstarter.h"
@@ -39,6 +40,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
framework::TickingThreadPool& threadPool,
DoneInitializeHandler& doneInitHandler,
ChainedMessageSender& messageSender,
+ StripeHostInfoNotifier& stripe_host_info_notifier,
bool use_legacy_mode)
: DistributorStripeInterface(),
framework::StatusReporter("distributor", "Distributor"),
@@ -57,6 +59,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
_idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg),
_messageSender(messageSender),
+ _stripe_host_info_notifier(stripe_host_info_notifier),
_externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
*_operation_sequencer, *this, _component,
_idealStateManager, _operationOwner),
@@ -736,8 +739,11 @@ DistributorStripe::scanNextBucket()
void DistributorStripe::send_updated_host_info_if_required() {
if (_must_send_updated_host_info) {
- // TODO STRIPE how to handle with multiple stripes?
- _component.getStateUpdater().immediately_send_get_node_state_replies();
+ if (_use_legacy_mode) {
+ _component.getStateUpdater().immediately_send_get_node_state_replies();
+ } else {
+ _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(0); // TODO STRIPE correct stripe index!
+ }
_must_send_updated_host_info = false;
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index efded7d29d5..475defbf105 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -41,6 +41,7 @@ class DistributorBucketSpaceRepo;
class OperationSequencer;
class OwnershipTransferSafeTimePointCalculator;
class SimpleMaintenanceScanner;
+class StripeHostInfoNotifier;
class ThrottlingOperationStarter;
/**
@@ -62,6 +63,7 @@ public:
framework::TickingThreadPool&,
DoneInitializeHandler&,
ChainedMessageSender& messageSender,
+ StripeHostInfoNotifier& stripe_host_info_notifier,
bool use_legacy_mode);
~DistributorStripe() override;
@@ -283,6 +285,7 @@ private:
StatusReporterDelegate _bucketDBStatusDelegate;
IdealStateManager _idealStateManager;
ChainedMessageSender& _messageSender;
+ StripeHostInfoNotifier& _stripe_host_info_notifier;
ExternalOperationHandler _externalOperationHandler;
std::shared_ptr<lib::Distribution> _distribution;
diff --git a/storage/src/vespa/storage/distributor/stripe_host_info_notifier.h b/storage/src/vespa/storage/distributor/stripe_host_info_notifier.h
new file mode 100644
index 00000000000..3f10188827a
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/stripe_host_info_notifier.h
@@ -0,0 +1,24 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <cstdint>
+
+namespace storage::distributor {
+
+/**
+ * Used by stripes to signal that the distributor node should immediately respond to
+ * any pending GetNodeState long-poll RPCs from the cluster controller. This is generally
+ * done when a stripe has completed initializing or if all merging has completed for
+ * a bucket space.
+ *
+ * Implementations of this interface may batch and/or throttle actual host info sends,
+ * but shall attempt to send new host info within a reasonable amount of time (on the
+ * order of seconds).
+ */
+class StripeHostInfoNotifier {
+public:
+ virtual ~StripeHostInfoNotifier() = default;
+ virtual void notify_stripe_wants_to_send_host_info(uint16_t stripe_index) = 0;
+};
+
+}