summaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'searchcore/src/vespa/searchcore/proton/server/feedstates.cpp')
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp21
1 files changed, 14 insertions, 7 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index d2626a0d9f4..1c8e17fc474 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -8,8 +8,10 @@
#include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h>
#include <vespa/searchcore/proton/feedoperation/operations.h>
#include <vespa/searchcore/proton/common/eventlogger.h>
+#include <vespa/searchcore/proton/common/replay_feedtoken_state.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/shared_operation_throttler.h>
#include <cassert>
#include <vespa/log/log.h>
@@ -69,14 +71,19 @@ public:
~TransactionLogReplayPacketHandler() override = default;
+ FeedToken make_replay_feed_token() {
+ vespalib::SharedOperationThrottler::Token throttler_token;
+ return std::make_shared<feedtoken::ReplayState>(std::move(throttler_token));
+ }
+
void replay(const PutOperation &op) override {
- _feed_view_ptr->handlePut(FeedToken(), op);
+ _feed_view_ptr->handlePut(make_replay_feed_token(), op);
}
void replay(const RemoveOperation &op) override {
- _feed_view_ptr->handleRemove(FeedToken(), op);
+ _feed_view_ptr->handleRemove(make_replay_feed_token(), op);
}
void replay(const UpdateOperation &op) override {
- _feed_view_ptr->handleUpdate(FeedToken(), op);
+ _feed_view_ptr->handleUpdate(make_replay_feed_token(), op);
}
void replay(const NoopOperation &) override {} // ignored
void replay(const NewConfigOperation &op) override {
@@ -84,7 +91,7 @@ public:
}
void replay(const DeleteBucketOperation &op) override {
- _feed_view_ptr->handleDeleteBucket(op, IDestructorCallback::SP());
+ _feed_view_ptr->handleDeleteBucket(op, make_replay_feed_token());
}
void replay(const SplitBucketOperation &op) override {
_bucketDBHandler.handleSplit(op.getSerialNum(), op.getSource(),
@@ -95,15 +102,15 @@ public:
op.getSource2(), op.getTarget());
}
void replay(const PruneRemovedDocumentsOperation &op) override {
- _feed_view_ptr->handlePruneRemovedDocuments(op, IDestructorCallback::SP());
+ _feed_view_ptr->handlePruneRemovedDocuments(op, make_replay_feed_token());
}
void replay(const MoveOperation &op) override {
- _feed_view_ptr->handleMove(op, IDestructorCallback::SP());
+ _feed_view_ptr->handleMove(op, make_replay_feed_token());
}
void replay(const CreateBucketOperation &) override {
}
void replay(const CompactLidSpaceOperation &op) override {
- _feed_view_ptr->handleCompactLidSpace(op, IDestructorCallback::SP());
+ _feed_view_ptr->handleCompactLidSpace(op, make_replay_feed_token());
}
NewConfigOperation::IStreamHandler &getNewConfigStreamHandler() override {
return _config_store;