summaryrefslogtreecommitdiffstats
path: root/persistence
diff options
context:
space:
mode:
Diffstat (limited to 'persistence')
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp51
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h15
2 files changed, 63 insertions, 3 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
index 060215c4521..953cfcf733f 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
@@ -14,7 +14,9 @@ DummyBucketExecutor::DummyBucketExecutor(size_t numExecutors)
: _executor(std::make_unique<vespalib::ThreadStackExecutor>(numExecutors, 0x10000)),
_lock(),
_cond(),
- _inFlight()
+ _inFlight(),
+ _defer_tasks(false),
+ _deferred_tasks()
{
}
@@ -24,6 +26,15 @@ DummyBucketExecutor::~DummyBucketExecutor() {
void
DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) {
+ if (!_defer_tasks) {
+ internal_execute_no_defer(bucket, std::move(task));
+ } else {
+ _deferred_tasks.emplace_back(bucket, std::move(task));
+ }
+}
+
+void
+DummyBucketExecutor::internal_execute_no_defer(const Bucket& bucket, std::unique_ptr<BucketTask> task) {
auto failed = _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() {
{
std::unique_lock guard(_lock);
@@ -45,6 +56,44 @@ DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask>
}
void
+DummyBucketExecutor::defer_new_tasks() {
+ std::lock_guard guard(_lock);
+ _defer_tasks = true;
+}
+
+void
+DummyBucketExecutor::schedule_all_deferred_tasks() {
+ DeferredTasks to_run;
+ {
+ std::lock_guard guard(_lock);
+ assert(_defer_tasks);
+ _deferred_tasks.swap(to_run);
+ }
+ for (auto& bucket_and_task : to_run) {
+ internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second));
+ }
+}
+
+size_t
+DummyBucketExecutor::num_deferred_tasks() const noexcept {
+ std::lock_guard guard(_lock);
+ return _deferred_tasks.size();
+}
+
+void
+DummyBucketExecutor::schedule_single_deferred_task() {
+ std::pair<Bucket, std::unique_ptr<BucketTask>> bucket_and_task;
+ {
+ std::lock_guard guard(_lock);
+ assert(_defer_tasks);
+ assert(!_deferred_tasks.empty());
+ bucket_and_task = std::move(_deferred_tasks.front());
+ _deferred_tasks.pop_front();
+ }
+ internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second));
+}
+
+void
DummyBucketExecutor::sync() {
_executor->sync();
}
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
index 86b497437fe..3e1432f7f54 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
@@ -4,6 +4,7 @@
#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/vespalib/util/threadexecutor.h>
+#include <deque>
#include <mutex>
#include <condition_variable>
#include <unordered_set>
@@ -19,11 +20,21 @@ public:
~DummyBucketExecutor() override;
void execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override;
void sync();
+ void defer_new_tasks();
+ [[nodiscard]] size_t num_deferred_tasks() const noexcept;
+ void schedule_single_deferred_task();
+ void schedule_all_deferred_tasks();
private:
+ void internal_execute_no_defer(const Bucket & bucket, std::unique_ptr<BucketTask> task);
+
+ using DeferredTasks = std::deque<std::pair<Bucket, std::unique_ptr<BucketTask>>>;
+
std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
- std::mutex _lock;
+ mutable std::mutex _lock;
std::condition_variable _cond;
- std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight;
+ std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight;
+ bool _defer_tasks;
+ DeferredTasks _deferred_tasks;
};
}