summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-23 16:01:22 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-23 16:13:09 +0000
commit36b3be76ae151682295faa04de17989f3600c29b (patch)
treefbe5156d396bfa694f9acd932b466088c20d2e63 /searchcore
parent23dcf1db150a66ec66cf746d4234982fdbb0e6e2 (diff)
Add shutdown to thread interface.
Let the optimize config control index and summary executor too.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp1
-rw-r--r--searchcore/src/tests/proton/index/indexmanager_test.cpp1
-rw-r--r--searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp51
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h33
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_configurer.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h1
15 files changed, 79 insertions, 53 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index 30fed6fa49e..6c9ffc210a1 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -55,7 +55,6 @@ using storage::spi::RemoveResult;
using storage::spi::Result;
using storage::spi::Timestamp;
using storage::spi::UpdateResult;
-using vespalib::BlockingThreadStackExecutor;
using vespalib::ThreadStackExecutor;
using vespalib::ThreadStackExecutorBase;
using vespalib::makeClosure;
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp
index 264cf6d8cfa..8cc075773f7 100644
--- a/searchcore/src/tests/proton/index/indexmanager_test.cpp
+++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp
@@ -48,7 +48,6 @@ using search::memoryindex::FieldIndexCollection;
using search::queryeval::Source;
using std::set;
using std::string;
-using vespalib::BlockingThreadStackExecutor;
using vespalib::ThreadStackExecutor;
using vespalib::makeLambdaTask;
using std::chrono::duration_cast;
diff --git a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
index dfb1268aaa6..c26b008f769 100644
--- a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
+++ b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
@@ -243,7 +243,6 @@ struct MyLog
struct MyProtonConfigurerOwner : public IProtonConfigurerOwner,
public MyLog
{
- using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
vespalib::ThreadStackExecutor _executor;
std::map<DocTypeName, std::shared_ptr<MyDocumentDBConfigOwner>> _dbs;
@@ -254,9 +253,9 @@ struct MyProtonConfigurerOwner : public IProtonConfigurerOwner,
_dbs()
{
}
- virtual ~MyProtonConfigurerOwner() { }
+ ~MyProtonConfigurerOwner() { }
- virtual std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName,
+ std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName,
document::BucketSpace bucketSpace,
const vespalib::string &configId,
const std::shared_ptr<BootstrapConfig> &bootstrapConfig,
@@ -275,14 +274,14 @@ struct MyProtonConfigurerOwner : public IProtonConfigurerOwner,
_log.push_back(os.str());
return db;
}
- virtual void removeDocumentDB(const DocTypeName &docTypeName) override {
+ void removeDocumentDB(const DocTypeName &docTypeName) override {
ASSERT_FALSE(_dbs.find(docTypeName) == _dbs.end());
_dbs.erase(docTypeName);
std::ostringstream os;
os << "remove db " << docTypeName.getName();
_log.push_back(os.str());
}
- virtual void applyConfig(const std::shared_ptr<BootstrapConfig> &bootstrapConfig) override {
+ void applyConfig(const std::shared_ptr<BootstrapConfig> &bootstrapConfig) override {
std::ostringstream os;
os << "apply config " << bootstrapConfig->getGeneration();
_log.push_back(os.str());
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index f296e264903..1a1d97a657b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -85,7 +85,7 @@ private:
DocumentStoreCacheStats() : total(), readySubDb(), notReadySubDb(), removedSubDb() {}
};
- using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
+ using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
using IFlushTargetList = std::vector<std::shared_ptr<searchcorespi::IFlushTarget>>;
using StatusReportUP = std::unique_ptr<StatusReport>;
using ProtonConfig = const vespa::config::search::core::internal::InternalProtonType;
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
index 6ca385711b0..b13fa2baed3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
@@ -2,13 +2,14 @@
#include "executor_thread_service.h"
#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/gate.h>
#include <vespa/fastos/thread.h>
using vespalib::makeLambdaTask;
using vespalib::Executor;
using vespalib::Gate;
using vespalib::Runnable;
-using vespalib::ThreadStackExecutorBase;
+using vespalib::SyncableThreadExecutor;
namespace proton {
@@ -28,7 +29,7 @@ sampleThreadId(FastOS_ThreadId *threadId)
}
std::unique_ptr<internal::ThreadId>
-getThreadId(ThreadStackExecutorBase &executor)
+getThreadId(SyncableThreadExecutor &executor)
{
std::unique_ptr<internal::ThreadId> id = std::make_unique<internal::ThreadId>();
executor.execute(makeLambdaTask([threadId=&id->_id] { sampleThreadId(threadId);}));
@@ -45,7 +46,7 @@ runRunnable(Runnable *runnable, Gate *gate)
} // namespace
-ExecutorThreadService::ExecutorThreadService(ThreadStackExecutorBase &executor)
+ExecutorThreadService::ExecutorThreadService(SyncableThreadExecutor &executor)
: _executor(executor),
_threadId(getThreadId(executor))
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
index ccdfb6b72cd..26069b4b8dd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
@@ -2,7 +2,7 @@
#pragma once
#include <vespa/searchcorespi/index/i_thread_service.h>
-#include <vespa/vespalib/util/threadstackexecutorbase.h>
+#include <vespa/vespalib/util/threadexecutor.h>
namespace proton {
@@ -14,11 +14,11 @@ namespace internal { struct ThreadId; }
class ExecutorThreadService : public searchcorespi::index::IThreadService
{
private:
- vespalib::ThreadStackExecutorBase &_executor;
+ vespalib::SyncableThreadExecutor &_executor;
std::unique_ptr<internal::ThreadId> _threadId;
public:
- ExecutorThreadService(vespalib::ThreadStackExecutorBase &executor);
+ ExecutorThreadService(vespalib::SyncableThreadExecutor &executor);
~ExecutorThreadService();
Stats getStats() override;
@@ -31,6 +31,10 @@ public:
_executor.sync();
return *this;
}
+ ExecutorThreadService & shutdown() override {
+ _executor.shutdown();
+ return *this;
+ }
bool isCurrentThread() const override;
size_t getNumThreads() const override { return _executor.getNumThreads(); }
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 6e7b4967f6d..a725b00d485 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -3,23 +3,42 @@
#include "executorthreadingservice.h"
#include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h>
#include <vespa/searchlib/common/sequencedtaskexecutor.h>
+#include <vespa/vespalib/util/singleexecutor.h>
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
-using vespalib::ThreadStackExecutorBase;
+
+using vespalib::SyncableThreadExecutor;
+using vespalib::BlockingThreadStackExecutor;
+using vespalib::SingleExecutor;
using search::SequencedTaskExecutor;
+using OptimizeFor = vespalib::Executor::OptimizeFor;
namespace proton {
-ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadStackExecutorBase & sharedExecutor,
+namespace {
+
+std::unique_ptr<SyncableThreadExecutor>
+createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor optimize) {
+ if (optimize == OptimizeFor::THROUGHPUT) {
+ return std::make_unique<SingleExecutor>(taskLimit);
+ } else {
+ return std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit);
+ }
+}
+
+}
+
+ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor & sharedExecutor,
uint32_t threads, uint32_t stackSize, uint32_t taskLimit,
OptimizeFor optimize)
: _sharedExecutor(sharedExecutor),
_masterExecutor(1, stackSize),
- _indexExecutor(1, stackSize, taskLimit),
- _summaryExecutor(1, stackSize, taskLimit),
+ _indexExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)),
+ _summaryExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)),
_masterService(_masterExecutor),
- _indexService(_indexExecutor),
- _summaryService(_summaryExecutor),
+ _indexService(*_indexExecutor),
+ _summaryService(*_summaryExecutor),
_indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)),
_indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)),
_attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit, optimize))
@@ -36,8 +55,8 @@ ExecutorThreadingService::sync()
_masterExecutor.sync();
}
_attributeFieldWriter->sync();
- _indexExecutor.sync();
- _summaryExecutor.sync();
+ _indexExecutor->sync();
+ _summaryExecutor->sync();
_indexFieldInverter->sync();
_indexFieldWriter->sync();
if (!isMasterThread) {
@@ -52,10 +71,10 @@ ExecutorThreadingService::shutdown()
_masterExecutor.shutdown();
_masterExecutor.sync();
_attributeFieldWriter->sync();
- _summaryExecutor.shutdown();
- _summaryExecutor.sync();
- _indexExecutor.shutdown();
- _indexExecutor.sync();
+ _summaryExecutor->shutdown();
+ _summaryExecutor->sync();
+ _indexExecutor->shutdown();
+ _indexExecutor->sync();
_indexFieldInverter->sync();
_indexFieldWriter->sync();
}
@@ -63,8 +82,8 @@ ExecutorThreadingService::shutdown()
void
ExecutorThreadingService::setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit)
{
- _indexExecutor.setTaskLimit(taskLimit);
- _summaryExecutor.setTaskLimit(summaryTaskLimit);
+ _indexExecutor->setTaskLimit(taskLimit);
+ _summaryExecutor->setTaskLimit(summaryTaskLimit);
_indexFieldInverter->setTaskLimit(taskLimit);
_indexFieldWriter->setTaskLimit(taskLimit);
_attributeFieldWriter->setTaskLimit(taskLimit);
@@ -74,8 +93,8 @@ ExecutorThreadingServiceStats
ExecutorThreadingService::getStats()
{
return ExecutorThreadingServiceStats(_masterExecutor.getStats(),
- _indexExecutor.getStats(),
- _summaryExecutor.getStats(),
+ _indexExecutor->getStats(),
+ _summaryExecutor->getStats(),
_sharedExecutor.getStats(),
_indexFieldInverter->getStats(),
_indexFieldWriter->getStats(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index 2e4dd2035f3..4d018e2b6f3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -3,7 +3,6 @@
#include "executor_thread_service.h"
#include <vespa/searchcorespi/index/ithreadingservice.h>
-#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
namespace proton {
@@ -17,16 +16,16 @@ class ExecutorThreadingServiceStats;
class ExecutorThreadingService : public searchcorespi::index::IThreadingService
{
private:
- vespalib::ThreadStackExecutorBase & _sharedExecutor;
- vespalib::ThreadStackExecutor _masterExecutor;
- vespalib::BlockingThreadStackExecutor _indexExecutor;
- vespalib::BlockingThreadStackExecutor _summaryExecutor;
- ExecutorThreadService _masterService;
- ExecutorThreadService _indexService;
- ExecutorThreadService _summaryService;
- std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldInverter;
- std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldWriter;
- std::unique_ptr<search::ISequencedTaskExecutor> _attributeFieldWriter;
+ vespalib::SyncableThreadExecutor & _sharedExecutor;
+ vespalib::ThreadStackExecutor _masterExecutor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _indexExecutor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor;
+ ExecutorThreadService _masterService;
+ ExecutorThreadService _indexService;
+ ExecutorThreadService _summaryService;
+ std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldInverter;
+ std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldWriter;
+ std::unique_ptr<search::ISequencedTaskExecutor> _attributeFieldWriter;
public:
using OptimizeFor = vespalib::Executor::OptimizeFor;
@@ -36,7 +35,7 @@ public:
* @stackSize The size of the stack of the underlying executors.
* @taskLimit The task limit for the index executor.
*/
- ExecutorThreadingService(vespalib::ThreadStackExecutorBase &sharedExecutor,
+ ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor,
uint32_t threads = 1,
uint32_t stackSize = 128 * 1024,
uint32_t taskLimit = 1000,
@@ -56,11 +55,11 @@ public:
vespalib::ThreadStackExecutorBase &getMasterExecutor() {
return _masterExecutor;
}
- vespalib::ThreadStackExecutorBase &getIndexExecutor() {
- return _indexExecutor;
+ vespalib::SyncableThreadExecutor &getIndexExecutor() {
+ return *_indexExecutor;
}
- vespalib::ThreadStackExecutorBase &getSummaryExecutor() {
- return _summaryExecutor;
+ vespalib::SyncableThreadExecutor &getSummaryExecutor() {
+ return *_summaryExecutor;
}
/**
@@ -76,7 +75,7 @@ public:
searchcorespi::index::IThreadService &summary() override {
return _summaryService;
}
- vespalib::ThreadExecutor &shared() override {
+ vespalib::SyncableThreadExecutor &shared() override {
return _sharedExecutor;
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
index fec8430e41d..5a457b168ec 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
@@ -18,8 +18,8 @@ class DocumentDBConfigOwner;
*/
class IProtonConfigurerOwner
{
- using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
public:
+ using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
virtual ~IProtonConfigurerOwner() { }
virtual std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName,
document::BucketSpace bucketSpace,
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 20de5bb07c1..3f8db3f2ff9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -32,6 +32,7 @@
#include <vespa/vespalib/util/host_name.h>
#include <vespa/vespalib/util/random.h>
#include <vespa/vespalib/net/state_server.h>
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
#include <vespa/searchlib/aggregation/forcelink.hpp>
#include <vespa/searchlib/expression/forcelink.hpp>
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index 410f45162e4..4c9d4c77cc4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -57,7 +57,7 @@ private:
typedef search::engine::MonitorClient MonitorClient;
typedef std::map<DocTypeName, DocumentDB::SP> DocumentDBMap;
typedef BootstrapConfig::ProtonConfigSP ProtonConfigSP;
- using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
+ using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
using BucketSpace = document::BucketSpace;
struct MetricsUpdateHook : metrics::UpdateHook
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
index 0b9293a4aab..45e3c978dd9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
@@ -39,7 +39,7 @@ getBucketSpace(const BootstrapConfig &bootstrapConfig, const DocTypeName &name)
}
-ProtonConfigurer::ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor,
+ProtonConfigurer::ProtonConfigurer(vespalib::SyncableThreadExecutor &executor,
IProtonConfigurerOwner &owner,
const std::unique_ptr<IProtonDiskLayout> &diskLayout)
: IProtonConfigurer(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
index c896f12bd4f..54399a26365 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
@@ -25,7 +25,7 @@ class IProtonDiskLayout;
class ProtonConfigurer : public IProtonConfigurer
{
using DocumentDBs = std::map<DocTypeName, std::pair<std::weak_ptr<IDocumentDBConfigOwner>, std::weak_ptr<DocumentDBDirectoryHolder>>>;
- using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
+ using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
ExecutorThreadService _executor;
IProtonConfigurerOwner &_owner;
@@ -48,11 +48,11 @@ class ProtonConfigurer : public IProtonConfigurer
void pruneInitialDocumentDBDirs(const ProtonConfigSnapshot &configSnapshot);
public:
- ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor,
+ ProtonConfigurer(vespalib::SyncableThreadExecutor &executor,
IProtonConfigurerOwner &owner,
const std::unique_ptr<IProtonDiskLayout> &diskLayout);
- ~ProtonConfigurer();
+ ~ProtonConfigurer() override;
void setAllowReconfig(bool allowReconfig);
diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
index 766bdeeefb0..127b696c4ab 100644
--- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
@@ -31,6 +31,10 @@ public:
_service.sync();
return *this;
}
+ ThreadServiceObserver &shutdown() override {
+ _service.shutdown();
+ return *this;
+ }
bool isCurrentThread() const override {
return _service.isCurrentThread();
}
diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
index 7ac9c0c68f2..23c62d179b1 100644
--- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
@@ -69,6 +69,7 @@ public:
search::ISequencedTaskExecutor &attributeFieldWriter() override {
return _attributeFieldWriter;
}
+
};
}