diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-25 14:46:26 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-25 14:46:26 +0100 |
commit | 6884c5787c33f94fdd37b0715fb584f55eb7426c (patch) | |
tree | 8810c309db58727d32b7e5e22a1da014c5667662 | |
parent | 96faa92d568a57c67be73a92a131fa908356f4a0 (diff) | |
parent | 7694c1e38cb2c1b52a72b26e4171908169c5bc4a (diff) |
Merge pull request #16195 from vespa-engine/balder/simplify-by-avoiding-closure
Simplify by avoiding closure.
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/feedstates.cpp | 63 |
1 files changed, 37 insertions, 26 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp index 09caf4db1d6..68c47190516 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp @@ -9,7 +9,6 @@ #include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/vespalib/util/idestructorcallback.h> -#include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/lambdatask.h> #include <cassert> @@ -20,16 +19,13 @@ using search::transactionlog::Packet; using search::transactionlog::client::RPC; using search::SerialNum; using vespalib::Executor; -using vespalib::makeClosure; using vespalib::makeLambdaTask; -using vespalib::makeTask; using vespalib::make_string; using proton::bucketdb::IBucketDBHandler; namespace proton { namespace { -typedef vespalib::Closure1<const Packet::Entry &>::UP EntryHandler; const search::SerialNum REPLAY_PROGRESS_INTERVAL = 50000; @@ -48,22 +44,6 @@ handleProgress(TlsReplayProgress &progress, SerialNum currentSerial) } } -void -handlePacket(PacketWrapper & wrap, EntryHandler entryHandler) -{ - vespalib::nbostream_longlivedbuf handle(wrap.packet.getHandle().data(), wrap.packet.getHandle().size()); - while ( !handle.empty() ) { - Packet::Entry entry; - entry.deserialize(handle); - entryHandler->call(entry); - if (wrap.progress != nullptr) { - handleProgress(*wrap.progress, entry.serial()); - } - } - wrap.result = RPC::OK; - wrap.gate.countDown(); -} - class TransactionLogReplayPacketHandler : public IReplayPacketHandler { IFeedView *& _feed_view_ptr; // Pointer can be changed in executor thread. IBucketDBHandler &_bucketDBHandler; @@ -144,15 +124,44 @@ public: } }; -void startDispatch(IReplayPacketHandler *packet_handler, const Packet::Entry &entry) { +class PacketDispatcher { +public: + PacketDispatcher(IReplayPacketHandler *packet_handler) + : _packet_handler(packet_handler) + {} + + void handlePacket(PacketWrapper & wrap); +private: + void handleEntry(const Packet::Entry &entry); + IReplayPacketHandler *_packet_handler; +}; + +void +PacketDispatcher::handlePacket(PacketWrapper & wrap) +{ + vespalib::nbostream_longlivedbuf handle(wrap.packet.getHandle().data(), wrap.packet.getHandle().size()); + while ( !handle.empty() ) { + Packet::Entry entry; + entry.deserialize(handle); + handleEntry(entry); + if (wrap.progress != nullptr) { + handleProgress(*wrap.progress, entry.serial()); + } + } + wrap.result = RPC::OK; + wrap.gate.countDown(); +} + +void +PacketDispatcher::handleEntry(const Packet::Entry &entry) { // Called by handlePacket() in executor thread. LOG(spam, "replay packet entry: entrySerial(%" PRIu64 "), entryType(%u)", entry.serial(), entry.type()); auto entry_serial_num = entry.serial(); - packet_handler->check_serial_num(entry_serial_num); - ReplayPacketDispatcher dispatcher(*packet_handler); + _packet_handler->check_serial_num(entry_serial_num); + ReplayPacketDispatcher dispatcher(*_packet_handler); dispatcher.replayEntry(entry); - packet_handler->optionalCommit(entry_serial_num); + _packet_handler->optionalCommit(entry_serial_num); } } // namespace @@ -173,8 +182,10 @@ ReplayTransactionLogState::~ReplayTransactionLogState() = default; void ReplayTransactionLogState::receive(const PacketWrapper::SP &wrap, Executor &executor) { - EntryHandler closure = makeClosure(&startDispatch, _packet_handler.get()); - executor.execute(makeLambdaTask([wrap = wrap, dispatch = std::move(closure)] () mutable { handlePacket(*wrap, std::move(dispatch)); })); + executor.execute(makeLambdaTask([this, wrap = wrap] () { + PacketDispatcher dispatcher(_packet_handler.get()); + dispatcher.handlePacket(*wrap); + })); } } // namespace proton |