diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-21 12:23:47 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-21 12:30:27 +0000 |
commit | 350ef35c38f276354991ad1444f5500d07b57802 (patch) | |
tree | 9db68b138ea32e5cadb8cd049bd8b0aa52311a1d | |
parent | 29970d8a10da432c379bdcdecc96ab01c751dc83 (diff) |
Add DummyBucketExecutor for use in testing.
7 files changed, 110 insertions, 12 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt b/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt index 988f5a05ef4..ff730045070 100644 --- a/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt +++ b/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt @@ -2,5 +2,6 @@ vespa_add_library(persistence_dummyimpl OBJECT SOURCES dummypersistence.cpp + dummy_bucket_executor.cpp DEPENDS ) diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp new file mode 100644 index 00000000000..c51eadc2906 --- /dev/null +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp @@ -0,0 +1,48 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "dummy_bucket_executor.h" +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/destructor_callbacks.h> + +using vespalib::makeLambdaTask; +using vespalib::makeLambdaCallback; + +namespace storage::spi::dummy { + +DummyBucketExecutor::DummyBucketExecutor(size_t numExecutors) + : _executor(std::make_unique<vespalib::ThreadStackExecutor>(numExecutors, 0x10000)), + _lock(), + _cond(), + _inFlight() +{ +} + +DummyBucketExecutor::~DummyBucketExecutor() = default; + +std::unique_ptr<BucketTask> +DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) { + { + std::unique_lock guard(_lock); + while (_inFlight.contains(bucket.getBucket())) { + _cond.wait(guard); + } + _inFlight.insert(bucket.getBucket()); + } + _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() { + bucketTask->run(bucket, makeLambdaCallback([this, bucket]() { + std::unique_lock guard(_lock); + assert(_inFlight.contains(bucket.getBucket())); + _inFlight.erase(bucket.getBucket()); + _cond.notify_all(); + })); + })); + return task; +} + +void +DummyBucketExecutor::sync() { + _executor->sync(); +} + +} diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h new file mode 100644 index 00000000000..b832cb6c02c --- /dev/null +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h @@ -0,0 +1,29 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/persistence/spi/bucketexecutor.h> +#include <vespa/vespalib/util/threadexecutor.h> +#include <mutex> +#include <condition_variable> +#include <unordered_set> + +namespace storage::spi::dummy { + +/** + * Simple implementation of a bucket executor. It can schedule multiple tasks concurrently, but only one per bucket. + */ +class DummyBucketExecutor : public BucketExecutor { +public: + DummyBucketExecutor(size_t numExecutors); + ~DummyBucketExecutor() override; + std::unique_ptr<BucketTask> execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override; + void sync() override; +private: + std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; + std::mutex _lock; + std::condition_variable _cond; + std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/i_lid_space_compaction_handler.h b/searchcore/src/vespa/searchcore/proton/server/i_lid_space_compaction_handler.h index baf5c3033f9..afc1a4a0a7f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_lid_space_compaction_handler.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_lid_space_compaction_handler.h @@ -24,7 +24,7 @@ struct ILidSpaceCompactionHandler typedef std::unique_ptr<ILidSpaceCompactionHandler> UP; typedef std::vector<UP> Vector; - virtual ~ILidSpaceCompactionHandler() {} + virtual ~ILidSpaceCompactionHandler() = default; /** * Returns the name of this handler. diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp index 6e8c878d8c1..42fe492539c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp @@ -11,7 +11,7 @@ using BlockedReason = IBlockableMaintenanceJob::BlockedReason; struct MoveOperationLimiter::Callback : public vespalib::IDestructorCallback { MoveOperationLimiter::SP _limiter; Callback(MoveOperationLimiter::SP limiter) noexcept : _limiter(std::move(limiter)) {} - virtual ~Callback() { _limiter->endOperation(); } + ~Callback() override { _limiter->endOperation(); } }; bool diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h index 05b3e5cba0b..04440a7451d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h +++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h @@ -24,22 +24,21 @@ private: struct Callback; - mutable std::mutex _mutex; + mutable std::mutex _mutex; IBlockableMaintenanceJob *_job; - uint32_t _outstandingOps; - const uint32_t _maxOutstandingOps; + uint32_t _outstandingOps; + const uint32_t _maxOutstandingOps; bool isOnLimit(const LockGuard &guard) const; void endOperation(); public: using SP = std::shared_ptr<MoveOperationLimiter>; - MoveOperationLimiter(IBlockableMaintenanceJob *job, - uint32_t maxOutstandingOps); + MoveOperationLimiter(IBlockableMaintenanceJob *job, uint32_t maxOutstandingOps); ~MoveOperationLimiter(); void clearJob(); bool isAboveLimit() const; - virtual std::shared_ptr<vespalib::IDestructorCallback> beginOperation() override; + std::shared_ptr<vespalib::IDestructorCallback> beginOperation() override; }; } diff --git a/vespalib/src/vespa/vespalib/util/destructor_callbacks.h b/vespalib/src/vespa/vespalib/util/destructor_callbacks.h index fc0d23fa177..211105127cf 100644 --- a/vespalib/src/vespa/vespalib/util/destructor_callbacks.h +++ b/vespalib/src/vespa/vespalib/util/destructor_callbacks.h @@ -8,25 +8,46 @@ namespace vespalib { class Gate; -class GateCallback : public vespalib::IDestructorCallback { +class GateCallback : public IDestructorCallback { public: - GateCallback(vespalib::Gate & gate) noexcept : _gate(gate) {} + explicit GateCallback(vespalib::Gate & gate) noexcept : _gate(gate) {} ~GateCallback() override; private: vespalib::Gate & _gate; }; -class IgnoreCallback : public vespalib::IDestructorCallback { +class IgnoreCallback : public IDestructorCallback { public: IgnoreCallback() noexcept { } ~IgnoreCallback() override = default; }; template <typename T> -struct KeepAlive : public vespalib::IDestructorCallback { +struct KeepAlive : public IDestructorCallback { explicit KeepAlive(T toKeep) noexcept : _toKeep(std::move(toKeep)) { } ~KeepAlive() override = default; T _toKeep; }; +template<class FunctionType> +class LambdaCallback : public IDestructorCallback { +public: + explicit LambdaCallback(FunctionType &&func) noexcept + : _func(std::move(func)) + {} + ~LambdaCallback() { + _func(); + } +private: + FunctionType _func; +}; + +template<class FunctionType> +std::shared_ptr<IDestructorCallback> +makeLambdaCallback(FunctionType &&function) { + return std::make_shared<LambdaCallback<std::decay_t<FunctionType>>> + (std::forward<FunctionType>(function)); +} + + } |