diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-01-25 12:53:10 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2022-01-25 12:54:27 +0100 |
commit | 291daab336706cf85b23f46bda6c2a226700591a (patch) | |
tree | 99076a0d50319e47941090de274c4f02a3184574 /searchcore | |
parent | 1608bedc17ea020f0a5a4834552aded54e82ab5d (diff) |
Throttle replay.
Diffstat (limited to 'searchcore')
9 files changed, 81 insertions, 8 deletions
diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp index bb0c4eeb282..e01d90f5eed 100644 --- a/searchcore/src/tests/proton/server/feedstates_test.cpp +++ b/searchcore/src/tests/proton/server/feedstates_test.cpp @@ -8,6 +8,7 @@ #include <vespa/searchcore/proton/server/feedstates.h> #include <vespa/searchcore/proton/server/ireplayconfig.h> #include <vespa/searchcore/proton/server/memoryconfigstore.h> +#include <vespa/searchcore/proton/server/replay_throttling_policy.h> #include <vespa/searchcore/proton/feedoperation/removeoperation.h> #include <vespa/searchcore/proton/test/dummy_feed_view.h> #include <vespa/searchlib/common/serialnum.h> @@ -71,6 +72,7 @@ struct Fixture MemoryConfigStore config_store; bucketdb::BucketDBOwner _bucketDB; bucketdb::BucketDBHandler _bucketDBHandler; + ReplayThrottlingPolicy _replay_throttling_policy; MyIncSerialNum _inc_serial_num; ReplayTransactionLogState state; @@ -86,8 +88,9 @@ Fixture::Fixture() config_store(), _bucketDB(), _bucketDBHandler(_bucketDB), + _replay_throttling_policy({}), _inc_serial_num(9u), - state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store, _inc_serial_num) + state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store, _replay_throttling_policy, _inc_serial_num) { } Fixture::~Fixture() = default; diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 34530afe9f9..ab721536508 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -543,3 +543,11 @@ tensor_implementation enum {TENSOR_ENGINE, FAST_VALUE} default = FAST_VALUE ## Whether to report issues back to the container via protobuf field forward_issues bool default = true + +## Chooses the throttling policy used to control the window size +## of the SharedOperationThrottler component used by replay feed state. +replay_throttling_policy.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED +## Only used if replay_throttling_policy.type == DYNAMIC: +replay_throttling_policy.min_window_size int default=20 +replay_throttling_policy.max_window_size int default=1000 +replay_throttling_policy.window_size_increment int default=20 diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index f052d663ba6..52851ebf031 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -12,6 +12,7 @@ #include "idocumentsubdb.h" #include "maintenance_jobs_injector.h" #include "reconfig_params.h" +#include "replay_throttling_policy.h" #include <vespa/document/repo/documenttyperepo.h> #include <vespa/metrics/updatehook.h> #include <vespa/searchcore/proton/attribute/attribute_config_inspector.h> @@ -77,6 +78,18 @@ makeIndexConfig(const ProtonConfig::Index & cfg) { return index::IndexConfig(WarmupConfig(vespalib::from_s(cfg.warmup.time), cfg.warmup.unpack), cfg.maxflushed, cfg.cache.size); } +ReplayThrottlingPolicy +make_replay_throttling_policy(const ProtonConfig::ReplayThrottlingPolicy& cfg) { + if (cfg.type == ProtonConfig::ReplayThrottlingPolicy::Type::UNLIMITED) { + return ReplayThrottlingPolicy({}); + } + vespalib::SharedOperationThrottler::DynamicThrottleParams params; + params.min_window_size = cfg.minWindowSize; + params.max_window_size = cfg.maxWindowSize; + params.window_size_increment = cfg.windowSizeIncrement; + return ReplayThrottlingPolicy(params); +} + class MetricsUpdateHook : public metrics::UpdateHook { DocumentDB &_db; public: @@ -190,6 +203,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _clusterStateHandler(_writeService.master()), _bucketHandler(_writeService.master()), _indexCfg(makeIndexConfig(protonCfg.index)), + _replay_throttling_policy(std::make_unique<ReplayThrottlingPolicy>(make_replay_throttling_policy(protonCfg.replayThrottlingPolicy))), _config_store(std::move(config_store)), _sessionManager(std::make_shared<matching::SessionManager>(protonCfg.grouping.sessionmanager.maxentries)), _metricsWireService(metricsWireService), @@ -720,7 +734,8 @@ DocumentDB::startTransactionLogReplay() getBackingStore().lastSyncToken(), oldestFlushedSerial, newestFlushedSerial, - *_config_store); + *_config_store, + *_replay_throttling_policy); _initGate.countDown(); LOG(debug, "DocumentDB(%s): Database started.", _docTypeName.toString().c_str()); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index 8e8391b2f31..2030e6ffac9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -51,6 +51,7 @@ class ExecutorThreadingServiceStats; class IDocumentDBOwner; class ISharedThreadingService; class ITransientResourceUsageProvider; +class ReplayThrottlingPolicy; class StatusReport; struct MetricsWireService; @@ -104,6 +105,7 @@ private: ClusterStateHandler _clusterStateHandler; BucketHandler _bucketHandler; index::IndexConfig _indexCfg; + std::unique_ptr<ReplayThrottlingPolicy> _replay_throttling_policy; ConfigStore::UP _config_store; std::shared_ptr<matching::SessionManager> _sessionManager; // TODO: This should not have to be a shared pointer. MetricsWireService &_metricsWireService; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 8b99c39dd65..16bd2537813 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -464,13 +464,14 @@ FeedHandler::close() void FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flushedSummaryMgrSerial, SerialNum oldestFlushedSerial, SerialNum newestFlushedSerial, - ConfigStore &config_store) + ConfigStore &config_store, + const ReplayThrottlingPolicy& replay_throttling_policy) { (void) newestFlushedSerial; assert(_activeFeedView); assert(_bucketDBHandler); auto state = make_shared<ReplayTransactionLogState> - (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store, *this); + (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store, replay_throttling_policy, *this); changeFeedState(state); // Resurrected attribute vector might cause oldestFlushedSerial to // be lower than _prunedSerialNum, so don't warn for now. diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 417d9c21548..32a70f7c2b0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -36,6 +36,7 @@ class IReplayConfig; class JoinBucketsOperation; class PutOperation; class RemoveOperation; +class ReplayThrottlingPolicy; class SplitBucketOperation; class UpdateOperation; @@ -189,7 +190,8 @@ public: SerialNum flushedSummaryMgrSerial, SerialNum oldestFlushedSerial, SerialNum newestFlushedSerial, - ConfigStore &config_store); + ConfigStore &config_store, + const ReplayThrottlingPolicy& replay_throttling_policy); /** * Called when a flush is done and allows pruning of the transaction log. diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp index 1c8e17fc474..d9fdee85e4a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp @@ -5,6 +5,7 @@ #include "ifeedview.h" #include "ireplayconfig.h" #include "replaypacketdispatcher.h" +#include "replay_throttling_policy.h" #include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h> #include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/searchcore/proton/common/eventlogger.h> @@ -23,6 +24,7 @@ using search::SerialNum; using vespalib::Executor; using vespalib::makeLambdaTask; using vespalib::IDestructorCallback; +using vespalib::SharedOperationThrottler; using vespalib::make_string; using proton::bucketdb::IBucketDBHandler; @@ -54,25 +56,36 @@ class TransactionLogReplayPacketHandler : public IReplayPacketHandler { FeedConfigStore &_config_store; IIncSerialNum &_inc_serial_num; CommitTimeTracker _commitTimeTracker; + std::unique_ptr<SharedOperationThrottler> _throttler; + + static std::unique_ptr<SharedOperationThrottler> make_throttler(const ReplayThrottlingPolicy& replay_throttling_policy) { + auto& params = replay_throttling_policy.get_params(); + if (!params.has_value()) { + return SharedOperationThrottler::make_unlimited_throttler(); + } + return SharedOperationThrottler::make_dynamic_throttler(params.value()); + } public: TransactionLogReplayPacketHandler(IFeedView *& feed_view_ptr, IBucketDBHandler &bucketDBHandler, IReplayConfig &replay_config, FeedConfigStore &config_store, + const ReplayThrottlingPolicy& replay_throttling_policy, IIncSerialNum &inc_serial_num) : _feed_view_ptr(feed_view_ptr), _bucketDBHandler(bucketDBHandler), _replay_config(replay_config), _config_store(config_store), _inc_serial_num(inc_serial_num), - _commitTimeTracker(5ms) + _commitTimeTracker(5ms), + _throttler(make_throttler(replay_throttling_policy)) { } ~TransactionLogReplayPacketHandler() override = default; FeedToken make_replay_feed_token() { - vespalib::SharedOperationThrottler::Token throttler_token; + SharedOperationThrottler::Token throttler_token = _throttler->blocking_acquire_one(); return std::make_shared<feedtoken::ReplayState>(std::move(throttler_token)); } @@ -180,10 +193,11 @@ ReplayTransactionLogState::ReplayTransactionLogState( IBucketDBHandler &bucketDBHandler, IReplayConfig &replay_config, FeedConfigStore &config_store, + const ReplayThrottlingPolicy &replay_throttling_policy, IIncSerialNum& inc_serial_num) : FeedState(REPLAY_TRANSACTION_LOG), _doc_type_name(name), - _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store, inc_serial_num)) + _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store, replay_throttling_policy, inc_serial_num)) { } ReplayTransactionLogState::~ReplayTransactionLogState() = default; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h index 7533d6443a7..fcff28fe481 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h @@ -55,6 +55,7 @@ public: bucketdb::IBucketDBHandler &bucketDBHandler, IReplayConfig &replay_config, FeedConfigStore &config_store, + const ReplayThrottlingPolicy &replay_throttling_policy, IIncSerialNum &inc_serial_num); ~ReplayTransactionLogState() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h b/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h new file mode 100644 index 00000000000..f3d430010cc --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h @@ -0,0 +1,27 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/shared_operation_throttler.h> +#include <optional> + +namespace proton { + +/* + * Policy for replay throttling. If params are set then a dynamic throttler + * is used, otherwise an unlimited throttler is used. + */ +class ReplayThrottlingPolicy +{ + using DynamicThrottleParams = vespalib::SharedOperationThrottler::DynamicThrottleParams; + std::optional<DynamicThrottleParams> _params; + +public: + explicit ReplayThrottlingPolicy(std::optional<DynamicThrottleParams> params) + : _params(std::move(params)) + { + } + const std::optional<DynamicThrottleParams>& get_params() const noexcept { return _params; } +}; + +} |