diff options
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/tests/throttling/throttling.cpp | 46 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp | 46 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h | 13 |
3 files changed, 68 insertions, 37 deletions
diff --git a/messagebus/src/tests/throttling/throttling.cpp b/messagebus/src/tests/throttling/throttling.cpp index 07db548be48..a601845a5fc 100644 --- a/messagebus/src/tests/throttling/throttling.cpp +++ b/messagebus/src/tests/throttling/throttling.cpp @@ -252,36 +252,39 @@ Test::testMinOne() void Test::testDynamicWindowSize() { - std::unique_ptr<DynamicTimer> ptr(new DynamicTimer()); - DynamicTimer *timer = ptr.get(); + auto ptr = std::make_unique<DynamicTimer>(); + auto* timer = ptr.get(); DynamicThrottlePolicy policy(std::move(ptr)); - policy.setWindowSizeIncrement(5); + policy.setWindowSizeIncrement(5) + .setResizeRate(1); double windowSize = getWindowSize(policy, *timer, 100); - ASSERT_TRUE(windowSize >= 90 && windowSize <= 110); + ASSERT_TRUE(windowSize >= 90 && windowSize <= 105); windowSize = getWindowSize(policy, *timer, 200); - ASSERT_TRUE(windowSize >= 90 && windowSize <= 210); + ASSERT_TRUE(windowSize >= 180 && windowSize <= 205); windowSize = getWindowSize(policy, *timer, 50); - ASSERT_TRUE(windowSize >= 9 && windowSize <= 55); + ASSERT_TRUE(windowSize >= 45 && windowSize <= 55); windowSize = getWindowSize(policy, *timer, 500); - ASSERT_TRUE(windowSize >= 90 && windowSize <= 505); + ASSERT_TRUE(windowSize >= 450 && windowSize <= 505); windowSize = getWindowSize(policy, *timer, 100); - ASSERT_TRUE(windowSize >= 90 && windowSize <= 110); + ASSERT_TRUE(windowSize >= 90 && windowSize <= 115); } void Test::testIdleTimePeriod() { - ITimer::UP ptr(new DynamicTimer()); - DynamicTimer *timer = static_cast<DynamicTimer*>(ptr.get()); + auto ptr = std::make_unique<DynamicTimer>(); + auto* timer = ptr.get(); DynamicThrottlePolicy policy(std::move(ptr)); - policy.setWindowSizeIncrement(5); + policy.setWindowSizeIncrement(5) + .setMinWindowSize(1) + .setResizeRate(1); double windowSize = getWindowSize(policy, *timer, 100); ASSERT_TRUE(windowSize >= 90 && windowSize <= 110); @@ -303,12 +306,13 @@ Test::testIdleTimePeriod() void Test::testMinWindowSize() { - ITimer::UP ptr(new DynamicTimer()); - DynamicTimer *timer = static_cast<DynamicTimer*>(ptr.get()); + auto ptr = std::make_unique<DynamicTimer>(); + auto* timer = ptr.get(); DynamicThrottlePolicy policy(std::move(ptr)); - policy.setWindowSizeIncrement(5); - policy.setMinWindowSize(150); + policy.setWindowSizeIncrement(5) + .setResizeRate(1) + .setMinWindowSize(150); double windowSize = getWindowSize(policy, *timer, 200); ASSERT_TRUE(windowSize >= 150 && windowSize <= 210); @@ -317,12 +321,13 @@ Test::testMinWindowSize() void Test::testMaxWindowSize() { - ITimer::UP ptr(new DynamicTimer()); - DynamicTimer *timer = static_cast<DynamicTimer*>(ptr.get()); + auto ptr = std::make_unique<DynamicTimer>(); + auto* timer = ptr.get(); DynamicThrottlePolicy policy(std::move(ptr)); - policy.setWindowSizeIncrement(5); - policy.setMaxWindowSize(50); + policy.setWindowSizeIncrement(5) + .setResizeRate(1) + .setMaxWindowSize(50); double windowSize = getWindowSize(policy, *timer, 100); ASSERT_TRUE(windowSize >= 40 && windowSize <= 50); @@ -338,6 +343,7 @@ Test::getWindowSize(DynamicThrottlePolicy &policy, DynamicTimer &timer, uint32_t { SimpleMessage msg("foo"); SimpleReply reply("bar"); + reply.setContext(mbus::Context(uint64_t(1))); // To offset pending size bump in static policy for (uint32_t i = 0; i < 999; ++i) { uint32_t numPending = 0; @@ -354,6 +360,6 @@ Test::getWindowSize(DynamicThrottlePolicy &policy, DynamicTimer &timer, uint32_t } } uint32_t ret = policy.getMaxPendingCount(); - printf("getWindowSize() = %d\n", ret); + fprintf(stderr, "getWindowSize() = %u\n", ret); return ret; } diff --git a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp index 00a76ca8b12..be61dcdc675 100644 --- a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp +++ b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp @@ -12,8 +12,8 @@ DynamicThrottlePolicy::DynamicThrottlePolicy() : _timer(new SteadyTimer()), _numSent(0), _numOk(0), - _resizeRate(3), - _resizeTime(_timer->getMilliTime()), + _resizeRate(3.0), + _resizeTime(0), _timeOfLastMessage(_timer->getMilliTime()), _idleTimePeriod(60000), _efficiencyThreshold(1), @@ -21,6 +21,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy() : _windowSize(_windowSizeIncrement), _maxWindowSize(INT_MAX), _minWindowSize(_windowSizeIncrement), + _decrementFactor(2.0), _windowSizeBackOff(0.9), _weight(1), _localMaxThroughput(0) @@ -30,8 +31,8 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(double windowSizeIncrement) : _timer(new SteadyTimer()), _numSent(0), _numOk(0), - _resizeRate(3), - _resizeTime(_timer->getMilliTime()), + _resizeRate(3.0), + _resizeTime(0), _timeOfLastMessage(_timer->getMilliTime()), _idleTimePeriod(60000), _efficiencyThreshold(1), @@ -39,6 +40,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(double windowSizeIncrement) : _windowSize(_windowSizeIncrement), _maxWindowSize(INT_MAX), _minWindowSize(_windowSizeIncrement), + _decrementFactor(2.0), _windowSizeBackOff(0.9), _weight(1), _localMaxThroughput(0) @@ -48,8 +50,8 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(ITimer::UP timer) : _timer(std::move(timer)), _numSent(0), _numOk(0), - _resizeRate(3), - _resizeTime(_timer->getMilliTime()), + _resizeRate(3.0), + _resizeTime(0), _timeOfLastMessage(_timer->getMilliTime()), _idleTimePeriod(60000), _efficiencyThreshold(1), @@ -57,6 +59,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(ITimer::UP timer) : _windowSize(_windowSizeIncrement), _maxWindowSize(INT_MAX), _minWindowSize(_windowSizeIncrement), + _decrementFactor(2.0), _windowSizeBackOff(0.9), _weight(1), _localMaxThroughput(0) @@ -73,20 +76,21 @@ DynamicThrottlePolicy & DynamicThrottlePolicy::setWindowSizeIncrement(double windowSizeIncrement) { _windowSizeIncrement = windowSizeIncrement; + _windowSize = std::max(_windowSize, _windowSizeIncrement); return *this; } DynamicThrottlePolicy & DynamicThrottlePolicy::setWindowSizeBackOff(double windowSizeBackOff) { - _windowSizeBackOff = windowSizeBackOff; + _windowSizeBackOff = std::max(0.0, std::min(1.0, windowSizeBackOff)); return *this; } DynamicThrottlePolicy & -DynamicThrottlePolicy::setResizeRate(uint32_t resizeRate) +DynamicThrottlePolicy::setResizeRate(double resizeRate) { - _resizeRate = resizeRate; + _resizeRate = std::max(2.0, resizeRate); return *this; } @@ -115,6 +119,14 @@ DynamicThrottlePolicy & DynamicThrottlePolicy::setMinWindowSize(double min) { _minWindowSize = min; + _windowSize = std::max(_minWindowSize, _windowSizeIncrement); + return *this; +} + +DynamicThrottlePolicy& +DynamicThrottlePolicy::setWindowSizeDecrementFactor(double decrementFactor) +{ + _decrementFactor = decrementFactor; return *this; } @@ -134,10 +146,14 @@ DynamicThrottlePolicy::canSend(const Message &msg, uint32_t pendingCount) } uint64_t time = _timer->getMilliTime(); if (time - _timeOfLastMessage > _idleTimePeriod) { - _windowSize = std::min(_windowSize, (double) pendingCount + _windowSizeIncrement); + _windowSize = std::max(_minWindowSize, std::min(_windowSize, pendingCount + _windowSizeIncrement)); + LOG(debug, "Idle time exceeded; WindowSize = %.2f", _windowSize); } _timeOfLastMessage = time; - return pendingCount < _windowSize; + auto windowSizeFloored = static_cast<uint32_t>(_windowSize); + // Use floating point window sizes, so the algorithm sees the difference between 1.1 and 1.9 window size. + bool carry = _numSent < ((_windowSize * _resizeRate) * (_windowSize - windowSizeFloored)); + return pendingCount < (windowSizeFloored + (carry ? 1 : 0)); } void @@ -156,7 +172,7 @@ DynamicThrottlePolicy::processMessage(Message &msg) _numSent = 0; _numOk = 0; - if (throughput > _localMaxThroughput * 1.01) { + if (throughput > _localMaxThroughput) { LOG(debug, "WindowSize = %.2f, Throughput = %f", _windowSize, throughput); _localMaxThroughput = throughput; _windowSize += _weight*_windowSizeIncrement; @@ -170,15 +186,15 @@ DynamicThrottlePolicy::processMessage(Message &msg) period *= 0.1; } double efficiency = throughput*period/_windowSize; - LOG(debug, "WindowSize = %.2f, Throughput = %f, Efficiency = %.2f, Elapsed = %.2f, Period = %.2f", _windowSize, throughput, efficiency, elapsed, period); if (efficiency < _efficiencyThreshold) { - double newSize = std::min(throughput * period, _windowSize); - _windowSize = std::min(newSize * _windowSizeBackOff, _windowSize - 2 * _windowSizeIncrement); + _windowSize = std::min(_windowSize * _windowSizeBackOff, _windowSize - _decrementFactor * _windowSizeIncrement); _localMaxThroughput = 0; } else { _windowSize += _weight*_windowSizeIncrement; } + LOG(debug, "WindowSize = %.2f, Throughput = %f, Efficiency = %.2f, Elapsed = %.2f, Period = %.2f", + _windowSize, throughput, efficiency, elapsed, period); } _windowSize = std::max(_minWindowSize, _windowSize); _windowSize = std::min(_maxWindowSize, _windowSize); diff --git a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h index 99a549beef2..1ae7e88f876 100644 --- a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h +++ b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h @@ -19,7 +19,7 @@ private: ITimer::UP _timer; uint32_t _numSent; uint32_t _numOk; - uint32_t _resizeRate; + double _resizeRate; uint64_t _resizeTime; uint64_t _timeOfLastMessage; uint64_t _idleTimePeriod; @@ -28,6 +28,7 @@ private: double _windowSize; double _maxWindowSize; double _minWindowSize; + double _decrementFactor; double _windowSizeBackOff; double _weight; double _localMaxThroughput; @@ -96,7 +97,7 @@ public: * @param resizeRate The rate to set. * @return This, to allow chaining. */ - DynamicThrottlePolicy &setResizeRate(uint32_t resizeRate); + DynamicThrottlePolicy &setResizeRate(double resizeRate); /** * Sets the weight for this client. The larger the value, the more resources @@ -151,6 +152,14 @@ public: DynamicThrottlePolicy &setMinWindowSize(double min); /** + * Sets the relative step size when decreasing window size. + * + * @param decrementFactor the step size to set + * @return this, to allow chaining + */ + DynamicThrottlePolicy& setWindowSizeDecrementFactor(double decrementFactor); + + /** * Get the minimum number of pending operations allowed at any time. * * @return The minimum number of operations. |