aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/tests/throttling/throttling.cpp46
-rw-r--r--messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp46
-rw-r--r--messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h13
3 files changed, 68 insertions, 37 deletions
diff --git a/messagebus/src/tests/throttling/throttling.cpp b/messagebus/src/tests/throttling/throttling.cpp
index 07db548be48..a601845a5fc 100644
--- a/messagebus/src/tests/throttling/throttling.cpp
+++ b/messagebus/src/tests/throttling/throttling.cpp
@@ -252,36 +252,39 @@ Test::testMinOne()
void
Test::testDynamicWindowSize()
{
- std::unique_ptr<DynamicTimer> ptr(new DynamicTimer());
- DynamicTimer *timer = ptr.get();
+ auto ptr = std::make_unique<DynamicTimer>();
+ auto* timer = ptr.get();
DynamicThrottlePolicy policy(std::move(ptr));
- policy.setWindowSizeIncrement(5);
+ policy.setWindowSizeIncrement(5)
+ .setResizeRate(1);
double windowSize = getWindowSize(policy, *timer, 100);
- ASSERT_TRUE(windowSize >= 90 && windowSize <= 110);
+ ASSERT_TRUE(windowSize >= 90 && windowSize <= 105);
windowSize = getWindowSize(policy, *timer, 200);
- ASSERT_TRUE(windowSize >= 90 && windowSize <= 210);
+ ASSERT_TRUE(windowSize >= 180 && windowSize <= 205);
windowSize = getWindowSize(policy, *timer, 50);
- ASSERT_TRUE(windowSize >= 9 && windowSize <= 55);
+ ASSERT_TRUE(windowSize >= 45 && windowSize <= 55);
windowSize = getWindowSize(policy, *timer, 500);
- ASSERT_TRUE(windowSize >= 90 && windowSize <= 505);
+ ASSERT_TRUE(windowSize >= 450 && windowSize <= 505);
windowSize = getWindowSize(policy, *timer, 100);
- ASSERT_TRUE(windowSize >= 90 && windowSize <= 110);
+ ASSERT_TRUE(windowSize >= 90 && windowSize <= 115);
}
void
Test::testIdleTimePeriod()
{
- ITimer::UP ptr(new DynamicTimer());
- DynamicTimer *timer = static_cast<DynamicTimer*>(ptr.get());
+ auto ptr = std::make_unique<DynamicTimer>();
+ auto* timer = ptr.get();
DynamicThrottlePolicy policy(std::move(ptr));
- policy.setWindowSizeIncrement(5);
+ policy.setWindowSizeIncrement(5)
+ .setMinWindowSize(1)
+ .setResizeRate(1);
double windowSize = getWindowSize(policy, *timer, 100);
ASSERT_TRUE(windowSize >= 90 && windowSize <= 110);
@@ -303,12 +306,13 @@ Test::testIdleTimePeriod()
void
Test::testMinWindowSize()
{
- ITimer::UP ptr(new DynamicTimer());
- DynamicTimer *timer = static_cast<DynamicTimer*>(ptr.get());
+ auto ptr = std::make_unique<DynamicTimer>();
+ auto* timer = ptr.get();
DynamicThrottlePolicy policy(std::move(ptr));
- policy.setWindowSizeIncrement(5);
- policy.setMinWindowSize(150);
+ policy.setWindowSizeIncrement(5)
+ .setResizeRate(1)
+ .setMinWindowSize(150);
double windowSize = getWindowSize(policy, *timer, 200);
ASSERT_TRUE(windowSize >= 150 && windowSize <= 210);
@@ -317,12 +321,13 @@ Test::testMinWindowSize()
void
Test::testMaxWindowSize()
{
- ITimer::UP ptr(new DynamicTimer());
- DynamicTimer *timer = static_cast<DynamicTimer*>(ptr.get());
+ auto ptr = std::make_unique<DynamicTimer>();
+ auto* timer = ptr.get();
DynamicThrottlePolicy policy(std::move(ptr));
- policy.setWindowSizeIncrement(5);
- policy.setMaxWindowSize(50);
+ policy.setWindowSizeIncrement(5)
+ .setResizeRate(1)
+ .setMaxWindowSize(50);
double windowSize = getWindowSize(policy, *timer, 100);
ASSERT_TRUE(windowSize >= 40 && windowSize <= 50);
@@ -338,6 +343,7 @@ Test::getWindowSize(DynamicThrottlePolicy &policy, DynamicTimer &timer, uint32_t
{
SimpleMessage msg("foo");
SimpleReply reply("bar");
+ reply.setContext(mbus::Context(uint64_t(1))); // To offset pending size bump in static policy
for (uint32_t i = 0; i < 999; ++i) {
uint32_t numPending = 0;
@@ -354,6 +360,6 @@ Test::getWindowSize(DynamicThrottlePolicy &policy, DynamicTimer &timer, uint32_t
}
}
uint32_t ret = policy.getMaxPendingCount();
- printf("getWindowSize() = %d\n", ret);
+ fprintf(stderr, "getWindowSize() = %u\n", ret);
return ret;
}
diff --git a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp
index 00a76ca8b12..be61dcdc675 100644
--- a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp
+++ b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp
@@ -12,8 +12,8 @@ DynamicThrottlePolicy::DynamicThrottlePolicy() :
_timer(new SteadyTimer()),
_numSent(0),
_numOk(0),
- _resizeRate(3),
- _resizeTime(_timer->getMilliTime()),
+ _resizeRate(3.0),
+ _resizeTime(0),
_timeOfLastMessage(_timer->getMilliTime()),
_idleTimePeriod(60000),
_efficiencyThreshold(1),
@@ -21,6 +21,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy() :
_windowSize(_windowSizeIncrement),
_maxWindowSize(INT_MAX),
_minWindowSize(_windowSizeIncrement),
+ _decrementFactor(2.0),
_windowSizeBackOff(0.9),
_weight(1),
_localMaxThroughput(0)
@@ -30,8 +31,8 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(double windowSizeIncrement) :
_timer(new SteadyTimer()),
_numSent(0),
_numOk(0),
- _resizeRate(3),
- _resizeTime(_timer->getMilliTime()),
+ _resizeRate(3.0),
+ _resizeTime(0),
_timeOfLastMessage(_timer->getMilliTime()),
_idleTimePeriod(60000),
_efficiencyThreshold(1),
@@ -39,6 +40,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(double windowSizeIncrement) :
_windowSize(_windowSizeIncrement),
_maxWindowSize(INT_MAX),
_minWindowSize(_windowSizeIncrement),
+ _decrementFactor(2.0),
_windowSizeBackOff(0.9),
_weight(1),
_localMaxThroughput(0)
@@ -48,8 +50,8 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(ITimer::UP timer) :
_timer(std::move(timer)),
_numSent(0),
_numOk(0),
- _resizeRate(3),
- _resizeTime(_timer->getMilliTime()),
+ _resizeRate(3.0),
+ _resizeTime(0),
_timeOfLastMessage(_timer->getMilliTime()),
_idleTimePeriod(60000),
_efficiencyThreshold(1),
@@ -57,6 +59,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(ITimer::UP timer) :
_windowSize(_windowSizeIncrement),
_maxWindowSize(INT_MAX),
_minWindowSize(_windowSizeIncrement),
+ _decrementFactor(2.0),
_windowSizeBackOff(0.9),
_weight(1),
_localMaxThroughput(0)
@@ -73,20 +76,21 @@ DynamicThrottlePolicy &
DynamicThrottlePolicy::setWindowSizeIncrement(double windowSizeIncrement)
{
_windowSizeIncrement = windowSizeIncrement;
+ _windowSize = std::max(_windowSize, _windowSizeIncrement);
return *this;
}
DynamicThrottlePolicy &
DynamicThrottlePolicy::setWindowSizeBackOff(double windowSizeBackOff)
{
- _windowSizeBackOff = windowSizeBackOff;
+ _windowSizeBackOff = std::max(0.0, std::min(1.0, windowSizeBackOff));
return *this;
}
DynamicThrottlePolicy &
-DynamicThrottlePolicy::setResizeRate(uint32_t resizeRate)
+DynamicThrottlePolicy::setResizeRate(double resizeRate)
{
- _resizeRate = resizeRate;
+ _resizeRate = std::max(2.0, resizeRate);
return *this;
}
@@ -115,6 +119,14 @@ DynamicThrottlePolicy &
DynamicThrottlePolicy::setMinWindowSize(double min)
{
_minWindowSize = min;
+ _windowSize = std::max(_minWindowSize, _windowSizeIncrement);
+ return *this;
+}
+
+DynamicThrottlePolicy&
+DynamicThrottlePolicy::setWindowSizeDecrementFactor(double decrementFactor)
+{
+ _decrementFactor = decrementFactor;
return *this;
}
@@ -134,10 +146,14 @@ DynamicThrottlePolicy::canSend(const Message &msg, uint32_t pendingCount)
}
uint64_t time = _timer->getMilliTime();
if (time - _timeOfLastMessage > _idleTimePeriod) {
- _windowSize = std::min(_windowSize, (double) pendingCount + _windowSizeIncrement);
+ _windowSize = std::max(_minWindowSize, std::min(_windowSize, pendingCount + _windowSizeIncrement));
+ LOG(debug, "Idle time exceeded; WindowSize = %.2f", _windowSize);
}
_timeOfLastMessage = time;
- return pendingCount < _windowSize;
+ auto windowSizeFloored = static_cast<uint32_t>(_windowSize);
+ // Use floating point window sizes, so the algorithm sees the difference between 1.1 and 1.9 window size.
+ bool carry = _numSent < ((_windowSize * _resizeRate) * (_windowSize - windowSizeFloored));
+ return pendingCount < (windowSizeFloored + (carry ? 1 : 0));
}
void
@@ -156,7 +172,7 @@ DynamicThrottlePolicy::processMessage(Message &msg)
_numSent = 0;
_numOk = 0;
- if (throughput > _localMaxThroughput * 1.01) {
+ if (throughput > _localMaxThroughput) {
LOG(debug, "WindowSize = %.2f, Throughput = %f", _windowSize, throughput);
_localMaxThroughput = throughput;
_windowSize += _weight*_windowSizeIncrement;
@@ -170,15 +186,15 @@ DynamicThrottlePolicy::processMessage(Message &msg)
period *= 0.1;
}
double efficiency = throughput*period/_windowSize;
- LOG(debug, "WindowSize = %.2f, Throughput = %f, Efficiency = %.2f, Elapsed = %.2f, Period = %.2f", _windowSize, throughput, efficiency, elapsed, period);
if (efficiency < _efficiencyThreshold) {
- double newSize = std::min(throughput * period, _windowSize);
- _windowSize = std::min(newSize * _windowSizeBackOff, _windowSize - 2 * _windowSizeIncrement);
+ _windowSize = std::min(_windowSize * _windowSizeBackOff, _windowSize - _decrementFactor * _windowSizeIncrement);
_localMaxThroughput = 0;
} else {
_windowSize += _weight*_windowSizeIncrement;
}
+ LOG(debug, "WindowSize = %.2f, Throughput = %f, Efficiency = %.2f, Elapsed = %.2f, Period = %.2f",
+ _windowSize, throughput, efficiency, elapsed, period);
}
_windowSize = std::max(_minWindowSize, _windowSize);
_windowSize = std::min(_maxWindowSize, _windowSize);
diff --git a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h
index 99a549beef2..1ae7e88f876 100644
--- a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h
+++ b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h
@@ -19,7 +19,7 @@ private:
ITimer::UP _timer;
uint32_t _numSent;
uint32_t _numOk;
- uint32_t _resizeRate;
+ double _resizeRate;
uint64_t _resizeTime;
uint64_t _timeOfLastMessage;
uint64_t _idleTimePeriod;
@@ -28,6 +28,7 @@ private:
double _windowSize;
double _maxWindowSize;
double _minWindowSize;
+ double _decrementFactor;
double _windowSizeBackOff;
double _weight;
double _localMaxThroughput;
@@ -96,7 +97,7 @@ public:
* @param resizeRate The rate to set.
* @return This, to allow chaining.
*/
- DynamicThrottlePolicy &setResizeRate(uint32_t resizeRate);
+ DynamicThrottlePolicy &setResizeRate(double resizeRate);
/**
* Sets the weight for this client. The larger the value, the more resources
@@ -151,6 +152,14 @@ public:
DynamicThrottlePolicy &setMinWindowSize(double min);
/**
+ * Sets the relative step size when decreasing window size.
+ *
+ * @param decrementFactor the step size to set
+ * @return this, to allow chaining
+ */
+ DynamicThrottlePolicy& setWindowSizeDecrementFactor(double decrementFactor);
+
+ /**
* Get the minimum number of pending operations allowed at any time.
*
* @return The minimum number of operations.