summaryrefslogtreecommitdiffstats
path: root/searchcore/src
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2022-03-02 18:42:52 +0100
committerTor Egge <Tor.Egge@online.no>2022-03-02 18:42:52 +0100
commit5ee26fe45dfaf4c65407b9433ef8b37ea927b80f (patch)
treecf913b27ed22b85b4579d5274e794c45f41860cf /searchcore/src
parent60c5b5e0a91e35e61dc82222df74df2ad42d3caa (diff)
Add replay feed token factory with optional tracking.
Add feed operation type and serial number to replay feed tokens.
Diffstat (limited to 'searchcore/src')
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.cpp57
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.h33
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp24
6 files changed, 116 insertions, 15 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
index 03bad4c7eaf..3f33871a0e2 100644
--- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
@@ -18,6 +18,7 @@ vespa_add_library(searchcore_pcommon STATIC
ipendinglidtracker.cpp
operation_rate_tracker.cpp
pendinglidtracker.cpp
+ replay_feed_token_factory.cpp
replay_feedtoken_state.cpp
select_utils.cpp
selectcontext.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.cpp b/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.cpp
new file mode 100644
index 00000000000..18e6f415f16
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.cpp
@@ -0,0 +1,57 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "replay_feed_token_factory.h"
+#include "replay_feedtoken_state.h"
+#include <cassert>
+
+namespace proton::feedtoken {
+
+class ReplayFeedTokenFactory::Deleter {
+ ReplayFeedTokenFactory& _factory;
+public:
+ Deleter(ReplayFeedTokenFactory& factory)
+ : _factory(factory)
+ {
+ }
+ void operator()(ReplayState* p) const noexcept {
+ _factory.on_delete(p);
+ delete p;
+ }
+};
+
+ReplayFeedTokenFactory::ReplayFeedTokenFactory(bool enable_tracking)
+ : _lock(),
+ _states(),
+ _enable_tracking(enable_tracking)
+{
+}
+
+ReplayFeedTokenFactory::~ReplayFeedTokenFactory()
+{
+ std::lock_guard guard(_lock);
+ assert(_states.empty());
+}
+
+FeedToken
+ReplayFeedTokenFactory::make_replay_feed_token(ThrottlerToken throttler_token, const FeedOperation& op)
+{
+ if (_enable_tracking) {
+ auto token = std::make_unique<ReplayState>(std::move(throttler_token), op);
+ std::lock_guard guard(_lock);
+ bool inserted = _states.insert(token.get()).second;
+ assert(inserted);
+ return std::shared_ptr<ReplayState>(token.release(), Deleter(*this));
+ } else {
+ return std::make_shared<ReplayState>(std::move(throttler_token), op);
+ }
+}
+
+void
+ReplayFeedTokenFactory::on_delete(const ReplayState* state) noexcept
+{
+ std::lock_guard guard(_lock);
+ bool erased = _states.erase(state) > 0u;
+ assert(erased);
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.h b/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.h
new file mode 100644
index 00000000000..7e161ef1773
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/replay_feed_token_factory.h
@@ -0,0 +1,33 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/shared_operation_throttler.h>
+#include "feedtoken.h"
+#include <mutex>
+#include <unordered_set>
+
+namespace proton { class FeedOperation; }
+
+namespace proton::feedtoken {
+
+class ReplayState;
+
+/*
+ * A factory for replay feed tokens with optional tracking.
+ */
+class ReplayFeedTokenFactory {
+ using ThrottlerToken = vespalib::SharedOperationThrottler::Token;
+ class Deleter;
+
+ std::mutex _lock;
+ std::unordered_set<const ReplayState*> _states;
+ bool _enable_tracking;
+ void on_delete(const ReplayState* state) noexcept;
+public:
+ ReplayFeedTokenFactory(bool enable_tracking);
+ ~ReplayFeedTokenFactory();
+ FeedToken make_replay_feed_token(ThrottlerToken throttler_token, const FeedOperation& op);
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp
index a3a473c9548..2e6469f870f 100644
--- a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp
@@ -4,9 +4,11 @@
namespace proton::feedtoken {
-ReplayState::ReplayState(vespalib::SharedOperationThrottler::Token throttler_token)
+ReplayState::ReplayState(vespalib::SharedOperationThrottler::Token throttler_token, const FeedOperation& op)
: IState(),
- _throttler_token(std::move(throttler_token))
+ _throttler_token(std::move(throttler_token)),
+ _type(op.getType()),
+ _serial_num(op.getSerialNum())
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h
index 512f12a50af..4ed0986b838 100644
--- a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h
+++ b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h
@@ -4,6 +4,7 @@
#include "feedtoken.h"
#include <vespa/vespalib/util/shared_operation_throttler.h>
+#include <vespa/searchcore/proton/feedoperation/feedoperation.h>
namespace proton::feedtoken {
@@ -13,10 +14,15 @@ namespace proton::feedtoken {
* of the feed operation.
*/
class ReplayState : public IState {
- vespalib::SharedOperationThrottler::Token _throttler_token;
+ using SerialNum = search::SerialNum;
+ using ThrottlerToken = vespalib::SharedOperationThrottler::Token;
+
+ ThrottlerToken _throttler_token;
+ FeedOperation::Type _type;
+ SerialNum _serial_num;
public:
~ReplayState() override;
- ReplayState(vespalib::SharedOperationThrottler::Token throttler_token);
+ ReplayState(ThrottlerToken throttler_token, const FeedOperation& op);
bool is_replay() const noexcept override;
void fail() override;
void setResult(ResultUP result, bool documentWasFound) override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index d9fdee85e4a..db0c1623862 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -9,7 +9,7 @@
#include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h>
#include <vespa/searchcore/proton/feedoperation/operations.h>
#include <vespa/searchcore/proton/common/eventlogger.h>
-#include <vespa/searchcore/proton/common/replay_feedtoken_state.h>
+#include <vespa/searchcore/proton/common/replay_feed_token_factory.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/shared_operation_throttler.h>
@@ -57,6 +57,7 @@ class TransactionLogReplayPacketHandler : public IReplayPacketHandler {
IIncSerialNum &_inc_serial_num;
CommitTimeTracker _commitTimeTracker;
std::unique_ptr<SharedOperationThrottler> _throttler;
+ std::unique_ptr<feedtoken::ReplayFeedTokenFactory> _replay_feed_token_factory;
static std::unique_ptr<SharedOperationThrottler> make_throttler(const ReplayThrottlingPolicy& replay_throttling_policy) {
auto& params = replay_throttling_policy.get_params();
@@ -79,24 +80,25 @@ public:
_config_store(config_store),
_inc_serial_num(inc_serial_num),
_commitTimeTracker(5ms),
- _throttler(make_throttler(replay_throttling_policy))
+ _throttler(make_throttler(replay_throttling_policy)),
+ _replay_feed_token_factory(std::make_unique<feedtoken::ReplayFeedTokenFactory>(false))
{ }
~TransactionLogReplayPacketHandler() override = default;
- FeedToken make_replay_feed_token() {
+ FeedToken make_replay_feed_token(const FeedOperation& op) {
SharedOperationThrottler::Token throttler_token = _throttler->blocking_acquire_one();
- return std::make_shared<feedtoken::ReplayState>(std::move(throttler_token));
+ return _replay_feed_token_factory->make_replay_feed_token(std::move(throttler_token), op);
}
void replay(const PutOperation &op) override {
- _feed_view_ptr->handlePut(make_replay_feed_token(), op);
+ _feed_view_ptr->handlePut(make_replay_feed_token(op), op);
}
void replay(const RemoveOperation &op) override {
- _feed_view_ptr->handleRemove(make_replay_feed_token(), op);
+ _feed_view_ptr->handleRemove(make_replay_feed_token(op), op);
}
void replay(const UpdateOperation &op) override {
- _feed_view_ptr->handleUpdate(make_replay_feed_token(), op);
+ _feed_view_ptr->handleUpdate(make_replay_feed_token(op), op);
}
void replay(const NoopOperation &) override {} // ignored
void replay(const NewConfigOperation &op) override {
@@ -104,7 +106,7 @@ public:
}
void replay(const DeleteBucketOperation &op) override {
- _feed_view_ptr->handleDeleteBucket(op, make_replay_feed_token());
+ _feed_view_ptr->handleDeleteBucket(op, make_replay_feed_token(op));
}
void replay(const SplitBucketOperation &op) override {
_bucketDBHandler.handleSplit(op.getSerialNum(), op.getSource(),
@@ -115,15 +117,15 @@ public:
op.getSource2(), op.getTarget());
}
void replay(const PruneRemovedDocumentsOperation &op) override {
- _feed_view_ptr->handlePruneRemovedDocuments(op, make_replay_feed_token());
+ _feed_view_ptr->handlePruneRemovedDocuments(op, make_replay_feed_token(op));
}
void replay(const MoveOperation &op) override {
- _feed_view_ptr->handleMove(op, make_replay_feed_token());
+ _feed_view_ptr->handleMove(op, make_replay_feed_token(op));
}
void replay(const CreateBucketOperation &) override {
}
void replay(const CompactLidSpaceOperation &op) override {
- _feed_view_ptr->handleCompactLidSpace(op, make_replay_feed_token());
+ _feed_view_ptr->handleCompactLidSpace(op, make_replay_feed_token(op));
}
NewConfigOperation::IStreamHandler &getNewConfigStreamHandler() override {
return _config_store;