summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-17 19:09:47 +0100
committerGitHub <noreply@github.com>2021-11-17 19:09:47 +0100
commitab3518e8b3a4caf742e12a134c4fb1d2bbf3c293 (patch)
tree389b23f151082b1f45ef3a5c70614b613032d407
parent65fe0e9b45b10e9b3663a729969473d84f3b5edd (diff)
parentcb193f15ffd443cdb370a43e97712bd4b4b83edf (diff)
Merge pull request #20070 from vespa-engine/geirst/external-feed-operation-blocking
Add support for blocking external feed operations when the document d…
-rw-r--r--searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp12
-rw-r--r--searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp26
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp31
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h23
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h4
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h6
11 files changed, 105 insertions, 33 deletions
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
index 714ffaa16b7..32707f8a69f 100644
--- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
@@ -75,5 +75,17 @@ TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_and_attribute_fie
assert_executor(index_inverter(), 12, 100);
}
+TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated)
+{
+ setup(4, SharedFieldWriterExecutor::NONE);
+ service->set_task_limits(5, 7, 11);
+ EXPECT_EQ(5, service->master_task_limit());
+ EXPECT_EQ(7, service->index().getTaskLimit());
+ EXPECT_EQ(11, service->summary().getTaskLimit());
+ EXPECT_EQ(7, index_inverter()->first_executor()->getTaskLimit());
+ EXPECT_EQ(7, index_writer()->first_executor()->getTaskLimit());
+ EXPECT_EQ(7, attribute_writer()->first_executor()->getTaskLimit());
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
index 353ebeb3abc..50a8349b859 100644
--- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
@@ -14,15 +14,15 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
struct Fixture {
ProtonConfig cfg;
- Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t task_limit = 500, uint32_t semi_unbound_task_limit = 50000)
- : cfg(makeConfig(baseLineIndexingThreads, task_limit, semi_unbound_task_limit))
+ Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, uint32_t task_limit = 500)
+ : cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit))
{
}
- ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t task_limit, uint32_t semi_unbound_task_limit) {
+ ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, uint32_t task_limit) {
ProtonConfigBuilder builder;
builder.indexing.threads = baseLineIndexingThreads;
builder.indexing.tasklimit = task_limit;
- builder.indexing.semiunboundtasklimit = semi_unbound_task_limit;
+ builder.feeding.masterTaskLimit = master_task_limit;
return builder;
}
ThreadingServiceConfig make(uint32_t cpuCores) {
@@ -51,28 +51,32 @@ TEST_F("require that indexing threads is always >= 1", Fixture(0))
TEST_DO(f.assertIndexingThreads(1, 0));
}
-TEST_F("require that default task limit is set", Fixture)
+TEST_F("require that task limits are set", Fixture)
{
- EXPECT_EQUAL(500u, f.make(24).defaultTaskLimit());
+ auto tcfg = f.make(24);
+ EXPECT_EQUAL(2000u, tcfg.master_task_limit());
+ EXPECT_EQUAL(500u, tcfg.defaultTaskLimit());
}
namespace {
-void assertConfig(uint32_t exp_indexing_threads, uint32_t exp_default_task_limit, const ThreadingServiceConfig &config) {
+void assertConfig(uint32_t exp_indexing_threads, uint32_t exp_master_task_limit,
+ uint32_t exp_default_task_limit, const ThreadingServiceConfig& config) {
EXPECT_EQUAL(exp_indexing_threads, config.indexingThreads());
+ EXPECT_EQUAL(exp_master_task_limit, config.master_task_limit());
EXPECT_EQUAL(exp_default_task_limit, config.defaultTaskLimit());
}
}
-TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(2, 1000, 100000))
+TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(2, 3000, 1000))
{
auto cfg1 = f1.make(1);
- assertConfig(2u, 500u, cfg1);
+ assertConfig(2u, 2000, 500u, cfg1);
const auto cfg2 = f2.make(13);
- assertConfig(3u, 1000u, cfg2);
+ assertConfig(3u, 3000u, 1000u, cfg2);
cfg1.update(cfg2);
- assertConfig(2u, 1000u, cfg1); // Indexing threads not changed
+ assertConfig(2u, 3000u, 1000u, cfg1); // Indexing threads not changed
}
TEST_MAIN()
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 6b85f4a6829..5857fdd4f8d 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -503,6 +503,13 @@ feeding.concurrency double default = 0.2 restart
## TODO: Remove this when a shared executor is the default.
feeding.shared_field_writer_executor enum {NONE, INDEX, INDEX_AND_ATTRIBUTE, DOCUMENT_DB} default = NONE restart
+## Maximum number of pending tasks for the master thread in each document db.
+##
+## This limit is only considered when executing tasks for handling external feed operations.
+## In that case the calling thread (persistence thread) is blocked until the master thread has capacity to handle more tasks.
+## When this limit is set to 0 it is ignored.
+feeding.master_task_limit int default = 0
+
## Adjustment to resource limit when determining if maintenance jobs can run.
##
## Currently used by 'lid_space_compaction' and 'move_buckets' jobs.
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 3f2fb6c4634..427d435aae7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -482,7 +482,9 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
if (_state.getState() >= DDBState::State::APPLY_LIVE_CONFIG) {
_writeServiceConfig.update(configSnapshot->get_threading_service_config());
}
- _writeService.setTaskLimit(_writeServiceConfig.defaultTaskLimit(), _writeServiceConfig.defaultTaskLimit());
+ _writeService.set_task_limits(_writeServiceConfig.master_task_limit(),
+ _writeServiceConfig.defaultTaskLimit(),
+ _writeServiceConfig.defaultTaskLimit());
if (params.shouldSubDbsChange()) {
applySubDBConfig(*configSnapshot, serialNum, params);
if (serialNum < _feedHandler->getSerialNum()) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 0e9ba7a24c8..7e0a1851bf5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -42,11 +42,13 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor &sha
: ExecutorThreadingService(sharedExecutor, ThreadingServiceConfig::make(num_treads))
{}
-ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor & sharedExecutor,
- const ThreadingServiceConfig & cfg, uint32_t stackSize)
+ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor,
+ const ThreadingServiceConfig& cfg,
+ uint32_t stackSize)
: _sharedExecutor(sharedExecutor),
_masterExecutor(1, stackSize, master_executor),
+ _master_task_limit(cfg.master_task_limit()),
_indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), index_executor)),
_summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)),
_masterService(_masterExecutor),
@@ -97,6 +99,16 @@ ExecutorThreadingService::sync_all_executors() {
}
void
+ExecutorThreadingService::blocking_master_execute(vespalib::Executor::Task::UP task)
+{
+ uint32_t limit = master_task_limit();
+ if (limit > 0) {
+ _masterExecutor.wait_for_task_count(limit);
+ }
+ _masterExecutor.execute(std::move(task));
+}
+
+void
ExecutorThreadingService::syncOnce() {
bool isMasterThread = _masterService.isCurrentThread();
if (!isMasterThread) {
@@ -127,13 +139,16 @@ ExecutorThreadingService::shutdown()
}
void
-ExecutorThreadingService::setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit)
+ExecutorThreadingService::set_task_limits(uint32_t master_task_limit,
+ uint32_t field_task_limit,
+ uint32_t summary_task_limit)
{
- _indexExecutor->setTaskLimit(taskLimit);
- _summaryExecutor->setTaskLimit(summaryTaskLimit);
- _index_field_inverter_ptr->setTaskLimit(taskLimit);
- _index_field_writer_ptr->setTaskLimit(taskLimit);
- _attribute_field_writer_ptr->setTaskLimit(taskLimit);
+ _master_task_limit.store(master_task_limit, std::memory_order_release);
+ _indexExecutor->setTaskLimit(field_task_limit);
+ _summaryExecutor->setTaskLimit(summary_task_limit);
+ _index_field_inverter_ptr->setTaskLimit(field_task_limit);
+ _index_field_writer_ptr->setTaskLimit(field_task_limit);
+ _attribute_field_writer_ptr->setTaskLimit(field_task_limit);
}
ExecutorThreadingServiceStats
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index e571e205f47..1890ca300e2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -19,6 +19,7 @@ class ExecutorThreadingService : public searchcorespi::index::IThreadingService
private:
vespalib::ThreadExecutor & _sharedExecutor;
vespalib::ThreadStackExecutor _masterExecutor;
+ std::atomic<uint32_t> _master_task_limit;
std::unique_ptr<vespalib::SyncableThreadExecutor> _indexExecutor;
std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor;
ExecutorThreadService _masterService;
@@ -36,21 +37,27 @@ private:
public:
using OptimizeFor = vespalib::Executor::OptimizeFor;
/**
- * Constructor.
- *
- * @stackSize The size of the stack of the underlying executors.
- * @cfg config used to set up all executors.
+ * Convenience constructor used in unit tests.
*/
- ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor,
- const ThreadingServiceConfig & cfg, uint32_t stackSize = 128 * 1024);
- ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, uint32_t num_treads = 1);
+ ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, uint32_t num_treads = 1);
+
+ ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor,
+ const ThreadingServiceConfig& cfg,
+ uint32_t stackSize = 128 * 1024);
~ExecutorThreadingService() override;
void sync_all_executors() override;
+ void blocking_master_execute(vespalib::Executor::Task::UP task) override;
+
void shutdown();
- void setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit);
+ uint32_t master_task_limit() const {
+ return _master_task_limit.load(std::memory_order_relaxed);
+ }
+ void set_task_limits(uint32_t master_task_limit,
+ uint32_t field_task_limit,
+ uint32_t summary_task_limit);
// Expose the underlying executors for stats fetching and testing.
// TOD: Remove - This is only used for casting to check the underlying type
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index c9294150f16..ea63d59c830 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -736,7 +736,13 @@ FeedHandler::performOperation(FeedToken token, FeedOperation::UP op)
void
FeedHandler::handleOperation(FeedToken token, FeedOperation::UP op)
{
- _writeService.master().execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable {
+ // This function is only called when handling external feed operations (see PersistenceHandlerProxy),
+ // and ensures that the calling thread (persistence thread) is blocked until the master thread has capacity to handle more tasks.
+ // This helps keeping feed operation latencies and memory usage in check.
+ // NOTE: Tasks that are created and executed from the master thread itself or some of its helpers
+ // cannot use blocking_master_execute() as that could lead to deadlocks.
+ // See FeedHandler::initiateCommit() for a concrete example.
+ _writeService.blocking_master_execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable {
doHandleOperation(std::move(token), std::move(op));
}));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index 012d91cb49f..ff75a59c41b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -11,12 +11,14 @@ using OptimizeFor = vespalib::Executor::OptimizeFor;
ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
+ uint32_t master_task_limit_,
uint32_t defaultTaskLimit_,
OptimizeFor optimize_,
uint32_t kindOfWatermark_,
vespalib::duration reactionTime_,
SharedFieldWriterExecutor shared_field_writer_)
: _indexingThreads(indexingThreads_),
+ _master_task_limit(master_task_limit_),
_defaultTaskLimit(defaultTaskLimit_),
_optimize(optimize_),
_kindOfWatermark(kindOfWatermark_),
@@ -59,7 +61,9 @@ ThreadingServiceConfig
ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo)
{
uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing, concurrency, cpuInfo);
- return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit,
+ return ThreadingServiceConfig(indexingThreads,
+ cfg.feeding.masterTaskLimit,
+ cfg.indexing.tasklimit,
selectOptimization(cfg.indexing.optimize),
cfg.indexing.kindOfWatermark,
vespalib::from_s(cfg.indexing.reactiontime),
@@ -68,12 +72,13 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const
ThreadingServiceConfig
ThreadingServiceConfig::make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_) {
- return ThreadingServiceConfig(indexingThreads, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_);
+ return ThreadingServiceConfig(indexingThreads, 0, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_);
}
void
ThreadingServiceConfig::update(const ThreadingServiceConfig& cfg)
{
+ _master_task_limit = cfg._master_task_limit;
_defaultTaskLimit = cfg._defaultTaskLimit;
}
@@ -81,6 +86,7 @@ bool
ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const
{
return _indexingThreads == rhs._indexingThreads &&
+ _master_task_limit == rhs._master_task_limit &&
_defaultTaskLimit == rhs._defaultTaskLimit &&
_optimize == rhs._optimize &&
_kindOfWatermark == rhs._kindOfWatermark &&
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index 5869eaf9c2e..f1a4f0525d1 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -21,6 +21,7 @@ public:
private:
uint32_t _indexingThreads;
+ uint32_t _master_task_limit;
uint32_t _defaultTaskLimit;
OptimizeFor _optimize;
uint32_t _kindOfWatermark;
@@ -28,14 +29,16 @@ private:
SharedFieldWriterExecutor _shared_field_writer;
private:
- ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, OptimizeFor optimize_,
- uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_);
+ ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, uint32_t defaultTaskLimit_,
+ OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_,
+ SharedFieldWriterExecutor shared_field_writer_);
public:
static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
static ThreadingServiceConfig make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_ = SharedFieldWriterExecutor::NONE);
void update(const ThreadingServiceConfig& cfg);
uint32_t indexingThreads() const { return _indexingThreads; }
+ uint32_t master_task_limit() const { return _master_task_limit; }
uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
OptimizeFor optimize() const { return _optimize; }
uint32_t kindOfwatermark() const { return _kindOfWatermark; }
diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
index 1d379f439fa..46527362091 100644
--- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
@@ -37,6 +37,10 @@ public:
_service.sync_all_executors();
}
+ void blocking_master_execute(vespalib::Executor::Task::UP task) override {
+ _service.blocking_master_execute(std::move(task));
+ }
+
searchcorespi::index::IThreadService &master() override {
return _master;
}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
index f30aec94d53..0660f3ab495 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
@@ -65,6 +65,12 @@ struct IThreadingService
virtual void sync_all_executors() = 0;
+ /**
+ * Block the calling thread until the master thread has capacity to handle more tasks,
+ * and then execute the given task in the master thread.
+ */
+ virtual void blocking_master_execute(vespalib::Executor::Task::UP task) = 0;
+
virtual IThreadService &master() = 0;
virtual IThreadService &index() = 0;
virtual IThreadService &summary() = 0;