summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-02 12:52:47 +0100
committerGitHub <noreply@github.com>2020-03-02 12:52:47 +0100
commit81084712352afe6c3cf68aedb695e8b90a51a640 (patch)
tree2796394c409b965c8b636b3985141ff803d1d181
parentafdbe50100d189e20d7f53c971834a58b006e554 (diff)
parent6449cf3d52ae44a54de59e478f8c0915671e23bb (diff)
Merge pull request #12381 from vespa-engine/balder/use-single-threaded-executor
Balder/use single threaded executor
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h12
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp8
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp10
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h8
-rw-r--r--staging_vespalib/CMakeLists.txt1
-rw-r--r--staging_vespalib/src/tests/singleexecutor/CMakeLists.txt8
-rw-r--r--staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp80
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt1
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp131
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h55
-rw-r--r--vespalib/src/vespa/vespalib/util/executor.h2
16 files changed, 337 insertions, 15 deletions
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index eb4f1f6dc89..6040fc651c2 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -117,6 +117,10 @@ indexing.read.io enum {NORMAL, DIRECTIO} default=DIRECTIO restart
## Control number of threads used for indexing
indexing.threads int default=1 restart
+## Option to specify what is most important during indexing.
+## This is experimental and will most likely be temporary.
+indexing.optimize enum {LATENCY, THROUGHPUT} default=LATENCY restart
+
## Maximum number of pending operations for each of the internal
## indexing threads. Only used when visibility delay is zero.
indexing.tasklimit int default=1000 restart
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 302ffc93f6a..1ee1b703ea9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -140,7 +140,8 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_writeService(sharedExecutor,
_writeServiceConfig.indexingThreads(),
indexing_thread_stack_size,
- _writeServiceConfig.defaultTaskLimit()),
+ _writeServiceConfig.defaultTaskLimit(),
+ _writeServiceConfig.optimize()),
_initializeThreads(std::move(initializeThreads)),
_initConfigSnapshot(),
_initConfigSerialNum(0u),
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 058197f0271..6e7b4967f6d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -10,7 +10,8 @@ using search::SequencedTaskExecutor;
namespace proton {
ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadStackExecutorBase & sharedExecutor,
- uint32_t threads, uint32_t stackSize, uint32_t taskLimit)
+ uint32_t threads, uint32_t stackSize, uint32_t taskLimit,
+ OptimizeFor optimize)
: _sharedExecutor(sharedExecutor),
_masterExecutor(1, stackSize),
@@ -21,7 +22,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadStackExecutor
_summaryService(_summaryExecutor),
_indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)),
_indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)),
- _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit))
+ _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit, optimize))
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index 7bbe1cb162a..2e4dd2035f3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -29,6 +29,7 @@ private:
std::unique_ptr<search::ISequencedTaskExecutor> _attributeFieldWriter;
public:
+ using OptimizeFor = vespalib::Executor::OptimizeFor;
/**
* Constructor.
*
@@ -38,7 +39,8 @@ public:
ExecutorThreadingService(vespalib::ThreadStackExecutorBase &sharedExecutor,
uint32_t threads = 1,
uint32_t stackSize = 128 * 1024,
- uint32_t taskLimit = 1000);
+ uint32_t taskLimit = 1000,
+ OptimizeFor optimize = OptimizeFor::LATENCY);
~ExecutorThreadingService() override;
/**
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index 55aa1a20ef6..5bc6ef543f3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -7,13 +7,16 @@
namespace proton {
using ProtonConfig = ThreadingServiceConfig::ProtonConfig;
+using OptimizeFor = vespalib::Executor::OptimizeFor;
ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
uint32_t defaultTaskLimit_,
- uint32_t semiUnboundTaskLimit_)
+ uint32_t semiUnboundTaskLimit_,
+ OptimizeFor optimize)
: _indexingThreads(indexingThreads_),
_defaultTaskLimit(defaultTaskLimit_),
- _semiUnboundTaskLimit(semiUnboundTaskLimit_)
+ _semiUnboundTaskLimit(semiUnboundTaskLimit_),
+ _optimize(optimize)
{
}
@@ -30,6 +33,16 @@ calculateIndexingThreads(uint32_t cfgIndexingThreads, double concurrency, const
return std::max(indexingThreads, 1u);
}
+OptimizeFor
+selectOptimization(ProtonConfig::Indexing::Optimize optimize) {
+ using CfgOptimize = ProtonConfig::Indexing::Optimize;
+ switch (optimize) {
+ case CfgOptimize::LATENCY: return OptimizeFor::LATENCY;
+ case CfgOptimize::THROUGHPUT: return OptimizeFor::THROUGHPUT;
+ }
+ return OptimizeFor::LATENCY;
+}
+
}
ThreadingServiceConfig
@@ -37,7 +50,8 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const
{
uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing.threads, concurrency, cpuInfo);
return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit,
- (cfg.indexing.semiunboundtasklimit / indexingThreads));
+ (cfg.indexing.semiunboundtasklimit / indexingThreads),
+ selectOptimization(cfg.indexing.optimize));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index be39f516598..149215f97dc 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -2,6 +2,7 @@
#pragma once
#include <vespa/searchcore/proton/common/hw_info.h>
+#include <vespa/vespalib/util/executor.h>
#include <cstdint>
namespace vespa::config::search::core::internal { class InternalProtonType; }
@@ -13,14 +14,16 @@ namespace proton {
class ThreadingServiceConfig {
public:
using ProtonConfig = const vespa::config::search::core::internal::InternalProtonType;
+ using OptimizeFor = vespalib::Executor::OptimizeFor;
private:
- uint32_t _indexingThreads;
- uint32_t _defaultTaskLimit;
- uint32_t _semiUnboundTaskLimit;
+ uint32_t _indexingThreads;
+ uint32_t _defaultTaskLimit;
+ uint32_t _semiUnboundTaskLimit;
+ OptimizeFor _optimize;
private:
- ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_);
+ ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, OptimizeFor optimize);
public:
static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
@@ -28,6 +31,7 @@ public:
uint32_t indexingThreads() const { return _indexingThreads; }
uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
uint32_t semiUnboundTaskLimit() const { return _semiUnboundTaskLimit; }
+ OptimizeFor optimize() const { return _optimize;}
};
}
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
index b2b15ded274..9491617c135 100644
--- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
+++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
@@ -10,13 +10,19 @@ using ExecutorId = search::ISequencedTaskExecutor::ExecutorId;
int main(int argc, char *argv[]) {
unsigned long numTasks = 1000000;
unsigned numThreads = 4;
+ unsigned taskLimit = 1000;
+ vespalib::Executor::OptimizeFor optimize = vespalib::Executor::OptimizeFor::LATENCY;
std::atomic<long> counter(0);
if (argc > 1)
numTasks = atol(argv[1]);
if (argc > 2)
numThreads = atoi(argv[2]);
+ if (argc > 3)
+ taskLimit = atoi(argv[3]);
+ if (argc > 4)
+ optimize = vespalib::Executor::OptimizeFor::THROUGHPUT;
- auto executor = SequencedTaskExecutor::create(numThreads);
+ auto executor = SequencedTaskExecutor::create(numThreads, taskLimit, optimize);
for (unsigned long tid(0); tid < numTasks; tid++) {
executor->executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; }));
}
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
index 30723a6eb2a..7b0c30ec9d8 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
@@ -2,8 +2,10 @@
#include "sequencedtaskexecutor.h"
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/singleexecutor.h>
using vespalib::BlockingThreadStackExecutor;
+using vespalib::SingleExecutor;
namespace search {
@@ -15,12 +17,16 @@ constexpr uint32_t stackSize = 128 * 1024;
std::unique_ptr<ISequencedTaskExecutor>
-SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit)
+SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize)
{
auto executors = std::make_unique<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>>();
executors->reserve(threads);
for (uint32_t id = 0; id < threads; ++id) {
- executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit));
+ if (optimize == OptimizeFor::THROUGHPUT) {
+ executors->push_back(std::make_unique<SingleExecutor>(taskLimit));
+ } else {
+ executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit));
+ }
}
return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
}
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
index a29e3d5226c..8568901006f 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
@@ -23,6 +23,7 @@ class SequencedTaskExecutor final : public ISequencedTaskExecutor
SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
public:
using ISequencedTaskExecutor::getExecutorId;
+ using OptimizeFor = vespalib::Executor::OptimizeFor;
~SequencedTaskExecutor();
@@ -30,7 +31,12 @@ public:
void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
void sync() override;
Stats getStats() override;
- static std::unique_ptr<ISequencedTaskExecutor> create(uint32_t threads, uint32_t taskLimit = 1000);
+
+ /*
+ * Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside.
+ */
+ static std::unique_ptr<ISequencedTaskExecutor>
+ create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY);
};
} // namespace search
diff --git a/staging_vespalib/CMakeLists.txt b/staging_vespalib/CMakeLists.txt
index 3c4e2a9f444..9fb48cbd4fc 100644
--- a/staging_vespalib/CMakeLists.txt
+++ b/staging_vespalib/CMakeLists.txt
@@ -30,6 +30,7 @@ vespa_define_module(
src/tests/shutdownguard
src/tests/state_server
src/tests/stllike
+ src/tests/singleexecutor
src/tests/timer
src/tests/util/process_memory_stats
src/tests/xmlserializable
diff --git a/staging_vespalib/src/tests/singleexecutor/CMakeLists.txt b/staging_vespalib/src/tests/singleexecutor/CMakeLists.txt
new file mode 100644
index 00000000000..c5d42d2c8c5
--- /dev/null
+++ b/staging_vespalib/src/tests/singleexecutor/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(staging_vespalib_singleexecutor_test_app TEST
+ SOURCES
+ singleexecutor_test.cpp
+ DEPENDS
+ staging_vespalib
+)
+vespa_add_test(NAME staging_vespalib_singleexecutor_test_app COMMAND staging_vespalib_singleexecutor_test_app)
diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
new file mode 100644
index 00000000000..5dacaa5d204
--- /dev/null
+++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
@@ -0,0 +1,80 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/testapp.h>
+
+#include <vespa/vespalib/util/singleexecutor.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <atomic>
+
+using namespace vespalib;
+
+TEST("test that all tasks are executed") {
+
+ std::atomic<uint64_t> counter(0);
+ SingleExecutor executor(10);
+
+ for (uint64_t i(0); i < 10; i++) {
+ executor.execute(makeLambdaTask([&counter] {counter++;}));
+ }
+ executor.sync();
+ EXPECT_EQUAL(10u, counter);
+
+ counter = 0;
+ for (uint64_t i(0); i < 10000; i++) {
+ executor.execute(makeLambdaTask([&counter] {counter++;}));
+ }
+ executor.sync();
+ EXPECT_EQUAL(10000u, counter);
+}
+
+void verifyResizeTaskLimit(bool up) {
+ Monitor lock;
+ std::atomic<uint64_t> started(0);
+ std::atomic<uint64_t> allowed(0);
+ SingleExecutor executor(10);
+
+ uint32_t targetTaskLimit = up ? 20 : 5;
+ uint32_t roundedTaskLimit = roundUp2inN(targetTaskLimit);
+ EXPECT_NOT_EQUAL(16u, roundedTaskLimit);
+
+ for (uint64_t i(0); i < 10; i++) {
+ executor.execute(makeLambdaTask([&lock, &started, &allowed] {
+ started++;
+ MonitorGuard guard(lock);
+ while (allowed < started) {
+ guard.wait(1ms);
+ }
+ }));
+ }
+ while (started < 1);
+ EXPECT_EQUAL(1u, started);
+ executor.setTaskLimit(targetTaskLimit);
+ EXPECT_EQUAL(16u, executor.getTaskLimit());
+ allowed = 5;
+ while (started < 6);
+ EXPECT_EQUAL(6u, started);
+ EXPECT_EQUAL(16u, executor.getTaskLimit());
+ allowed = 10;
+ while (started < 10);
+ EXPECT_EQUAL(10u, started);
+ EXPECT_EQUAL(16u, executor.getTaskLimit());
+ executor.execute(makeLambdaTask([&lock, &started, &allowed] {
+ started++;
+ MonitorGuard guard(lock);
+ while (allowed < started) {
+ guard.wait(1ms);
+ }
+ }));
+ while (started < 11);
+ EXPECT_EQUAL(11u, started);
+ EXPECT_EQUAL(roundedTaskLimit, executor.getTaskLimit());
+ allowed = 11;
+}
+TEST("test that resizing up and down works") {
+ TEST_DO(verifyResizeTaskLimit(true));
+ TEST_DO(verifyResizeTaskLimit(false));
+
+
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt b/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt
index 71364a813f6..ba03b77c941 100644
--- a/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -17,6 +17,7 @@ vespa_add_library(staging_vespalib_vespalib_util OBJECT
rusage.cpp
shutdownguard.cpp
scheduledexecutor.cpp
+ singleexecutor.cpp
xmlserializable.cpp
xmlstream.cpp
DEPENDS
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
new file mode 100644
index 00000000000..791b0876c19
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -0,0 +1,131 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "singleexecutor.h"
+#include <vespa/vespalib/util/time.h>
+
+namespace vespalib {
+
+SingleExecutor::SingleExecutor(uint32_t taskLimit)
+ : _taskLimit(vespalib::roundUp2inN(taskLimit)),
+ _wantedTaskLimit(_taskLimit.load()),
+ _rp(0),
+ _tasks(std::make_unique<Task::UP[]>(_taskLimit)),
+ _consumerMonitor(),
+ _producerMonitor(),
+ _thread(*this),
+ _lastAccepted(0),
+ _maxPending(0),
+ _wakeupConsumerAt(0),
+ _producerNeedWakeup(false),
+ _wp(0)
+{
+ _thread.start();
+}
+SingleExecutor::~SingleExecutor() {
+ sync();
+ _thread.stop().join();
+}
+
+size_t
+SingleExecutor::getNumThreads() const {
+ return 1;
+}
+
+uint64_t
+SingleExecutor::addTask(Task::UP task) {
+ MonitorGuard guard(_producerMonitor);
+ wait_for_room(guard);
+ uint64_t wp = _wp.load(std::memory_order_relaxed);
+ _tasks[index(wp)] = std::move(task);
+ _wp.store(wp + 1, std::memory_order_release);
+ return wp;
+}
+
+Executor::Task::UP
+SingleExecutor::execute(Task::UP task) {
+ uint64_t wp = addTask(std::move(task));
+ if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
+ MonitorGuard guard(_consumerMonitor);
+ guard.signal();
+ }
+ return task;
+}
+
+void
+SingleExecutor::setTaskLimit(uint32_t taskLimit) {
+ _wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
+}
+
+SingleExecutor &
+SingleExecutor::sync() {
+ uint64_t wp = _wp.load(std::memory_order_relaxed);
+ while (wp > _rp.load(std::memory_order_relaxed)) {
+ std::this_thread::sleep_for(1ms);
+ }
+ return *this;
+}
+
+void
+SingleExecutor::run() {
+ while (!_thread.stopped()) {
+ drain_tasks();
+ _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed);
+ MonitorGuard guard(_consumerMonitor);
+ guard.wait(10ms);
+ _wakeupConsumerAt.store(0, std::memory_order_relaxed);
+ }
+}
+
+void
+SingleExecutor::drain_tasks() {
+ while (numTasks() > 0) {
+ run_tasks_till(_wp.load(std::memory_order_acquire));
+ }
+}
+
+void
+SingleExecutor::run_tasks_till(uint64_t available) {
+ uint64_t consumed = _rp.load(std::memory_order_relaxed);
+ uint64_t left = available - consumed;
+ if (_maxPending.load(std::memory_order_relaxed) < left) {
+ _maxPending.store(left, std::memory_order_relaxed);
+ }
+ uint64_t wakeupLimit = _producerNeedWakeup.load(std::memory_order_relaxed)
+ ? (available - (left >> 2))
+ : 0;
+ while (consumed < available) {
+ Task::UP task = std::move(_tasks[index(consumed)]);
+ task->run();
+ _rp.store(++consumed, std::memory_order_release);
+ if (wakeupLimit == consumed) {
+ MonitorGuard guard(_producerMonitor);
+ guard.broadcast();
+ }
+ }
+}
+
+void
+SingleExecutor::wait_for_room(MonitorGuard & producerGuard) {
+ if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) {
+ sync();
+ _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
+ _taskLimit = _wantedTaskLimit.load();
+ }
+ while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
+ _producerNeedWakeup.store(true, std::memory_order_relaxed);
+ producerGuard.wait(10ms);
+ _producerNeedWakeup.store(false, std::memory_order_relaxed);
+ }
+}
+
+ThreadExecutor::Stats
+SingleExecutor::getStats() {
+ uint64_t accepted = _wp.load(std::memory_order_relaxed);
+ Stats stats(_maxPending, (accepted - _lastAccepted), 0);
+ _lastAccepted = accepted;
+ _maxPending = 0;
+ return stats;
+}
+
+
+}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
new file mode 100644
index 00000000000..0ab89355566
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -0,0 +1,55 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/threadexecutor.h>
+#include <vespa/vespalib/util/thread.h>
+#include <thread>
+#include <atomic>
+
+namespace vespalib {
+
+/**
+ * Has a single thread consuming tasks from a fixed size ringbuffer.
+ * Made for throughput where the producer has no interaction with the consumer and
+ * it is hence very cheap to produce a task. High and low watermark at 25%/75% is used
+ * to reduce ping-pong.
+ */
+class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
+public:
+ explicit SingleExecutor(uint32_t taskLimit);
+ ~SingleExecutor() override;
+ Task::UP execute(Task::UP task) override;
+ void setTaskLimit(uint32_t taskLimit) override;
+ SingleExecutor & sync() override;
+ size_t getNumThreads() const override;
+ uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); }
+ Stats getStats() override;
+private:
+ uint64_t addTask(Task::UP task);
+ void run() override;
+ void drain_tasks();
+ void run_tasks_till(uint64_t available);
+ void wait_for_room(MonitorGuard & guard);
+ uint64_t index(uint64_t counter) const {
+ return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
+ }
+
+ uint64_t numTasks() const {
+ return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire);
+ }
+ std::atomic<uint32_t> _taskLimit;
+ std::atomic<uint32_t> _wantedTaskLimit;
+ std::atomic<uint64_t> _rp;
+ std::unique_ptr<Task::UP[]> _tasks;
+ vespalib::Monitor _consumerMonitor;
+ vespalib::Monitor _producerMonitor;
+ vespalib::Thread _thread;
+ uint64_t _lastAccepted;
+ std::atomic<uint64_t> _maxPending;
+ std::atomic<uint64_t> _wakeupConsumerAt;
+ std::atomic<bool> _producerNeedWakeup;
+ std::atomic<uint64_t> _wp;
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/executor.h b/vespalib/src/vespa/vespalib/util/executor.h
index ef5cb4b84fa..97cde4ffbe2 100644
--- a/vespalib/src/vespa/vespalib/util/executor.h
+++ b/vespalib/src/vespa/vespalib/util/executor.h
@@ -23,6 +23,8 @@ public:
virtual ~Task() {}
};
+ enum class OptimizeFor {LATENCY, THROUGHPUT};
+
/**
* Execute the given task using one of the internal threads some
* time in the future. The task may also be rejected in which case