diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-03-02 18:42:52 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2022-03-02 18:42:52 +0100 |
commit | 5ee26fe45dfaf4c65407b9433ef8b37ea927b80f (patch) | |
tree | cf913b27ed22b85b4579d5274e794c45f41860cf /searchcore/src | |
parent | 60c5b5e0a91e35e61dc82222df74df2ad42d3caa (diff) |
Add replay feed token factory with optional tracking.
Add feed operation type and serial number to replay feed tokens.
Diffstat (limited to 'searchcore/src')
6 files changed, 116 insertions, 15 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt index 03bad4c7eaf..3f33871a0e2 100644 --- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt @@ -18,6 +18,7 @@ vespa_add_library(searchcore_pcommon STATIC ipendinglidtracker.cpp operation_rate_tracker.cpp pendinglidtracker.cpp + replay_feed_token_factory.cpp replay_feedtoken_state.cpp select_utils.cpp selectcontext.cpp diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.cpp b/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.cpp new file mode 100644 index 00000000000..18e6f415f16 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.cpp @@ -0,0 +1,57 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "replay_feed_token_factory.h" +#include "replay_feedtoken_state.h" +#include <cassert> + +namespace proton::feedtoken { + +class ReplayFeedTokenFactory::Deleter { + ReplayFeedTokenFactory& _factory; +public: + Deleter(ReplayFeedTokenFactory& factory) + : _factory(factory) + { + } + void operator()(ReplayState* p) const noexcept { + _factory.on_delete(p); + delete p; + } +}; + +ReplayFeedTokenFactory::ReplayFeedTokenFactory(bool enable_tracking) + : _lock(), + _states(), + _enable_tracking(enable_tracking) +{ +} + +ReplayFeedTokenFactory::~ReplayFeedTokenFactory() +{ + std::lock_guard guard(_lock); + assert(_states.empty()); +} + +FeedToken +ReplayFeedTokenFactory::make_replay_feed_token(ThrottlerToken throttler_token, const FeedOperation& op) +{ + if (_enable_tracking) { + auto token = std::make_unique<ReplayState>(std::move(throttler_token), op); + std::lock_guard guard(_lock); + bool inserted = _states.insert(token.get()).second; + assert(inserted); + return std::shared_ptr<ReplayState>(token.release(), Deleter(*this)); + } else { + return std::make_shared<ReplayState>(std::move(throttler_token), op); + } +} + +void +ReplayFeedTokenFactory::on_delete(const ReplayState* state) noexcept +{ + std::lock_guard guard(_lock); + bool erased = _states.erase(state) > 0u; + assert(erased); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.h b/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.h new file mode 100644 index 00000000000..7e161ef1773 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.h @@ -0,0 +1,33 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/shared_operation_throttler.h> +#include "feedtoken.h" +#include <mutex> +#include <unordered_set> + +namespace proton { class FeedOperation; } + +namespace proton::feedtoken { + +class ReplayState; + +/* + * A factory for replay feed tokens with optional tracking. + */ +class ReplayFeedTokenFactory { + using ThrottlerToken = vespalib::SharedOperationThrottler::Token; + class Deleter; + + std::mutex _lock; + std::unordered_set<const ReplayState*> _states; + bool _enable_tracking; + void on_delete(const ReplayState* state) noexcept; +public: + ReplayFeedTokenFactory(bool enable_tracking); + ~ReplayFeedTokenFactory(); + FeedToken make_replay_feed_token(ThrottlerToken throttler_token, const FeedOperation& op); +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp index a3a473c9548..2e6469f870f 100644 --- a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp @@ -4,9 +4,11 @@ namespace proton::feedtoken { -ReplayState::ReplayState(vespalib::SharedOperationThrottler::Token throttler_token) +ReplayState::ReplayState(vespalib::SharedOperationThrottler::Token throttler_token, const FeedOperation& op) : IState(), - _throttler_token(std::move(throttler_token)) + _throttler_token(std::move(throttler_token)), + _type(op.getType()), + _serial_num(op.getSerialNum()) { } diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h index 512f12a50af..4ed0986b838 100644 --- a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h +++ b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h @@ -4,6 +4,7 @@ #include "feedtoken.h" #include <vespa/vespalib/util/shared_operation_throttler.h> +#include <vespa/searchcore/proton/feedoperation/feedoperation.h> namespace proton::feedtoken { @@ -13,10 +14,15 @@ namespace proton::feedtoken { * of the feed operation. */ class ReplayState : public IState { - vespalib::SharedOperationThrottler::Token _throttler_token; + using SerialNum = search::SerialNum; + using ThrottlerToken = vespalib::SharedOperationThrottler::Token; + + ThrottlerToken _throttler_token; + FeedOperation::Type _type; + SerialNum _serial_num; public: ~ReplayState() override; - ReplayState(vespalib::SharedOperationThrottler::Token throttler_token); + ReplayState(ThrottlerToken throttler_token, const FeedOperation& op); bool is_replay() const noexcept override; void fail() override; void setResult(ResultUP result, bool documentWasFound) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp index d9fdee85e4a..db0c1623862 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp @@ -9,7 +9,7 @@ #include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h> #include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/searchcore/proton/common/eventlogger.h> -#include <vespa/searchcore/proton/common/replay_feedtoken_state.h> +#include <vespa/searchcore/proton/common/replay_feed_token_factory.h> #include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/shared_operation_throttler.h> @@ -57,6 +57,7 @@ class TransactionLogReplayPacketHandler : public IReplayPacketHandler { IIncSerialNum &_inc_serial_num; CommitTimeTracker _commitTimeTracker; std::unique_ptr<SharedOperationThrottler> _throttler; + std::unique_ptr<feedtoken::ReplayFeedTokenFactory> _replay_feed_token_factory; static std::unique_ptr<SharedOperationThrottler> make_throttler(const ReplayThrottlingPolicy& replay_throttling_policy) { auto& params = replay_throttling_policy.get_params(); @@ -79,24 +80,25 @@ public: _config_store(config_store), _inc_serial_num(inc_serial_num), _commitTimeTracker(5ms), - _throttler(make_throttler(replay_throttling_policy)) + _throttler(make_throttler(replay_throttling_policy)), + _replay_feed_token_factory(std::make_unique<feedtoken::ReplayFeedTokenFactory>(false)) { } ~TransactionLogReplayPacketHandler() override = default; - FeedToken make_replay_feed_token() { + FeedToken make_replay_feed_token(const FeedOperation& op) { SharedOperationThrottler::Token throttler_token = _throttler->blocking_acquire_one(); - return std::make_shared<feedtoken::ReplayState>(std::move(throttler_token)); + return _replay_feed_token_factory->make_replay_feed_token(std::move(throttler_token), op); } void replay(const PutOperation &op) override { - _feed_view_ptr->handlePut(make_replay_feed_token(), op); + _feed_view_ptr->handlePut(make_replay_feed_token(op), op); } void replay(const RemoveOperation &op) override { - _feed_view_ptr->handleRemove(make_replay_feed_token(), op); + _feed_view_ptr->handleRemove(make_replay_feed_token(op), op); } void replay(const UpdateOperation &op) override { - _feed_view_ptr->handleUpdate(make_replay_feed_token(), op); + _feed_view_ptr->handleUpdate(make_replay_feed_token(op), op); } void replay(const NoopOperation &) override {} // ignored void replay(const NewConfigOperation &op) override { @@ -104,7 +106,7 @@ public: } void replay(const DeleteBucketOperation &op) override { - _feed_view_ptr->handleDeleteBucket(op, make_replay_feed_token()); + _feed_view_ptr->handleDeleteBucket(op, make_replay_feed_token(op)); } void replay(const SplitBucketOperation &op) override { _bucketDBHandler.handleSplit(op.getSerialNum(), op.getSource(), @@ -115,15 +117,15 @@ public: op.getSource2(), op.getTarget()); } void replay(const PruneRemovedDocumentsOperation &op) override { - _feed_view_ptr->handlePruneRemovedDocuments(op, make_replay_feed_token()); + _feed_view_ptr->handlePruneRemovedDocuments(op, make_replay_feed_token(op)); } void replay(const MoveOperation &op) override { - _feed_view_ptr->handleMove(op, make_replay_feed_token()); + _feed_view_ptr->handleMove(op, make_replay_feed_token(op)); } void replay(const CreateBucketOperation &) override { } void replay(const CompactLidSpaceOperation &op) override { - _feed_view_ptr->handleCompactLidSpace(op, make_replay_feed_token()); + _feed_view_ptr->handleCompactLidSpace(op, make_replay_feed_token(op)); } NewConfigOperation::IStreamHandler &getNewConfigStreamHandler() override { return _config_store; |