summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirstorli@yahoo.no>2018-03-19 14:49:48 +0100
committerGitHub <noreply@github.com>2018-03-19 14:49:48 +0100
commite73ee36db35b3e80ae4aea4450c45a1c2314e870 (patch)
tree3e0c563a9a4ba4e7fe9136165bd46dbf6eaa5983
parent601b4777257e9d1694bdc6987d2b63ee6ccf52dc (diff)
parenta65ba0e6728b1a878eb6066e2833abd60799c00d (diff)
Merge pull request #5382 from vespa-engine/vekterli/immediately-send-getnodestate-reply-on-no-pending-merges-edge-rebased
Immediately send GetNodeState reply on "no more merges" pending edge (rebased)
-rw-r--r--storage/src/tests/distributor/distributortest.cpp52
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp31
-rw-r--r--storage/src/tests/distributor/distributortestutil.h10
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp48
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h2
5 files changed, 128 insertions, 15 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 33549226b98..28fccf438ff 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -20,6 +20,9 @@
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
using document::FixedBucketSpaces;
+using document::BucketSpace;
+using document::Bucket;
+using document::BucketId;
namespace storage {
@@ -60,6 +63,8 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(closing_aborts_priority_queued_client_requests);
CPPUNIT_TEST(entering_recovery_mode_resets_bucket_space_stats);
CPPUNIT_TEST(leaving_recovery_mode_immediately_sends_getnodestate_replies);
+ CPPUNIT_TEST(pending_to_no_pending_default_merges_edge_immediately_sends_getnodestate_replies);
+ CPPUNIT_TEST(pending_to_no_pending_global_merges_edge_immediately_sends_getnodestate_replies);
CPPUNIT_TEST_SUITE_END();
public:
@@ -97,6 +102,10 @@ protected:
void closing_aborts_priority_queued_client_requests();
void entering_recovery_mode_resets_bucket_space_stats();
void leaving_recovery_mode_immediately_sends_getnodestate_replies();
+ void pending_to_no_pending_default_merges_edge_immediately_sends_getnodestate_replies();
+ void pending_to_no_pending_global_merges_edge_immediately_sends_getnodestate_replies();
+ // TODO handle edge case for window between getnodestate reply already
+ // sent and new request not yet received
void assertBucketSpaceStats(size_t expBucketPending, size_t expBucketTotal, uint16_t node, const vespalib::string &bucketSpace,
const BucketSpacesStatsProvider::PerNodeBucketSpacesStats &stats);
@@ -211,6 +220,7 @@ private:
void assertNoMessageBounced();
void configure_mutation_sequencing(bool enabled);
void configure_merge_busy_inhibit_duration(int seconds);
+ void do_test_pending_merge_getnodestate_reply_edge(BucketSpace space);
};
CPPUNIT_TEST_SUITE_REGISTRATION(Distributor_Test);
@@ -1067,6 +1077,48 @@ void Distributor_Test::leaving_recovery_mode_immediately_sends_getnodestate_repl
CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations());
}
+void Distributor_Test::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) {
+    // Shared scenario for the default and global space test cases: an explicit
+    // GetNodeState reply must be sent exactly once, on the DB scan where the last
+    // bucket with a pending merge in `space` goes away.
+    setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
+
+    // Replica specs fed to addNodesToBucketDB. With redundancy 2, a bucket that only
+    // has a replica on node 0 is counted as merge pending in the bucket space stats.
+    const char* const replica_on_node_0_only = "0=1/1/1/t/a";
+    const char* const replicas_on_both_nodes = "0=1/1/1/t/a,1=1/1/1/t";
+    const Bucket first_bucket(space, BucketId(16, 1));
+    const Bucket second_bucket(space, BucketId(16, 2));
+
+    // Start out with both buckets missing a replica, i.e. 2 buckets pending merge
+    addNodesToBucketDB(first_bucket, replica_on_node_0_only);
+    addNodesToBucketDB(second_bucket, replica_on_node_0_only);
+    tickDistributorNTimes(3);
+    CPPUNIT_ASSERT(!_distributor->isInRecoveryMode());
+    const auto bucket_space_name = FixedBucketSpaces::to_string(space);
+    assertBucketSpaceStats(2, 0, 1, bucket_space_name, _distributor->getBucketSpacesStats());
+    CPPUNIT_ASSERT_EQUAL(size_t(0), explicit_node_state_reply_send_invocations());
+
+    // No edge yet: one bucket is complete, but the other is still missing a replica
+    addNodesToBucketDB(first_bucket, replicas_on_both_nodes);
+    tickDistributorNTimes(3);
+    assertBucketSpaceStats(1, 1, 1, bucket_space_name, _distributor->getBucketSpacesStats());
+    CPPUNIT_ASSERT_EQUAL(size_t(0), explicit_node_state_reply_send_invocations());
+
+    // Edge triggered once no bucket requires a merge any more
+    addNodesToBucketDB(second_bucket, replicas_on_both_nodes);
+    tickDistributorNTimes(3);
+    assertBucketSpaceStats(0, 2, 1, bucket_space_name, _distributor->getBucketSpacesStats());
+    CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations());
+
+    // The reply is tied to the edge itself; later DB iterations must not send again
+    tickDistributorNTimes(10);
+    CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations());
+
+    // Going back to merges pending should _not_ send a getnodestate reply (at least for now)
+    addNodesToBucketDB(first_bucket, replica_on_node_0_only);
+    tickDistributorNTimes(3);
+    assertBucketSpaceStats(1, 1, 1, bucket_space_name, _distributor->getBucketSpacesStats());
+    CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations());
+}
+
+void Distributor_Test::pending_to_no_pending_default_merges_edge_immediately_sends_getnodestate_replies() {
+    // Runs the shared pending -> no pending merge edge scenario in the default bucket space.
+    const auto space_under_test = FixedBucketSpaces::default_space();
+    do_test_pending_merge_getnodestate_reply_edge(space_under_test);
+}
+
+void Distributor_Test::pending_to_no_pending_global_merges_edge_immediately_sends_getnodestate_replies() {
+    // Runs the shared pending -> no pending merge edge scenario in the global bucket space.
+    const auto space_under_test = FixedBucketSpaces::global_space();
+    do_test_pending_merge_getnodestate_reply_edge(space_under_test);
+}
+
}
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 8310266c9cb..33167d61026 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -173,14 +173,11 @@ DistributorTestUtil::addIdealNodes(const lib::ClusterState& state,
getBucketDatabase().update(entry);
}
-void
-DistributorTestUtil::addNodesToBucketDB(const document::BucketId& id,
- const std::string& nodeStr)
-{
- BucketDatabase::Entry entry = getBucket(id);
+void DistributorTestUtil::addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr) {
+ BucketDatabase::Entry entry = getBucket(bucket);
if (!entry.valid()) {
- entry = BucketDatabase::Entry(id);
+ entry = BucketDatabase::Entry(bucket.getBucketId());
}
entry->clear();
@@ -228,7 +225,14 @@ DistributorTestUtil::addNodesToBucketDB(const document::BucketId& id,
entry->addNodeManual(node);
}
- getBucketDatabase().update(entry);
+ getBucketDatabase(bucket.getBucketSpace()).update(entry);
+}
+
+void DistributorTestUtil::addNodesToBucketDB(const document::BucketId& id, const std::string& nodeStr) {
+    // Convenience overload: pairs the bucket id with the bucket space returned by
+    // makeBucketSpace() and forwards to the Bucket-based overload.
+    const document::Bucket bucket_in_default_space(makeBucketSpace(), id);
+    addNodesToBucketDB(bucket_in_default_space, nodeStr);
+}
void
@@ -301,6 +305,10 @@ DistributorTestUtil::sendReply(Operation& op,
op.receive(_sender, reply);
}
+BucketDatabase::Entry DistributorTestUtil::getBucket(const document::Bucket& bucket) const {
+    // Look the bucket id up in the database that belongs to the bucket's own space.
+    const BucketDatabase& space_db = getBucketDatabase(bucket.getBucketSpace());
+    return space_db.get(bucket.getBucketId());
+}
+
BucketDatabase::Entry
DistributorTestUtil::getBucket(const document::BucketId& bId) const
{
@@ -356,11 +364,20 @@ BucketDatabase&
DistributorTestUtil::getBucketDatabase() {
return getDistributorBucketSpace().getBucketDatabase();
}
+
+BucketDatabase& DistributorTestUtil::getBucketDatabase(document::BucketSpace space) {
+    // Resolve the repo entry for `space`, then hand out its bucket database.
+    auto&& distributor_space = getBucketSpaceRepo().get(space);
+    return distributor_space.getBucketDatabase();
+}
+
const BucketDatabase&
DistributorTestUtil::getBucketDatabase() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase();
}
+const BucketDatabase& DistributorTestUtil::getBucketDatabase(document::BucketSpace space) const {
+    // Const counterpart: resolve the repo entry for `space`, then expose its bucket database read-only.
+    const auto& distributor_space = getBucketSpaceRepo().get(space);
+    return distributor_space.getBucketDatabase();
+}
+
DistributorBucketSpaceRepo &
DistributorTestUtil::getBucketSpaceRepo() {
return _distributor->getBucketSpaceRepo();
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index e19cf491fc7..c2cb4fb2a62 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -67,6 +67,8 @@ public:
* Format:
* "node1=checksum/docs/size,node2=checksum/docs/size"
*/
+ void addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr);
+ // As the above, but always inserts into default bucket space
void addNodesToBucketDB(const document::BucketId& id, const std::string& nodeStr);
/**
@@ -124,8 +126,10 @@ public:
// TODO explicit notion of bucket spaces for tests
DistributorBucketSpace &getDistributorBucketSpace();
- BucketDatabase& getBucketDatabase();
- const BucketDatabase& getBucketDatabase() const;
+ BucketDatabase& getBucketDatabase(); // Implicit default space only
+ BucketDatabase& getBucketDatabase(document::BucketSpace space);
+ const BucketDatabase& getBucketDatabase() const; // Implicit default space only
+ const BucketDatabase& getBucketDatabase(document::BucketSpace space) const;
DistributorBucketSpaceRepo &getBucketSpaceRepo();
const DistributorBucketSpaceRepo &getBucketSpaceRepo() const;
@@ -164,6 +168,8 @@ public:
void disableBucketActivationInConfig(bool disable);
+ BucketDatabase::Entry getBucket(const document::Bucket& bucket) const;
+ // Gets bucket entry from default space only
BucketDatabase::Entry getBucket(const document::BucketId& bId) const;
std::vector<document::BucketSpace> getBucketSpaces() const;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 8d9cb82dca9..2d490ea9923 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -99,7 +99,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_hostInfoReporter(*this, *this),
_ownershipSafeTimeCalc(
std::make_unique<OwnershipTransferSafeTimePointCalculator>(
- std::chrono::seconds(0))) // Set by config later
+ std::chrono::seconds(0))), // Set by config later
+ _must_send_updated_host_info(false)
{
_component.registerMetric(*_metrics);
_component.registerMetricUpdateHook(_metricUpdateHook,
@@ -420,7 +421,7 @@ Distributor::leaveRecoveryMode()
_metrics->recoveryModeTime.addValue(
_recoveryTimeStarted.getElapsedTimeAsDouble());
if (_doneInitializing) {
- _component.getStateUpdater().immediately_send_get_node_state_replies();
+ _must_send_updated_host_info = true;
}
}
_schedulingMode = MaintenanceScheduler::NORMAL_SCHEDULING_MODE;
@@ -694,10 +695,12 @@ toBucketSpaceStats(const NodeMaintenanceStats &stats)
return BucketSpaceStats(stats.total, stats.syncing + stats.copyingIn);
}
-BucketSpacesStatsProvider::PerNodeBucketSpacesStats
+using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats;
+
+PerNodeBucketSpacesStats
toBucketSpacesStats(const NodeMaintenanceStatsTracker &maintenanceStats)
{
- BucketSpacesStatsProvider::PerNodeBucketSpacesStats result;
+ PerNodeBucketSpacesStats result;
for (const auto &nodeEntry : maintenanceStats.perNodeStats()) {
for (const auto &bucketSpaceEntry : nodeEntry.second) {
auto bucketSpace = document::FixedBucketSpaces::to_string(bucketSpaceEntry.first);
@@ -707,6 +710,27 @@ toBucketSpacesStats(const NodeMaintenanceStatsTracker &maintenanceStats)
return result;
}
+using BucketSpaceSet = std::unordered_set<document::BucketSpace, document::BucketSpace::hash>;
+
+/**
+ * Returns the set of bucket spaces for which at least one node reports valid
+ * stats with a non-zero number of pending buckets.
+ */
+BucketSpaceSet spaces_with_merges_pending(const PerNodeBucketSpacesStats& stats) {
+    BucketSpaceSet spaces_with_pending;
+    for (const auto& node : stats) {
+        for (const auto& space : node.second) {
+            if (space.second.valid() && space.second.bucketsPending() != 0) {
+                // TODO avoid bucket space string roundtrip
+                spaces_with_pending.emplace(document::FixedBucketSpaces::from_string(space.first));
+            }
+        }
+    }
+    return spaces_with_pending;
+}
+
+/**
+ * Returns true iff at least one bucket space had merges pending in prev_stats
+ * but no longer has any in curr_stats.
+ *
+ * The check is done per bucket space rather than by comparing the number of
+ * spaces with pending merges. Comparing counts misses the edge when one space
+ * completes its merges in the same scan where another space starts having
+ * merges pending, since the counts are then equal.
+ */
+// TODO should we also trigger on !pending --> pending edge?
+bool merge_no_longer_pending_edge(const PerNodeBucketSpacesStats& prev_stats,
+                                  const PerNodeBucketSpacesStats& curr_stats) {
+    const auto prev_pending = spaces_with_merges_pending(prev_stats);
+    if (prev_pending.empty()) {
+        return false; // Nothing was pending, so nothing can have stopped being pending
+    }
+    const auto curr_pending = spaces_with_merges_pending(curr_stats);
+    for (const auto& space : prev_pending) {
+        if (curr_pending.count(space) == 0) {
+            return true;
+        }
+    }
+    return false;
+}
+
}
void
@@ -717,7 +741,11 @@ Distributor::updateInternalMetricsForCompletedScan()
_bucketDBMetricUpdater.completeRound();
_bucketDbStats = _bucketDBMetricUpdater.getLastCompleteStats();
_maintenanceStats = _scanner->getPendingMaintenanceStats();
- _bucketSpacesStats = toBucketSpacesStats(_maintenanceStats.perNodeStats);
+ auto new_space_stats = toBucketSpacesStats(_maintenanceStats.perNodeStats);
+ if (merge_no_longer_pending_edge(_bucketSpacesStats, new_space_stats)) {
+ _must_send_updated_host_info = true;
+ }
+ _bucketSpacesStats = std::move(new_space_stats);
}
void
@@ -733,7 +761,8 @@ Distributor::scanNextBucket()
MaintenanceScanner::ScanResult scanResult(_scanner->scanNext());
if (scanResult.isDone()) {
updateInternalMetricsForCompletedScan();
- leaveRecoveryMode(); // Must happen after internal metrics updates
+ leaveRecoveryMode();
+ send_updated_host_info_if_required();
_scanner->reset();
} else {
const auto &distribution(_bucketSpaceRepo->get(scanResult.getBucketSpace()).getDistribution());
@@ -744,6 +773,13 @@ Distributor::scanNextBucket()
return scanResult;
}
+void Distributor::send_updated_host_info_if_required() {
+    // Flushes a deferred "host info changed" notification: if the flag has been raised,
+    // ask the state updater to immediately send GetNodeState replies and then clear
+    // the flag so the replies are only sent once per raise.
+    if (!_must_send_updated_host_info) {
+        return;
+    }
+    _component.getStateUpdater().immediately_send_get_node_state_replies();
+    _must_send_updated_host_info = false;
+}
+
void
Distributor::startNextMaintenanceOperation()
{
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 15738144a58..9108abea2af 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -237,6 +237,7 @@ private:
template <typename NodeFunctor>
void for_each_available_content_node_in(const lib::ClusterState&, NodeFunctor&&);
void invalidate_bucket_spaces_stats();
+ void send_updated_host_info_if_required();
lib::ClusterStateBundle _clusterStateBundle;
@@ -310,6 +311,7 @@ private:
BucketDBMetricUpdater::Stats _bucketDbStats;
DistributorHostInfoReporter _hostInfoReporter;
std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc;
+ bool _must_send_updated_host_info;
};
} // distributor