diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-02 12:52:47 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-02 12:52:47 +0100 |
commit | 81084712352afe6c3cf68aedb695e8b90a51a640 (patch) | |
tree | 2796394c409b965c8b636b3985141ff803d1d181 | |
parent | afdbe50100d189e20d7f53c971834a58b006e554 (diff) | |
parent | 6449cf3d52ae44a54de59e478f8c0915671e23bb (diff) |
Merge pull request #12381 from vespa-engine/balder/use-single-threaded-executor
Balder/use single threaded executor
16 files changed, 337 insertions, 15 deletions
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index eb4f1f6dc89..6040fc651c2 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -117,6 +117,10 @@ indexing.read.io enum {NORMAL, DIRECTIO} default=DIRECTIO restart ## Control number of threads used for indexing indexing.threads int default=1 restart +## Option to specify what is most important during indexing. +## This is experimental and will most likely be temporary. +indexing.optimize enum {LATENCY, THROUGHPUT} default=LATENCY restart + ## Maximum number of pending operations for each of the internal ## indexing threads. Only used when visibility delay is zero. indexing.tasklimit int default=1000 restart diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 302ffc93f6a..1ee1b703ea9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -140,7 +140,8 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _writeService(sharedExecutor, _writeServiceConfig.indexingThreads(), indexing_thread_stack_size, - _writeServiceConfig.defaultTaskLimit()), + _writeServiceConfig.defaultTaskLimit(), + _writeServiceConfig.optimize()), _initializeThreads(std::move(initializeThreads)), _initConfigSnapshot(), _initConfigSerialNum(0u), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 058197f0271..6e7b4967f6d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -10,7 +10,8 @@ using search::SequencedTaskExecutor; namespace proton { 
ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadStackExecutorBase & sharedExecutor, - uint32_t threads, uint32_t stackSize, uint32_t taskLimit) + uint32_t threads, uint32_t stackSize, uint32_t taskLimit, + OptimizeFor optimize) : _sharedExecutor(sharedExecutor), _masterExecutor(1, stackSize), @@ -21,7 +22,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadStackExecutor _summaryService(_summaryExecutor), _indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)), _indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)), - _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)) + _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit, optimize)) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 7bbe1cb162a..2e4dd2035f3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -29,6 +29,7 @@ private: std::unique_ptr<search::ISequencedTaskExecutor> _attributeFieldWriter; public: + using OptimizeFor = vespalib::Executor::OptimizeFor; /** * Constructor. 
* @@ -38,7 +39,8 @@ public: ExecutorThreadingService(vespalib::ThreadStackExecutorBase &sharedExecutor, uint32_t threads = 1, uint32_t stackSize = 128 * 1024, - uint32_t taskLimit = 1000); + uint32_t taskLimit = 1000, + OptimizeFor optimize = OptimizeFor::LATENCY); ~ExecutorThreadingService() override; /** diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index 55aa1a20ef6..5bc6ef543f3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -7,13 +7,16 @@ namespace proton { using ProtonConfig = ThreadingServiceConfig::ProtonConfig; +using OptimizeFor = vespalib::Executor::OptimizeFor; ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, - uint32_t semiUnboundTaskLimit_) + uint32_t semiUnboundTaskLimit_, + OptimizeFor optimize) : _indexingThreads(indexingThreads_), _defaultTaskLimit(defaultTaskLimit_), - _semiUnboundTaskLimit(semiUnboundTaskLimit_) + _semiUnboundTaskLimit(semiUnboundTaskLimit_), + _optimize(optimize) { } @@ -30,6 +33,16 @@ calculateIndexingThreads(uint32_t cfgIndexingThreads, double concurrency, const return std::max(indexingThreads, 1u); } +OptimizeFor +selectOptimization(ProtonConfig::Indexing::Optimize optimize) { + using CfgOptimize = ProtonConfig::Indexing::Optimize; + switch (optimize) { + case CfgOptimize::LATENCY: return OptimizeFor::LATENCY; + case CfgOptimize::THROUGHPUT: return OptimizeFor::THROUGHPUT; + } + return OptimizeFor::LATENCY; +} + } ThreadingServiceConfig @@ -37,7 +50,8 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const { uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing.threads, concurrency, cpuInfo); return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit, - 
(cfg.indexing.semiunboundtasklimit / indexingThreads)); + (cfg.indexing.semiunboundtasklimit / indexingThreads), + selectOptimization(cfg.indexing.optimize)); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index be39f516598..149215f97dc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -2,6 +2,7 @@ #pragma once #include <vespa/searchcore/proton/common/hw_info.h> +#include <vespa/vespalib/util/executor.h> #include <cstdint> namespace vespa::config::search::core::internal { class InternalProtonType; } @@ -13,14 +14,16 @@ namespace proton { class ThreadingServiceConfig { public: using ProtonConfig = const vespa::config::search::core::internal::InternalProtonType; + using OptimizeFor = vespalib::Executor::OptimizeFor; private: - uint32_t _indexingThreads; - uint32_t _defaultTaskLimit; - uint32_t _semiUnboundTaskLimit; + uint32_t _indexingThreads; + uint32_t _defaultTaskLimit; + uint32_t _semiUnboundTaskLimit; + OptimizeFor _optimize; private: - ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_); + ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, OptimizeFor optimize); public: static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); @@ -28,6 +31,7 @@ public: uint32_t indexingThreads() const { return _indexingThreads; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } uint32_t semiUnboundTaskLimit() const { return _semiUnboundTaskLimit; } + OptimizeFor optimize() const { return _optimize;} }; } diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp 
b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp index b2b15ded274..9491617c135 100644 --- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp +++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp @@ -10,13 +10,19 @@ using ExecutorId = search::ISequencedTaskExecutor::ExecutorId; int main(int argc, char *argv[]) { unsigned long numTasks = 1000000; unsigned numThreads = 4; + unsigned taskLimit = 1000; + vespalib::Executor::OptimizeFor optimize = vespalib::Executor::OptimizeFor::LATENCY; std::atomic<long> counter(0); if (argc > 1) numTasks = atol(argv[1]); if (argc > 2) numThreads = atoi(argv[2]); + if (argc > 3) + taskLimit = atoi(argv[3]); + if (argc > 4) + optimize = vespalib::Executor::OptimizeFor::THROUGHPUT; - auto executor = SequencedTaskExecutor::create(numThreads); + auto executor = SequencedTaskExecutor::create(numThreads, taskLimit, optimize); for (unsigned long tid(0); tid < numTasks; tid++) { executor->executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; })); } diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp index 30723a6eb2a..7b0c30ec9d8 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp @@ -2,8 +2,10 @@ #include "sequencedtaskexecutor.h" #include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/singleexecutor.h> using vespalib::BlockingThreadStackExecutor; +using vespalib::SingleExecutor; namespace search { @@ -15,12 +17,16 @@ constexpr uint32_t stackSize = 128 * 1024; std::unique_ptr<ISequencedTaskExecutor> -SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit) +SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize) { auto executors = 
std::make_unique<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>>(); executors->reserve(threads); for (uint32_t id = 0; id < threads; ++id) { - executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit)); + if (optimize == OptimizeFor::THROUGHPUT) { + executors->push_back(std::make_unique<SingleExecutor>(taskLimit)); + } else { + executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit)); + } } return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors))); } diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h index a29e3d5226c..8568901006f 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h @@ -23,6 +23,7 @@ class SequencedTaskExecutor final : public ISequencedTaskExecutor SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); public: using ISequencedTaskExecutor::getExecutorId; + using OptimizeFor = vespalib::Executor::OptimizeFor; ~SequencedTaskExecutor(); @@ -30,7 +31,12 @@ public: void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; void sync() override; Stats getStats() override; - static std::unique_ptr<ISequencedTaskExecutor> create(uint32_t threads, uint32_t taskLimit = 1000); + + /* + * Note that if you choose OptimizeFor::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside. 
+ */ + static std::unique_ptr<ISequencedTaskExecutor> + create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY); }; } // namespace search diff --git a/staging_vespalib/CMakeLists.txt b/staging_vespalib/CMakeLists.txt index 3c4e2a9f444..9fb48cbd4fc 100644 --- a/staging_vespalib/CMakeLists.txt +++ b/staging_vespalib/CMakeLists.txt @@ -30,6 +30,7 @@ vespa_define_module( src/tests/shutdownguard src/tests/state_server src/tests/stllike + src/tests/singleexecutor src/tests/timer src/tests/util/process_memory_stats src/tests/xmlserializable diff --git a/staging_vespalib/src/tests/singleexecutor/CMakeLists.txt b/staging_vespalib/src/tests/singleexecutor/CMakeLists.txt new file mode 100644 index 00000000000..c5d42d2c8c5 --- /dev/null +++ b/staging_vespalib/src/tests/singleexecutor/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(staging_vespalib_singleexecutor_test_app TEST + SOURCES + singleexecutor_test.cpp + DEPENDS + staging_vespalib +) +vespa_add_test(NAME staging_vespalib_singleexecutor_test_app COMMAND staging_vespalib_singleexecutor_test_app) diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp new file mode 100644 index 00000000000..5dacaa5d204 --- /dev/null +++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -0,0 +1,80 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/vespalib/testkit/testapp.h> + +#include <vespa/vespalib/util/singleexecutor.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <atomic> + +using namespace vespalib; + +TEST("test that all tasks are executed") { + + std::atomic<uint64_t> counter(0); + SingleExecutor executor(10); + + for (uint64_t i(0); i < 10; i++) { + executor.execute(makeLambdaTask([&counter] {counter++;})); + } + executor.sync(); + EXPECT_EQUAL(10u, counter); + + counter = 0; + for (uint64_t i(0); i < 10000; i++) { + executor.execute(makeLambdaTask([&counter] {counter++;})); + } + executor.sync(); + EXPECT_EQUAL(10000u, counter); +} + +void verifyResizeTaskLimit(bool up) { + Monitor lock; + std::atomic<uint64_t> started(0); + std::atomic<uint64_t> allowed(0); + SingleExecutor executor(10); + + uint32_t targetTaskLimit = up ? 20 : 5; + uint32_t roundedTaskLimit = roundUp2inN(targetTaskLimit); + EXPECT_NOT_EQUAL(16u, roundedTaskLimit); + + for (uint64_t i(0); i < 10; i++) { + executor.execute(makeLambdaTask([&lock, &started, &allowed] { + started++; + MonitorGuard guard(lock); + while (allowed < started) { + guard.wait(1ms); + } + })); + } + while (started < 1); + EXPECT_EQUAL(1u, started); + executor.setTaskLimit(targetTaskLimit); + EXPECT_EQUAL(16u, executor.getTaskLimit()); + allowed = 5; + while (started < 6); + EXPECT_EQUAL(6u, started); + EXPECT_EQUAL(16u, executor.getTaskLimit()); + allowed = 10; + while (started < 10); + EXPECT_EQUAL(10u, started); + EXPECT_EQUAL(16u, executor.getTaskLimit()); + executor.execute(makeLambdaTask([&lock, &started, &allowed] { + started++; + MonitorGuard guard(lock); + while (allowed < started) { + guard.wait(1ms); + } + })); + while (started < 11); + EXPECT_EQUAL(11u, started); + EXPECT_EQUAL(roundedTaskLimit, executor.getTaskLimit()); + allowed = 11; +} +TEST("test that resizing up and down works") { + TEST_DO(verifyResizeTaskLimit(true)); + TEST_DO(verifyResizeTaskLimit(false)); + + +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git 
a/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt b/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt index 71364a813f6..ba03b77c941 100644 --- a/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -17,6 +17,7 @@ vespa_add_library(staging_vespalib_vespalib_util OBJECT rusage.cpp shutdownguard.cpp scheduledexecutor.cpp + singleexecutor.cpp xmlserializable.cpp xmlstream.cpp DEPENDS diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp new file mode 100644 index 00000000000..791b0876c19 --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -0,0 +1,131 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "singleexecutor.h" +#include <vespa/vespalib/util/time.h> + +namespace vespalib { + +SingleExecutor::SingleExecutor(uint32_t taskLimit) + : _taskLimit(vespalib::roundUp2inN(taskLimit)), + _wantedTaskLimit(_taskLimit.load()), + _rp(0), + _tasks(std::make_unique<Task::UP[]>(_taskLimit)), + _consumerMonitor(), + _producerMonitor(), + _thread(*this), + _lastAccepted(0), + _maxPending(0), + _wakeupConsumerAt(0), + _producerNeedWakeup(false), + _wp(0) +{ + _thread.start(); +} +SingleExecutor::~SingleExecutor() { + sync(); + _thread.stop().join(); +} + +size_t +SingleExecutor::getNumThreads() const { + return 1; +} + +uint64_t +SingleExecutor::addTask(Task::UP task) { + MonitorGuard guard(_producerMonitor); + wait_for_room(guard); + uint64_t wp = _wp.load(std::memory_order_relaxed); + _tasks[index(wp)] = std::move(task); + _wp.store(wp + 1, std::memory_order_release); + return wp; +} + +Executor::Task::UP +SingleExecutor::execute(Task::UP task) { + uint64_t wp = addTask(std::move(task)); + if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { + MonitorGuard guard(_consumerMonitor); + guard.signal(); + } 
+ return task; +} + +void +SingleExecutor::setTaskLimit(uint32_t taskLimit) { + _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); +} + +SingleExecutor & +SingleExecutor::sync() { + uint64_t wp = _wp.load(std::memory_order_relaxed); + while (wp > _rp.load(std::memory_order_relaxed)) { + std::this_thread::sleep_for(1ms); + } + return *this; +} + +void +SingleExecutor::run() { + while (!_thread.stopped()) { + drain_tasks(); + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed); + MonitorGuard guard(_consumerMonitor); + guard.wait(10ms); + _wakeupConsumerAt.store(0, std::memory_order_relaxed); + } +} + +void +SingleExecutor::drain_tasks() { + while (numTasks() > 0) { + run_tasks_till(_wp.load(std::memory_order_acquire)); + } +} + +void +SingleExecutor::run_tasks_till(uint64_t available) { + uint64_t consumed = _rp.load(std::memory_order_relaxed); + uint64_t left = available - consumed; + if (_maxPending.load(std::memory_order_relaxed) < left) { + _maxPending.store(left, std::memory_order_relaxed); + } + uint64_t wakeupLimit = _producerNeedWakeup.load(std::memory_order_relaxed) + ? 
(available - (left >> 2)) + : 0; + while (consumed < available) { + Task::UP task = std::move(_tasks[index(consumed)]); + task->run(); + _rp.store(++consumed, std::memory_order_release); + if (wakeupLimit == consumed) { + MonitorGuard guard(_producerMonitor); + guard.broadcast(); + } + } +} + +void +SingleExecutor::wait_for_room(MonitorGuard & producerGuard) { + if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) { + sync(); + _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); + _taskLimit = _wantedTaskLimit.load(); + } + while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { + _producerNeedWakeup.store(true, std::memory_order_relaxed); + producerGuard.wait(10ms); + _producerNeedWakeup.store(false, std::memory_order_relaxed); + } +} + +ThreadExecutor::Stats +SingleExecutor::getStats() { + uint64_t accepted = _wp.load(std::memory_order_relaxed); + Stats stats(_maxPending, (accepted - _lastAccepted), 0); + _lastAccepted = accepted; + _maxPending = 0; + return stats; +} + + +} diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h new file mode 100644 index 00000000000..0ab89355566 --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -0,0 +1,55 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/vespalib/util/thread.h> +#include <thread> +#include <atomic> + +namespace vespalib { + +/** + * Has a single thread consuming tasks from a fixed size ringbuffer. + * Made for throughput where the producer has no interaction with the consumer and + * it is hence very cheap to produce a task. High and low watermark at 25%/75% is used + * to reduce ping-pong. 
+ */ +class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { +public: + explicit SingleExecutor(uint32_t taskLimit); + ~SingleExecutor() override; + Task::UP execute(Task::UP task) override; + void setTaskLimit(uint32_t taskLimit) override; + SingleExecutor & sync() override; + size_t getNumThreads() const override; + uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } + Stats getStats() override; +private: + uint64_t addTask(Task::UP task); + void run() override; + void drain_tasks(); + void run_tasks_till(uint64_t available); + void wait_for_room(MonitorGuard & guard); + uint64_t index(uint64_t counter) const { + return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); + } + + uint64_t numTasks() const { + return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire); + } + std::atomic<uint32_t> _taskLimit; + std::atomic<uint32_t> _wantedTaskLimit; + std::atomic<uint64_t> _rp; + std::unique_ptr<Task::UP[]> _tasks; + vespalib::Monitor _consumerMonitor; + vespalib::Monitor _producerMonitor; + vespalib::Thread _thread; + uint64_t _lastAccepted; + std::atomic<uint64_t> _maxPending; + std::atomic<uint64_t> _wakeupConsumerAt; + std::atomic<bool> _producerNeedWakeup; + std::atomic<uint64_t> _wp; +}; + +} diff --git a/vespalib/src/vespa/vespalib/util/executor.h b/vespalib/src/vespa/vespalib/util/executor.h index ef5cb4b84fa..97cde4ffbe2 100644 --- a/vespalib/src/vespa/vespalib/util/executor.h +++ b/vespalib/src/vespa/vespalib/util/executor.h @@ -23,6 +23,8 @@ public: virtual ~Task() {} }; + enum class OptimizeFor {LATENCY, THROUGHPUT}; + /** * Execute the given task using one of the internal threads some * time in the future. The task may also be rejected in which case |