summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahoo-inc.com>2017-06-28 15:15:17 +0000
committerGeir Storli <geirst@yahoo-inc.com>2017-07-03 10:54:34 +0000
commit2a07fc76adf84495eef9281036207230976b306e (patch)
tree5563dac7a0221612081e10d0d29ffddf89ad965f
parentf022a3af057e5acf763da02e139ef87118ea51ae (diff)
Add class to limit the number of outstanding move operations a blockable maintenance job can have.
-rw-r--r--searchcore/CMakeLists.txt1
-rw-r--r--searchcore/src/tests/proton/documentdb/move_operation_limiter/CMakeLists.txt8
-rw-r--r--searchcore/src/tests/proton/documentdb/move_operation_limiter/FILES1
-rw-r--r--searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp106
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp74
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h44
8 files changed, 237 insertions, 1 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index c1d117d51b3..208be618b1f 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -95,6 +95,7 @@ vespa_define_module(
src/tests/proton/documentdb/job_tracked_maintenance_job
src/tests/proton/documentdb/lid_space_compaction
src/tests/proton/documentdb/maintenancecontroller
+ src/tests/proton/documentdb/move_operation_limiter
src/tests/proton/documentdb/storeonlyfeedview
src/tests/proton/documentmetastore
src/tests/proton/documentmetastore/lidreusedelayer
diff --git a/searchcore/src/tests/proton/documentdb/move_operation_limiter/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/move_operation_limiter/CMakeLists.txt
new file mode 100644
index 00000000000..1cf0cf59ad3
--- /dev/null
+++ b/searchcore/src/tests/proton/documentdb/move_operation_limiter/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcore_move_operation_limiter_test_app TEST
+ SOURCES
+ move_operation_limiter_test.cpp
+ DEPENDS
+ searchcore_server
+)
+vespa_add_test(NAME searchcore_move_operation_limiter_test_app COMMAND searchcore_move_operation_limiter_test_app)
diff --git a/searchcore/src/tests/proton/documentdb/move_operation_limiter/FILES b/searchcore/src/tests/proton/documentdb/move_operation_limiter/FILES
new file mode 100644
index 00000000000..833ddbf7789
--- /dev/null
+++ b/searchcore/src/tests/proton/documentdb/move_operation_limiter/FILES
@@ -0,0 +1 @@
+move_operation_limiter_test.cpp
diff --git a/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp b/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp
new file mode 100644
index 00000000000..df9aaab675c
--- /dev/null
+++ b/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp
@@ -0,0 +1,106 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/log/log.h>
+LOG_SETUP("move_operation_limiter_test");
+
+#include <vespa/searchcore/proton/server/i_blockable_maintenance_job.h>
+#include <vespa/searchcore/proton/server/move_operation_limiter.h>
+#include <vespa/vespalib/testkit/testapp.h>
+#include <queue>
+
+using namespace proton;
+
+struct MyBlockableMaintenanceJob : public IBlockableMaintenanceJob {
+ bool blocked;
+ MyBlockableMaintenanceJob()
+ : IBlockableMaintenanceJob("my_job", 1.0, 1.0),
+ blocked(false)
+ {}
+ virtual void setBlocked(BlockedReason reason) override {
+ ASSERT_TRUE(reason == BlockedReason::OUTSTANDING_OPS);
+ EXPECT_FALSE(blocked);
+ blocked = true;
+ }
+ virtual void unBlock(BlockedReason reason) override {
+ ASSERT_TRUE(reason == BlockedReason::OUTSTANDING_OPS);
+ EXPECT_TRUE(blocked);
+ blocked = false;
+ }
+ virtual bool run() override { return true; }
+};
+
+struct Fixture {
+ using OpsQueue = std::queue<std::shared_ptr<search::IDestructorCallback>>;
+ using MoveOperationLimiterSP = std::shared_ptr<MoveOperationLimiter>;
+
+ MyBlockableMaintenanceJob job;
+ MoveOperationLimiterSP limiter;
+ OpsQueue ops;
+ Fixture(uint32_t maxOutstandingOps = 2)
+ : job(),
+ limiter(std::make_shared<MoveOperationLimiter>(&job, maxOutstandingOps)),
+ ops()
+ {}
+ void beginOp() { ops.push(limiter->beginOperation()); }
+ void endOp() { ops.pop(); }
+ void clearJob() { limiter->clearJob(); }
+ void clearLimiter() { limiter = MoveOperationLimiterSP(); }
+ void assertAboveLimit() const {
+ EXPECT_TRUE(limiter->isAboveLimit());
+ EXPECT_TRUE(job.blocked);
+ }
+ void assertBelowLimit() const {
+ EXPECT_FALSE(limiter->isAboveLimit());
+ EXPECT_FALSE(job.blocked);
+ }
+};
+
+TEST_F("require that job is blocked / unblocked when crossing max outstanding ops boundaries", Fixture)
+{
+ f.beginOp();
+ TEST_DO(f.assertBelowLimit());
+ f.beginOp();
+ TEST_DO(f.assertAboveLimit());
+ f.beginOp();
+ TEST_DO(f.assertAboveLimit());
+ f.endOp();
+ TEST_DO(f.assertAboveLimit());
+ f.endOp();
+ TEST_DO(f.assertBelowLimit());
+ f.endOp();
+ TEST_DO(f.assertBelowLimit());
+}
+
+TEST_F("require that cleared job is not blocked when crossing max ops boundary", Fixture)
+{
+ f.beginOp();
+ f.clearJob();
+ f.beginOp();
+ EXPECT_FALSE(f.job.blocked);
+ EXPECT_TRUE(f.limiter->isAboveLimit());
+}
+
+TEST_F("require that cleared job is not unblocked when crossing max ops boundary", Fixture)
+{
+ f.beginOp();
+ f.beginOp();
+ TEST_DO(f.assertAboveLimit());
+ f.clearJob();
+ f.endOp();
+ EXPECT_TRUE(f.job.blocked);
+ EXPECT_FALSE(f.limiter->isAboveLimit());
+}
+
+TEST_F("require that destructor callback has reference to limiter via shared ptr", Fixture)
+{
+ f.beginOp();
+ f.beginOp();
+ TEST_DO(f.assertAboveLimit());
+ f.clearLimiter();
+ f.endOp();
+ EXPECT_FALSE(f.job.blocked);
+}
+
+TEST_MAIN()
+{
+ TEST_RUN_ALL();
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index 7208a3695e7..f58b94426c4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -63,6 +63,7 @@ vespa_add_library(searchcore_server STATIC
memory_flush_config_updater.cpp
memoryflush.cpp
minimal_document_retriever.cpp
+ move_operation_limiter.cpp
ooscli.cpp
operationdonecontext.cpp
persistencehandlerproxy.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h
index aa4d5ed79b6..9544485cc86 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h
@@ -15,7 +15,8 @@ public:
enum class BlockedReason {
RESOURCE_LIMITS = 0,
FROZEN_BUCKET = 1,
- CLUSTER_STATE = 2
+ CLUSTER_STATE = 2,
+ OUTSTANDING_OPS = 3
};
IBlockableMaintenanceJob(const vespalib::string &name,
diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp
new file mode 100644
index 00000000000..6c0c0863fe1
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp
@@ -0,0 +1,74 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "i_blockable_maintenance_job.h"
+#include "move_operation_limiter.h"
+#include <cassert>
+
+namespace proton {
+
+using BlockedReason = IBlockableMaintenanceJob::BlockedReason;
+
+struct MoveOperationLimiter::Callback : public search::IDestructorCallback {
+ MoveOperationLimiter::SP _limiter;
+ Callback(MoveOperationLimiter::SP limiter) : _limiter(std::move(limiter)) {}
+ virtual ~Callback() { _limiter->endOperation(); }
+};
+
+bool
+MoveOperationLimiter::isOnLimit(const LockGuard &) const
+{
+ return (_outstandingOps == _maxOutstandingOps);
+}
+
+void
+MoveOperationLimiter::endOperation()
+{
+ LockGuard guard(_mutex);
+ bool considerUnblock = isOnLimit(guard);
+ assert(_outstandingOps > 0);
+ --_outstandingOps;
+ if (_job && considerUnblock) {
+ _job->unBlock(BlockedReason::OUTSTANDING_OPS);
+ }
+}
+
+MoveOperationLimiter::MoveOperationLimiter(IBlockableMaintenanceJob *job,
+ uint32_t maxOutstandingOps)
+ : _mutex(),
+ _job(job),
+ _outstandingOps(0),
+ _maxOutstandingOps(maxOutstandingOps)
+{
+}
+
+MoveOperationLimiter::~MoveOperationLimiter()
+{
+}
+
+void
+MoveOperationLimiter::clearJob()
+{
+ LockGuard guard(_mutex);
+ _job = nullptr;
+}
+
+bool
+MoveOperationLimiter::isAboveLimit() const
+{
+ LockGuard guard(_mutex);
+ return (_outstandingOps >= _maxOutstandingOps);
+}
+
+std::shared_ptr<search::IDestructorCallback>
+MoveOperationLimiter::beginOperation()
+{
+ LockGuard guard(_mutex);
+ ++_outstandingOps;
+ if (_job && isOnLimit(guard)) {
+ _job->setBlocked(BlockedReason::OUTSTANDING_OPS);
+ }
+ MoveOperationLimiter::SP thisPtr = shared_from_this();
+ return std::make_shared<Callback>(std::move(thisPtr));
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h
new file mode 100644
index 00000000000..c86ff9fe2a5
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h
@@ -0,0 +1,44 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchlib/common/idestructorcallback.h>
+#include <memory>
+#include <mutex>
+
+namespace proton {
+
+class IBlockableMaintenanceJob;
+
+/**
+ * Class used to limit the number of outstanding move operations a blockable maintenance job can have.
+ *
+ * When crossing the boundary of max outstanding operations the job is blocked/unblocked.
+ * Create a destructor callback with beginOperation() and pass this to the component(s) responsible for handling the move operation.
+ * When this object is destructed (in any thread) the limiter is signaled and the job can be unblocked (if blocked).
+ */
+class MoveOperationLimiter : public std::enable_shared_from_this<MoveOperationLimiter> {
+private:
+ using SP = std::shared_ptr<MoveOperationLimiter>;
+ using LockGuard = std::lock_guard<std::mutex>;
+
+ struct Callback;
+
+ mutable std::mutex _mutex;
+ IBlockableMaintenanceJob *_job;
+ uint32_t _outstandingOps;
+ const uint32_t _maxOutstandingOps;
+
+ bool isOnLimit(const LockGuard &guard) const;
+ void endOperation();
+
+public:
+ MoveOperationLimiter(IBlockableMaintenanceJob *job,
+ uint32_t maxOutstandingOps);
+ ~MoveOperationLimiter();
+ void clearJob();
+ bool isAboveLimit() const;
+ std::shared_ptr<search::IDestructorCallback> beginOperation();
+};
+
+
+}