summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-06 15:06:43 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-06 15:06:43 +0000
commitba1dcd889e54af3c16c2122c000cca94ca03d22e (patch)
treeb7e069e930d2812cba79e982f005198ffe7eb727 /staging_vespalib
parentb5882399caf70648f9dba597696d20019e574fea (diff)
Add control for kind_of_watermark and reaction time.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp22
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp7
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp16
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h3
5 files changed, 44 insertions, 8 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index f5f04738e92..70d0f1c743d 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+#include <vespa/vespalib/util/adaptive_sequenced_executor.h>
+
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/test/insertion_operators.h>
@@ -245,6 +247,26 @@ TEST("require that you distribute well") {
EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize());
}
+TEST("Test creation of different types") {
+ auto iseq = SequencedTaskExecutor::create(1);
+
+ EXPECT_EQUAL(1u, iseq->getNumExecutors());
+ auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
+ ASSERT_TRUE(seq != nullptr);
+
+ iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::LATENCY);
+ seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
+ ASSERT_TRUE(seq != nullptr);
+
+ iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::THROUGHPUT);
+ seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
+ ASSERT_TRUE(seq != nullptr);
+
+ iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::ADAPTIVE, 17);
+ auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get());
+ ASSERT_TRUE(aseq != nullptr);
+}
+
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 6d42895bffc..a0c2f0ac237 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -15,16 +15,17 @@ constexpr uint32_t stackSize = 128 * 1024;
std::unique_ptr<ISequencedTaskExecutor>
-SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize)
+SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime)
{
if (optimize == OptimizeFor::ADAPTIVE) {
- return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, 0, taskLimit);
+ return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, kindOfWatermark, taskLimit);
} else {
auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>();
executors->reserve(threads);
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
- executors->push_back(std::make_unique<SingleExecutor>(taskLimit));
+ uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 10 : kindOfWatermark;
+ executors->push_back(std::make_unique<SingleExecutor>(taskLimit, watermark, reactionTime));
} else {
executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 14127970403..b3dd400478a 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -2,6 +2,7 @@
#pragma once
#include "isequencedtaskexecutor.h"
+#include <vespa/vespalib/util/time.h>
namespace vespalib {
@@ -30,9 +31,10 @@ public:
/*
* Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside.
+ *
*/
static std::unique_ptr<ISequencedTaskExecutor>
- create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY);
+ create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms);
};
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 90eb18c23ef..a17037799a3 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -6,6 +6,10 @@
namespace vespalib {
SingleExecutor::SingleExecutor(uint32_t taskLimit)
+ : SingleExecutor(taskLimit, taskLimit/10, 5ms)
+{ }
+
+SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime)
: _taskLimit(vespalib::roundUp2inN(taskLimit)),
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
@@ -19,10 +23,14 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_wakeupConsumerAt(0),
_producerNeedWakeupAt(0),
_wp(0),
+ _watermark(std::min(_taskLimit.load(), watermark)),
+ _reactionTime(reactionTime),
_closed(false)
{
+ assert(taskLimit >= watermark);
_thread.start();
}
+
SingleExecutor::~SingleExecutor() {
shutdown();
sync();
@@ -62,7 +70,7 @@ SingleExecutor::execute(Task::UP task) {
void
SingleExecutor::setTaskLimit(uint32_t taskLimit) {
- _wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
+ _wantedTaskLimit = std::max(vespalib::roundUp2inN(taskLimit), size_t(_watermark));
}
void
@@ -102,10 +110,10 @@ SingleExecutor::run() {
while (!_thread.stopped()) {
drain_tasks();
_producerCondition.notify_all();
- _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed);
+ _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed);
Lock lock(_mutex);
if (numTasks() <= 0) {
- _consumerCondition.wait_for(lock, 10ms);
+ _consumerCondition.wait_for(lock, _reactionTime);
}
_wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
@@ -144,7 +152,7 @@ SingleExecutor::wait_for_room(Lock & lock) {
}
_queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- sleepProducer(lock, 10ms, wp - taskLimit/4);
+ sleepProducer(lock, _reactionTime, wp - _watermark);
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 3d759769ea3..a58128c15aa 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -19,6 +19,7 @@ namespace vespalib {
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
explicit SingleExecutor(uint32_t taskLimit);
+ SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;
@@ -56,6 +57,8 @@ private:
std::atomic<uint64_t> _wakeupConsumerAt;
std::atomic<uint64_t> _producerNeedWakeupAt;
std::atomic<uint64_t> _wp;
+ const uint32_t _watermark;
+ const duration _reactionTime;
bool _closed;
};