diff options
author | Geir Storli <geirstorli@yahoo.no> | 2018-03-19 14:49:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-19 14:49:48 +0100 |
commit | e73ee36db35b3e80ae4aea4450c45a1c2314e870 (patch) | |
tree | 3e0c563a9a4ba4e7fe9136165bd46dbf6eaa5983 | |
parent | 601b4777257e9d1694bdc6987d2b63ee6ccf52dc (diff) | |
parent | a65ba0e6728b1a878eb6066e2833abd60799c00d (diff) |
Merge pull request #5382 from vespa-engine/vekterli/immediately-send-getnodestate-reply-on-no-pending-merges-edge-rebased
Immediately send GetNodeState reply on "no more merges" pending edge (rebased)
5 files changed, 128 insertions, 15 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 33549226b98..28fccf438ff 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -20,6 +20,9 @@ using document::test::makeDocumentBucket; using document::test::makeBucketSpace; using document::FixedBucketSpaces; +using document::BucketSpace; +using document::Bucket; +using document::BucketId; namespace storage { @@ -60,6 +63,8 @@ class Distributor_Test : public CppUnit::TestFixture, CPPUNIT_TEST(closing_aborts_priority_queued_client_requests); CPPUNIT_TEST(entering_recovery_mode_resets_bucket_space_stats); CPPUNIT_TEST(leaving_recovery_mode_immediately_sends_getnodestate_replies); + CPPUNIT_TEST(pending_to_no_pending_default_merges_edge_immediately_sends_getnodestate_replies); + CPPUNIT_TEST(pending_to_no_pending_global_merges_edge_immediately_sends_getnodestate_replies); CPPUNIT_TEST_SUITE_END(); public: @@ -97,6 +102,10 @@ protected: void closing_aborts_priority_queued_client_requests(); void entering_recovery_mode_resets_bucket_space_stats(); void leaving_recovery_mode_immediately_sends_getnodestate_replies(); + void pending_to_no_pending_default_merges_edge_immediately_sends_getnodestate_replies(); + void pending_to_no_pending_global_merges_edge_immediately_sends_getnodestate_replies(); + // TODO handle edge case for window between getnodestate reply already + // sent and new request not yet received void assertBucketSpaceStats(size_t expBucketPending, size_t expBucketTotal, uint16_t node, const vespalib::string &bucketSpace, const BucketSpacesStatsProvider::PerNodeBucketSpacesStats &stats); @@ -211,6 +220,7 @@ private: void assertNoMessageBounced(); void configure_mutation_sequencing(bool enabled); void configure_merge_busy_inhibit_duration(int seconds); + void do_test_pending_merge_getnodestate_reply_edge(BucketSpace space); }; CPPUNIT_TEST_SUITE_REGISTRATION(Distributor_Test); @@ -1067,6 +1077,48 @@ void Distributor_Test::leaving_recovery_mode_immediately_sends_getnodestate_repl CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations()); } +void Distributor_Test::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) { + setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); + // 2 buckets with missing replicas triggering merge pending stats + addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a"); + addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a"); + tickDistributorNTimes(3); + CPPUNIT_ASSERT(!_distributor->isInRecoveryMode()); + const auto space_name = FixedBucketSpaces::to_string(space); + assertBucketSpaceStats(2, 0, 1, space_name, _distributor->getBucketSpacesStats()); + CPPUNIT_ASSERT_EQUAL(size_t(0), explicit_node_state_reply_send_invocations()); + + // Edge not triggered when 1 bucket with missing replica left + addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a,1=1/1/1/t"); + tickDistributorNTimes(3); + assertBucketSpaceStats(1, 1, 1, space_name, _distributor->getBucketSpacesStats()); + CPPUNIT_ASSERT_EQUAL(size_t(0), explicit_node_state_reply_send_invocations()); + + // Edge triggered when no more buckets with requiring merge + addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a,1=1/1/1/t"); + tickDistributorNTimes(3); + assertBucketSpaceStats(0, 2, 1, space_name, _distributor->getBucketSpacesStats()); + CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations()); + + // Should only send when edge happens, not in subsequent DB iterations + tickDistributorNTimes(10); + CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations()); + + // Going back to merges pending should _not_ send a getnodestate reply (at least for now) + addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a"); + tickDistributorNTimes(3); + assertBucketSpaceStats(1, 1, 1, space_name, _distributor->getBucketSpacesStats()); + CPPUNIT_ASSERT_EQUAL(size_t(1), explicit_node_state_reply_send_invocations()); +} + +void Distributor_Test::pending_to_no_pending_default_merges_edge_immediately_sends_getnodestate_replies() { + do_test_pending_merge_getnodestate_reply_edge(FixedBucketSpaces::default_space()); +} + +void Distributor_Test::pending_to_no_pending_global_merges_edge_immediately_sends_getnodestate_replies() { + do_test_pending_merge_getnodestate_reply_edge(FixedBucketSpaces::global_space()); +} + } } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 8310266c9cb..33167d61026 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -173,14 +173,11 @@ DistributorTestUtil::addIdealNodes(const lib::ClusterState& state, getBucketDatabase().update(entry); } -void -DistributorTestUtil::addNodesToBucketDB(const document::BucketId& id, - const std::string& nodeStr) -{ - BucketDatabase::Entry entry = getBucket(id); +void DistributorTestUtil::addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr) { + BucketDatabase::Entry entry = getBucket(bucket); if (!entry.valid()) { - entry = BucketDatabase::Entry(id); + entry = BucketDatabase::Entry(bucket.getBucketId()); } entry->clear(); @@ -228,7 +225,14 @@ DistributorTestUtil::addNodesToBucketDB(const document::BucketId& id, entry->addNodeManual(node); } - getBucketDatabase().update(entry); + getBucketDatabase(bucket.getBucketSpace()).update(entry); +} + +void +DistributorTestUtil::addNodesToBucketDB(const document::BucketId& id, + const std::string& nodeStr) +{ + addNodesToBucketDB(document::Bucket(makeBucketSpace(), id), nodeStr); } void @@ -301,6 +305,10 @@ DistributorTestUtil::sendReply(Operation& op, op.receive(_sender, reply); } +BucketDatabase::Entry DistributorTestUtil::getBucket(const document::Bucket& bucket) const { + return getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId()); +} + BucketDatabase::Entry DistributorTestUtil::getBucket(const document::BucketId& bId) const { @@ -356,11 +364,20 @@ BucketDatabase& DistributorTestUtil::getBucketDatabase() { return getDistributorBucketSpace().getBucketDatabase(); } + +BucketDatabase& DistributorTestUtil::getBucketDatabase(document::BucketSpace space) { + return getBucketSpaceRepo().get(space).getBucketDatabase(); +} + const BucketDatabase& DistributorTestUtil::getBucketDatabase() const { return getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase(); } +const BucketDatabase& DistributorTestUtil::getBucketDatabase(document::BucketSpace space) const { + return getBucketSpaceRepo().get(space).getBucketDatabase(); +} + DistributorBucketSpaceRepo & DistributorTestUtil::getBucketSpaceRepo() { return _distributor->getBucketSpaceRepo(); diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index e19cf491fc7..c2cb4fb2a62 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -67,6 +67,8 @@ public: * Format: * "node1=checksum/docs/size,node2=checksum/docs/size" */ + void addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr); + // As the above, but always inserts into default bucket space void addNodesToBucketDB(const document::BucketId& id, const std::string& nodeStr); /** @@ -124,8 +126,10 @@ public: // TODO explicit notion of bucket spaces for tests DistributorBucketSpace &getDistributorBucketSpace(); - BucketDatabase& getBucketDatabase(); - const BucketDatabase& getBucketDatabase() const; + BucketDatabase& getBucketDatabase(); // Implicit default space only + BucketDatabase& getBucketDatabase(document::BucketSpace space); + const BucketDatabase& getBucketDatabase() const; // Implicit default space only + const BucketDatabase& getBucketDatabase(document::BucketSpace space) const; DistributorBucketSpaceRepo &getBucketSpaceRepo(); const DistributorBucketSpaceRepo &getBucketSpaceRepo() const; @@ -164,6 +168,8 @@ public: void disableBucketActivationInConfig(bool disable); + BucketDatabase::Entry getBucket(const document::Bucket& bucket) const; + // Gets bucket entry from default space only BucketDatabase::Entry getBucket(const document::BucketId& bId) const; std::vector<document::BucketSpace> getBucketSpaces() const; diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 8d9cb82dca9..2d490ea9923 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -99,7 +99,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _hostInfoReporter(*this, *this), _ownershipSafeTimeCalc( std::make_unique<OwnershipTransferSafeTimePointCalculator>( - std::chrono::seconds(0))) // Set by config later + std::chrono::seconds(0))), // Set by config later + _must_send_updated_host_info(false) { _component.registerMetric(*_metrics); _component.registerMetricUpdateHook(_metricUpdateHook, @@ -420,7 +421,7 @@ Distributor::leaveRecoveryMode() _metrics->recoveryModeTime.addValue( _recoveryTimeStarted.getElapsedTimeAsDouble()); if (_doneInitializing) { - _component.getStateUpdater().immediately_send_get_node_state_replies(); + _must_send_updated_host_info = true; } } _schedulingMode = MaintenanceScheduler::NORMAL_SCHEDULING_MODE; @@ -694,10 +695,12 @@ toBucketSpaceStats(const NodeMaintenanceStats &stats) return BucketSpaceStats(stats.total, stats.syncing + stats.copyingIn); } -BucketSpacesStatsProvider::PerNodeBucketSpacesStats +using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats; + +PerNodeBucketSpacesStats toBucketSpacesStats(const NodeMaintenanceStatsTracker &maintenanceStats) { - BucketSpacesStatsProvider::PerNodeBucketSpacesStats result; + PerNodeBucketSpacesStats result; for (const auto &nodeEntry : maintenanceStats.perNodeStats()) { for (const auto &bucketSpaceEntry : nodeEntry.second) { auto bucketSpace = document::FixedBucketSpaces::to_string(bucketSpaceEntry.first); @@ -707,6 +710,27 @@ toBucketSpacesStats(const NodeMaintenanceStatsTracker &maintenanceStats) return result; } +size_t spaces_with_merges_pending(const PerNodeBucketSpacesStats& stats) { + std::unordered_set<document::BucketSpace, document::BucketSpace::hash> spaces_with_pending; + for (auto& node : stats) { + for (auto& space : node.second) { + if (space.second.valid() && space.second.bucketsPending() != 0) { + // TODO avoid bucket space string roundtrip + spaces_with_pending.emplace(document::FixedBucketSpaces::from_string(space.first)); + } + } + } + return spaces_with_pending.size(); +} + +// TODO should we also trigger on !pending --> pending edge? +bool merge_no_longer_pending_edge(const PerNodeBucketSpacesStats& prev_stats, + const PerNodeBucketSpacesStats& curr_stats) { + const auto prev_pending = spaces_with_merges_pending(prev_stats); + const auto curr_pending = spaces_with_merges_pending(curr_stats); + return curr_pending < prev_pending; +} + } void @@ -717,7 +741,11 @@ Distributor::updateInternalMetricsForCompletedScan() _bucketDBMetricUpdater.completeRound(); _bucketDbStats = _bucketDBMetricUpdater.getLastCompleteStats(); _maintenanceStats = _scanner->getPendingMaintenanceStats(); - _bucketSpacesStats = toBucketSpacesStats(_maintenanceStats.perNodeStats); + auto new_space_stats = toBucketSpacesStats(_maintenanceStats.perNodeStats); + if (merge_no_longer_pending_edge(_bucketSpacesStats, new_space_stats)) { + _must_send_updated_host_info = true; + } + _bucketSpacesStats = std::move(new_space_stats); } void @@ -733,7 +761,8 @@ Distributor::scanNextBucket() MaintenanceScanner::ScanResult scanResult(_scanner->scanNext()); if (scanResult.isDone()) { updateInternalMetricsForCompletedScan(); - leaveRecoveryMode(); // Must happen after internal metrics updates + leaveRecoveryMode(); + send_updated_host_info_if_required(); _scanner->reset(); } else { const auto &distribution(_bucketSpaceRepo->get(scanResult.getBucketSpace()).getDistribution()); @@ -744,6 +773,13 @@ Distributor::scanNextBucket() return scanResult; } +void Distributor::send_updated_host_info_if_required() { + if (_must_send_updated_host_info) { + _component.getStateUpdater().immediately_send_get_node_state_replies(); + _must_send_updated_host_info = false; + } +} + void Distributor::startNextMaintenanceOperation() { diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 15738144a58..9108abea2af 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -237,6 +237,7 @@ private: template <typename NodeFunctor> void for_each_available_content_node_in(const lib::ClusterState&, NodeFunctor&&); void invalidate_bucket_spaces_stats(); + void send_updated_host_info_if_required(); lib::ClusterStateBundle _clusterStateBundle; @@ -310,6 +311,7 @@ private: BucketDBMetricUpdater::Stats _bucketDbStats; DistributorHostInfoReporter _hostInfoReporter; std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc; + bool _must_send_updated_host_info; }; } // distributor |