diff options
Diffstat (limited to 'persistence')
3 files changed, 78 insertions, 0 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt b/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt index 988f5a05ef4..ff730045070 100644 --- a/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt +++ b/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt @@ -2,5 +2,6 @@ vespa_add_library(persistence_dummyimpl OBJECT SOURCES dummypersistence.cpp + dummy_bucket_executor.cpp DEPENDS ) diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp new file mode 100644 index 00000000000..c51eadc2906 --- /dev/null +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp @@ -0,0 +1,48 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "dummy_bucket_executor.h" +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/destructor_callbacks.h> + +using vespalib::makeLambdaTask; +using vespalib::makeLambdaCallback; + +namespace storage::spi::dummy { + +DummyBucketExecutor::DummyBucketExecutor(size_t numExecutors) + : _executor(std::make_unique<vespalib::ThreadStackExecutor>(numExecutors, 0x10000)), + _lock(), + _cond(), + _inFlight() +{ +} + +DummyBucketExecutor::~DummyBucketExecutor() = default; + +std::unique_ptr<BucketTask> +DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) { + { + std::unique_lock guard(_lock); + while (_inFlight.contains(bucket.getBucket())) { + _cond.wait(guard); + } + _inFlight.insert(bucket.getBucket()); + } + _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() { + bucketTask->run(bucket, makeLambdaCallback([this, bucket]() { + std::unique_lock guard(_lock); + assert(_inFlight.contains(bucket.getBucket())); + _inFlight.erase(bucket.getBucket()); + _cond.notify_all(); + })); + })); + return task; +} + +void +DummyBucketExecutor::sync() { + _executor->sync(); +} + +} diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h new file mode 100644 index 00000000000..b832cb6c02c --- /dev/null +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h @@ -0,0 +1,29 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/persistence/spi/bucketexecutor.h> +#include <vespa/vespalib/util/threadexecutor.h> +#include <mutex> +#include <condition_variable> +#include <unordered_set> + +namespace storage::spi::dummy { + +/** + * Simple implementation of a bucket executor. It can schedule multiple tasks concurrently, but only one per bucket. + */ +class DummyBucketExecutor : public BucketExecutor { +public: + DummyBucketExecutor(size_t numExecutors); + ~DummyBucketExecutor() override; + std::unique_ptr<BucketTask> execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override; + void sync() override; +private: + std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; + std::mutex _lock; + std::condition_variable _cond; + std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight; +}; + +} |