summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-01-21 12:23:47 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-01-21 12:30:27 +0000
commit350ef35c38f276354991ad1444f5500d07b57802 (patch)
tree9db68b138ea32e5cadb8cd049bd8b0aa52311a1d
parent29970d8a10da432c379bdcdecc96ab01c751dc83 (diff)
Add DummyBucketExecutor for use in testing.
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt1
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp48
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h29
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_lid_space_compaction_handler.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h11
-rw-r--r--vespalib/src/vespa/vespalib/util/destructor_callbacks.h29
7 files changed, 110 insertions, 12 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt b/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt
index 988f5a05ef4..ff730045070 100644
--- a/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt
+++ b/persistence/src/vespa/persistence/dummyimpl/CMakeLists.txt
@@ -2,5 +2,6 @@
vespa_add_library(persistence_dummyimpl OBJECT
SOURCES
dummypersistence.cpp
+ dummy_bucket_executor.cpp
DEPENDS
)
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
new file mode 100644
index 00000000000..c51eadc2906
--- /dev/null
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
@@ -0,0 +1,48 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "dummy_bucket_executor.h"
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
+
+using vespalib::makeLambdaTask;
+using vespalib::makeLambdaCallback;
+
+namespace storage::spi::dummy {
+
+DummyBucketExecutor::DummyBucketExecutor(size_t numExecutors)
+ : _executor(std::make_unique<vespalib::ThreadStackExecutor>(numExecutors, 0x10000)),
+ _lock(),
+ _cond(),
+ _inFlight()
+{
+}
+
+DummyBucketExecutor::~DummyBucketExecutor() = default;
+
+std::unique_ptr<BucketTask>
+DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) {
+ {
+ std::unique_lock guard(_lock);
+ while (_inFlight.contains(bucket.getBucket())) {
+ _cond.wait(guard);
+ }
+ _inFlight.insert(bucket.getBucket());
+ }
+ _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() {
+ bucketTask->run(bucket, makeLambdaCallback([this, bucket]() {
+ std::unique_lock guard(_lock);
+ assert(_inFlight.contains(bucket.getBucket()));
+ _inFlight.erase(bucket.getBucket());
+ _cond.notify_all();
+ }));
+ }));
+ return task;
+}
+
+void
+DummyBucketExecutor::sync() {
+ _executor->sync();
+}
+
+}
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
new file mode 100644
index 00000000000..b832cb6c02c
--- /dev/null
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
@@ -0,0 +1,29 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/persistence/spi/bucketexecutor.h>
+#include <vespa/vespalib/util/threadexecutor.h>
+#include <mutex>
+#include <condition_variable>
+#include <unordered_set>
+
+namespace storage::spi::dummy {
+
+/**
+ * Simple implementation of a bucket executor. It can schedule multiple tasks concurrently, but only one per bucket.
+ */
+class DummyBucketExecutor : public BucketExecutor {
+public:
+ DummyBucketExecutor(size_t numExecutors);
+ ~DummyBucketExecutor() override;
+ std::unique_ptr<BucketTask> execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override;
+ void sync() override;
+private:
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
+ std::mutex _lock;
+ std::condition_variable _cond;
+ std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_lid_space_compaction_handler.h b/searchcore/src/vespa/searchcore/proton/server/i_lid_space_compaction_handler.h
index baf5c3033f9..afc1a4a0a7f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_lid_space_compaction_handler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_lid_space_compaction_handler.h
@@ -24,7 +24,7 @@ struct ILidSpaceCompactionHandler
typedef std::unique_ptr<ILidSpaceCompactionHandler> UP;
typedef std::vector<UP> Vector;
- virtual ~ILidSpaceCompactionHandler() {}
+ virtual ~ILidSpaceCompactionHandler() = default;
/**
* Returns the name of this handler.
diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp
index 6e8c878d8c1..42fe492539c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp
@@ -11,7 +11,7 @@ using BlockedReason = IBlockableMaintenanceJob::BlockedReason;
struct MoveOperationLimiter::Callback : public vespalib::IDestructorCallback {
MoveOperationLimiter::SP _limiter;
Callback(MoveOperationLimiter::SP limiter) noexcept : _limiter(std::move(limiter)) {}
- virtual ~Callback() { _limiter->endOperation(); }
+ ~Callback() override { _limiter->endOperation(); }
};
bool
diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h
index 05b3e5cba0b..04440a7451d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h
+++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h
@@ -24,22 +24,21 @@ private:
struct Callback;
- mutable std::mutex _mutex;
+ mutable std::mutex _mutex;
IBlockableMaintenanceJob *_job;
- uint32_t _outstandingOps;
- const uint32_t _maxOutstandingOps;
+ uint32_t _outstandingOps;
+ const uint32_t _maxOutstandingOps;
bool isOnLimit(const LockGuard &guard) const;
void endOperation();
public:
using SP = std::shared_ptr<MoveOperationLimiter>;
- MoveOperationLimiter(IBlockableMaintenanceJob *job,
- uint32_t maxOutstandingOps);
+ MoveOperationLimiter(IBlockableMaintenanceJob *job, uint32_t maxOutstandingOps);
~MoveOperationLimiter();
void clearJob();
bool isAboveLimit() const;
- virtual std::shared_ptr<vespalib::IDestructorCallback> beginOperation() override;
+ std::shared_ptr<vespalib::IDestructorCallback> beginOperation() override;
};
}
diff --git a/vespalib/src/vespa/vespalib/util/destructor_callbacks.h b/vespalib/src/vespa/vespalib/util/destructor_callbacks.h
index fc0d23fa177..211105127cf 100644
--- a/vespalib/src/vespa/vespalib/util/destructor_callbacks.h
+++ b/vespalib/src/vespa/vespalib/util/destructor_callbacks.h
@@ -8,25 +8,46 @@ namespace vespalib {
class Gate;
-class GateCallback : public vespalib::IDestructorCallback {
+class GateCallback : public IDestructorCallback {
public:
- GateCallback(vespalib::Gate & gate) noexcept : _gate(gate) {}
+ explicit GateCallback(vespalib::Gate & gate) noexcept : _gate(gate) {}
~GateCallback() override;
private:
vespalib::Gate & _gate;
};
-class IgnoreCallback : public vespalib::IDestructorCallback {
+class IgnoreCallback : public IDestructorCallback {
public:
IgnoreCallback() noexcept { }
~IgnoreCallback() override = default;
};
template <typename T>
-struct KeepAlive : public vespalib::IDestructorCallback {
+struct KeepAlive : public IDestructorCallback {
explicit KeepAlive(T toKeep) noexcept : _toKeep(std::move(toKeep)) { }
~KeepAlive() override = default;
T _toKeep;
};
+template<class FunctionType>
+class LambdaCallback : public IDestructorCallback {
+public:
+ explicit LambdaCallback(FunctionType &&func) noexcept
+ : _func(std::move(func))
+ {}
+ ~LambdaCallback() {
+ _func();
+ }
+private:
+ FunctionType _func;
+};
+
+template<class FunctionType>
+std::shared_ptr<IDestructorCallback>
+makeLambdaCallback(FunctionType &&function) {
+ return std::make_shared<LambdaCallback<std::decay_t<FunctionType>>>
+ (std::forward<FunctionType>(function));
+}
+
+
}