aboutsummaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-12-06 17:49:36 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-12-06 17:49:36 +0000
commit7f71eddb09917cf59091236c7f1b0701c038d5f1 (patch)
tree0b424617a3eca985e38a8df256c9f79474b9d61a /staging_vespalib
parent30827a4c8ed38be228f26b590a67eab6321ec76e (diff)
Add testing of watermark and change it to have the ration to the taskLimit as
it had on initial construction time.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp8
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp10
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h4
3 files changed, 15 insertions, 7 deletions
diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
index fae57b6532f..dd71380f64a 100644
--- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
+++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
@@ -37,7 +37,10 @@ void verifyResizeTaskLimit(bool up) {
std::atomic<uint64_t> allowed(0);
constexpr uint32_t INITIAL = 20;
const uint32_t INITIAL_2inN = roundUp2inN(INITIAL);
- SingleExecutor executor(sequenced_executor, INITIAL, INITIAL/2, 10ms);
+ double waterMarkRatio = 0.5;
+ SingleExecutor executor(sequenced_executor, INITIAL, INITIAL*waterMarkRatio, 10ms);
+ EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit());
+ EXPECT_EQUAL(uint32_t(INITIAL_2inN*waterMarkRatio), executor.get_watermark());
uint32_t targetTaskLimit = up ? 40 : 5;
uint32_t roundedTaskLimit = roundUp2inN(targetTaskLimit);
@@ -56,6 +59,7 @@ void verifyResizeTaskLimit(bool up) {
EXPECT_EQUAL(1u, started);
executor.setTaskLimit(targetTaskLimit);
EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit());
+ EXPECT_EQUAL(INITIAL_2inN*waterMarkRatio, executor.get_watermark());
allowed = 5;
while (started < 6);
EXPECT_EQUAL(6u, started);
@@ -74,8 +78,10 @@ void verifyResizeTaskLimit(bool up) {
while (started < INITIAL + 1);
EXPECT_EQUAL(INITIAL + 1, started);
EXPECT_EQUAL(roundedTaskLimit, executor.getTaskLimit());
+ EXPECT_EQUAL(roundedTaskLimit*waterMarkRatio, executor.get_watermark());
allowed = INITIAL + 1;
}
+
TEST("test that resizing up and down works") {
TEST_DO(verifyResizeTaskLimit(true));
TEST_DO(verifyResizeTaskLimit(false));
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 2ebc64cf62a..ff46f58cb84 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -27,7 +27,7 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat
_wakeupConsumerAt(0),
_producerNeedWakeupAt(0),
_wp(0),
- _watermark(std::min(_taskLimit.load(), watermark)),
+ _watermark(std::min(_taskLimit.load(), (watermark*_taskLimit)/taskLimit)),
_reactionTime(reactionTime),
_closed(false)
{
@@ -75,7 +75,7 @@ SingleExecutor::execute(Task::UP task) {
void
SingleExecutor::setTaskLimit(uint32_t taskLimit) {
- _wantedTaskLimit = vespalib::roundUp2inN(std::max(taskLimit, _watermark));
+ _wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
}
void
@@ -117,7 +117,7 @@ SingleExecutor::run() {
while (!_thread.stopped()) {
drain_tasks();
_producerCondition.notify_all();
- _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed);
+ _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed);
Lock lock(_mutex);
if (numTasks() <= 0) {
steady_time now = steady_clock::now();
@@ -158,11 +158,13 @@ SingleExecutor::wait_for_room(Lock & lock) {
if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) {
drain(lock);
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
+ double waterMarkRatio = double(get_watermark()) / _taskLimit.load(std::memory_order_relaxed);
_taskLimit = _wantedTaskLimit.load();
+ _watermark = _taskLimit * waterMarkRatio;
}
_queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- sleepProducer(lock, _reactionTime, wp - _watermark);
+ sleepProducer(lock, _reactionTime, wp - get_watermark());
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 7d868322558..9bc2f3ca837 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -28,7 +28,7 @@ public:
void wakeup() override;
size_t getNumThreads() const override;
uint32_t getTaskLimit() const override { return _taskLimit.load(std::memory_order_relaxed); }
- uint32_t get_watermark() const { return _watermark; }
+ uint32_t get_watermark() const { return _watermark.load(std::memory_order_relaxed); }
duration get_reaction_time() const { return _reactionTime; }
ExecutorStats getStats() override;
SingleExecutor & shutdown() override;
@@ -63,7 +63,7 @@ private:
std::atomic<uint64_t> _wakeupConsumerAt;
std::atomic<uint64_t> _producerNeedWakeupAt;
std::atomic<uint64_t> _wp;
- const uint32_t _watermark;
+ std::atomic<uint32_t> _watermark;
const duration _reactionTime;
bool _closed;
};