summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-29 21:34:06 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-11-30 13:50:43 +0000
commitaabf1addb8c3693257ef96f3642f0b18b8ab4178 (patch)
tree3bd0d0700e56d061d453bf289f932173bc879de8 /searchcore
parentd0476e64b77f9cde4324cd1bbc1366fd24281ef6 (diff)
Use new InvokerService for doing wakeup calls at regular intervals.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h6
10 files changed, 53 insertions, 6 deletions
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
index 4629ebec854..8d7e842bc89 100644
--- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
@@ -32,6 +32,7 @@ public:
void setup(uint32_t indexing_threads, SharedFieldWriterExecutor shared_field_writer) {
service = std::make_unique<ExecutorThreadingService>(shared_executor,
field_writer_executor.get(),
+ nullptr,
ThreadingServiceConfig::make(indexing_threads, shared_field_writer));
}
SequencedTaskExecutor* index_inverter() {
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 2b2f2422221..53bdc356015 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -175,7 +175,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_baseDir(baseDir + "/" + _docTypeName.toString()),
// Only one thread per executor, or performDropFeedView() will fail.
_writeServiceConfig(configSnapshot->get_threading_service_config()),
- _writeService(shared_service.shared(), shared_service.field_writer(), _writeServiceConfig, indexing_thread_stack_size),
+ _writeService(shared_service.shared(), shared_service.field_writer(), &shared_service.invokeService(), _writeServiceConfig, indexing_thread_stack_size),
_initializeThreads(std::move(initializeThreads)),
_initConfigSnapshot(),
_initConfigSerialNum(0u),
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 52da92ed568..bca8e89d69e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -39,11 +39,12 @@ VESPA_THREAD_STACK_TAG(field_writer_executor)
}
ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, uint32_t num_treads)
- : ExecutorThreadingService(sharedExecutor, nullptr, ThreadingServiceConfig::make(num_treads))
+ : ExecutorThreadingService(sharedExecutor, nullptr, nullptr, ThreadingServiceConfig::make(num_treads))
{}
ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor,
vespalib::ISequencedTaskExecutor* field_writer,
+ vespalib::InvokeService * invokerService,
const ThreadingServiceConfig& cfg,
uint32_t stackSize)
@@ -61,12 +62,20 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
_field_writer(),
_index_field_inverter_ptr(),
_index_field_writer_ptr(),
- _attribute_field_writer_ptr()
+ _attribute_field_writer_ptr(),
+ _invokeRegistrations()
{
+ if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
+ _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_indexExecutor.get()](){ executor->wakeup();}));
+ _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_summaryExecutor.get()](){ executor->wakeup();}));
+ }
if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) {
_field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit());
_attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime());
+ if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
+ _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
+ }
_index_field_inverter_ptr = _field_writer.get();
_index_field_writer_ptr = _field_writer.get();
_attribute_field_writer_ptr = _attributeFieldWriter.get();
@@ -74,6 +83,9 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
} else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) {
_field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(),
cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime());
+ if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
+ _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();}));
+ }
_index_field_inverter_ptr = _field_writer.get();
_index_field_writer_ptr = _field_writer.get();
_attribute_field_writer_ptr = _field_writer.get();
@@ -87,6 +99,9 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
_indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
_attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime());
+ if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
+ _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
+ }
_index_field_inverter_ptr = _indexFieldInverter.get();
_index_field_writer_ptr = _indexFieldWriter.get();
_attribute_field_writer_ptr = _attributeFieldWriter.get();
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index 972e0de0ec0..e55e95c6745 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -5,6 +5,7 @@
#include "threading_service_config.h"
#include <vespa/searchcorespi/index/ithreadingservice.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/invokeservice.h>
namespace proton {
@@ -17,6 +18,7 @@ class ExecutorThreadingServiceStats;
class ExecutorThreadingService : public searchcorespi::index::IThreadingService
{
private:
+ using Registration = std::unique_ptr<vespalib::IDestructorCallback>;
vespalib::ThreadExecutor & _sharedExecutor;
vespalib::ThreadStackExecutor _masterExecutor;
ThreadingServiceConfig::SharedFieldWriterExecutor _shared_field_writer;
@@ -32,6 +34,7 @@ private:
vespalib::ISequencedTaskExecutor* _index_field_inverter_ptr;
vespalib::ISequencedTaskExecutor* _index_field_writer_ptr;
vespalib::ISequencedTaskExecutor* _attribute_field_writer_ptr;
+ std::vector<Registration> _invokeRegistrations;
void syncOnce();
public:
@@ -43,6 +46,7 @@ public:
ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor,
vespalib::ISequencedTaskExecutor* field_writer,
+ vespalib::InvokeService * invokeService,
const ThreadingServiceConfig& cfg,
uint32_t stackSize = 128 * 1024);
~ExecutorThreadingService() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
index 9cd19012223..dfa48cb8d1a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
@@ -4,6 +4,7 @@
namespace vespalib {
class ISequencedTaskExecutor;
class ThreadExecutor;
+class InvokeService;
}
namespace proton {
@@ -38,6 +39,11 @@ public:
* TODO: Make this a reference when it is always shared.
*/
virtual vespalib::ISequencedTaskExecutor* field_writer() = 0;
+
+ /**
+ * Returns an InvokeService intended for regular wakeup calls.
+ */
+ virtual vespalib::InvokeService & invokeService() = 0;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 30a23aaa3d5..0bcbbc14650 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -40,6 +40,7 @@
#include <vespa/vespalib/util/random.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/invokeserviceimpl.h>
#ifdef __linux__
#include <malloc.h>
#endif
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index 91635dc7497..c18737f22b5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -29,7 +29,9 @@
#include <mutex>
#include <shared_mutex>
-namespace vespalib { class StateServer; }
+namespace vespalib {
+ class StateServer;
+}
namespace search::transactionlog { class TransLogServerApp; }
namespace metrics { class MetricLockGuard; }
namespace storage::spi { struct PersistenceProvider; }
@@ -61,6 +63,7 @@ private:
using DocumentDBMap = std::map<DocTypeName, DocumentDB::SP>;
using InitializeThreads = std::shared_ptr<vespalib::ThreadExecutor>;
using BucketSpace = document::BucketSpace;
+ using InvokeService = vespalib::InvokeService;
class ProtonFileHeaderContext : public search::common::FileHeaderContext
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
index c4fc79c43fd..fa4771bee1d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/invokeserviceimpl.h>
VESPA_THREAD_STACK_TAG(proton_field_writer_executor)
VESPA_THREAD_STACK_TAG(proton_shared_executor)
@@ -18,7 +19,9 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi
: _warmup(cfg.warmup_threads(), 128_Ki, proton_warmup_executor),
_shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki,
cfg.shared_task_limit(), proton_shared_executor)),
- _field_writer()
+ _field_writer(),
+ _invokeService(5ms),
+ _invokeRegistrations()
{
const auto& fw_cfg = cfg.field_writer_config();
if (fw_cfg.shared_field_writer() == SharedFieldWriterExecutor::DOCUMENT_DB) {
@@ -28,6 +31,11 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi
fw_cfg.optimize(),
fw_cfg.kindOfwatermark(),
fw_cfg.reactionTime());
+ if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) {
+ _invokeRegistrations.push_back(_invokeService.registerInvoke([executor = _field_writer.get()]() {
+ executor->wakeup();
+ }));
+ }
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
index 6006d484c97..cd0e6d71402 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
@@ -5,6 +5,7 @@
#include "shared_threading_service_config.h"
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/syncable.h>
+#include <vespa/vespalib/util/invokeserviceimpl.h>
#include <memory>
namespace proton {
@@ -14,9 +15,12 @@ namespace proton {
*/
class SharedThreadingService : public ISharedThreadingService {
private:
+ using Registration = std::unique_ptr<vespalib::IDestructorCallback>;
vespalib::ThreadStackExecutor _warmup;
std::shared_ptr<vespalib::SyncableThreadExecutor> _shared;
std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer;
+ vespalib::InvokeServiceImpl _invokeService;
+ std::vector<Registration> _invokeRegistrations;
public:
SharedThreadingService(const SharedThreadingServiceConfig& cfg);
@@ -28,6 +32,7 @@ public:
vespalib::ThreadExecutor& warmup() override { return _warmup; }
vespalib::ThreadExecutor& shared() override { return *_shared; }
vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); }
+ vespalib::InvokeService & invokeService() override { return _invokeService; }
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
index 976d75f2571..74965c15cd4 100644
--- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
@@ -2,6 +2,7 @@
#pragma once
#include <vespa/searchcore/proton/server/i_shared_threading_service.h>
+#include <vespa/vespalib/util/invokeserviceimpl.h>
namespace proton {
@@ -9,16 +10,19 @@ class MockSharedThreadingService : public ISharedThreadingService {
private:
vespalib::ThreadExecutor& _warmup;
vespalib::ThreadExecutor& _shared;
+ vespalib::InvokeServiceImpl _invokeService;
public:
MockSharedThreadingService(vespalib::ThreadExecutor& warmup_in,
vespalib::ThreadExecutor& shared_in)
: _warmup(warmup_in),
- _shared(shared_in)
+ _shared(shared_in),
+ _invokeService(10ms)
{}
vespalib::ThreadExecutor& warmup() override { return _warmup; }
vespalib::ThreadExecutor& shared() override { return _shared; }
vespalib::ISequencedTaskExecutor* field_writer() override { return nullptr; }
+ vespalib::InvokeService & invokeService() override { return _invokeService; }
};
}