aboutsummaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-04 22:20:35 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-04 22:35:17 +0000
commit416ff1764ce98954b3b15fcae0f6a50d76b38323 (patch)
tree8974071929be2d3723db0a14567dcbeb2f7a1797 /staging_vespalib
parent130d4607a359ae2740bdeeb0179a731751f979a0 (diff)
Move sequenced task executors to staging vespalib
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/CMakeLists.txt1
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/.gitignore4
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/CMakeLists.txt31
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp250
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp120
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp70
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp250
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt5
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp324
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h126
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp42
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h34
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp46
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h113
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp83
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h38
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp52
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h38
18 files changed, 1627 insertions, 0 deletions
diff --git a/staging_vespalib/CMakeLists.txt b/staging_vespalib/CMakeLists.txt
index 9fb48cbd4fc..e76d3078630 100644
--- a/staging_vespalib/CMakeLists.txt
+++ b/staging_vespalib/CMakeLists.txt
@@ -30,6 +30,7 @@ vespa_define_module(
src/tests/shutdownguard
src/tests/state_server
src/tests/stllike
+ src/tests/sequencedtaskexecutor
src/tests/singleexecutor
src/tests/timer
src/tests/util/process_memory_stats
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/.gitignore b/staging_vespalib/src/tests/sequencedtaskexecutor/.gitignore
new file mode 100644
index 00000000000..523cfe5e3e1
--- /dev/null
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/.gitignore
@@ -0,0 +1,4 @@
+staging_vespalib_sequencedtaskexecutor_test_app
+staging_vespalib_sequencedtaskexecutor_benchmark_app
+staging_vespalib_adaptive_sequenced_executor_test_app
+staging_vespalib_foregroundtaskexecutor_test_app
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/CMakeLists.txt b/staging_vespalib/src/tests/sequencedtaskexecutor/CMakeLists.txt
new file mode 100644
index 00000000000..6895eafd94a
--- /dev/null
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/CMakeLists.txt
@@ -0,0 +1,31 @@
+# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(staging_vespalib_sequencedtaskexecutor_benchmark_app TEST
+ SOURCES
+ sequencedtaskexecutor_benchmark.cpp
+ DEPENDS
+ staging_vespalib
+)
+
+vespa_add_executable(staging_vespalib_sequencedtaskexecutor_test_app TEST
+ SOURCES
+ sequencedtaskexecutor_test.cpp
+ DEPENDS
+ staging_vespalib
+)
+vespa_add_test(NAME staging_vespalib_sequencedtaskexecutor_test_app COMMAND staging_vespalib_sequencedtaskexecutor_test_app)
+
+vespa_add_executable(staging_vespalib_adaptive_sequenced_executor_test_app TEST
+ SOURCES
+ adaptive_sequenced_executor_test.cpp
+ DEPENDS
+ staging_vespalib
+)
+vespa_add_test(NAME staging_vespalib_adaptive_sequenced_executor_test_app COMMAND staging_vespalib_adaptive_sequenced_executor_test_app)
+
+vespa_add_executable(staging_vespalib_foregroundtaskexecutor_test_app TEST
+ SOURCES
+ foregroundtaskexecutor_test.cpp
+ DEPENDS
+ staging_vespalib
+)
+vespa_add_test(NAME staging_vespalib_foregroundtaskexecutor_test_app COMMAND staging_vespalib_foregroundtaskexecutor_test_app)
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
new file mode 100644
index 00000000000..10f3f6089e3
--- /dev/null
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
@@ -0,0 +1,250 @@
+// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/util/adaptive_sequenced_executor.h>
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/vespalib/test/insertion_operators.h>
+
+#include <condition_variable>
+#include <unistd.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP("adaptive_sequenced_executor_test");
+
+namespace vespalib {
+
+
+class Fixture  // Test fixture: owns the executor under test for each TEST_F below.
+{
+public:
+    AdaptiveSequencedExecutor _threads;
+
+    Fixture() : _threads(2, 2, 0, 1000) { }  // 2 strands, 2 threads, max_waiting=0, max_pending=1000
+};
+
+
+class TestObj  // Shared state mutated by tasks; detects out-of-order execution via CAS-style updates.
+{
+public:
+    std::mutex _m;
+    std::condition_variable _cv;
+    int _done;  // number of completed modify() calls
+    int _fail;  // number of modify() calls that saw an unexpected old value
+    int _val;   // current value; only advanced when the expected old value matched
+
+    TestObj()
+        : _m(),
+          _cv(),
+          _done(0),
+          _fail(0),
+          _val(0)
+    {
+    }
+
+    void
+    modify(int oldValue, int newValue)  // compare-and-set: update only if _val == oldValue, else count a failure
+    {
+        {
+            std::lock_guard<std::mutex> guard(_m);
+            if (_val == oldValue) {
+                _val = newValue;
+            } else {
+                ++_fail;  // tasks ran in an unexpected order
+            }
+            ++_done;
+        }
+        _cv.notify_all();  // wake wait(); notify after releasing the lock
+    }
+
+    void
+    wait(int wantDone)  // block until at least wantDone modify() calls have completed
+    {
+        std::unique_lock<std::mutex> guard(_m);
+        _cv.wait(guard, [&] { return this->_done >= wantDone; });
+    }
+};
+
+TEST_F("testExecute", Fixture) {  // smoke test: a single task runs to completion
+    std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+    EXPECT_EQUAL(0, tv->_val);
+    f._threads.execute(1, [&]() { tv->modify(0, 42); });
+    tv->wait(1);
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+    f._threads.sync();  // sync must not disturb already-completed state
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+}
+
+
+TEST_F("require that task with same component id are serialized", Fixture)
+{
+    std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+    EXPECT_EQUAL(0, tv->_val);
+    f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); });  // sleep so an unserialized second task would overtake
+    f._threads.execute(0, [&]() { tv->modify(14, 42); });
+    tv->wait(2);
+    EXPECT_EQUAL(0, tv->_fail);  // same id: no overtaking may happen
+    EXPECT_EQUAL(42, tv->_val);
+    f._threads.sync();
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+}
+
+TEST_F("require that task with different component ids are not serialized", Fixture)
+{
+    int tryCnt = 0;
+    for (tryCnt = 0; tryCnt < 100; ++tryCnt) {  // overtaking is timing dependent, so retry until observed
+        std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+        EXPECT_EQUAL(0, tv->_val);
+        f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); });
+        f._threads.execute(2, [&]() { tv->modify(14, 42); });  // different strand: may run while the first sleeps
+        tv->wait(2);
+        if (tv->_fail != 1) {
+            continue;  // happened to run in order this round; try again
+        }
+        EXPECT_EQUAL(1, tv->_fail);
+        EXPECT_EQUAL(14, tv->_val);  // second task ran early (saw 0, not 14), then first set 0->14
+        f._threads.sync();
+        EXPECT_EQUAL(1, tv->_fail);
+        EXPECT_EQUAL(14, tv->_val);
+        break;
+    }
+    EXPECT_TRUE(tryCnt < 100);  // parallel (unserialized) execution must be seen at least once
+}
+
+
+TEST_F("require that task with same string component id are serialized", Fixture)
+{
+    std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+    EXPECT_EQUAL(0, tv->_val);
+    auto test2 = [&]() { tv->modify(14, 42); };
+    f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); });  // same string id maps to the same strand
+    f._threads.execute(f._threads.getExecutorId("0"), test2);
+    tv->wait(2);
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+    f._threads.sync();
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+}
+
+namespace {
+
+int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)  // returns tries used before unserialized execution was observed; tryLimit if never observed
+{
+    int tryCnt = 0;
+    for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
+        std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+        EXPECT_EQUAL(0, tv->_val);
+        f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); });  // first task sleeps so a parallel second task can overtake
+        f._threads.execute(f._threads.getExecutorId(altComponentId), [&]() { tv->modify(14, 42); });
+        tv->wait(2);
+        if (tv->_fail != 1) {
+            continue;  // serialized this round; retry
+        }
+        EXPECT_EQUAL(1, tv->_fail);
+        EXPECT_EQUAL(14, tv->_val);
+        f._threads.sync();
+        EXPECT_EQUAL(1, tv->_fail);
+        EXPECT_EQUAL(14, tv->_val);
+        break;
+    }
+    return tryCnt;
+}
+
+vespalib::string makeAltComponentId(Fixture &f)  // find a string id != "0" that maps to the same executor as "0"
+{
+    int tryCnt = 0;
+    char altComponentId[20];
+    ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0");
+    for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
+        sprintf(altComponentId, "%d", tryCnt);  // NOTE(review): snprintf would be safer; 20 bytes does fit any int though
+        if (f._threads.getExecutorId(altComponentId) == executorId0) {
+            break;  // collision found
+        }
+    }
+    EXPECT_TRUE(tryCnt < 100);  // a colliding id must exist within 100 candidates
+    return altComponentId;
+}
+
+}
+
+TEST_F("require that task with different string component ids are not serialized", Fixture)
+{
+    int tryCnt = detectSerializeFailure(f, "2", 100);
+    EXPECT_TRUE(tryCnt < 100);  // some try must observe parallel execution
+}
+
+
+TEST_F("require that task with different string component ids mapping to the same executor id are serialized",
+       Fixture)
+{
+    vespalib::string altComponentId = makeAltComponentId(f);
+    LOG(info, "second string component id is \"%s\"", altComponentId.c_str());
+    int tryCnt = detectSerializeFailure(f, altComponentId, 100);
+    EXPECT_TRUE(tryCnt == 100);  // colliding ids share a strand, so a failure is never observed
+}
+
+
+TEST_F("require that execute works with const lambda", Fixture)
+{
+    int i = 5;
+    std::vector<int> res;
+    const auto lambda = [i, &res]() mutable
+                        { res.push_back(i--); res.push_back(i--); };  // i captured by value: each executed copy starts at 5
+    f._threads.execute(0, lambda);
+    f._threads.execute(0, lambda);
+    f._threads.sync();
+    std::vector<int> exp({5, 4, 5, 4});
+    EXPECT_EQUAL(exp, res);
+    EXPECT_EQUAL(5, i);  // caller's i is untouched
+}
+
+TEST_F("require that execute works with reference to lambda", Fixture)
+{
+    int i = 5;
+    std::vector<int> res;
+    auto lambda = [i, &res]() mutable
+                  { res.push_back(i--); res.push_back(i--); };
+    auto &lambdaref = lambda;  // passing a reference must still copy the task, not share state
+    f._threads.execute(0, lambdaref);
+    f._threads.execute(0, lambdaref);
+    f._threads.sync();
+    std::vector<int> exp({5, 4, 5, 4});
+    EXPECT_EQUAL(exp, res);
+    EXPECT_EQUAL(5, i);
+}
+
+TEST_F("require that executeLambda works", Fixture)
+{
+    int i = 5;
+    std::vector<int> res;
+    const auto lambda = [i, &res]() mutable
+                        { res.push_back(i--); res.push_back(i--); };
+    f._threads.executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda);  // explicit ExecutorId overload
+    f._threads.sync();
+    std::vector<int> exp({5, 4});
+    EXPECT_EQUAL(exp, res);
+    EXPECT_EQUAL(5, i);
+}
+
+TEST("require that you get correct number of executors") {
+    AdaptiveSequencedExecutor seven(7, 1, 0, 10);
+    EXPECT_EQUAL(7u, seven.getNumExecutors());
+}
+
+TEST("require that you distribute well") {
+    AdaptiveSequencedExecutor seven(7, 1, 0, 10);
+    EXPECT_EQUAL(7u, seven.getNumExecutors());
+    EXPECT_EQUAL(97u, seven.getComponentHashSize());
+    EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize());  // nothing mapped yet
+    for (uint32_t id=0; id < 1000; id++) {
+        EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId());  // component id mod 97 hash slots, spread over 7 executors
+    }
+    EXPECT_EQUAL(97u, seven.getComponentHashSize());
+    EXPECT_EQUAL(97u, seven.getComponentEffectiveHashSize());  // all 97 slots in use after 1000 ids
+}
+
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp
new file mode 100644
index 00000000000..a2671bb81a7
--- /dev/null
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp
@@ -0,0 +1,120 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/util/foregroundtaskexecutor.h>
+#include <vespa/vespalib/testkit/testapp.h>
+
+#include <condition_variable>
+#include <unistd.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP("foregroundtaskexecutor_test");
+
+namespace vespalib {
+
+
+class Fixture  // Test fixture: owns the foreground (caller-thread) executor under test.
+{
+public:
+    ForegroundTaskExecutor _threads;
+
+    Fixture()
+        : _threads()
+    {
+    }
+};
+
+
+class TestObj  // Shared state mutated by tasks; detects out-of-order execution via CAS-style updates.
+{
+public:
+    std::mutex _m;
+    std::condition_variable _cv;
+    int _done;  // number of completed modify() calls
+    int _fail;  // number of modify() calls that saw an unexpected old value
+    int _val;   // current value; only advanced when the expected old value matched
+
+    TestObj()
+        : _m(),
+          _cv(),
+          _done(0),
+          _fail(0),
+          _val(0)
+    {
+    }
+
+    void
+    modify(int oldValue, int newValue)  // compare-and-set: update only if _val == oldValue, else count a failure
+    {
+        {
+            std::lock_guard<std::mutex> guard(_m);
+            if (_val == oldValue) {
+                _val = newValue;
+            } else {
+                ++_fail;  // tasks ran in an unexpected order
+            }
+            ++_done;
+        }
+        _cv.notify_all();  // wake wait(); notify after releasing the lock
+    }
+
+    void
+    wait(int wantDone)  // block until at least wantDone modify() calls have completed
+    {
+        std::unique_lock<std::mutex> guard(_m);
+        _cv.wait(guard, [=] { return this->_done >= wantDone; });
+    }
+};
+
+TEST_F("testExecute", Fixture) {  // smoke test: a single task runs to completion
+    std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+    EXPECT_EQUAL(0, tv->_val);
+    f._threads.execute(1, [=]() { tv->modify(0, 42); });
+    tv->wait(1);
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+    f._threads.sync();
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+}
+
+
+TEST_F("require that task with same id are serialized", Fixture)
+{
+    std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+    EXPECT_EQUAL(0, tv->_val);
+    f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); });  // sleep so an unserialized second task would overtake
+    f._threads.execute(0, [=]() { tv->modify(14, 42); });
+    tv->wait(2);
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+    f._threads.sync();
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+}
+
+TEST_F("require that task with different ids are serialized", Fixture)
+{
+    int tryCnt = 0;
+    for (tryCnt = 0; tryCnt < 100; ++tryCnt) {  // try to provoke parallel execution; expected to never succeed here
+        std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+        EXPECT_EQUAL(0, tv->_val);
+        f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
+        f._threads.execute(1, [=]() { tv->modify(14, 42); });
+        tv->wait(2);
+        if (tv->_fail != 1) {
+            continue;
+        }
+        EXPECT_EQUAL(1, tv->_fail);
+        EXPECT_EQUAL(14, tv->_val);
+        f._threads.sync();
+        EXPECT_EQUAL(1, tv->_fail);
+        EXPECT_EQUAL(14, tv->_val);
+        break;
+    }
+    EXPECT_TRUE(tryCnt >= 100);  // unlike the threaded executors, no try may ever observe parallelism — presumably tasks run inline in the caller; confirm against ForegroundTaskExecutor
+}
+
+
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
new file mode 100644
index 00000000000..042408d439f
--- /dev/null
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
@@ -0,0 +1,70 @@
+// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+#include <vespa/vespalib/util/adaptive_sequenced_executor.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/time.h>
+#include <atomic>
+
+using vespalib::ISequencedTaskExecutor;
+using vespalib::SequencedTaskExecutor;
+using vespalib::AdaptiveSequencedExecutor;
+using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId;
+
+size_t do_work(size_t size) {  // deterministic CPU busy-work scaled by 'size'; returns a value so the loops are not trivially removable
+    size_t ret = 0;
+    for (size_t i = 0; i < size; ++i) {
+        for (size_t j = 0; j < 128; ++j) {
+            ret = (ret + i) * j;  // arbitrary arithmetic; the result itself is meaningless
+        }
+    }
+    return ret;
+}
+
+struct SimpleParams {  // positional command-line parameter reader with per-parameter fallbacks
+    int argc;
+    char **argv;
+    int idx;   // index of the last consumed argv entry
+    SimpleParams(int argc_in, char **argv_in) : argc(argc_in), argv(argv_in), idx(0) {}
+    int next(const char *name, int fallback) {  // consume the next positional arg, or use fallback; logs the value either way
+        ++idx;
+        int value = 0;
+        if (argc > idx) {
+            value = atoi(argv[idx]);  // NOTE(review): atoi silently yields 0 on malformed input
+        } else {
+            value = fallback;
+        }
+        fprintf(stderr, "param %s: %d\n", name, value);
+        return value;
+    }
+};
+
+int main(int argc, char **argv) {  // benchmark: schedule num_tasks round-robin over num_strands and time completion
+    SimpleParams params(argc, argv);
+    bool use_adaptive_executor = params.next("use_adaptive_executor", 0);
+    bool optimize_for_throughput = params.next("optimize_for_throughput", 0);
+    size_t num_tasks = params.next("num_tasks", 1000000);
+    size_t num_strands = params.next("num_strands", 4);
+    size_t task_limit = params.next("task_limit", 1000);
+    size_t num_threads = params.next("num_threads", num_strands);
+    size_t max_waiting = params.next("max_waiting", optimize_for_throughput ? 32 : 0);
+    size_t work_size = params.next("work_size", 0);
+    std::atomic<long> counter(0);  // counts completed tasks across worker threads
+    std::unique_ptr<ISequencedTaskExecutor> executor;
+    if (use_adaptive_executor) {
+        executor = std::make_unique<AdaptiveSequencedExecutor>(num_strands, num_threads, max_waiting, task_limit);
+    } else {
+        auto optimize = optimize_for_throughput
+                        ? vespalib::Executor::OptimizeFor::THROUGHPUT
+                        : vespalib::Executor::OptimizeFor::LATENCY;
+        executor = SequencedTaskExecutor::create(num_strands, task_limit, optimize);
+    }
+    vespalib::Timer timer;
+    for (size_t task_id = 0; task_id < num_tasks; ++task_id) {
+        executor->executeTask(ExecutorId(task_id % num_strands),
+                              vespalib::makeLambdaTask([&counter,work_size] { (void) do_work(work_size); counter++; }));
+    }
+    executor.reset();  // destruction drains remaining tasks; included in the measured time
+    fprintf(stderr, "\ntotal time: %zu ms\n", vespalib::count_ms(timer.elapsed()));
+    return (size_t(counter) == num_tasks) ? 0 : 1;  // nonzero exit if any task was lost
+}
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
new file mode 100644
index 00000000000..f5f04738e92
--- /dev/null
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -0,0 +1,250 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/vespalib/test/insertion_operators.h>
+
+#include <condition_variable>
+#include <unistd.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP("sequencedtaskexecutor_test");
+
+namespace vespalib {
+
+
+class Fixture  // Test fixture: owns the executor under test (2 strands, default options).
+{
+public:
+    std::unique_ptr<ISequencedTaskExecutor> _threads;
+
+    Fixture() : _threads(SequencedTaskExecutor::create(2)) { }
+};
+
+
+class TestObj  // Shared state mutated by tasks; detects out-of-order execution via CAS-style updates.
+{
+public:
+    std::mutex _m;
+    std::condition_variable _cv;
+    int _done;  // number of completed modify() calls
+    int _fail;  // number of modify() calls that saw an unexpected old value
+    int _val;   // current value; only advanced when the expected old value matched
+
+    TestObj()
+        : _m(),
+          _cv(),
+          _done(0),
+          _fail(0),
+          _val(0)
+    {
+    }
+
+    void
+    modify(int oldValue, int newValue)  // compare-and-set: update only if _val == oldValue, else count a failure
+    {
+        {
+            std::lock_guard<std::mutex> guard(_m);
+            if (_val == oldValue) {
+                _val = newValue;
+            } else {
+                ++_fail;  // tasks ran in an unexpected order
+            }
+            ++_done;
+        }
+        _cv.notify_all();  // wake wait(); notify after releasing the lock
+    }
+
+    void
+    wait(int wantDone)  // block until at least wantDone modify() calls have completed
+    {
+        std::unique_lock<std::mutex> guard(_m);
+        _cv.wait(guard, [=] { return this->_done >= wantDone; });
+    }
+};
+
+TEST_F("testExecute", Fixture) {  // smoke test: a single task runs to completion
+    std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+    EXPECT_EQUAL(0, tv->_val);
+    f._threads->execute(1, [=]() { tv->modify(0, 42); });
+    tv->wait(1);
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+    f._threads->sync();
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+}
+
+
+TEST_F("require that task with same component id are serialized", Fixture)
+{
+    std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+    EXPECT_EQUAL(0, tv->_val);
+    f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); });  // sleep so an unserialized second task would overtake
+    f._threads->execute(0, [=]() { tv->modify(14, 42); });
+    tv->wait(2);
+    EXPECT_EQUAL(0, tv->_fail);  // same id: no overtaking may happen
+    EXPECT_EQUAL(42, tv->_val);
+    f._threads->sync();
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+}
+
+TEST_F("require that task with different component ids are not serialized", Fixture)
+{
+    int tryCnt = 0;
+    for (tryCnt = 0; tryCnt < 100; ++tryCnt) {  // overtaking is timing dependent, so retry until observed
+        std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+        EXPECT_EQUAL(0, tv->_val);
+        f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
+        f._threads->execute(2, [=]() { tv->modify(14, 42); });  // different strand: may run while the first sleeps
+        tv->wait(2);
+        if (tv->_fail != 1) {
+            continue;  // happened to run in order this round; try again
+        }
+        EXPECT_EQUAL(1, tv->_fail);
+        EXPECT_EQUAL(14, tv->_val);  // second task ran early (saw 0, not 14), then first set 0->14
+        f._threads->sync();
+        EXPECT_EQUAL(1, tv->_fail);
+        EXPECT_EQUAL(14, tv->_val);
+        break;
+    }
+    EXPECT_TRUE(tryCnt < 100);  // parallel (unserialized) execution must be seen at least once
+}
+
+
+TEST_F("require that task with same string component id are serialized", Fixture)
+{
+    std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+    EXPECT_EQUAL(0, tv->_val);
+    auto test2 = [=]() { tv->modify(14, 42); };
+    f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });  // same string id maps to the same strand
+    f._threads->execute(f._threads->getExecutorId("0"), test2);
+    tv->wait(2);
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+    f._threads->sync();
+    EXPECT_EQUAL(0, tv->_fail);
+    EXPECT_EQUAL(42, tv->_val);
+}
+
+namespace {
+
+int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)  // returns tries used before unserialized execution was observed; tryLimit if never observed
+{
+    int tryCnt = 0;
+    for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
+        std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+        EXPECT_EQUAL(0, tv->_val);
+        f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });  // first task sleeps so a parallel second task can overtake
+        f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); });
+        tv->wait(2);
+        if (tv->_fail != 1) {
+            continue;  // serialized this round; retry
+        }
+        EXPECT_EQUAL(1, tv->_fail);
+        EXPECT_EQUAL(14, tv->_val);
+        f._threads->sync();
+        EXPECT_EQUAL(1, tv->_fail);
+        EXPECT_EQUAL(14, tv->_val);
+        break;
+    }
+    return tryCnt;
+}
+
+vespalib::string makeAltComponentId(Fixture &f)  // find a string id != "0" that maps to the same executor as "0"
+{
+    int tryCnt = 0;
+    char altComponentId[20];
+    ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0");
+    for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
+        sprintf(altComponentId, "%d", tryCnt);  // NOTE(review): snprintf would be safer; 20 bytes does fit any int though
+        if (f._threads->getExecutorId(altComponentId) == executorId0) {
+            break;  // collision found
+        }
+    }
+    EXPECT_TRUE(tryCnt < 100);  // a colliding id must exist within 100 candidates
+    return altComponentId;
+}
+
+}
+
+TEST_F("require that task with different string component ids are not serialized", Fixture)
+{
+    int tryCnt = detectSerializeFailure(f, "2", 100);
+    EXPECT_TRUE(tryCnt < 100);  // some try must observe parallel execution
+}
+
+
+TEST_F("require that task with different string component ids mapping to the same executor id are serialized",
+       Fixture)
+{
+    vespalib::string altComponentId = makeAltComponentId(f);
+    LOG(info, "second string component id is \"%s\"", altComponentId.c_str());
+    int tryCnt = detectSerializeFailure(f, altComponentId, 100);
+    EXPECT_TRUE(tryCnt == 100);  // colliding ids share a strand, so a failure is never observed
+}
+
+
+TEST_F("require that execute works with const lambda", Fixture)
+{
+    int i = 5;
+    std::vector<int> res;
+    const auto lambda = [i, &res]() mutable
+                        { res.push_back(i--); res.push_back(i--); };  // i captured by value: each executed copy starts at 5
+    f._threads->execute(0, lambda);
+    f._threads->execute(0, lambda);
+    f._threads->sync();
+    std::vector<int> exp({5, 4, 5, 4});
+    EXPECT_EQUAL(exp, res);
+    EXPECT_EQUAL(5, i);  // caller's i is untouched
+}
+
+TEST_F("require that execute works with reference to lambda", Fixture)
+{
+    int i = 5;
+    std::vector<int> res;
+    auto lambda = [i, &res]() mutable
+                  { res.push_back(i--); res.push_back(i--); };
+    auto &lambdaref = lambda;  // passing a reference must still copy the task, not share state
+    f._threads->execute(0, lambdaref);
+    f._threads->execute(0, lambdaref);
+    f._threads->sync();
+    std::vector<int> exp({5, 4, 5, 4});
+    EXPECT_EQUAL(exp, res);
+    EXPECT_EQUAL(5, i);
+}
+
+TEST_F("require that executeLambda works", Fixture)
+{
+    int i = 5;
+    std::vector<int> res;
+    const auto lambda = [i, &res]() mutable
+                        { res.push_back(i--); res.push_back(i--); };
+    f._threads->executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda);  // explicit ExecutorId overload
+    f._threads->sync();
+    std::vector<int> exp({5, 4});
+    EXPECT_EQUAL(exp, res);
+    EXPECT_EQUAL(5, i);
+}
+
+TEST("require that you get correct number of executors") {
+    auto seven = SequencedTaskExecutor::create(7);
+    EXPECT_EQUAL(7u, seven->getNumExecutors());
+}
+
+TEST("require that you distribute well") {
+    auto seven = SequencedTaskExecutor::create(7);
+    EXPECT_EQUAL(7u, seven->getNumExecutors());
+    EXPECT_EQUAL(97u, seven->getComponentHashSize());
+    EXPECT_EQUAL(0u, seven->getComponentEffectiveHashSize());  // nothing mapped yet
+    for (uint32_t id=0; id < 1000; id++) {
+        EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId());  // component id mod 97 hash slots, spread over 7 executors
+    }
+    EXPECT_EQUAL(97u, seven->getComponentHashSize());
+    EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize());  // all 97 slots in use after 1000 ids
+}
+
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt b/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt
index ba03b77c941..586e06396e7 100644
--- a/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -1,11 +1,14 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(staging_vespalib_vespalib_util OBJECT
SOURCES
+ adaptive_sequenced_executor.cpp
bits.cpp
clock.cpp
crc.cpp
doom.cpp
+ foregroundtaskexecutor.cpp
growablebytebuffer.cpp
+ isequencedtaskexecutor.cpp
jsonexception.cpp
jsonstream.cpp
jsonwriter.cpp
@@ -15,6 +18,8 @@ vespa_add_library(staging_vespalib_vespalib_util OBJECT
programoptions_testutils.cpp
document_runnable.cpp
rusage.cpp
+ sequencedtaskexecutor.cpp
+ sequencedtaskexecutorobserver.cpp
shutdownguard.cpp
scheduledexecutor.cpp
singleexecutor.cpp
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
new file mode 100644
index 00000000000..50bc3b020a8
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -0,0 +1,324 @@
+// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "adaptive_sequenced_executor.h"
+
+namespace vespalib {
+
+//-----------------------------------------------------------------------------
+
+AdaptiveSequencedExecutor::Strand::Strand()  // strands start idle with an empty task queue
+    : state(State::IDLE),
+      queue()
+{
+}
+
+AdaptiveSequencedExecutor::Strand::~Strand()
+{
+    assert(queue.empty());  // all queued tasks must be drained before teardown
+}
+
+//-----------------------------------------------------------------------------
+
+AdaptiveSequencedExecutor::Worker::Worker()  // workers are created runnable, without an assigned strand
+    : cond(),
+      state(State::RUNNING),
+      strand(nullptr)
+{
+}
+
+AdaptiveSequencedExecutor::Worker::~Worker()
+{
+    assert(state == State::DONE);  // worker must have been told to exit
+    assert(strand == nullptr);     // and must no longer own a strand
+}
+
+//-----------------------------------------------------------------------------
+
+AdaptiveSequencedExecutor::Self::Self()  // executor-global state: open for new tasks, nothing queued
+    : cond(),
+      state(State::OPEN),
+      waiting_tasks(0),
+      pending_tasks(0)
+{
+}
+
+AdaptiveSequencedExecutor::Self::~Self()
+{
+    assert(state == State::CLOSED);  // destructor must have run the shutdown protocol first
+    assert(waiting_tasks == 0);
+    assert(pending_tasks == 0);
+}
+
+//-----------------------------------------------------------------------------
+
+AdaptiveSequencedExecutor::ThreadTools::ThreadTools(AdaptiveSequencedExecutor &parent_in)  // wraps the FastOS thread pool used for workers
+    : parent(parent_in),
+      pool(std::make_unique<FastOS_ThreadPool>(STACK_SIZE)),
+      allow_worker_exit()  // gate released by close(); parks finished workers until shutdown
+{
+}
+
+AdaptiveSequencedExecutor::ThreadTools::~ThreadTools()
+{
+    assert(pool->isClosed());
+}
+
+void
+AdaptiveSequencedExecutor::ThreadTools::Run(FastOS_ThreadInterface *, void *)  // FastOS thread entry point
+{
+    parent.worker_main();
+}
+
+void
+AdaptiveSequencedExecutor::ThreadTools::start(size_t num_threads)  // spawn all worker threads up front
+{
+    for (size_t i = 0; i < num_threads; ++i) {
+        FastOS_ThreadInterface *thread = pool->NewThread(this);
+        assert(thread != nullptr);
+        (void)thread;  // silence unused warning when asserts are compiled out
+    }
+}
+
+void
+AdaptiveSequencedExecutor::ThreadTools::close()  // release parked workers, then close/join the pool
+{
+    allow_worker_exit.countDown();
+    pool->Close();
+}
+
+//-----------------------------------------------------------------------------
+
+void
+AdaptiveSequencedExecutor::maybe_block_self(std::unique_lock<std::mutex> &lock)  // producer backpressure; called with _mutex held
+{
+    while (_self.state == Self::State::BLOCKED) {  // another producer already tripped the limit; wait our turn
+        _self.cond.wait(lock);
+    }
+    while ((_self.state == Self::State::OPEN) && (_self.pending_tasks >= _cfg.max_pending)) {
+        _self.state = Self::State::BLOCKED;  // trip the limit ourselves and wait until workers drain below wakeup_limit
+        while (_self.state == Self::State::BLOCKED) {
+            _self.cond.wait(lock);
+        }
+    }
+}
+
+bool
+AdaptiveSequencedExecutor::maybe_unblock_self(const std::unique_lock<std::mutex> &)  // returns true if blocked producers should be woken; caller notifies _self.cond after unlocking
+{
+    if ((_self.state == Self::State::BLOCKED) && (_self.pending_tasks < _cfg.wakeup_limit)) {
+        _self.state = Self::State::OPEN;
+        return true;
+    }
+    return false;
+}
+
+AdaptiveSequencedExecutor::Worker *
+AdaptiveSequencedExecutor::get_worker_to_wake(const std::unique_lock<std::mutex> &)  // with _mutex held; caller notifies the returned worker's cond after unlocking
+{
+    if ((_self.waiting_tasks > _cfg.max_waiting) && (!_worker_stack.empty())) {  // too much waiting work and a spare blocked worker exists
+        assert(!_wait_queue.empty());
+        Worker *worker = _worker_stack.back();
+        _worker_stack.popBack();
+        assert(worker->state == Worker::State::BLOCKED);
+        assert(worker->strand == nullptr);
+        worker->state = Worker::State::RUNNING;
+        worker->strand = _wait_queue.front();  // hand it the oldest waiting strand
+        _wait_queue.pop();
+        assert(worker->strand->state == Strand::State::WAITING);
+        assert(!worker->strand->queue.empty());
+        worker->strand->state = Strand::State::ACTIVE;
+        assert(_self.waiting_tasks >= worker->strand->queue.size());
+        _self.waiting_tasks -= worker->strand->queue.size();  // its tasks are now active, no longer waiting
+        return worker;
+    }
+    return nullptr;
+}
+
+bool
+AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock)  // with _mutex held; returns true iff the worker now owns a runnable strand
+{
+    assert(worker.strand == nullptr);
+    if (!_wait_queue.empty()) {  // work available: take the oldest waiting strand
+        worker.strand = _wait_queue.front();
+        _wait_queue.pop();
+        assert(worker.strand->state == Strand::State::WAITING);
+        assert(!worker.strand->queue.empty());
+        worker.strand->state = Strand::State::ACTIVE;
+        assert(_self.waiting_tasks >= worker.strand->queue.size());
+        _self.waiting_tasks -= worker.strand->queue.size();
+    } else if (_self.state == Self::State::CLOSED) {  // shutting down and nothing left to do
+        worker.state = Worker::State::DONE;
+    } else {  // no work: park on the worker stack until woken with a strand (or told to exit)
+        worker.state = Worker::State::BLOCKED;
+        _worker_stack.push(&worker);
+        while (worker.state == Worker::State::BLOCKED) {
+            worker.cond.wait(lock);
+        }
+    }
+    return (worker.state == Worker::State::RUNNING);
+}
+
+bool
+AdaptiveSequencedExecutor::exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock)  // decide whether to keep, recycle or swap the worker's strand; with _mutex held
+{
+    if (worker.strand == nullptr) {  // no strand yet: get one
+        return obtain_strand(worker, lock);
+    }
+    if (worker.strand->queue.empty()) {  // current strand drained: release it and get another
+        worker.strand->state = Strand::State::IDLE;
+        worker.strand = nullptr;
+        return obtain_strand(worker, lock);
+    }
+    if (!_wait_queue.empty()) {  // others are waiting: requeue our strand for fairness and take the oldest
+        worker.strand->state = Strand::State::WAITING;
+        _self.waiting_tasks += worker.strand->queue.size();
+        _wait_queue.push(worker.strand);
+        worker.strand = nullptr;
+        return obtain_strand(worker, lock);
+    }
+    return true;  // keep running the current strand
+}
+
+AdaptiveSequencedExecutor::Task::UP
+AdaptiveSequencedExecutor::next_task(Worker &worker)  // pop the worker's next task; returns empty on shutdown
+{
+    Task::UP task;
+    Worker *worker_to_wake = nullptr;
+    auto guard = std::unique_lock(_mutex);
+    if (exchange_strand(worker, guard)) {
+        assert(worker.state == Worker::State::RUNNING);
+        assert(worker.strand != nullptr);
+        assert(!worker.strand->queue.empty());
+        task = std::move(worker.strand->queue.front());
+        worker.strand->queue.pop();
+        _stats.queueSize.add(--_self.pending_tasks);
+        worker_to_wake = get_worker_to_wake(guard);  // maybe enlist an extra worker for the backlog
+    } else {
+        assert(worker.state == Worker::State::DONE);  // executor closed and drained
+        assert(worker.strand == nullptr);
+    }
+    bool signal_self = maybe_unblock_self(guard);  // maybe release blocked producers
+    guard.unlock(); // UNLOCK
+    if (worker_to_wake != nullptr) {  // notifications happen outside the lock
+        worker_to_wake->cond.notify_one();
+    }
+    if (signal_self) {
+        _self.cond.notify_all();
+    }
+    return task;
+}
+
+void
+AdaptiveSequencedExecutor::worker_main()  // worker thread loop: run tasks until next_task() returns empty
+{
+    Worker worker;
+    while (Task::UP my_task = next_task(worker)) {
+        my_task->run();
+    }
+    _thread_tools->allow_worker_exit.await();  // park until close() releases all workers for join
+}
+
+AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
+                                                     size_t max_waiting, size_t max_pending)
+    : ISequencedTaskExecutor(num_strands),
+      _thread_tools(std::make_unique<ThreadTools>(*this)),
+      _mutex(),
+      _strands(num_strands),
+      _wait_queue(num_strands),
+      _worker_stack(num_threads),
+      _self(),
+      _stats(),
+      _cfg(num_threads, max_waiting, max_pending)
+{
+    _stats.queueSize.add(_self.pending_tasks);  // record the initial (zero) queue size sample
+    _thread_tools->start(num_threads);
+}
+
+AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor()  // drain all strands, release workers and producers, then join threads
+{
+    sync();  // make sure every queued task has run before closing
+    {
+        auto guard = std::unique_lock(_mutex);
+        assert(_self.state == Self::State::OPEN);
+        _self.state = Self::State::CLOSED;
+        while (!_worker_stack.empty()) {  // wake every parked worker and tell it to exit
+            Worker *worker = _worker_stack.back();
+            _worker_stack.popBack();
+            assert(worker->state == Worker::State::BLOCKED);
+            assert(worker->strand == nullptr);
+            worker->state = Worker::State::DONE;
+            worker->cond.notify_one();
+        }
+        _self.cond.notify_all();  // release any producers blocked on backpressure
+    }
+    _thread_tools->close();  // open the exit gate and join the pool
+    assert(_wait_queue.empty());
+    assert(_worker_stack.empty());
+}
+
+void
+AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task)  // queue a task on the strand selected by id; may block on backpressure
+{
+    assert(id.getId() < _strands.size());
+    Strand &strand = _strands[id.getId()];
+    auto guard = std::unique_lock(_mutex);
+    maybe_block_self(guard);  // wait here if max_pending is reached
+    assert(_self.state != Self::State::CLOSED);  // executing after destruction started is a bug in the caller
+    strand.queue.push(std::move(task));
+    _stats.queueSize.add(++_self.pending_tasks);
+    ++_stats.acceptedTasks;
+    if (strand.state == Strand::State::WAITING) {  // strand already queued for a worker; just grow its waiting count
+        ++_self.waiting_tasks;
+    } else if (strand.state == Strand::State::IDLE) {  // first task on an idle strand: it needs scheduling
+        if (_worker_stack.size() < _cfg.num_threads) {  // some workers are busy; queue the strand for pickup
+            strand.state = Strand::State::WAITING;
+            _wait_queue.push(&strand);
+            _self.waiting_tasks += strand.queue.size();
+        } else {  // all workers are parked; hand the strand directly to one of them
+            strand.state = Strand::State::ACTIVE;
+            assert(_wait_queue.empty());
+            Worker *worker = _worker_stack.back();
+            _worker_stack.popBack();
+            assert(worker->state == Worker::State::BLOCKED);
+            assert(worker->strand == nullptr);
+            worker->state = Worker::State::RUNNING;
+            worker->strand = &strand;
+            guard.unlock(); // UNLOCK
+            worker->cond.notify_one();  // notify outside the lock
+        }
+    }
+}
+
+void
+AdaptiveSequencedExecutor::sync()  // barrier: returns when every task queued before the call has run
+{
+    vespalib::CountDownLatch latch(_strands.size());
+    for (size_t i = 0; i < _strands.size(); ++i) {
+        execute(ExecutorId(i), [&](){ latch.countDown(); });  // one marker task per strand; strands run FIFO
+    }
+    latch.await();
+}
+
+void
+AdaptiveSequencedExecutor::setTaskLimit(uint32_t task_limit)  // adjust the backpressure limit at runtime
+{
+    auto guard = std::unique_lock(_mutex);
+    _cfg.set_max_pending(task_limit);
+    bool signal_self = maybe_unblock_self(guard);  // a raised limit may release blocked producers
+    guard.unlock(); // UNLOCK
+    if (signal_self) {
+        _self.cond.notify_all();
+    }
+}
+
+AdaptiveSequencedExecutor::Stats
+AdaptiveSequencedExecutor::getStats()  // return-and-reset: stats are cleared on every call
+{
+    auto guard = std::lock_guard(_mutex);
+    Stats stats = _stats;
+    _stats = Stats();
+    _stats.queueSize.add(_self.pending_tasks);  // seed the fresh stats with the current queue depth
+    return stats;
+}
+
+}
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
new file mode 100644
index 00000000000..bc3457a72ef
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -0,0 +1,126 @@
+// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "isequencedtaskexecutor.h"
+#include <vespa/vespalib/util/arrayqueue.hpp>
+#include <vespa/vespalib/util/gate.h>
+#include <vespa/fastos/thread.h>
+#include <mutex>
+#include <condition_variable>
+#include <cassert>
+
+namespace vespalib {
+
+/**
+ * Sequenced executor that balances the number of active threads in
+ * order to optimize for throughput over latency by minimizing the
+ * number of critical-path wakeups.
+ **/
+class AdaptiveSequencedExecutor : public ISequencedTaskExecutor
+{
+private:
+    using Stats = vespalib::ExecutorStats;
+    using Task = vespalib::Executor::Task;
+
+    /**
+     * Values used to configure the executor.
+     **/
+    struct Config {
+        size_t num_threads;   // number of worker threads (must be > 0)
+        size_t max_waiting;   // presumably caps waiting work — confirm against .cpp usage
+        size_t max_pending;   // max accepted-but-uncompleted tasks before producers block
+        size_t wakeup_limit;  // pending level at which blocked producers are woken again
+        void set_max_pending(size_t max_pending_in) {
+            // Keep wakeup_limit at ~90% of max_pending (both clamped to >= 1)
+            // so producers are unblocked slightly before the queue drains.
+            max_pending = std::max(1uL, max_pending_in);
+            wakeup_limit = std::max(1uL, size_t(max_pending * 0.9));
+            assert(wakeup_limit > 0);
+            assert(wakeup_limit <= max_pending);
+        }
+        Config(size_t num_threads_in, size_t max_waiting_in, size_t max_pending_in)
+            : num_threads(num_threads_in), max_waiting(max_waiting_in), max_pending(1000), wakeup_limit(900)
+        {
+            assert(num_threads > 0);
+            set_max_pending(max_pending_in);
+        }
+    };
+
+    /**
+     * Tasks that need to be sequenced are handled by a single strand.
+     * IDLE: empty queue; WAITING: has queued tasks and sits in the wait
+     * queue for a worker; ACTIVE: a worker is draining its queue.
+     **/
+    struct Strand {
+        enum class State { IDLE, WAITING, ACTIVE };
+        State state;
+        vespalib::ArrayQueue<Task::UP> queue;
+        Strand();
+        ~Strand();
+    };
+
+    /**
+     * The state of a single worker thread.
+     * RUNNING: executing a strand; BLOCKED: parked on the worker stack
+     * waiting for work; DONE: told to exit.
+     **/
+    struct Worker {
+        enum class State { RUNNING, BLOCKED, DONE };
+        std::condition_variable cond;  // signaled to hand this worker a strand
+        State state;
+        Strand *strand;                // strand currently assigned (nullptr while parked)
+        Worker();
+        ~Worker();
+    };
+
+    /**
+     * State related to the executor itself.
+     * OPEN: accepting tasks; BLOCKED: producers are blocked on capacity;
+     * CLOSED: shutting down, no new tasks accepted.
+     **/
+    struct Self {
+        enum class State { OPEN, BLOCKED, CLOSED };
+        std::condition_variable cond;  // signaled to unblock waiting producers
+        State state;
+        size_t waiting_tasks;  // tasks queued on strands that await a worker
+        size_t pending_tasks;  // all accepted but not yet completed tasks
+        Self();
+        ~Self();
+    };
+
+    /**
+     * Stuff related to worker thread startup and shutdown.
+     **/
+    struct ThreadTools : FastOS_Runnable {
+        static constexpr size_t STACK_SIZE = (256 * 1024);
+        AdaptiveSequencedExecutor &parent;
+        std::unique_ptr<FastOS_ThreadPool> pool;
+        vespalib::Gate allow_worker_exit;  // held closed until workers may exit
+        ThreadTools(AdaptiveSequencedExecutor &parent_in);
+        ~ThreadTools();
+        void Run(FastOS_ThreadInterface *, void *) override;
+        void start(size_t num_threads);
+        void close();
+    };
+
+    std::unique_ptr<ThreadTools> _thread_tools;
+    std::mutex _mutex;                            // guards all mutable state below
+    std::vector<Strand> _strands;                 // one strand per ExecutorId
+    vespalib::ArrayQueue<Strand*> _wait_queue;    // WAITING strands, FIFO
+    vespalib::ArrayQueue<Worker*> _worker_stack;  // parked (BLOCKED) workers
+    Self _self;
+    Stats _stats;
+    Config _cfg;
+
+    void maybe_block_self(std::unique_lock<std::mutex> &lock);
+    bool maybe_unblock_self(const std::unique_lock<std::mutex> &lock);
+
+    Worker *get_worker_to_wake(const std::unique_lock<std::mutex> &lock);
+    bool obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock);
+    bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock);
+    Task::UP next_task(Worker &worker);
+    void worker_main();
+public:
+    AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
+                              size_t max_waiting, size_t max_pending);
+    ~AdaptiveSequencedExecutor() override;
+    void executeTask(ExecutorId id, Task::UP task) override;
+    void sync() override;
+    void setTaskLimit(uint32_t task_limit) override;
+    vespalib::ExecutorStats getStats() override;
+};
+
+}
diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
new file mode 100644
index 00000000000..b45ada1c58c
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
@@ -0,0 +1,42 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "foregroundtaskexecutor.h"
+#include <cassert>
+
+namespace vespalib {
+
+// Default: a single (foreground) executor id.
+ForegroundTaskExecutor::ForegroundTaskExecutor()
+    : ForegroundTaskExecutor(1)
+{
+}
+
+// 'threads' only sizes the executor-id space reported to the base class;
+// no threads are created — every task runs in the caller's thread.
+ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads)
+    : ISequencedTaskExecutor(threads),
+      _accepted(0)
+{
+}
+
+ForegroundTaskExecutor::~ForegroundTaskExecutor() = default;
+
+void
+ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
+{
+    // Run the task synchronously in the calling thread; per-id sequencing is
+    // trivially satisfied since nothing executes concurrently.
+    assert(id.getId() < getNumExecutors());
+    task->run();
+    _accepted++;
+}
+
+void
+ForegroundTaskExecutor::sync()
+{
+    // No-op: every task has already completed inline before executeTask returned.
+}
+
+void ForegroundTaskExecutor::setTaskLimit(uint32_t) {
+    // No-op: there is no queue, hence no limit to enforce.
+}
+
+vespalib::ExecutorStats ForegroundTaskExecutor::getStats() {
+    // Queue size is always 0 (no queue). NOTE(review): _accepted is cumulative
+    // and not reset here, unlike AdaptiveSequencedExecutor::getStats which
+    // resets its stats on read — confirm callers expect cumulative semantics.
+    return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0);
+}
+
+} // namespace vespalib
diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
new file mode 100644
index 00000000000..d9a348ed012
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
@@ -0,0 +1,34 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "isequencedtaskexecutor.h"
+#include <atomic>
+
+namespace vespalib {
+
+/**
+ * Class to run multiple tasks in parallel, but tasks with same
+ * id has to be run in sequence.
+ *
+ * Currently, this is a dummy version that runs everything in the foreground.
+ */
+class ForegroundTaskExecutor : public ISequencedTaskExecutor
+{
+public:
+    using ISequencedTaskExecutor::getExecutorId;
+
+    ForegroundTaskExecutor();
+    // NOTE(review): single-argument constructor is not explicit — confirm
+    // implicit conversion from uint32_t is intended.
+    ForegroundTaskExecutor(uint32_t threads);
+    ~ForegroundTaskExecutor() override;
+
+    // Runs the task inline in the calling thread.
+    void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
+    // No-op: tasks complete before executeTask returns.
+    void sync() override;
+
+    void setTaskLimit(uint32_t taskLimit) override;
+
+    vespalib::ExecutorStats getStats() override;
+private:
+    std::atomic<uint64_t> _accepted;  // number of tasks executed so far
+};
+
+} // namespace vespalib
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
new file mode 100644
index 00000000000..d05702cc85b
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
@@ -0,0 +1,46 @@
+// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "isequencedtaskexecutor.h"
+#include <vespa/vespalib/stllike/hash_fun.h>
+#include <vespa/vespalib/stllike/hashtable.h>
+#include <cassert>
+
+namespace vespalib {
+
+namespace {
+ constexpr uint8_t MAGIC = 255;
+}
+
+// The component->executor map is a fixed-size hash table (about 8 slots per
+// executor, adjusted by getModuloStl) where MAGIC (255) marks an unassigned
+// slot; executor ids must therefore fit below 256 with 255 reserved.
+ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors)
+    : _component2Id(vespalib::hashtable_base::getModuloStl(numExecutors*8), MAGIC),
+      _mutex(),
+      _numExecutors(numExecutors),
+      _nextId(0)
+{
+    assert(numExecutors < 256);
+}
+
+ISequencedTaskExecutor::~ISequencedTaskExecutor() = default;
+
+ISequencedTaskExecutor::ExecutorId
+ISequencedTaskExecutor::getExecutorId(vespalib::stringref componentId) const {
+    // Hash the string component id and delegate to the numeric overload.
+    vespalib::hash<vespalib::stringref> hashfun;
+    return getExecutorId(hashfun(componentId));
+}
+
+ISequencedTaskExecutor::ExecutorId
+ISequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
+    // Map the component id to a hash-table slot; on first use, assign the slot
+    // an executor id round-robin so components spread evenly over executors.
+    // The mapping is sticky: a component always gets the same executor.
+    uint32_t shrunkId = componentId % _component2Id.size();
+    uint8_t executorId = _component2Id[shrunkId];
+    if (executorId == MAGIC) {
+        // Double-checked assignment: re-read under the lock before writing.
+        // NOTE(review): the unlocked first read of a non-atomic byte above is
+        // formally a data race with the locked write — confirm this is an
+        // accepted trade-off on the supported platforms.
+        std::lock_guard guard(_mutex);
+        if (_component2Id[shrunkId] == MAGIC) {
+            _component2Id[shrunkId] = _nextId % getNumExecutors();
+            _nextId++;
+        }
+        executorId = _component2Id[shrunkId];
+    }
+    return ExecutorId(executorId);
+}
+
+}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
new file mode 100644
index 00000000000..cd2a6c6f0d8
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -0,0 +1,113 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/executor_stats.h>
+#include <vespa/vespalib/stllike/string.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vector>
+#include <mutex>
+
+namespace vespalib {
+
+/**
+ * Interface class to run multiple tasks in parallel, but tasks with same
+ * id has to be run in sequence.
+ */
+class ISequencedTaskExecutor
+{
+public:
+    /**
+     * Identifies which internal executor a task is sequenced on; tasks
+     * submitted with equal ids run in submission order.
+     **/
+    class ExecutorId {
+    public:
+        ExecutorId() : ExecutorId(0) { }
+        explicit ExecutorId(uint32_t id) : _id(id) { }
+        uint32_t getId() const { return _id; }
+        bool operator != (ExecutorId rhs) const { return _id != rhs._id; }
+        bool operator == (ExecutorId rhs) const { return _id == rhs._id; }
+        bool operator < (ExecutorId rhs) const { return _id < rhs._id; }
+    private:
+        uint32_t _id;
+    };
+    // NOTE(review): single-argument constructor is not explicit — confirm
+    // implicit conversion from uint32_t is intended.
+    ISequencedTaskExecutor(uint32_t numExecutors);
+    virtual ~ISequencedTaskExecutor();
+
+    /**
+     * Calculate which executor will handle an component.
+     *
+     * @param componentId component id
+     * @return executor id
+     */
+    ExecutorId getExecutorId(uint64_t componentId) const;
+    uint32_t getNumExecutors() const { return _numExecutors; }
+
+    // Hashes the string id and delegates to the numeric overload above.
+    ExecutorId getExecutorId(vespalib::stringref componentId) const;
+
+    /**
+     * Schedule a task to run after all previously scheduled tasks with
+     * same id.
+     *
+     * @param id which internal executor to use
+     * @param task unique pointer to the task to be executed
+     */
+    virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0;
+
+    /**
+     * Wrap lambda function into a task and schedule it to be run.
+     * Caller must ensure that pointers and references are valid and
+     * call sync before tearing down pointed to/referenced data.
+     *
+     * @param id which internal executor to use
+     * @param function function to be wrapped in a task and later executed
+     */
+    template <class FunctionType>
+    void executeLambda(ExecutorId id, FunctionType &&function) {
+        executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+    }
+    /**
+     * Wait for all scheduled tasks to complete.
+     */
+    virtual void sync() = 0;
+
+    virtual void setTaskLimit(uint32_t taskLimit) = 0;
+
+    virtual vespalib::ExecutorStats getStats() = 0;
+
+    /**
+     * Wrap lambda function into a task and schedule it to be run.
+     * Caller must ensure that pointers and references are valid and
+     * call sync before tearing down pointed to/referenced data.
+     *
+     * @param componentId component id
+     * @param function function to be wrapped in a task and later executed
+     */
+    template <class FunctionType>
+    void execute(uint64_t componentId, FunctionType &&function) {
+        ExecutorId id = getExecutorId(componentId);
+        executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+    }
+
+    /**
+     * Wrap lambda function into a task and schedule it to be run.
+     * Caller must ensure that pointers and references are valid and
+     * call sync before tearing down pointed to/referenced data.
+     *
+     * @param id executor id
+     * @param function function to be wrapped in a task and later executed
+     */
+    template <class FunctionType>
+    void execute(ExecutorId id, FunctionType &&function) {
+        executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+    }
+    /**
+     * For testing only
+     */
+    uint32_t getComponentHashSize() const { return _component2Id.size(); }
+    uint32_t getComponentEffectiveHashSize() const { return _nextId; }
+private:
+    mutable std::vector<uint8_t> _component2Id;  // hash slot -> executor id, lazily assigned
+    mutable std::mutex _mutex;                   // guards writes to _component2Id/_nextId
+    uint32_t _numExecutors;
+    mutable uint32_t _nextId;                    // round-robin cursor for slot assignment
+};
+
+}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
new file mode 100644
index 00000000000..aa43bfaae7d
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -0,0 +1,83 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "sequencedtaskexecutor.h"
+#include "singleexecutor.h"
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+
+namespace vespalib {
+
+namespace {
+
+constexpr uint32_t stackSize = 128 * 1024;
+
+}
+
+
+std::unique_ptr<ISequencedTaskExecutor>
+SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize)
+{
+    // Build one single-threaded executor per ExecutorId: THROUGHPUT selects
+    // SingleExecutor, otherwise a BlockingThreadStackExecutor is used.
+    // Raw 'new' is required because the constructor is private.
+    auto executors = std::make_unique<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>>();
+    executors->reserve(threads);
+    for (uint32_t id = 0; id < threads; ++id) {
+        if (optimize == OptimizeFor::THROUGHPUT) {
+            executors->push_back(std::make_unique<SingleExecutor>(taskLimit));
+        } else {
+            executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit));
+        }
+    }
+    return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
+}
+
+SequencedTaskExecutor::~SequencedTaskExecutor()
+{
+    // Drain all underlying executors before they are destroyed.
+    sync();
+}
+
+// Private: takes ownership of the pre-built executors; use create() instead.
+SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors)
+    : ISequencedTaskExecutor(executors->size()),
+      _executors(std::move(executors))
+{
+}
+
+void
+SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
+{
+    // Apply the new per-executor queue limit to every underlying executor.
+    for (const auto &executor : *_executors) {
+        executor->setTaskLimit(taskLimit);
+    }
+}
+
+void
+SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
+{
+    // Forward the task to the executor owning this id; per-id sequencing
+    // follows from each underlying executor being single-threaded.
+    assert(id.getId() < _executors->size());
+    auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task));
+    // The underlying executors block on a full queue, so no task should ever
+    // be rejected here.
+    assert(!rejectedTask);
+    (void) rejectedTask; // silence unused-variable warning when NDEBUG strips the assert
+}
+
+void
+SequencedTaskExecutor::sync()
+{
+    // First pass: initiate sync on all SingleExecutor instances so napping
+    // executors wake up in parallel rather than one at a time.
+    for (auto &executor : *_executors) {
+        SingleExecutor * single = dynamic_cast<vespalib::SingleExecutor *>(executor.get());
+        if (single) {
+            // Enforce parallel wakeup of napping executors.
+            single->startSync();
+        }
+    }
+    // Second pass: wait for each executor to finish its queued work.
+    for (auto &executor : *_executors) {
+        executor->sync();
+    }
+}
+
+SequencedTaskExecutor::Stats
+SequencedTaskExecutor::getStats()
+{
+    // Sum the statistics of all underlying executors into one aggregate.
+    Stats accumulatedStats;
+    for (auto &executor : *_executors) {
+        accumulatedStats += executor->getStats();
+    }
+    return accumulatedStats;
+}
+
+} // namespace vespalib
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
new file mode 100644
index 00000000000..14127970403
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -0,0 +1,38 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "isequencedtaskexecutor.h"
+
+namespace vespalib {
+
+class SyncableThreadExecutor;
+
+/**
+ * Class to run multiple tasks in parallel, but tasks with same
+ * id has to be run in sequence.
+ */
+class SequencedTaskExecutor final : public ISequencedTaskExecutor
+{
+    using Stats = vespalib::ExecutorStats;
+    // One underlying single-threaded executor per ExecutorId; tasks with the
+    // same id always land on the same executor and thus run in sequence.
+    std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors;
+
+    SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
+public:
+    using ISequencedTaskExecutor::getExecutorId;
+    using OptimizeFor = vespalib::Executor::OptimizeFor;
+
+    ~SequencedTaskExecutor() override;
+
+    void setTaskLimit(uint32_t taskLimit) override;
+    void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
+    void sync() override;
+    Stats getStats() override;
+
+    /*
+     * Note that if you choose OptimizeFor::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside.
+     */
+    static std::unique_ptr<ISequencedTaskExecutor>
+    create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY);
+};
+
+} // namespace vespalib
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
new file mode 100644
index 00000000000..3d9ed4e21f4
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
@@ -0,0 +1,52 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "sequencedtaskexecutorobserver.h"
+
+namespace vespalib {
+
+// Wrap an existing executor, mirroring its executor count; all calls are
+// forwarded after being recorded.
+SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor)
+    : ISequencedTaskExecutor(executor.getNumExecutors()),
+      _executor(executor),
+      _executeCnt(0u),
+      _syncCnt(0u),
+      _executeHistory(),
+      _mutex()
+{
+}
+
+SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default;
+
+void
+SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
+{
+    // Record the call (atomic count + locked per-id history) before
+    // forwarding to the wrapped executor.
+    ++_executeCnt;
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        _executeHistory.emplace_back(id.getId());
+    }
+    _executor.executeTask(id, std::move(task));
+}
+
+void
+SequencedTaskExecutorObserver::sync()
+{
+    // Count the call, then forward.
+    ++_syncCnt;
+    _executor.sync();
+}
+
+std::vector<uint32_t>
+SequencedTaskExecutorObserver::getExecuteHistory()
+{
+    // Return a copy of the recorded executor ids, in submission order.
+    std::lock_guard<std::mutex> guard(_mutex);
+    return _executeHistory;
+}
+
+// Forwarded unrecorded to the wrapped executor.
+void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) {
+    _executor.setTaskLimit(taskLimit);
+}
+
+// Forwarded unrecorded to the wrapped executor.
+vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() {
+    return _executor.getStats();
+}
+
+} // namespace vespalib
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
new file mode 100644
index 00000000000..9307a7ddb37
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
@@ -0,0 +1,38 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "isequencedtaskexecutor.h"
+#include <atomic>
+
+namespace vespalib {
+
+/**
+ * Observer class to observe class to run multiple tasks in parallel,
+ * but tasks with same id has to be run in sequence.
+ */
+class SequencedTaskExecutorObserver : public ISequencedTaskExecutor
+{
+    ISequencedTaskExecutor &_executor;     // wrapped executor all calls forward to
+    std::atomic<uint32_t> _executeCnt;     // number of executeTask() calls observed
+    std::atomic<uint32_t> _syncCnt;        // number of sync() calls observed
+    std::vector<uint32_t> _executeHistory; // executor ids in submission order
+    std::mutex _mutex;                     // guards _executeHistory
+public:
+    using ISequencedTaskExecutor::getExecutorId;
+
+    SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor);
+    ~SequencedTaskExecutorObserver() override;
+
+    void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
+    void sync() override;
+
+    uint32_t getExecuteCnt() const { return _executeCnt; }
+    uint32_t getSyncCnt() const { return _syncCnt; }
+    // Returns a copy of the recorded per-call executor ids.
+    std::vector<uint32_t> getExecuteHistory();
+
+    void setTaskLimit(uint32_t taskLimit) override;
+
+    vespalib::ExecutorStats getStats() override;
+};
+
+} // namespace vespalib