aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-01-17 16:02:56 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-01-17 16:02:56 +0000
commit28a3fb6be2904d94848b3ffe51772b2ab3ed07de (patch)
treeb9098d4b0ccdc3786b3448335b9b11ba20e6a742 /messagebus
parentaf0de1790ddcec36d90821fa2fa15d8364ac312e (diff)
Update C++ DynamicThrottlePolicy to match Java impl semantics
This is an attempt at making the C++ implementation 1-1 with the Java implementation, the latter considered the source of truth for this matter. Unit tests have been updated to match their Java equivalents as well. This task was approached much in the same way a novice wizard would transcribe arcane runes that may possible blow their hands off; carefully and with more pragmatic respect than understanding.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/tests/throttling/throttling.cpp46
-rw-r--r--messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp46
-rw-r--r--messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h13
3 files changed, 68 insertions, 37 deletions
diff --git a/messagebus/src/tests/throttling/throttling.cpp b/messagebus/src/tests/throttling/throttling.cpp
index 07db548be48..a601845a5fc 100644
--- a/messagebus/src/tests/throttling/throttling.cpp
+++ b/messagebus/src/tests/throttling/throttling.cpp
@@ -252,36 +252,39 @@ Test::testMinOne()
void
Test::testDynamicWindowSize()
{
- std::unique_ptr<DynamicTimer> ptr(new DynamicTimer());
- DynamicTimer *timer = ptr.get();
+ auto ptr = std::make_unique<DynamicTimer>();
+ auto* timer = ptr.get();
DynamicThrottlePolicy policy(std::move(ptr));
- policy.setWindowSizeIncrement(5);
+ policy.setWindowSizeIncrement(5)
+ .setResizeRate(1);
double windowSize = getWindowSize(policy, *timer, 100);
- ASSERT_TRUE(windowSize >= 90 && windowSize <= 110);
+ ASSERT_TRUE(windowSize >= 90 && windowSize <= 105);
windowSize = getWindowSize(policy, *timer, 200);
- ASSERT_TRUE(windowSize >= 90 && windowSize <= 210);
+ ASSERT_TRUE(windowSize >= 180 && windowSize <= 205);
windowSize = getWindowSize(policy, *timer, 50);
- ASSERT_TRUE(windowSize >= 9 && windowSize <= 55);
+ ASSERT_TRUE(windowSize >= 45 && windowSize <= 55);
windowSize = getWindowSize(policy, *timer, 500);
- ASSERT_TRUE(windowSize >= 90 && windowSize <= 505);
+ ASSERT_TRUE(windowSize >= 450 && windowSize <= 505);
windowSize = getWindowSize(policy, *timer, 100);
- ASSERT_TRUE(windowSize >= 90 && windowSize <= 110);
+ ASSERT_TRUE(windowSize >= 90 && windowSize <= 115);
}
void
Test::testIdleTimePeriod()
{
- ITimer::UP ptr(new DynamicTimer());
- DynamicTimer *timer = static_cast<DynamicTimer*>(ptr.get());
+ auto ptr = std::make_unique<DynamicTimer>();
+ auto* timer = ptr.get();
DynamicThrottlePolicy policy(std::move(ptr));
- policy.setWindowSizeIncrement(5);
+ policy.setWindowSizeIncrement(5)
+ .setMinWindowSize(1)
+ .setResizeRate(1);
double windowSize = getWindowSize(policy, *timer, 100);
ASSERT_TRUE(windowSize >= 90 && windowSize <= 110);
@@ -303,12 +306,13 @@ Test::testIdleTimePeriod()
void
Test::testMinWindowSize()
{
- ITimer::UP ptr(new DynamicTimer());
- DynamicTimer *timer = static_cast<DynamicTimer*>(ptr.get());
+ auto ptr = std::make_unique<DynamicTimer>();
+ auto* timer = ptr.get();
DynamicThrottlePolicy policy(std::move(ptr));
- policy.setWindowSizeIncrement(5);
- policy.setMinWindowSize(150);
+ policy.setWindowSizeIncrement(5)
+ .setResizeRate(1)
+ .setMinWindowSize(150);
double windowSize = getWindowSize(policy, *timer, 200);
ASSERT_TRUE(windowSize >= 150 && windowSize <= 210);
@@ -317,12 +321,13 @@ Test::testMinWindowSize()
void
Test::testMaxWindowSize()
{
- ITimer::UP ptr(new DynamicTimer());
- DynamicTimer *timer = static_cast<DynamicTimer*>(ptr.get());
+ auto ptr = std::make_unique<DynamicTimer>();
+ auto* timer = ptr.get();
DynamicThrottlePolicy policy(std::move(ptr));
- policy.setWindowSizeIncrement(5);
- policy.setMaxWindowSize(50);
+ policy.setWindowSizeIncrement(5)
+ .setResizeRate(1)
+ .setMaxWindowSize(50);
double windowSize = getWindowSize(policy, *timer, 100);
ASSERT_TRUE(windowSize >= 40 && windowSize <= 50);
@@ -338,6 +343,7 @@ Test::getWindowSize(DynamicThrottlePolicy &policy, DynamicTimer &timer, uint32_t
{
SimpleMessage msg("foo");
SimpleReply reply("bar");
+ reply.setContext(mbus::Context(uint64_t(1))); // To offset pending size bump in static policy
for (uint32_t i = 0; i < 999; ++i) {
uint32_t numPending = 0;
@@ -354,6 +360,6 @@ Test::getWindowSize(DynamicThrottlePolicy &policy, DynamicTimer &timer, uint32_t
}
}
uint32_t ret = policy.getMaxPendingCount();
- printf("getWindowSize() = %d\n", ret);
+ fprintf(stderr, "getWindowSize() = %u\n", ret);
return ret;
}
diff --git a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp
index 00a76ca8b12..be61dcdc675 100644
--- a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp
+++ b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.cpp
@@ -12,8 +12,8 @@ DynamicThrottlePolicy::DynamicThrottlePolicy() :
_timer(new SteadyTimer()),
_numSent(0),
_numOk(0),
- _resizeRate(3),
- _resizeTime(_timer->getMilliTime()),
+ _resizeRate(3.0),
+ _resizeTime(0),
_timeOfLastMessage(_timer->getMilliTime()),
_idleTimePeriod(60000),
_efficiencyThreshold(1),
@@ -21,6 +21,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy() :
_windowSize(_windowSizeIncrement),
_maxWindowSize(INT_MAX),
_minWindowSize(_windowSizeIncrement),
+ _decrementFactor(2.0),
_windowSizeBackOff(0.9),
_weight(1),
_localMaxThroughput(0)
@@ -30,8 +31,8 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(double windowSizeIncrement) :
_timer(new SteadyTimer()),
_numSent(0),
_numOk(0),
- _resizeRate(3),
- _resizeTime(_timer->getMilliTime()),
+ _resizeRate(3.0),
+ _resizeTime(0),
_timeOfLastMessage(_timer->getMilliTime()),
_idleTimePeriod(60000),
_efficiencyThreshold(1),
@@ -39,6 +40,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(double windowSizeIncrement) :
_windowSize(_windowSizeIncrement),
_maxWindowSize(INT_MAX),
_minWindowSize(_windowSizeIncrement),
+ _decrementFactor(2.0),
_windowSizeBackOff(0.9),
_weight(1),
_localMaxThroughput(0)
@@ -48,8 +50,8 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(ITimer::UP timer) :
_timer(std::move(timer)),
_numSent(0),
_numOk(0),
- _resizeRate(3),
- _resizeTime(_timer->getMilliTime()),
+ _resizeRate(3.0),
+ _resizeTime(0),
_timeOfLastMessage(_timer->getMilliTime()),
_idleTimePeriod(60000),
_efficiencyThreshold(1),
@@ -57,6 +59,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(ITimer::UP timer) :
_windowSize(_windowSizeIncrement),
_maxWindowSize(INT_MAX),
_minWindowSize(_windowSizeIncrement),
+ _decrementFactor(2.0),
_windowSizeBackOff(0.9),
_weight(1),
_localMaxThroughput(0)
@@ -73,20 +76,21 @@ DynamicThrottlePolicy &
DynamicThrottlePolicy::setWindowSizeIncrement(double windowSizeIncrement)
{
_windowSizeIncrement = windowSizeIncrement;
+ _windowSize = std::max(_windowSize, _windowSizeIncrement);
return *this;
}
DynamicThrottlePolicy &
DynamicThrottlePolicy::setWindowSizeBackOff(double windowSizeBackOff)
{
- _windowSizeBackOff = windowSizeBackOff;
+ _windowSizeBackOff = std::max(0.0, std::min(1.0, windowSizeBackOff));
return *this;
}
DynamicThrottlePolicy &
-DynamicThrottlePolicy::setResizeRate(uint32_t resizeRate)
+DynamicThrottlePolicy::setResizeRate(double resizeRate)
{
- _resizeRate = resizeRate;
+ _resizeRate = std::max(2.0, resizeRate);
return *this;
}
@@ -115,6 +119,14 @@ DynamicThrottlePolicy &
DynamicThrottlePolicy::setMinWindowSize(double min)
{
_minWindowSize = min;
+ _windowSize = std::max(_minWindowSize, _windowSizeIncrement);
+ return *this;
+}
+
+DynamicThrottlePolicy&
+DynamicThrottlePolicy::setWindowSizeDecrementFactor(double decrementFactor)
+{
+ _decrementFactor = decrementFactor;
return *this;
}
@@ -134,10 +146,14 @@ DynamicThrottlePolicy::canSend(const Message &msg, uint32_t pendingCount)
}
uint64_t time = _timer->getMilliTime();
if (time - _timeOfLastMessage > _idleTimePeriod) {
- _windowSize = std::min(_windowSize, (double) pendingCount + _windowSizeIncrement);
+ _windowSize = std::max(_minWindowSize, std::min(_windowSize, pendingCount + _windowSizeIncrement));
+ LOG(debug, "Idle time exceeded; WindowSize = %.2f", _windowSize);
}
_timeOfLastMessage = time;
- return pendingCount < _windowSize;
+ auto windowSizeFloored = static_cast<uint32_t>(_windowSize);
+ // Use floating point window sizes, so the algorithm sees the difference between 1.1 and 1.9 window size.
+ bool carry = _numSent < ((_windowSize * _resizeRate) * (_windowSize - windowSizeFloored));
+ return pendingCount < (windowSizeFloored + (carry ? 1 : 0));
}
void
@@ -156,7 +172,7 @@ DynamicThrottlePolicy::processMessage(Message &msg)
_numSent = 0;
_numOk = 0;
- if (throughput > _localMaxThroughput * 1.01) {
+ if (throughput > _localMaxThroughput) {
LOG(debug, "WindowSize = %.2f, Throughput = %f", _windowSize, throughput);
_localMaxThroughput = throughput;
_windowSize += _weight*_windowSizeIncrement;
@@ -170,15 +186,15 @@ DynamicThrottlePolicy::processMessage(Message &msg)
period *= 0.1;
}
double efficiency = throughput*period/_windowSize;
- LOG(debug, "WindowSize = %.2f, Throughput = %f, Efficiency = %.2f, Elapsed = %.2f, Period = %.2f", _windowSize, throughput, efficiency, elapsed, period);
if (efficiency < _efficiencyThreshold) {
- double newSize = std::min(throughput * period, _windowSize);
- _windowSize = std::min(newSize * _windowSizeBackOff, _windowSize - 2 * _windowSizeIncrement);
+ _windowSize = std::min(_windowSize * _windowSizeBackOff, _windowSize - _decrementFactor * _windowSizeIncrement);
_localMaxThroughput = 0;
} else {
_windowSize += _weight*_windowSizeIncrement;
}
+ LOG(debug, "WindowSize = %.2f, Throughput = %f, Efficiency = %.2f, Elapsed = %.2f, Period = %.2f",
+ _windowSize, throughput, efficiency, elapsed, period);
}
_windowSize = std::max(_minWindowSize, _windowSize);
_windowSize = std::min(_maxWindowSize, _windowSize);
diff --git a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h
index 99a549beef2..1ae7e88f876 100644
--- a/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h
+++ b/messagebus/src/vespa/messagebus/dynamicthrottlepolicy.h
@@ -19,7 +19,7 @@ private:
ITimer::UP _timer;
uint32_t _numSent;
uint32_t _numOk;
- uint32_t _resizeRate;
+ double _resizeRate;
uint64_t _resizeTime;
uint64_t _timeOfLastMessage;
uint64_t _idleTimePeriod;
@@ -28,6 +28,7 @@ private:
double _windowSize;
double _maxWindowSize;
double _minWindowSize;
+ double _decrementFactor;
double _windowSizeBackOff;
double _weight;
double _localMaxThroughput;
@@ -96,7 +97,7 @@ public:
* @param resizeRate The rate to set.
* @return This, to allow chaining.
*/
- DynamicThrottlePolicy &setResizeRate(uint32_t resizeRate);
+ DynamicThrottlePolicy &setResizeRate(double resizeRate);
/**
* Sets the weight for this client. The larger the value, the more resources
@@ -151,6 +152,14 @@ public:
DynamicThrottlePolicy &setMinWindowSize(double min);
/**
+ * Sets the relative step size when decreasing window size.
+ *
+ * @param decrementFactor the step size to set
+ * @return this, to allow chaining
+ */
+ DynamicThrottlePolicy& setWindowSizeDecrementFactor(double decrementFactor);
+
+ /**
* Get the minimum number of pending operations allowed at any time.
*
* @return The minimum number of operations.