aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2022-01-25 12:53:10 +0100
committerTor Egge <Tor.Egge@online.no>2022-01-25 12:54:27 +0100
commit291daab336706cf85b23f46bda6c2a226700591a (patch)
tree99076a0d50319e47941090de274c4f02a3184574
parent1608bedc17ea020f0a5a4834552aded54e82ab5d (diff)
Throttle replay.
-rw-r--r--searchcore/src/tests/proton/server/feedstates_test.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp17
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h27
9 files changed, 81 insertions, 8 deletions
diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp
index bb0c4eeb282..e01d90f5eed 100644
--- a/searchcore/src/tests/proton/server/feedstates_test.cpp
+++ b/searchcore/src/tests/proton/server/feedstates_test.cpp
@@ -8,6 +8,7 @@
#include <vespa/searchcore/proton/server/feedstates.h>
#include <vespa/searchcore/proton/server/ireplayconfig.h>
#include <vespa/searchcore/proton/server/memoryconfigstore.h>
+#include <vespa/searchcore/proton/server/replay_throttling_policy.h>
#include <vespa/searchcore/proton/feedoperation/removeoperation.h>
#include <vespa/searchcore/proton/test/dummy_feed_view.h>
#include <vespa/searchlib/common/serialnum.h>
@@ -71,6 +72,7 @@ struct Fixture
MemoryConfigStore config_store;
bucketdb::BucketDBOwner _bucketDB;
bucketdb::BucketDBHandler _bucketDBHandler;
+ ReplayThrottlingPolicy _replay_throttling_policy;
MyIncSerialNum _inc_serial_num;
ReplayTransactionLogState state;
@@ -86,8 +88,9 @@ Fixture::Fixture()
config_store(),
_bucketDB(),
_bucketDBHandler(_bucketDB),
+ _replay_throttling_policy({}),
_inc_serial_num(9u),
- state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store, _inc_serial_num)
+ state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store, _replay_throttling_policy, _inc_serial_num)
{
}
Fixture::~Fixture() = default;
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 34530afe9f9..ab721536508 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -543,3 +543,11 @@ tensor_implementation enum {TENSOR_ENGINE, FAST_VALUE} default = FAST_VALUE
## Whether to report issues back to the container via protobuf field
forward_issues bool default = true
+
+## Chooses the throttling policy used to control the window size
+## of the SharedOperationThrottler component used by replay feed state.
+replay_throttling_policy.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED
+## Only used if replay_throttling_policy.type == DYNAMIC:
+replay_throttling_policy.min_window_size int default=20
+replay_throttling_policy.max_window_size int default=1000
+replay_throttling_policy.window_size_increment int default=20
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index f052d663ba6..52851ebf031 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -12,6 +12,7 @@
#include "idocumentsubdb.h"
#include "maintenance_jobs_injector.h"
#include "reconfig_params.h"
+#include "replay_throttling_policy.h"
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/metrics/updatehook.h>
#include <vespa/searchcore/proton/attribute/attribute_config_inspector.h>
@@ -77,6 +78,18 @@ makeIndexConfig(const ProtonConfig::Index & cfg) {
return index::IndexConfig(WarmupConfig(vespalib::from_s(cfg.warmup.time), cfg.warmup.unpack), cfg.maxflushed, cfg.cache.size);
}
+ReplayThrottlingPolicy
+make_replay_throttling_policy(const ProtonConfig::ReplayThrottlingPolicy& cfg) {
+ if (cfg.type == ProtonConfig::ReplayThrottlingPolicy::Type::UNLIMITED) {
+ return ReplayThrottlingPolicy({});
+ }
+ vespalib::SharedOperationThrottler::DynamicThrottleParams params;
+ params.min_window_size = cfg.minWindowSize;
+ params.max_window_size = cfg.maxWindowSize;
+ params.window_size_increment = cfg.windowSizeIncrement;
+ return ReplayThrottlingPolicy(params);
+}
+
class MetricsUpdateHook : public metrics::UpdateHook {
DocumentDB &_db;
public:
@@ -190,6 +203,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_clusterStateHandler(_writeService.master()),
_bucketHandler(_writeService.master()),
_indexCfg(makeIndexConfig(protonCfg.index)),
+ _replay_throttling_policy(std::make_unique<ReplayThrottlingPolicy>(make_replay_throttling_policy(protonCfg.replayThrottlingPolicy))),
_config_store(std::move(config_store)),
_sessionManager(std::make_shared<matching::SessionManager>(protonCfg.grouping.sessionmanager.maxentries)),
_metricsWireService(metricsWireService),
@@ -720,7 +734,8 @@ DocumentDB::startTransactionLogReplay()
getBackingStore().lastSyncToken(),
oldestFlushedSerial,
newestFlushedSerial,
- *_config_store);
+ *_config_store,
+ *_replay_throttling_policy);
_initGate.countDown();
LOG(debug, "DocumentDB(%s): Database started.", _docTypeName.toString().c_str());
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index 8e8391b2f31..2030e6ffac9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -51,6 +51,7 @@ class ExecutorThreadingServiceStats;
class IDocumentDBOwner;
class ISharedThreadingService;
class ITransientResourceUsageProvider;
+class ReplayThrottlingPolicy;
class StatusReport;
struct MetricsWireService;
@@ -104,6 +105,7 @@ private:
ClusterStateHandler _clusterStateHandler;
BucketHandler _bucketHandler;
index::IndexConfig _indexCfg;
+ std::unique_ptr<ReplayThrottlingPolicy> _replay_throttling_policy;
ConfigStore::UP _config_store;
std::shared_ptr<matching::SessionManager> _sessionManager; // TODO: This should not have to be a shared pointer.
MetricsWireService &_metricsWireService;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 8b99c39dd65..16bd2537813 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -464,13 +464,14 @@ FeedHandler::close()
void
FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flushedSummaryMgrSerial,
SerialNum oldestFlushedSerial, SerialNum newestFlushedSerial,
- ConfigStore &config_store)
+ ConfigStore &config_store,
+ const ReplayThrottlingPolicy& replay_throttling_policy)
{
(void) newestFlushedSerial;
assert(_activeFeedView);
assert(_bucketDBHandler);
auto state = make_shared<ReplayTransactionLogState>
- (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store, *this);
+ (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store, replay_throttling_policy, *this);
changeFeedState(state);
// Resurrected attribute vector might cause oldestFlushedSerial to
// be lower than _prunedSerialNum, so don't warn for now.
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 417d9c21548..32a70f7c2b0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -36,6 +36,7 @@ class IReplayConfig;
class JoinBucketsOperation;
class PutOperation;
class RemoveOperation;
+class ReplayThrottlingPolicy;
class SplitBucketOperation;
class UpdateOperation;
@@ -189,7 +190,8 @@ public:
SerialNum flushedSummaryMgrSerial,
SerialNum oldestFlushedSerial,
SerialNum newestFlushedSerial,
- ConfigStore &config_store);
+ ConfigStore &config_store,
+ const ReplayThrottlingPolicy& replay_throttling_policy);
/**
* Called when a flush is done and allows pruning of the transaction log.
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index 1c8e17fc474..d9fdee85e4a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -5,6 +5,7 @@
#include "ifeedview.h"
#include "ireplayconfig.h"
#include "replaypacketdispatcher.h"
+#include "replay_throttling_policy.h"
#include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h>
#include <vespa/searchcore/proton/feedoperation/operations.h>
#include <vespa/searchcore/proton/common/eventlogger.h>
@@ -23,6 +24,7 @@ using search::SerialNum;
using vespalib::Executor;
using vespalib::makeLambdaTask;
using vespalib::IDestructorCallback;
+using vespalib::SharedOperationThrottler;
using vespalib::make_string;
using proton::bucketdb::IBucketDBHandler;
@@ -54,25 +56,36 @@ class TransactionLogReplayPacketHandler : public IReplayPacketHandler {
FeedConfigStore &_config_store;
IIncSerialNum &_inc_serial_num;
CommitTimeTracker _commitTimeTracker;
+ std::unique_ptr<SharedOperationThrottler> _throttler;
+
+ static std::unique_ptr<SharedOperationThrottler> make_throttler(const ReplayThrottlingPolicy& replay_throttling_policy) {
+ auto& params = replay_throttling_policy.get_params();
+ if (!params.has_value()) {
+ return SharedOperationThrottler::make_unlimited_throttler();
+ }
+ return SharedOperationThrottler::make_dynamic_throttler(params.value());
+ }
public:
TransactionLogReplayPacketHandler(IFeedView *& feed_view_ptr,
IBucketDBHandler &bucketDBHandler,
IReplayConfig &replay_config,
FeedConfigStore &config_store,
+ const ReplayThrottlingPolicy& replay_throttling_policy,
IIncSerialNum &inc_serial_num)
: _feed_view_ptr(feed_view_ptr),
_bucketDBHandler(bucketDBHandler),
_replay_config(replay_config),
_config_store(config_store),
_inc_serial_num(inc_serial_num),
- _commitTimeTracker(5ms)
+ _commitTimeTracker(5ms),
+ _throttler(make_throttler(replay_throttling_policy))
{ }
~TransactionLogReplayPacketHandler() override = default;
FeedToken make_replay_feed_token() {
- vespalib::SharedOperationThrottler::Token throttler_token;
+ SharedOperationThrottler::Token throttler_token = _throttler->blocking_acquire_one();
return std::make_shared<feedtoken::ReplayState>(std::move(throttler_token));
}
@@ -180,10 +193,11 @@ ReplayTransactionLogState::ReplayTransactionLogState(
IBucketDBHandler &bucketDBHandler,
IReplayConfig &replay_config,
FeedConfigStore &config_store,
+ const ReplayThrottlingPolicy &replay_throttling_policy,
IIncSerialNum& inc_serial_num)
: FeedState(REPLAY_TRANSACTION_LOG),
_doc_type_name(name),
- _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store, inc_serial_num))
+ _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store, replay_throttling_policy, inc_serial_num))
{ }
ReplayTransactionLogState::~ReplayTransactionLogState() = default;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
index 7533d6443a7..fcff28fe481 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
@@ -55,6 +55,7 @@ public:
bucketdb::IBucketDBHandler &bucketDBHandler,
IReplayConfig &replay_config,
FeedConfigStore &config_store,
+ const ReplayThrottlingPolicy &replay_throttling_policy,
IIncSerialNum &inc_serial_num);
~ReplayTransactionLogState() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h b/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h
new file mode 100644
index 00000000000..f3d430010cc
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h
@@ -0,0 +1,27 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/shared_operation_throttler.h>
+#include <optional>
+
+namespace proton {
+
+/*
+ * Policy for replay throttling. If params are set then a dynamic throttler
+ * is used, otherwise an unlimited throttler is used.
+ */
+class ReplayThrottlingPolicy
+{
+ using DynamicThrottleParams = vespalib::SharedOperationThrottler::DynamicThrottleParams;
+ std::optional<DynamicThrottleParams> _params;
+
+public:
+ explicit ReplayThrottlingPolicy(std::optional<DynamicThrottleParams> params)
+ : _params(std::move(params))
+ {
+ }
+ const std::optional<DynamicThrottleParams>& get_params() const noexcept { return _params; }
+};
+
+}