summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp8
-rw-r--r--searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp10
-rw-r--r--searchcore/src/tests/proton/documentmetastore/lidreusedelayer/CMakeLists.txt1
-rw-r--r--searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp52
-rw-r--r--searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp8
-rw-r--r--searchcore/src/tests/proton/index/indexmanager_test.cpp15
-rw-r--r--searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp35
-rw-r--r--searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h21
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h11
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp22
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h20
-rw-r--r--searchlib/src/apps/tests/memoryindexstress_test.cpp10
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp4
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp70
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp28
-rw-r--r--searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp14
-rw-r--r--searchlib/src/tests/memoryindex/field_index/field_index_test.cpp74
-rw-r--r--searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h8
-rw-r--r--searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h5
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp28
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h12
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp8
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h6
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h5
-rw-r--r--vespalib/src/vespa/vespalib/util/threadexecutor.h17
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h17
35 files changed, 312 insertions, 272 deletions
diff --git a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp
index 420e18db5af..d2eba28bc9b 100644
--- a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp
+++ b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp
@@ -24,13 +24,13 @@ createAttribute()
struct Fixture
{
AttributeVector::SP attribute;
- SequencedTaskExecutor writer;
+ std::unique_ptr<ISequencedTaskExecutor> writer;
ExclusiveAttributeReadAccessor accessor;
Fixture()
: attribute(createAttribute()),
- writer(1),
- accessor(attribute, writer)
+ writer(SequencedTaskExecutor::create(1)),
+ accessor(attribute, *writer)
{}
};
@@ -38,7 +38,7 @@ TEST_F("require that attribute write thread is blocked while guard is held", Fix
{
ReadGuard::UP guard = f.accessor.takeGuard();
Gate gate;
- f.writer.execute(f.writer.getExecutorId(f.attribute->getNamePrefix()), [&gate]() { gate.countDown(); });
+ f.writer->execute(f.writer->getExecutorId(f.attribute->getNamePrefix()), [&gate]() { gate.countDown(); });
bool reachedZero = gate.await(100);
EXPECT_FALSE(reachedZero);
EXPECT_EQUAL(1u, gate.getCount());
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
index f380ce03152..6a6b05be7f0 100644
--- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
@@ -692,15 +692,15 @@ struct FixtureBase
FixtureBase::FixtureBase(vespalib::duration visibilityDelay)
: _tracer(),
sc(),
- iw(new MyIndexWriter(_tracer)),
- sa(new MySummaryAdapter(*sc._builder->getDocumentTypeRepo())),
- aw(new MyAttributeWriter(_tracer)),
+ iw(std::make_shared<MyIndexWriter>(_tracer)),
+ sa(std::make_shared<MySummaryAdapter>(*sc._builder->getDocumentTypeRepo())),
+ aw(std::make_shared<MyAttributeWriter>(_tracer)),
miw(static_cast<MyIndexWriter&>(*iw)),
msa(static_cast<MySummaryAdapter&>(*sa)),
maw(static_cast<MyAttributeWriter&>(*aw)),
_docIdLimit(0u),
- _dmscReal(new DocumentMetaStoreContext(std::make_shared<BucketDBOwner>())),
- _dmsc(new test::DocumentMetaStoreContextObserver(*_dmscReal)),
+ _dmscReal(std::make_shared<DocumentMetaStoreContext>(std::make_shared<BucketDBOwner>())),
+ _dmsc(std::make_shared<test::DocumentMetaStoreContextObserver>(*_dmscReal)),
pc(sc._builder->getDocumentType().getName(), "fileconfig_test"),
_sharedExecutor(1, 0x10000),
_writeServiceReal(_sharedExecutor),
diff --git a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/CMakeLists.txt b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/CMakeLists.txt
index 0d226268d1a..49f929e0127 100644
--- a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/CMakeLists.txt
+++ b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/CMakeLists.txt
@@ -5,5 +5,6 @@ vespa_add_executable(searchcore_lidreusedelayer_test_app TEST
DEPENDS
searchcore_server
searchcore_documentmetastore
+ searchcore_test
)
vespa_add_test(NAME searchcore_lidreusedelayer_test_app COMMAND searchcore_lidreusedelayer_test_app)
diff --git a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp
index d305751f7c2..25668f56753 100644
--- a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp
+++ b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/log/log.h>
-LOG_SETUP("lidreusedelayer_test");
+
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/searchcore/proton/documentmetastore/i_store.h>
#include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h>
@@ -9,12 +8,14 @@ LOG_SETUP("lidreusedelayer_test");
#include <vespa/searchcore/proton/test/threading_service_observer.h>
#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/log/log.h>
+LOG_SETUP("lidreusedelayer_test");
+
using vespalib::makeLambdaTask;
namespace proton {
-namespace
-{
+namespace {
bool
assertThreadObserver(uint32_t masterExecuteCnt,
@@ -55,69 +56,52 @@ public:
{
}
- virtual ~MyMetaStore() { }
+ ~MyMetaStore() override = default;
- virtual Result inspectExisting(const GlobalId &) const override
- {
+ Result inspectExisting(const GlobalId &) const override {
return Result();
}
- virtual Result inspect(const GlobalId &) override
- {
+ Result inspect(const GlobalId &) override {
return Result();
}
- virtual Result put(const GlobalId &, const BucketId &, const Timestamp &,
- uint32_t, DocId) override
- {
+ Result put(const GlobalId &, const BucketId &, const Timestamp &, uint32_t, DocId) override {
return Result();
}
- virtual bool updateMetaData(DocId, const BucketId &,
- const Timestamp &) override
- {
+ bool updateMetaData(DocId, const BucketId &, const Timestamp &) override {
return true;
}
- virtual bool remove(DocId) override
- {
+ bool remove(DocId) override {
return true;
}
- virtual void removeComplete(DocId) override
- {
+ void removeComplete(DocId) override {
++_removeCompleteCount;
++_removeCompleteLids;
}
- virtual void move(DocId, DocId) override
- {
+ void move(DocId, DocId) override {
}
- virtual bool validLid(DocId) const override
- {
+ bool validLid(DocId) const override {
return true;
}
- virtual void removeBatch(const std::vector<DocId> &,
- const DocId) override
- {
- }
+ void removeBatch(const std::vector<DocId> &, const DocId) override {}
- virtual void
- removeBatchComplete(const std::vector<DocId> &lidsToRemove) override
- {
+ void removeBatchComplete(const std::vector<DocId> &lidsToRemove) override{
++_removeBatchCompleteCount;
_removeCompleteLids += lidsToRemove.size();
}
- virtual const RawDocumentMetaData &getRawMetaData(DocId) const override
- {
+ const RawDocumentMetaData &getRawMetaData(DocId) const override {
LOG_ABORT("should not be reached");
}
- virtual bool getFreeListActive() const override
- {
+ bool getFreeListActive() const override {
return _freeListActive;
}
diff --git a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
index ca45108b698..991cb2e519d 100644
--- a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
+++ b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
@@ -149,9 +149,9 @@ void Test::testSearch(Searchable &source,
void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
Schema schema = getSchema();
vespalib::ThreadStackExecutor sharedExecutor(2, 0x10000);
- search::SequencedTaskExecutor indexFieldInverter(2);
- search::SequencedTaskExecutor indexFieldWriter(2);
- MemoryIndex memory_index(schema, MockFieldLengthInspector(), indexFieldInverter, indexFieldWriter);
+ auto indexFieldInverter = search::SequencedTaskExecutor::create(2);
+ auto indexFieldWriter = search::SequencedTaskExecutor::create(2);
+ MemoryIndex memory_index(schema, MockFieldLengthInspector(), *indexFieldInverter, *indexFieldWriter);
DocBuilder doc_builder(schema);
Document::UP doc = buildDocument(doc_builder, doc_id1, word1);
@@ -160,7 +160,7 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
doc = buildDocument(doc_builder, doc_id2, word2);
memory_index.insertDocument(doc_id2, *doc.get());
memory_index.commit(std::shared_ptr<search::IDestructorCallback>());
- indexFieldWriter.sync();
+ indexFieldWriter->sync();
testSearch(memory_index, word1, doc_id1);
testSearch(memory_index, word2, doc_id2);
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp
index 7542ef24c52..264cf6d8cfa 100644
--- a/searchcore/src/tests/proton/index/indexmanager_test.cpp
+++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp
@@ -379,10 +379,9 @@ TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated)
{
Schema schema(getSchema());
FieldIndexCollection fic(schema, MockFieldLengthInspector());
- SequencedTaskExecutor invertThreads(2);
- SequencedTaskExecutor pushThreads(2);
- search::memoryindex::DocumentInverter inverter(schema, invertThreads,
- pushThreads, fic);
+ auto invertThreads = SequencedTaskExecutor::create(2);
+ auto pushThreads = SequencedTaskExecutor::create(2);
+ search::memoryindex::DocumentInverter inverter(schema, *invertThreads, *pushThreads, fic);
uint64_t fixed_index_size = fic.getMemoryUsage().allocatedBytes();
uint64_t index_size = fic.getMemoryUsage().allocatedBytes() - fixed_index_size;
@@ -395,9 +394,9 @@ TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated)
Document::UP doc = addDocument(docid);
inverter.invertDocument(docid, *doc);
- invertThreads.sync();
+ invertThreads->sync();
inverter.pushDocuments(std::shared_ptr<search::IDestructorCallback>());
- pushThreads.sync();
+ pushThreads->sync();
index_size = fic.getMemoryUsage().allocatedBytes() - fixed_index_size;
/// Must account for both docid 0 being reserved and the extra after.
@@ -414,9 +413,9 @@ TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated)
inverter.invertDocument(docid + 10, *doc);
doc = addDocument(docid + 100);
inverter.invertDocument(docid + 100, *doc);
- invertThreads.sync();
+ invertThreads->sync();
inverter.pushDocuments(std::shared_ptr<search::IDestructorCallback>());
- pushThreads.sync();
+ pushThreads->sync();
index_size = fic.getMemoryUsage().allocatedBytes() - fixed_index_size;
/// Must account for both docid 0 being reserved and the extra after.
selector_size = (docid + 100 + 1) * sizeof(Source);
diff --git a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp
index b77d9b3f5ab..02e9f3e8687 100644
--- a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp
+++ b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp
@@ -42,7 +42,7 @@ const ReferenceAttribute *getReferenceAttribute(const IGidToLidChangeListener &l
struct MyGidToLidMapperFactory : public IGidToLidMapperFactory {
using SP = std::shared_ptr<MyGidToLidMapperFactory>;
- virtual std::unique_ptr<IGidToLidMapper> getMapper() const override {
+ std::unique_ptr<IGidToLidMapper> getMapper() const override {
return std::unique_ptr<IGidToLidMapper>();
}
};
@@ -60,14 +60,14 @@ struct MyDocumentDBReference : public MockDocumentDBReference {
MyDocumentDBReference(MyGidToLidMapperFactory::SP factory_,
std::shared_ptr<MockGidToLidChangeHandler> gidToLidChangeHandler)
- : factory(factory_),
+ : factory(std::move(factory_)),
_gidToLidChangeHandler(std::move(gidToLidChangeHandler))
{
}
- virtual IGidToLidMapperFactory::SP getGidToLidMapperFactory() override {
+ IGidToLidMapperFactory::SP getGidToLidMapperFactory() override {
return factory;
}
- virtual std::shared_ptr<search::attribute::ReadableAttributeVector> getAttribute(vespalib::stringref name) override {
+ std::shared_ptr<search::attribute::ReadableAttributeVector> getAttribute(vespalib::stringref name) override {
auto itr = attributes.find(name);
if (itr != attributes.end()) {
return itr->second;
@@ -78,7 +78,7 @@ struct MyDocumentDBReference : public MockDocumentDBReference {
void addIntAttribute(vespalib::stringref name) {
attributes[name] = AttributeFactory::createAttribute(name, Config(BasicType::INT32));
}
- virtual std::unique_ptr<GidToLidChangeRegistrator> makeGidToLidChangeRegistrator(const vespalib::string &docTypeName) override {
+ std::unique_ptr<GidToLidChangeRegistrator> makeGidToLidChangeRegistrator(const vespalib::string &docTypeName) override {
return std::make_unique<GidToLidChangeRegistrator>(_gidToLidChangeHandler, docTypeName);
}
@@ -93,12 +93,12 @@ struct MyDocumentDBReference : public MockDocumentDBReference {
struct MyReferenceRegistry : public IDocumentDBReferenceRegistry {
using ReferenceMap = std::map<vespalib::string, IDocumentDBReference::SP>;
ReferenceMap map;
- virtual IDocumentDBReference::SP get(vespalib::stringref name) const override {
+ IDocumentDBReference::SP get(vespalib::stringref name) const override {
auto itr = map.find(name);
ASSERT_TRUE(itr != map.end());
return itr->second;
}
- virtual IDocumentDBReference::SP tryGet(vespalib::stringref name) const override {
+ IDocumentDBReference::SP tryGet(vespalib::stringref name) const override {
auto itr = map.find(name);
if (itr != map.end()) {
return itr->second;
@@ -106,10 +106,10 @@ struct MyReferenceRegistry : public IDocumentDBReferenceRegistry {
return IDocumentDBReference::SP();
}
}
- virtual void add(vespalib::stringref name, IDocumentDBReference::SP reference) override {
+ void add(vespalib::stringref name, IDocumentDBReference::SP reference) override {
map[name] = reference;
}
- virtual void remove(vespalib::stringref) override {}
+ void remove(vespalib::stringref) override {}
};
struct MyAttributeManager : public MockAttributeManager {
@@ -121,7 +121,7 @@ struct MyAttributeManager : public MockAttributeManager {
}
const ReferenceAttribute *getReferenceAttribute(const vespalib::string &name) const {
AttributeGuard::UP guard = getAttribute(name);
- const ReferenceAttribute *result = dynamic_cast<const ReferenceAttribute *>(guard->get());
+ auto *result = dynamic_cast<const ReferenceAttribute *>(guard->get());
ASSERT_TRUE(result != nullptr);
return result;
}
@@ -155,8 +155,7 @@ struct DocumentModel {
}
};
-DocumentModel::~DocumentModel() {
-}
+DocumentModel::~DocumentModel() = default;
void
set(const vespalib::string &name,
@@ -182,7 +181,7 @@ createImportedFieldsConfig()
const ImportedAttributeVector &
asImportedAttribute(const IAttributeVector &attr)
{
- const ImportedAttributeVector *result = dynamic_cast<const ImportedAttributeVector *>(&attr);
+ auto *result = dynamic_cast<const ImportedAttributeVector *>(&attr);
ASSERT_TRUE(result != nullptr);
return *result;
}
@@ -199,7 +198,7 @@ struct Fixture {
MyAttributeManager oldAttrMgr;
DocumentModel docModel;
ImportedFieldsConfig importedFieldsCfg;
- SequencedTaskExecutor _attributeFieldWriter;
+ std::unique_ptr<ISequencedTaskExecutor> _attributeFieldWriter;
Fixture() :
factory(std::make_shared<MyGidToLidMapperFactory>()),
_gidToLidChangeListenerRefCount(),
@@ -211,7 +210,7 @@ struct Fixture {
attrMgr(),
docModel(),
importedFieldsCfg(createImportedFieldsConfig()),
- _attributeFieldWriter(1)
+ _attributeFieldWriter(SequencedTaskExecutor::create(1))
{
registry.add("parent", parentReference);
@@ -231,7 +230,7 @@ struct Fixture {
oldAttrMgr.addReferenceAttribute("parent3_ref");
}
ImportedAttributesRepo::UP resolve(vespalib::duration visibilityDelay, bool useReferences) {
- DocumentDBReferenceResolver resolver(registry, docModel.childDocType, importedFieldsCfg, docModel.childDocType, _gidToLidChangeListenerRefCount, _attributeFieldWriter, useReferences);
+ DocumentDBReferenceResolver resolver(registry, docModel.childDocType, importedFieldsCfg, docModel.childDocType, _gidToLidChangeListenerRefCount, *_attributeFieldWriter, useReferences);
return resolver.resolve(attrMgr, oldAttrMgr, std::shared_ptr<search::IDocumentMetaStoreContext>(), visibilityDelay);
}
ImportedAttributesRepo::UP resolve(vespalib::duration visibilityDelay) {
@@ -244,7 +243,7 @@ struct Fixture {
return resolve(vespalib::duration::zero());
}
void teardown() {
- DocumentDBReferenceResolver resolver(registry, docModel.childDocType, importedFieldsCfg, docModel.childDocType, _gidToLidChangeListenerRefCount, _attributeFieldWriter, false);
+ DocumentDBReferenceResolver resolver(registry, docModel.childDocType, importedFieldsCfg, docModel.childDocType, _gidToLidChangeListenerRefCount, *_attributeFieldWriter, false);
resolver.teardown(attrMgr);
}
const IGidToLidMapperFactory *getMapperFactoryPtr(const vespalib::string &attrName) {
@@ -254,7 +253,7 @@ struct Fixture {
const vespalib::string &referenceField,
const vespalib::string &targetField,
bool useSearchCache,
- ImportedAttributeVector::SP attr) {
+ const ImportedAttributeVector::SP & attr) {
ASSERT_TRUE(attr.get());
EXPECT_EQUAL(name, attr->getName());
EXPECT_EQUAL(attrMgr.getReferenceAttribute(referenceField), attr->getReferenceAttribute().get());
diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp
index 159ce17ef45..a023ff175d6 100644
--- a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp
+++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp
@@ -49,13 +49,13 @@ struct MyGidToLidMapperFactory : public MockGidToLidMapperFactory
struct Fixture
{
std::shared_ptr<ReferenceAttribute> _attr;
- search::SequencedTaskExecutor _writer;
+ std::unique_ptr<search::ISequencedTaskExecutor> _writer;
MonitoredRefCount _refCount;
std::unique_ptr<GidToLidChangeListener> _listener;
Fixture()
: _attr(std::make_shared<ReferenceAttribute>("test", Config(BasicType::REFERENCE))),
- _writer(1),
+ _writer(search::SequencedTaskExecutor::create(1)),
_refCount(),
_listener()
{
@@ -91,7 +91,7 @@ struct Fixture
}
void allocListener() {
- _listener = std::make_unique<GidToLidChangeListener>(_writer, _attr, _refCount, "test", "testdoc");
+ _listener = std::make_unique<GidToLidChangeListener>(*_writer, _attr, _refCount, "test", "testdoc");
}
void notifyPutDone(const GlobalId &gid, uint32_t referencedDoc) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
index af8e16f657b..6ca385711b0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
@@ -1,11 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "executor_thread_service.h"
-#include <vespa/vespalib/util/closuretask.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fastos/thread.h>
-using vespalib::makeClosure;
-using vespalib::makeTask;
+using vespalib::makeLambdaTask;
using vespalib::Executor;
using vespalib::Gate;
using vespalib::Runnable;
@@ -32,7 +31,7 @@ std::unique_ptr<internal::ThreadId>
getThreadId(ThreadStackExecutorBase &executor)
{
std::unique_ptr<internal::ThreadId> id = std::make_unique<internal::ThreadId>();
- executor.execute(makeTask(makeClosure(&sampleThreadId, &id->_id)));
+ executor.execute(makeLambdaTask([threadId=&id->_id] { sampleThreadId(threadId);}));
executor.sync();
return id;
}
@@ -52,7 +51,7 @@ ExecutorThreadService::ExecutorThreadService(ThreadStackExecutorBase &executor)
{
}
-ExecutorThreadService::~ExecutorThreadService() {}
+ExecutorThreadService::~ExecutorThreadService() = default;
void
ExecutorThreadService::run(Runnable &runnable)
@@ -61,7 +60,7 @@ ExecutorThreadService::run(Runnable &runnable)
runnable.run();
} else {
Gate gate;
- _executor.execute(makeTask(makeClosure(&runRunnable, &runnable, &gate)));
+ _executor.execute(makeLambdaTask([runnablePtr=&runnable, gatePtr=&gate] { runRunnable(runnablePtr, gatePtr); }));
gate.await();
}
}
@@ -73,4 +72,12 @@ ExecutorThreadService::isCurrentThread() const
return FastOS_Thread::CompareThreadIds(_threadId->_id, currentThreadId);
}
+vespalib::ThreadExecutor::Stats ExecutorThreadService::getStats() {
+ return _executor.getStats();
+}
+
+void ExecutorThreadService::setTaskLimit(uint32_t taskLimit) {
+ _executor.setTaskLimit(taskLimit);
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
index 521565358d9..ccdfb6b72cd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
@@ -21,9 +21,8 @@ public:
ExecutorThreadService(vespalib::ThreadStackExecutorBase &executor);
~ExecutorThreadService();
- /**
- * Implements IThreadService
- */
+ Stats getStats() override;
+
vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task) override {
return _executor.execute(std::move(task));
}
@@ -34,6 +33,8 @@ public:
}
bool isCurrentThread() const override;
size_t getNumThreads() const override { return _executor.getNumThreads(); }
+
+ void setTaskLimit(uint32_t taskLimit) override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 262363f4250..058197f0271 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -19,9 +19,9 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadStackExecutor
_masterService(_masterExecutor),
_indexService(_indexExecutor),
_summaryService(_summaryExecutor),
- _indexFieldInverter(std::make_unique<SequencedTaskExecutor>(threads, taskLimit)),
- _indexFieldWriter(std::make_unique<SequencedTaskExecutor>(threads, taskLimit)),
- _attributeFieldWriter(std::make_unique<SequencedTaskExecutor>(threads, taskLimit))
+ _indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)),
+ _indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)),
+ _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit))
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index 9ab6eba39e1..7bbe1cb162a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -6,7 +6,6 @@
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
-namespace search { class SequencedTaskExecutor; }
namespace proton {
class ExecutorThreadingServiceStats;
@@ -18,16 +17,16 @@ class ExecutorThreadingServiceStats;
class ExecutorThreadingService : public searchcorespi::index::IThreadingService
{
private:
- vespalib::ThreadStackExecutorBase & _sharedExecutor;
- vespalib::ThreadStackExecutor _masterExecutor;
- vespalib::BlockingThreadStackExecutor _indexExecutor;
- vespalib::BlockingThreadStackExecutor _summaryExecutor;
- ExecutorThreadService _masterService;
- ExecutorThreadService _indexService;
- ExecutorThreadService _summaryService;
- std::unique_ptr<search::SequencedTaskExecutor> _indexFieldInverter;
- std::unique_ptr<search::SequencedTaskExecutor> _indexFieldWriter;
- std::unique_ptr<search::SequencedTaskExecutor> _attributeFieldWriter;
+ vespalib::ThreadStackExecutorBase & _sharedExecutor;
+ vespalib::ThreadStackExecutor _masterExecutor;
+ vespalib::BlockingThreadStackExecutor _indexExecutor;
+ vespalib::BlockingThreadStackExecutor _summaryExecutor;
+ ExecutorThreadService _masterService;
+ ExecutorThreadService _indexService;
+ ExecutorThreadService _summaryService;
+ std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldInverter;
+ std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldWriter;
+ std::unique_ptr<search::ISequencedTaskExecutor> _attributeFieldWriter;
public:
/**
diff --git a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt
index ad5ae54f6e1..488ac1041f1 100644
--- a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt
@@ -8,6 +8,7 @@ vespa_add_library(searchcore_test STATIC
documentdb_config_builder.cpp
dummy_feed_view.cpp
userdocumentsbuilder.cpp
+ threading_service_observer.cpp
DEPENDS
searchcore_fconfig
)
diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
index c8d0f8968c9..766bdeeefb0 100644
--- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
@@ -20,9 +20,6 @@ public:
uint32_t getExecuteCnt() const { return _executeCnt; }
- /**
- * Implements IThreadService
- */
vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task) override {
++_executeCnt;
return _service.execute(std::move(task));
@@ -39,6 +36,14 @@ public:
}
size_t getNumThreads() const override { return _service.getNumThreads(); }
+ Stats getStats() override {
+ return _service.getStats();
+ }
+
+ void setTaskLimit(uint32_t taskLimit) override {
+ _service.setTaskLimit(taskLimit);
+ }
+
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp
new file mode 100644
index 00000000000..30a6334d764
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp
@@ -0,0 +1,22 @@
+// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "threading_service_observer.h"
+
+namespace proton::test {
+
+ThreadingServiceObserver::ThreadingServiceObserver(searchcorespi::index::IThreadingService &service)
+ : _service(service),
+ _master(_service.master()),
+ _index(service.index()),
+ _summary(service.summary()),
+ _shared(service.shared()),
+ _indexFieldInverter(_service.indexFieldInverter()),
+ _indexFieldWriter(_service.indexFieldWriter()),
+ _attributeFieldWriter(_service.attributeFieldWriter())
+{
+}
+
+ThreadingServiceObserver::~ThreadingServiceObserver() = default;
+
+}
+
diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
index e1a4433ed4b..7ac9c0c68f2 100644
--- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
@@ -21,18 +21,8 @@ private:
search::SequencedTaskExecutorObserver _attributeFieldWriter;
public:
- ThreadingServiceObserver(searchcorespi::index::IThreadingService &service)
- : _service(service),
- _master(_service.master()),
- _index(service.index()),
- _summary(service.summary()),
- _shared(service.shared()),
- _indexFieldInverter(_service.indexFieldInverter()),
- _indexFieldWriter(_service.indexFieldWriter()),
- _attributeFieldWriter(_service.attributeFieldWriter())
- {
- }
- ~ThreadingServiceObserver() override { }
+ ThreadingServiceObserver(searchcorespi::index::IThreadingService &service);
+ ~ThreadingServiceObserver() override;
const ThreadServiceObserver &masterObserver() const {
return _master;
}
@@ -53,16 +43,10 @@ public:
return _attributeFieldWriter;
}
- /**
- * Implements vespalib::Syncable
- */
vespalib::Syncable &sync() override {
return _service.sync();
}
- /**
- * Implements IThreadingService
- */
searchcorespi::index::IThreadService &master() override {
return _master;
}
diff --git a/searchlib/src/apps/tests/memoryindexstress_test.cpp b/searchlib/src/apps/tests/memoryindexstress_test.cpp
index 60f3a6b7664..dc950b84508 100644
--- a/searchlib/src/apps/tests/memoryindexstress_test.cpp
+++ b/searchlib/src/apps/tests/memoryindexstress_test.cpp
@@ -195,8 +195,8 @@ struct Fixture {
Schema schema;
DocumentTypeRepo repo;
vespalib::ThreadStackExecutor _executor;
- search::SequencedTaskExecutor _invertThreads;
- search::SequencedTaskExecutor _pushThreads;
+ std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads;
MemoryIndex index;
uint32_t _readThreads;
vespalib::ThreadStackExecutor _writer; // 1 write thread
@@ -247,9 +247,9 @@ Fixture::Fixture(uint32_t readThreads)
: schema(makeSchema()),
repo(makeDocTypeRepoConfig()),
_executor(1, 128 * 1024),
- _invertThreads(2),
- _pushThreads(2),
- index(schema, MockFieldLengthInspector(), _invertThreads, _pushThreads),
+ _invertThreads(search::SequencedTaskExecutor::create(2)),
+ _pushThreads(search::SequencedTaskExecutor::create(2)),
+ index(schema, MockFieldLengthInspector(), *_invertThreads, *_pushThreads),
_readThreads(readThreads),
_writer(1, 128 * 1024),
_readers(readThreads, 128 * 1024),
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
index a51becfbf13..b2b15ded274 100644
--- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
+++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
@@ -16,9 +16,9 @@ int main(int argc, char *argv[]) {
if (argc > 2)
numThreads = atoi(argv[2]);
- SequencedTaskExecutor executor(numThreads);
+ auto executor = SequencedTaskExecutor::create(numThreads);
for (unsigned long tid(0); tid < numTasks; tid++) {
- executor.executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; }));
+ executor->executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; }));
}
return 0;
}
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index fcc7fd7300d..c311a59a56c 100644
--- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -17,9 +17,9 @@ namespace search::common {
class Fixture
{
public:
- SequencedTaskExecutor _threads;
+ std::unique_ptr<ISequencedTaskExecutor> _threads;
- Fixture() : _threads(2) { }
+ Fixture() : _threads(SequencedTaskExecutor::create(2)) { }
};
@@ -67,11 +67,11 @@ public:
TEST_F("testExecute", Fixture) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(1, [=]() { tv->modify(0, 42); });
+ f._threads->execute(1, [=]() { tv->modify(0, 42); });
tv->wait(1);
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
+ f._threads->sync();
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
}
@@ -81,12 +81,12 @@ TEST_F("require that task with same component id are serialized", Fixture)
{
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(0, [=]() { tv->modify(14, 42); });
+ f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(0, [=]() { tv->modify(14, 42); });
tv->wait(2);
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
+ f._threads->sync();
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
}
@@ -97,15 +97,15 @@ TEST_F("require that task with different component ids are not serialized", Fixt
for (tryCnt = 0; tryCnt < 100; ++tryCnt) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(2, [=]() { tv->modify(14, 42); });
+ f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(2, [=]() { tv->modify(14, 42); });
tv->wait(2);
if (tv->_fail != 1) {
continue;
}
EXPECT_EQUAL(1, tv->_fail);
EXPECT_EQUAL(14, tv->_val);
- f._threads.sync();
+ f._threads->sync();
EXPECT_EQUAL(1, tv->_fail);
EXPECT_EQUAL(14, tv->_val);
break;
@@ -119,12 +119,12 @@ TEST_F("require that task with same string component id are serialized", Fixture
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
auto test2 = [=]() { tv->modify(14, 42); };
- f._threads.execute(f._threads.getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(f._threads.getExecutorId("0"), test2);
+ f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(f._threads->getExecutorId("0"), test2);
tv->wait(2);
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
- f._threads.sync();
+ f._threads->sync();
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
}
@@ -137,15 +137,15 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t
for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(f._threads.getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(f._threads.getExecutorId(altComponentId), [=]() { tv->modify(14, 42); });
+ f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); });
tv->wait(2);
if (tv->_fail != 1) {
continue;
}
EXPECT_EQUAL(1, tv->_fail);
EXPECT_EQUAL(14, tv->_val);
- f._threads.sync();
+ f._threads->sync();
EXPECT_EQUAL(1, tv->_fail);
EXPECT_EQUAL(14, tv->_val);
break;
@@ -157,10 +157,10 @@ vespalib::string makeAltComponentId(Fixture &f)
{
int tryCnt = 0;
char altComponentId[20];
- ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0");
+ ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0");
for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
sprintf(altComponentId, "%d", tryCnt);
- if (f._threads.getExecutorId(altComponentId) == executorId0) {
+ if (f._threads->getExecutorId(altComponentId) == executorId0) {
break;
}
}
@@ -193,9 +193,9 @@ TEST_F("require that execute works with const lambda", Fixture)
std::vector<int> res;
const auto lambda = [i, &res]() mutable
{ res.push_back(i--); res.push_back(i--); };
- f._threads.execute(0, lambda);
- f._threads.execute(0, lambda);
- f._threads.sync();
+ f._threads->execute(0, lambda);
+ f._threads->execute(0, lambda);
+ f._threads->sync();
std::vector<int> exp({5, 4, 5, 4});
EXPECT_EQUAL(exp, res);
EXPECT_EQUAL(5, i);
@@ -208,9 +208,9 @@ TEST_F("require that execute works with reference to lambda", Fixture)
auto lambda = [i, &res]() mutable
{ res.push_back(i--); res.push_back(i--); };
auto &lambdaref = lambda;
- f._threads.execute(0, lambdaref);
- f._threads.execute(0, lambdaref);
- f._threads.sync();
+ f._threads->execute(0, lambdaref);
+ f._threads->execute(0, lambdaref);
+ f._threads->sync();
std::vector<int> exp({5, 4, 5, 4});
EXPECT_EQUAL(exp, res);
EXPECT_EQUAL(5, i);
@@ -222,28 +222,28 @@ TEST_F("require that executeLambda works", Fixture)
std::vector<int> res;
const auto lambda = [i, &res]() mutable
{ res.push_back(i--); res.push_back(i--); };
- f._threads.executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda);
- f._threads.sync();
+ f._threads->executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda);
+ f._threads->sync();
std::vector<int> exp({5, 4});
EXPECT_EQUAL(exp, res);
EXPECT_EQUAL(5, i);
}
TEST("require that you get correct number of executors") {
- SequencedTaskExecutor seven(7);
- EXPECT_EQUAL(7u, seven.getNumExecutors());
+ auto seven = SequencedTaskExecutor::create(7);
+ EXPECT_EQUAL(7u, seven->getNumExecutors());
}
TEST("require that you distribute well") {
- SequencedTaskExecutor seven(7);
- EXPECT_EQUAL(7u, seven.getNumExecutors());
- EXPECT_EQUAL(97u, seven.getComponentHashSize());
- EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize());
+ auto seven = SequencedTaskExecutor::create(7);
+ EXPECT_EQUAL(7u, seven->getNumExecutors());
+ EXPECT_EQUAL(97u, seven->getComponentHashSize());
+ EXPECT_EQUAL(0u, seven->getComponentEffectiveHashSize());
for (uint32_t id=0; id < 1000; id++) {
- EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId());
+ EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId());
}
- EXPECT_EQUAL(97u, seven.getComponentHashSize());
- EXPECT_EQUAL(97u, seven.getComponentEffectiveHashSize());
+ EXPECT_EQUAL(97u, seven->getComponentHashSize());
+ EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize());
}
}
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index 1825c00ceda..92d0659d984 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -314,16 +314,16 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
addField("f4"));
FieldIndexCollection fic(schema, MockFieldLengthInspector());
DocBuilder b(schema);
- SequencedTaskExecutor invertThreads(2);
- SequencedTaskExecutor pushThreads(2);
- DocumentInverter inv(schema, invertThreads, pushThreads, fic);
+ auto invertThreads = SequencedTaskExecutor::create(2);
+ auto pushThreads = SequencedTaskExecutor::create(2);
+ DocumentInverter inv(schema, *invertThreads, *pushThreads, fic);
Document::UP doc;
doc = make_doc10(b);
inv.invertDocument(10, *doc);
- invertThreads.sync();
+ invertThreads->sync();
myPushDocument(inv);
- pushThreads.sync();
+ pushThreads->sync();
b.startDocument("id:ns:searchdocument::11").
startIndexField("f3").
@@ -331,9 +331,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
endField();
doc = b.endDocument();
inv.invertDocument(11, *doc);
- invertThreads.sync();
+ invertThreads->sync();
myPushDocument(inv);
- pushThreads.sync();
+ pushThreads->sync();
b.startDocument("id:ns:searchdocument::12").
startIndexField("f3").
@@ -341,9 +341,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
endField();
doc = b.endDocument();
inv.invertDocument(12, *doc);
- invertThreads.sync();
+ invertThreads->sync();
myPushDocument(inv);
- pushThreads.sync();
+ pushThreads->sync();
IndexBuilder ib(schema);
vespalib::string dump2dir = prefix + "dump2";
@@ -455,14 +455,14 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng
uint32_t numDocs = 20;
uint32_t numWords = 1000;
DocBuilder b(_schema);
- SequencedTaskExecutor invertThreads(2);
- SequencedTaskExecutor pushThreads(2);
- DocumentInverter inv(_schema, invertThreads, pushThreads, fic);
+ auto invertThreads = SequencedTaskExecutor::create(2);
+ auto pushThreads = SequencedTaskExecutor::create(2);
+ DocumentInverter inv(_schema, *invertThreads, *pushThreads, fic);
inv.invertDocument(10, *make_doc10(b));
- invertThreads.sync();
+ invertThreads->sync();
myPushDocument(inv);
- pushThreads.sync();
+ pushThreads->sync();
IndexBuilder ib(_schema);
TuneFileIndexing tuneFileIndexing;
diff --git a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
index 3f798df3c05..60b21699406 100644
--- a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
+++ b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
@@ -117,8 +117,8 @@ public:
struct DocumentInverterTest : public ::testing::Test {
Schema _schema;
DocBuilder _b;
- SequencedTaskExecutor _invertThreads;
- SequencedTaskExecutor _pushThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _pushThreads;
WordStore _word_store;
FieldIndexRemover _remover;
test::OrderedFieldIndexInserter _inserter;
@@ -138,26 +138,26 @@ struct DocumentInverterTest : public ::testing::Test {
DocumentInverterTest()
: _schema(makeSchema()),
_b(_schema),
- _invertThreads(2),
- _pushThreads(2),
+ _invertThreads(SequencedTaskExecutor::create(2)),
+ _pushThreads(SequencedTaskExecutor::create(2)),
_word_store(),
_remover(_word_store),
_inserter(),
_calculator(),
_fic(_remover, _inserter, _calculator),
- _inv(_schema, _invertThreads, _pushThreads, _fic)
+ _inv(_schema, *_invertThreads, *_pushThreads, _fic)
{
}
void pushDocuments() {
- _invertThreads.sync();
+ _invertThreads->sync();
uint32_t fieldId = 0;
for (auto &inverter : _inv.getInverters()) {
_inserter.setFieldId(fieldId);
inverter->pushDocuments();
++fieldId;
}
- _pushThreads.sync();
+ _pushThreads->sync();
}
};
diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
index 512e1bd2051..c562c0cf29c 100644
--- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
+++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
@@ -872,7 +872,7 @@ struct FieldIndexCollectionTypeTest : public ::testing::Test {
fic(schema, MockFieldLengthInspector())
{
}
- Schema make_schema() {
+ static Schema make_schema() {
Schema result;
result.addIndexField(Schema::IndexField("normal", DataType::STRING));
Schema::IndexField interleaved("interleaved", DataType::STRING);
@@ -902,17 +902,17 @@ public:
Schema _schema;
FieldIndexCollection _fic;
DocBuilder _b;
- SequencedTaskExecutor _invertThreads;
- SequencedTaskExecutor _pushThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _pushThreads;
DocumentInverter _inv;
InverterTest(const Schema& schema)
: _schema(schema),
_fic(_schema, MockFieldLengthInspector()),
_b(_schema),
- _invertThreads(2),
- _pushThreads(2),
- _inv(_schema, _invertThreads, _pushThreads, _fic)
+ _invertThreads(SequencedTaskExecutor::create(2)),
+ _pushThreads(SequencedTaskExecutor::create(2)),
+ _inv(_schema, *_invertThreads, *_pushThreads, _fic)
{
}
NormalFieldIndex::PostingList::Iterator find(const vespalib::stringref word, uint32_t field_id) const {
@@ -943,9 +943,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(10, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
_b.startDocument("id:ns:searchdocument::20");
_b.startIndexField("f0").
@@ -953,9 +953,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(20, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
_b.startDocument("id:ns:searchdocument::30");
_b.startIndexField("f0").
@@ -984,9 +984,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(30, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
_b.startDocument("id:ns:searchdocument::40");
_b.startIndexField("f0").
@@ -995,9 +995,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(40, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
_b.startDocument("id:ns:searchdocument::999");
_b.startIndexField("f0").
@@ -1025,12 +1025,12 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
doc = _b.endDocument();
for (uint32_t docId = 10000; docId < 20000; ++docId) {
_inv.invertDocument(docId, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
}
- _pushThreads.sync();
+ _pushThreads->sync();
DataStoreBase::MemStats beforeStats = getFeatureStoreMemStats(_fic);
LOG(info,
"Before feature compaction: allocElems=%zu, usedElems=%zu"
@@ -1044,13 +1044,13 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
beforeStats._freeBuffers,
beforeStats._activeBuffers,
beforeStats._holdBuffers);
- myCompactFeatures(_fic, _pushThreads);
+ myCompactFeatures(_fic, *_pushThreads);
std::vector<std::unique_ptr<GenerationHandler::Guard>> guards;
for (auto &fieldIndex : _fic.getFieldIndexes()) {
guards.push_back(std::make_unique<GenerationHandler::Guard>
(fieldIndex->takeGenerationGuard()));
}
- myCommit(_fic, _pushThreads);
+ myCommit(_fic, *_pushThreads);
DataStoreBase::MemStats duringStats = getFeatureStoreMemStats(_fic);
LOG(info,
"During feature compaction: allocElems=%zu, usedElems=%zu"
@@ -1065,7 +1065,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
duringStats._activeBuffers,
duringStats._holdBuffers);
guards.clear();
- myCommit(_fic, _pushThreads);
+ myCommit(_fic, *_pushThreads);
DataStoreBase::MemStats afterStats = getFeatureStoreMemStats(_fic);
LOG(info,
"After feature compaction: allocElems=%zu, usedElems=%zu"
@@ -1142,17 +1142,17 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo
_b.startIndexField("f1").addStr("a").addStr("c").endField();
Document::UP doc1 = _b.endDocument();
_inv.invertDocument(1, *doc1.get());
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
_b.startDocument("id:ns:searchdocument::2");
_b.startIndexField("f0").addStr("b").addStr("c").endField();
Document::UP doc2 = _b.endDocument();
_inv.invertDocument(2, *doc2.get());
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
EXPECT_TRUE(assertPostingList("[1]", find("a", 0)));
EXPECT_TRUE(assertPostingList("[1,2]", find("b", 0)));
@@ -1160,8 +1160,8 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo
EXPECT_TRUE(assertPostingList("[1]", find("a", 1)));
EXPECT_TRUE(assertPostingList("[1]", find("c", 1)));
- myremove(1, _inv, _invertThreads);
- _pushThreads.sync();
+ myremove(1, _inv, *_invertThreads);
+ _pushThreads->sync();
EXPECT_TRUE(assertPostingList("[]", find("a", 0)));
EXPECT_TRUE(assertPostingList("[2]", find("b", 0)));
@@ -1311,10 +1311,10 @@ TEST_F(UriInverterTest, require_that_uri_indexing_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(10, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
SimpleMatchData match_data;
{
@@ -1387,10 +1387,10 @@ TEST_F(CjkInverterTest, require_that_cjk_indexing_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(10, *doc);
- _invertThreads.sync();
+ _invertThreads->sync();
myPushDocument(_inv);
- _pushThreads.sync();
+ _pushThreads->sync();
SimpleMatchData match_data;
uint32_t fieldId = _schema.getIndexFieldId("f0");
@@ -1445,13 +1445,13 @@ TEST_F(FieldIndexCollectionTest, require_that_insert_tells_which_word_ref_that_w
}
struct RemoverTest : public FieldIndexCollectionTest {
- SequencedTaskExecutor _invertThreads;
- SequencedTaskExecutor _pushThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<ISequencedTaskExecutor> _pushThreads;
RemoverTest()
: FieldIndexCollectionTest(),
- _invertThreads(2),
- _pushThreads(2)
+ _invertThreads(SequencedTaskExecutor::create(2)),
+ _pushThreads(SequencedTaskExecutor::create(2))
{
}
void assertPostingLists(const vespalib::string &e1,
@@ -1462,9 +1462,9 @@ struct RemoverTest : public FieldIndexCollectionTest {
EXPECT_TRUE(assertPostingList(e3, find("b", 1)));
}
void remove(uint32_t docId) {
- DocumentInverter inv(schema, _invertThreads, _pushThreads, fic);
- myremove(docId, inv, _invertThreads);
- _pushThreads.sync();
+ DocumentInverter inv(schema, *_invertThreads, *_pushThreads, fic);
+ myremove(docId, inv, *_invertThreads);
+ _pushThreads->sync();
EXPECT_FALSE(fic.getFieldIndex(0u)->getDocumentRemover().
getStore().get(docId).valid());
}
diff --git a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
index 4bb0f91659a..5032ed2c179 100644
--- a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
+++ b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
@@ -64,8 +64,8 @@ struct MySetup : public IFieldLengthInspector {
struct Index {
Schema schema;
vespalib::ThreadStackExecutor _executor;
- search::SequencedTaskExecutor _invertThreads;
- search::SequencedTaskExecutor _pushThreads;
+ std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads;
+ std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads;
MemoryIndex index;
DocBuilder builder;
uint32_t docid;
@@ -123,15 +123,15 @@ private:
Index::Index(const MySetup &setup)
: schema(setup.schema),
_executor(1, 128 * 1024),
- _invertThreads(2),
- _pushThreads(2),
- index(schema, setup, _invertThreads, _pushThreads),
+ _invertThreads(search::SequencedTaskExecutor::create(2)),
+ _pushThreads(search::SequencedTaskExecutor::create(2)),
+ index(schema, setup, *_invertThreads, *_pushThreads),
builder(schema),
docid(1),
currentField()
{
}
-Index::~Index() {}
+Index::~Index() = default;
//-----------------------------------------------------------------------------
std::string toString(SearchIterator & search)
diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
index 513684d3fd5..4c501defeea 100644
--- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
+++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
@@ -14,7 +14,8 @@ ForegroundTaskExecutor::ForegroundTaskExecutor()
}
ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads)
- : ISequencedTaskExecutor(threads)
+ : ISequencedTaskExecutor(threads),
+ _accepted(0)
{
}
@@ -25,6 +26,7 @@ ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP
{
assert(id.getId() < getNumExecutors());
task->run();
+ _accepted++;
}
void
@@ -32,4 +34,12 @@ ForegroundTaskExecutor::sync()
{
}
+void ForegroundTaskExecutor::setTaskLimit(uint32_t) {
+
+}
+
+vespalib::ExecutorStats ForegroundTaskExecutor::getStats() {
+ return vespalib::ExecutorStats(0, _accepted.load(std::memory_order_relaxed), 0);
+}
+
} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
index 2074eda009b..0b604fb140d 100644
--- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
@@ -2,7 +2,7 @@
#pragma once
#include "isequencedtaskexecutor.h"
-#include <vespa/vespalib/stllike/hash_map.h>
+#include <atomic>
namespace vespalib { class ThreadStackExecutorBase; }
@@ -25,6 +25,12 @@ public:
void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
void sync() override;
+
+ void setTaskLimit(uint32_t taskLimit) override;
+
+ vespalib::ExecutorStats getStats() override;
+private:
+ std::atomic<uint64_t> _accepted;
};
} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
index 109e8319148..55c26abc3d8 100644
--- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
@@ -2,6 +2,7 @@
#pragma once
#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/executor_stats.h>
#include <vespa/vespalib/stllike/string.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vector>
@@ -67,6 +68,10 @@ public:
*/
virtual void sync() = 0;
+ virtual void setTaskLimit(uint32_t taskLimit) = 0;
+
+ virtual vespalib::ExecutorStats getStats() = 0;
+
/**
* Wrap lambda function into a task and schedule it to be run.
* Caller must ensure that pointers and references are valid and
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
index 184ff55220f..30723a6eb2a 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
@@ -14,14 +14,15 @@ constexpr uint32_t stackSize = 128 * 1024;
}
-SequencedTaskExecutor::SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit)
- : ISequencedTaskExecutor(threads),
- _executors()
+std::unique_ptr<ISequencedTaskExecutor>
+SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit)
{
+ auto executors = std::make_unique<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>>();
+ executors->reserve(threads);
for (uint32_t id = 0; id < threads; ++id) {
- auto executor = std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit);
- _executors.push_back(std::move(executor));
+ executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit));
}
+ return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
}
SequencedTaskExecutor::~SequencedTaskExecutor()
@@ -29,10 +30,16 @@ SequencedTaskExecutor::~SequencedTaskExecutor()
sync();
}
+SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors)
+ : ISequencedTaskExecutor(executors->size()),
+ _executors(std::move(executors))
+{
+}
+
void
SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
{
- for (const auto &executor : _executors) {
+ for (const auto &executor : *_executors) {
executor->setTaskLimit(taskLimit);
}
}
@@ -40,16 +47,15 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
void
SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
{
- assert(id.getId() < _executors.size());
- vespalib::ThreadStackExecutorBase &executor(*_executors[id.getId()]);
- auto rejectedTask = executor.execute(std::move(task));
+ assert(id.getId() < _executors->size());
+ auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task));
assert(!rejectedTask);
}
void
SequencedTaskExecutor::sync()
{
- for (auto &executor : _executors) {
+ for (auto &executor : *_executors) {
executor->sync();
}
}
@@ -58,7 +64,7 @@ SequencedTaskExecutor::Stats
SequencedTaskExecutor::getStats()
{
Stats accumulatedStats;
- for (auto &executor : _executors) {
+ for (auto &executor :* _executors) {
accumulatedStats += executor->getStats();
}
return accumulatedStats;
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
index 9337f393150..a29e3d5226c 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
@@ -6,7 +6,7 @@
namespace vespalib {
struct ExecutorStats;
- class BlockingThreadStackExecutor;
+ class SyncableThreadExecutor;
}
namespace search {
@@ -18,17 +18,19 @@ namespace search {
class SequencedTaskExecutor final : public ISequencedTaskExecutor
{
using Stats = vespalib::ExecutorStats;
- std::vector<std::shared_ptr<vespalib::BlockingThreadStackExecutor>> _executors;
+ std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors;
+
+ SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
public:
using ISequencedTaskExecutor::getExecutorId;
- SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit = 1000);
~SequencedTaskExecutor();
- void setTaskLimit(uint32_t taskLimit);
+ void setTaskLimit(uint32_t taskLimit) override;
void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
void sync() override;
- Stats getStats();
+ Stats getStats() override;
+ static std::unique_ptr<ISequencedTaskExecutor> create(uint32_t threads, uint32_t taskLimit = 1000);
};
} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp
index 04504086520..e553e757c37 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp
@@ -41,4 +41,12 @@ SequencedTaskExecutorObserver::getExecuteHistory()
return _executeHistory;
}
+void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) {
+ _executor.setTaskLimit(taskLimit);
+}
+
+vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() {
+ return _executor.getStats();
+}
+
} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
index b2de71f06b3..dadd4bf59cf 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
@@ -3,8 +3,6 @@
#include "isequencedtaskexecutor.h"
#include <atomic>
-#include <vector>
-#include <mutex>
namespace search {
@@ -31,6 +29,10 @@ public:
uint32_t getExecuteCnt() const { return _executeCnt; }
uint32_t getSyncCnt() const { return _syncCnt; }
std::vector<uint32_t> getExecuteHistory();
+
+ void setTaskLimit(uint32_t taskLimit) override;
+
+ vespalib::ExecutorStats getStats() override;
};
} // namespace search
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
index 33d6e46a244..b5cf16b4f80 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
@@ -39,10 +39,4 @@ BlockingThreadStackExecutor::~BlockingThreadStackExecutor()
cleanup();
}
-void
-BlockingThreadStackExecutor::setTaskLimit(uint32_t taskLimit)
-{
- internalSetTaskLimit(taskLimit);
-}
-
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
index 73a142ae42c..df3f9408e0a 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
@@ -33,11 +33,6 @@ public:
init_fun_t init_function);
~BlockingThreadStackExecutor();
-
- /**
- * Sets a new upper limit for accepted number of tasks.
- */
- void setTaskLimit(uint32_t taskLimit);
};
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/threadexecutor.h b/vespalib/src/vespa/vespalib/util/threadexecutor.h
index 2dcbb595bb3..202e516bc60 100644
--- a/vespalib/src/vespa/vespalib/util/threadexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/threadexecutor.h
@@ -4,6 +4,7 @@
#include "executor.h"
#include "syncable.h"
+#include "executor_stats.h"
namespace vespalib {
@@ -11,10 +12,26 @@ class ThreadExecutor : public Executor
{
public:
/**
+ * Internal stats that we want to observe externally. Note that
+ * all stats are reset each time they are observed.
+ **/
+ using Stats = ExecutorStats;
+ /**
* Get number of threads in the executor pool.
* @return number of threads in the pool
*/
virtual size_t getNumThreads() const = 0;
+
+ /**
+ * Observe and reset stats for this object.
+ * @return stats
+ **/
+ virtual Stats getStats() = 0;
+
+ /**
+ * Sets a new upper limit for accepted number of tasks.
+ */
+ virtual void setTaskLimit(uint32_t taskLimit) = 0;
};
/**
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index e933477bfb9..9ab465841d6 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -168,6 +168,12 @@ size_t ThreadStackExecutorBase::getNumThreads() const {
}
void
+ThreadStackExecutorBase::setTaskLimit(uint32_t taskLimit)
+{
+ internalSetTaskLimit(taskLimit);
+}
+
+void
ThreadStackExecutorBase::internalSetTaskLimit(uint32_t taskLimit)
{
MonitorGuard monitor(_monitor);
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 8718b04d2d3..2c0bc56d6df 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -8,10 +8,8 @@
#include "sync.h"
#include "gate.h"
#include "runnable.h"
-#include <memory>
#include <vector>
#include <functional>
-#include "executor_stats.h"
class FastOS_ThreadPool;
@@ -36,12 +34,6 @@ class ThreadStackExecutorBase : public SyncableThreadExecutor,
public Runnable
{
public:
- /**
- * Internal stats that we want to observe externally. Note that
- * all stats are reset each time they are observed.
- **/
- using Stats = ExecutorStats;
-
using init_fun_t = std::function<int(Runnable&)>;
private:
@@ -204,14 +196,8 @@ public:
**/
size_t num_idle_workers() const;
- /**
- * Observe and reset stats for this object.
- *
- * @return stats
- **/
- Stats getStats();
+ Stats getStats() override;
- // inherited from Executor
Task::UP execute(Task::UP task) override;
/**
@@ -232,6 +218,7 @@ public:
void wait_for_task_count(uint32_t task_count);
size_t getNumThreads() const override;
+ void setTaskLimit(uint32_t taskLimit) override;
/**
* Shut down this executor. This will make this executor reject