diff options
16 files changed, 192 insertions, 143 deletions
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java deleted file mode 100644 index 553f2ffba36..00000000000 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hosted.node.admin.task.util.file; - -import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; - -/** - * A very simple template engine when there's little complexity and lots of Velocity special characters $ and #, - * i.e. typically shell script. - * - * @author hakonhall - */ -public class Templar { - private final String template; - - private static final String prefix = "<%="; - private static final String suffix = "%>"; - - private final Map<String, String> settings = new HashMap<>(); - - public static Templar fromUtf8File(Path path) { - return new Templar(new UnixPath(path).readUtf8File()); - } - - public Templar(String template) { - this.template = template; - } - - public Templar set(String name, String value) { - settings.put(name, value); - return this; - } - - public String resolve() { - StringBuilder text = new StringBuilder(template.length() * 2); - - int start= 0; - int end; - - for (; start < template.length(); start = end) { - int prefixStart = template.indexOf(prefix, start); - - - if (prefixStart == -1) { - text.append(template, start, template.length()); - break; - } else { - text.append(template, start, prefixStart); - } - - int suffixStart = template.indexOf(suffix, prefixStart + prefix.length()); - if (suffixStart == -1) { - throw new IllegalArgumentException("Prefix at offset " + prefixStart + " is not terminated"); - } - - int prefixEnd = prefixStart + prefix.length(); - String name = template.substring(prefixEnd, suffixStart).trim(); - String value = settings.get(name); - if (value == null) { - throw new IllegalArgumentException("No value is set for name '" + name + "' at offset " + prefixEnd); - } - - text.append(value); - - end = suffixStart + suffix.length(); - } - - return text.toString(); - } - - public FileWriter getFileWriterTo(Path path) { - return new FileWriter(path, this::resolve); - } -} diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java index ea55af17a0e..6d80ac2cad9 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java @@ -26,12 +26,15 @@ import java.util.Optional; * id: a valid Java identifier * </pre> * + * <p>Other directive delimiters than "%{" and "}" may be used, see {@link TemplateDescriptor}.</p> + * * <p>Fill the template with variable values ({@link #set(String, String) set()}, set if conditions * ({@link #set(String, boolean)}), add list elements ({@link #add(String) add()}, etc, and finally * render it as a String ({@link #render()}).</p> * * <p>To reuse a template, create the template and work on snapshots of that ({@link #snapshot()}).</p> * + * @see TemplateDescriptor * @author hakonhall */ public class Template implements Form { diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java deleted file mode 100644 index ed410ffc1d1..00000000000 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hosted.node.admin.task.util.file; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** - * @author hakonhall - */ -public class TemplarTest { - @Test - public void test() { - Templar templar = new Templar("x y <%= foo %>, some other <%=bar%> text"); - templar.set("foo", "fidelity") - .set("bar", "halimov") - .set("not", "used"); - - assertEquals("x y fidelity, some other halimov text", templar.resolve()); - } -}
\ No newline at end of file diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index 214c57557bc..385d0eb363e 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -178,6 +178,12 @@ private: MockSharedThreadingService _shared_service; storage::spi::dummy::DummyBucketExecutor _bucketExecutor; + static std::shared_ptr<ProtonConfig> make_proton_config() { + ProtonConfigBuilder proton_config; + proton_config.indexing.optimize = ProtonConfigBuilder::Indexing::Optimize::LATENCY; + return std::make_shared<ProtonConfig>(proton_config); + } + public: DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort); ~DocumentDBFactory() override; @@ -196,7 +202,7 @@ public: TuneFileDocumentDB::SP tuneFileDocDB(new TuneFileDocumentDB()); DocumentDBConfigHelper mgr(spec, docType.getName()); auto b = std::make_shared<BootstrapConfig>(1, factory.getTypeCfg(), factory.getTypeRepo(), - std::make_shared<ProtonConfig>(), + make_proton_config(), std::make_shared<FiledistributorrpcConfig>(), std::make_shared<BucketspacesConfig>(), tuneFileDocDB, HwInfo()); diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h index 5017a7d5192..bbb4efc03b1 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h @@ -53,7 +53,6 @@ class FieldMerger std::unique_ptr<FieldWriter> _writer; State _state; bool _failed; - bool _force_small_merge_chunk; void make_tmp_dirs(); bool clean_tmp_dirs(); diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index ef7f8bfb0f6..243935d4013 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -96,6 +96,23 @@ TEST_F("require that task with same component id are serialized", Fixture) EXPECT_EQUAL(42, tv->_val); } +TEST_F("require that task with same component id are serialized when executed with list", Fixture) +{ + std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); + EXPECT_EQUAL(0, tv->_val); + ISequencedTaskExecutor::ExecutorId executorId = f._threads->getExecutorId(0); + ISequencedTaskExecutor::TaskList list; + list.template emplace_back(executorId, makeLambdaTask([=]() { usleep(2000); tv->modify(0, 14); })); + list.template emplace_back(executorId, makeLambdaTask([=]() { tv->modify(14, 42); })); + f._threads->executeTasks(std::move(list)); + tv->wait(2); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads->sync_all(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + TEST_F("require that task with different component ids are not serialized", Fixture) { int tryCnt = 0; @@ -136,7 +153,8 @@ TEST_F("require that task with same string component id are serialized", Fixture namespace { -int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) +int +detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) { int tryCnt = 0; for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { @@ -158,7 +176,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t return tryCnt; } -vespalib::string makeAltComponentId(Fixture &f) +vespalib::string +makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp index dd71380f64a..56352ff3c0d 100644 --- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp +++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -30,6 +30,28 @@ TEST("test that all tasks are executed") { EXPECT_EQUAL(10000u, counter); } +TEST("test that executor can overflow") { + constexpr size_t NUM_TASKS = 1000; + std::atomic<uint64_t> counter(0); + vespalib::Gate gate; + SingleExecutor executor(sequenced_executor, 10, false, 1, 1ms); + executor.execute(makeLambdaTask([&gate] { gate.await();})); + + for(size_t i(0); i < NUM_TASKS; i++) { + executor.execute(makeLambdaTask([&counter, i] { + EXPECT_EQUAL(i, counter); + counter++; + })); + } + EXPECT_EQUAL(0u, counter); + ExecutorStats stats = executor.getStats(); + EXPECT_EQUAL(NUM_TASKS + 1, stats.acceptedTasks); + EXPECT_EQUAL(NUM_TASKS, stats.queueSize.max()); + gate.countDown(); + executor.sync(); + EXPECT_EQUAL(NUM_TASKS, counter); +} + void verifyResizeTaskLimit(bool up) { std::mutex lock; std::condition_variable cond; @@ -38,7 +60,7 @@ void verifyResizeTaskLimit(bool up) { constexpr uint32_t INITIAL = 20; const uint32_t INITIAL_2inN = roundUp2inN(INITIAL); double waterMarkRatio = 0.5; - SingleExecutor executor(sequenced_executor, INITIAL, INITIAL*waterMarkRatio, 10ms); + SingleExecutor executor(sequenced_executor, INITIAL, true, INITIAL*waterMarkRatio, 10ms); EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit()); EXPECT_EQUAL(uint32_t(INITIAL_2inN*waterMarkRatio), executor.get_watermark()); diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp index c54f182891c..b31d72da3b1 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp @@ -12,9 +12,16 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors) ISequencedTaskExecutor::~ISequencedTaskExecutor() = default; +void +ISequencedTaskExecutor::executeTasks(TaskList tasks) { + for (auto & task : tasks) { + executeTask(task.first, std::move(task.second)); + } +} + ISequencedTaskExecutor::ExecutorId -ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const { - vespalib::hash<vespalib::stringref> hashfun; +ISequencedTaskExecutor::getExecutorIdFromName(stringref componentId) const { + hash<stringref> hashfun; return getExecutorId(hashfun(componentId)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index 3fe6fb5d678..ff90556e3e4 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -14,7 +14,7 @@ namespace vespalib { * Interface class to run multiple tasks in parallel, but tasks with same * id has to be run in sequence. */ -class ISequencedTaskExecutor : public vespalib::IWakeup +class ISequencedTaskExecutor : public IWakeup { public: class ExecutorId { @@ -28,6 +28,7 @@ public: private: uint32_t _id; }; + using TaskList = std::vector<std::pair<ExecutorId, Executor::Task::UP>>; ISequencedTaskExecutor(uint32_t numExecutors); virtual ~ISequencedTaskExecutor(); @@ -40,7 +41,7 @@ public: virtual ExecutorId getExecutorId(uint64_t componentId) const = 0; uint32_t getNumExecutors() const { return _numExecutors; } - ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const; + ExecutorId getExecutorIdFromName(stringref componentId) const; /** * Returns an executor id that is NOT equal to the given executor id, @@ -58,7 +59,13 @@ public: * @param id which internal executor to use * @param task unique pointer to the task to be executed */ - virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0; + virtual void executeTask(ExecutorId id, Executor::Task::UP task) = 0; + /** + * Schedule a list of tasks to run after all previously scheduled tasks with + * same id. Default is to just iterate and execute one by one, but implementations + * that can schedule all in one go more efficiently can implement this one. + */ + virtual void executeTasks(TaskList tasks); /** * Call this one to ensure you get the attention of the workers. */ @@ -74,7 +81,7 @@ public: */ template <class FunctionType> void executeLambda(ExecutorId id, FunctionType &&function) { - executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); + executeTask(id, makeLambdaTask(std::forward<FunctionType>(function))); } /** * Wait for all scheduled tasks to complete. @@ -83,7 +90,7 @@ public: virtual void setTaskLimit(uint32_t taskLimit) = 0; - virtual vespalib::ExecutorStats getStats() = 0; + virtual ExecutorStats getStats() = 0; /** * Wrap lambda function into a task and schedule it to be run. @@ -96,7 +103,7 @@ public: template <class FunctionType> void execute(uint64_t componentId, FunctionType &&function) { ExecutorId id = getExecutorId(componentId); - executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); + executeTask(id, makeLambdaTask(std::forward<FunctionType>(function))); } /** @@ -109,7 +116,7 @@ public: */ template <class FunctionType> void execute(ExecutorId id, FunctionType &&function) { - executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); + executeTask(id, makeLambdaTask(std::forward<FunctionType>(function))); } private: diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 76b0235301b..58ae862f7c6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -67,7 +67,7 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3 for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark; - executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, 100ms)); + executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms)); } else { executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp index 5ae8e96b606..d81b8ec1db6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp @@ -17,7 +17,7 @@ SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecu SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default; void -SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) +SequencedTaskExecutorObserver::executeTask(ExecutorId id, Executor::Task::UP task) { ++_executeCnt; { @@ -28,6 +28,19 @@ SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Ta } void +SequencedTaskExecutorObserver::executeTasks(TaskList tasks) +{ + _executeCnt += tasks.size(); + { + std::lock_guard<std::mutex> guard(_mutex); + for (const auto & task : tasks) { + _executeHistory.emplace_back(task.first.getId()); + } + } + _executor.executeTasks(std::move(tasks)); +} + +void SequencedTaskExecutorObserver::sync_all() { ++_syncCnt; @@ -45,7 +58,7 @@ void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) { _executor.setTaskLimit(taskLimit); } -vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() { +ExecutorStats SequencedTaskExecutorObserver::getStats() { return _executor.getStats(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h index 7e2bf968952..1d54283c393 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h @@ -24,10 +24,11 @@ public: ~SequencedTaskExecutorObserver() override; ExecutorId getExecutorId(uint64_t componentId) const override; - void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; + void executeTask(ExecutorId id, Executor::Task::UP task) override; + void executeTasks(TaskList tasks) override; void sync_all() override; void setTaskLimit(uint32_t taskLimit) override; - vespalib::ExecutorStats getStats() override; + ExecutorStats getStats() override; uint32_t getExecuteCnt() const { return _executeCnt; } uint32_t getSyncCnt() const { return _syncCnt; } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index a99bce0a705..21ed90c3d22 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -7,12 +7,12 @@ namespace vespalib { SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit) - : SingleExecutor(func, taskLimit, taskLimit/10, 100ms) + : SingleExecutor(func, taskLimit, true, taskLimit/10, 100ms) { } -SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime) - : _watermarkRatio(watermark < taskLimit ? double(watermark) / taskLimit : 1.0), - _taskLimit(vespalib::roundUp2inN(taskLimit)), +SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime) + : _watermarkRatio(watermark < reservedQueueSize ? double(watermark) / reservedQueueSize : 1.0), + _taskLimit(vespalib::roundUp2inN(reservedQueueSize)), _wantedTaskLimit(_taskLimit.load()), _rp(0), _tasks(std::make_unique<Task::UP[]>(_taskLimit)), @@ -30,9 +30,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat _wp(0), _watermark(_taskLimit.load()*_watermarkRatio), _reactionTime(reactionTime), - _closed(false) + _closed(false), + _overflow() { - assert(taskLimit >= watermark); + assert(reservedQueueSize >= watermark); + if ( ! isQueueSizeHard) { + _overflow = std::make_unique<ArrayQueue<Task::UP>>(); + } _thread.start(); } @@ -62,10 +66,12 @@ SingleExecutor::execute(Task::UP task) { if (_closed) { return task; } - wait_for_room(guard); - wp = _wp.load(std::memory_order_relaxed); - _tasks[index(wp)] = std::move(task); - _wp.store(wp + 1, std::memory_order_release); + task = wait_for_room_or_put_in_overflow_Q(guard, std::move(task)); + if (task) { + wp = move_to_main_q(guard, std::move(task)); + } else { + wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(guard); + } } if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { _consumerCondition.notify_one(); @@ -73,6 +79,24 @@ SingleExecutor::execute(Task::UP task) { return task; } +uint64_t +SingleExecutor::numTasks() { + if (_overflow) { + Lock guard(_mutex); + return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard); + } else { + return num_tasks_in_main_q(); + } +} + +uint64_t +SingleExecutor::move_to_main_q(Lock &, Task::UP task) { + uint64_t wp = _wp.load(std::memory_order_relaxed); + _tasks[index(wp)] = std::move(task); + _wp.store(wp + 1, std::memory_order_release); + return wp; +} + void SingleExecutor::setTaskLimit(uint32_t taskLimit) { _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); @@ -81,7 +105,7 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) { void SingleExecutor::drain(Lock & lock) { uint64_t wp = _wp.load(std::memory_order_relaxed); - while (numTasks() > 0) { + while (numTasks(lock) > 0) { _consumerCondition.notify_one(); sleepProducer(lock, 100us, wp); } @@ -97,7 +121,7 @@ SingleExecutor::wakeup() { SingleExecutor & SingleExecutor::sync() { Lock lock(_mutex); - uint64_t wp = _wp.load(std::memory_order_relaxed); + uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock); while (wp > _rp.load(std::memory_order_acquire)) { _consumerCondition.notify_one(); sleepProducer(lock, 100us, wp); @@ -119,7 +143,7 @@ SingleExecutor::run() { _producerCondition.notify_all(); _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed); Lock lock(_mutex); - if (numTasks() <= 0) { + if (numTasks(lock) <= 0) { steady_time now = steady_clock::now(); _threadIdleTracker.set_idle(now); _consumerCondition.wait_until(lock, now + _reactionTime); @@ -134,6 +158,22 @@ void SingleExecutor::drain_tasks() { while (numTasks() > 0) { run_tasks_till(_wp.load(std::memory_order_acquire)); + move_overflow_to_main_q(); + } +} + +void +SingleExecutor::move_overflow_to_main_q() +{ + if ( ! _overflow) return; + Lock guard(_mutex); + move_overflow_to_main_q(guard); +} +void +SingleExecutor::move_overflow_to_main_q(Lock & guard) { + while ( !_overflow->empty() && num_tasks_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) { + move_to_main_q(guard, std::move(_overflow->front())); + _overflow->pop(); } } @@ -151,26 +191,42 @@ SingleExecutor::run_tasks_till(uint64_t available) { } } -void -SingleExecutor::wait_for_room(Lock & lock) { +Executor::Task::UP +SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task) { uint64_t wp = _wp.load(std::memory_order_relaxed); uint64_t taskLimit = _taskLimit.load(std::memory_order_relaxed); if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) { - drain(lock); + drain(guard); _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); _taskLimit = _wantedTaskLimit.load(); _watermark = _taskLimit * _watermarkRatio; } - _queueSize.add(numTasks()); - while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(lock, _reactionTime, wp - get_watermark()); + uint64_t numTaskInQ = numTasks(guard); + _queueSize.add(numTaskInQ); + if (numTaskInQ >= _taskLimit.load(std::memory_order_relaxed)) { + if (_overflow) { + _overflow->push(std::move(task)); + } else { + while (numTasks(guard) >= _taskLimit.load(std::memory_order_relaxed)) { + sleepProducer(guard, _reactionTime, wp - get_watermark()); + } + } + } else { + if (_overflow && !_overflow->empty()) { + _overflow->push(std::move(task)); + } + } + if (_overflow && !_overflow->empty()) { + assert(!task); + move_overflow_to_main_q(guard); } + return task; } ExecutorStats SingleExecutor::getStats() { Lock lock(_mutex); - uint64_t accepted = _wp.load(std::memory_order_relaxed); + uint64_t accepted = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock); steady_time now = steady_clock::now(); _idleTracker.was_idle(_threadIdleTracker.reset(now)); ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount); diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index e76e3f17a41..4fdc217e701 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/threadexecutor.h> #include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/vespalib/util/executor_idle_tracking.h> #include <thread> #include <atomic> @@ -19,8 +20,8 @@ namespace vespalib { */ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: - SingleExecutor(init_fun_t func, uint32_t taskLimit); - SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime); + SingleExecutor(init_fun_t func, uint32_t reservedQueueSize); + SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; @@ -39,12 +40,22 @@ private: void drain_tasks(); void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt); void run_tasks_till(uint64_t available); - void wait_for_room(Lock & guard); + Task::UP wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task); + uint64_t move_to_main_q(Lock & guard, Task::UP task); + void move_overflow_to_main_q(); + void move_overflow_to_main_q(Lock & guard); uint64_t index(uint64_t counter) const { return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } - uint64_t numTasks() const { + uint64_t numTasks(); + uint64_t numTasks(Lock & guard) const { + return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard); + } + uint64_t num_tasks_in_overflow_q(Lock &) const { + return _overflow ? _overflow->size() : 0; + } + uint64_t num_tasks_in_main_q() const { return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire); } const double _watermarkRatio; @@ -67,6 +78,7 @@ private: std::atomic<uint32_t> _watermark; const duration _reactionTime; bool _closed; + std::unique_ptr<ArrayQueue<Task::UP>> _overflow; }; } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 3d24ee87879..5dafd9c5eda 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -113,7 +113,7 @@ class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor public: using DocumentIdsAndTimeStamps = std::vector<std::pair<spi::Timestamp, spi::DocumentId>>; UnrevertableRemoveEntryProcessor(DocumentIdsAndTimeStamps & to_remove) - : _to_remove(to_remove) + : _to_remove(to_remove) {} void process(spi::DocEntry& entry) override { diff --git a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp index 73e70e7fd89..8f3dd8ab006 100644 --- a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp +++ b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp @@ -2,11 +2,11 @@ #pragma once -#include <stdint.h> -#include <stdlib.h> +#include "traits.h" +#include <cstdint> +#include <cstdlib> #include <cassert> #include <algorithm> -#include "traits.h" namespace vespalib { |