summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2021-11-29 12:56:46 +0000
committerGeir Storli <geirst@yahooinc.com>2021-11-29 12:56:46 +0000
commit18ba82682f0a9588f92f911ea4774d44d484f589 (patch)
tree5b86852de461dc3fecfec0ab378d8db6e1152c8d /searchcore
parent3c4983f0109da80fefc643e43d12798ccf38fb80 (diff)
Add support for using a shared field writer executor among all document dbs.
This is currently controlled with a feature flag setting a proton config to turn it on.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp16
-rw-r--r--searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp38
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.cpp45
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.h12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp50
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h1
20 files changed, 192 insertions, 72 deletions
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
index 32707f8a69f..4629ebec854 100644
--- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
@@ -10,6 +10,7 @@ using vespalib::ISequencedTaskExecutor;
using vespalib::SequencedTaskExecutor;
using SharedFieldWriterExecutor = ThreadingServiceConfig::SharedFieldWriterExecutor;
+VESPA_THREAD_STACK_TAG(my_field_writer_executor)
SequencedTaskExecutor*
to_concrete_type(ISequencedTaskExecutor& exec)
@@ -20,14 +21,17 @@ to_concrete_type(ISequencedTaskExecutor& exec)
class ExecutorThreadingServiceTest : public ::testing::Test {
public:
vespalib::ThreadStackExecutor shared_executor;
+ std::unique_ptr<ISequencedTaskExecutor> field_writer_executor;
std::unique_ptr<ExecutorThreadingService> service;
ExecutorThreadingServiceTest()
: shared_executor(1, 1000),
+ field_writer_executor(SequencedTaskExecutor::create(my_field_writer_executor, 3, 200)),
service()
{
}
void setup(uint32_t indexing_threads, SharedFieldWriterExecutor shared_field_writer) {
service = std::make_unique<ExecutorThreadingService>(shared_executor,
+ field_writer_executor.get(),
ThreadingServiceConfig::make(indexing_threads, shared_field_writer));
}
SequencedTaskExecutor* index_inverter() {
@@ -39,6 +43,9 @@ public:
SequencedTaskExecutor* attribute_writer() {
return to_concrete_type(service->attributeFieldWriter());
}
+ SequencedTaskExecutor* field_writer() {
+ return to_concrete_type(*field_writer_executor);
+ }
};
void
@@ -75,6 +82,15 @@ TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_and_attribute_fie
assert_executor(index_inverter(), 12, 100);
}
+TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outside)
+{
+ setup(4, SharedFieldWriterExecutor::DOCUMENT_DB);
+ EXPECT_EQ(field_writer(), index_inverter());
+ EXPECT_EQ(field_writer(), index_writer());
+ EXPECT_EQ(field_writer(), attribute_writer());
+ assert_executor(field_writer(), 3, 200);
+}
+
TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated)
{
setup(4, SharedFieldWriterExecutor::NONE);
diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
index faf64af17e1..e90bfc8ae57 100644
--- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
+++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
@@ -1,10 +1,15 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/searchcore/config/config-proton.h>
+#include <vespa/searchcore/proton/server/shared_threading_service.h>
#include <vespa/searchcore/proton/server/shared_threading_service_config.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/gtest/gtest.h>
using namespace proton;
+using vespalib::ISequencedTaskExecutor;
+using vespalib::SequencedTaskExecutor;
using ProtonConfig = vespa::config::search::core::ProtonConfig;
using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
@@ -18,6 +23,8 @@ make_proton_config(double concurrency)
builder.flush.maxconcurrent = 1;
builder.feeding.concurrency = concurrency;
+ builder.feeding.sharedFieldWriterExecutor = ProtonConfig::Feeding::SharedFieldWriterExecutor::DOCUMENT_DB;
+ builder.indexing.tasklimit = 300;
return builder;
}
@@ -38,4 +45,35 @@ TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores
expect_shared_threads(5, 10);
}
+class SharedThreadingServiceTest : public ::testing::Test {
+public:
+ std::unique_ptr<SharedThreadingService> service;
+ SharedThreadingServiceTest()
+ : service()
+ {
+ }
+ void setup(double concurrency, uint32_t cpu_cores) {
+ service = std::make_unique<SharedThreadingService>(
+ SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores)));
+ }
+ SequencedTaskExecutor* field_writer() {
+ return dynamic_cast<SequencedTaskExecutor*>(service->field_writer());
+ }
+};
+
+void
+assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t exp_task_limit)
+{
+ EXPECT_EQ(exp_executors, exec->getNumExecutors());
+ EXPECT_EQ(exp_task_limit, exec->first_executor()->getTaskLimit());
+}
+
+TEST_F(SharedThreadingServiceTest, field_writer_can_be_shared_across_all_document_dbs)
+{
+ setup(0.75, 8);
+ EXPECT_TRUE(field_writer());
+ EXPECT_EQ(6, field_writer()->getNumExecutors());
+ EXPECT_EQ(300, field_writer()->first_executor()->getTaskLimit());
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.cpp
index 0db3488dc28..d8c42795099 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.cpp
@@ -11,7 +11,8 @@ ContentProtonMetrics::ProtonExecutorMetrics::ProtonExecutorMetrics(metrics::Metr
match("match", this),
docsum("docsum", this),
shared("shared", this),
- warmup("warmup", this)
+ warmup("warmup", this),
+ field_writer("field_writer", this)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.h
index 26629d13569..70d3d16cb7c 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.h
@@ -26,6 +26,7 @@ struct ContentProtonMetrics : metrics::MetricSet
ExecutorMetrics docsum;
ExecutorMetrics shared;
ExecutorMetrics warmup;
+ ExecutorMetrics field_writer;
ProtonExecutorMetrics(metrics::MetricSet *parent);
~ProtonExecutorMetrics();
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 5d7f841a909..2b2f2422221 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -175,7 +175,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_baseDir(baseDir + "/" + _docTypeName.toString()),
// Only one thread per executor, or performDropFeedView() will fail.
_writeServiceConfig(configSnapshot->get_threading_service_config()),
- _writeService(shared_service.shared(), _writeServiceConfig, indexing_thread_stack_size),
+ _writeService(shared_service.shared(), shared_service.field_writer(), _writeServiceConfig, indexing_thread_stack_size),
_initializeThreads(std::move(initializeThreads)),
_initConfigSnapshot(),
_initConfigSerialNum(0u),
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.cpp
index c044b544675..8b34775b65d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.cpp
@@ -2,17 +2,21 @@
#include "executor_explorer_utils.h"
#include <vespa/vespalib/data/slime/cursor.h>
+#include <vespa/vespalib/util/adaptive_sequenced_executor.h>
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/singleexecutor.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
+using vespalib::AdaptiveSequencedExecutor;
using vespalib::BlockingThreadStackExecutor;
+using vespalib::ISequencedTaskExecutor;
+using vespalib::SequencedTaskExecutor;
using vespalib::SingleExecutor;
using vespalib::ThreadExecutor;
using vespalib::ThreadStackExecutor;
using vespalib::slime::Cursor;
-
namespace proton::explorer {
namespace {
@@ -33,6 +37,32 @@ convert_single_executor_to_slime(const SingleExecutor& executor, Cursor& object)
object.setDouble("reaction_time_sec", vespalib::to_s(executor.get_reaction_time()));
}
+void
+set_type(Cursor& object, const vespalib::string& type)
+{
+ object.setString("type", type);
+}
+
+void
+convert_sequenced_executor_to_slime(const SequencedTaskExecutor& executor, Cursor& object)
+{
+ set_type(object, "SequencedTaskExecutor");
+ object.setLong("num_executors", executor.getNumExecutors());
+ convert_executor_to_slime(executor.first_executor(), object.setObject("executor"));
+}
+
+void
+convert_adaptive_executor_to_slime(const AdaptiveSequencedExecutor& executor, Cursor& object)
+{
+ set_type(object, "AdaptiveSequencedExecutor");
+ object.setLong("num_strands", executor.getNumExecutors());
+ auto cfg = executor.get_config();
+ object.setLong("num_threads", cfg.num_threads);
+ object.setLong("max_waiting", cfg.max_waiting);
+ object.setLong("max_pending", cfg.max_pending);
+ object.setLong("wakeup_limit", cfg.wakeup_limit);
+}
+
}
void
@@ -52,5 +82,18 @@ convert_executor_to_slime(const ThreadExecutor* executor, Cursor& object)
}
}
+void
+convert_executor_to_slime(const ISequencedTaskExecutor* executor, Cursor& object)
+{
+ if (const auto* seq = dynamic_cast<const SequencedTaskExecutor*>(executor)) {
+ convert_sequenced_executor_to_slime(*seq, object);
+ } else if (const auto* ada = dynamic_cast<const AdaptiveSequencedExecutor*>(executor)) {
+ convert_adaptive_executor_to_slime(*ada, object);
+ } else {
+ set_type(object, "ISequencedTaskExecutor");
+ object.setLong("num_executors", executor->getNumExecutors());
+ }
+}
+
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.h b/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.h
index 7ed0d073ded..aa7bfabd00b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.h
@@ -2,15 +2,23 @@
#pragma once
-namespace vespalib { class ThreadExecutor; }
+namespace vespalib {
+class ISequencedTaskExecutor;
+class ThreadExecutor;
+}
namespace vespalib::slime { struct Cursor; }
namespace proton::explorer {
/**
- * Utility to convert an executor to slime for use with a state explorer.
+ * Utility to convert a thread executor to slime for use with a state explorer.
*/
void convert_executor_to_slime(const vespalib::ThreadExecutor* executor, vespalib::slime::Cursor& object);
+/**
+ * Utility to convert a sequenced task executor to slime for use with a state explorer.
+ */
+void convert_executor_to_slime(const vespalib::ISequencedTaskExecutor* executor, vespalib::slime::Cursor& object);
+
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp
index daf54eac859..5bbbf1ca57d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp
@@ -4,61 +4,11 @@
#include "executor_threading_service_explorer.h"
#include "executorthreadingservice.h"
#include <vespa/vespalib/data/slime/cursor.h>
-#include <vespa/vespalib/util/adaptive_sequenced_executor.h>
-#include <vespa/vespalib/util/sequencedtaskexecutor.h>
-
-using vespalib::AdaptiveSequencedExecutor;
-using vespalib::ISequencedTaskExecutor;
-using vespalib::SequencedTaskExecutor;
-using vespalib::slime::Cursor;
namespace proton {
using explorer::convert_executor_to_slime;
-namespace {
-
-void
-set_type(Cursor& object, const vespalib::string& type)
-{
- object.setString("type", type);
-}
-
-void
-convert_sequenced_executor_to_slime(const SequencedTaskExecutor& executor, Cursor& object)
-{
- set_type(object, "SequencedTaskExecutor");
- object.setLong("num_executors", executor.getNumExecutors());
- convert_executor_to_slime(executor.first_executor(), object.setObject("executor"));
-}
-
-void
-convert_adaptive_executor_to_slime(const AdaptiveSequencedExecutor& executor, Cursor& object)
-{
- set_type(object, "AdaptiveSequencedExecutor");
- object.setLong("num_strands", executor.getNumExecutors());
- auto cfg = executor.get_config();
- object.setLong("num_threads", cfg.num_threads);
- object.setLong("max_waiting", cfg.max_waiting);
- object.setLong("max_pending", cfg.max_pending);
- object.setLong("wakeup_limit", cfg.wakeup_limit);
-}
-
-void
-convert_executor_to_slime(const ISequencedTaskExecutor* executor, Cursor& object)
-{
- if (const auto* seq = dynamic_cast<const SequencedTaskExecutor*>(executor)) {
- convert_sequenced_executor_to_slime(*seq, object);
- } else if (const auto* ada = dynamic_cast<const AdaptiveSequencedExecutor*>(executor)) {
- convert_adaptive_executor_to_slime(*ada, object);
- } else {
- set_type(object, "ISequencedTaskExecutor");
- object.setLong("num_executors", executor->getNumExecutors());
- }
-}
-
-}
-
ExecutorThreadingServiceExplorer::ExecutorThreadingServiceExplorer(searchcorespi::index::IThreadingService& service)
: _service(service)
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h
index 70ed23c6271..f0bb20ab64e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h
@@ -14,10 +14,10 @@ class ExecutorThreadingService;
*/
class ExecutorThreadingServiceExplorer : public vespalib::StateExplorer {
private:
- searchcorespi::index::IThreadingService & _service;
+ searchcorespi::index::IThreadingService& _service;
public:
- ExecutorThreadingServiceExplorer(searchcorespi::index::IThreadingService & service);
+ ExecutorThreadingServiceExplorer(searchcorespi::index::IThreadingService& service);
~ExecutorThreadingServiceExplorer();
void get_state(const vespalib::slime::Inserter& inserter, bool full) const override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 86530f235fd..52da92ed568 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -39,10 +39,11 @@ VESPA_THREAD_STACK_TAG(field_writer_executor)
}
ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, uint32_t num_treads)
- : ExecutorThreadingService(sharedExecutor, ThreadingServiceConfig::make(num_treads))
+ : ExecutorThreadingService(sharedExecutor, nullptr, ThreadingServiceConfig::make(num_treads))
{}
ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor,
+ vespalib::ISequencedTaskExecutor* field_writer,
const ThreadingServiceConfig& cfg,
uint32_t stackSize)
@@ -76,8 +77,12 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
_index_field_inverter_ptr = _field_writer.get();
_index_field_writer_ptr = _field_writer.get();
_attribute_field_writer_ptr = _field_writer.get();
+ } else if (_shared_field_writer == SharedFieldWriterExecutor::DOCUMENT_DB) {
+ assert(field_writer != nullptr);
+ _index_field_inverter_ptr = field_writer;
+ _index_field_writer_ptr = field_writer;
+ _attribute_field_writer_ptr = field_writer;
} else {
- // TODO: Add support for shared field writer across all document dbs.
_indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
_indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
_attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
@@ -143,6 +148,7 @@ ExecutorThreadingService::set_task_limits(uint32_t master_task_limit,
_master_task_limit.store(master_task_limit, std::memory_order_release);
_indexExecutor->setTaskLimit(field_task_limit);
_summaryExecutor->setTaskLimit(summary_task_limit);
+ // TODO: Move this to a common place when the field writer is always shared.
_index_field_inverter_ptr->setTaskLimit(field_task_limit);
_index_field_writer_ptr->setTaskLimit(field_task_limit);
_attribute_field_writer_ptr->setTaskLimit(field_task_limit);
@@ -166,6 +172,11 @@ ExecutorThreadingService::getStats()
field_writer_stats,
field_writer_stats,
field_writer_stats);
+ } else if (_shared_field_writer == SharedFieldWriterExecutor::DOCUMENT_DB) {
+ vespalib::ExecutorStats empty_stats;
+ // In this case the field writer stats are reported at a higher level.
+ return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats,
+ empty_stats, empty_stats, empty_stats);
} else {
return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats,
_index_field_inverter_ptr->getStats(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index 2a4c57ef57d..972e0de0ec0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -42,6 +42,7 @@ public:
ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, uint32_t num_treads = 1);
ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor,
+ vespalib::ISequencedTaskExecutor* field_writer,
const ThreadingServiceConfig& cfg,
uint32_t stackSize = 128 * 1024);
~ExecutorThreadingService() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
index 5145dbec43e..9cd19012223 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
@@ -1,7 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-namespace vespalib { class ThreadExecutor; }
+namespace vespalib {
+class ISequencedTaskExecutor;
+class ThreadExecutor;
+}
namespace proton {
@@ -27,6 +30,14 @@ public:
* - Writing of data in the document store.
*/
virtual vespalib::ThreadExecutor& shared() = 0;
+
+ /**
+ * Returns the sequenced executor used to write index and attribute fields in a document db.
+ *
+ * This is a nullptr if the field writer is not shared across all document dbs.
+ * TODO: Make this a reference when it is always shared.
+ */
+ virtual vespalib::ISequencedTaskExecutor* field_writer() = 0;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index e056325e0d3..89940fbf367 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -38,6 +38,7 @@
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/mmap_file_allocator_factory.h>
#include <vespa/vespalib/util/random.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
#ifdef __linux__
#include <malloc.h>
@@ -448,6 +449,9 @@ Proton::~Proton()
if (_shared_service) {
_shared_service->warmup_raw().sync();
_shared_service->shared_raw()->sync();
+ if (_shared_service->field_writer()) {
+ _shared_service->field_writer()->sync_all();
+ }
}
if ( ! _documentDBMap.empty()) {
@@ -788,6 +792,9 @@ Proton::updateMetrics(const metrics::MetricLockGuard &)
if (_shared_service) {
metrics.shared.update(_shared_service->shared().getStats());
metrics.warmup.update(_shared_service->warmup().getStats());
+ if (_shared_service->field_writer()) {
+ metrics.warmup.update(_shared_service->field_writer()->getStats());
+ }
}
}
}
@@ -944,7 +951,8 @@ Proton::get_child(vespalib::stringref name) const
(_summaryEngine) ? &_summaryEngine->get_executor() : nullptr,
(_flushEngine) ? &_flushEngine->get_executor() : nullptr,
&_executor,
- (_shared_service) ? &_shared_service->warmup() : nullptr);
+ (_shared_service) ? &_shared_service->warmup() : nullptr,
+ (_shared_service) ? _shared_service->field_writer() : nullptr);
}
return Explorer_UP(nullptr);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.cpp
index 70195980376..06a51b7e661 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.cpp
@@ -16,13 +16,15 @@ ProtonThreadPoolsExplorer::ProtonThreadPoolsExplorer(const ThreadExecutor* share
const ThreadExecutor* docsum,
const ThreadExecutor* flush,
const ThreadExecutor* proton,
- const ThreadExecutor* warmup)
+ const ThreadExecutor* warmup,
+ const vespalib::ISequencedTaskExecutor* field_writer)
: _shared(shared),
_match(match),
_docsum(docsum),
_flush(flush),
_proton(proton),
- _warmup(warmup)
+ _warmup(warmup),
+ _field_writer(field_writer)
{
}
@@ -37,6 +39,7 @@ ProtonThreadPoolsExplorer::get_state(const vespalib::slime::Inserter& inserter,
convert_executor_to_slime(_flush, object.setObject("flush"));
convert_executor_to_slime(_proton, object.setObject("proton"));
convert_executor_to_slime(_warmup, object.setObject("warmup"));
+ convert_executor_to_slime(_field_writer, object.setObject("field_writer"));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.h b/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.h
index 76c5fa8cfb0..2cacdd2c336 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.h
@@ -4,7 +4,10 @@
#include <vespa/vespalib/net/state_explorer.h>
-namespace vespalib { class ThreadExecutor; }
+namespace vespalib {
+class ISequencedTaskExecutor;
+class ThreadExecutor;
+}
namespace proton {
@@ -19,6 +22,7 @@ private:
const vespalib::ThreadExecutor* _flush;
const vespalib::ThreadExecutor* _proton;
const vespalib::ThreadExecutor* _warmup;
+ const vespalib::ISequencedTaskExecutor* _field_writer;
public:
ProtonThreadPoolsExplorer(const vespalib::ThreadExecutor* shared,
@@ -26,7 +30,8 @@ public:
const vespalib::ThreadExecutor* docsum,
const vespalib::ThreadExecutor* flush,
const vespalib::ThreadExecutor* proton,
- const vespalib::ThreadExecutor* warmup);
+ const vespalib::ThreadExecutor* warmup,
+ const vespalib::ISequencedTaskExecutor* field_writer);
void get_state(const vespalib::slime::Inserter& inserter, bool full) const override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
index 04e775674b4..f9333aa18a5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
@@ -1,19 +1,34 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "shared_threading_service.h"
-#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+#include <vespa/vespalib/util/size_literals.h>
+VESPA_THREAD_STACK_TAG(proton_field_writer_executor)
VESPA_THREAD_STACK_TAG(proton_shared_executor)
VESPA_THREAD_STACK_TAG(proton_warmup_executor)
namespace proton {
+using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor;
+
SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg)
: _warmup(cfg.warmup_threads(), 128_Ki, proton_warmup_executor),
_shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki,
- cfg.shared_task_limit(), proton_shared_executor))
+ cfg.shared_task_limit(), proton_shared_executor)),
+ _field_writer()
{
+ const auto& fw_cfg = cfg.field_writer_config();
+ if (fw_cfg.shared_field_writer() == SharedFieldWriterExecutor::DOCUMENT_DB) {
+ _field_writer = vespalib::SequencedTaskExecutor::create(proton_field_writer_executor,
+ fw_cfg.indexingThreads() * 3,
+ fw_cfg.defaultTaskLimit(),
+ fw_cfg.optimize(),
+ fw_cfg.kindOfwatermark(),
+ fw_cfg.reactionTime());
+ }
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
index ef0ff31c389..12686e4a3c5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
@@ -16,6 +16,7 @@ class SharedThreadingService : public ISharedThreadingService {
private:
vespalib::ThreadStackExecutor _warmup;
std::shared_ptr<vespalib::SyncableThreadExecutor> _shared;
+ std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer;
public:
SharedThreadingService(const SharedThreadingServiceConfig& cfg);
@@ -25,6 +26,7 @@ public:
vespalib::ThreadExecutor& warmup() override { return _warmup; }
vespalib::ThreadExecutor& shared() override { return *_shared; }
+ vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); }
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp
index cf62cf3b76c..8a81c3f4388 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp
@@ -10,10 +10,12 @@ using ProtonConfig = SharedThreadingServiceConfig::ProtonConfig;
SharedThreadingServiceConfig::SharedThreadingServiceConfig(uint32_t shared_threads_in,
uint32_t shared_task_limit_in,
- uint32_t warmup_threads_in)
+ uint32_t warmup_threads_in,
+ const ThreadingServiceConfig& field_writer_config_in)
: _shared_threads(shared_threads_in),
_shared_task_limit(shared_task_limit_in),
- _warmup_threads(warmup_threads_in)
+ _warmup_threads(warmup_threads_in),
+ _field_writer_config(field_writer_config_in)
{
}
@@ -35,7 +37,8 @@ SharedThreadingServiceConfig::make(const proton::SharedThreadingServiceConfig::P
const proton::HwInfo::Cpu& cpu_info)
{
size_t shared_threads = derive_shared_threads(cfg, cpu_info);
- return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, 4);
+ return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, 4,
+ ThreadingServiceConfig::make(cfg, cfg.feeding.concurrency, cpu_info));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h
index 02966e0efeb..1214bfa77fa 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "threading_service_config.h"
#include <vespa/searchcore/proton/common/hw_info.h>
namespace vespa::config::search::core::internal { class InternalProtonType; }
@@ -18,18 +19,20 @@ private:
uint32_t _shared_threads;
uint32_t _shared_task_limit;
uint32_t _warmup_threads;
+ ThreadingServiceConfig _field_writer_config;
public:
SharedThreadingServiceConfig(uint32_t shared_threads_in,
uint32_t shared_task_limit_in,
- uint32_t warmup_threads_in);
+ uint32_t warmup_threads_in,
+ const ThreadingServiceConfig& field_writer_config_in);
static SharedThreadingServiceConfig make(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info);
uint32_t shared_threads() const { return _shared_threads; }
uint32_t shared_task_limit() const { return _shared_task_limit; }
uint32_t warmup_threads() const { return _warmup_threads; }
-
+ const ThreadingServiceConfig& field_writer_config() const { return _field_writer_config; }
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
index f21f43ed5ad..976d75f2571 100644
--- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
@@ -18,6 +18,7 @@ public:
{}
vespalib::ThreadExecutor& warmup() override { return _warmup; }
vespalib::ThreadExecutor& shared() override { return _shared; }
+ vespalib::ISequencedTaskExecutor* field_writer() override { return nullptr; }
};
}