diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-09-15 10:45:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-15 10:45:20 +0200 |
commit | 02fc546e8545be4d466c178c838b382657becbcb (patch) | |
tree | 956c8ec5a211df460ad1b9ee46b0ebce0baad7d7 | |
parent | b0154a2c2259e67b2923823daf52724bebee1c07 (diff) | |
parent | 42856470b2f16ae049fe25a089ee151ca1a9d6d2 (diff) |
Merge pull request #3412 from vespa-engine/vekterli/inhibit-merge-scheduling-towards-busy-nodes
Inhibit scheduling of merges towards nodes that are marked busy
15 files changed, 234 insertions, 127 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index bfa1181eca1..3aedd31f574 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -46,6 +46,8 @@ class Distributor_Test : public CppUnit::TestFixture, CPPUNIT_TEST(max_clock_skew_config_is_propagated_to_distributor_config); CPPUNIT_TEST(configured_safe_time_point_rejection_works_end_to_end); CPPUNIT_TEST(sequencing_config_is_propagated_to_distributor_config); + CPPUNIT_TEST(merge_busy_inhibit_duration_config_is_propagated_to_distributor_config); + CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker); CPPUNIT_TEST_SUITE_END(); protected: @@ -73,6 +75,8 @@ protected: void max_clock_skew_config_is_propagated_to_distributor_config(); void configured_safe_time_point_rejection_works_end_to_end(); void sequencing_config_is_propagated_to_distributor_config(); + void merge_busy_inhibit_duration_config_is_propagated_to_distributor_config(); + void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker(); public: void setUp() override { @@ -177,6 +181,7 @@ private: void assertSingleBouncedRemoveReplyPresent(); void assertNoMessageBounced(); void configure_mutation_sequencing(bool enabled); + void configure_merge_busy_inhibit_duration(int seconds); }; CPPUNIT_TEST_SUITE_REGISTRATION(Distributor_Test); @@ -819,6 +824,49 @@ void Distributor_Test::sequencing_config_is_propagated_to_distributor_config() { CPPUNIT_ASSERT(getConfig().getSequenceMutatingOperations()); } +void +Distributor_Test::configure_merge_busy_inhibit_duration(int seconds) { + using namespace vespa::config::content::core; + using ConfigBuilder = StorDistributormanagerConfigBuilder; + + ConfigBuilder builder; + builder.inhibitMergeSendingOnBusyNodeDurationSec = seconds; + getConfig().configure(builder); + _distributor->enableNextConfig(); +} + +void Distributor_Test::merge_busy_inhibit_duration_config_is_propagated_to_distributor_config() { + setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); + + configure_merge_busy_inhibit_duration(7); + CPPUNIT_ASSERT(getConfig().getInhibitMergesOnBusyNodeDuration() == std::chrono::seconds(7)); +} + +void Distributor_Test::merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker() { + setupDistributor(Redundancy(2), NodeCount(2), "storage:1 distributor:1"); + addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t"); + + configure_merge_busy_inhibit_duration(100); + auto cmd = makeDummyRemoveCommand(); // Remove is for bucket 1 + _distributor->handleMessage(cmd); + + // Should send to content node 0 + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(api::MessageType::REMOVE, _sender.commands[0]->getType()); + auto& fwd_cmd = dynamic_cast<api::RemoveCommand&>(*_sender.commands[0]); + auto reply = fwd_cmd.makeReply(); + reply->setResult(api::ReturnCode(api::ReturnCode::BUSY)); + _distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply))); + + auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo(); + + CPPUNIT_ASSERT(node_info.isBusy(0)); + getClock().addSecondsToTime(99); + CPPUNIT_ASSERT(node_info.isBusy(0)); + getClock().addSecondsToTime(2); + CPPUNIT_ASSERT(!node_info.isBusy(0)); +} + } } diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index b298bec3977..7e7ca52635e 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -28,6 +28,7 @@ class MergeOperationTest : public CppUnit::TestFixture, CPPUNIT_TEST(testMarkRedundantTrustedCopiesAsSourceOnly); CPPUNIT_TEST(onlyMarkRedundantRetiredReplicasAsSourceOnly); CPPUNIT_TEST(mark_post_merge_redundant_replicas_source_only); + CPPUNIT_TEST(merge_operation_is_blocked_by_any_busy_target_node); CPPUNIT_TEST_SUITE_END(); std::unique_ptr<PendingMessageTracker> _pendingTracker; @@ -41,6 +42,7 @@ protected: void testMarkRedundantTrustedCopiesAsSourceOnly(); void onlyMarkRedundantRetiredReplicasAsSourceOnly(); void mark_post_merge_redundant_replicas_source_only(); + void merge_operation_is_blocked_by_any_busy_target_node(); public: void setUp() override { @@ -478,5 +480,28 @@ void MergeOperationTest::mark_post_merge_redundant_replicas_source_only() { getNodeList("storage:10", 4, "3,5,7,6")); } +void MergeOperationTest::merge_operation_is_blocked_by_any_busy_target_node() { + getClock().setAbsoluteTimeInSeconds(10); + addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t"); + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); + MergeOperation op(BucketAndNodes(document::BucketId(16, 1), toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + + // Should not block on nodes _not_ included in operation node set + _pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10)); + CPPUNIT_ASSERT(!op.isBlocked(*_pendingTracker)); + + // Node 1 is included in operation node set and should cause a block + _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10)); + CPPUNIT_ASSERT(op.isBlocked(*_pendingTracker)); + + getClock().addSecondsToTime(11); + CPPUNIT_ASSERT(!op.isBlocked(*_pendingTracker)); // No longer busy + + // Should block on other operation nodes than the first listed as well + _pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10)); + CPPUNIT_ASSERT(op.isBlocked(*_pendingTracker)); +} + } // distributor } // storage diff --git a/storage/src/tests/distributor/nodeinfotest.cpp b/storage/src/tests/distributor/nodeinfotest.cpp index a3eaf2505b3..0363f25831a 100644 --- a/storage/src/tests/distributor/nodeinfotest.cpp +++ b/storage/src/tests/distributor/nodeinfotest.cpp @@ -57,11 +57,11 @@ NodeInfoTest::testSimple() CPPUNIT_ASSERT_EQUAL(1, (int)info.getPendingCount(7)); CPPUNIT_ASSERT_EQUAL(0, (int)info.getPendingCount(5)); - info.setBusy(5); + info.setBusy(5, std::chrono::seconds(60)); clock.addSecondsToTime(10); - info.setBusy(1); + info.setBusy(1, std::chrono::seconds(60)); clock.addSecondsToTime(20); - info.setBusy(42); + info.setBusy(42, std::chrono::seconds(60)); CPPUNIT_ASSERT_EQUAL(true, info.isBusy(5)); CPPUNIT_ASSERT_EQUAL(true, info.isBusy(1)); diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index cda9a9e3782..fa36b4e6305 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -13,12 +13,10 @@ namespace storage { namespace distributor { -// Workaround typedef for not (yet) running with --std=c++14 which supports -// user defined literals. Once we do, replace ms(123) with 123ms. -using ms = std::chrono::milliseconds; +using namespace std::chrono_literals; -class PendingMessageTrackerCallback_Test : public CppUnit::TestFixture { - CPPUNIT_TEST_SUITE(PendingMessageTrackerCallback_Test); +class PendingMessageTrackerTest : public CppUnit::TestFixture { + CPPUNIT_TEST_SUITE(PendingMessageTrackerTest); CPPUNIT_TEST(testSimple); CPPUNIT_TEST(testMultipleMessages); CPPUNIT_TEST(testStartPage); @@ -36,6 +34,8 @@ class PendingMessageTrackerCallback_Test : public CppUnit::TestFixture { CPPUNIT_TEST(timeTravellingClockLatenciesNotRegistered); CPPUNIT_TEST(statsSnapshotIncludesAllNodes); CPPUNIT_TEST(latencyProviderForwardsToImplementation); + CPPUNIT_TEST(busy_reply_marks_node_as_busy); + CPPUNIT_TEST(busy_node_duration_can_be_adjusted); CPPUNIT_TEST_SUITE_END(); public: @@ -56,6 +56,8 @@ public: void timeTravellingClockLatenciesNotRegistered(); void statsSnapshotIncludesAllNodes(); void latencyProviderForwardsToImplementation(); + void busy_reply_marks_node_as_busy(); + void busy_node_duration_can_be_adjusted(); private: void insertMessages(PendingMessageTracker& tracker); @@ -122,10 +124,12 @@ public: } void sendPutReply(api::PutCommand& putCmd, - const RequestBuilder& builder) + const RequestBuilder& builder, + const api::ReturnCode& result = api::ReturnCode()) { assignMockedTime(builder.atTime()); auto putReply = putCmd.makeReply(); + putReply->setResult(result); _tracker->reply(*putReply); } @@ -149,8 +153,8 @@ public: void sendPutAndReplyWithLatency(uint16_t node, std::chrono::milliseconds latency) { - auto put = sendPut(RequestBuilder().atTime(ms(1000)).toNode(node)); - sendPutReply(*put, RequestBuilder().atTime(ms(1000) + latency)); + auto put = sendPut(RequestBuilder().atTime(1000ms).toNode(node)); + sendPutReply(*put, RequestBuilder().atTime(1000ms + latency)); } OperationStats getNodePutOperationStats(uint16_t node) { @@ -158,6 +162,7 @@ public: } PendingMessageTracker& tracker() { return *_tracker; } + auto& clock() { return _clock; } private: std::string createDummyIdString(const document::BucketId& bucket) const { @@ -215,17 +220,16 @@ Fixture::Fixture() _clock.setAbsoluteTimeInSeconds(1); // Have to set clock in compReg before constructing tracker, or it'll // flip out and die on an explicit nullptr check. - _tracker = std::unique_ptr<PendingMessageTracker>( - new PendingMessageTracker(_compReg)); + _tracker = std::make_unique<PendingMessageTracker>(_compReg); } Fixture::~Fixture() {} } -CPPUNIT_TEST_SUITE_REGISTRATION(PendingMessageTrackerCallback_Test); +CPPUNIT_TEST_SUITE_REGISTRATION(PendingMessageTrackerTest); void -PendingMessageTrackerCallback_Test::testSimple() +PendingMessageTrackerTest::testSimple() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -269,7 +273,7 @@ PendingMessageTrackerCallback_Test::testSimple() } void -PendingMessageTrackerCallback_Test::insertMessages(PendingMessageTracker& tracker) +PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker) { for (uint32_t i = 0; i < 4; i++) { std::ostringstream ost; @@ -294,7 +298,7 @@ PendingMessageTrackerCallback_Test::insertMessages(PendingMessageTracker& tracke } void -PendingMessageTrackerCallback_Test::testStartPage() +PendingMessageTrackerTest::testStartPage() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -318,7 +322,7 @@ PendingMessageTrackerCallback_Test::testStartPage() } void -PendingMessageTrackerCallback_Test::testMultipleMessages() +PendingMessageTrackerTest::testMultipleMessages() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -419,7 +423,7 @@ public: } void -PendingMessageTrackerCallback_Test::testGetPendingMessageTypes() +PendingMessageTrackerTest::testGetPendingMessageTypes() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -456,7 +460,7 @@ PendingMessageTrackerCallback_Test::testGetPendingMessageTypes() } void -PendingMessageTrackerCallback_Test::testHasPendingMessage() +PendingMessageTrackerTest::testHasPendingMessage() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -508,7 +512,7 @@ public: } // anon ns void -PendingMessageTrackerCallback_Test::testGetAllMessagesForSingleBucket() +PendingMessageTrackerTest::testGetAllMessagesForSingleBucket() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -535,10 +539,10 @@ PendingMessageTrackerCallback_Test::testGetAllMessagesForSingleBucket() } void -PendingMessageTrackerCallback_Test::nodeStatsCanBeOutputStreamed() +PendingMessageTrackerTest::nodeStatsCanBeOutputStreamed() { NodeStats stats; - stats.puts = makeOpStats(ms(56789), 10); + stats.puts = makeOpStats(56789ms, 10); std::ostringstream os; os << stats; @@ -550,120 +554,143 @@ PendingMessageTrackerCallback_Test::nodeStatsCanBeOutputStreamed() } void -PendingMessageTrackerCallback_Test::totalPutLatencyIsInitiallyZero() +PendingMessageTrackerTest::totalPutLatencyIsInitiallyZero() { Fixture fixture; - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0), + CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::statsNotAlteredBeforeReplyReceived() +PendingMessageTrackerTest::statsNotAlteredBeforeReplyReceived() { Fixture fixture; - fixture.sendPut(RequestBuilder().atTime(ms(1000)).toNode(0)); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0), + fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0)); + CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::totalPutLatencyIsTrackedForSingleRequest() +PendingMessageTrackerTest::totalPutLatencyIsTrackedForSingleRequest() { Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, ms(500)); + fixture.sendPutAndReplyWithLatency(0, 500ms); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::statsAreTrackedSeparatelyPerNode() +PendingMessageTrackerTest::statsAreTrackedSeparatelyPerNode() { Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, ms(500)); - fixture.sendPutAndReplyWithLatency(1, ms(600)); + fixture.sendPutAndReplyWithLatency(0, 500ms); + fixture.sendPutAndReplyWithLatency(1, 600ms); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), fixture.getNodePutOperationStats(0)); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(600), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1), fixture.getNodePutOperationStats(1)); } // Necessarily, this test will have to be altered when we add tracking of // other message types as well. void -PendingMessageTrackerCallback_Test::onlyPutMessagesAreTracked() +PendingMessageTrackerTest::onlyPutMessagesAreTracked() { Fixture fixture; auto remove = fixture.sendRemove( - RequestBuilder().atTime(ms(1000)).toNode(0)); - fixture.sendRemoveReply(*remove, RequestBuilder().atTime(ms(2000))); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0), + RequestBuilder().atTime(1000ms).toNode(0)); + fixture.sendRemoveReply(*remove, RequestBuilder().atTime(2000ms)); + CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::totalPutLatencyIsAggregatedAcrossRequests() +PendingMessageTrackerTest::totalPutLatencyIsAggregatedAcrossRequests() { Fixture fixture; // Model 2 concurrent puts to node 0. - fixture.sendPutAndReplyWithLatency(0, ms(500)); - fixture.sendPutAndReplyWithLatency(0, ms(600)); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(1100), 2), + fixture.sendPutAndReplyWithLatency(0, 500ms); + fixture.sendPutAndReplyWithLatency(0, 600ms); + CPPUNIT_ASSERT_EQUAL(makeOpStats(1100ms, 2), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::clearingMessagesDoesNotAffectStats() +PendingMessageTrackerTest::clearingMessagesDoesNotAffectStats() { Fixture fixture; - fixture.sendPutAndReplyWithLatency(2, ms(2000)); + fixture.sendPutAndReplyWithLatency(2, 2000ms); fixture.tracker().clearMessagesForNode(2); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(2000), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(2000ms, 1), fixture.getNodePutOperationStats(2)); } void -PendingMessageTrackerCallback_Test::timeTravellingClockLatenciesNotRegistered() +PendingMessageTrackerTest::timeTravellingClockLatenciesNotRegistered() { Fixture fixture; - auto put = fixture.sendPut(RequestBuilder().atTime(ms(1000)).toNode(0)); - fixture.sendPutReply(*put, RequestBuilder().atTime(ms(999))); + auto put = fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0)); + fixture.sendPutReply(*put, RequestBuilder().atTime(999ms)); // Latency increase of zero, but we do count the request itself. - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 1), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::statsSnapshotIncludesAllNodes() +PendingMessageTrackerTest::statsSnapshotIncludesAllNodes() { Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, ms(500)); - fixture.sendPutAndReplyWithLatency(1, ms(600)); + fixture.sendPutAndReplyWithLatency(0, 500ms); + fixture.sendPutAndReplyWithLatency(1, 600ms); NodeStatsSnapshot snapshot = fixture.tracker().getLatencyStatistics(); CPPUNIT_ASSERT_EQUAL(size_t(2), snapshot.nodeToStats.size()); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), snapshot.nodeToStats[0].puts); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(600), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1), snapshot.nodeToStats[1].puts); } void -PendingMessageTrackerCallback_Test::latencyProviderForwardsToImplementation() +PendingMessageTrackerTest::latencyProviderForwardsToImplementation() { Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, ms(500)); + fixture.sendPutAndReplyWithLatency(0, 500ms); LatencyStatisticsProvider& provider( fixture.tracker().getLatencyStatisticsProvider()); NodeStatsSnapshot snapshot = provider.getLatencyStatistics(); CPPUNIT_ASSERT_EQUAL(size_t(1), snapshot.nodeToStats.size()); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), snapshot.nodeToStats[0].puts); } +// TODO don't set busy for visitor replies? These will mark the node as busy today, +// but have the same actual semantics as busy merges (i.e. "queue is full", not "node +// is too busy to accept new requests in general"). + +void PendingMessageTrackerTest::busy_reply_marks_node_as_busy() { + Fixture f; + auto cmd = f.sendPut(RequestBuilder().toNode(0)); + CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(0)); + f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY)); + CPPUNIT_ASSERT(f.tracker().getNodeInfo().isBusy(0)); + CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(1)); +} + +void PendingMessageTrackerTest::busy_node_duration_can_be_adjusted() { + Fixture f; + auto cmd = f.sendPut(RequestBuilder().toNode(0)); + f.tracker().setNodeBusyDuration(std::chrono::seconds(10)); + f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY)); + CPPUNIT_ASSERT(f.tracker().getNodeInfo().isBusy(0)); + f.clock().addSecondsToTime(11); + CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(0)); +} + } // distributor } // storage diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index 3007eff3d92..44cf56fdff8 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -27,6 +27,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _maxVisitorsPerNodePerClientVisitor(4), _minBucketsPerVisitor(5), _maxClusterClockSkew(0), + _inhibitMergeSendingOnBusyNodeDuration(std::chrono::seconds(60)), _doInlineSplit(true), _enableJoinForSiblingLessBuckets(false), _enableInconsistentJoin(false), @@ -149,8 +150,10 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist configureMaintenancePriorities(config); if (config.maxClusterClockSkewSec >= 0) { - _maxClusterClockSkew = std::chrono::seconds( - config.maxClusterClockSkewSec); + _maxClusterClockSkew = std::chrono::seconds(config.maxClusterClockSkewSec); + } + if (config.inhibitMergeSendingOnBusyNodeDurationSec >= 0) { + _inhibitMergeSendingOnBusyNodeDuration = std::chrono::seconds(config.inhibitMergeSendingOnBusyNodeDurationSec); } LOG(debug, diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index d33f46befac..d3e589f9d00 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -15,7 +15,7 @@ namespace distributor { class DistributorConfiguration { public: - DistributorConfiguration(StorageComponent& component); + explicit DistributorConfiguration(StorageComponent& component); ~DistributorConfiguration(); struct MaintenancePriorities @@ -225,6 +225,9 @@ public: std::chrono::seconds getMaxClusterClockSkew() const noexcept { return _maxClusterClockSkew; } + std::chrono::seconds getInhibitMergesOnBusyNodeDuration() const noexcept { + return _inhibitMergeSendingOnBusyNodeDuration; + } bool getSequenceMutatingOperations() const noexcept { return _sequenceMutatingOperations; @@ -263,6 +266,7 @@ private: MaintenancePriorities _maintenancePriorities; std::chrono::seconds _maxClusterClockSkew; + std::chrono::seconds _inhibitMergeSendingOnBusyNodeDuration; bool _doInlineSplit; bool _enableJoinForSiblingLessBuckets; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index f549442cf51..0a43a7e8aa4 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -180,3 +180,7 @@ max_cluster_clock_skew_sec int default=1 ## modifications to documents when sent from multiple feed clients. sequence_mutating_operations bool default=true +## Number of seconds that scheduling of new merge operations should be inhibited +## towards a node if it has indicated that its merge queues are full or it is +## suffering from resource exhaustion. +inhibit_merge_sending_on_busy_node_duration_sec int default=30 diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index f0b80900fe4..0dc52650131 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -684,12 +684,10 @@ Distributor::doNonCriticalTick(framework::ThreadIndex) void Distributor::enableNextConfig() { - _hostInfoReporter.enableReporting( - getConfig().getEnableHostInfoReporting()); - _bucketDBMetricUpdater.setMinimumReplicaCountingMode( - getConfig().getMinimumReplicaCountingMode()); - _ownershipSafeTimeCalc->setMaxClusterClockSkew( - getConfig().getMaxClusterClockSkew()); + _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting()); + _bucketDBMetricUpdater.setMinimumReplicaCountingMode(getConfig().getMinimumReplicaCountingMode()); + _ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew()); + _pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration()); } void diff --git a/storage/src/vespa/storage/distributor/nodeinfo.cpp b/storage/src/vespa/storage/distributor/nodeinfo.cpp index f018f78ef7a..9430f1691e0 100644 --- a/storage/src/vespa/storage/distributor/nodeinfo.cpp +++ b/storage/src/vespa/storage/distributor/nodeinfo.cpp @@ -6,21 +6,17 @@ namespace storage::distributor { NodeInfo::NodeInfo(const framework::Clock& clock) - : _clock(clock) {} + : _clock(clock) {} -uint32_t -NodeInfo::getPendingCount(uint16_t idx) const -{ +uint32_t NodeInfo::getPendingCount(uint16_t idx) const { return getNode(idx)._pending; } -bool -NodeInfo::isBusy(uint16_t idx) const -{ +bool NodeInfo::isBusy(uint16_t idx) const { const SingleNodeInfo& info = getNode(idx); - if (info._busyTime.isSet()) { - if (_clock.getTimeInSeconds() > info._busyTime) { - info._busyTime = framework::SecondTime(0); + if (info._busyUntilTime.time_since_epoch().count() != 0) { + if (_clock.getMonotonicTime() > info._busyUntilTime) { + info._busyUntilTime = framework::MonotonicTimePoint{}; } else { return true; } @@ -29,22 +25,15 @@ NodeInfo::isBusy(uint16_t idx) const return false; } -void -NodeInfo::setBusy(uint16_t idx) -{ - getNode(idx)._busyTime = _clock.getTimeInSeconds() - + framework::SecondTime(60); +void NodeInfo::setBusy(uint16_t idx, framework::MonotonicDuration for_duration) { + getNode(idx)._busyUntilTime = _clock.getMonotonicTime() + for_duration; } -void -NodeInfo::incPending(uint16_t idx) -{ +void NodeInfo::incPending(uint16_t idx) { getNode(idx)._pending++; } -void -NodeInfo::decPending(uint16_t idx) -{ +void NodeInfo::decPending(uint16_t idx) { SingleNodeInfo& info = getNode(idx); if (info._pending > 0) { @@ -52,28 +41,24 @@ NodeInfo::decPending(uint16_t idx) } } -void -NodeInfo::clearPending(uint16_t idx) -{ +void NodeInfo::clearPending(uint16_t idx) { SingleNodeInfo& info = getNode(idx); info._pending = 0; } -NodeInfo::SingleNodeInfo& -NodeInfo::getNode(uint16_t idx) -{ - while ((int)_nodes.size() < idx + 1) { - _nodes.push_back(SingleNodeInfo()); +NodeInfo::SingleNodeInfo& NodeInfo::getNode(uint16_t idx) { + const auto index_lbound = static_cast<size_t>(idx) + 1; + while (_nodes.size() < index_lbound) { + _nodes.emplace_back(); } return _nodes[idx]; } -const NodeInfo::SingleNodeInfo& -NodeInfo::getNode(uint16_t idx) const -{ - while ((int)_nodes.size() < idx + 1) { - _nodes.push_back(SingleNodeInfo()); +const NodeInfo::SingleNodeInfo& NodeInfo::getNode(uint16_t idx) const { + const auto index_lbound = static_cast<size_t>(idx) + 1; + while (_nodes.size() < index_lbound) { + _nodes.emplace_back(); } return _nodes[idx]; diff --git a/storage/src/vespa/storage/distributor/nodeinfo.h b/storage/src/vespa/storage/distributor/nodeinfo.h index 57fd0145331..a28a8965714 100644 --- a/storage/src/vespa/storage/distributor/nodeinfo.h +++ b/storage/src/vespa/storage/distributor/nodeinfo.h @@ -14,13 +14,13 @@ namespace storage::distributor { class NodeInfo { public: - NodeInfo(const framework::Clock& clock); + explicit NodeInfo(const framework::Clock& clock); uint32_t getPendingCount(uint16_t idx) const; bool isBusy(uint16_t idx) const; - void setBusy(uint16_t idx); + void setBusy(uint16_t idx, framework::MonotonicDuration for_duration); void incPending(uint16_t idx); @@ -30,11 +30,10 @@ public: private: struct SingleNodeInfo { - SingleNodeInfo() - : _pending(0), _busyTime(0) {}; + SingleNodeInfo() : _pending(0), _busyUntilTime() {} uint32_t _pending; - mutable framework::SecondTime _busyTime; + mutable framework::MonotonicTimePoint _busyUntilTime; }; mutable std::vector<SingleNodeInfo> _nodes; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 81d5c1a7ba6..28381af4021 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "mergeoperation.h" #include <vespa/storage/distributor/idealstatemanager.h> +#include <array> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".distributor.operation.idealstate.merge"); @@ -302,25 +303,21 @@ MergeOperation::onReceive(DistributorMessageSender& sender, namespace { -static const uint32_t WRITE_FEED_MESSAGE_TYPES[] = -{ +constexpr std::array<uint32_t, 7> WRITE_FEED_MESSAGE_TYPES {{ api::MessageType::PUT_ID, api::MessageType::REMOVE_ID, api::MessageType::UPDATE_ID, api::MessageType::REMOVELOCATION_ID, api::MessageType::MULTIOPERATION_ID, api::MessageType::BATCHPUTREMOVE_ID, - api::MessageType::BATCHDOCUMENTUPDATE_ID, - 0 -}; + api::MessageType::BATCHDOCUMENTUPDATE_ID +}}; } -bool -MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const -{ - for (uint32_t i = 0; WRITE_FEED_MESSAGE_TYPES[i] != 0; ++i) { - if (messageType == WRITE_FEED_MESSAGE_TYPES[i]) { +bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const { + for (auto blocking_type : WRITE_FEED_MESSAGE_TYPES) { + if (messageType == blocking_type) { return true; } } @@ -328,4 +325,14 @@ MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) cons return IdealStateOperation::shouldBlockThisOperation(messageType, pri); } +bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker) const { + const auto& node_info = pending_tracker.getNodeInfo(); + for (auto node : getNodes()) { + if (node_info.isBusy(node)) { + return true; + } + } + return IdealStateOperation::isBlocked(pending_tracker); +} + } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index 36b894efb20..d09bc0ba5c4 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -46,7 +46,8 @@ public: const document::BucketId&, MergeLimiter&, std::vector<MergeMetaData>&); - bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override ; + bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override; + bool isBlocked(const PendingMessageTracker& pendingMessages) const override; private: static void addIdealNodes( const std::vector<uint16_t>& idealNodes, diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index b2eb416c48f..aeadca17d13 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -18,6 +18,7 @@ PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr) _nodeIndexToStats(), _nodeInfo(_component.getClock()), _statisticsForwarder(*this), + _nodeBusyDuration(60), _lock() { _component.registerStatusPage(*this); @@ -54,15 +55,14 @@ PendingMessageTracker::currentTime() const namespace { template <typename Pair> -struct PairAsRange -{ +struct PairAsRange { Pair _pair; explicit PairAsRange(Pair pair) : _pair(std::move(pair)) {} - auto begin() -> decltype(_pair.first) { return _pair.first; } - auto end() -> decltype(_pair.second) { return _pair.second; } - auto begin() const -> decltype(_pair.first) { return _pair.first; } - auto end() const -> decltype(_pair.second) { return _pair.second; } + auto begin() { return _pair.first; } + auto end() { return _pair.second; } + auto begin() const { return _pair.first; } + auto end() const { return _pair.second; } }; template <typename Pair> @@ -133,7 +133,7 @@ PendingMessageTracker::reply(const api::StorageReply& r) updateNodeStatsOnReply(*iter); api::ReturnCode::Result code = r.getResult().getResult(); if (code == api::ReturnCode::BUSY || code == api::ReturnCode::TIMEOUT) { - _nodeInfo.setBusy(r.getAddress()->getIndex()); + _nodeInfo.setBusy(r.getAddress()->getIndex(), _nodeBusyDuration); } LOG(debug, "Erased message with id %zu", msgId); msgs.erase(msgId); diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 42704a02a40..9b49e94d51f 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -21,6 +21,7 @@ #include <set> #include <unordered_map> +#include <chrono> namespace storage { namespace distributor { @@ -106,7 +107,7 @@ public: * storage node. "Completed" here means both successful and failed * operations. Statistics are monotonically increasing within the scope of * the process' lifetime and are never reset. This models how the Linux - * kernel reports its internal stats and means the caller must maintan + * kernel reports its internal stats and means the caller must maintain * value snapshots to extract meaningful time series information. * * If stats are requested for a node that has not had any operations @@ -137,6 +138,10 @@ public: return _statisticsForwarder; } + void setNodeBusyDuration(std::chrono::seconds secs) noexcept { + _nodeBusyDuration = secs; + } + private: struct MessageEntry { TimePoint timeStamp; @@ -210,6 +215,7 @@ private: std::unordered_map<uint16_t, NodeStats> _nodeIndexToStats; NodeInfo _nodeInfo; ForwardingLatencyStatisticsProvider _statisticsForwarder; + std::chrono::seconds _nodeBusyDuration; // Since distributor is currently single-threaded, this will only // contend when status page is being accessed. It is, however, required diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp index 282ddd871b8..abd9778d72c 100644 --- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp +++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp @@ -26,7 +26,7 @@ ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, if (!canStart(_pendingCount, priority)) { return false; } - Operation::SP wrappedOp(new ThrottlingOperation(operation, *this)); + auto wrappedOp = std::make_shared<ThrottlingOperation>(operation, *this); ++_pendingCount; return _starterImpl.start(wrappedOp, priority); } |