aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-27 22:35:17 +0100
committerGitHub <noreply@github.com>2020-02-27 22:35:17 +0100
commitd7fb5fac283eef2c09eeaabd94288fa123e9f94c (patch)
tree46ddfec9b779afc944a6c9641a2757cca0dabc86 /searchlib
parent2a08a7b83b33dcda0dfbe13085aeb19d32da8c39 (diff)
parentc705d62a12602433a05d21ff26a2af2020c661cc (diff)
Merge pull request #12372 from vespa-engine/balder/add-getStats-and-setTaskLimit-to-interface
Add getStats and setTaskLimit to interface to make it easy to swap im…
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/apps/tests/memoryindexstress_test.cpp10
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp4
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp70
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp28
-rw-r--r--searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp14
-rw-r--r--searchlib/src/tests/memoryindex/field_index/field_index_test.cpp74
-rw-r--r--searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h8
-rw-r--r--searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h5
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp28
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h12
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp8
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h6
14 files changed, 165 insertions, 126 deletions
diff --git a/searchlib/src/apps/tests/memoryindexstress_test.cpp b/searchlib/src/apps/tests/memoryindexstress_test.cpp
index 60f3a6b7664..dc950b84508 100644
--- a/searchlib/src/apps/tests/memoryindexstress_test.cpp
+++ b/searchlib/src/apps/tests/memoryindexstress_test.cpp
@@ -195,8 +195,8 @@ struct Fixture {
Schema schema;
DocumentTypeRepo repo;
vespalib::ThreadStackExecutor _executor;
- search::SequencedTaskExecutor _invertThreads;
- search::SequencedTaskExecutor _pushThreads;
+ std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads;
MemoryIndex index;
uint32_t _readThreads;
vespalib::ThreadStackExecutor _writer; // 1 write thread
@@ -247,9 +247,9 @@ Fixture::Fixture(uint32_t readThreads)
: schema(makeSchema()),
repo(makeDocTypeRepoConfig()),
_executor(1, 128 * 1024),
- _invertThreads(2),
- _pushThreads(2),
- index(schema, MockFieldLengthInspector(), _invertThreads, _pushThreads),
+ _invertThreads(search::SequencedTaskExecutor::create(2)),
+ _pushThreads(search::SequencedTaskExecutor::create(2)),
+ index(schema, MockFieldLengthInspector(), *_invertThreads, *_pushThreads),
_readThreads(readThreads),
_writer(1, 128 * 1024),
_readers(readThreads, 128 * 1024),
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
index a51becfbf13..b2b15ded274 100644
--- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
+++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
@@ -16,9 +16,9 @@ int main(int argc, char *argv[]) {
if (argc > 2)
numThreads = atoi(argv[2]);
- SequencedTaskExecutor executor(numThreads);
+ auto executor = SequencedTaskExecutor::create(numThreads);
for (unsigned long tid(0); tid < numTasks; tid++) {
- executor.executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; }));
+ executor->executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; }));
}
return 0;
}
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index fcc7fd7300d..c311a59a56c 100644
--- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -17,9 +17,9 @@ namespace search::common {
class Fixture
{
public:
- SequencedTaskExecutor _threads;
+ std::unique_ptr<ISequencedTaskExecutor> _threads;
- Fixture() : _threads(2) { }
+ Fixture() : _threads(SequencedTaskExecutor::create(2)) { }
};
@@ -67,11 +67,11 @@ public:
TEST_F("testExecute", Fixture) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(1, [=]() { tv->modify(0, 42); });
+ f._threads->execute(1, [=]() { tv->modify(0, 42); });
tv->wait(1);
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
+ f._threads->sync();
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
}
@@ -81,12 +81,12 @@ TEST_F("require that task with same component id are serialized", Fixture)
{
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(0, [=]() { tv->modify(14, 42); });
+ f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(0, [=]() { tv->modify(14, 42); });
tv->wait(2);
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
+ f._threads->sync();
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
}
@@ -97,15 +97,15 @@ TEST_F("require that task with different component ids are not serialized", Fixt
for (tryCnt = 0; tryCnt < 100; ++tryCnt) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(2, [=]() { tv->modify(14, 42); });
+ f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(2, [=]() { tv->modify(14, 42); });
tv->wait(2);
if (tv->_fail != 1) {
continue;
}
EXPECT_EQUAL(1, tv->_fail);
EXPECT_EQUAL(14, tv->_val);
- f._threads.sync();
+ f._threads->sync();
EXPECT_EQUAL(1, tv->_fail);
EXPECT_EQUAL(14, tv->_val);
break;
@@ -119,12 +119,12 @@ TEST_F("require that task with same string component id are serialized", Fixture
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
auto test2 = [=]() { tv->modify(14, 42); };
- f._threads.execute(f._threads.getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(f._threads.getExecutorId("0"), test2);
+ f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(f._threads->getExecutorId("0"), test2);
tv->wait(2);
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
+ f._threads->sync();
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
}
@@ -137,15 +137,15 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t
for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(f._threads.getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(f._threads.getExecutorId(altComponentId), [=]() { tv->modify(14, 42); });
+ f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); });
tv->wait(2);
if (tv->_fail != 1) {
continue;
}
EXPECT_EQUAL(1, tv->_fail);
EXPECT_EQUAL(14, tv->_val);
- f._threads.sync();
+ f._threads->sync();
EXPECT_EQUAL(1, tv->_fail);
EXPECT_EQUAL(14, tv->_val);
break;
@@ -157,10 +157,10 @@ vespalib::string makeAltComponentId(Fixture &f)
{
int tryCnt = 0;
char altComponentId[20];
- ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0");
+ ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0");
for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
sprintf(altComponentId, "%d", tryCnt);
- if (f._threads.getExecutorId(altComponentId) == executorId0) {
+ if (f._threads->getExecutorId(altComponentId) == executorId0) {
break;
}
}
@@ -193,9 +193,9 @@ TEST_F("require that execute works with const lambda", Fixture)
std::vector<int> res;
const auto lambda = [i, &res]() mutable
{ res.push_back(i--); res.push_back(i--); };
- f._threads.execute(0, lambda);
- f._threads.execute(0, lambda);
- f._threads.sync();
+ f._threads->execute(0, lambda);
+ f._threads->execute(0, lambda);
+ f._threads->sync();
std::vector<int> exp({5, 4, 5, 4});
EXPECT_EQUAL(exp, res);
EXPECT_EQUAL(5, i);
@@ -208,9 +208,9 @@ TEST_F("require that execute works with reference to lambda", Fixture)
auto lambda = [i, &res]() mutable
{ res.push_back(i--); res.push_back(i--); };
auto &lambdaref = lambda;
- f._threads.execute(0, lambdaref);
- f._threads.execute(0, lambdaref);
- f._threads.sync();
+ f._threads->execute(0, lambdaref);
+ f._threads->execute(0, lambdaref);
+ f._threads->sync();
std::vector<int> exp({5, 4, 5, 4});
EXPECT_EQUAL(exp, res);
EXPECT_EQUAL(5, i);
@@ -222,28 +222,28 @@ TEST_F("require that executeLambda works", Fixture)
std::vector<int> res;
const auto lambda = [i, &res]() mutable
{ res.push_back(i--); res.push_back(i--); };
- f._threads.executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda);
- f._threads.sync();
+ f._threads->executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda);
+ f._threads->sync();
std::vector<int> exp({5, 4});
EXPECT_EQUAL(exp, res);
EXPECT_EQUAL(5, i);
}
TEST("require that you get correct number of executors") {
- SequencedTaskExecutor seven(7);
- EXPECT_EQUAL(7u, seven.getNumExecutors());
+ auto seven = SequencedTaskExecutor::create(7);
+ EXPECT_EQUAL(7u, seven->getNumExecutors());
}
TEST("require that you distribute well") {
- SequencedTaskExecutor seven(7);
- EXPECT_EQUAL(7u, seven.getNumExecutors());
- EXPECT_EQUAL(97u, seven.getComponentHashSize());
- EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize());
+ auto seven = SequencedTaskExecutor::create(7);
+ EXPECT_EQUAL(7u, seven->getNumExecutors());
+ EXPECT_EQUAL(97u, seven->getComponentHashSize());
+ EXPECT_EQUAL(0u, seven->getComponentEffectiveHashSize());
for (uint32_t id=0; id < 1000; id++) {
- EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId());
+ EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId());
}
- EXPECT_EQUAL(97u, seven.getComponentHashSize());
- EXPECT_EQUAL(97u, seven.getComponentEffectiveHashSize());
+ EXPECT_EQUAL(97u, seven->getComponentHashSize());
+ EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize());
}
}
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index 1825c00ceda..92d0659d984 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -314,16 +314,16 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
addField("f4"));
FieldIndexCollection fic(schema, MockFieldLengthInspector());
DocBuilder b(schema);
- SequencedTaskExecutor invertThreads(2);
- SequencedTaskExecutor pushThreads(2);
- DocumentInverter inv(schema, invertThreads, pushThreads, fic);
+ auto invertThreads = SequencedTaskExecutor::create(2);
+ auto pushThreads = SequencedTaskExecutor::create(2);
+ DocumentInverter inv(schema, *invertThreads, *pushThreads, fic);
Document::UP doc;
doc = make_doc10(b);
inv.invertDocument(10, *doc);
- invertThreads.sync();
+ invertThreads->sync();
myPushDocument(inv);
- pushThreads.sync();
+ pushThreads->sync();
b.startDocument("id:ns:searchdocument::11").
startIndexField("f3").
@@ -331,9 +331,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
endField();
doc = b.endDocument();
inv.invertDocument(11, *doc);
- invertThreads.sync();
+ invertThreads->sync();
myPushDocument(inv);
- pushThreads.sync();
+ pushThreads->sync();
b.startDocument("id:ns:searchdocument::12").
startIndexField("f3").
@@ -341,9 +341,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
endField();
doc = b.endDocument();
inv.invertDocument(12, *doc);
- invertThreads.sync();
+ invertThreads->sync();
myPushDocument(inv);
- pushThreads.sync();
+ pushThreads->sync();
IndexBuilder ib(schema);
vespalib::string dump2dir = prefix + "dump2";
@@ -455,14 +455,14 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng
uint32_t numDocs = 20;
uint32_t numWords = 1000;
DocBuilder b(_schema);
- SequencedTaskExecutor invertThreads(2);
- SequencedTaskExecutor pushThreads(2);
- DocumentInverter inv(_schema, invertThreads, pushThreads, fic);
+ auto invertThreads = SequencedTaskExecutor::create(2);
+ auto pushThreads = SequencedTaskExecutor::create(2);
+ DocumentInverter inv(_schema, *invertThreads, *pushThreads, fic);
inv.invertDocument(10, *make_doc10(b));
- invertThreads.sync();
+ invertThreads->sync();
myPushDocument(inv);
- pushThreads.sync();
+ pushThreads->sync();
IndexBuilder ib(_schema);
TuneFileIndexing tuneFileIndexing;
diff --git a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
index 3f798df3c05..60b21699406 100644
--- a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
+++ b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
@@ -117,8 +117,8 @@ public:
struct DocumentInverterTest : public ::testing::Test {
Schema _schema;
DocBuilder _b;
- SequencedTaskExecutor _invertThreads;
- SequencedTaskExecutor _pushThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _pushThreads;
WordStore _word_store;
FieldIndexRemover _remover;
test::OrderedFieldIndexInserter _inserter;
@@ -138,26 +138,26 @@ struct DocumentInverterTest : public ::testing::Test {
DocumentInverterTest()
: _schema(makeSchema()),
_b(_schema),
- _invertThreads(2),
- _pushThreads(2),
+ _invertThreads(SequencedTaskExecutor::create(2)),
+ _pushThreads(SequencedTaskExecutor::create(2)),
_word_store(),
_remover(_word_store),
_inserter(),
_calculator(),
_fic(_remover, _inserter, _calculator),
- _inv(_schema, _invertThreads, _pushThreads, _fic)
+ _inv(_schema, *_invertThreads, *_pushThreads, _fic)
{
}
void pushDocuments() {
- _invertThreads.sync();
+ _invertThreads->sync();
uint32_t fieldId = 0;
for (auto &inverter : _inv.getInverters()) {
_inserter.setFieldId(fieldId);
inverter->pushDocuments();
++fieldId;
}
- _pushThreads.sync();
+ _pushThreads->sync();
}
};
diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
index 512e1bd2051..c562c0cf29c 100644
--- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
+++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
@@ -872,7 +872,7 @@ struct FieldIndexCollectionTypeTest : public ::testing::Test {
fic(schema, MockFieldLengthInspector())
{
}
- Schema make_schema() {
+ static Schema make_schema() {
Schema result;
result.addIndexField(Schema::IndexField("normal", DataType::STRING));
Schema::IndexField interleaved("interleaved", DataType::STRING);
@@ -902,17 +902,17 @@ public:
Schema _schema;
FieldIndexCollection _fic;
DocBuilder _b;
- SequencedTaskExecutor _invertThreads;
- SequencedTaskExecutor _pushThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _pushThreads;
DocumentInverter _inv;
InverterTest(const Schema& schema)
: _schema(schema),
_fic(_schema, MockFieldLengthInspector()),
_b(_schema),
- _invertThreads(2),
- _pushThreads(2),
- _inv(_schema, _invertThreads, _pushThreads, _fic)
+ _invertThreads(SequencedTaskExecutor::create(2)),
+ _pushThreads(SequencedTaskExecutor::create(2)),
+ _inv(_schema, *_invertThreads, *_pushThreads, _fic)
{
}
NormalFieldIndex::PostingList::Iterator find(const vespalib::stringref word, uint32_t field_id) const {
@@ -943,9 +943,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(10, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
_b.startDocument("id:ns:searchdocument::20");
_b.startIndexField("f0").
@@ -953,9 +953,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(20, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
_b.startDocument("id:ns:searchdocument::30");
_b.startIndexField("f0").
@@ -984,9 +984,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(30, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
_b.startDocument("id:ns:searchdocument::40");
_b.startIndexField("f0").
@@ -995,9 +995,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(40, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
_b.startDocument("id:ns:searchdocument::999");
_b.startIndexField("f0").
@@ -1025,12 +1025,12 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
doc = _b.endDocument();
for (uint32_t docId = 10000; docId < 20000; ++docId) {
_inv.invertDocument(docId, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
}
- _pushThreads.sync();
+ _pushThreads->sync();
DataStoreBase::MemStats beforeStats = getFeatureStoreMemStats(_fic);
LOG(info,
"Before feature compaction: allocElems=%zu, usedElems=%zu"
@@ -1044,13 +1044,13 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
beforeStats._freeBuffers,
beforeStats._activeBuffers,
beforeStats._holdBuffers);
- myCompactFeatures(_fic, _pushThreads);
+ myCompactFeatures(_fic, *_pushThreads);
std::vector<std::unique_ptr<GenerationHandler::Guard>> guards;
for (auto &fieldIndex : _fic.getFieldIndexes()) {
guards.push_back(std::make_unique<GenerationHandler::Guard>
(fieldIndex->takeGenerationGuard()));
}
- myCommit(_fic, _pushThreads);
+ myCommit(_fic, *_pushThreads);
DataStoreBase::MemStats duringStats = getFeatureStoreMemStats(_fic);
LOG(info,
"During feature compaction: allocElems=%zu, usedElems=%zu"
@@ -1065,7 +1065,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
duringStats._activeBuffers,
duringStats._holdBuffers);
guards.clear();
- myCommit(_fic, _pushThreads);
+ myCommit(_fic, *_pushThreads);
DataStoreBase::MemStats afterStats = getFeatureStoreMemStats(_fic);
LOG(info,
"After feature compaction: allocElems=%zu, usedElems=%zu"
@@ -1142,17 +1142,17 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo
_b.startIndexField("f1").addStr("a").addStr("c").endField();
Document::UP doc1 = _b.endDocument();
_inv.invertDocument(1, *doc1.get());
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
_b.startDocument("id:ns:searchdocument::2");
_b.startIndexField("f0").addStr("b").addStr("c").endField();
Document::UP doc2 = _b.endDocument();
_inv.invertDocument(2, *doc2.get());
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
EXPECT_TRUE(assertPostingList("[1]", find("a", 0)));
EXPECT_TRUE(assertPostingList("[1,2]", find("b", 0)));
@@ -1160,8 +1160,8 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo
EXPECT_TRUE(assertPostingList("[1]", find("a", 1)));
EXPECT_TRUE(assertPostingList("[1]", find("c", 1)));
- myremove(1, _inv, _invertThreads);
- _pushThreads.sync();
+ myremove(1, _inv, *_invertThreads);
+ _pushThreads->sync();
EXPECT_TRUE(assertPostingList("[]", find("a", 0)));
EXPECT_TRUE(assertPostingList("[2]", find("b", 0)));
@@ -1311,10 +1311,10 @@ TEST_F(UriInverterTest, require_that_uri_indexing_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(10, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
SimpleMatchData match_data;
{
@@ -1387,10 +1387,10 @@ TEST_F(CjkInverterTest, require_that_cjk_indexing_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(10, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
SimpleMatchData match_data;
uint32_t fieldId = _schema.getIndexFieldId("f0");
@@ -1445,13 +1445,13 @@ TEST_F(FieldIndexCollectionTest, require_that_insert_tells_which_word_ref_that_w
}
struct RemoverTest : public FieldIndexCollectionTest {
- SequencedTaskExecutor _invertThreads;
- SequencedTaskExecutor _pushThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _pushThreads;
RemoverTest()
: FieldIndexCollectionTest(),
- _invertThreads(2),
- _pushThreads(2)
+ _invertThreads(SequencedTaskExecutor::create(2)),
+ _pushThreads(SequencedTaskExecutor::create(2))
{
}
void assertPostingLists(const vespalib::string &e1,
@@ -1462,9 +1462,9 @@ struct RemoverTest : public FieldIndexCollectionTest {
EXPECT_TRUE(assertPostingList(e3, find("b", 1)));
}
void remove(uint32_t docId) {
- DocumentInverter inv(schema, _invertThreads, _pushThreads, fic);
- myremove(docId, inv, _invertThreads);
- _pushThreads.sync();
+ DocumentInverter inv(schema, *_invertThreads, *_pushThreads, fic);
+ myremove(docId, inv, *_invertThreads);
+ _pushThreads->sync();
EXPECT_FALSE(fic.getFieldIndex(0u)->getDocumentRemover().
getStore().get(docId).valid());
}
diff --git a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
index 4bb0f91659a..5032ed2c179 100644
--- a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
+++ b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
@@ -64,8 +64,8 @@ struct MySetup : public IFieldLengthInspector {
struct Index {
Schema schema;
vespalib::ThreadStackExecutor _executor;
- search::SequencedTaskExecutor _invertThreads;
- search::SequencedTaskExecutor _pushThreads;
+ std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads;
MemoryIndex index;
DocBuilder builder;
uint32_t docid;
@@ -123,15 +123,15 @@ private:
Index::Index(const MySetup &setup)
: schema(setup.schema),
_executor(1, 128 * 1024),
- _invertThreads(2),
- _pushThreads(2),
- index(schema, setup, _invertThreads, _pushThreads),
+ _invertThreads(search::SequencedTaskExecutor::create(2)),
+ _pushThreads(search::SequencedTaskExecutor::create(2)),
+ index(schema, setup, *_invertThreads, *_pushThreads),
builder(schema),
docid(1),
currentField()
{
}
-Index::~Index() {}
+Index::~Index() = default;
//-----------------------------------------------------------------------------
std::string toString(SearchIterator & search)
diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
index 513684d3fd5..4c501defeea 100644
--- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
+++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
@@ -14,7 +14,8 @@ ForegroundTaskExecutor::ForegroundTaskExecutor()
}
ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads)
- : ISequencedTaskExecutor(threads)
+ : ISequencedTaskExecutor(threads),
+ _accepted(0)
{
}
@@ -25,6 +26,7 @@ ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP
{
assert(id.getId() < getNumExecutors());
task->run();
+ _accepted++;
}
void
@@ -32,4 +34,12 @@ ForegroundTaskExecutor::sync()
{
}
+void ForegroundTaskExecutor::setTaskLimit(uint32_t) {
+
+}
+
+vespalib::ExecutorStats ForegroundTaskExecutor::getStats() {
+ return vespalib::ExecutorStats(0, _accepted.load(std::memory_order_relaxed), 0);
+}
+
} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
index 2074eda009b..0b604fb140d 100644
--- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
@@ -2,7 +2,7 @@
#pragma once
#include "isequencedtaskexecutor.h"
-#include <vespa/vespalib/stllike/hash_map.h>
+#include <atomic>
namespace vespalib { class ThreadStackExecutorBase; }
@@ -25,6 +25,12 @@ public:
void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
void sync() override;
+
+ void setTaskLimit(uint32_t taskLimit) override;
+
+ vespalib::ExecutorStats getStats() override;
+private:
+ std::atomic<uint64_t> _accepted;
};
} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
index 109e8319148..55c26abc3d8 100644
--- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
@@ -2,6 +2,7 @@
#pragma once
#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/executor_stats.h>
#include <vespa/vespalib/stllike/string.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vector>
@@ -67,6 +68,10 @@ public:
*/
virtual void sync() = 0;
+ virtual void setTaskLimit(uint32_t taskLimit) = 0;
+
+ virtual vespalib::ExecutorStats getStats() = 0;
+
/**
* Wrap lambda function into a task and schedule it to be run.
* Caller must ensure that pointers and references are valid and
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
index 184ff55220f..30723a6eb2a 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
@@ -14,14 +14,15 @@ constexpr uint32_t stackSize = 128 * 1024;
}
-SequencedTaskExecutor::SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit)
- : ISequencedTaskExecutor(threads),
- _executors()
+std::unique_ptr<ISequencedTaskExecutor>
+SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit)
{
+ auto executors = std::make_unique<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>>();
+ executors->reserve(threads);
for (uint32_t id = 0; id < threads; ++id) {
- auto executor = std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit);
- _executors.push_back(std::move(executor));
+ executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit));
}
+ return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
}
SequencedTaskExecutor::~SequencedTaskExecutor()
@@ -29,10 +30,16 @@ SequencedTaskExecutor::~SequencedTaskExecutor()
sync();
}
+SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors)
+ : ISequencedTaskExecutor(executors->size()),
+ _executors(std::move(executors))
+{
+}
+
void
SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
{
- for (const auto &executor : _executors) {
+ for (const auto &executor : *_executors) {
executor->setTaskLimit(taskLimit);
}
}
@@ -40,16 +47,15 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
void
SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
{
- assert(id.getId() < _executors.size());
- vespalib::ThreadStackExecutorBase &executor(*_executors[id.getId()]);
- auto rejectedTask = executor.execute(std::move(task));
+ assert(id.getId() < _executors->size());
+ auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task));
assert(!rejectedTask);
}
void
SequencedTaskExecutor::sync()
{
- for (auto &executor : _executors) {
+ for (auto &executor : *_executors) {
executor->sync();
}
}
@@ -58,7 +64,7 @@ SequencedTaskExecutor::Stats
SequencedTaskExecutor::getStats()
{
Stats accumulatedStats;
- for (auto &executor : _executors) {
+ for (auto &executor :* _executors) {
accumulatedStats += executor->getStats();
}
return accumulatedStats;
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
index 9337f393150..a29e3d5226c 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
@@ -6,7 +6,7 @@
namespace vespalib {
struct ExecutorStats;
- class BlockingThreadStackExecutor;
+ class SyncableThreadExecutor;
}
namespace search {
@@ -18,17 +18,19 @@ namespace search {
class SequencedTaskExecutor final : public ISequencedTaskExecutor
{
using Stats = vespalib::ExecutorStats;
- std::vector<std::shared_ptr<vespalib::BlockingThreadStackExecutor>> _executors;
+ std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors;
+
+ SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
public:
using ISequencedTaskExecutor::getExecutorId;
- SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit = 1000);
~SequencedTaskExecutor();
- void setTaskLimit(uint32_t taskLimit);
+ void setTaskLimit(uint32_t taskLimit) override;
void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
void sync() override;
- Stats getStats();
+ Stats getStats() override;
+ static std::unique_ptr<ISequencedTaskExecutor> create(uint32_t threads, uint32_t taskLimit = 1000);
};
} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp
index 04504086520..e553e757c37 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp
@@ -41,4 +41,12 @@ SequencedTaskExecutorObserver::getExecuteHistory()
return _executeHistory;
}
+void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) {
+ _executor.setTaskLimit(taskLimit);
+}
+
+vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() {
+ return _executor.getStats();
+}
+
} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
index b2de71f06b3..dadd4bf59cf 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
@@ -3,8 +3,6 @@
#include "isequencedtaskexecutor.h"
#include <atomic>
-#include <vector>
-#include <mutex>
namespace search {
@@ -31,6 +29,10 @@ public:
uint32_t getExecuteCnt() const { return _executeCnt; }
uint32_t getSyncCnt() const { return _syncCnt; }
std::vector<uint32_t> getExecuteHistory();
+
+ void setTaskLimit(uint32_t taskLimit) override;
+
+ vespalib::ExecutorStats getStats() override;
};
} // namespace search