summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-01 11:41:00 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-09-01 11:41:36 +0000
commit1a7756513b85d6d1a8ae78c536566845c84c8a77 (patch)
tree3fa38526685b3a9b3e7a772fa618e80e6adb33f5
parentcaf58f5b7ed8cdda43ec73706986b9e41dd019ed (diff)
Rewrite TwoPhaseLidTracker to use sequence number instead of counters for tracking state as suggested by @toregge.
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp58
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h14
2 files changed, 31 insertions, 41 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp
index e0a45295dc8..2af6ca861c4 100644
--- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp
@@ -117,7 +117,12 @@ PendingLidTracker::pendingLids() const {
return lids;
}
-TwoPhasePendingLidTracker::TwoPhasePendingLidTracker() = default;
+TwoPhasePendingLidTracker::TwoPhasePendingLidTracker()
+ : _sequenceId(0),
+ _lastCommitStarted(0),
+ _lastCommitCompleted(0),
+ _pending()
+{}
TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() {
assert(_pending.empty());
@@ -126,24 +131,19 @@ TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() {
IPendingLidTracker::Token
TwoPhasePendingLidTracker::produce(uint32_t lid) {
std::lock_guard guard(_mutex);
- _pending[lid].inflight_feed++;
+ _pending[lid] = ++_sequenceId;
return Token(lid, *this);
}
void
TwoPhasePendingLidTracker::consume(uint32_t lid) {
- std::lock_guard guard(_mutex);
- auto found = _pending.find(lid);
- assert (found != _pending.end());
- assert (found->second.inflight_feed > 0);
- found->second.inflight_feed--;
- found->second.need_commit = true;
+ (void) lid;
}
ILidCommitState::State
TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t lid) const {
for (auto found = _pending.find(lid); found != _pending.end(); found = _pending.find(lid)) {
if (state == State::NEED_COMMIT) {
- if ((found->second.inflight_feed > 0) || found->second.need_commit) {
+ if (found->second > _lastCommitStarted) {
return State::NEED_COMMIT;
}
return State::WAITING;
@@ -154,16 +154,17 @@ TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t l
}
void
-TwoPhasePendingLidTracker::consumeSnapshot(LidList committed) {
+TwoPhasePendingLidTracker::consumeSnapshot(uint64_t sequenceIdWhenStarted) {
MonitorGuard guard(_mutex);
- for (const auto & lid : committed) {
- auto found = _pending.find(lid);
- assert(found != _pending.end());
- assert(found->second.inflight_commit >= 1);
- found->second.inflight_commit --;
- if (found->second.empty()) {
- _pending.erase(found);
- }
+ assert(sequenceIdWhenStarted >= _lastCommitCompleted);
+ _lastCommitCompleted = sequenceIdWhenStarted;
+ std::vector<uint32_t> committed;
+ for (const auto & entry : _pending) {
+ if (entry.second <= sequenceIdWhenStarted)
+ committed.push_back(entry.first);
+ }
+ for (uint32_t lid : committed) {
+ _pending.erase(lid);
}
_cond.notify_all();
}
@@ -184,43 +185,36 @@ namespace common::internal {
class CommitList : public PendingLidTrackerBase::Payload {
public:
using LidList = ILidCommitState::LidList;
- CommitList(LidList lids, TwoPhasePendingLidTracker & tracker)
+ CommitList(uint64_t commitStarted, TwoPhasePendingLidTracker & tracker)
: _tracker(&tracker),
- _lids(std::move(lids))
+ _commitStarted(commitStarted)
{ }
CommitList(const CommitList &) = delete;
CommitList & operator = (const CommitList &) = delete;
CommitList & operator = (CommitList &&) = delete;
CommitList(CommitList && rhs) noexcept
: _tracker(rhs._tracker),
- _lids(std::move(rhs._lids))
+ _commitStarted(rhs._commitStarted)
{
rhs._tracker = nullptr;
}
~CommitList() override {
if (_tracker != nullptr) {
- _tracker->consumeSnapshot(std::move(_lids));
+ _tracker->consumeSnapshot(_commitStarted);
}
}
private:
TwoPhasePendingLidTracker * _tracker;
- LidList _lids;
+ uint64_t _commitStarted;
};
}
PendingLidTrackerBase::Snapshot
TwoPhasePendingLidTracker::produceSnapshot() {
- LidList toCommit;
MonitorGuard guard(_mutex);
- for (auto & entry : _pending) {
- if (entry.second.need_commit) {
- toCommit.emplace_back(entry.first);
- entry.second.inflight_commit ++;
- entry.second.need_commit = false;
- }
- }
- return std::make_unique<common::internal::CommitList>(std::move(toCommit), *this);
+ _lastCommitStarted = _sequenceId;
+ return std::make_unique<common::internal::CommitList>(_lastCommitStarted, *this);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h
index c4ff0d7d639..f1b7a29a37c 100644
--- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h
+++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h
@@ -132,17 +132,13 @@ public:
private:
friend common::internal::CommitList;
void consume(uint32_t lid) override;
- void consumeSnapshot(LidList lids);
+ void consumeSnapshot(uint64_t sequenceIdWhenStarted);
LidList pendingLids() const override;
State waitFor(MonitorGuard & guard, State state, uint32_t lid) const override;
- struct Counters {
- Counters() : inflight_feed(0), inflight_commit(0), need_commit(false) {}
- bool empty() const { return (inflight_feed == 0) && ! need_commit && (inflight_commit == 0); }
- uint32_t inflight_feed;
- uint32_t inflight_commit;
- bool need_commit;
- };
- vespalib::hash_map<uint32_t, Counters> _pending;
+ uint64_t _sequenceId;
+ uint64_t _lastCommitStarted;
+ uint64_t _lastCommitCompleted;
+ vespalib::hash_map<uint32_t, uint64_t> _pending;
};
}