summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-01-25 14:46:26 +0100
committerGitHub <noreply@github.com>2021-01-25 14:46:26 +0100
commit6884c5787c33f94fdd37b0715fb584f55eb7426c (patch)
tree8810c309db58727d32b7e5e22a1da014c5667662
parent96faa92d568a57c67be73a92a131fa908356f4a0 (diff)
parent7694c1e38cb2c1b52a72b26e4171908169c5bc4a (diff)
Merge pull request #16195 from vespa-engine/balder/simplify-by-avoiding-closure
Simplify by avoiding closure.
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp63
1 files changed, 37 insertions, 26 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index 09caf4db1d6..68c47190516 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -9,7 +9,6 @@
#include <vespa/searchcore/proton/feedoperation/operations.h>
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/vespalib/util/idestructorcallback.h>
-#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <cassert>
@@ -20,16 +19,13 @@ using search::transactionlog::Packet;
using search::transactionlog::client::RPC;
using search::SerialNum;
using vespalib::Executor;
-using vespalib::makeClosure;
using vespalib::makeLambdaTask;
-using vespalib::makeTask;
using vespalib::make_string;
using proton::bucketdb::IBucketDBHandler;
namespace proton {
namespace {
-typedef vespalib::Closure1<const Packet::Entry &>::UP EntryHandler;
const search::SerialNum REPLAY_PROGRESS_INTERVAL = 50000;
@@ -48,22 +44,6 @@ handleProgress(TlsReplayProgress &progress, SerialNum currentSerial)
}
}
-void
-handlePacket(PacketWrapper & wrap, EntryHandler entryHandler)
-{
- vespalib::nbostream_longlivedbuf handle(wrap.packet.getHandle().data(), wrap.packet.getHandle().size());
- while ( !handle.empty() ) {
- Packet::Entry entry;
- entry.deserialize(handle);
- entryHandler->call(entry);
- if (wrap.progress != nullptr) {
- handleProgress(*wrap.progress, entry.serial());
- }
- }
- wrap.result = RPC::OK;
- wrap.gate.countDown();
-}
-
class TransactionLogReplayPacketHandler : public IReplayPacketHandler {
IFeedView *& _feed_view_ptr; // Pointer can be changed in executor thread.
IBucketDBHandler &_bucketDBHandler;
@@ -144,15 +124,44 @@ public:
}
};
-void startDispatch(IReplayPacketHandler *packet_handler, const Packet::Entry &entry) {
+class PacketDispatcher {
+public:
+ PacketDispatcher(IReplayPacketHandler *packet_handler)
+ : _packet_handler(packet_handler)
+ {}
+
+ void handlePacket(PacketWrapper & wrap);
+private:
+ void handleEntry(const Packet::Entry &entry);
+ IReplayPacketHandler *_packet_handler;
+};
+
+void
+PacketDispatcher::handlePacket(PacketWrapper & wrap)
+{
+ vespalib::nbostream_longlivedbuf handle(wrap.packet.getHandle().data(), wrap.packet.getHandle().size());
+ while ( !handle.empty() ) {
+ Packet::Entry entry;
+ entry.deserialize(handle);
+ handleEntry(entry);
+ if (wrap.progress != nullptr) {
+ handleProgress(*wrap.progress, entry.serial());
+ }
+ }
+ wrap.result = RPC::OK;
+ wrap.gate.countDown();
+}
+
+void
+PacketDispatcher::handleEntry(const Packet::Entry &entry) {
// Called by handlePacket() in executor thread.
LOG(spam, "replay packet entry: entrySerial(%" PRIu64 "), entryType(%u)", entry.serial(), entry.type());
auto entry_serial_num = entry.serial();
- packet_handler->check_serial_num(entry_serial_num);
- ReplayPacketDispatcher dispatcher(*packet_handler);
+ _packet_handler->check_serial_num(entry_serial_num);
+ ReplayPacketDispatcher dispatcher(*_packet_handler);
dispatcher.replayEntry(entry);
- packet_handler->optionalCommit(entry_serial_num);
+ _packet_handler->optionalCommit(entry_serial_num);
}
} // namespace
@@ -173,8 +182,10 @@ ReplayTransactionLogState::~ReplayTransactionLogState() = default;
void
ReplayTransactionLogState::receive(const PacketWrapper::SP &wrap, Executor &executor) {
- EntryHandler closure = makeClosure(&startDispatch, _packet_handler.get());
- executor.execute(makeLambdaTask([wrap = wrap, dispatch = std::move(closure)] () mutable { handlePacket(*wrap, std::move(dispatch)); }));
+ executor.execute(makeLambdaTask([this, wrap = wrap] () {
+ PacketDispatcher dispatcher(_packet_handler.get());
+ dispatcher.handlePacket(*wrap);
+ }));
}
} // namespace proton