summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-03-15 16:17:43 +0000
committerTor Brede Vekterli <vekterli@oath.com>2018-03-19 12:22:54 +0000
commita65ba0e6728b1a878eb6066e2833abd60799c00d (patch)
tree3fb67b66d059c16c507d288510ab9031a02a2df8
parentd5096254be8a0e21301c8575e193cf68488c39b9 (diff)
Immediately send GetNodeState reply on "no more merges" pending edge
Lets the cluster controller update the derived bucket space states as quickly as possible when merges are done for the global space, without having to wait for the normal reply timeout period.
-rw-r--r--storage/src/tests/distributor/distributortest.cpp52
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp31
-rw-r--r--storage/src/tests/distributor/distributortestutil.h10
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp48
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h2
5 files changed, 128 insertions, 15 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 33549226b98..28fccf438ff 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -20,6 +20,9 @@
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
using document::FixedBucketSpaces;
+using document::BucketSpace;
+using document::Bucket;
+using document::BucketId;
namespace storage {
@@ -60,6 +63,8 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(closing_aborts_priority_queued_client_requests);
CPPUNIT_TEST(entering_recovery_mode_resets_bucket_space_stats);
CPPUNIT_TEST(leaving_recovery_mode_immediately_sends_getnodestate_replies);
+ CPPUNIT_TEST(pending_to_no_pending_default_merges_edge_immediately_sends_getnodestate_replies);
+ CPPUNIT_TEST(pending_to_no_pending_global_merges_edge_immediately_sends_getnodestate_replies);
CPPUNIT_TEST_SUITE_END();
public:
@@ -97,6 +102,10 @@ protected:
void closing_aborts_priority_queued_client_requests();
void entering_recovery_mode_resets_bucket_space_stats();
void leaving_recovery_mode_immediately_sends_getnodestate_replies();
+ void pending_to_no_pending_default_merges_edge_immediately_sends_getnodestate_replies();
+ void pending_to_no_pending_global_merges_edge_immediately_sends_getnodestate_replies();
+ // TODO handle edge case for window between getnodestate reply already
+ // sent and new request not yet received
void assertBucketSpaceStats(size_t expBucketPending, size_t expBucketTotal, uint16_t node, const vespalib::string &bucketSpace,
const BucketSpacesStatsProvider::PerNodeBucketSpacesStats &stats);
@@ -211,6 +220,7 @@ private:
void assertNoMessageBounced();
void configure_mutation_sequencing(bool enabled);
void configure_merge_busy_inhibit_duration(int seconds);
+ void do_test_pending_merge_getnodestate_reply_edge(BucketSpace space);
};
CPPUNIT_TEST_SUITE_REGISTRATION(Distributor_Test);
@@ -1067,6 +1077,48 @@ void Distributor_Test::leaving_recovery_mode_immediately_sends_getnodestate_repl
CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations());
}
+void Distributor_Test::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) {
+ setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
+ // 2 buckets with missing replicas triggering merge pending stats
+ addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a");
+ addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a");
+ tickDistributorNTimes(3);
+ CPPUNIT_ASSERT(!_distributor->isInRecoveryMode());
+ const auto space_name = FixedBucketSpaces::to_string(space);
+ assertBucketSpaceStats(2, 0, 1, space_name, _distributor->getBucketSpacesStats());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), explicit_node_state_reply_send_invocations());
+
+ // Edge not triggered when 1 bucket with missing replica left
+ addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a,1=1/1/1/t");
+ tickDistributorNTimes(3);
+ assertBucketSpaceStats(1, 1, 1, space_name, _distributor->getBucketSpacesStats());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), explicit_node_state_reply_send_invocations());
+
+ // Edge triggered when no more buckets with requiring merge
+ addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a,1=1/1/1/t");
+ tickDistributorNTimes(3);
+ assertBucketSpaceStats(0, 2, 1, space_name, _distributor->getBucketSpacesStats());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations());
+
+ // Should only send when edge happens, not in subsequent DB iterations
+ tickDistributorNTimes(10);
+ CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations());
+
+ // Going back to merges pending should _not_ send a getnodestate reply (at least for now)
+ addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a");
+ tickDistributorNTimes(3);
+ assertBucketSpaceStats(1, 1, 1, space_name, _distributor->getBucketSpacesStats());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations());
+}
+
+void Distributor_Test::pending_to_no_pending_default_merges_edge_immediately_sends_getnodestate_replies() {
+ do_test_pending_merge_getnodestate_reply_edge(FixedBucketSpaces::default_space());
+}
+
+void Distributor_Test::pending_to_no_pending_global_merges_edge_immediately_sends_getnodestate_replies() {
+ do_test_pending_merge_getnodestate_reply_edge(FixedBucketSpaces::global_space());
+}
+
}
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 8310266c9cb..33167d61026 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -173,14 +173,11 @@ DistributorTestUtil::addIdealNodes(const lib::ClusterState& state,
getBucketDatabase().update(entry);
}
-void
-DistributorTestUtil::addNodesToBucketDB(const document::BucketId& id,
- const std::string& nodeStr)
-{
- BucketDatabase::Entry entry = getBucket(id);
+void DistributorTestUtil::addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr) {
+ BucketDatabase::Entry entry = getBucket(bucket);
if (!entry.valid()) {
- entry = BucketDatabase::Entry(id);
+ entry = BucketDatabase::Entry(bucket.getBucketId());
}
entry->clear();
@@ -228,7 +225,14 @@ DistributorTestUtil::addNodesToBucketDB(const document::BucketId& id,
entry->addNodeManual(node);
}
- getBucketDatabase().update(entry);
+ getBucketDatabase(bucket.getBucketSpace()).update(entry);
+}
+
+void
+DistributorTestUtil::addNodesToBucketDB(const document::BucketId& id,
+ const std::string& nodeStr)
+{
+ addNodesToBucketDB(document::Bucket(makeBucketSpace(), id), nodeStr);
}
void
@@ -301,6 +305,10 @@ DistributorTestUtil::sendReply(Operation& op,
op.receive(_sender, reply);
}
+BucketDatabase::Entry DistributorTestUtil::getBucket(const document::Bucket& bucket) const {
+ return getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId());
+}
+
BucketDatabase::Entry
DistributorTestUtil::getBucket(const document::BucketId& bId) const
{
@@ -356,11 +364,20 @@ BucketDatabase&
DistributorTestUtil::getBucketDatabase() {
return getDistributorBucketSpace().getBucketDatabase();
}
+
+BucketDatabase& DistributorTestUtil::getBucketDatabase(document::BucketSpace space) {
+ return getBucketSpaceRepo().get(space).getBucketDatabase();
+}
+
const BucketDatabase&
DistributorTestUtil::getBucketDatabase() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase();
}
+const BucketDatabase& DistributorTestUtil::getBucketDatabase(document::BucketSpace space) const {
+ return getBucketSpaceRepo().get(space).getBucketDatabase();
+}
+
DistributorBucketSpaceRepo &
DistributorTestUtil::getBucketSpaceRepo() {
return _distributor->getBucketSpaceRepo();
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index e19cf491fc7..c2cb4fb2a62 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -67,6 +67,8 @@ public:
* Format:
* "node1=checksum/docs/size,node2=checksum/docs/size"
*/
+ void addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr);
+ // As the above, but always inserts into default bucket space
void addNodesToBucketDB(const document::BucketId& id, const std::string& nodeStr);
/**
@@ -124,8 +126,10 @@ public:
// TODO explicit notion of bucket spaces for tests
DistributorBucketSpace &getDistributorBucketSpace();
- BucketDatabase& getBucketDatabase();
- const BucketDatabase& getBucketDatabase() const;
+ BucketDatabase& getBucketDatabase(); // Implicit default space only
+ BucketDatabase& getBucketDatabase(document::BucketSpace space);
+ const BucketDatabase& getBucketDatabase() const; // Implicit default space only
+ const BucketDatabase& getBucketDatabase(document::BucketSpace space) const;
DistributorBucketSpaceRepo &getBucketSpaceRepo();
const DistributorBucketSpaceRepo &getBucketSpaceRepo() const;
@@ -164,6 +168,8 @@ public:
void disableBucketActivationInConfig(bool disable);
+ BucketDatabase::Entry getBucket(const document::Bucket& bucket) const;
+ // Gets bucket entry from default space only
BucketDatabase::Entry getBucket(const document::BucketId& bId) const;
std::vector<document::BucketSpace> getBucketSpaces() const;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 8d9cb82dca9..2d490ea9923 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -99,7 +99,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_hostInfoReporter(*this, *this),
_ownershipSafeTimeCalc(
std::make_unique<OwnershipTransferSafeTimePointCalculator>(
- std::chrono::seconds(0))) // Set by config later
+ std::chrono::seconds(0))), // Set by config later
+ _must_send_updated_host_info(false)
{
_component.registerMetric(*_metrics);
_component.registerMetricUpdateHook(_metricUpdateHook,
@@ -420,7 +421,7 @@ Distributor::leaveRecoveryMode()
_metrics->recoveryModeTime.addValue(
_recoveryTimeStarted.getElapsedTimeAsDouble());
if (_doneInitializing) {
- _component.getStateUpdater().immediately_send_get_node_state_replies();
+ _must_send_updated_host_info = true;
}
}
_schedulingMode = MaintenanceScheduler::NORMAL_SCHEDULING_MODE;
@@ -694,10 +695,12 @@ toBucketSpaceStats(const NodeMaintenanceStats &stats)
return BucketSpaceStats(stats.total, stats.syncing + stats.copyingIn);
}
-BucketSpacesStatsProvider::PerNodeBucketSpacesStats
+using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats;
+
+PerNodeBucketSpacesStats
toBucketSpacesStats(const NodeMaintenanceStatsTracker &maintenanceStats)
{
- BucketSpacesStatsProvider::PerNodeBucketSpacesStats result;
+ PerNodeBucketSpacesStats result;
for (const auto &nodeEntry : maintenanceStats.perNodeStats()) {
for (const auto &bucketSpaceEntry : nodeEntry.second) {
auto bucketSpace = document::FixedBucketSpaces::to_string(bucketSpaceEntry.first);
@@ -707,6 +710,27 @@ toBucketSpacesStats(const NodeMaintenanceStatsTracker &maintenanceStats)
return result;
}
+size_t spaces_with_merges_pending(const PerNodeBucketSpacesStats& stats) {
+ std::unordered_set<document::BucketSpace, document::BucketSpace::hash> spaces_with_pending;
+ for (auto& node : stats) {
+ for (auto& space : node.second) {
+ if (space.second.valid() && space.second.bucketsPending() != 0) {
+ // TODO avoid bucket space string roundtrip
+ spaces_with_pending.emplace(document::FixedBucketSpaces::from_string(space.first));
+ }
+ }
+ }
+ return spaces_with_pending.size();
+}
+
+// TODO should we also trigger on !pending --> pending edge?
+bool merge_no_longer_pending_edge(const PerNodeBucketSpacesStats& prev_stats,
+ const PerNodeBucketSpacesStats& curr_stats) {
+ const auto prev_pending = spaces_with_merges_pending(prev_stats);
+ const auto curr_pending = spaces_with_merges_pending(curr_stats);
+ return curr_pending < prev_pending;
+}
+
}
void
@@ -717,7 +741,11 @@ Distributor::updateInternalMetricsForCompletedScan()
_bucketDBMetricUpdater.completeRound();
_bucketDbStats = _bucketDBMetricUpdater.getLastCompleteStats();
_maintenanceStats = _scanner->getPendingMaintenanceStats();
- _bucketSpacesStats = toBucketSpacesStats(_maintenanceStats.perNodeStats);
+ auto new_space_stats = toBucketSpacesStats(_maintenanceStats.perNodeStats);
+ if (merge_no_longer_pending_edge(_bucketSpacesStats, new_space_stats)) {
+ _must_send_updated_host_info = true;
+ }
+ _bucketSpacesStats = std::move(new_space_stats);
}
void
@@ -733,7 +761,8 @@ Distributor::scanNextBucket()
MaintenanceScanner::ScanResult scanResult(_scanner->scanNext());
if (scanResult.isDone()) {
updateInternalMetricsForCompletedScan();
- leaveRecoveryMode(); // Must happen after internal metrics updates
+ leaveRecoveryMode();
+ send_updated_host_info_if_required();
_scanner->reset();
} else {
const auto &distribution(_bucketSpaceRepo->get(scanResult.getBucketSpace()).getDistribution());
@@ -744,6 +773,13 @@ Distributor::scanNextBucket()
return scanResult;
}
+void Distributor::send_updated_host_info_if_required() {
+ if (_must_send_updated_host_info) {
+ _component.getStateUpdater().immediately_send_get_node_state_replies();
+ _must_send_updated_host_info = false;
+ }
+}
+
void
Distributor::startNextMaintenanceOperation()
{
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 15738144a58..9108abea2af 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -237,6 +237,7 @@ private:
template <typename NodeFunctor>
void for_each_available_content_node_in(const lib::ClusterState&, NodeFunctor&&);
void invalidate_bucket_spaces_stats();
+ void send_updated_host_info_if_required();
lib::ClusterStateBundle _clusterStateBundle;
@@ -310,6 +311,7 @@ private:
BucketDBMetricUpdater::Stats _bucketDbStats;
DistributorHostInfoReporter _hostInfoReporter;
std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc;
+ bool _must_send_updated_host_info;
};
} // distributor