diff options
Diffstat (limited to 'searchcore/src/vespa/searchcore/proton/server/feedstates.cpp')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/feedstates.cpp | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp index d2626a0d9f4..1c8e17fc474 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp @@ -8,8 +8,10 @@ #include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h> #include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/searchcore/proton/common/eventlogger.h> +#include <vespa/searchcore/proton/common/replay_feedtoken_state.h> #include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/shared_operation_throttler.h> #include <cassert> #include <vespa/log/log.h> @@ -69,14 +71,19 @@ public: ~TransactionLogReplayPacketHandler() override = default; + FeedToken make_replay_feed_token() { + vespalib::SharedOperationThrottler::Token throttler_token; + return std::make_shared<feedtoken::ReplayState>(std::move(throttler_token)); + } + void replay(const PutOperation &op) override { - _feed_view_ptr->handlePut(FeedToken(), op); + _feed_view_ptr->handlePut(make_replay_feed_token(), op); } void replay(const RemoveOperation &op) override { - _feed_view_ptr->handleRemove(FeedToken(), op); + _feed_view_ptr->handleRemove(make_replay_feed_token(), op); } void replay(const UpdateOperation &op) override { - _feed_view_ptr->handleUpdate(FeedToken(), op); + _feed_view_ptr->handleUpdate(make_replay_feed_token(), op); } void replay(const NoopOperation &) override {} // ignored void replay(const NewConfigOperation &op) override { @@ -84,7 +91,7 @@ public: } void replay(const DeleteBucketOperation &op) override { - _feed_view_ptr->handleDeleteBucket(op, IDestructorCallback::SP()); + _feed_view_ptr->handleDeleteBucket(op, make_replay_feed_token()); } void replay(const SplitBucketOperation &op) override { _bucketDBHandler.handleSplit(op.getSerialNum(), op.getSource(), @@ -95,15 +102,15 @@ public: op.getSource2(), op.getTarget()); } void replay(const PruneRemovedDocumentsOperation &op) override { - _feed_view_ptr->handlePruneRemovedDocuments(op, IDestructorCallback::SP()); + _feed_view_ptr->handlePruneRemovedDocuments(op, make_replay_feed_token()); } void replay(const MoveOperation &op) override { - _feed_view_ptr->handleMove(op, IDestructorCallback::SP()); + _feed_view_ptr->handleMove(op, make_replay_feed_token()); } void replay(const CreateBucketOperation &) override { } void replay(const CompactLidSpaceOperation &op) override { - _feed_view_ptr->handleCompactLidSpace(op, IDestructorCallback::SP()); + _feed_view_ptr->handleCompactLidSpace(op, make_replay_feed_token()); } NewConfigOperation::IStreamHandler &getNewConfigStreamHandler() override { return _config_store; |