diff options
author | Geir Storli <geirst@yahoo-inc.com> | 2017-06-28 15:15:17 +0000 |
---|---|---|
committer | Geir Storli <geirst@yahoo-inc.com> | 2017-07-03 10:54:34 +0000 |
commit | 2a07fc76adf84495eef9281036207230976b306e (patch) | |
tree | 5563dac7a0221612081e10d0d29ffddf89ad965f | |
parent | f022a3af057e5acf763da02e139ef87118ea51ae (diff) |
Add class to limit the number of outstanding move operations a blockable maintenance job can have.
8 files changed, 237 insertions, 1 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt index c1d117d51b3..208be618b1f 100644 --- a/searchcore/CMakeLists.txt +++ b/searchcore/CMakeLists.txt @@ -95,6 +95,7 @@ vespa_define_module( src/tests/proton/documentdb/job_tracked_maintenance_job src/tests/proton/documentdb/lid_space_compaction src/tests/proton/documentdb/maintenancecontroller + src/tests/proton/documentdb/move_operation_limiter src/tests/proton/documentdb/storeonlyfeedview src/tests/proton/documentmetastore src/tests/proton/documentmetastore/lidreusedelayer diff --git a/searchcore/src/tests/proton/documentdb/move_operation_limiter/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/move_operation_limiter/CMakeLists.txt new file mode 100644 index 00000000000..1cf0cf59ad3 --- /dev/null +++ b/searchcore/src/tests/proton/documentdb/move_operation_limiter/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchcore_move_operation_limiter_test_app TEST + SOURCES + move_operation_limiter_test.cpp + DEPENDS + searchcore_server +) +vespa_add_test(NAME searchcore_move_operation_limiter_test_app COMMAND searchcore_move_operation_limiter_test_app) diff --git a/searchcore/src/tests/proton/documentdb/move_operation_limiter/FILES b/searchcore/src/tests/proton/documentdb/move_operation_limiter/FILES new file mode 100644 index 00000000000..833ddbf7789 --- /dev/null +++ b/searchcore/src/tests/proton/documentdb/move_operation_limiter/FILES @@ -0,0 +1 @@ +move_operation_limiter_test.cpp diff --git a/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp b/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp new file mode 100644 index 00000000000..df9aaab675c --- /dev/null +++ b/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp @@ -0,0 +1,106 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/log/log.h> +LOG_SETUP("move_operation_limiter_test"); + +#include <vespa/searchcore/proton/server/i_blockable_maintenance_job.h> +#include <vespa/searchcore/proton/server/move_operation_limiter.h> +#include <vespa/vespalib/testkit/testapp.h> +#include <queue> + +using namespace proton; + +struct MyBlockableMaintenanceJob : public IBlockableMaintenanceJob { + bool blocked; + MyBlockableMaintenanceJob() + : IBlockableMaintenanceJob("my_job", 1.0, 1.0), + blocked(false) + {} + virtual void setBlocked(BlockedReason reason) override { + ASSERT_TRUE(reason == BlockedReason::OUTSTANDING_OPS); + EXPECT_FALSE(blocked); + blocked = true; + } + virtual void unBlock(BlockedReason reason) override { + ASSERT_TRUE(reason == BlockedReason::OUTSTANDING_OPS); + EXPECT_TRUE(blocked); + blocked = false; + } + virtual bool run() override { return true; } +}; + +struct Fixture { + using OpsQueue = std::queue<std::shared_ptr<search::IDestructorCallback>>; + using MoveOperationLimiterSP = std::shared_ptr<MoveOperationLimiter>; + + MyBlockableMaintenanceJob job; + MoveOperationLimiterSP limiter; + OpsQueue ops; + Fixture(uint32_t maxOutstandingOps = 2) + : job(), + limiter(std::make_shared<MoveOperationLimiter>(&job, maxOutstandingOps)), + ops() + {} + void beginOp() { ops.push(limiter->beginOperation()); } + void endOp() { ops.pop(); } + void clearJob() { limiter->clearJob(); } + void clearLimiter() { limiter = MoveOperationLimiterSP(); } + void assertAboveLimit() const { + EXPECT_TRUE(limiter->isAboveLimit()); + EXPECT_TRUE(job.blocked); + } + void assertBelowLimit() const { + EXPECT_FALSE(limiter->isAboveLimit()); + EXPECT_FALSE(job.blocked); + } +}; + +TEST_F("require that job is blocked / unblocked when crossing max outstanding ops boundaries", Fixture) +{ + f.beginOp(); + TEST_DO(f.assertBelowLimit()); + f.beginOp(); + TEST_DO(f.assertAboveLimit()); + f.beginOp(); + TEST_DO(f.assertAboveLimit()); + f.endOp(); + TEST_DO(f.assertAboveLimit()); + f.endOp(); + TEST_DO(f.assertBelowLimit()); + f.endOp(); + TEST_DO(f.assertBelowLimit()); +} + +TEST_F("require that cleared job is not blocked when crossing max ops boundary", Fixture) +{ + f.beginOp(); + f.clearJob(); + f.beginOp(); + EXPECT_FALSE(f.job.blocked); + EXPECT_TRUE(f.limiter->isAboveLimit()); +} + +TEST_F("require that cleared job is not unblocked when crossing max ops boundary", Fixture) +{ + f.beginOp(); + f.beginOp(); + TEST_DO(f.assertAboveLimit()); + f.clearJob(); + f.endOp(); + EXPECT_TRUE(f.job.blocked); + EXPECT_FALSE(f.limiter->isAboveLimit()); +} + +TEST_F("require that destructor callback has reference to limiter via shared ptr", Fixture) +{ + f.beginOp(); + f.beginOp(); + TEST_DO(f.assertAboveLimit()); + f.clearLimiter(); + f.endOp(); + EXPECT_FALSE(f.job.blocked); +} + +TEST_MAIN() +{ + TEST_RUN_ALL(); +} diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 7208a3695e7..f58b94426c4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -63,6 +63,7 @@ vespa_add_library(searchcore_server STATIC memory_flush_config_updater.cpp memoryflush.cpp minimal_document_retriever.cpp + move_operation_limiter.cpp ooscli.cpp operationdonecontext.cpp persistencehandlerproxy.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h index aa4d5ed79b6..9544485cc86 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h @@ -15,7 +15,8 @@ public: enum class BlockedReason { RESOURCE_LIMITS = 0, FROZEN_BUCKET = 1, - CLUSTER_STATE = 2 + CLUSTER_STATE = 2, + OUTSTANDING_OPS = 3 }; IBlockableMaintenanceJob(const vespalib::string &name, diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp new file mode 100644 index 00000000000..6c0c0863fe1 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.cpp @@ -0,0 +1,74 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "i_blockable_maintenance_job.h" +#include "move_operation_limiter.h" +#include <cassert> + +namespace proton { + +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; + +struct MoveOperationLimiter::Callback : public search::IDestructorCallback { + MoveOperationLimiter::SP _limiter; + Callback(MoveOperationLimiter::SP limiter) : _limiter(std::move(limiter)) {} + virtual ~Callback() { _limiter->endOperation(); } +}; + +bool +MoveOperationLimiter::isOnLimit(const LockGuard &) const +{ + return (_outstandingOps == _maxOutstandingOps); +} + +void +MoveOperationLimiter::endOperation() +{ + LockGuard guard(_mutex); + bool considerUnblock = isOnLimit(guard); + assert(_outstandingOps > 0); + --_outstandingOps; + if (_job && considerUnblock) { + _job->unBlock(BlockedReason::OUTSTANDING_OPS); + } +} + +MoveOperationLimiter::MoveOperationLimiter(IBlockableMaintenanceJob *job, + uint32_t maxOutstandingOps) + : _mutex(), + _job(job), + _outstandingOps(0), + _maxOutstandingOps(maxOutstandingOps) +{ +} + +MoveOperationLimiter::~MoveOperationLimiter() +{ +} + +void +MoveOperationLimiter::clearJob() +{ + LockGuard guard(_mutex); + _job = nullptr; +} + +bool +MoveOperationLimiter::isAboveLimit() const +{ + LockGuard guard(_mutex); + return (_outstandingOps >= _maxOutstandingOps); +} + +std::shared_ptr<search::IDestructorCallback> +MoveOperationLimiter::beginOperation() +{ + LockGuard guard(_mutex); + ++_outstandingOps; + if (_job && isOnLimit(guard)) { + _job->setBlocked(BlockedReason::OUTSTANDING_OPS); + } + MoveOperationLimiter::SP thisPtr = shared_from_this(); + return std::make_shared<Callback>(std::move(thisPtr)); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h new file mode 100644 index 00000000000..c86ff9fe2a5 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/move_operation_limiter.h @@ -0,0 +1,44 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchlib/common/idestructorcallback.h> +#include <memory> +#include <mutex> + +namespace proton { + +class IBlockableMaintenanceJob; + +/** + * Class used to limit the number of outstanding move operations a blockable maintenance job can have. + * + * When crossing the boundary of max outstanding operations the job is blocked/unblocked. + * Create a destructor callback with beginOperation() and pass this to the component(s) responsible for handling the move operation. + * When this object is destructed (in any thread) the limiter is signaled and the job can be unblocked (if blocked). + */ +class MoveOperationLimiter : public std::enable_shared_from_this<MoveOperationLimiter> { +private: + using SP = std::shared_ptr<MoveOperationLimiter>; + using LockGuard = std::lock_guard<std::mutex>; + + struct Callback; + + mutable std::mutex _mutex; + IBlockableMaintenanceJob *_job; + uint32_t _outstandingOps; + const uint32_t _maxOutstandingOps; + + bool isOnLimit(const LockGuard &guard) const; + void endOperation(); + +public: + MoveOperationLimiter(IBlockableMaintenanceJob *job, + uint32_t maxOutstandingOps); + ~MoveOperationLimiter(); + void clearJob(); + bool isAboveLimit() const; + std::shared_ptr<search::IDestructorCallback> beginOperation(); +}; + + +} |