diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-08-25 09:43:41 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-08-25 09:43:41 +0000 |
commit | 4856dbea48260e229f183b819094b2c0f7d140bf (patch) | |
tree | 50aa2165ebd8d4f30474516194bbb802f7de4ea6 /searchcore | |
parent | 0efc67578a8f0ef0d74e965ab156b618fea3fc5a (diff) |
Use two stage lid tracking.
Diffstat (limited to 'searchcore')
25 files changed, 350 insertions, 150 deletions
diff --git a/searchcore/src/tests/proton/common/CMakeLists.txt b/searchcore/src/tests/proton/common/CMakeLists.txt index da0bc9b5b10..30ec8c035ad 100644 --- a/searchcore/src/tests/proton/common/CMakeLists.txt +++ b/searchcore/src/tests/proton/common/CMakeLists.txt @@ -15,3 +15,10 @@ vespa_add_executable(searchcore_cachedselect_test_app TEST searchlib_test ) vespa_add_test(NAME searchcore_cachedselect_test_app COMMAND searchcore_cachedselect_test_app) +vespa_add_executable(pendinglidtracker_test_app TEST + SOURCES + pendinglidtracker_test.cpp + DEPENDS + searchcore_pcommon +) +vespa_add_test(NAME pendinglidtracker_test_app COMMAND pendinglidtracker_test_app) diff --git a/searchcore/src/tests/proton/common/pendinglidtracker_test.cpp b/searchcore/src/tests/proton/common/pendinglidtracker_test.cpp new file mode 100644 index 00000000000..07d8caab3c4 --- /dev/null +++ b/searchcore/src/tests/proton/common/pendinglidtracker_test.cpp @@ -0,0 +1,9 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/searchcore/proton/common/pendinglidtracker.h> + +#include <vespa/log/log.h> +LOG_SETUP("pendinglidtracker_test"); + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp b/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp index e607513103b..1579b8ec91a 100644 --- a/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp +++ b/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp @@ -279,11 +279,11 @@ struct Committer : public ICommitable { size_t _commitAndWaitCount; Committer() : _commitCount(0), _commitAndWaitCount(0) { } void commit() override { _commitCount++; } - void commitAndWait(IPendingLidTracker &) override { _commitAndWaitCount++; } - void commitAndWait(IPendingLidTracker & tracker, uint32_t ) override { + void commitAndWait(ILidCommitState &) override { _commitAndWaitCount++; } + void commitAndWait(ILidCommitState & tracker, uint32_t ) override { commitAndWait(tracker); } - void commitAndWait(IPendingLidTracker & tracker, const std::vector<uint32_t> & ) override { + void commitAndWait(ILidCommitState & tracker, const std::vector<uint32_t> & ) override { commitAndWait(tracker); } }; diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp index 21b69678460..43c018d1fbe 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp @@ -267,11 +267,11 @@ struct Committer : public ICommitable { size_t _commitAndWaitCount; Committer() : _commitCount(0), _commitAndWaitCount(0) { } void commit() override { _commitCount++; } - void commitAndWait(IPendingLidTracker & ) override { _commitAndWaitCount++; } - void commitAndWait(IPendingLidTracker & tracker, uint32_t ) override { + void commitAndWait(ILidCommitState & ) override { _commitAndWaitCount++; } + void commitAndWait(ILidCommitState & tracker, uint32_t ) override { commitAndWait(tracker); } - void commitAndWait(IPendingLidTracker & tracker, const std::vector<uint32_t> & ) override { + void commitAndWait(ILidCommitState & tracker, const std::vector<uint32_t> & ) override { commitAndWait(tracker); } }; diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 272d1bfbd51..814e7ddeab9 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -386,11 +386,11 @@ public: void syncSubDBs(); void commit() override { } - void commitAndWait(IPendingLidTracker & ) override { } - void commitAndWait(IPendingLidTracker & tracker, uint32_t ) override { + void commitAndWait(ILidCommitState & ) override { } + void commitAndWait(ILidCommitState & tracker, uint32_t ) override { commitAndWait(tracker); } - void commitAndWait(IPendingLidTracker & tracker, const std::vector<uint32_t> & ) override { + void commitAndWait(ILidCommitState & tracker, const std::vector<uint32_t> & ) override { commitAndWait(tracker); } void performSyncSubDBs(); diff --git a/searchcore/src/tests/proton/server/visibility_handler/visibility_handler_test.cpp b/searchcore/src/tests/proton/server/visibility_handler/visibility_handler_test.cpp index 56d82ed8695..1d69b7cec3c 100644 --- a/searchcore/src/tests/proton/server/visibility_handler/visibility_handler_test.cpp +++ b/searchcore/src/tests/proton/server/visibility_handler/visibility_handler_test.cpp @@ -40,7 +40,7 @@ class MyFeedView : public DummyFeedView uint32_t _forceCommitCount; SerialNum _committedSerialNum; public: - proton::PendingLidTracker _tracker; + std::unique_ptr<proton::PendingLidTrackerBase> _tracker; MyFeedView() @@ -48,11 +48,20 @@ public: _committedSerialNum(0u) {} + void setTracker(vespalib::duration visibilityDelay) { + if (visibilityDelay == vespalib::duration::zero()) { + _tracker = std::make_unique<proton::PendingLidTracker>(); + } else { + _tracker = std::make_unique<proton::TwoPhasePendingLidTracker>(); + } + } + void forceCommit(SerialNum serialNum) override { EXPECT_TRUE(serialNum >= _committedSerialNum); _committedSerialNum = serialNum; ++_forceCommitCount; + _tracker->produceSnapshot(); } uint32_t getForceCommitCount() const { return _forceCommitCount; } @@ -85,25 +94,22 @@ public: void checkCommitPostCondition(uint32_t expForceCommitCount, SerialNum expCommittedSerialNum, - uint32_t expMasterExecuteCnt, - uint32_t expAttributeFieldWriterSyncCnt) + uint32_t expMasterExecuteCnt) { EXPECT_EQUAL(expForceCommitCount, _feedViewReal->getForceCommitCount()); EXPECT_EQUAL(expCommittedSerialNum, _feedViewReal->getCommittedSerialNum()); EXPECT_EQUAL(expMasterExecuteCnt, _writeService.masterObserver().getExecuteCnt()); - EXPECT_EQUAL(expAttributeFieldWriterSyncCnt, - _writeService.attributeFieldWriterObserver().getSyncCnt()); } void testCommit(vespalib::duration visibilityDelay, bool internal, uint32_t expForceCommitCount, SerialNum expCommittedSerialNum, uint32_t expMasterExecuteCnt, - uint32_t expAttributeFieldWriterSyncCnt, SerialNum currSerialNum = 10u) { + _feedViewReal->setTracker(visibilityDelay); _getSerialNum.setSerialNum(currSerialNum); _visibilityHandler.setVisibilityDelay(visibilityDelay); if (internal) { @@ -116,8 +122,16 @@ public: _writeService.master().sync(); checkCommitPostCondition(expForceCommitCount, expCommittedSerialNum, - expMasterExecuteCnt, - expAttributeFieldWriterSyncCnt); + expMasterExecuteCnt); + } + + proton::PendingLidTracker::Token + createToken(proton::PendingLidTrackerBase & tracker, SerialNum serialNum, uint32_t lid) { + if (serialNum == 0) { + return proton::PendingLidTracker::Token(); + } else { + return tracker.produce(lid);; + } } void @@ -125,16 +139,16 @@ public: uint32_t expForceCommitCount, SerialNum expCommittedSerialNum, uint32_t expMasterExecuteCnt, - uint32_t expAttributeFieldWriterSyncCnt, SerialNum currSerialNum = 10u) { + _feedViewReal->setTracker(visibilityDelay); _getSerialNum.setSerialNum(currSerialNum); _visibilityHandler.setVisibilityDelay(visibilityDelay); constexpr uint32_t MY_LID=13; - if (currSerialNum != 0) { - _feedViewReal->_tracker.produce(MY_LID); + proton::PendingLidTrackerBase * lidTracker = _feedViewReal->_tracker.get(); + { + proton::PendingLidTracker::Token token = createToken(*lidTracker, currSerialNum, MY_LID); } - proton::PendingLidTracker * lidTracker = & _feedViewReal->_tracker; if (internal) { VisibilityHandler *visibilityHandler = &_visibilityHandler; auto task = makeLambdaTask([=]() { visibilityHandler->commitAndWait(*lidTracker, MY_LID); }); @@ -145,8 +159,7 @@ public: } checkCommitPostCondition(expForceCommitCount, expCommittedSerialNum, - expMasterExecuteCnt, - expAttributeFieldWriterSyncCnt); + expMasterExecuteCnt); } }; @@ -154,62 +167,62 @@ public: TEST_F("Check external commit with zero visibility delay", Fixture) { - f.testCommit(0s, false, 0u, 0u, 0u, 0u); + f.testCommit(0s, false, 0u, 0u, 0u); } TEST_F("Check external commit with nonzero visibility delay", Fixture) { - f.testCommit(1s, false, 1u, 10u, 1u, 0u); + f.testCommit(1s, false, 1u, 10u, 1u); } TEST_F("Check external commit with nonzero visibility delay and no new feed operation", Fixture) { - f.testCommit(1s, false, 1u, 0u, 1u, 0u, 0u); + f.testCommit(1s, false, 1u, 0u, 1u, 0u); } TEST_F("Check internal commit with zero visibility delay", Fixture) { - f.testCommit(0s, true, 0u, 0u, 1u, 0u); + f.testCommit(0s, true, 0u, 0u, 1u); } TEST_F("Check internal commit with nonzero visibility delay", Fixture) { - f.testCommit(1s, true, 1u, 10u, 1u, 0u); + f.testCommit(1s, true, 1u, 10u, 1u); } TEST_F("Check internal commit with nonzero visibility delay and no new feed operation", Fixture) { - f.testCommit(1s, true, 1u, 0u, 1u, 0u, 0u); + f.testCommit(1s, true, 1u, 0u, 1u, 0u); } TEST_F("Check external commitAndWait with zero visibility delay", Fixture) { - f.testCommitAndWait(0s, false, 0u, 0u, 0u, 1u); + f.testCommitAndWait(0s, false, 0u, 0u, 0u); } TEST_F("Check external commitAndWait with nonzero visibility delay", Fixture) { - f.testCommitAndWait(1s, false, 1u, 10u, 1u, 1u); + f.testCommitAndWait(1s, false, 1u, 10u, 1u); } TEST_F("Check external commitAndWait with nonzero visibility delay and no new feed operation", Fixture) { - f.testCommitAndWait(1s, false, 0u, 0u, 0u, 1u, 0u); + f.testCommitAndWait(1s, false, 0u, 0u, 0u, 0u); } TEST_F("Check internal commitAndWait with zero visibility delay", Fixture) { - f.testCommitAndWait(0s, true, 0u, 0u, 1u, 1u); + f.testCommitAndWait(0s, true, 0u, 0u, 1u); } TEST_F("Check internal commitAndWait with nonzero visibility delay", Fixture) { - f.testCommitAndWait(1s, true, 1u, 10u, 1u, 1u); + f.testCommitAndWait(1s, true, 1u, 10u, 1u); } TEST_F("Check internal commitAndWait with nonzero visibility delay and no new feed operation", Fixture) { - f.testCommitAndWait(1s, true, 0u, 0u, 1u, 1u, 0u); + f.testCommitAndWait(1s, true, 0u, 0u, 1u, 0u); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/vespa/searchcore/proton/common/icommitable.h b/searchcore/src/vespa/searchcore/proton/common/icommitable.h index 74d90c5c6e9..55762a69862 100644 --- a/searchcore/src/vespa/searchcore/proton/common/icommitable.h +++ b/searchcore/src/vespa/searchcore/proton/common/icommitable.h @@ -5,7 +5,7 @@ #include <vector> namespace proton { -class IPendingLidTracker; +class ILidCommitState; /** * Interface for anyone that needs to commit. @@ -13,9 +13,9 @@ class IPendingLidTracker; class ICommitable { public: virtual void commit() = 0; - virtual void commitAndWait(IPendingLidTracker & unCommittedLidTracker) = 0; - virtual void commitAndWait(IPendingLidTracker &uncommittedLidTracker, uint32_t lid) = 0; - virtual void commitAndWait(IPendingLidTracker &uncommittedLidTracker, const std::vector<uint32_t> & lid) = 0; + virtual void commitAndWait(ILidCommitState & unCommittedLidTracker) = 0; + virtual void commitAndWait(ILidCommitState &uncommittedLidTracker, uint32_t lid) = 0; + virtual void commitAndWait(ILidCommitState &uncommittedLidTracker, const std::vector<uint32_t> & lid) = 0; protected: virtual ~ICommitable() = default; }; diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp index 41783f02a60..0419bd646be 100644 --- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp @@ -7,16 +7,14 @@ namespace proton { -IPendingLidTracker::Token::Token(uint32_t lid, IPendingLidTracker &tracker) - : _tracker(&tracker), - _lid(lid) -{} - IPendingLidTracker::Token::Token() : _tracker(nullptr), _lid(0u) {} - +IPendingLidTracker::Token::Token(uint32_t lid, IPendingLidTracker &tracker) + : _tracker(&tracker), + _lid(lid) +{} IPendingLidTracker::Token::~Token() { if (_tracker != nullptr) { @@ -24,9 +22,30 @@ IPendingLidTracker::Token::~Token() { } } -IPendingLidTracker::Token -NoopLidTracker::produce(uint32_t) { - return Token(); +bool +ILidCommitState::needCommit(uint32_t lid) { + return waitState(State::NEED_COMMIT, lid) == State::NEED_COMMIT; +} +bool +ILidCommitState::needCommit(const std::vector<uint32_t> & lids) { + return waitState(State::NEED_COMMIT, lids) == State::NEED_COMMIT; +} +bool +ILidCommitState::needCommit() { + return waitState(State::NEED_COMMIT) == State::NEED_COMMIT; +} + +void +ILidCommitState::waitComplete(uint32_t lid) { + waitState(State::COMPLETE, lid); +} +void +ILidCommitState::waitComplete(const std::vector<uint32_t> & lids) { + waitState(State::COMPLETE, lids); +} +void +ILidCommitState::waitComplete() { + waitState(State::COMPLETE); } PendingLidTracker::PendingLidTracker() @@ -39,7 +58,7 @@ PendingLidTracker::~PendingLidTracker() { assert(_pending.empty()); } -PendingLidTracker::Token +IPendingLidTracker::Token PendingLidTracker::produce(uint32_t lid) { std::lock_guard guard(_mutex); _pending[lid]++; @@ -59,54 +78,156 @@ PendingLidTracker::consume(uint32_t lid) { } } -void -PendingLidTracker::waitFor(MonitorGuard & guard, uint32_t lid) { +ILidCommitState::State +PendingLidTracker::waitFor(MonitorGuard & guard, State, uint32_t lid) { while (_pending.find(lid) != _pending.end()) { _cond.wait(guard); } + return State::COMPLETE; } -void -PendingLidTracker::waitForEmpty() { +ILidCommitState::State +PendingLidTracker::waitState(State) { MonitorGuard guard(_mutex); while ( ! _pending.empty() ) { _cond.wait(guard); } + return State::COMPLETE; } -void -PendingLidTracker::waitForConsumed(uint32_t lid) { +ILidCommitState::State +PendingLidTracker::waitState(State state, uint32_t lid) { MonitorGuard guard(_mutex); - waitFor(guard, lid); + return waitFor(guard, state, lid); } -void -PendingLidTracker::waitForConsumed(const std::vector<uint32_t> & lids) { +ILidCommitState::State +PendingLidTracker::waitState(State state, const std::vector<uint32_t> & lids) { MonitorGuard guard(_mutex); for (uint32_t lid : lids) { - waitFor(guard, lid); + if ((waitFor(guard, state, lid) == state) && (state == State::NEED_COMMIT)) { + return State::NEED_COMMIT; + } } + return State::COMPLETE; } -bool -PendingLidTracker::isInFlight(uint32_t lid) { +PendingLidTrackerBase::Snapshot +PendingLidTracker::produceSnapshot() { + return Snapshot(); +} + +TwoPhasePendingLidTracker::TwoPhasePendingLidTracker() + : _mutex(), + _cond(), + _pending() +{} + +TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() { + assert(_pending.empty()); +} + +IPendingLidTracker::Token +TwoPhasePendingLidTracker::produce(uint32_t lid) { + std::lock_guard guard(_mutex); + _pending[lid].feed++; + return Token(lid, *this); +} +void +TwoPhasePendingLidTracker::consume(uint32_t lid) { + std::lock_guard guard(_mutex); + auto found = _pending.find(lid); + assert (found != _pending.end()); + assert (found->second.feed > 0); + found->second.feed--; + found->second.commit++; +} + +ILidCommitState::State +TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t lid) { + for (auto found = _pending.find(lid); found != _pending.end(); found = _pending.find(lid)) { + if ((state == State::NEED_COMMIT) && ((found->second.feed + found->second.commit) > 0)) { + return State::NEED_COMMIT; + } + _cond.wait(guard); + } + return State::COMPLETE; +} + +ILidCommitState::State +TwoPhasePendingLidTracker::waitState(State state) { MonitorGuard guard(_mutex); - return _pending.find(lid) != _pending.end(); + while ( ! _pending.empty() ) { + for (const auto & entry : _pending) { + if ((waitFor(guard, state, entry.first) == state) && (state == State::NEED_COMMIT)) { + return State::NEED_COMMIT; + } + } + } + return State::COMPLETE; } -bool -PendingLidTracker::areAnyInFlight(const std::vector<uint32_t> & lids) { +ILidCommitState::State +TwoPhasePendingLidTracker::waitState(State state, uint32_t lid) { MonitorGuard guard(_mutex); - return std::any_of(lids.begin(), lids.end(), - [this](uint32_t lid) { - return _pending.find(lid) == _pending.end(); - }); + return waitFor(guard, state, lid); } -bool -PendingLidTracker::areAnyInFlight() { +ILidCommitState::State +TwoPhasePendingLidTracker::waitState(State state, const std::vector<uint32_t> & lids) { MonitorGuard guard(_mutex); - return !_pending.empty(); + for (uint32_t lid : lids) { + if ((waitFor(guard, state, lid) == state) && (state == State::NEED_COMMIT)) { + return State::NEED_COMMIT; + } + } + return State::COMPLETE; +} + +PendingLidTrackerBase::Snapshot +TwoPhasePendingLidTracker::produceSnapshot() { + List toCommit; + MonitorGuard guard(_mutex); + for (auto & entry : _pending) { + if (entry.second.commit > 0) { + toCommit.emplace_back(entry.first, entry.second.commit); + entry.second.done += entry.second.commit; + entry.second.commit = 0; + } + } + return std::make_unique<CommitList>(std::move(toCommit), *this); +} + +void +TwoPhasePendingLidTracker::consumeSnapshot(List committed) { + MonitorGuard guard(_mutex); + for (const auto & entry : committed) { + auto found = _pending.find(entry.first); + assert(found != _pending.end()); + assert(found->second.done >= entry.second); + found->second.done -= entry.second; + if (found->second.empty()) { + _pending.erase(found); + } + } + _cond.notify_all(); +} + +TwoPhasePendingLidTracker::CommitList::CommitList(List lids, TwoPhasePendingLidTracker & tracker) + : _tracker(&tracker), + _lids(std::move(lids)) +{ } +TwoPhasePendingLidTracker::CommitList::CommitList(CommitList && rhs) noexcept + : _tracker(rhs._tracker), + _lids(std::move(rhs._lids)) +{ + rhs._tracker = nullptr; } +TwoPhasePendingLidTracker::CommitList::~CommitList() { + if (_tracker != nullptr) { + _tracker->consumeSnapshot(std::move(_lids)); + } +} + } diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h index 9de5fa49cf2..0a85f1c3e73 100644 --- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h +++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h @@ -19,8 +19,8 @@ public: Token & operator = (const Token &) = delete; Token & operator = (Token &&) = delete; Token(Token && rhs) noexcept - : _tracker(rhs._tracker), - _lid(rhs._lid) + : _tracker(rhs._tracker), + _lid(rhs._lid) { rhs._tracker = nullptr; } @@ -31,50 +31,95 @@ public: }; virtual ~IPendingLidTracker() = default; virtual Token produce(uint32_t lid) = 0; - virtual void waitForEmpty() = 0; - virtual void waitForConsumed(uint32_t lid) = 0; - virtual void waitForConsumed(const std::vector<uint32_t> & lids) = 0; - virtual bool isInFlight(uint32_t lid) = 0; - virtual bool areAnyInFlight(const std::vector<uint32_t> & lids) = 0; - virtual bool areAnyInFlight() = 0; private: virtual void consume(uint32_t lid) = 0; - std::mutex _mutex; - std::condition_variable _cond; - vespalib::hash_map<uint32_t, uint32_t> _pending; }; -class NoopLidTracker : public IPendingLidTracker { +class ILidCommitState { public: - Token produce(uint32_t lid) override; - void waitForEmpty() override { } - void waitForConsumed(uint32_t ) override { } - void waitForConsumed(const std::vector<uint32_t> & ) override { } - bool isInFlight(uint32_t ) override { return false; } - bool areAnyInFlight(const std::vector<uint32_t> & ) override { return false; } - bool areAnyInFlight() override { return false; } + virtual ~ILidCommitState() = default; + bool needCommit(uint32_t lid); + bool needCommit(const std::vector<uint32_t> & lids); + bool needCommit(); + void waitComplete(uint32_t lid); + void waitComplete(const std::vector<uint32_t> & lids); + void waitComplete(); +protected: + enum class State {NEED_COMMIT, COMPLETE}; + virtual State waitState(State state, uint32_t lid) = 0; + virtual State waitState(State state, const std::vector<uint32_t> & lids) = 0; + virtual State waitState(State state) = 0; +}; + +class PendingLidTrackerBase : public IPendingLidTracker, + public ILidCommitState +{ +public: + struct Payload { + virtual ~Payload() = default; + }; + using Snapshot = std::unique_ptr<Payload>; + virtual Snapshot produceSnapshot() = 0; private: - void consume(uint32_t ) override { } }; -class PendingLidTracker : public IPendingLidTracker { +class PendingLidTracker : public PendingLidTrackerBase +{ public: PendingLidTracker(); ~PendingLidTracker() override; Token produce(uint32_t lid) override; - void waitForEmpty() override; - void waitForConsumed(uint32_t lid) override; - void waitForConsumed(const std::vector<uint32_t> & lids) override; - bool isInFlight(uint32_t lid) override; - bool areAnyInFlight(const std::vector<uint32_t> & lids) override; - bool areAnyInFlight() override; + Snapshot produceSnapshot() override; private: - using MonitorGuard = std::unique_lock<std::mutex>; + State waitState(State state) override; + State waitState(State state, uint32_t lid) override; + State waitState(State state, const std::vector<uint32_t> & lids) override; void consume(uint32_t lid) override; - void waitFor(MonitorGuard & guard, uint32_t lid); + using MonitorGuard = std::unique_lock<std::mutex>; + State waitFor(MonitorGuard & guard, State state, uint32_t lid); std::mutex _mutex; std::condition_variable _cond; vespalib::hash_map<uint32_t, uint32_t> _pending; }; +class TwoPhasePendingLidTracker : public PendingLidTrackerBase +{ +public: + TwoPhasePendingLidTracker(); + ~TwoPhasePendingLidTracker() override; + Token produce(uint32_t lid) override; + Snapshot produceSnapshot() override; +private: + using List = std::vector<std::pair<uint32_t, uint32_t>>; + class CommitList : public Payload { + public: + CommitList(List lids, TwoPhasePendingLidTracker & tracker); + CommitList(const CommitList &) = delete; + CommitList & operator = (const CommitList &) = delete; + CommitList & operator = (CommitList &&) = delete; + CommitList(CommitList && rhs) noexcept; + ~CommitList() override; + private: + TwoPhasePendingLidTracker * _tracker; + List _lids; + }; + using MonitorGuard = std::unique_lock<std::mutex>; + State waitState(State state) override; + State waitState(State state, uint32_t lid) override; + State waitState(State state, const std::vector<uint32_t> & lids) override; + void consume(uint32_t lid) override; + void consumeSnapshot(List); + State waitFor(MonitorGuard & guard, State state, uint32_t lid); + std::mutex _mutex; + std::condition_variable _cond; + struct Counters { + Counters() : feed(0), commit(0), done(0) {} + bool empty() const { return (feed == 0) && (commit == 0) && (done == 0); } + uint32_t feed; + uint32_t commit; + uint32_t done; + }; + vespalib::hash_map<uint32_t, Counters> _pending; +}; + } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.cpp index f6a52ad7b44..daa240e8b12 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.cpp @@ -6,7 +6,7 @@ namespace proton { CommitAndWaitDocumentRetriever::CommitAndWaitDocumentRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, - IPendingLidTracker & unCommittedLidTracker) + ILidCommitState & unCommittedLidTracker) : _retriever(std::move(retriever)), _commit(commit), _uncommittedLidsTracker(unCommittedLidTracker) diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h index 33258f767dc..2b8264f7eef 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h @@ -17,10 +17,10 @@ class CommitAndWaitDocumentRetriever : public IDocumentRetriever { IDocumentRetriever::SP _retriever; ICommitable &_commit; - IPendingLidTracker &_uncommittedLidsTracker; + ILidCommitState &_uncommittedLidsTracker; using Bucket = storage::spi::Bucket; public: - CommitAndWaitDocumentRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, IPendingLidTracker & unCommittedLidTracker); + CommitAndWaitDocumentRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, ILidCommitState & unCommittedLidTracker); ~CommitAndWaitDocumentRetriever() override; const document::DocumentTypeRepo &getDocumentTypeRepo() const override; diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp index 09dbace0bdc..00f67a4dd7b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp @@ -233,7 +233,7 @@ CombiningFeedView::forceCommit(search::SerialNum serialNum) } } -IPendingLidTracker & +ILidCommitState & CombiningFeedView::getUncommittedLidsTracker() { LOG_ABORT("CombiningFeedView::getUncommittedLidsTracker should never be called."); } diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h index aa730e104aa..5625d41ccdc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h @@ -77,7 +77,7 @@ public: void sync() override; void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; - IPendingLidTracker & getUncommittedLidsTracker() override; + ILidCommitState & getUncommittedLidsTracker() override; // Called by document db executor void setCalculator(const IBucketStateCalculator::SP &newCalc); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp b/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp index 54363c5b197..d0a857e8a0e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp @@ -130,7 +130,7 @@ DocumentSubDBCollection::createRetrievers() namespace { IDocumentRetriever::SP -wrapRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, IPendingLidTracker & unCommitedLidsTracker) +wrapRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, ILidCommitState & unCommitedLidsTracker) { return std::make_shared<CommitAndWaitDocumentRetriever>(std::move(retriever), commit, unCommitedLidsTracker); } diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp index 2140792e206..cdb7ec20f26 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp @@ -8,9 +8,10 @@ namespace proton { ForceCommitContext::ForceCommitContext(vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore) + IDocumentMetaStore &documentMetaStore, + PendingLidTrackerBase::Snapshot lidsToCommit) : _executor(executor), - _task(std::make_unique<ForceCommitDoneTask>(documentMetaStore)), + _task(std::make_unique<ForceCommitDoneTask>(documentMetaStore, std::move(lidsToCommit))), _committedDocIdLimit(0u), _docIdLimit(nullptr) { diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h index 494e12002a6..8c7df2aaedd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h @@ -2,10 +2,8 @@ #pragma once +#include <vespa/searchcore/proton/common/pendinglidtracker.h> #include <vespa/searchlib/common/idestructorcallback.h> -#include <memory> -#include <cstdint> -#include <vector> namespace vespalib { class Executor; } @@ -24,14 +22,15 @@ class DocIdLimit; */ class ForceCommitContext : public search::IDestructorCallback { - vespalib::Executor &_executor; - std::unique_ptr<ForceCommitDoneTask> _task; - uint32_t _committedDocIdLimit; - DocIdLimit *_docIdLimit; + vespalib::Executor &_executor; + std::unique_ptr<ForceCommitDoneTask> _task; + uint32_t _committedDocIdLimit; + DocIdLimit *_docIdLimit; public: ForceCommitContext(vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore); + IDocumentMetaStore &documentMetaStore, + PendingLidTrackerBase::Snapshot lidsToCommit); ~ForceCommitContext() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp index 81da2a4ec3e..ffae783d8e6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp @@ -5,8 +5,9 @@ namespace proton { -ForceCommitDoneTask::ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore) +ForceCommitDoneTask::ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore, PendingLidTrackerBase::Snapshot lidsToCommit) : _lidsToReuse(), + _lidsToCommit(std::move(lidsToCommit)), _holdUnblockShrinkLidSpace(false), _documentMetaStore(documentMetaStore) { @@ -34,6 +35,7 @@ ForceCommitDoneTask::run() if (_holdUnblockShrinkLidSpace) { _documentMetaStore.holdUnblockShrinkLidSpace(); } + _lidsToCommit.reset(); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h index 1b459ab6d02..c9c4ba9f6ec 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h @@ -2,8 +2,8 @@ #pragma once +#include <vespa/searchcore/proton/common/pendinglidtracker.h> #include <vespa/vespalib/util/executor.h> -#include <vector> namespace proton { @@ -25,13 +25,13 @@ struct IDocumentMetaStore; */ class ForceCommitDoneTask : public vespalib::Executor::Task { - std::vector<uint32_t> _lidsToReuse; - bool _holdUnblockShrinkLidSpace; - IDocumentMetaStore &_documentMetaStore; + std::vector<uint32_t> _lidsToReuse; + PendingLidTrackerBase::Snapshot _lidsToCommit; + bool _holdUnblockShrinkLidSpace; + IDocumentMetaStore &_documentMetaStore; public: - ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore); - + ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore, PendingLidTrackerBase::Snapshot lidsToCommit); ~ForceCommitDoneTask() override; void reuseLids(std::vector<uint32_t> &&lids); @@ -43,7 +43,7 @@ public: void run() override; bool empty() const { - return _lidsToReuse.empty() && !_holdUnblockShrinkLidSpace; + return _lidsToReuse.empty() && !_holdUnblockShrinkLidSpace && !_lidsToCommit; } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h index 11e95937ec9..734c27ff09d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h @@ -62,7 +62,7 @@ public: virtual void forceCommit(search::SerialNum serialNum) = 0; virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation & pruneOp) = 0; virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) = 0; - virtual IPendingLidTracker & getUncommittedLidsTracker() = 0; + virtual ILidCommitState & getUncommittedLidsTracker() = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index e5050831b45..4ad7dd91d52 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -192,12 +192,12 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c meta_store.move(op.getPrevLid(), op.getLid()); } -std::unique_ptr<IPendingLidTracker> +std::unique_ptr<PendingLidTrackerBase> createUncommitedLidTracker(bool needImmediateCommit) { if (needImmediateCommit) { - return std::make_unique<NoopLidTracker>(); - } else { return std::make_unique<PendingLidTracker>(); + } else { + return std::make_unique<TwoPhasePendingLidTracker>(); } } @@ -230,14 +230,15 @@ StoreOnlyFeedView::sync() _writeService.summary().sync(); } -IPendingLidTracker & +ILidCommitState & StoreOnlyFeedView::getUncommittedLidsTracker() { return *_pendingLidsForCommit; } void StoreOnlyFeedView::forceCommit(SerialNum serialNum) { - forceCommit(serialNum, std::make_shared<ForceCommitContext>(_writeService.master(), _metaStore)); + forceCommit(serialNum, std::make_shared<ForceCommitContext>(_writeService.master(), _metaStore, + _pendingLidsForCommit->produceSnapshot())); } void @@ -469,7 +470,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) PromisedDoc promisedDoc; FutureDoc futureDoc = promisedDoc.get_future().share(); onWriteDone->setDocument(futureDoc); - _pendingLidsForDocStore.waitForConsumed(lid); + _pendingLidsForDocStore.waitComplete(lid); if (updateScope._indexedFields) { updateIndexedFields(serialNum, lid, futureDoc, immediateCommit, onWriteDone); } @@ -833,8 +834,8 @@ StoreOnlyFeedView::handleCompactLidSpace(const CompactLidSpaceOperation &op) const SerialNum serialNum = op.getSerialNum(); if (useDocumentMetaStore(serialNum)) { getDocumentMetaStore()->get().compactLidSpace(op.getLidLimit()); - std::shared_ptr<ForceCommitContext> - commitContext(std::make_shared<ForceCommitContext>(_writeService.master(), _metaStore)); + auto commitContext(std::make_shared<ForceCommitContext>(_writeService.master(), _metaStore, + _pendingLidsForCommit->produceSnapshot())); commitContext->holdUnblockShrinkLidSpace(); forceCommit(serialNum, commitContext); } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 94ed2cf1b4c..106ccbff460 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -144,7 +144,7 @@ private: const document::DocumentType *_docType; LidReuseDelayer _lidReuseDelayer; PendingLidTracker _pendingLidsForDocStore; - std::unique_ptr<IPendingLidTracker> _pendingLidsForCommit; + std::unique_ptr<PendingLidTrackerBase> _pendingLidsForCommit; protected: const search::index::Schema::SP _schema; @@ -263,7 +263,7 @@ public: */ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; - IPendingLidTracker & getUncommittedLidsTracker() override; + ILidCommitState & getUncommittedLidsTracker() override; }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp index 7cef086f2d0..612af21b7e0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp @@ -37,31 +37,33 @@ VisibilityHandler::internalCommit(bool force) void VisibilityHandler::commit() { - internalCommit(true); + if (hasVisibilityDelay()) { + internalCommit(true); + } } void -VisibilityHandler::commitAndWait(IPendingLidTracker & unCommittedLidTracker) +VisibilityHandler::commitAndWait(ILidCommitState & unCommittedLidTracker) { - if (unCommittedLidTracker.areAnyInFlight()) { + if (unCommittedLidTracker.needCommit()) { internalCommit(false); - unCommittedLidTracker.waitForEmpty(); + unCommittedLidTracker.waitComplete(); } } void -VisibilityHandler::commitAndWait(IPendingLidTracker & unCommittedLidTracker, uint32_t lid) { - if (unCommittedLidTracker.isInFlight(lid)) { +VisibilityHandler::commitAndWait(ILidCommitState & unCommittedLidTracker, uint32_t lid) { + if (unCommittedLidTracker.needCommit(lid)) { internalCommit(false); - unCommittedLidTracker.waitForConsumed(lid); + unCommittedLidTracker.waitComplete(lid); } } void -VisibilityHandler::commitAndWait(IPendingLidTracker & unCommittedLidTracker, const std::vector<uint32_t> & lids) { - if (unCommittedLidTracker.areAnyInFlight(lids)) { +VisibilityHandler::commitAndWait(ILidCommitState & unCommittedLidTracker, const std::vector<uint32_t> & lids) { + if (unCommittedLidTracker.needCommit(lids)) { internalCommit(false); - unCommittedLidTracker.waitForConsumed(lids); + unCommittedLidTracker.waitComplete(lids); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.h b/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.h index 786cbb3bc83..560b6e75423 100644 --- a/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.h @@ -29,9 +29,9 @@ public: vespalib::duration getVisibilityDelay() const { return _visibilityDelay; } bool hasVisibilityDelay() const { return _visibilityDelay != vespalib::duration::zero(); } void commit() override; - void commitAndWait(IPendingLidTracker & unCommittedLidTracker) override; - void commitAndWait(IPendingLidTracker &, uint32_t ) override; - void commitAndWait(IPendingLidTracker &, const std::vector<uint32_t> & ) override; + void commitAndWait(ILidCommitState & unCommittedLidTracker) override; + void commitAndWait(ILidCommitState &, uint32_t ) override; + void commitAndWait(ILidCommitState &, const std::vector<uint32_t> & ) override; private: bool startCommit(const std::lock_guard<std::mutex> &unused, bool force); void performCommit(bool force); diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp index ba9dd7ecc39..2507abcc9ea 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp @@ -17,7 +17,7 @@ DummyFeedView::DummyFeedView(std::shared_ptr<const document::DocumentTypeRepo> d DummyFeedView::~DummyFeedView() = default; -IPendingLidTracker & +ILidCommitState & DummyFeedView::getUncommittedLidsTracker() { assert(false); } diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h index 122559f9e68..4fc5cbe5018 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h @@ -33,7 +33,7 @@ struct DummyFeedView : public IFeedView void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {} void handleCompactLidSpace(const CompactLidSpaceOperation &) override {} void forceCommit(search::SerialNum) override { } - IPendingLidTracker & getUncommittedLidsTracker() override; + ILidCommitState & getUncommittedLidsTracker() override; }; } |