summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2022-05-16 17:04:44 +0200
committerGitHub <noreply@github.com>2022-05-16 17:04:44 +0200
commit4bb588d36eab746d4bc5d210f869b116790ff437 (patch)
tree3ab47098beae9fde6e9fb5fb95bdf88497ecf42b
parentcc8d97a8e9f56d139700424e1393ea954723ae2d (diff)
parente6ff27a287c753d5ccbe3c08eb881dc2c78e3e7a (diff)
Merge pull request #22621 from vespa-engine/geirst/shared-field-writer-cleanup
Remove unused code after the shared field writer is used for all docu…
-rw-r--r--searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp38
-rw-r--r--searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp115
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h20
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp24
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp16
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/transport_helper.h3
14 files changed, 74 insertions, 193 deletions
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
index 46a6419e924..d4f4e24ba6c 100644
--- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
@@ -9,7 +9,6 @@
using namespace proton;
using vespalib::ISequencedTaskExecutor;
using vespalib::SequencedTaskExecutor;
-using SharedFieldWriterExecutor = ThreadingServiceConfig::SharedFieldWriterExecutor;
VESPA_THREAD_STACK_TAG(my_field_writer_executor)
@@ -30,13 +29,13 @@ public:
service()
{
}
- void setup(uint32_t indexing_threads, SharedFieldWriterExecutor shared_field_writer) {
+ void setup(uint32_t indexing_threads) {
service = std::make_unique<ExecutorThreadingService>(_transport.shared(),
_transport.transport(),
_transport.clock(),
- field_writer_executor.get(),
+ *field_writer_executor,
nullptr,
- ThreadingServiceConfig::make(indexing_threads, shared_field_writer));
+ ThreadingServiceConfig::make(indexing_threads));
}
SequencedTaskExecutor* index_inverter() {
return to_concrete_type(service->indexFieldInverter());
@@ -59,36 +58,9 @@ assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t ex
EXPECT_EQ(exp_task_limit, exec->first_executor()->getTaskLimit());
}
-TEST_F(ExecutorThreadingServiceTest, no_shared_field_writer_executor)
-{
- setup(4, SharedFieldWriterExecutor::NONE);
- EXPECT_NE(index_inverter(), index_writer());
- EXPECT_NE(index_writer(), attribute_writer());
- assert_executor(index_inverter(), 4, 100);
- assert_executor(index_writer(), 4, 100);
- assert_executor(attribute_writer(), 4, 100);
-}
-
-TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_field_writers)
-{
- setup(4, SharedFieldWriterExecutor::INDEX);
- EXPECT_EQ(index_inverter(), index_writer());
- EXPECT_NE(index_inverter(), attribute_writer());
- assert_executor(index_inverter(), 8, 100);
- assert_executor(attribute_writer(), 4, 100);
-}
-
-TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_and_attribute_field_writers)
-{
- setup(4, SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE);
- EXPECT_EQ(index_inverter(), index_writer());
- EXPECT_EQ(index_inverter(), attribute_writer());
- assert_executor(index_inverter(), 12, 100);
-}
-
TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outside)
{
- setup(4, SharedFieldWriterExecutor::DOCUMENT_DB);
+ setup(4);
EXPECT_EQ(field_writer(), index_inverter());
EXPECT_EQ(field_writer(), index_writer());
EXPECT_EQ(field_writer(), attribute_writer());
@@ -97,7 +69,7 @@ TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outs
TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated)
{
- setup(4, SharedFieldWriterExecutor::NONE);
+ setup(4);
service->set_task_limits(5, 7, 11);
EXPECT_EQ(5, service->master_task_limit());
EXPECT_EQ(7, service->index().getTaskLimit());
diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
index a15bd8c67c9..948c98b1034 100644
--- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
+++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
@@ -26,7 +26,6 @@ make_proton_config(double concurrency)
builder.flush.maxconcurrent = 1;
builder.feeding.concurrency = concurrency;
- builder.feeding.sharedFieldWriterExecutor = ProtonConfig::Feeding::SharedFieldWriterExecutor::DOCUMENT_DB;
builder.indexing.tasklimit = 255;
return builder;
}
@@ -65,7 +64,7 @@ public:
transport.transport(), bucket_executor);
}
SequencedTaskExecutor* field_writer() {
- return dynamic_cast<SequencedTaskExecutor*>(service->field_writer());
+ return dynamic_cast<SequencedTaskExecutor*>(&service->field_writer());
}
};
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 51536299d51..7cc78178b51 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -506,16 +506,6 @@ hwinfo.cpu.cores int default = 0 restart
## Deprecated -> Use documentdb.feeding.concurrency
feeding.concurrency double default = 0.2 restart
-## Whether we should use a shared field writer executor in the document db threading service:
-##
-## NONE: Don't use a shared executor.
-## INDEX: Use a shared executor for index field inverter and index field writer.
-## INDEX_AND_ATTRIBUTE: Use a shared executor for index field inverter, index field writer, and attribute field writer.
-## DOCUMENT_DB: Use a shared executor for index field inverter, index field writer, and attribute field writer among all document dbs.
-##
-## TODO: Remove this when a shared executor is the default.
-feeding.shared_field_writer_executor enum {NONE, INDEX, INDEX_AND_ATTRIBUTE, DOCUMENT_DB} default = DOCUMENT_DB restart
-
## Maximum number of pending tasks for the master thread in each document db.
##
## This limit is only considered when executing tasks for handling external feed operations.
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index dd735c75d79..eb1c953dcf5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -15,7 +15,6 @@ using vespalib::SingleExecutor;
using vespalib::SyncableThreadExecutor;
using vespalib::steady_time;
using OptimizeFor = vespalib::Executor::OptimizeFor;
-using SharedFieldWriterExecutor = proton::ThreadingServiceConfig::SharedFieldWriterExecutor;
namespace proton {
@@ -34,22 +33,21 @@ createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor
VESPA_THREAD_STACK_TAG(master_executor)
VESPA_THREAD_STACK_TAG(index_executor)
VESPA_THREAD_STACK_TAG(summary_executor)
-VESPA_THREAD_STACK_TAG(index_field_inverter_executor)
-VESPA_THREAD_STACK_TAG(index_field_writer_executor)
-VESPA_THREAD_STACK_TAG(attribute_field_writer_executor)
-VESPA_THREAD_STACK_TAG(field_writer_executor)
}
-ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor &sharedExecutor, FNET_Transport & transport,
- const vespalib::Clock & clock, uint32_t num_treads)
- : ExecutorThreadingService(sharedExecutor, transport, clock, nullptr, nullptr, ThreadingServiceConfig::make(num_treads))
+ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExecutor,
+ FNET_Transport& transport,
+ const vespalib::Clock& clock,
+ vespalib::ISequencedTaskExecutor& field_writer,
+ uint32_t num_treads)
+ : ExecutorThreadingService(sharedExecutor, transport, clock, field_writer, nullptr, ThreadingServiceConfig::make(num_treads))
{}
ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedExecutor,
FNET_Transport & transport,
const vespalib::Clock & clock,
- vespalib::ISequencedTaskExecutor * field_writer,
+ vespalib::ISequencedTaskExecutor& field_writer,
vespalib::InvokeService * invokerService,
const ThreadingServiceConfig & cfg,
uint32_t stackSize)
@@ -58,7 +56,6 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedEx
_transport(transport),
_clock(clock),
_masterExecutor(1, stackSize, CpuUsage::wrap(master_executor, CpuUsage::Category::WRITE)),
- _shared_field_writer(cfg.shared_field_writer()),
_master_task_limit(cfg.master_task_limit()),
_indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(),
CpuUsage::wrap(index_executor, CpuUsage::Category::WRITE))),
@@ -66,62 +63,15 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedEx
CpuUsage::wrap(summary_executor, CpuUsage::Category::WRITE))),
_masterService(_masterExecutor),
_indexService(*_indexExecutor),
- _indexFieldInverter(),
- _indexFieldWriter(),
- _attributeFieldWriter(),
- _field_writer(),
- _index_field_inverter_ptr(),
- _index_field_writer_ptr(),
- _attribute_field_writer_ptr(),
+ _index_field_inverter(field_writer),
+ _index_field_writer(field_writer),
+ _attribute_field_writer(field_writer),
_invokeRegistrations()
{
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_indexExecutor.get()](){ executor->wakeup();}));
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_summaryExecutor.get()](){ executor->wakeup();}));
}
- if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) {
- _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE),
- cfg.indexingThreads() * 2, cfg.defaultTaskLimit());
- _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE),
- cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
- if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
- _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
- }
- _index_field_inverter_ptr = _field_writer.get();
- _index_field_writer_ptr = _field_writer.get();
- _attribute_field_writer_ptr = _attributeFieldWriter.get();
-
- } else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) {
- _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE),
- cfg.indexingThreads() * 3, cfg.defaultTaskLimit(),
- cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
- if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
- _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();}));
- }
- _index_field_inverter_ptr = _field_writer.get();
- _index_field_writer_ptr = _field_writer.get();
- _attribute_field_writer_ptr = _field_writer.get();
- } else if (_shared_field_writer == SharedFieldWriterExecutor::DOCUMENT_DB) {
- assert(field_writer != nullptr);
- _index_field_inverter_ptr = field_writer;
- _index_field_writer_ptr = field_writer;
- _attribute_field_writer_ptr = field_writer;
- } else {
- _indexFieldInverter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_inverter_executor, CpuUsage::Category::WRITE),
- cfg.indexingThreads(), cfg.defaultTaskLimit());
- _indexFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_writer_executor, CpuUsage::Category::WRITE),
- cfg.indexingThreads(), cfg.defaultTaskLimit());
- _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE),
- cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
- if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
- _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
- }
- _index_field_inverter_ptr = _indexFieldInverter.get();
- _index_field_writer_ptr = _indexFieldWriter.get();
- _attribute_field_writer_ptr = _attributeFieldWriter.get();
- }
}
ExecutorThreadingService::~ExecutorThreadingService() = default;
@@ -140,11 +90,11 @@ void
ExecutorThreadingService::shutdown()
{
_masterExecutor.shutdown().sync();
- _attribute_field_writer_ptr->sync_all();
+ _attribute_field_writer.sync_all();
_summaryExecutor->shutdown().sync();
_indexExecutor->shutdown().sync();
- _index_field_inverter_ptr->sync_all();
- _index_field_writer_ptr->sync_all();
+ _index_field_inverter.sync_all();
+ _index_field_writer.sync_all();
}
void
@@ -156,9 +106,9 @@ ExecutorThreadingService::set_task_limits(uint32_t master_task_limit,
_indexExecutor->setTaskLimit(field_task_limit);
_summaryExecutor->setTaskLimit(summary_task_limit);
// TODO: Move this to a common place when the field writer is always shared.
- _index_field_inverter_ptr->setTaskLimit(field_task_limit);
- _index_field_writer_ptr->setTaskLimit(field_task_limit);
- _attribute_field_writer_ptr->setTaskLimit(field_task_limit);
+ _index_field_inverter.setTaskLimit(field_task_limit);
+ _index_field_writer.setTaskLimit(field_task_limit);
+ _attribute_field_writer.setTaskLimit(field_task_limit);
}
ExecutorThreadingServiceStats
@@ -167,44 +117,25 @@ ExecutorThreadingService::getStats()
auto master_stats = _masterExecutor.getStats();
auto index_stats = _indexExecutor->getStats();
auto summary_stats = _summaryExecutor->getStats();
- if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) {
- auto field_writer_stats = _field_writer->getStats();
- return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats,
- field_writer_stats,
- field_writer_stats,
- _attribute_field_writer_ptr->getStats());
- } else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) {
- auto field_writer_stats = _field_writer->getStats();
- return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats,
- field_writer_stats,
- field_writer_stats,
- field_writer_stats);
- } else if (_shared_field_writer == SharedFieldWriterExecutor::DOCUMENT_DB) {
- vespalib::ExecutorStats empty_stats;
- // In this case the field writer stats are reported at a higher level.
- return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats,
- empty_stats, empty_stats, empty_stats);
- } else {
- return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats,
- _index_field_inverter_ptr->getStats(),
- _index_field_writer_ptr->getStats(),
- _attribute_field_writer_ptr->getStats());
- }
+ vespalib::ExecutorStats empty_stats;
+ // In this case the field writer stats are reported at a higher level.
+ return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats,
+ empty_stats, empty_stats, empty_stats);
}
vespalib::ISequencedTaskExecutor &
ExecutorThreadingService::indexFieldInverter() {
- return *_index_field_inverter_ptr;
+ return _index_field_inverter;
}
vespalib::ISequencedTaskExecutor &
ExecutorThreadingService::indexFieldWriter() {
- return *_index_field_writer_ptr;
+ return _index_field_writer;
}
vespalib::ISequencedTaskExecutor &
ExecutorThreadingService::attributeFieldWriter() {
- return *_attribute_field_writer_ptr;
+ return _attribute_field_writer;
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index 1179c88ef76..03c3feabd86 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -24,19 +24,14 @@ private:
FNET_Transport & _transport;
const vespalib::Clock & _clock;
vespalib::ThreadStackExecutor _masterExecutor;
- ThreadingServiceConfig::SharedFieldWriterExecutor _shared_field_writer;
std::atomic<uint32_t> _master_task_limit;
std::unique_ptr<vespalib::SyncableThreadExecutor> _indexExecutor;
std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor;
SyncableExecutorThreadService _masterService;
ExecutorThreadService _indexService;
- std::unique_ptr<vespalib::ISequencedTaskExecutor> _indexFieldInverter;
- std::unique_ptr<vespalib::ISequencedTaskExecutor> _indexFieldWriter;
- std::unique_ptr<vespalib::ISequencedTaskExecutor> _attributeFieldWriter;
- std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer;
- vespalib::ISequencedTaskExecutor* _index_field_inverter_ptr;
- vespalib::ISequencedTaskExecutor* _index_field_writer_ptr;
- vespalib::ISequencedTaskExecutor* _attribute_field_writer_ptr;
+ vespalib::ISequencedTaskExecutor& _index_field_inverter;
+ vespalib::ISequencedTaskExecutor& _index_field_writer;
+ vespalib::ISequencedTaskExecutor& _attribute_field_writer;
std::vector<Registration> _invokeRegistrations;
public:
@@ -44,13 +39,16 @@ public:
/**
* Convenience constructor used in unit tests.
*/
- ExecutorThreadingService(vespalib::Executor& sharedExecutor, FNET_Transport & transport,
- const vespalib::Clock & clock, uint32_t num_treads = 1);
+ ExecutorThreadingService(vespalib::Executor& sharedExecutor,
+ FNET_Transport& transport,
+ const vespalib::Clock& clock,
+ vespalib::ISequencedTaskExecutor& field_writer,
+ uint32_t num_treads = 1);
ExecutorThreadingService(vespalib::Executor& sharedExecutor,
FNET_Transport & transport,
const vespalib::Clock & clock,
- vespalib::ISequencedTaskExecutor* field_writer,
+ vespalib::ISequencedTaskExecutor& field_writer,
vespalib::InvokeService * invokeService,
const ThreadingServiceConfig& cfg,
uint32_t stackSize = 128 * 1024);
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
index dccea41373f..0bfd874b90f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
@@ -39,11 +39,8 @@ public:
/**
* Returns the sequenced executor used to write index and attribute fields in a document db.
- *
- * This is a nullptr if the field writer is not shared across all document dbs.
- * TODO: Make this a reference when it is always shared.
*/
- virtual vespalib::ISequencedTaskExecutor* field_writer() = 0;
+ virtual vespalib::ISequencedTaskExecutor& field_writer() = 0;
/**
* Returns an InvokeService intended for regular wakeup calls.
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 8d52ecc0ba7..b5e70a540ec 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -809,9 +809,7 @@ Proton::updateMetrics(const metrics::MetricLockGuard &)
if (_shared_service) {
metrics.shared.update(_shared_service->shared().getStats());
metrics.warmup.update(_shared_service->warmup().getStats());
- if (_shared_service->field_writer()) {
- metrics.field_writer.update(_shared_service->field_writer()->getStats());
- }
+ metrics.field_writer.update(_shared_service->field_writer().getStats());
}
}
}
@@ -970,7 +968,7 @@ Proton::get_child(vespalib::stringref name) const
(_flushEngine) ? &_flushEngine->get_executor() : nullptr,
&_executor,
(_shared_service) ? &_shared_service->warmup() : nullptr,
- (_shared_service) ? _shared_service->field_writer() : nullptr);
+ (_shared_service) ? &_shared_service->field_writer() : nullptr);
} else if (name == HW_INFO) {
return std::make_unique<HwInfoExplorer>(_hw_info);
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
index 79827ce81c0..82d7f6c650d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
@@ -16,8 +16,6 @@ VESPA_THREAD_STACK_TAG(proton_warmup_executor)
namespace proton {
-using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor;
-
SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg,
FNET_Transport& transport,
storage::spi::BucketExecutor& bucket_executor)
@@ -35,18 +33,16 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi
_clock(_invokeService.nowRef())
{
const auto& fw_cfg = cfg.field_writer_config();
- if (fw_cfg.shared_field_writer() == SharedFieldWriterExecutor::DOCUMENT_DB) {
- _field_writer = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(proton_field_writer_executor, CpuUsage::Category::WRITE),
- fw_cfg.indexingThreads() * 3,
- fw_cfg.defaultTaskLimit(),
- fw_cfg.is_task_limit_hard(),
- fw_cfg.optimize(),
- fw_cfg.kindOfwatermark());
- if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) {
- _invokeRegistrations.push_back(_invokeService.registerInvoke([executor = _field_writer.get()]() {
- executor->wakeup();
- }));
- }
+ _field_writer = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(proton_field_writer_executor, CpuUsage::Category::WRITE),
+ fw_cfg.indexingThreads() * 3,
+ fw_cfg.defaultTaskLimit(),
+ fw_cfg.is_task_limit_hard(),
+ fw_cfg.optimize(),
+ fw_cfg.kindOfwatermark());
+ if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) {
+ _invokeRegistrations.push_back(_invokeService.registerInvoke([executor = _field_writer.get()]() {
+ executor->wakeup();
+ }));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
index ead16441da0..019b9fe0596 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
@@ -36,7 +36,7 @@ public:
vespalib::ThreadExecutor& warmup() override { return *_warmup; }
vespalib::ThreadExecutor& shared() override { return *_shared; }
- vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); }
+ vespalib::ISequencedTaskExecutor& field_writer() override { return *_field_writer; }
vespalib::InvokeService & invokeService() override { return _invokeService; }
FNET_Transport & transport() override { return _transport; }
storage::spi::BucketExecutor& bucket_executor() override { return _bucket_executor; }
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index 335d5bab8d0..c129a78b045 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -15,16 +15,14 @@ ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
int32_t defaultTaskLimit_,
OptimizeFor optimize_,
uint32_t kindOfWatermark_,
- vespalib::duration reactionTime_,
- SharedFieldWriterExecutor shared_field_writer_)
+ vespalib::duration reactionTime_)
: _indexingThreads(indexingThreads_),
_master_task_limit(master_task_limit_),
_defaultTaskLimit(std::abs(defaultTaskLimit_)),
_is_task_limit_hard(defaultTaskLimit_ >= 0),
_optimize(optimize_),
_kindOfWatermark(kindOfWatermark_),
- _reactionTime(reactionTime_),
- _shared_field_writer(shared_field_writer_)
+ _reactionTime(reactionTime_)
{
}
@@ -60,13 +58,12 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const
cfg.indexing.tasklimit,
selectOptimization(cfg.indexing.optimize),
cfg.indexing.kindOfWatermark,
- vespalib::from_s(cfg.indexing.reactiontime),
- cfg.feeding.sharedFieldWriterExecutor);
+ vespalib::from_s(cfg.indexing.reactiontime));
}
ThreadingServiceConfig
-ThreadingServiceConfig::make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_) {
- return ThreadingServiceConfig(indexingThreads, 0, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_);
+ThreadingServiceConfig::make(uint32_t indexingThreads) {
+ return ThreadingServiceConfig(indexingThreads, 0, 100, OptimizeFor::LATENCY, 0, 10ms);
}
void
@@ -85,8 +82,7 @@ ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const
_is_task_limit_hard == rhs._is_task_limit_hard &&
_optimize == rhs._optimize &&
_kindOfWatermark == rhs._kindOfWatermark &&
- _reactionTime == rhs._reactionTime &&
- _shared_field_writer == rhs._shared_field_writer;
+ _reactionTime == rhs._reactionTime;
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index a54c0674263..8c63340a62d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -17,7 +17,6 @@ class ThreadingServiceConfig {
public:
using ProtonConfig = const vespa::config::search::core::internal::InternalProtonType;
using OptimizeFor = vespalib::Executor::OptimizeFor;
- using SharedFieldWriterExecutor = ProtonConfig::Feeding::SharedFieldWriterExecutor;
private:
uint32_t _indexingThreads;
@@ -27,16 +26,14 @@ private:
OptimizeFor _optimize;
uint32_t _kindOfWatermark;
vespalib::duration _reactionTime; // Maximum reaction time to new tasks
- SharedFieldWriterExecutor _shared_field_writer;
private:
ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, int32_t defaultTaskLimit_,
- OptimizeFor optimize_, uint32_t kindOfWatermark_,
- vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_);
+ OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_);
public:
static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
- static ThreadingServiceConfig make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_ = SharedFieldWriterExecutor::NONE);
+ static ThreadingServiceConfig make(uint32_t indexingThreads);
void update(const ThreadingServiceConfig& cfg);
uint32_t indexingThreads() const { return _indexingThreads; }
uint32_t master_task_limit() const { return _master_task_limit; }
@@ -45,7 +42,6 @@ public:
OptimizeFor optimize() const { return _optimize; }
uint32_t kindOfwatermark() const { return _kindOfWatermark; }
vespalib::duration reactionTime() const { return _reactionTime; }
- SharedFieldWriterExecutor shared_field_writer() const { return _shared_field_writer; }
bool operator==(const ThreadingServiceConfig &rhs) const;
};
diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
index 00ffdc92020..e92d4362f53 100644
--- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
@@ -26,7 +26,7 @@ public:
~MockSharedThreadingService() override;
ThreadExecutor& warmup() override { return _warmup; }
ThreadExecutor& shared() override { return _shared; }
- vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); }
+ vespalib::ISequencedTaskExecutor& field_writer() override { return *_field_writer; }
vespalib::InvokeService & invokeService() override { return _invokeService; }
FNET_Transport & transport() override { return _transport.transport(); }
storage::spi::BucketExecutor& bucket_executor() override { return _bucket_executor; }
diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp
index 0731a3429b1..80e2622fa3b 100644
--- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp
+++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp
@@ -1,12 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "transport_helper.h"
-#include <vespa/fnet/transport.h>
#include <vespa/fastos/thread.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/searchcore/proton/server/executorthreadingservice.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/testclock.h>
-#include <vespa/searchcore/proton/server/executorthreadingservice.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
namespace proton {
@@ -31,9 +32,12 @@ Transport::shutdown() {
_transport->ShutDown(true);
}
+VESPA_THREAD_STACK_TAG(proton_transport_and_executor_field_writer)
+
TransportAndExecutor::TransportAndExecutor(size_t num_threads)
: Transport(),
- _sharedExecutor(std::make_unique<vespalib::ThreadStackExecutor>(num_threads, 64_Ki))
+ _sharedExecutor(std::make_unique<vespalib::ThreadStackExecutor>(num_threads, 64_Ki)),
+ _field_writer(vespalib::SequencedTaskExecutor::create(proton_transport_and_executor_field_writer, num_threads))
{}
TransportAndExecutor::~TransportAndExecutor() = default;
@@ -45,8 +49,9 @@ TransportAndExecutor::shutdown() {
TransportAndExecutorService::TransportAndExecutorService(size_t num_threads)
: TransportAndExecutor(num_threads),
- _writeService(std::make_unique<ExecutorThreadingService>(shared(), transport(), clock()))
+ _writeService(std::make_unique<ExecutorThreadingService>(shared(), transport(), clock(), field_writer()))
{}
+
TransportAndExecutorService::~TransportAndExecutorService() = default;
searchcorespi::index::IThreadingService &
diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h
index 8ec4f50e3f0..46ca8131041 100644
--- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h
+++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h
@@ -33,9 +33,12 @@ public:
TransportAndExecutor(size_t num_threads);
~TransportAndExecutor() override;
vespalib::Executor & shared() { return *_sharedExecutor; }
+ vespalib::ISequencedTaskExecutor& field_writer() { return *_field_writer; }
void shutdown() override;
private:
std::unique_ptr<vespalib::Executor> _sharedExecutor;
+ std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer;
+
};
class TransportAndExecutorService : public TransportAndExecutor {