diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-01 11:41:00 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-01 11:41:36 +0000 |
commit | 1a7756513b85d6d1a8ae78c536566845c84c8a77 (patch) | |
tree | 3fa38526685b3a9b3e7a772fa618e80e6adb33f5 /searchcore/src | |
parent | caf58f5b7ed8cdda43ec73706986b9e41dd019ed (diff) |
Rewrite TwoPhaseLidTracker to use sequence number instead of counters for tracking state as suggested by @toregge.
Diffstat (limited to 'searchcore/src')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp | 58 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h | 14 |
2 files changed, 31 insertions, 41 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp index e0a45295dc8..2af6ca861c4 100644 --- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp @@ -117,7 +117,12 @@ PendingLidTracker::pendingLids() const { return lids; } -TwoPhasePendingLidTracker::TwoPhasePendingLidTracker() = default; +TwoPhasePendingLidTracker::TwoPhasePendingLidTracker() + : _sequenceId(0), + _lastCommitStarted(0), + _lastCommitCompleted(0), + _pending() +{} TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() { assert(_pending.empty()); @@ -126,24 +131,19 @@ TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() { IPendingLidTracker::Token TwoPhasePendingLidTracker::produce(uint32_t lid) { std::lock_guard guard(_mutex); - _pending[lid].inflight_feed++; + _pending[lid] = ++_sequenceId; return Token(lid, *this); } void TwoPhasePendingLidTracker::consume(uint32_t lid) { - std::lock_guard guard(_mutex); - auto found = _pending.find(lid); - assert (found != _pending.end()); - assert (found->second.inflight_feed > 0); - found->second.inflight_feed--; - found->second.need_commit = true; + (void) lid; } ILidCommitState::State TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t lid) const { for (auto found = _pending.find(lid); found != _pending.end(); found = _pending.find(lid)) { if (state == State::NEED_COMMIT) { - if ((found->second.inflight_feed > 0) || found->second.need_commit) { + if (found->second > _lastCommitStarted) { return State::NEED_COMMIT; } return State::WAITING; @@ -154,16 +154,17 @@ TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t l } void -TwoPhasePendingLidTracker::consumeSnapshot(LidList committed) { +TwoPhasePendingLidTracker::consumeSnapshot(uint64_t sequenceIdWhenStarted) { MonitorGuard guard(_mutex); - for (const auto & lid : committed) { - auto found = _pending.find(lid); - assert(found != _pending.end()); - assert(found->second.inflight_commit >= 1); - found->second.inflight_commit --; - if (found->second.empty()) { - _pending.erase(found); - } + assert(sequenceIdWhenStarted >= _lastCommitCompleted); + _lastCommitCompleted = sequenceIdWhenStarted; + std::vector<uint32_t> committed; + for (const auto & entry : _pending) { + if (entry.second <= sequenceIdWhenStarted) + committed.push_back(entry.first); + } + for (uint32_t lid : committed) { + _pending.erase(lid); } _cond.notify_all(); } @@ -184,43 +185,36 @@ namespace common::internal { class CommitList : public PendingLidTrackerBase::Payload { public: using LidList = ILidCommitState::LidList; - CommitList(LidList lids, TwoPhasePendingLidTracker & tracker) + CommitList(uint64_t commitStarted, TwoPhasePendingLidTracker & tracker) : _tracker(&tracker), - _lids(std::move(lids)) + _commitStarted(commitStarted) { } CommitList(const CommitList &) = delete; CommitList & operator = (const CommitList &) = delete; CommitList & operator = (CommitList &&) = delete; CommitList(CommitList && rhs) noexcept : _tracker(rhs._tracker), - _lids(std::move(rhs._lids)) + _commitStarted(rhs._commitStarted) { rhs._tracker = nullptr; } ~CommitList() override { if (_tracker != nullptr) { - _tracker->consumeSnapshot(std::move(_lids)); + _tracker->consumeSnapshot(_commitStarted); } } private: TwoPhasePendingLidTracker * _tracker; - LidList _lids; + uint64_t _commitStarted; }; } PendingLidTrackerBase::Snapshot TwoPhasePendingLidTracker::produceSnapshot() { - LidList toCommit; MonitorGuard guard(_mutex); - for (auto & entry : _pending) { - if (entry.second.need_commit) { - toCommit.emplace_back(entry.first); - entry.second.inflight_commit ++; - entry.second.need_commit = false; - } - } - return std::make_unique<common::internal::CommitList>(std::move(toCommit), *this); + _lastCommitStarted = _sequenceId; + return std::make_unique<common::internal::CommitList>(_lastCommitStarted, *this); } } diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h index c4ff0d7d639..f1b7a29a37c 100644 --- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h +++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h @@ -132,17 +132,13 @@ public: private: friend common::internal::CommitList; void consume(uint32_t lid) override; - void consumeSnapshot(LidList lids); + void consumeSnapshot(uint64_t sequenceIdWhenStarted); LidList pendingLids() const override; State waitFor(MonitorGuard & guard, State state, uint32_t lid) const override; - struct Counters { - Counters() : inflight_feed(0), inflight_commit(0), need_commit(false) {} - bool empty() const { return (inflight_feed == 0) && ! need_commit && (inflight_commit == 0); } - uint32_t inflight_feed; - uint32_t inflight_commit; - bool need_commit; - }; - vespalib::hash_map<uint32_t, Counters> _pending; + uint64_t _sequenceId; + uint64_t _lastCommitStarted; + uint64_t _lastCommitCompleted; + vespalib::hash_map<uint32_t, uint64_t> _pending; }; } |