summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java75
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java3
-rw-r--r--node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java21
-rw-r--r--searchcore/src/apps/tests/persistenceconformance_test.cpp8
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.h1
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp23
-rw-r--r--staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp24
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h21
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp17
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h5
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp96
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h20
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp2
-rw-r--r--vespalib/src/vespa/vespalib/util/arrayqueue.hpp6
16 files changed, 192 insertions, 143 deletions
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java
deleted file mode 100644
index 553f2ffba36..00000000000
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hosted.node.admin.task.util.file;
-
-import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A very simple template engine when there's little complexity and lots of Velocity special characters $ and #,
- * i.e. typically shell script.
- *
- * @author hakonhall
- */
-public class Templar {
- private final String template;
-
- private static final String prefix = "<%=";
- private static final String suffix = "%>";
-
- private final Map<String, String> settings = new HashMap<>();
-
- public static Templar fromUtf8File(Path path) {
- return new Templar(new UnixPath(path).readUtf8File());
- }
-
- public Templar(String template) {
- this.template = template;
- }
-
- public Templar set(String name, String value) {
- settings.put(name, value);
- return this;
- }
-
- public String resolve() {
- StringBuilder text = new StringBuilder(template.length() * 2);
-
- int start= 0;
- int end;
-
- for (; start < template.length(); start = end) {
- int prefixStart = template.indexOf(prefix, start);
-
-
- if (prefixStart == -1) {
- text.append(template, start, template.length());
- break;
- } else {
- text.append(template, start, prefixStart);
- }
-
- int suffixStart = template.indexOf(suffix, prefixStart + prefix.length());
- if (suffixStart == -1) {
- throw new IllegalArgumentException("Prefix at offset " + prefixStart + " is not terminated");
- }
-
- int prefixEnd = prefixStart + prefix.length();
- String name = template.substring(prefixEnd, suffixStart).trim();
- String value = settings.get(name);
- if (value == null) {
- throw new IllegalArgumentException("No value is set for name '" + name + "' at offset " + prefixEnd);
- }
-
- text.append(value);
-
- end = suffixStart + suffix.length();
- }
-
- return text.toString();
- }
-
- public FileWriter getFileWriterTo(Path path) {
- return new FileWriter(path, this::resolve);
- }
-}
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java
index ea55af17a0e..6d80ac2cad9 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java
@@ -26,12 +26,15 @@ import java.util.Optional;
* id: a valid Java identifier
* </pre>
*
+ * <p>Other directive delimiters than "%{" and "}" may be used, see {@link TemplateDescriptor}.</p>
+ *
* <p>Fill the template with variable values ({@link #set(String, String) set()}, set if conditions
* ({@link #set(String, boolean)}), add list elements ({@link #add(String) add()}, etc, and finally
* render it as a String ({@link #render()}).</p>
*
* <p>To reuse a template, create the template and work on snapshots of that ({@link #snapshot()}).</p>
*
+ * @see TemplateDescriptor
* @author hakonhall
*/
public class Template implements Form {
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java
deleted file mode 100644
index ed410ffc1d1..00000000000
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hosted.node.admin.task.util.file;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author hakonhall
- */
-public class TemplarTest {
- @Test
- public void test() {
- Templar templar = new Templar("x y <%= foo %>, some other <%=bar%> text");
- templar.set("foo", "fidelity")
- .set("bar", "halimov")
- .set("not", "used");
-
- assertEquals("x y fidelity, some other halimov text", templar.resolve());
- }
-} \ No newline at end of file
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp
index 214c57557bc..385d0eb363e 100644
--- a/searchcore/src/apps/tests/persistenceconformance_test.cpp
+++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp
@@ -178,6 +178,12 @@ private:
MockSharedThreadingService _shared_service;
storage::spi::dummy::DummyBucketExecutor _bucketExecutor;
+ static std::shared_ptr<ProtonConfig> make_proton_config() {
+ ProtonConfigBuilder proton_config;
+ proton_config.indexing.optimize = ProtonConfigBuilder::Indexing::Optimize::LATENCY;
+ return std::make_shared<ProtonConfig>(proton_config);
+ }
+
public:
DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort);
~DocumentDBFactory() override;
@@ -196,7 +202,7 @@ public:
TuneFileDocumentDB::SP tuneFileDocDB(new TuneFileDocumentDB());
DocumentDBConfigHelper mgr(spec, docType.getName());
auto b = std::make_shared<BootstrapConfig>(1, factory.getTypeCfg(), factory.getTypeRepo(),
- std::make_shared<ProtonConfig>(),
+ make_proton_config(),
std::make_shared<FiledistributorrpcConfig>(),
std::make_shared<BucketspacesConfig>(),
tuneFileDocDB, HwInfo());
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
index 5017a7d5192..bbb4efc03b1 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
@@ -53,7 +53,6 @@ class FieldMerger
std::unique_ptr<FieldWriter> _writer;
State _state;
bool _failed;
- bool _force_small_merge_chunk;
void make_tmp_dirs();
bool clean_tmp_dirs();
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index ef7f8bfb0f6..243935d4013 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -96,6 +96,23 @@ TEST_F("require that task with same component id are serialized", Fixture)
EXPECT_EQUAL(42, tv->_val);
}
+TEST_F("require that task with same component id are serialized when executed with list", Fixture)
+{
+ std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+ EXPECT_EQUAL(0, tv->_val);
+ ISequencedTaskExecutor::ExecutorId executorId = f._threads->getExecutorId(0);
+ ISequencedTaskExecutor::TaskList list;
+ list.template emplace_back(executorId, makeLambdaTask([=]() { usleep(2000); tv->modify(0, 14); }));
+ list.template emplace_back(executorId, makeLambdaTask([=]() { tv->modify(14, 42); }));
+ f._threads->executeTasks(std::move(list));
+ tv->wait(2);
+ EXPECT_EQUAL(0, tv->_fail);
+ EXPECT_EQUAL(42, tv->_val);
+ f._threads->sync_all();
+ EXPECT_EQUAL(0, tv->_fail);
+ EXPECT_EQUAL(42, tv->_val);
+}
+
TEST_F("require that task with different component ids are not serialized", Fixture)
{
int tryCnt = 0;
@@ -136,7 +153,8 @@ TEST_F("require that task with same string component id are serialized", Fixture
namespace {
-int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)
+int
+detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)
{
int tryCnt = 0;
for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
@@ -158,7 +176,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t
return tryCnt;
}
-vespalib::string makeAltComponentId(Fixture &f)
+vespalib::string
+makeAltComponentId(Fixture &f)
{
int tryCnt = 0;
char altComponentId[20];
diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
index dd71380f64a..56352ff3c0d 100644
--- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
+++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
@@ -30,6 +30,28 @@ TEST("test that all tasks are executed") {
EXPECT_EQUAL(10000u, counter);
}
+TEST("test that executor can overflow") {
+ constexpr size_t NUM_TASKS = 1000;
+ std::atomic<uint64_t> counter(0);
+ vespalib::Gate gate;
+ SingleExecutor executor(sequenced_executor, 10, false, 1, 1ms);
+ executor.execute(makeLambdaTask([&gate] { gate.await();}));
+
+ for(size_t i(0); i < NUM_TASKS; i++) {
+ executor.execute(makeLambdaTask([&counter, i] {
+ EXPECT_EQUAL(i, counter);
+ counter++;
+ }));
+ }
+ EXPECT_EQUAL(0u, counter);
+ ExecutorStats stats = executor.getStats();
+ EXPECT_EQUAL(NUM_TASKS + 1, stats.acceptedTasks);
+ EXPECT_EQUAL(NUM_TASKS, stats.queueSize.max());
+ gate.countDown();
+ executor.sync();
+ EXPECT_EQUAL(NUM_TASKS, counter);
+}
+
void verifyResizeTaskLimit(bool up) {
std::mutex lock;
std::condition_variable cond;
@@ -38,7 +60,7 @@ void verifyResizeTaskLimit(bool up) {
constexpr uint32_t INITIAL = 20;
const uint32_t INITIAL_2inN = roundUp2inN(INITIAL);
double waterMarkRatio = 0.5;
- SingleExecutor executor(sequenced_executor, INITIAL, INITIAL*waterMarkRatio, 10ms);
+ SingleExecutor executor(sequenced_executor, INITIAL, true, INITIAL*waterMarkRatio, 10ms);
EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit());
EXPECT_EQUAL(uint32_t(INITIAL_2inN*waterMarkRatio), executor.get_watermark());
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
index c54f182891c..b31d72da3b1 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
@@ -12,9 +12,16 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors)
ISequencedTaskExecutor::~ISequencedTaskExecutor() = default;
+void
+ISequencedTaskExecutor::executeTasks(TaskList tasks) {
+ for (auto & task : tasks) {
+ executeTask(task.first, std::move(task.second));
+ }
+}
+
ISequencedTaskExecutor::ExecutorId
-ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const {
- vespalib::hash<vespalib::stringref> hashfun;
+ISequencedTaskExecutor::getExecutorIdFromName(stringref componentId) const {
+ hash<stringref> hashfun;
return getExecutorId(hashfun(componentId));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index 3fe6fb5d678..ff90556e3e4 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -14,7 +14,7 @@ namespace vespalib {
* Interface class to run multiple tasks in parallel, but tasks with same
* id has to be run in sequence.
*/
-class ISequencedTaskExecutor : public vespalib::IWakeup
+class ISequencedTaskExecutor : public IWakeup
{
public:
class ExecutorId {
@@ -28,6 +28,7 @@ public:
private:
uint32_t _id;
};
+ using TaskList = std::vector<std::pair<ExecutorId, Executor::Task::UP>>;
ISequencedTaskExecutor(uint32_t numExecutors);
virtual ~ISequencedTaskExecutor();
@@ -40,7 +41,7 @@ public:
virtual ExecutorId getExecutorId(uint64_t componentId) const = 0;
uint32_t getNumExecutors() const { return _numExecutors; }
- ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const;
+ ExecutorId getExecutorIdFromName(stringref componentId) const;
/**
* Returns an executor id that is NOT equal to the given executor id,
@@ -58,7 +59,13 @@ public:
* @param id which internal executor to use
* @param task unique pointer to the task to be executed
*/
- virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0;
+ virtual void executeTask(ExecutorId id, Executor::Task::UP task) = 0;
+ /**
+ * Schedule a list of tasks to run after all previously scheduled tasks with
+ * same id. Default is to just iterate and execute one by one, but implementations
+ * that can schedule all in one go more efficiently can implement this one.
+ */
+ virtual void executeTasks(TaskList tasks);
/**
* Call this one to ensure you get the attention of the workers.
*/
@@ -74,7 +81,7 @@ public:
*/
template <class FunctionType>
void executeLambda(ExecutorId id, FunctionType &&function) {
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+ executeTask(id, makeLambdaTask(std::forward<FunctionType>(function)));
}
/**
* Wait for all scheduled tasks to complete.
@@ -83,7 +90,7 @@ public:
virtual void setTaskLimit(uint32_t taskLimit) = 0;
- virtual vespalib::ExecutorStats getStats() = 0;
+ virtual ExecutorStats getStats() = 0;
/**
* Wrap lambda function into a task and schedule it to be run.
@@ -96,7 +103,7 @@ public:
template <class FunctionType>
void execute(uint64_t componentId, FunctionType &&function) {
ExecutorId id = getExecutorId(componentId);
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+ executeTask(id, makeLambdaTask(std::forward<FunctionType>(function)));
}
/**
@@ -109,7 +116,7 @@ public:
*/
template <class FunctionType>
void execute(ExecutorId id, FunctionType &&function) {
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+ executeTask(id, makeLambdaTask(std::forward<FunctionType>(function)));
}
private:
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 76b0235301b..58ae862f7c6 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -67,7 +67,7 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark;
- executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, 100ms));
+ executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms));
} else {
executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
index 5ae8e96b606..d81b8ec1db6 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
@@ -17,7 +17,7 @@ SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecu
SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default;
void
-SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
+SequencedTaskExecutorObserver::executeTask(ExecutorId id, Executor::Task::UP task)
{
++_executeCnt;
{
@@ -28,6 +28,19 @@ SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Ta
}
void
+SequencedTaskExecutorObserver::executeTasks(TaskList tasks)
+{
+ _executeCnt += tasks.size();
+ {
+ std::lock_guard<std::mutex> guard(_mutex);
+ for (const auto & task : tasks) {
+ _executeHistory.emplace_back(task.first.getId());
+ }
+ }
+ _executor.executeTasks(std::move(tasks));
+}
+
+void
SequencedTaskExecutorObserver::sync_all()
{
++_syncCnt;
@@ -45,7 +58,7 @@ void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) {
_executor.setTaskLimit(taskLimit);
}
-vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() {
+ExecutorStats SequencedTaskExecutorObserver::getStats() {
return _executor.getStats();
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
index 7e2bf968952..1d54283c393 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
@@ -24,10 +24,11 @@ public:
~SequencedTaskExecutorObserver() override;
ExecutorId getExecutorId(uint64_t componentId) const override;
- void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
+ void executeTask(ExecutorId id, Executor::Task::UP task) override;
+ void executeTasks(TaskList tasks) override;
void sync_all() override;
void setTaskLimit(uint32_t taskLimit) override;
- vespalib::ExecutorStats getStats() override;
+ ExecutorStats getStats() override;
uint32_t getExecuteCnt() const { return _executeCnt; }
uint32_t getSyncCnt() const { return _syncCnt; }
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index a99bce0a705..21ed90c3d22 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -7,12 +7,12 @@
namespace vespalib {
SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit)
- : SingleExecutor(func, taskLimit, taskLimit/10, 100ms)
+ : SingleExecutor(func, taskLimit, true, taskLimit/10, 100ms)
{ }
-SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime)
- : _watermarkRatio(watermark < taskLimit ? double(watermark) / taskLimit : 1.0),
- _taskLimit(vespalib::roundUp2inN(taskLimit)),
+SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime)
+ : _watermarkRatio(watermark < reservedQueueSize ? double(watermark) / reservedQueueSize : 1.0),
+ _taskLimit(vespalib::roundUp2inN(reservedQueueSize)),
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
_tasks(std::make_unique<Task::UP[]>(_taskLimit)),
@@ -30,9 +30,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat
_wp(0),
_watermark(_taskLimit.load()*_watermarkRatio),
_reactionTime(reactionTime),
- _closed(false)
+ _closed(false),
+ _overflow()
{
- assert(taskLimit >= watermark);
+ assert(reservedQueueSize >= watermark);
+ if ( ! isQueueSizeHard) {
+ _overflow = std::make_unique<ArrayQueue<Task::UP>>();
+ }
_thread.start();
}
@@ -62,10 +66,12 @@ SingleExecutor::execute(Task::UP task) {
if (_closed) {
return task;
}
- wait_for_room(guard);
- wp = _wp.load(std::memory_order_relaxed);
- _tasks[index(wp)] = std::move(task);
- _wp.store(wp + 1, std::memory_order_release);
+ task = wait_for_room_or_put_in_overflow_Q(guard, std::move(task));
+ if (task) {
+ wp = move_to_main_q(guard, std::move(task));
+ } else {
+ wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(guard);
+ }
}
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
_consumerCondition.notify_one();
@@ -73,6 +79,24 @@ SingleExecutor::execute(Task::UP task) {
return task;
}
+uint64_t
+SingleExecutor::numTasks() {
+ if (_overflow) {
+ Lock guard(_mutex);
+ return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard);
+ } else {
+ return num_tasks_in_main_q();
+ }
+}
+
+uint64_t
+SingleExecutor::move_to_main_q(Lock &, Task::UP task) {
+ uint64_t wp = _wp.load(std::memory_order_relaxed);
+ _tasks[index(wp)] = std::move(task);
+ _wp.store(wp + 1, std::memory_order_release);
+ return wp;
+}
+
void
SingleExecutor::setTaskLimit(uint32_t taskLimit) {
_wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
@@ -81,7 +105,7 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) {
void
SingleExecutor::drain(Lock & lock) {
uint64_t wp = _wp.load(std::memory_order_relaxed);
- while (numTasks() > 0) {
+ while (numTasks(lock) > 0) {
_consumerCondition.notify_one();
sleepProducer(lock, 100us, wp);
}
@@ -97,7 +121,7 @@ SingleExecutor::wakeup() {
SingleExecutor &
SingleExecutor::sync() {
Lock lock(_mutex);
- uint64_t wp = _wp.load(std::memory_order_relaxed);
+ uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock);
while (wp > _rp.load(std::memory_order_acquire)) {
_consumerCondition.notify_one();
sleepProducer(lock, 100us, wp);
@@ -119,7 +143,7 @@ SingleExecutor::run() {
_producerCondition.notify_all();
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed);
Lock lock(_mutex);
- if (numTasks() <= 0) {
+ if (numTasks(lock) <= 0) {
steady_time now = steady_clock::now();
_threadIdleTracker.set_idle(now);
_consumerCondition.wait_until(lock, now + _reactionTime);
@@ -134,6 +158,22 @@ void
SingleExecutor::drain_tasks() {
while (numTasks() > 0) {
run_tasks_till(_wp.load(std::memory_order_acquire));
+ move_overflow_to_main_q();
+ }
+}
+
+void
+SingleExecutor::move_overflow_to_main_q()
+{
+ if ( ! _overflow) return;
+ Lock guard(_mutex);
+ move_overflow_to_main_q(guard);
+}
+void
+SingleExecutor::move_overflow_to_main_q(Lock & guard) {
+ while ( !_overflow->empty() && num_tasks_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) {
+ move_to_main_q(guard, std::move(_overflow->front()));
+ _overflow->pop();
}
}
@@ -151,26 +191,42 @@ SingleExecutor::run_tasks_till(uint64_t available) {
}
}
-void
-SingleExecutor::wait_for_room(Lock & lock) {
+Executor::Task::UP
+SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task) {
uint64_t wp = _wp.load(std::memory_order_relaxed);
uint64_t taskLimit = _taskLimit.load(std::memory_order_relaxed);
if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) {
- drain(lock);
+ drain(guard);
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
_taskLimit = _wantedTaskLimit.load();
_watermark = _taskLimit * _watermarkRatio;
}
- _queueSize.add(numTasks());
- while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- sleepProducer(lock, _reactionTime, wp - get_watermark());
+ uint64_t numTaskInQ = numTasks(guard);
+ _queueSize.add(numTaskInQ);
+ if (numTaskInQ >= _taskLimit.load(std::memory_order_relaxed)) {
+ if (_overflow) {
+ _overflow->push(std::move(task));
+ } else {
+ while (numTasks(guard) >= _taskLimit.load(std::memory_order_relaxed)) {
+ sleepProducer(guard, _reactionTime, wp - get_watermark());
+ }
+ }
+ } else {
+ if (_overflow && !_overflow->empty()) {
+ _overflow->push(std::move(task));
+ }
+ }
+ if (_overflow && !_overflow->empty()) {
+ assert(!task);
+ move_overflow_to_main_q(guard);
}
+ return task;
}
ExecutorStats
SingleExecutor::getStats() {
Lock lock(_mutex);
- uint64_t accepted = _wp.load(std::memory_order_relaxed);
+ uint64_t accepted = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock);
steady_time now = steady_clock::now();
_idleTracker.was_idle(_threadIdleTracker.reset(now));
ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount);
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index e76e3f17a41..4fdc217e701 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <thread>
#include <atomic>
@@ -19,8 +20,8 @@ namespace vespalib {
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
- SingleExecutor(init_fun_t func, uint32_t taskLimit);
- SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
+ SingleExecutor(init_fun_t func, uint32_t reservedQueueSize);
+ SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;
@@ -39,12 +40,22 @@ private:
void drain_tasks();
void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt);
void run_tasks_till(uint64_t available);
- void wait_for_room(Lock & guard);
+ Task::UP wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task);
+ uint64_t move_to_main_q(Lock & guard, Task::UP task);
+ void move_overflow_to_main_q();
+ void move_overflow_to_main_q(Lock & guard);
uint64_t index(uint64_t counter) const {
return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
}
- uint64_t numTasks() const {
+ uint64_t numTasks();
+ uint64_t numTasks(Lock & guard) const {
+ return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard);
+ }
+ uint64_t num_tasks_in_overflow_q(Lock &) const {
+ return _overflow ? _overflow->size() : 0;
+ }
+ uint64_t num_tasks_in_main_q() const {
return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire);
}
const double _watermarkRatio;
@@ -67,6 +78,7 @@ private:
std::atomic<uint32_t> _watermark;
const duration _reactionTime;
bool _closed;
+ std::unique_ptr<ArrayQueue<Task::UP>> _overflow;
};
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 3d24ee87879..5dafd9c5eda 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -113,7 +113,7 @@ class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor
public:
using DocumentIdsAndTimeStamps = std::vector<std::pair<spi::Timestamp, spi::DocumentId>>;
UnrevertableRemoveEntryProcessor(DocumentIdsAndTimeStamps & to_remove)
- : _to_remove(to_remove)
+ : _to_remove(to_remove)
{}
void process(spi::DocEntry& entry) override {
diff --git a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
index 73e70e7fd89..8f3dd8ab006 100644
--- a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
+++ b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
@@ -2,11 +2,11 @@
#pragma once
-#include <stdint.h>
-#include <stdlib.h>
+#include "traits.h"
+#include <cstdint>
+#include <cstdlib>
#include <cassert>
#include <algorithm>
-#include "traits.h"
namespace vespalib {