summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-03-08 22:57:23 +0100
committerGitHub <noreply@github.com>2022-03-08 22:57:23 +0100
commitc097f2f22e032aba47e80e5588f24c4da2a14fcc (patch)
treeafc4bc71840243f19a85d1dc1e3e19c0452e33fe
parent5b7d3fe8f4e6e59e23b24f9acae21fa90eb97ba0 (diff)
parent6b4190b2582773d9be46677918e79ce980271dbb (diff)
Merge pull request #21599 from vespa-engine/vekterli/atomic-serial-number-in-feedhandler
Make FeedHandler current serial number atomic with relaxed ops
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp22
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h14
2 files changed, 22 insertions, 14 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index e80d8033751..402de8ce7ea 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -20,6 +20,7 @@
#include <vespa/searchcorespi/index/ithreadingservice.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/searchlib/transactionlog/client_session.h>
+#include <vespa/vespalib/util/atomic.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <cassert>
@@ -42,6 +43,7 @@ using vespalib::make_string;
using std::make_unique;
using std::make_shared;
using search::CommitParam;
+using namespace vespalib::atomic;
namespace proton {
@@ -305,13 +307,13 @@ void
FeedHandler::performEof()
{
assert(_writeService.master().isCurrentThread());
- _activeFeedView->forceCommitAndWait(CommitParam(_serialNum));
+ _activeFeedView->forceCommitAndWait(CommitParam(load_relaxed(_serialNum)));
LOG(debug, "Visiting done for transaction log domain '%s', eof received", _tlsMgr.getDomainName().c_str());
// Replay must be complete
- if (_replay_end_serial_num != _serialNum) {
+ if (_replay_end_serial_num != load_relaxed(_serialNum)) {
LOG(warning, "Expected replay end serial number %" PRIu64 ", got serial number %" PRIu64,
- _replay_end_serial_num, _serialNum);
- assert(_replay_end_serial_num == _serialNum);
+ _replay_end_serial_num, load_relaxed(_serialNum));
+ assert(_replay_end_serial_num == load_relaxed(_serialNum));
}
_owner.onTransactionLogReplayDone();
_tlsMgr.replayDone();
@@ -444,7 +446,7 @@ void
FeedHandler::init(SerialNum oldestConfigSerial)
{
_tlsMgr.init(oldestConfigSerial, _prunedSerialNum, _replay_end_serial_num);
- _serialNum = _prunedSerialNum;
+ store_relaxed(_serialNum, _prunedSerialNum);
if (_tlsWriter == nullptr) {
_tlsMgrWriter = std::make_unique<TlsMgrWriter>(_tlsMgr, _tlsWriterfactory);
_tlsWriter = _tlsMgrWriter.get();
@@ -458,7 +460,7 @@ void
FeedHandler::close()
{
if (_allowSync) {
- syncTls(_serialNum);
+ syncTls(load_relaxed(_serialNum));
}
_allowSync = false;
_tlsMgr.close();
@@ -484,8 +486,8 @@ FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flu
TransactionLogManager::prepareReplay(_tlsMgr.getClient(), _docTypeName.getName(),
flushedIndexMgrSerial, flushedSummaryMgrSerial, config_store);
- _tlsReplayProgress = _tlsMgr.make_replay_progress(_serialNum, _replay_end_serial_num);
- _tlsMgr.startReplay(_serialNum, _replay_end_serial_num, *this);
+ _tlsReplayProgress = _tlsMgr.make_replay_progress(load_relaxed(_serialNum), _replay_end_serial_num);
+ _tlsMgr.startReplay(load_relaxed(_serialNum), _replay_end_serial_num, *this);
}
void
@@ -549,7 +551,7 @@ FeedHandler::initiateCommit(vespalib::steady_time start_time) {
if (_activeFeedView) {
using KeepAlivePair = vespalib::KeepAlive<std::pair<CommitResult, DoneCallback>>;
auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext));
- _activeFeedView->forceCommit(CommitParam(_serialNum, CommitParam::UpdateStats::SKIP), std::make_shared<KeepAlivePair>(std::move(pair)));
+ _activeFeedView->forceCommit(CommitParam(load_relaxed(_serialNum), CommitParam::UpdateStats::SKIP), std::make_shared<KeepAlivePair>(std::move(pair)));
}
}
@@ -773,7 +775,7 @@ FeedHandler::heartBeat()
{
assert(_writeService.master().isCurrentThread());
_heart_beat_time.store(vespalib::steady_clock::now());
- _activeFeedView->heartBeat(_serialNum, vespalib::IDestructorCallback::SP());
+ _activeFeedView->heartBeat(load_relaxed(_serialNum), vespalib::IDestructorCallback::SP());
}
FeedHandler::RPC::Result
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 32a70f7c2b0..4e9c016af9b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -78,7 +78,7 @@ private:
TlsWriter *_tlsWriter;
TlsReplayProgress::UP _tlsReplayProgress;
// the serial num of the last feed operation processed by feed handler.
- SerialNum _serialNum;
+ std::atomic<SerialNum> _serialNum;
// the serial num considered to be fully procssessed and flushed to stable storage. Used to prune transaction log.
SerialNum _prunedSerialNum;
// the serial num of the last feed operation in the transaction log at startup before replay
@@ -215,9 +215,15 @@ public:
_bucketDBHandler = bucketDBHandler;
}
- void setSerialNum(SerialNum serialNum) { _serialNum = serialNum; }
- SerialNum inc_serial_num() override { return ++_serialNum; }
- SerialNum getSerialNum() const override { return _serialNum; }
+ // Must only be called from writer thread:
+ void setSerialNum(SerialNum serialNum) { _serialNum.store(serialNum, std::memory_order_relaxed); }
+ SerialNum inc_serial_num() override {
+ const auto post_inc = _serialNum.load(std::memory_order_relaxed) + 1u;
+ _serialNum.store(post_inc, std::memory_order_relaxed);
+ return post_inc;
+ }
+ // May be called from non-writer threads:
+ SerialNum getSerialNum() const override { return _serialNum.load(std::memory_order_relaxed); }
// The two following methods are used when saving initial config
SerialNum get_replay_end_serial_num() const { return _replay_end_serial_num; }
SerialNum inc_replay_end_serial_num() { return ++_replay_end_serial_num; }