diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-03-08 22:57:23 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-08 22:57:23 +0100 |
commit | c097f2f22e032aba47e80e5588f24c4da2a14fcc (patch) | |
tree | afc4bc71840243f19a85d1dc1e3e19c0452e33fe | |
parent | 5b7d3fe8f4e6e59e23b24f9acae21fa90eb97ba0 (diff) | |
parent | 6b4190b2582773d9be46677918e79ce980271dbb (diff) |
Merge pull request #21599 from vespa-engine/vekterli/atomic-serial-number-in-feedhandler
Make FeedHandler current serial number atomic with relaxed ops
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp | 22 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/feedhandler.h | 14 |
2 files changed, 22 insertions, 14 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index e80d8033751..402de8ce7ea 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -20,6 +20,7 @@ #include <vespa/searchcorespi/index/ithreadingservice.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/searchlib/transactionlog/client_session.h> +#include <vespa/vespalib/util/atomic.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/lambdatask.h> #include <cassert> @@ -42,6 +43,7 @@ using vespalib::make_string; using std::make_unique; using std::make_shared; using search::CommitParam; +using namespace vespalib::atomic; namespace proton { @@ -305,13 +307,13 @@ void FeedHandler::performEof() { assert(_writeService.master().isCurrentThread()); - _activeFeedView->forceCommitAndWait(CommitParam(_serialNum)); + _activeFeedView->forceCommitAndWait(CommitParam(load_relaxed(_serialNum))); LOG(debug, "Visiting done for transaction log domain '%s', eof received", _tlsMgr.getDomainName().c_str()); // Replay must be complete - if (_replay_end_serial_num != _serialNum) { + if (_replay_end_serial_num != load_relaxed(_serialNum)) { LOG(warning, "Expected replay end serial number %" PRIu64 ", got serial number %" PRIu64, - _replay_end_serial_num, _serialNum); - assert(_replay_end_serial_num == _serialNum); + _replay_end_serial_num, load_relaxed(_serialNum)); + assert(_replay_end_serial_num == load_relaxed(_serialNum)); } _owner.onTransactionLogReplayDone(); _tlsMgr.replayDone(); @@ -444,7 +446,7 @@ void FeedHandler::init(SerialNum oldestConfigSerial) { _tlsMgr.init(oldestConfigSerial, _prunedSerialNum, _replay_end_serial_num); - _serialNum = _prunedSerialNum; + store_relaxed(_serialNum, _prunedSerialNum); if (_tlsWriter == nullptr) { _tlsMgrWriter = std::make_unique<TlsMgrWriter>(_tlsMgr, _tlsWriterfactory); _tlsWriter = _tlsMgrWriter.get(); @@ -458,7 +460,7 @@ void FeedHandler::close() { if (_allowSync) { - syncTls(_serialNum); + syncTls(load_relaxed(_serialNum)); } _allowSync = false; _tlsMgr.close(); @@ -484,8 +486,8 @@ FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flu TransactionLogManager::prepareReplay(_tlsMgr.getClient(), _docTypeName.getName(), flushedIndexMgrSerial, flushedSummaryMgrSerial, config_store); - _tlsReplayProgress = _tlsMgr.make_replay_progress(_serialNum, _replay_end_serial_num); - _tlsMgr.startReplay(_serialNum, _replay_end_serial_num, *this); + _tlsReplayProgress = _tlsMgr.make_replay_progress(load_relaxed(_serialNum), _replay_end_serial_num); + _tlsMgr.startReplay(load_relaxed(_serialNum), _replay_end_serial_num, *this); } void @@ -549,7 +551,7 @@ FeedHandler::initiateCommit(vespalib::steady_time start_time) { if (_activeFeedView) { using KeepAlivePair = vespalib::KeepAlive<std::pair<CommitResult, DoneCallback>>; auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext)); - _activeFeedView->forceCommit(CommitParam(_serialNum, CommitParam::UpdateStats::SKIP), std::make_shared<KeepAlivePair>(std::move(pair))); + _activeFeedView->forceCommit(CommitParam(load_relaxed(_serialNum), CommitParam::UpdateStats::SKIP), std::make_shared<KeepAlivePair>(std::move(pair))); } } @@ -773,7 +775,7 @@ FeedHandler::heartBeat() { assert(_writeService.master().isCurrentThread()); _heart_beat_time.store(vespalib::steady_clock::now()); - _activeFeedView->heartBeat(_serialNum, vespalib::IDestructorCallback::SP()); + _activeFeedView->heartBeat(load_relaxed(_serialNum), vespalib::IDestructorCallback::SP()); } FeedHandler::RPC::Result diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 32a70f7c2b0..4e9c016af9b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -78,7 +78,7 @@ private: TlsWriter *_tlsWriter; TlsReplayProgress::UP _tlsReplayProgress; // the serial num of the last feed operation processed by feed handler. - SerialNum _serialNum; + std::atomic<SerialNum> _serialNum; // the serial num considered to be fully procssessed and flushed to stable storage. Used to prune transaction log. SerialNum _prunedSerialNum; // the serial num of the last feed operation in the transaction log at startup before replay @@ -215,9 +215,15 @@ public: _bucketDBHandler = bucketDBHandler; } - void setSerialNum(SerialNum serialNum) { _serialNum = serialNum; } - SerialNum inc_serial_num() override { return ++_serialNum; } - SerialNum getSerialNum() const override { return _serialNum; } + // Must only be called from writer thread: + void setSerialNum(SerialNum serialNum) { _serialNum.store(serialNum, std::memory_order_relaxed); } + SerialNum inc_serial_num() override { + const auto post_inc = _serialNum.load(std::memory_order_relaxed) + 1u; + _serialNum.store(post_inc, std::memory_order_relaxed); + return post_inc; + } + // May be called from non-writer threads: + SerialNum getSerialNum() const override { return _serialNum.load(std::memory_order_relaxed); } // The two following methods are used when saving initial config SerialNum get_replay_end_serial_num() const { return _replay_end_serial_num; } SerialNum inc_replay_end_serial_num() { return ++_replay_end_serial_num; } |