aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-04 22:20:35 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-04 22:35:17 +0000
commit416ff1764ce98954b3b15fcae0f6a50d76b38323 (patch)
tree8974071929be2d3723db0a14567dcbeb2f7a1797 /searchlib
parent130d4607a359ae2740bdeeb0179a731751f979a0 (diff)
Move sequenced task executors to staging vespalib
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/CMakeLists.txt2
-rw-r--r--searchlib/src/apps/tests/memoryindexstress_test.cpp10
-rw-r--r--searchlib/src/tests/common/foregroundtaskexecutor/.gitignore1
-rw-r--r--searchlib/src/tests/common/foregroundtaskexecutor/CMakeLists.txt8
-rw-r--r--searchlib/src/tests/common/foregroundtaskexecutor/foregroundtaskexecutor_test.cpp121
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/.gitignore2
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt23
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp251
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp70
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp251
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp3
-rw-r--r--searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp5
-rw-r--r--searchlib/src/tests/memoryindex/field_index/field_index_test.cpp6
-rw-r--r--searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp14
-rw-r--r--searchlib/src/vespa/searchlib/common/CMakeLists.txt5
-rw-r--r--searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.cpp324
-rw-r--r--searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.h126
-rw-r--r--searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp45
-rw-r--r--searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h36
-rw-r--r--searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.cpp45
-rw-r--r--searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h113
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp86
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h42
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp52
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h38
-rw-r--r--searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp3
-rw-r--r--searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.h3
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/document_inverter.h16
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp3
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/memory_index.h3
31 files changed, 41 insertions, 1668 deletions
diff --git a/searchlib/CMakeLists.txt b/searchlib/CMakeLists.txt
index a76baeced04..4237bede9d5 100644
--- a/searchlib/CMakeLists.txt
+++ b/searchlib/CMakeLists.txt
@@ -100,11 +100,9 @@ vespa_define_module(
src/tests/bitvector
src/tests/bytecomplens
src/tests/common/bitvector
- src/tests/common/foregroundtaskexecutor
src/tests/common/location
src/tests/common/matching_elements
src/tests/common/resultset
- src/tests/common/sequencedtaskexecutor
src/tests/common/struct_field_mapper
src/tests/common/summaryfeatures
src/tests/diskindex/bitvector
diff --git a/searchlib/src/apps/tests/memoryindexstress_test.cpp b/searchlib/src/apps/tests/memoryindexstress_test.cpp
index dc950b84508..6bbd93bda84 100644
--- a/searchlib/src/apps/tests/memoryindexstress_test.cpp
+++ b/searchlib/src/apps/tests/memoryindexstress_test.cpp
@@ -9,7 +9,6 @@
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/repo/fixedtyperepo.h>
#include <vespa/searchlib/common/scheduletaskcallback.h>
-#include <vespa/searchlib/common/sequencedtaskexecutor.h>
#include <vespa/searchlib/fef/matchdata.h>
#include <vespa/searchlib/fef/matchdatalayout.h>
#include <vespa/searchlib/fef/termfieldmatchdata.h>
@@ -24,6 +23,7 @@
#include <vespa/searchlib/util/rand48.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/log/log.h>
LOG_SETUP("memoryindexstress_test");
@@ -195,8 +195,8 @@ struct Fixture {
Schema schema;
DocumentTypeRepo repo;
vespalib::ThreadStackExecutor _executor;
- std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads;
- std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads;
+ std::unique_ptr<vespalib::ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<vespalib::ISequencedTaskExecutor> _pushThreads;
MemoryIndex index;
uint32_t _readThreads;
vespalib::ThreadStackExecutor _writer; // 1 write thread
@@ -247,8 +247,8 @@ Fixture::Fixture(uint32_t readThreads)
: schema(makeSchema()),
repo(makeDocTypeRepoConfig()),
_executor(1, 128 * 1024),
- _invertThreads(search::SequencedTaskExecutor::create(2)),
- _pushThreads(search::SequencedTaskExecutor::create(2)),
+ _invertThreads(vespalib::SequencedTaskExecutor::create(2)),
+ _pushThreads(vespalib::SequencedTaskExecutor::create(2)),
index(schema, MockFieldLengthInspector(), *_invertThreads, *_pushThreads),
_readThreads(readThreads),
_writer(1, 128 * 1024),
diff --git a/searchlib/src/tests/common/foregroundtaskexecutor/.gitignore b/searchlib/src/tests/common/foregroundtaskexecutor/.gitignore
deleted file mode 100644
index 0bd7759156b..00000000000
--- a/searchlib/src/tests/common/foregroundtaskexecutor/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-searchlib_foregroundtaskexecutor_test_app
diff --git a/searchlib/src/tests/common/foregroundtaskexecutor/CMakeLists.txt b/searchlib/src/tests/common/foregroundtaskexecutor/CMakeLists.txt
deleted file mode 100644
index 64354d54396..00000000000
--- a/searchlib/src/tests/common/foregroundtaskexecutor/CMakeLists.txt
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(searchlib_foregroundtaskexecutor_test_app TEST
- SOURCES
- foregroundtaskexecutor_test.cpp
- DEPENDS
- searchlib
-)
-vespa_add_test(NAME searchlib_foregroundtaskexecutor_test_app COMMAND searchlib_foregroundtaskexecutor_test_app)
diff --git a/searchlib/src/tests/common/foregroundtaskexecutor/foregroundtaskexecutor_test.cpp b/searchlib/src/tests/common/foregroundtaskexecutor/foregroundtaskexecutor_test.cpp
deleted file mode 100644
index 0cbd4bd9473..00000000000
--- a/searchlib/src/tests/common/foregroundtaskexecutor/foregroundtaskexecutor_test.cpp
+++ /dev/null
@@ -1,121 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/searchlib/common/foregroundtaskexecutor.h>
-#include <vespa/vespalib/testkit/testapp.h>
-
-#include <mutex>
-#include <condition_variable>
-#include <unistd.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP("foregroundtaskexecutor_test");
-
-namespace search::common {
-
-
-class Fixture
-{
-public:
- ForegroundTaskExecutor _threads;
-
- Fixture()
- : _threads()
- {
- }
-};
-
-
-class TestObj
-{
-public:
- std::mutex _m;
- std::condition_variable _cv;
- int _done;
- int _fail;
- int _val;
-
- TestObj()
- : _m(),
- _cv(),
- _done(0),
- _fail(0),
- _val(0)
- {
- }
-
- void
- modify(int oldValue, int newValue)
- {
- {
- std::lock_guard<std::mutex> guard(_m);
- if (_val == oldValue) {
- _val = newValue;
- } else {
- ++_fail;
- }
- ++_done;
- }
- _cv.notify_all();
- }
-
- void
- wait(int wantDone)
- {
- std::unique_lock<std::mutex> guard(_m);
- _cv.wait(guard, [=] { return this->_done >= wantDone; });
- }
-};
-
-TEST_F("testExecute", Fixture) {
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(1, [=]() { tv->modify(0, 42); });
- tv->wait(1);
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
-}
-
-
-TEST_F("require that task with same id are serialized", Fixture)
-{
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(0, [=]() { tv->modify(14, 42); });
- tv->wait(2);
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
-}
-
-TEST_F("require that task with different ids are serialized", Fixture)
-{
- int tryCnt = 0;
- for (tryCnt = 0; tryCnt < 100; ++tryCnt) {
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(1, [=]() { tv->modify(14, 42); });
- tv->wait(2);
- if (tv->_fail != 1) {
- continue;
- }
- EXPECT_EQUAL(1, tv->_fail);
- EXPECT_EQUAL(14, tv->_val);
- f._threads.sync();
- EXPECT_EQUAL(1, tv->_fail);
- EXPECT_EQUAL(14, tv->_val);
- break;
- }
- EXPECT_TRUE(tryCnt >= 100);
-}
-
-
-}
-
-TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/.gitignore b/searchlib/src/tests/common/sequencedtaskexecutor/.gitignore
deleted file mode 100644
index 4bd94f124fb..00000000000
--- a/searchlib/src/tests/common/sequencedtaskexecutor/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-searchlib_sequencedtaskexecutor_test_app
-searchlib_sequencedtaskexecutor_benchmark_app
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt b/searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt
deleted file mode 100644
index fd6cd9efc43..00000000000
--- a/searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(searchlib_sequencedtaskexecutor_benchmark_app TEST
- SOURCES
- sequencedtaskexecutor_benchmark.cpp
- DEPENDS
- searchlib
-)
-
-vespa_add_executable(searchlib_sequencedtaskexecutor_test_app TEST
- SOURCES
- sequencedtaskexecutor_test.cpp
- DEPENDS
- searchlib
-)
-vespa_add_test(NAME searchlib_sequencedtaskexecutor_test_app COMMAND searchlib_sequencedtaskexecutor_test_app)
-
-vespa_add_executable(searchlib_adaptive_sequenced_executor_test_app TEST
- SOURCES
- adaptive_sequenced_executor_test.cpp
- DEPENDS
- searchlib
-)
-vespa_add_test(NAME searchlib_adaptive_sequenced_executor_test_app COMMAND searchlib_adaptive_sequenced_executor_test_app)
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
deleted file mode 100644
index ba66b28108c..00000000000
--- a/searchlib/src/tests/common/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/searchlib/common/adaptive_sequenced_executor.h>
-#include <vespa/vespalib/testkit/testapp.h>
-#include <vespa/vespalib/test/insertion_operators.h>
-
-#include <mutex>
-#include <condition_variable>
-#include <unistd.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP("adaptive_sequenced_executor_test");
-
-namespace search::common {
-
-
-class Fixture
-{
-public:
- AdaptiveSequencedExecutor _threads;
-
- Fixture() : _threads(2, 2, 0, 1000) { }
-};
-
-
-class TestObj
-{
-public:
- std::mutex _m;
- std::condition_variable _cv;
- int _done;
- int _fail;
- int _val;
-
- TestObj()
- : _m(),
- _cv(),
- _done(0),
- _fail(0),
- _val(0)
- {
- }
-
- void
- modify(int oldValue, int newValue)
- {
- {
- std::lock_guard<std::mutex> guard(_m);
- if (_val == oldValue) {
- _val = newValue;
- } else {
- ++_fail;
- }
- ++_done;
- }
- _cv.notify_all();
- }
-
- void
- wait(int wantDone)
- {
- std::unique_lock<std::mutex> guard(_m);
- _cv.wait(guard, [&] { return this->_done >= wantDone; });
- }
-};
-
-TEST_F("testExecute", Fixture) {
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(1, [&]() { tv->modify(0, 42); });
- tv->wait(1);
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
-}
-
-
-TEST_F("require that task with same component id are serialized", Fixture)
-{
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(0, [&]() { tv->modify(14, 42); });
- tv->wait(2);
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
-}
-
-TEST_F("require that task with different component ids are not serialized", Fixture)
-{
- int tryCnt = 0;
- for (tryCnt = 0; tryCnt < 100; ++tryCnt) {
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(2, [&]() { tv->modify(14, 42); });
- tv->wait(2);
- if (tv->_fail != 1) {
- continue;
- }
- EXPECT_EQUAL(1, tv->_fail);
- EXPECT_EQUAL(14, tv->_val);
- f._threads.sync();
- EXPECT_EQUAL(1, tv->_fail);
- EXPECT_EQUAL(14, tv->_val);
- break;
- }
- EXPECT_TRUE(tryCnt < 100);
-}
-
-
-TEST_F("require that task with same string component id are serialized", Fixture)
-{
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- auto test2 = [&]() { tv->modify(14, 42); };
- f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(f._threads.getExecutorId("0"), test2);
- tv->wait(2);
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
-}
-
-namespace {
-
-int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)
-{
- int tryCnt = 0;
- for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(f._threads.getExecutorId(altComponentId), [&]() { tv->modify(14, 42); });
- tv->wait(2);
- if (tv->_fail != 1) {
- continue;
- }
- EXPECT_EQUAL(1, tv->_fail);
- EXPECT_EQUAL(14, tv->_val);
- f._threads.sync();
- EXPECT_EQUAL(1, tv->_fail);
- EXPECT_EQUAL(14, tv->_val);
- break;
- }
- return tryCnt;
-}
-
-vespalib::string makeAltComponentId(Fixture &f)
-{
- int tryCnt = 0;
- char altComponentId[20];
- ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0");
- for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
- sprintf(altComponentId, "%d", tryCnt);
- if (f._threads.getExecutorId(altComponentId) == executorId0) {
- break;
- }
- }
- EXPECT_TRUE(tryCnt < 100);
- return altComponentId;
-}
-
-}
-
-TEST_F("require that task with different string component ids are not serialized", Fixture)
-{
- int tryCnt = detectSerializeFailure(f, "2", 100);
- EXPECT_TRUE(tryCnt < 100);
-}
-
-
-TEST_F("require that task with different string component ids mapping to the same executor id are serialized",
- Fixture)
-{
- vespalib::string altComponentId = makeAltComponentId(f);
- LOG(info, "second string component id is \"%s\"", altComponentId.c_str());
- int tryCnt = detectSerializeFailure(f, altComponentId, 100);
- EXPECT_TRUE(tryCnt == 100);
-}
-
-
-TEST_F("require that execute works with const lambda", Fixture)
-{
- int i = 5;
- std::vector<int> res;
- const auto lambda = [i, &res]() mutable
- { res.push_back(i--); res.push_back(i--); };
- f._threads.execute(0, lambda);
- f._threads.execute(0, lambda);
- f._threads.sync();
- std::vector<int> exp({5, 4, 5, 4});
- EXPECT_EQUAL(exp, res);
- EXPECT_EQUAL(5, i);
-}
-
-TEST_F("require that execute works with reference to lambda", Fixture)
-{
- int i = 5;
- std::vector<int> res;
- auto lambda = [i, &res]() mutable
- { res.push_back(i--); res.push_back(i--); };
- auto &lambdaref = lambda;
- f._threads.execute(0, lambdaref);
- f._threads.execute(0, lambdaref);
- f._threads.sync();
- std::vector<int> exp({5, 4, 5, 4});
- EXPECT_EQUAL(exp, res);
- EXPECT_EQUAL(5, i);
-}
-
-TEST_F("require that executeLambda works", Fixture)
-{
- int i = 5;
- std::vector<int> res;
- const auto lambda = [i, &res]() mutable
- { res.push_back(i--); res.push_back(i--); };
- f._threads.executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda);
- f._threads.sync();
- std::vector<int> exp({5, 4});
- EXPECT_EQUAL(exp, res);
- EXPECT_EQUAL(5, i);
-}
-
-TEST("require that you get correct number of executors") {
- AdaptiveSequencedExecutor seven(7, 1, 0, 10);
- EXPECT_EQUAL(7u, seven.getNumExecutors());
-}
-
-TEST("require that you distribute well") {
- AdaptiveSequencedExecutor seven(7, 1, 0, 10);
- EXPECT_EQUAL(7u, seven.getNumExecutors());
- EXPECT_EQUAL(97u, seven.getComponentHashSize());
- EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize());
- for (uint32_t id=0; id < 1000; id++) {
- EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId());
- }
- EXPECT_EQUAL(97u, seven.getComponentHashSize());
- EXPECT_EQUAL(97u, seven.getComponentEffectiveHashSize());
-}
-
-}
-
-TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
deleted file mode 100644
index 362dc28d36a..00000000000
--- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/searchlib/common/sequencedtaskexecutor.h>
-#include <vespa/searchlib/common/adaptive_sequenced_executor.h>
-#include <vespa/vespalib/util/lambdatask.h>
-#include <vespa/vespalib/util/time.h>
-#include <atomic>
-
-using search::ISequencedTaskExecutor;
-using search::SequencedTaskExecutor;
-using search::AdaptiveSequencedExecutor;
-using ExecutorId = search::ISequencedTaskExecutor::ExecutorId;
-
-size_t do_work(size_t size) {
- size_t ret = 0;
- for (size_t i = 0; i < size; ++i) {
- for (size_t j = 0; j < 128; ++j) {
- ret = (ret + i) * j;
- }
- }
- return ret;
-}
-
-struct SimpleParams {
- int argc;
- char **argv;
- int idx;
- SimpleParams(int argc_in, char **argv_in) : argc(argc_in), argv(argv_in), idx(0) {}
- int next(const char *name, int fallback) {
- ++idx;
- int value = 0;
- if (argc > idx) {
- value = atoi(argv[idx]);
- } else {
- value = fallback;
- }
- fprintf(stderr, "param %s: %d\n", name, value);
- return value;
- }
-};
-
-int main(int argc, char **argv) {
- SimpleParams params(argc, argv);
- bool use_adaptive_executor = params.next("use_adaptive_executor", 0);
- bool optimize_for_throughput = params.next("optimize_for_throughput", 0);
- size_t num_tasks = params.next("num_tasks", 1000000);
- size_t num_strands = params.next("num_strands", 4);
- size_t task_limit = params.next("task_limit", 1000);
- size_t num_threads = params.next("num_threads", num_strands);
- size_t max_waiting = params.next("max_waiting", optimize_for_throughput ? 32 : 0);
- size_t work_size = params.next("work_size", 0);
- std::atomic<long> counter(0);
- std::unique_ptr<ISequencedTaskExecutor> executor;
- if (use_adaptive_executor) {
- executor = std::make_unique<AdaptiveSequencedExecutor>(num_strands, num_threads, max_waiting, task_limit);
- } else {
- auto optimize = optimize_for_throughput
- ? vespalib::Executor::OptimizeFor::THROUGHPUT
- : vespalib::Executor::OptimizeFor::LATENCY;
- executor = SequencedTaskExecutor::create(num_strands, task_limit, optimize);
- }
- vespalib::Timer timer;
- for (size_t task_id = 0; task_id < num_tasks; ++task_id) {
- executor->executeTask(ExecutorId(task_id % num_strands),
- vespalib::makeLambdaTask([&counter,work_size] { (void) do_work(work_size); counter++; }));
- }
- executor.reset();
- fprintf(stderr, "\ntotal time: %zu ms\n", vespalib::count_ms(timer.elapsed()));
- return (size_t(counter) == num_tasks) ? 0 : 1;
-}
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
deleted file mode 100644
index c311a59a56c..00000000000
--- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/searchlib/common/sequencedtaskexecutor.h>
-#include <vespa/vespalib/testkit/testapp.h>
-#include <vespa/vespalib/test/insertion_operators.h>
-
-#include <mutex>
-#include <condition_variable>
-#include <unistd.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP("sequencedtaskexecutor_test");
-
-namespace search::common {
-
-
-class Fixture
-{
-public:
- std::unique_ptr<ISequencedTaskExecutor> _threads;
-
- Fixture() : _threads(SequencedTaskExecutor::create(2)) { }
-};
-
-
-class TestObj
-{
-public:
- std::mutex _m;
- std::condition_variable _cv;
- int _done;
- int _fail;
- int _val;
-
- TestObj()
- : _m(),
- _cv(),
- _done(0),
- _fail(0),
- _val(0)
- {
- }
-
- void
- modify(int oldValue, int newValue)
- {
- {
- std::lock_guard<std::mutex> guard(_m);
- if (_val == oldValue) {
- _val = newValue;
- } else {
- ++_fail;
- }
- ++_done;
- }
- _cv.notify_all();
- }
-
- void
- wait(int wantDone)
- {
- std::unique_lock<std::mutex> guard(_m);
- _cv.wait(guard, [=] { return this->_done >= wantDone; });
- }
-};
-
-TEST_F("testExecute", Fixture) {
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads->execute(1, [=]() { tv->modify(0, 42); });
- tv->wait(1);
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
- f._threads->sync();
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
-}
-
-
-TEST_F("require that task with same component id are serialized", Fixture)
-{
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads->execute(0, [=]() { tv->modify(14, 42); });
- tv->wait(2);
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
- f._threads->sync();
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
-}
-
-TEST_F("require that task with different component ids are not serialized", Fixture)
-{
- int tryCnt = 0;
- for (tryCnt = 0; tryCnt < 100; ++tryCnt) {
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads->execute(2, [=]() { tv->modify(14, 42); });
- tv->wait(2);
- if (tv->_fail != 1) {
- continue;
- }
- EXPECT_EQUAL(1, tv->_fail);
- EXPECT_EQUAL(14, tv->_val);
- f._threads->sync();
- EXPECT_EQUAL(1, tv->_fail);
- EXPECT_EQUAL(14, tv->_val);
- break;
- }
- EXPECT_TRUE(tryCnt < 100);
-}
-
-
-TEST_F("require that task with same string component id are serialized", Fixture)
-{
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- auto test2 = [=]() { tv->modify(14, 42); };
- f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads->execute(f._threads->getExecutorId("0"), test2);
- tv->wait(2);
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
- f._threads->sync();
- EXPECT_EQUAL(0, tv->_fail);
- EXPECT_EQUAL(42, tv->_val);
-}
-
-namespace {
-
-int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)
-{
- int tryCnt = 0;
- for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
- std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
- EXPECT_EQUAL(0, tv->_val);
- f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); });
- tv->wait(2);
- if (tv->_fail != 1) {
- continue;
- }
- EXPECT_EQUAL(1, tv->_fail);
- EXPECT_EQUAL(14, tv->_val);
- f._threads->sync();
- EXPECT_EQUAL(1, tv->_fail);
- EXPECT_EQUAL(14, tv->_val);
- break;
- }
- return tryCnt;
-}
-
-vespalib::string makeAltComponentId(Fixture &f)
-{
- int tryCnt = 0;
- char altComponentId[20];
- ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0");
- for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
- sprintf(altComponentId, "%d", tryCnt);
- if (f._threads->getExecutorId(altComponentId) == executorId0) {
- break;
- }
- }
- EXPECT_TRUE(tryCnt < 100);
- return altComponentId;
-}
-
-}
-
-TEST_F("require that task with different string component ids are not serialized", Fixture)
-{
- int tryCnt = detectSerializeFailure(f, "2", 100);
- EXPECT_TRUE(tryCnt < 100);
-}
-
-
-TEST_F("require that task with different string component ids mapping to the same executor id are serialized",
- Fixture)
-{
- vespalib::string altComponentId = makeAltComponentId(f);
- LOG(info, "second string component id is \"%s\"", altComponentId.c_str());
- int tryCnt = detectSerializeFailure(f, altComponentId, 100);
- EXPECT_TRUE(tryCnt == 100);
-}
-
-
-TEST_F("require that execute works with const lambda", Fixture)
-{
- int i = 5;
- std::vector<int> res;
- const auto lambda = [i, &res]() mutable
- { res.push_back(i--); res.push_back(i--); };
- f._threads->execute(0, lambda);
- f._threads->execute(0, lambda);
- f._threads->sync();
- std::vector<int> exp({5, 4, 5, 4});
- EXPECT_EQUAL(exp, res);
- EXPECT_EQUAL(5, i);
-}
-
-TEST_F("require that execute works with reference to lambda", Fixture)
-{
- int i = 5;
- std::vector<int> res;
- auto lambda = [i, &res]() mutable
- { res.push_back(i--); res.push_back(i--); };
- auto &lambdaref = lambda;
- f._threads->execute(0, lambdaref);
- f._threads->execute(0, lambdaref);
- f._threads->sync();
- std::vector<int> exp({5, 4, 5, 4});
- EXPECT_EQUAL(exp, res);
- EXPECT_EQUAL(5, i);
-}
-
-TEST_F("require that executeLambda works", Fixture)
-{
- int i = 5;
- std::vector<int> res;
- const auto lambda = [i, &res]() mutable
- { res.push_back(i--); res.push_back(i--); };
- f._threads->executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda);
- f._threads->sync();
- std::vector<int> exp({5, 4});
- EXPECT_EQUAL(exp, res);
- EXPECT_EQUAL(5, i);
-}
-
-TEST("require that you get correct number of executors") {
- auto seven = SequencedTaskExecutor::create(7);
- EXPECT_EQUAL(7u, seven->getNumExecutors());
-}
-
-TEST("require that you distribute well") {
- auto seven = SequencedTaskExecutor::create(7);
- EXPECT_EQUAL(7u, seven->getNumExecutors());
- EXPECT_EQUAL(97u, seven->getComponentHashSize());
- EXPECT_EQUAL(0u, seven->getComponentEffectiveHashSize());
- for (uint32_t id=0; id < 1000; id++) {
- EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId());
- }
- EXPECT_EQUAL(97u, seven->getComponentHashSize());
- EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize());
-}
-
-}
-
-TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index 92d0659d984..c2fd048ce3e 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/searchlib/common/sequencedtaskexecutor.h>
#include <vespa/searchlib/diskindex/diskindex.h>
#include <vespa/searchlib/diskindex/fusion.h>
#include <vespa/searchlib/diskindex/indexbuilder.h>
@@ -19,6 +18,7 @@
#include <vespa/vespalib/btree/btreeroot.hpp>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <gtest/gtest.h>
#include <vespa/log/log.h>
@@ -37,6 +37,7 @@ using search::common::FileHeaderContext;
using search::index::schema::CollectionType;
using search::index::schema::DataType;
using search::index::test::MockFieldLengthInspector;
+using vespalib::SequencedTaskExecutor;
using namespace index;
diff --git a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
index 60b21699406..58f800dec23 100644
--- a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
+++ b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/searchlib/common/sequencedtaskexecutor.h>
#include <vespa/searchlib/index/docbuilder.h>
#include <vespa/searchlib/index/field_length_calculator.h>
#include <vespa/searchlib/memoryindex/document_inverter.h>
@@ -9,6 +8,8 @@
#include <vespa/searchlib/memoryindex/i_field_index_collection.h>
#include <vespa/searchlib/memoryindex/word_store.h>
#include <vespa/searchlib/test/memoryindex/ordered_field_index_inserter.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+
#include <vespa/vespalib/gtest/gtest.h>
namespace search {
@@ -18,6 +19,8 @@ using index::DocBuilder;
using index::Schema;
using index::schema::CollectionType;
using index::schema::DataType;
+using vespalib::SequencedTaskExecutor;
+using vespalib::ISequencedTaskExecutor;
using namespace index;
diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
index 7a0c240dea5..ed6bd7168f2 100644
--- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
+++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/searchlib/common/sequencedtaskexecutor.h>
#include <vespa/searchlib/diskindex/fusion.h>
#include <vespa/searchlib/diskindex/indexbuilder.h>
#include <vespa/searchlib/diskindex/zcposoccrandread.h>
@@ -19,6 +18,8 @@
#include <vespa/searchlib/test/memoryindex/wrap_inserter.h>
#include <vespa/vespalib/btree/btreenodeallocator.hpp>
#include <vespa/vespalib/btree/btreeroot.hpp>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/log/log.h>
@@ -38,6 +39,9 @@ using search::index::schema::CollectionType;
using search::index::schema::DataType;
using search::index::test::MockFieldLengthInspector;
using vespalib::GenerationHandler;
+using vespalib::ISequencedTaskExecutor;
+using vespalib::SequencedTaskExecutor;
+
namespace memoryindex {
diff --git a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
index 5032ed2c179..6476ac7cf8a 100644
--- a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
+++ b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/searchlib/common/scheduletaskcallback.h>
-#include <vespa/searchlib/common/sequencedtaskexecutor.h>
#include <vespa/searchlib/fef/matchdata.h>
#include <vespa/searchlib/fef/matchdatalayout.h>
#include <vespa/searchlib/fef/termfieldmatchdata.h>
@@ -14,9 +13,10 @@
#include <vespa/searchlib/queryeval/fake_search.h>
#include <vespa/searchlib/queryeval/fake_searchable.h>
#include <vespa/searchlib/queryeval/searchiterator.h>
-#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/log/log.h>
LOG_SETUP("memory_index_test");
@@ -31,6 +31,8 @@ using vespalib::makeLambdaTask;
using search::query::Node;
using search::query::SimplePhrase;
using search::query::SimpleStringTerm;
+using vespalib::ISequencedTaskExecutor;
+using vespalib::SequencedTaskExecutor;
using namespace search::fef;
using namespace search::index;
using namespace search::memoryindex;
@@ -64,8 +66,8 @@ struct MySetup : public IFieldLengthInspector {
struct Index {
Schema schema;
vespalib::ThreadStackExecutor _executor;
- std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads;
- std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _pushThreads;
MemoryIndex index;
DocBuilder builder;
uint32_t docid;
@@ -123,8 +125,8 @@ private:
Index::Index(const MySetup &setup)
: schema(setup.schema),
_executor(1, 128 * 1024),
- _invertThreads(search::SequencedTaskExecutor::create(2)),
- _pushThreads(search::SequencedTaskExecutor::create(2)),
+ _invertThreads(SequencedTaskExecutor::create(2)),
+ _pushThreads(SequencedTaskExecutor::create(2)),
index(schema, setup, *_invertThreads, *_pushThreads),
builder(schema),
docid(1),
diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt
index f4a9e27b79d..36264a2035b 100644
--- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt
@@ -1,7 +1,6 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(searchlib_common OBJECT
SOURCES
- adaptive_sequenced_executor.cpp
allocatedbitvector.cpp
bitvector.cpp
bitvectorcache.cpp
@@ -12,11 +11,9 @@ vespa_add_library(searchlib_common OBJECT
documentsummary.cpp
featureset.cpp
fileheadercontext.cpp
- foregroundtaskexecutor.cpp
gatecallback.cpp
growablebitvector.cpp
indexmetainfo.cpp
- isequencedtaskexecutor.cpp
location.cpp
locationiterators.cpp
mapnames.cpp
@@ -24,8 +21,6 @@ vespa_add_library(searchlib_common OBJECT
packets.cpp
partialbitvector.cpp
resultset.cpp
- sequencedtaskexecutor.cpp
- sequencedtaskexecutorobserver.cpp
serialnumfileheadercontext.cpp
sort.cpp
sortdata.cpp
diff --git a/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.cpp b/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.cpp
deleted file mode 100644
index f31172b1eba..00000000000
--- a/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.cpp
+++ /dev/null
@@ -1,324 +0,0 @@
-// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "adaptive_sequenced_executor.h"
-
-namespace search {
-
-//-----------------------------------------------------------------------------
-
-AdaptiveSequencedExecutor::Strand::Strand()
- : state(State::IDLE),
- queue()
-{
-}
-
-AdaptiveSequencedExecutor::Strand::~Strand()
-{
- assert(queue.empty());
-}
-
-//-----------------------------------------------------------------------------
-
-AdaptiveSequencedExecutor::Worker::Worker()
- : cond(),
- state(State::RUNNING),
- strand(nullptr)
-{
-}
-
-AdaptiveSequencedExecutor::Worker::~Worker()
-{
- assert(state == State::DONE);
- assert(strand == nullptr);
-}
-
-//-----------------------------------------------------------------------------
-
-AdaptiveSequencedExecutor::Self::Self()
- : cond(),
- state(State::OPEN),
- waiting_tasks(0),
- pending_tasks(0)
-{
-}
-
-AdaptiveSequencedExecutor::Self::~Self()
-{
- assert(state == State::CLOSED);
- assert(waiting_tasks == 0);
- assert(pending_tasks == 0);
-}
-
-//-----------------------------------------------------------------------------
-
-AdaptiveSequencedExecutor::ThreadTools::ThreadTools(AdaptiveSequencedExecutor &parent_in)
- : parent(parent_in),
- pool(std::make_unique<FastOS_ThreadPool>(STACK_SIZE)),
- allow_worker_exit()
-{
-}
-
-AdaptiveSequencedExecutor::ThreadTools::~ThreadTools()
-{
- assert(pool->isClosed());
-}
-
-void
-AdaptiveSequencedExecutor::ThreadTools::Run(FastOS_ThreadInterface *, void *)
-{
- parent.worker_main();
-}
-
-void
-AdaptiveSequencedExecutor::ThreadTools::start(size_t num_threads)
-{
- for (size_t i = 0; i < num_threads; ++i) {
- FastOS_ThreadInterface *thread = pool->NewThread(this);
- assert(thread != nullptr);
- (void)thread;
- }
-}
-
-void
-AdaptiveSequencedExecutor::ThreadTools::close()
-{
- allow_worker_exit.countDown();
- pool->Close();
-}
-
-//-----------------------------------------------------------------------------
-
-void
-AdaptiveSequencedExecutor::maybe_block_self(std::unique_lock<std::mutex> &lock)
-{
- while (_self.state == Self::State::BLOCKED) {
- _self.cond.wait(lock);
- }
- while ((_self.state == Self::State::OPEN) && (_self.pending_tasks >= _cfg.max_pending)) {
- _self.state = Self::State::BLOCKED;
- while (_self.state == Self::State::BLOCKED) {
- _self.cond.wait(lock);
- }
- }
-}
-
-bool
-AdaptiveSequencedExecutor::maybe_unblock_self(const std::unique_lock<std::mutex> &)
-{
- if ((_self.state == Self::State::BLOCKED) && (_self.pending_tasks < _cfg.wakeup_limit)) {
- _self.state = Self::State::OPEN;
- return true;
- }
- return false;
-}
-
-AdaptiveSequencedExecutor::Worker *
-AdaptiveSequencedExecutor::get_worker_to_wake(const std::unique_lock<std::mutex> &)
-{
- if ((_self.waiting_tasks > _cfg.max_waiting) && (!_worker_stack.empty())) {
- assert(!_wait_queue.empty());
- Worker *worker = _worker_stack.back();
- _worker_stack.popBack();
- assert(worker->state == Worker::State::BLOCKED);
- assert(worker->strand == nullptr);
- worker->state = Worker::State::RUNNING;
- worker->strand = _wait_queue.front();
- _wait_queue.pop();
- assert(worker->strand->state == Strand::State::WAITING);
- assert(!worker->strand->queue.empty());
- worker->strand->state = Strand::State::ACTIVE;
- assert(_self.waiting_tasks >= worker->strand->queue.size());
- _self.waiting_tasks -= worker->strand->queue.size();
- return worker;
- }
- return nullptr;
-}
-
-bool
-AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock)
-{
- assert(worker.strand == nullptr);
- if (!_wait_queue.empty()) {
- worker.strand = _wait_queue.front();
- _wait_queue.pop();
- assert(worker.strand->state == Strand::State::WAITING);
- assert(!worker.strand->queue.empty());
- worker.strand->state = Strand::State::ACTIVE;
- assert(_self.waiting_tasks >= worker.strand->queue.size());
- _self.waiting_tasks -= worker.strand->queue.size();
- } else if (_self.state == Self::State::CLOSED) {
- worker.state = Worker::State::DONE;
- } else {
- worker.state = Worker::State::BLOCKED;
- _worker_stack.push(&worker);
- while (worker.state == Worker::State::BLOCKED) {
- worker.cond.wait(lock);
- }
- }
- return (worker.state == Worker::State::RUNNING);
-}
-
-bool
-AdaptiveSequencedExecutor::exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock)
-{
- if (worker.strand == nullptr) {
- return obtain_strand(worker, lock);
- }
- if (worker.strand->queue.empty()) {
- worker.strand->state = Strand::State::IDLE;
- worker.strand = nullptr;
- return obtain_strand(worker, lock);
- }
- if (!_wait_queue.empty()) {
- worker.strand->state = Strand::State::WAITING;
- _self.waiting_tasks += worker.strand->queue.size();
- _wait_queue.push(worker.strand);
- worker.strand = nullptr;
- return obtain_strand(worker, lock);
- }
- return true;
-}
-
-AdaptiveSequencedExecutor::Task::UP
-AdaptiveSequencedExecutor::next_task(Worker &worker)
-{
- Task::UP task;
- Worker *worker_to_wake = nullptr;
- auto guard = std::unique_lock(_mutex);
- if (exchange_strand(worker, guard)) {
- assert(worker.state == Worker::State::RUNNING);
- assert(worker.strand != nullptr);
- assert(!worker.strand->queue.empty());
- task = std::move(worker.strand->queue.front());
- worker.strand->queue.pop();
- _stats.queueSize.add(--_self.pending_tasks);
- worker_to_wake = get_worker_to_wake(guard);
- } else {
- assert(worker.state == Worker::State::DONE);
- assert(worker.strand == nullptr);
- }
- bool signal_self = maybe_unblock_self(guard);
- guard.unlock(); // UNLOCK
- if (worker_to_wake != nullptr) {
- worker_to_wake->cond.notify_one();
- }
- if (signal_self) {
- _self.cond.notify_all();
- }
- return task;
-}
-
-void
-AdaptiveSequencedExecutor::worker_main()
-{
- Worker worker;
- while (Task::UP my_task = next_task(worker)) {
- my_task->run();
- }
- _thread_tools->allow_worker_exit.await();
-}
-
-AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
- size_t max_waiting, size_t max_pending)
- : ISequencedTaskExecutor(num_strands),
- _thread_tools(std::make_unique<ThreadTools>(*this)),
- _mutex(),
- _strands(num_strands),
- _wait_queue(num_strands),
- _worker_stack(num_threads),
- _self(),
- _stats(),
- _cfg(num_threads, max_waiting, max_pending)
-{
- _stats.queueSize.add(_self.pending_tasks);
- _thread_tools->start(num_threads);
-}
-
-AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor()
-{
- sync();
- {
- auto guard = std::unique_lock(_mutex);
- assert(_self.state == Self::State::OPEN);
- _self.state = Self::State::CLOSED;
- while (!_worker_stack.empty()) {
- Worker *worker = _worker_stack.back();
- _worker_stack.popBack();
- assert(worker->state == Worker::State::BLOCKED);
- assert(worker->strand == nullptr);
- worker->state = Worker::State::DONE;
- worker->cond.notify_one();
- }
- _self.cond.notify_all();
- }
- _thread_tools->close();
- assert(_wait_queue.empty());
- assert(_worker_stack.empty());
-}
-
-void
-AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task)
-{
- assert(id.getId() < _strands.size());
- Strand &strand = _strands[id.getId()];
- auto guard = std::unique_lock(_mutex);
- maybe_block_self(guard);
- assert(_self.state != Self::State::CLOSED);
- strand.queue.push(std::move(task));
- _stats.queueSize.add(++_self.pending_tasks);
- ++_stats.acceptedTasks;
- if (strand.state == Strand::State::WAITING) {
- ++_self.waiting_tasks;
- } else if (strand.state == Strand::State::IDLE) {
- if (_worker_stack.size() < _cfg.num_threads) {
- strand.state = Strand::State::WAITING;
- _wait_queue.push(&strand);
- _self.waiting_tasks += strand.queue.size();
- } else {
- strand.state = Strand::State::ACTIVE;
- assert(_wait_queue.empty());
- Worker *worker = _worker_stack.back();
- _worker_stack.popBack();
- assert(worker->state == Worker::State::BLOCKED);
- assert(worker->strand == nullptr);
- worker->state = Worker::State::RUNNING;
- worker->strand = &strand;
- guard.unlock(); // UNLOCK
- worker->cond.notify_one();
- }
- }
-}
-
-void
-AdaptiveSequencedExecutor::sync()
-{
- vespalib::CountDownLatch latch(_strands.size());
- for (size_t i = 0; i < _strands.size(); ++i) {
- execute(ExecutorId(i), [&](){ latch.countDown(); });
- }
- latch.await();
-}
-
-void
-AdaptiveSequencedExecutor::setTaskLimit(uint32_t task_limit)
-{
- auto guard = std::unique_lock(_mutex);
- _cfg.set_max_pending(task_limit);
- bool signal_self = maybe_unblock_self(guard);
- guard.unlock(); // UNLOCK
- if (signal_self) {
- _self.cond.notify_all();
- }
-}
-
-AdaptiveSequencedExecutor::Stats
-AdaptiveSequencedExecutor::getStats()
-{
- auto guard = std::lock_guard(_mutex);
- Stats stats = _stats;
- _stats = Stats();
- _stats.queueSize.add(_self.pending_tasks);
- return stats;
-}
-
-}
diff --git a/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.h b/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.h
deleted file mode 100644
index 3abc095e9df..00000000000
--- a/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.h
+++ /dev/null
@@ -1,126 +0,0 @@
-// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "isequencedtaskexecutor.h"
-#include <vespa/vespalib/util/arrayqueue.hpp>
-#include <vespa/vespalib/util/gate.h>
-#include <vespa/fastos/thread.h>
-#include <mutex>
-#include <condition_variable>
-#include <cassert>
-
-namespace search {
-
-/**
- * Sequenced executor that balances the number of active threads in
- * order to optimize for throughput over latency by minimizing the
- * number of critical-path wakeups.
- **/
-class AdaptiveSequencedExecutor : public ISequencedTaskExecutor
-{
-private:
- using Stats = vespalib::ExecutorStats;
- using Task = vespalib::Executor::Task;
-
- /**
- * Values used to configure the executor.
- **/
- struct Config {
- size_t num_threads;
- size_t max_waiting;
- size_t max_pending;
- size_t wakeup_limit;
- void set_max_pending(size_t max_pending_in) {
- max_pending = std::max(1uL, max_pending_in);
- wakeup_limit = std::max(1uL, size_t(max_pending * 0.9));
- assert(wakeup_limit > 0);
- assert(wakeup_limit <= max_pending);
- }
- Config(size_t num_threads_in, size_t max_waiting_in, size_t max_pending_in)
- : num_threads(num_threads_in), max_waiting(max_waiting_in), max_pending(1000), wakeup_limit(900)
- {
- assert(num_threads > 0);
- set_max_pending(max_pending_in);
- }
- };
-
- /**
- * Tasks that need to be sequenced are handled by a single strand.
- **/
- struct Strand {
- enum class State { IDLE, WAITING, ACTIVE };
- State state;
- vespalib::ArrayQueue<Task::UP> queue;
- Strand();
- ~Strand();
- };
-
- /**
- * The state of a single worker thread.
- **/
- struct Worker {
- enum class State { RUNNING, BLOCKED, DONE };
- std::condition_variable cond;
- State state;
- Strand *strand;
- Worker();
- ~Worker();
- };
-
- /**
- * State related to the executor itself.
- **/
- struct Self {
- enum class State { OPEN, BLOCKED, CLOSED };
- std::condition_variable cond;
- State state;
- size_t waiting_tasks;
- size_t pending_tasks;
- Self();
- ~Self();
- };
-
- /**
- * Stuff related to worker thread startup and shutdown.
- **/
- struct ThreadTools : FastOS_Runnable {
- static constexpr size_t STACK_SIZE = (256 * 1024);
- AdaptiveSequencedExecutor &parent;
- std::unique_ptr<FastOS_ThreadPool> pool;
- vespalib::Gate allow_worker_exit;
- ThreadTools(AdaptiveSequencedExecutor &parent_in);
- ~ThreadTools();
- void Run(FastOS_ThreadInterface *, void *) override;
- void start(size_t num_threads);
- void close();
- };
-
- std::unique_ptr<ThreadTools> _thread_tools;
- std::mutex _mutex;
- std::vector<Strand> _strands;
- vespalib::ArrayQueue<Strand*> _wait_queue;
- vespalib::ArrayQueue<Worker*> _worker_stack;
- Self _self;
- Stats _stats;
- Config _cfg;
-
- void maybe_block_self(std::unique_lock<std::mutex> &lock);
- bool maybe_unblock_self(const std::unique_lock<std::mutex> &lock);
-
- Worker *get_worker_to_wake(const std::unique_lock<std::mutex> &lock);
- bool obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock);
- bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock);
- Task::UP next_task(Worker &worker);
- void worker_main();
-public:
- AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
- size_t max_waiting, size_t max_pending);
- ~AdaptiveSequencedExecutor() override;
- void executeTask(ExecutorId id, Task::UP task) override;
- void sync() override;
- void setTaskLimit(uint32_t task_limit) override;
- vespalib::ExecutorStats getStats() override;
-};
-
-}
diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
deleted file mode 100644
index a93eb1ff4bc..00000000000
--- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "foregroundtaskexecutor.h"
-#include <vespa/vespalib/util/threadstackexecutor.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
-
-using vespalib::ThreadStackExecutor;
-
-namespace search {
-
-ForegroundTaskExecutor::ForegroundTaskExecutor()
- : ForegroundTaskExecutor(1)
-{
-}
-
-ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads)
- : ISequencedTaskExecutor(threads),
- _accepted(0)
-{
-}
-
-ForegroundTaskExecutor::~ForegroundTaskExecutor() = default;
-
-void
-ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
-{
- assert(id.getId() < getNumExecutors());
- task->run();
- _accepted++;
-}
-
-void
-ForegroundTaskExecutor::sync()
-{
-}
-
-void ForegroundTaskExecutor::setTaskLimit(uint32_t) {
-
-}
-
-vespalib::ExecutorStats ForegroundTaskExecutor::getStats() {
- return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0);
-}
-
-} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
deleted file mode 100644
index 0b604fb140d..00000000000
--- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "isequencedtaskexecutor.h"
-#include <atomic>
-
-namespace vespalib { class ThreadStackExecutorBase; }
-
-namespace search {
-
-/**
- * Class to run multiple tasks in parallel, but tasks with same
- * id has to be run in sequence.
- *
- * Currently, this is a dummy version that runs everything in the foreground.
- */
-class ForegroundTaskExecutor : public ISequencedTaskExecutor
-{
-public:
- using ISequencedTaskExecutor::getExecutorId;
-
- ForegroundTaskExecutor();
- ForegroundTaskExecutor(uint32_t threads);
- ~ForegroundTaskExecutor() override;
-
- void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
- void sync() override;
-
- void setTaskLimit(uint32_t taskLimit) override;
-
- vespalib::ExecutorStats getStats() override;
-private:
- std::atomic<uint64_t> _accepted;
-};
-
-} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.cpp
deleted file mode 100644
index 9d1bc99bebb..00000000000
--- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-// Copyright 2020 Oath inc.. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "isequencedtaskexecutor.h"
-#include <vespa/vespalib/stllike/hash_fun.h>
-#include <vespa/vespalib/stllike/hashtable.h>
-#include <cassert>
-
-namespace search {
- namespace {
- constexpr uint8_t MAGIC = 255;
- }
-
-ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors)
- : _component2Id(vespalib::hashtable_base::getModuloStl(numExecutors*8), MAGIC),
- _mutex(),
- _numExecutors(numExecutors),
- _nextId(0)
-{
- assert(numExecutors < 256);
-}
-
-ISequencedTaskExecutor::~ISequencedTaskExecutor() = default;
-
-ISequencedTaskExecutor::ExecutorId
-ISequencedTaskExecutor::getExecutorId(vespalib::stringref componentId) const {
- vespalib::hash<vespalib::stringref> hashfun;
- return getExecutorId(hashfun(componentId));
-}
-
-ISequencedTaskExecutor::ExecutorId
-ISequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
- uint32_t shrunkId = componentId % _component2Id.size();
- uint8_t executorId = _component2Id[shrunkId];
- if (executorId == MAGIC) {
- std::lock_guard guard(_mutex);
- if (_component2Id[shrunkId] == MAGIC) {
- _component2Id[shrunkId] = _nextId % getNumExecutors();
- _nextId++;
- }
- executorId = _component2Id[shrunkId];
- }
- return ExecutorId(executorId);
-}
-
-}
diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
deleted file mode 100644
index 55c26abc3d8..00000000000
--- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
+++ /dev/null
@@ -1,113 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/vespalib/util/executor.h>
-#include <vespa/vespalib/util/executor_stats.h>
-#include <vespa/vespalib/stllike/string.h>
-#include <vespa/vespalib/util/lambdatask.h>
-#include <vector>
-#include <mutex>
-
-namespace search {
-
-/**
- * Interface class to run multiple tasks in parallel, but tasks with same
- * id has to be run in sequence.
- */
-class ISequencedTaskExecutor
-{
-public:
- class ExecutorId {
- public:
- ExecutorId() : ExecutorId(0) { }
- explicit ExecutorId(uint32_t id) : _id(id) { }
- uint32_t getId() const { return _id; }
- bool operator != (ExecutorId rhs) const { return _id != rhs._id; }
- bool operator == (ExecutorId rhs) const { return _id == rhs._id; }
- bool operator < (ExecutorId rhs) const { return _id < rhs._id; }
- private:
- uint32_t _id;
- };
- ISequencedTaskExecutor(uint32_t numExecutors);
- virtual ~ISequencedTaskExecutor();
-
- /**
- * Calculate which executor will handle an component.
- *
- * @param componentId component id
- * @return executor id
- */
- ExecutorId getExecutorId(uint64_t componentId) const;
- uint32_t getNumExecutors() const { return _numExecutors; }
-
- ExecutorId getExecutorId(vespalib::stringref componentId) const;
-
- /**
- * Schedule a task to run after all previously scheduled tasks with
- * same id.
- *
- * @param id which internal executor to use
- * @param task unique pointer to the task to be executed
- */
- virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0;
-
- /**
- * Wrap lambda function into a task and schedule it to be run.
- * Caller must ensure that pointers and references are valid and
- * call sync before tearing down pointed to/referenced data.
- *
- * @param id which internal executor to use
- * @param function function to be wrapped in a task and later executed
- */
- template <class FunctionType>
- void executeLambda(ExecutorId id, FunctionType &&function) {
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
- }
- /**
- * Wait for all scheduled tasks to complete.
- */
- virtual void sync() = 0;
-
- virtual void setTaskLimit(uint32_t taskLimit) = 0;
-
- virtual vespalib::ExecutorStats getStats() = 0;
-
- /**
- * Wrap lambda function into a task and schedule it to be run.
- * Caller must ensure that pointers and references are valid and
- * call sync before tearing down pointed to/referenced data.
- *
- * @param componentId component id
- * @param function function to be wrapped in a task and later executed
- */
- template <class FunctionType>
- void execute(uint64_t componentId, FunctionType &&function) {
- ExecutorId id = getExecutorId(componentId);
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
- }
-
- /**
- * Wrap lambda function into a task and schedule it to be run.
- * Caller must ensure that pointers and references are valid and
- * call sync before tearing down pointed to/referenced data.
- *
- * @param id executor id
- * @param function function to be wrapped in a task and later executed
- */
- template <class FunctionType>
- void execute(ExecutorId id, FunctionType &&function) {
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
- }
- /**
- * For testing only
- */
- uint32_t getComponentHashSize() const { return _component2Id.size(); }
- uint32_t getComponentEffectiveHashSize() const { return _nextId; }
-private:
- mutable std::vector<uint8_t> _component2Id;
- mutable std::mutex _mutex;
- uint32_t _numExecutors;
- mutable uint32_t _nextId;
-};
-
-} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
deleted file mode 100644
index c9b4af6cb55..00000000000
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "sequencedtaskexecutor.h"
-#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
-#include <vespa/vespalib/util/singleexecutor.h>
-
-using vespalib::BlockingThreadStackExecutor;
-using vespalib::SingleExecutor;
-
-namespace search {
-
-namespace {
-
-constexpr uint32_t stackSize = 128 * 1024;
-
-}
-
-
-std::unique_ptr<ISequencedTaskExecutor>
-SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize)
-{
- auto executors = std::make_unique<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>>();
- executors->reserve(threads);
- for (uint32_t id = 0; id < threads; ++id) {
- if (optimize == OptimizeFor::THROUGHPUT) {
- executors->push_back(std::make_unique<SingleExecutor>(taskLimit));
- } else {
- executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit));
- }
- }
- return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
-}
-
-SequencedTaskExecutor::~SequencedTaskExecutor()
-{
- sync();
-}
-
-SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors)
- : ISequencedTaskExecutor(executors->size()),
- _executors(std::move(executors))
-{
-}
-
-void
-SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
-{
- for (const auto &executor : *_executors) {
- executor->setTaskLimit(taskLimit);
- }
-}
-
-void
-SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
-{
- assert(id.getId() < _executors->size());
- auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task));
- assert(!rejectedTask);
-}
-
-void
-SequencedTaskExecutor::sync()
-{
- for (auto &executor : *_executors) {
- SingleExecutor * single = dynamic_cast<vespalib::SingleExecutor *>(executor.get());
- if (single) {
- //Enforce parallel wakeup of napping executors.
- single->startSync();
- }
- }
- for (auto &executor : *_executors) {
- executor->sync();
- }
-}
-
-SequencedTaskExecutor::Stats
-SequencedTaskExecutor::getStats()
-{
- Stats accumulatedStats;
- for (auto &executor :* _executors) {
- accumulatedStats += executor->getStats();
- }
- return accumulatedStats;
-}
-
-} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
deleted file mode 100644
index 8568901006f..00000000000
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "isequencedtaskexecutor.h"
-#include <vector>
-
-namespace vespalib {
- struct ExecutorStats;
- class SyncableThreadExecutor;
-}
-
-namespace search {
-
-/**
- * Class to run multiple tasks in parallel, but tasks with same
- * id has to be run in sequence.
- */
-class SequencedTaskExecutor final : public ISequencedTaskExecutor
-{
- using Stats = vespalib::ExecutorStats;
- std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors;
-
- SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
-public:
- using ISequencedTaskExecutor::getExecutorId;
- using OptimizeFor = vespalib::Executor::OptimizeFor;
-
- ~SequencedTaskExecutor();
-
- void setTaskLimit(uint32_t taskLimit) override;
- void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
- void sync() override;
- Stats getStats() override;
-
- /*
- * Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside.
- */
- static std::unique_ptr<ISequencedTaskExecutor>
- create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY);
-};
-
-} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp
deleted file mode 100644
index e553e757c37..00000000000
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "sequencedtaskexecutorobserver.h"
-
-namespace search {
-
-SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor)
- : ISequencedTaskExecutor(executor.getNumExecutors()),
- _executor(executor),
- _executeCnt(0u),
- _syncCnt(0u),
- _executeHistory(),
- _mutex()
-{
-}
-
-SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default;
-
-void
-SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
-{
- ++_executeCnt;
- {
- std::lock_guard<std::mutex> guard(_mutex);
- _executeHistory.emplace_back(id.getId());
- }
- _executor.executeTask(id, std::move(task));
-}
-
-void
-SequencedTaskExecutorObserver::sync()
-{
- ++_syncCnt;
- _executor.sync();
-}
-
-std::vector<uint32_t>
-SequencedTaskExecutorObserver::getExecuteHistory()
-{
- std::lock_guard<std::mutex> guard(_mutex);
- return _executeHistory;
-}
-
-void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) {
- _executor.setTaskLimit(taskLimit);
-}
-
-vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() {
- return _executor.getStats();
-}
-
-} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
deleted file mode 100644
index dadd4bf59cf..00000000000
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "isequencedtaskexecutor.h"
-#include <atomic>
-
-namespace search {
-
-/**
- * Observer class to observe class to run multiple tasks in parallel,
- * but tasks with same id has to be run in sequence.
- */
-class SequencedTaskExecutorObserver : public ISequencedTaskExecutor
-{
- ISequencedTaskExecutor &_executor;
- std::atomic<uint32_t> _executeCnt;
- std::atomic<uint32_t> _syncCnt;
- std::vector<uint32_t> _executeHistory;
- std::mutex _mutex;
-public:
- using ISequencedTaskExecutor::getExecutorId;
-
- SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor);
- ~SequencedTaskExecutorObserver() override;
-
- void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
- void sync() override;
-
- uint32_t getExecuteCnt() const { return _executeCnt; }
- uint32_t getSyncCnt() const { return _syncCnt; }
- std::vector<uint32_t> getExecuteHistory();
-
- void setTaskLimit(uint32_t taskLimit) override;
-
- vespalib::ExecutorStats getStats() override;
-};
-
-} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp b/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp
index defb537be0e..4b253387f15 100644
--- a/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp
+++ b/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp
@@ -1,9 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <cstddef>
-#include <cstdint>
#include "threaded_compactable_lid_space.h"
-#include "isequencedtaskexecutor.h"
#include <future>
namespace search::common {
diff --git a/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.h b/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.h
index 02d54acf666..f22d5946295 100644
--- a/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.h
+++ b/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.h
@@ -3,7 +3,7 @@
#pragma once
#include "i_compactable_lid_space.h"
-#include "isequencedtaskexecutor.h"
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <memory>
namespace search::common {
@@ -15,6 +15,7 @@ namespace search::common {
*/
class ThreadedCompactableLidSpace : public ICompactableLidSpace
{
+ using ISequencedTaskExecutor = vespalib::ISequencedTaskExecutor;
std::shared_ptr<ICompactableLidSpace> _target;
ISequencedTaskExecutor &_executor;
ISequencedTaskExecutor::ExecutorId _executorId;
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
index d032f06fc58..c8b23b2256f 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
@@ -8,7 +8,7 @@
#include <vespa/document/annotation/alternatespanlist.h>
#include <vespa/document/datatype/urldatatype.h>
#include <vespa/document/repo/fixedtyperepo.h>
-#include <vespa/searchlib/common/isequencedtaskexecutor.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/searchlib/common/sort.h>
#include <vespa/searchlib/util/url.h>
#include <vespa/vespalib/text/lowercase.h>
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h
index ffa9dd0fab8..57f51e7fdb0 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h
@@ -6,18 +6,21 @@
#include <vespa/searchlib/index/schema_index_fields.h>
namespace document {
-class DataType;
-class Document;
-class DocumentType;
-class Field;
-class FieldValue;
+ class DataType;
+ class Document;
+ class DocumentType;
+ class Field;
+ class FieldValue;
}
namespace search {
- class ISequencedTaskExecutor;
class IDestructorCallback;
}
+namespace vespalib {
+ class ISequencedTaskExecutor;
+}
+
namespace search::memoryindex {
class FieldInverter;
@@ -31,6 +34,7 @@ class IFieldIndexCollection;
*/
class DocumentInverter {
private:
+ using ISequencedTaskExecutor = vespalib::ISequencedTaskExecutor;
DocumentInverter(const DocumentInverter &) = delete;
DocumentInverter &operator=(const DocumentInverter &) = delete;
diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp
index d8e48e84fb7..34c5d65ab23 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp
+++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp
@@ -5,7 +5,7 @@
#include "memory_index.h"
#include <vespa/document/fieldvalue/arrayfieldvalue.h>
#include <vespa/document/fieldvalue/document.h>
-#include <vespa/searchlib/common/sequencedtaskexecutor.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/searchlib/index/field_length_calculator.h>
#include <vespa/searchlib/index/schemautil.h>
#include <vespa/searchlib/queryeval/create_blueprint_visitor_helper.h>
@@ -44,6 +44,7 @@ using queryeval::EmptyBlueprint;
using queryeval::FieldSpec;
using queryeval::IRequestContext;
using queryeval::Searchable;
+using vespalib::ISequencedTaskExecutor;
}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.h b/searchlib/src/vespa/searchlib/memoryindex/memory_index.h
index 4e20bcf81e3..a9f153f7dd8 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.h
+++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.h
@@ -14,7 +14,7 @@ namespace search::index {
class IndexBuilder;
}
-namespace search { class ISequencedTaskExecutor; }
+namespace vespalib { class ISequencedTaskExecutor; }
namespace document { class Document; }
@@ -40,6 +40,7 @@ class FieldIndexCollection;
*/
class MemoryIndex : public queryeval::Searchable {
private:
+ using ISequencedTaskExecutor = vespalib::ISequencedTaskExecutor;
index::Schema _schema;
ISequencedTaskExecutor &_invertThreads;
ISequencedTaskExecutor &_pushThreads;