diff options
Diffstat (limited to 'persistence')
-rw-r--r-- | persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp | 51 | ||||
-rw-r--r-- | persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h | 15 |
2 files changed, 63 insertions, 3 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp index 060215c4521..953cfcf733f 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp @@ -14,7 +14,9 @@ DummyBucketExecutor::DummyBucketExecutor(size_t numExecutors) : _executor(std::make_unique<vespalib::ThreadStackExecutor>(numExecutors, 0x10000)), _lock(), _cond(), - _inFlight() + _inFlight(), + _defer_tasks(false), + _deferred_tasks() { } @@ -24,6 +26,15 @@ DummyBucketExecutor::~DummyBucketExecutor() { void DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) { + if (!_defer_tasks) { + internal_execute_no_defer(bucket, std::move(task)); + } else { + _deferred_tasks.emplace_back(bucket, std::move(task)); + } +} + +void +DummyBucketExecutor::internal_execute_no_defer(const Bucket& bucket, std::unique_ptr<BucketTask> task) { auto failed = _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() { { std::unique_lock guard(_lock); @@ -45,6 +56,44 @@ DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> } void +DummyBucketExecutor::defer_new_tasks() { + std::lock_guard guard(_lock); + _defer_tasks = true; +} + +void +DummyBucketExecutor::schedule_all_deferred_tasks() { + DeferredTasks to_run; + { + std::lock_guard guard(_lock); + assert(_defer_tasks); + _deferred_tasks.swap(to_run); + } + for (auto& bucket_and_task : to_run) { + internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second)); + } +} + +size_t +DummyBucketExecutor::num_deferred_tasks() const noexcept { + std::lock_guard guard(_lock); + return _deferred_tasks.size(); +} + +void +DummyBucketExecutor::schedule_single_deferred_task() { + std::pair<Bucket, std::unique_ptr<BucketTask>> bucket_and_task; + { + std::lock_guard guard(_lock); + assert(_defer_tasks); + assert(!_deferred_tasks.empty()); + bucket_and_task = std::move(_deferred_tasks.front()); + _deferred_tasks.pop_front(); + } + internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second)); +} + +void DummyBucketExecutor::sync() { _executor->sync(); } diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h index 86b497437fe..3e1432f7f54 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h @@ -4,6 +4,7 @@ #include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/vespalib/util/threadexecutor.h> +#include <deque> #include <mutex> #include <condition_variable> #include <unordered_set> @@ -19,11 +20,21 @@ public: ~DummyBucketExecutor() override; void execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override; void sync(); + void defer_new_tasks(); + [[nodiscard]] size_t num_deferred_tasks() const noexcept; + void schedule_single_deferred_task(); + void schedule_all_deferred_tasks(); private: + void internal_execute_no_defer(const Bucket & bucket, std::unique_ptr<BucketTask> task); + + using DeferredTasks = std::deque<std::pair<Bucket, std::unique_ptr<BucketTask>>>; + std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; - std::mutex _lock; + mutable std::mutex _lock; std::condition_variable _cond; - std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight; + std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight; + bool _defer_tasks; + DeferredTasks _deferred_tasks; }; } |