From 3ae46cdfb043785d25e76d56724ef93b231668d8 Mon Sep 17 00:00:00 2001 From: Arnstein Ressem Date: Thu, 27 Aug 2020 19:56:48 +0200 Subject: Revert "Only wait for th elids that you are interested in." --- searchcore/src/tests/proton/common/CMakeLists.txt | 7 - .../tests/proton/common/pendinglidtracker_test.cpp | 105 ----------- .../document_iterator/document_iterator_test.cpp | 10 +- .../document_subdbs/document_subdbs_test.cpp | 10 +- .../maintenancecontroller_test.cpp | 10 +- .../visibility_handler/visibility_handler_test.cpp | 65 +++---- .../vespa/searchcore/proton/common/icommitable.h | 8 +- .../searchcore/proton/common/pendinglidtracker.cpp | 197 +++------------------ .../searchcore/proton/common/pendinglidtracker.h | 114 ++---------- .../commit_and_wait_document_retriever.cpp | 2 +- .../commit_and_wait_document_retriever.h | 4 +- .../searchcore/proton/server/combiningfeedview.cpp | 2 +- .../searchcore/proton/server/combiningfeedview.h | 2 +- .../proton/server/documentsubdbcollection.cpp | 7 +- .../proton/server/forcecommitcontext.cpp | 5 +- .../searchcore/proton/server/forcecommitcontext.h | 15 +- .../proton/server/forcecommitdonetask.cpp | 8 +- .../searchcore/proton/server/forcecommitdonetask.h | 14 +- .../src/vespa/searchcore/proton/server/ifeedview.h | 2 +- .../searchcore/proton/server/removedonecontext.cpp | 5 +- .../searchcore/proton/server/removedonecontext.h | 6 +- .../proton/server/searchabledocsubdb.cpp | 2 +- .../searchcore/proton/server/storeonlyfeedview.cpp | 76 +++----- .../searchcore/proton/server/storeonlyfeedview.h | 7 +- .../searchcore/proton/server/visibilityhandler.cpp | 60 +++---- .../searchcore/proton/server/visibilityhandler.h | 7 +- .../searchcore/proton/test/dummy_feed_view.cpp | 2 +- .../vespa/searchcore/proton/test/dummy_feed_view.h | 2 +- 28 files changed, 173 insertions(+), 581 deletions(-) delete mode 100644 searchcore/src/tests/proton/common/pendinglidtracker_test.cpp (limited to 'searchcore/src') diff --git a/searchcore/src/tests/proton/common/CMakeLists.txt b/searchcore/src/tests/proton/common/CMakeLists.txt index 30ec8c035ad..da0bc9b5b10 100644 --- a/searchcore/src/tests/proton/common/CMakeLists.txt +++ b/searchcore/src/tests/proton/common/CMakeLists.txt @@ -15,10 +15,3 @@ vespa_add_executable(searchcore_cachedselect_test_app TEST searchlib_test ) vespa_add_test(NAME searchcore_cachedselect_test_app COMMAND searchcore_cachedselect_test_app) -vespa_add_executable(pendinglidtracker_test_app TEST - SOURCES - pendinglidtracker_test.cpp - DEPENDS - searchcore_pcommon -) -vespa_add_test(NAME pendinglidtracker_test_app COMMAND pendinglidtracker_test_app) diff --git a/searchcore/src/tests/proton/common/pendinglidtracker_test.cpp b/searchcore/src/tests/proton/common/pendinglidtracker_test.cpp deleted file mode 100644 index 575033ad19a..00000000000 --- a/searchcore/src/tests/proton/common/pendinglidtracker_test.cpp +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include -#include - -#include -LOG_SETUP("pendinglidtracker_test"); - -using namespace proton; - -constexpr uint32_t LID_1 = 1u; -const std::vector LIDV_2_1_3({2u, LID_1, 3u}); -const std::vector LIDV_2_3({2u, 3u}); - -std::ostream & -operator << (std::ostream & os, ILidCommitState::State state) { - switch (state) { - case ILidCommitState::State::NEED_COMMIT: - os << "NEED_COMMIT"; - break; - case ILidCommitState::State::WAITING: - os << "WAITING"; - break; - case ILidCommitState::State::COMPLETED: - os << "COMPLETED"; - break; - } - return os; -} - -void -verifyPhase1ProduceAndNeedCommit(PendingLidTrackerBase & tracker, ILidCommitState::State expected) { - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState()); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LID_1)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_1_3)); - - auto token = tracker.produce(LID_1); - EXPECT_EQUAL(expected, tracker.getState()); - EXPECT_EQUAL(expected, tracker.getState(LID_1)); - EXPECT_EQUAL(expected, tracker.getState(LIDV_2_1_3)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_3)); - { - auto token2 = tracker.produce(LID_1); - EXPECT_EQUAL(expected, tracker.getState()); - EXPECT_EQUAL(expected, tracker.getState(LID_1)); - EXPECT_EQUAL(expected, tracker.getState(LIDV_2_1_3)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_3)); - } - EXPECT_EQUAL(expected, tracker.getState()); - EXPECT_EQUAL(expected, tracker.getState(LID_1)); - EXPECT_EQUAL(expected, tracker.getState(LIDV_2_1_3)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_3)); -} - -TEST("test pendinglidtracker for needcommit") { - PendingLidTracker tracker; - verifyPhase1ProduceAndNeedCommit(tracker, ILidCommitState::State::WAITING); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState()); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LID_1)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_1_3)); - { - ILidCommitState::State incomplete = ILidCommitState::State::WAITING; - auto token = tracker.produce(LID_1); - EXPECT_EQUAL(incomplete, tracker.getState()); - EXPECT_EQUAL(incomplete, tracker.getState(LID_1)); - EXPECT_EQUAL(incomplete, tracker.getState(LIDV_2_1_3)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_3)); - { - auto snapshot = tracker.produceSnapshot(); - EXPECT_EQUAL(incomplete, tracker.getState()); - EXPECT_EQUAL(incomplete, tracker.getState(LID_1)); - EXPECT_EQUAL(incomplete, tracker.getState(LIDV_2_1_3)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_3)); - } - EXPECT_EQUAL(incomplete, tracker.getState()); - EXPECT_EQUAL(incomplete, tracker.getState(LID_1)); - EXPECT_EQUAL(incomplete, tracker.getState(LIDV_2_1_3)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_3)); - } - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState()); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LID_1)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_1_3)); -} - -TEST("test two phase pendinglidtracker for needcommit") { - TwoPhasePendingLidTracker tracker; - ILidCommitState::State incomplete = ILidCommitState::State::NEED_COMMIT; - verifyPhase1ProduceAndNeedCommit(tracker, incomplete); - EXPECT_EQUAL(incomplete, tracker.getState()); - EXPECT_EQUAL(incomplete, tracker.getState(LID_1)); - EXPECT_EQUAL(incomplete, tracker.getState(LIDV_2_1_3)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_3)); - { - ILidCommitState::State waiting = ILidCommitState::State::WAITING; - auto snapshot = tracker.produceSnapshot(); - EXPECT_EQUAL(waiting, tracker.getState()); - EXPECT_EQUAL(waiting, tracker.getState(LID_1)); - EXPECT_EQUAL(waiting, tracker.getState(LIDV_2_1_3)); - } - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState()); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LID_1)); - EXPECT_EQUAL(ILidCommitState::State::COMPLETED, tracker.getState(LIDV_2_1_3)); -} - -TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp b/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp index 1579b8ec91a..b106233a13e 100644 --- a/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp +++ b/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp @@ -279,12 +279,12 @@ struct Committer : public ICommitable { size_t _commitAndWaitCount; Committer() : _commitCount(0), _commitAndWaitCount(0) { } void commit() override { _commitCount++; } - void commitAndWait(ILidCommitState &) override { _commitAndWaitCount++; } - void commitAndWait(ILidCommitState & tracker, uint32_t ) override { - commitAndWait(tracker); + void commitAndWait() override { _commitAndWaitCount++; } + void commitAndWait(IPendingLidTracker &, uint32_t ) override { + commitAndWait(); } - void commitAndWait(ILidCommitState & tracker, const std::vector & ) override { - commitAndWait(tracker); + void commitAndWait(IPendingLidTracker &, const std::vector & ) override { + commitAndWait(); } }; diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp index 43c018d1fbe..eeb6fc1966f 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp @@ -267,12 +267,12 @@ struct Committer : public ICommitable { size_t _commitAndWaitCount; Committer() : _commitCount(0), _commitAndWaitCount(0) { } void commit() override { _commitCount++; } - void commitAndWait(ILidCommitState & ) override { _commitAndWaitCount++; } - void commitAndWait(ILidCommitState & tracker, uint32_t ) override { - commitAndWait(tracker); + void commitAndWait() override { _commitAndWaitCount++; } + void commitAndWait(IPendingLidTracker &, uint32_t ) override { + commitAndWait(); } - void commitAndWait(ILidCommitState & tracker, const std::vector & ) override { - commitAndWait(tracker); + void commitAndWait(IPendingLidTracker &, const std::vector & ) override { + commitAndWait(); } }; diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 814e7ddeab9..daae29dabdc 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -386,12 +386,12 @@ public: void syncSubDBs(); void commit() override { } - void commitAndWait(ILidCommitState & ) override { } - void commitAndWait(ILidCommitState & tracker, uint32_t ) override { - commitAndWait(tracker); + void commitAndWait() override { } + void commitAndWait(IPendingLidTracker &, uint32_t ) override { + commitAndWait(); } - void commitAndWait(ILidCommitState & tracker, const std::vector & ) override { - commitAndWait(tracker); + void commitAndWait(IPendingLidTracker &, const std::vector & ) override { + commitAndWait(); } void performSyncSubDBs(); void notifyClusterStateChanged(); diff --git a/searchcore/src/tests/proton/server/visibility_handler/visibility_handler_test.cpp b/searchcore/src/tests/proton/server/visibility_handler/visibility_handler_test.cpp index 1d69b7cec3c..56d82ed8695 100644 --- a/searchcore/src/tests/proton/server/visibility_handler/visibility_handler_test.cpp +++ b/searchcore/src/tests/proton/server/visibility_handler/visibility_handler_test.cpp @@ -40,7 +40,7 @@ class MyFeedView : public DummyFeedView uint32_t _forceCommitCount; SerialNum _committedSerialNum; public: - std::unique_ptr _tracker; + proton::PendingLidTracker _tracker; MyFeedView() @@ -48,20 +48,11 @@ public: _committedSerialNum(0u) {} - void setTracker(vespalib::duration visibilityDelay) { - if (visibilityDelay == vespalib::duration::zero()) { - _tracker = std::make_unique(); - } else { - _tracker = std::make_unique(); - } - } - void forceCommit(SerialNum serialNum) override { EXPECT_TRUE(serialNum >= _committedSerialNum); _committedSerialNum = serialNum; ++_forceCommitCount; - _tracker->produceSnapshot(); } uint32_t getForceCommitCount() const { return _forceCommitCount; } @@ -94,22 +85,25 @@ public: void checkCommitPostCondition(uint32_t expForceCommitCount, SerialNum expCommittedSerialNum, - uint32_t expMasterExecuteCnt) + uint32_t expMasterExecuteCnt, + uint32_t expAttributeFieldWriterSyncCnt) { EXPECT_EQUAL(expForceCommitCount, _feedViewReal->getForceCommitCount()); EXPECT_EQUAL(expCommittedSerialNum, _feedViewReal->getCommittedSerialNum()); EXPECT_EQUAL(expMasterExecuteCnt, _writeService.masterObserver().getExecuteCnt()); + EXPECT_EQUAL(expAttributeFieldWriterSyncCnt, + _writeService.attributeFieldWriterObserver().getSyncCnt()); } void testCommit(vespalib::duration visibilityDelay, bool internal, uint32_t expForceCommitCount, SerialNum expCommittedSerialNum, uint32_t expMasterExecuteCnt, + uint32_t expAttributeFieldWriterSyncCnt, SerialNum currSerialNum = 10u) { - _feedViewReal->setTracker(visibilityDelay); _getSerialNum.setSerialNum(currSerialNum); _visibilityHandler.setVisibilityDelay(visibilityDelay); if (internal) { @@ -122,16 +116,8 @@ public: _writeService.master().sync(); checkCommitPostCondition(expForceCommitCount, expCommittedSerialNum, - expMasterExecuteCnt); - } - - proton::PendingLidTracker::Token - createToken(proton::PendingLidTrackerBase & tracker, SerialNum serialNum, uint32_t lid) { - if (serialNum == 0) { - return proton::PendingLidTracker::Token(); - } else { - return tracker.produce(lid);; - } + expMasterExecuteCnt, + expAttributeFieldWriterSyncCnt); } void @@ -139,16 +125,16 @@ public: uint32_t expForceCommitCount, SerialNum expCommittedSerialNum, uint32_t expMasterExecuteCnt, + uint32_t expAttributeFieldWriterSyncCnt, SerialNum currSerialNum = 10u) { - _feedViewReal->setTracker(visibilityDelay); _getSerialNum.setSerialNum(currSerialNum); _visibilityHandler.setVisibilityDelay(visibilityDelay); constexpr uint32_t MY_LID=13; - proton::PendingLidTrackerBase * lidTracker = _feedViewReal->_tracker.get(); - { - proton::PendingLidTracker::Token token = createToken(*lidTracker, currSerialNum, MY_LID); + if (currSerialNum != 0) { + _feedViewReal->_tracker.produce(MY_LID); } + proton::PendingLidTracker * lidTracker = & _feedViewReal->_tracker; if (internal) { VisibilityHandler *visibilityHandler = &_visibilityHandler; auto task = makeLambdaTask([=]() { visibilityHandler->commitAndWait(*lidTracker, MY_LID); }); @@ -159,7 +145,8 @@ public: } checkCommitPostCondition(expForceCommitCount, expCommittedSerialNum, - expMasterExecuteCnt); + expMasterExecuteCnt, + expAttributeFieldWriterSyncCnt); } }; @@ -167,62 +154,62 @@ public: TEST_F("Check external commit with zero visibility delay", Fixture) { - f.testCommit(0s, false, 0u, 0u, 0u); + f.testCommit(0s, false, 0u, 0u, 0u, 0u); } TEST_F("Check external commit with nonzero visibility delay", Fixture) { - f.testCommit(1s, false, 1u, 10u, 1u); + f.testCommit(1s, false, 1u, 10u, 1u, 0u); } TEST_F("Check external commit with nonzero visibility delay and no new feed operation", Fixture) { - f.testCommit(1s, false, 1u, 0u, 1u, 0u); + f.testCommit(1s, false, 1u, 0u, 1u, 0u, 0u); } TEST_F("Check internal commit with zero visibility delay", Fixture) { - f.testCommit(0s, true, 0u, 0u, 1u); + f.testCommit(0s, true, 0u, 0u, 1u, 0u); } TEST_F("Check internal commit with nonzero visibility delay", Fixture) { - f.testCommit(1s, true, 1u, 10u, 1u); + f.testCommit(1s, true, 1u, 10u, 1u, 0u); } TEST_F("Check internal commit with nonzero visibility delay and no new feed operation", Fixture) { - f.testCommit(1s, true, 1u, 0u, 1u, 0u); + f.testCommit(1s, true, 1u, 0u, 1u, 0u, 0u); } TEST_F("Check external commitAndWait with zero visibility delay", Fixture) { - f.testCommitAndWait(0s, false, 0u, 0u, 0u); + f.testCommitAndWait(0s, false, 0u, 0u, 0u, 1u); } TEST_F("Check external commitAndWait with nonzero visibility delay", Fixture) { - f.testCommitAndWait(1s, false, 1u, 10u, 1u); + f.testCommitAndWait(1s, false, 1u, 10u, 1u, 1u); } TEST_F("Check external commitAndWait with nonzero visibility delay and no new feed operation", Fixture) { - f.testCommitAndWait(1s, false, 0u, 0u, 0u, 0u); + f.testCommitAndWait(1s, false, 0u, 0u, 0u, 1u, 0u); } TEST_F("Check internal commitAndWait with zero visibility delay", Fixture) { - f.testCommitAndWait(0s, true, 0u, 0u, 1u); + f.testCommitAndWait(0s, true, 0u, 0u, 1u, 1u); } TEST_F("Check internal commitAndWait with nonzero visibility delay", Fixture) { - f.testCommitAndWait(1s, true, 1u, 10u, 1u); + f.testCommitAndWait(1s, true, 1u, 10u, 1u, 1u); } TEST_F("Check internal commitAndWait with nonzero visibility delay and no new feed operation", Fixture) { - f.testCommitAndWait(1s, true, 0u, 0u, 1u, 0u); + f.testCommitAndWait(1s, true, 0u, 0u, 1u, 1u, 0u); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/vespa/searchcore/proton/common/icommitable.h b/searchcore/src/vespa/searchcore/proton/common/icommitable.h index 55762a69862..3623e6d60d1 100644 --- a/searchcore/src/vespa/searchcore/proton/common/icommitable.h +++ b/searchcore/src/vespa/searchcore/proton/common/icommitable.h @@ -5,7 +5,7 @@ #include namespace proton { -class ILidCommitState; +class IPendingLidTracker; /** * Interface for anyone that needs to commit. @@ -13,9 +13,9 @@ class ILidCommitState; class ICommitable { public: virtual void commit() = 0; - virtual void commitAndWait(ILidCommitState & unCommittedLidTracker) = 0; - virtual void commitAndWait(ILidCommitState &uncommittedLidTracker, uint32_t lid) = 0; - virtual void commitAndWait(ILidCommitState &uncommittedLidTracker, const std::vector & lid) = 0; + virtual void commitAndWait() = 0; + virtual void commitAndWait(IPendingLidTracker &uncommittedLidTracker, uint32_t lid) = 0; + virtual void commitAndWait(IPendingLidTracker &uncommittedLidTracker, const std::vector & lid) = 0; protected: virtual ~ICommitable() = default; }; diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp index e0a45295dc8..8a4c01a4560 100644 --- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp @@ -2,83 +2,51 @@ #include "pendinglidtracker.h" #include -#include #include namespace proton { -IPendingLidTracker::Token::Token() - : _tracker(nullptr), - _lid(0u) -{} IPendingLidTracker::Token::Token(uint32_t lid, IPendingLidTracker &tracker) : _tracker(&tracker), _lid(lid) {} +IPendingLidTracker::Token::Token() + : _tracker(nullptr), + _lid(0u) +{} + + IPendingLidTracker::Token::~Token() { if (_tracker != nullptr) { _tracker->consume(_lid); } } -void -ILidCommitState::waitComplete(uint32_t lid) const { - waitState(State::COMPLETED, lid); -} -void -ILidCommitState::waitComplete(const LidList & lids) const { - waitState(State::COMPLETED, lids); -} -void -ILidCommitState::waitComplete() const { - waitState(State::COMPLETED); -} - -PendingLidTrackerBase::PendingLidTrackerBase() = default; -PendingLidTrackerBase::~PendingLidTrackerBase() = default; - -ILidCommitState::State -PendingLidTrackerBase::waitState(State state) const { - auto pending = pendingLids(); - return waitState(state, pending); -} - -ILidCommitState::State -PendingLidTrackerBase::waitState(State state, uint32_t lid) const { - MonitorGuard guard(_mutex); - return waitFor(guard, state, lid); -} - -ILidCommitState::State -PendingLidTrackerBase::waitState(State state, const LidList & lids) const { - MonitorGuard guard(_mutex); - State lowest = State::COMPLETED; - for (uint32_t lid : lids) { - State next = waitFor(guard, state, lid); - if ((state == State::NEED_COMMIT) && next == state) { - return next; - } - lowest = std::min(next, lowest); - } - return lowest; +IPendingLidTracker::Token +NoopLidTracker::produce(uint32_t) { + return Token(); } -PendingLidTracker::PendingLidTracker() = default; +PendingLidTracker::PendingLidTracker() + : _mutex(), + _cond(), + _pending() +{} PendingLidTracker::~PendingLidTracker() { assert(_pending.empty()); } -IPendingLidTracker::Token +PendingLidTracker::Token PendingLidTracker::produce(uint32_t lid) { - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); _pending[lid]++; return Token(lid, *this); } void PendingLidTracker::consume(uint32_t lid) { - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); auto found = _pending.find(lid); assert (found != _pending.end()); assert (found->second > 0); @@ -90,137 +58,12 @@ PendingLidTracker::consume(uint32_t lid) { } } -ILidCommitState::State -PendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t lid) const { - for (auto found = _pending.find(lid); found != _pending.end(); found = _pending.find(lid)) { - if (state == State::NEED_COMMIT) { - return State::WAITING; - } - _cond.wait(guard); - } - return State::COMPLETED; -} - -PendingLidTrackerBase::Snapshot -PendingLidTracker::produceSnapshot() { - return Snapshot(); -} - -ILidCommitState::LidList -PendingLidTracker::pendingLids() const { - MonitorGuard guard(_mutex); - LidList lids; - lids.reserve(_pending.size()); - for (const auto & entry : _pending) { - lids.push_back(entry.first); - } - return lids; -} - -TwoPhasePendingLidTracker::TwoPhasePendingLidTracker() = default; - -TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() { - assert(_pending.empty()); -} - -IPendingLidTracker::Token -TwoPhasePendingLidTracker::produce(uint32_t lid) { - std::lock_guard guard(_mutex); - _pending[lid].inflight_feed++; - return Token(lid, *this); -} void -TwoPhasePendingLidTracker::consume(uint32_t lid) { - std::lock_guard guard(_mutex); - auto found = _pending.find(lid); - assert (found != _pending.end()); - assert (found->second.inflight_feed > 0); - found->second.inflight_feed--; - found->second.need_commit = true; -} - -ILidCommitState::State -TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t lid) const { - for (auto found = _pending.find(lid); found != _pending.end(); found = _pending.find(lid)) { - if (state == State::NEED_COMMIT) { - if ((found->second.inflight_feed > 0) || found->second.need_commit) { - return State::NEED_COMMIT; - } - return State::WAITING; - } +PendingLidTracker::waitForConsumedLid(uint32_t lid) { + std::unique_lock guard(_mutex); + while (_pending.find(lid) != _pending.end()) { _cond.wait(guard); } - return State::COMPLETED; -} - -void -TwoPhasePendingLidTracker::consumeSnapshot(LidList committed) { - MonitorGuard guard(_mutex); - for (const auto & lid : committed) { - auto found = _pending.find(lid); - assert(found != _pending.end()); - assert(found->second.inflight_commit >= 1); - found->second.inflight_commit --; - if (found->second.empty()) { - _pending.erase(found); - } - } - _cond.notify_all(); -} - -ILidCommitState::LidList -TwoPhasePendingLidTracker::pendingLids() const { - MonitorGuard guard(_mutex); - LidList lids; - lids.reserve(_pending.size()); - for (const auto & entry : _pending) { - lids.push_back(entry.first); - } - return lids; -} - -namespace common::internal { - -class CommitList : public PendingLidTrackerBase::Payload { -public: - using LidList = ILidCommitState::LidList; - CommitList(LidList lids, TwoPhasePendingLidTracker & tracker) - : _tracker(&tracker), - _lids(std::move(lids)) - { } - CommitList(const CommitList &) = delete; - CommitList & operator = (const CommitList &) = delete; - CommitList & operator = (CommitList &&) = delete; - CommitList(CommitList && rhs) noexcept - : _tracker(rhs._tracker), - _lids(std::move(rhs._lids)) - { - rhs._tracker = nullptr; - } - ~CommitList() override { - if (_tracker != nullptr) { - _tracker->consumeSnapshot(std::move(_lids)); - } - } -private: - TwoPhasePendingLidTracker * _tracker; - LidList _lids; -}; - -} - -PendingLidTrackerBase::Snapshot -TwoPhasePendingLidTracker::produceSnapshot() { - LidList toCommit; - MonitorGuard guard(_mutex); - for (auto & entry : _pending) { - if (entry.second.need_commit) { - toCommit.emplace_back(entry.first); - entry.second.inflight_commit ++; - entry.second.need_commit = false; - } - } - return std::make_unique(std::move(toCommit), *this); } } diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h index c4ff0d7d639..15dd0a9a4c3 100644 --- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h +++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h @@ -5,15 +5,9 @@ #include #include #include -#include namespace proton { -/** Interface for tracking lids in the feed pipeline. - * A token is created with produce(lid). - * Once the token goes out of scope the lid is then consumed. - * This is used to track which lids are inflight in the feed pipeline. - */ class IPendingLidTracker { public: class Token { @@ -24,8 +18,8 @@ public: Token & operator = (const Token &) = delete; Token & operator = (Token &&) = delete; Token(Token && rhs) noexcept - : _tracker(rhs._tracker), - _lid(rhs._lid) + : _tracker(rhs._tracker), + _lid(rhs._lid) { rhs._tracker = nullptr; } @@ -36,113 +30,33 @@ public: }; virtual ~IPendingLidTracker() = default; virtual Token produce(uint32_t lid) = 0; + virtual void waitForConsumedLid(uint32_t lid) = 0; private: virtual void consume(uint32_t lid) = 0; + std::mutex _mutex; + std::condition_variable _cond; + vespalib::hash_map _pending; }; -/** - * This is an interface for checking/waiting the state of a lid in the feed pipeline - * The lid might need a commit (NEED_COMMIT), but if visibility-delay is zero it will go directly to WAITING - * as no explicit commit is needed. - * After a commit has been started the lid is transferred to WAITING. - * Once the commit has gone through the lid is in state COMPLETED. - */ -class ILidCommitState { +class NoopLidTracker : public IPendingLidTracker { public: - enum class State {NEED_COMMIT, WAITING, COMPLETED}; - using LidList = std::vector; - virtual ~ILidCommitState() = default; - State getState() const { return waitState(State::NEED_COMMIT); } - State getState(uint32_t lid) const { return waitState(State::NEED_COMMIT, lid); } - State getState(const LidList & lids) const { return waitState(State::NEED_COMMIT, lids); } - void waitComplete(uint32_t lid) const; - void waitComplete(const LidList & lids) const; - void waitComplete() const; + Token produce(uint32_t lid) override; + void waitForConsumedLid(uint32_t ) override { } private: - virtual State waitState(State state, uint32_t lid) const = 0; - virtual State waitState(State state, const LidList & lids) const = 0; - virtual State waitState(State state) const = 0; -}; - -/** - * Base class for doing 2 phased lid tracking. The first phase is from when the feed operation - * is in progress and lasts until the OperationDoneContext goes out of scope. This might include commit - * when visibility-delay is zero. - * When a commit is started a snapshot containing all lids in state NEED_COMMIT are taken, - * while also moving the lids to WAITING. Once the snapshot goes out of scope when the commit is complete, - * it will cleanup and move all lids from WAITING to COMPLETE. - */ -class PendingLidTrackerBase : public IPendingLidTracker, - public ILidCommitState -{ -public: - ~PendingLidTrackerBase(); - struct Payload { - virtual ~Payload() = default; - }; - using Snapshot = std::unique_ptr; - virtual Snapshot produceSnapshot() = 0; - - State waitState(State state) const override; - State waitState(State state, uint32_t lid) const override; - State waitState(State state, const LidList & lids) const override; -protected: - using MonitorGuard = std::unique_lock; - PendingLidTrackerBase(); - virtual LidList pendingLids() const = 0; - virtual State waitFor(MonitorGuard & guard, State state, uint32_t lid) const = 0; - MonitorGuard getGuard() { return MonitorGuard(_mutex); } - mutable std::mutex _mutex; - mutable std::condition_variable _cond; + void consume(uint32_t ) override { } }; -/** - * Use for tracking lids when visibility-delay is zero and commit is implicit. - * In this case lids go directly to WAITING and the second phase is a noop. - */ -class PendingLidTracker : public PendingLidTrackerBase -{ +class PendingLidTracker : public IPendingLidTracker { public: PendingLidTracker(); ~PendingLidTracker() override; Token produce(uint32_t lid) override; - Snapshot produceSnapshot() override; + void waitForConsumedLid(uint32_t lid) override; private: - LidList pendingLids() const override; void consume(uint32_t lid) override; - State waitFor(MonitorGuard & guard, State state, uint32_t lid) const override; - + std::mutex _mutex; + std::condition_variable _cond; vespalib::hash_map _pending; }; -namespace common::internal { - class CommitList; -} -/** - * Use for tracking lids in 2 phases which is needed when visibility-delay is non-zero. - * It tracks lids that are in feed pipeline, lids where commit has been started and when they fully complete. - */ -class TwoPhasePendingLidTracker : public PendingLidTrackerBase -{ -public: - TwoPhasePendingLidTracker(); - ~TwoPhasePendingLidTracker() override; - Token produce(uint32_t lid) override; - Snapshot produceSnapshot() override; -private: - friend common::internal::CommitList; - void consume(uint32_t lid) override; - void consumeSnapshot(LidList lids); - LidList pendingLids() const override; - State waitFor(MonitorGuard & guard, State state, uint32_t lid) const override; - struct Counters { - Counters() : inflight_feed(0), inflight_commit(0), need_commit(false) {} - bool empty() const { return (inflight_feed == 0) && ! need_commit && (inflight_commit == 0); } - uint32_t inflight_feed; - uint32_t inflight_commit; - bool need_commit; - }; - vespalib::hash_map _pending; -}; - } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.cpp index daa240e8b12..f6a52ad7b44 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.cpp @@ -6,7 +6,7 @@ namespace proton { CommitAndWaitDocumentRetriever::CommitAndWaitDocumentRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, - ILidCommitState & unCommittedLidTracker) + IPendingLidTracker & unCommittedLidTracker) : _retriever(std::move(retriever)), _commit(commit), _uncommittedLidsTracker(unCommittedLidTracker) diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h index 2b8264f7eef..33258f767dc 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h @@ -17,10 +17,10 @@ class CommitAndWaitDocumentRetriever : public IDocumentRetriever { IDocumentRetriever::SP _retriever; ICommitable &_commit; - ILidCommitState &_uncommittedLidsTracker; + IPendingLidTracker &_uncommittedLidsTracker; using Bucket = storage::spi::Bucket; public: - CommitAndWaitDocumentRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, ILidCommitState & unCommittedLidTracker); + CommitAndWaitDocumentRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, IPendingLidTracker & unCommittedLidTracker); ~CommitAndWaitDocumentRetriever() override; const document::DocumentTypeRepo &getDocumentTypeRepo() const override; diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp index 00f67a4dd7b..09dbace0bdc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp @@ -233,7 +233,7 @@ CombiningFeedView::forceCommit(search::SerialNum serialNum) } } -ILidCommitState & +IPendingLidTracker & CombiningFeedView::getUncommittedLidsTracker() { LOG_ABORT("CombiningFeedView::getUncommittedLidsTracker should never be called."); } diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h index 5625d41ccdc..aa730e104aa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h @@ -77,7 +77,7 @@ public: void sync() override; void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; - ILidCommitState & getUncommittedLidsTracker() override; + IPendingLidTracker & getUncommittedLidsTracker() override; // Called by document db executor void setCalculator(const IBucketStateCalculator::SP &newCalc); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp b/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp index b0127ef58a4..54363c5b197 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp @@ -130,7 +130,7 @@ DocumentSubDBCollection::createRetrievers() namespace { IDocumentRetriever::SP -wrapRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, ILidCommitState & unCommitedLidsTracker) +wrapRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, IPendingLidTracker & unCommitedLidsTracker) { return std::make_shared(std::move(retriever), commit, unCommitedLidsTracker); } @@ -147,8 +147,7 @@ DocumentSubDBCollection::getRetrievers(IDocumentRetriever::ReadConsistency consi assert(list->size() == 3); wrappedList->push_back(wrapRetriever((*list)[_readySubDbId], visibilityHandler, getReadySubDB()->getFeedView()->getUncommittedLidsTracker())); - wrappedList->push_back(wrapRetriever((*list)[_remSubDbId], visibilityHandler, - getRemSubDB()->getFeedView()->getUncommittedLidsTracker())); + wrappedList->push_back((*list)[_remSubDbId]); wrappedList->push_back(wrapRetriever((*list)[_notReadySubDbId], visibilityHandler, getNotReadySubDB()->getFeedView()->getUncommittedLidsTracker())); return wrappedList; @@ -168,7 +167,7 @@ void DocumentSubDBCollection::maintenanceSync(MaintenanceController &mc, ICommit MaintenanceDocumentSubDB remSubDB(getRemSubDB()->getName(), _remSubDbId, getRemSubDB()->getDocumentMetaStoreContext().getSP(), - wrapRetriever((*retrievers)[_remSubDbId], commit, getRemSubDB()->getFeedView()->getUncommittedLidsTracker()), + (*retrievers)[_remSubDbId], getRemSubDB()->getFeedView()); MaintenanceDocumentSubDB notReadySubDB(getNotReadySubDB()->getName(), _notReadySubDbId, diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp index cdb7ec20f26..2140792e206 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp @@ -8,10 +8,9 @@ namespace proton { ForceCommitContext::ForceCommitContext(vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, - PendingLidTrackerBase::Snapshot lidsToCommit) + IDocumentMetaStore &documentMetaStore) : _executor(executor), - _task(std::make_unique(documentMetaStore, std::move(lidsToCommit))), + _task(std::make_unique(documentMetaStore)), _committedDocIdLimit(0u), _docIdLimit(nullptr) { diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h index 8c7df2aaedd..494e12002a6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h @@ -2,8 +2,10 @@ #pragma once -#include #include +#include +#include +#include namespace vespalib { class Executor; } @@ -22,15 +24,14 @@ class DocIdLimit; */ class ForceCommitContext : public search::IDestructorCallback { - vespalib::Executor &_executor; - std::unique_ptr _task; - uint32_t _committedDocIdLimit; - DocIdLimit *_docIdLimit; + vespalib::Executor &_executor; + std::unique_ptr _task; + uint32_t _committedDocIdLimit; + DocIdLimit *_docIdLimit; public: ForceCommitContext(vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, - PendingLidTrackerBase::Snapshot lidsToCommit); + IDocumentMetaStore &documentMetaStore); ~ForceCommitContext() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp index ffae783d8e6..2893065f08b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp @@ -5,15 +5,16 @@ namespace proton { -ForceCommitDoneTask::ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore, PendingLidTrackerBase::Snapshot lidsToCommit) +ForceCommitDoneTask::ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore) : _lidsToReuse(), - _lidsToCommit(std::move(lidsToCommit)), _holdUnblockShrinkLidSpace(false), _documentMetaStore(documentMetaStore) { } -ForceCommitDoneTask::~ForceCommitDoneTask() = default; +ForceCommitDoneTask::~ForceCommitDoneTask() +{ +} void ForceCommitDoneTask::reuseLids(std::vector &&lids) @@ -35,7 +36,6 @@ ForceCommitDoneTask::run() if (_holdUnblockShrinkLidSpace) { _documentMetaStore.holdUnblockShrinkLidSpace(); } - _lidsToCommit.reset(); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h index c9c4ba9f6ec..1b459ab6d02 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h @@ -2,8 +2,8 @@ #pragma once -#include #include +#include namespace proton { @@ -25,13 +25,13 @@ struct IDocumentMetaStore; */ class ForceCommitDoneTask : public vespalib::Executor::Task { - std::vector _lidsToReuse; - PendingLidTrackerBase::Snapshot _lidsToCommit; - bool _holdUnblockShrinkLidSpace; - IDocumentMetaStore &_documentMetaStore; + std::vector _lidsToReuse; + bool _holdUnblockShrinkLidSpace; + IDocumentMetaStore &_documentMetaStore; public: - ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore, PendingLidTrackerBase::Snapshot lidsToCommit); + ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore); + ~ForceCommitDoneTask() override; void reuseLids(std::vector &&lids); @@ -43,7 +43,7 @@ public: void run() override; bool empty() const { - return _lidsToReuse.empty() && !_holdUnblockShrinkLidSpace && !_lidsToCommit; + return _lidsToReuse.empty() && !_holdUnblockShrinkLidSpace; } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h index 734c27ff09d..11e95937ec9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h @@ -62,7 +62,7 @@ public: virtual void forceCommit(search::SerialNum serialNum) = 0; virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation & pruneOp) = 0; virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) = 0; - virtual ILidCommitState & getUncommittedLidsTracker() = 0; + virtual IPendingLidTracker & getUncommittedLidsTracker() = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp index d35e6bb7127..3d770fad313 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp @@ -7,14 +7,13 @@ namespace proton { -RemoveDoneContext::RemoveDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor, +RemoveDoneContext::RemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid) : OperationDoneContext(std::move(token)), _executor(executor), _task(), - _pendingNotifyRemoveDone(std::move(pendingNotifyRemoveDone)), - _uncommitted(std::move(uncommitted)) + _pendingNotifyRemoveDone(std::move(pendingNotifyRemoveDone)) { if (lid != 0) { _task = std::make_unique(documentMetaStore, lid); diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index 6880696fe88..07057d1c431 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -3,11 +3,10 @@ #pragma once #include "operationdonecontext.h" -#include -#include #include #include #include +#include namespace proton { @@ -27,10 +26,9 @@ class RemoveDoneContext : public OperationDoneContext vespalib::Executor &_executor; std::unique_ptr _task; PendingNotifyRemoveDone _pendingNotifyRemoveDone; - IPendingLidTracker::Token _uncommitted; public: - RemoveDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, + RemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid); ~RemoveDoneContext() override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp index a57c0142db8..6ff5a149641 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp @@ -271,7 +271,7 @@ SearchableDocSubDB::reconfigureIndexSearchable() { std::lock_guard guard(_configMutex); // Create new views as needed. - _commitable.commitAndWait(_iFeedView.get()->getUncommittedLidsTracker()); + _commitable.commitAndWait(); _configurer.reconfigureIndexSearchable(); // Activate new feed view at once syncViews(); diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index e958405a62e..c5799f9082c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -110,46 +110,30 @@ private: IDestructorCallback::SP _moveDoneCtx; public: - RemoveDoneContextForMove(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + RemoveDoneContextForMove(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid, IDestructorCallback::SP moveDoneCtx) - : RemoveDoneContext(std::move(token), std::move(uncommitted), executor, - documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid), + : RemoveDoneContext(std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid), _moveDoneCtx(std::move(moveDoneCtx)) {} ~RemoveDoneContextForMove() override = default; }; std::shared_ptr -createRemoveDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore,PendingNotifyRemoveDone &&pendingNotifyRemoveDone, +createRemoveDoneContext(FeedToken token, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid, IDestructorCallback::SP moveDoneCtx) { if (moveDoneCtx) { return std::make_shared - (std::move(token), std::move(uncommitted), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), + (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid, std::move(moveDoneCtx)); } else { return std::make_shared - (std::move(token), std::move(uncommitted), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid); + (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid); } } -class SummaryPutDoneContext : public OperationDoneContext -{ - IPendingLidTracker::Token _uncommitted; -public: - SummaryPutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted); - ~SummaryPutDoneContext() override; -}; - -SummaryPutDoneContext::SummaryPutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted) - : OperationDoneContext(std::move(token)), - _uncommitted(std::move(uncommitted)) -{} - -SummaryPutDoneContext::~SummaryPutDoneContext() = default; - std::vector getGidsToRemove(const IDocumentMetaStore &metaStore, const LidVectorContext::LidVector &lidsToRemove) { @@ -180,8 +164,7 @@ void putMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_i } void removeMetaData(documentmetastore::IStore &meta_store, const GlobalId & gid, const DocumentId &doc_id, - const DocumentOperation &op, bool is_removed_doc) -{ + const DocumentOperation &op, bool is_removed_doc) { assert(meta_store.validLid(op.getPrevLid())); assert(is_removed_doc == op.getPrevMarkedAsRemoved()); const RawDocumentMetaData &meta(meta_store.getRawMetaData(op.getPrevLid())); @@ -209,12 +192,12 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c meta_store.move(op.getPrevLid(), op.getLid()); } -std::unique_ptr +std::unique_ptr createUncommitedLidTracker(bool needImmediateCommit) { if (needImmediateCommit) { - return std::make_unique(); + return std::make_unique(); } else { - return std::make_unique(); + return std::make_unique(); } } @@ -247,15 +230,14 @@ StoreOnlyFeedView::sync() _writeService.summary().sync(); } -ILidCommitState & +IPendingLidTracker & StoreOnlyFeedView::getUncommittedLidsTracker() { return *_pendingLidsForCommit; } void StoreOnlyFeedView::forceCommit(SerialNum serialNum) { - forceCommit(serialNum, std::make_shared(_writeService.master(), _metaStore, - _pendingLidsForCommit->produceSnapshot())); + forceCommit(serialNum, std::make_shared(_writeService.master(), _metaStore)); } void @@ -318,7 +300,6 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) _params._subDbId, doc->toString(true).size(), doc->toString(true).c_str()); PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(putOp, docId.getGlobalId(), docId); - auto uncommitted = _pendingLidsForCommit->produce(putOp.getLid()); considerEarlyAck(token); bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId); @@ -327,7 +308,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) bool immediateCommit = needCommit(); const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr onWriteDone = - createPutDoneContext(std::move(token), std::move(uncommitted), + createPutDoneContext(std::move(token), _pendingLidsForCommit->produce(putOp.getLid()), _gidToLidChangeHandler, doc, gid, putOp.getLid(), serialNum, putOp.changedDbdId() && useDocumentMetaStore(serialNum)); putSummary(serialNum, putOp.getLid(), doc, onWriteDone); @@ -336,8 +317,8 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) } if (docAlreadyExists && putOp.changedDbdId()) { assert(!putOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), _pendingLidsForCommit->produce(putOp.getPrevLid()), serialNum, - std::move(pendingNotifyRemoveDone), putOp.getPrevLid(), IDestructorCallback::SP()); + internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), + putOp.getPrevLid(), IDestructorCallback::SP()); } } @@ -477,11 +458,10 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) (void) updateOk; _metaStore.commit(serialNum, serialNum); } - auto uncommitted = _pendingLidsForCommit->produce(updOp.getLid()); considerEarlyAck(token); bool immediateCommit = needCommit(); - auto onWriteDone = createUpdateDoneContext(std::move(token), std::move(uncommitted), updOp.getUpdate()); + auto onWriteDone = createUpdateDoneContext(std::move(token), _pendingLidsForCommit->produce(updOp.getLid()), updOp.getUpdate()); UpdateScope updateScope(*_schema, upd); updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone, updateScope); @@ -489,7 +469,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) PromisedDoc promisedDoc; FutureDoc futureDoc = promisedDoc.get_future().share(); onWriteDone->setDocument(futureDoc); - _pendingLidsForDocStore.waitComplete(lid); + _pendingLidsForDocStore.waitForConsumedLid(lid); if (updateScope._indexedFields) { updateIndexedFields(serialNum, lid, futureDoc, immediateCommit, onWriteDone); } @@ -605,20 +585,19 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithDocI rmOp.getSubDbId(), rmOp.getLid(), rmOp.getPrevSubDbId(), rmOp.getPrevLid(), _params._subDbId); PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(rmOp, docId.getGlobalId(), docId); - auto uncommitted = _pendingLidsForCommit->produce(rmOp.getLid()); considerEarlyAck(token); if (rmOp.getValidDbdId(_params._subDbId)) { auto clearDoc = std::make_unique(*_docType, docId); clearDoc->setRepo(*_repo); - putSummary(serialNum, rmOp.getLid(), std::move(clearDoc), std::make_shared(std::move(token), std::move(uncommitted))); + putSummary(serialNum, rmOp.getLid(), std::move(clearDoc), std::shared_ptr()); } if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { assert(!rmOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, - std::move(pendingNotifyRemoveDone), rmOp.getPrevLid(), IDestructorCallback::SP()); + internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), + rmOp.getPrevLid(), IDestructorCallback::SP()); } } } @@ -631,26 +610,25 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithGid const SerialNum serialNum = rmOp.getSerialNum(); DocumentId dummy; PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(rmOp, rmOp.getGlobalId(), dummy); - auto uncommitted = _pendingLidsForCommit->produce(rmOp.getLid()); considerEarlyAck(token); if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { assert(!rmOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, std::move(pendingNotifyRemoveDone), + internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), rmOp.getPrevLid(), IDestructorCallback::SP()); } } } void -StoreOnlyFeedView::internalRemove(FeedToken token, IPendingLidTracker::Token uncommitted, SerialNum serialNum, +StoreOnlyFeedView::internalRemove(FeedToken token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, IDestructorCallback::SP moveDoneCtx) { bool explicitReuseLid = _lidReuseDelayer.delayReuse(lid); std::shared_ptr onWriteDone; - onWriteDone = createRemoveDoneContext(std::move(token), std::move(uncommitted),_writeService.master(), _metaStore, + onWriteDone = createRemoveDoneContext(std::move(token), _writeService.master(), _metaStore, std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u), std::move(moveDoneCtx)); removeSummary(serialNum, lid, onWriteDone); @@ -817,7 +795,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone); } if (docAlreadyExists && moveOp.changedDbdId()) { - internalRemove(FeedToken(), _pendingLidsForCommit->produce(moveOp.getPrevLid()), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx); + internalRemove(FeedToken(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx); } } @@ -855,8 +833,8 @@ StoreOnlyFeedView::handleCompactLidSpace(const CompactLidSpaceOperation &op) const SerialNum serialNum = op.getSerialNum(); if (useDocumentMetaStore(serialNum)) { getDocumentMetaStore()->get().compactLidSpace(op.getLidLimit()); - auto commitContext(std::make_shared(_writeService.master(), _metaStore, - _pendingLidsForCommit->produceSnapshot())); + std::shared_ptr + commitContext(std::make_shared(_writeService.master(), _metaStore)); commitContext->holdUnblockShrinkLidSpace(); forceCommit(serialNum, commitContext); } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 8e9cc6508f2..94ed2cf1b4c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -144,7 +144,7 @@ private: const document::DocumentType *_docType; LidReuseDelayer _lidReuseDelayer; PendingLidTracker _pendingLidsForDocStore; - std::unique_ptr _pendingLidsForCommit; + std::unique_ptr _pendingLidsForCommit; protected: const search::index::Schema::SP _schema; @@ -184,8 +184,7 @@ private: size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields, bool immediateCommit); - void internalRemove(FeedToken token, IPendingLidTracker::Token uncommitted, SerialNum serialNum, - PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + void internalRemove(FeedToken token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, std::shared_ptr moveDoneCtx); // Ack token early if visibility delay is nonzero @@ -264,7 +263,7 @@ public: */ void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; - ILidCommitState & getUncommittedLidsTracker() override; + IPendingLidTracker & getUncommittedLidsTracker() override; }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp index a3524ae79f3..c582733a983 100644 --- a/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.cpp @@ -23,56 +23,44 @@ VisibilityHandler::VisibilityHandler(const IGetSerialNum & serial, VisibilityHandler::~VisibilityHandler() = default; -void -VisibilityHandler::internalCommit(bool force) -{ - if (_writeService.master().isCurrentThread()) { - performCommit(force); - } else { - std::lock_guard guard(_lock); - bool wasCommitTaskSpawned = startCommit(guard, force); - (void) wasCommitTaskSpawned; - } -} void VisibilityHandler::commit() { if (hasVisibilityDelay()) { - internalCommit(true); + if (_writeService.master().isCurrentThread()) { + performCommit(true); + } else { + std::lock_guard guard(_lock); + startCommit(guard, true); + } } } void -VisibilityHandler::commitAndWait(ILidCommitState & unCommittedLidTracker) +VisibilityHandler::commitAndWait() { - ILidCommitState::State state = unCommittedLidTracker.getState(); - if (state == ILidCommitState::State::NEED_COMMIT) { - internalCommit(false); - } - if (state != ILidCommitState::State::COMPLETED) { - unCommittedLidTracker.waitComplete(); + if (hasVisibilityDelay()) { + if (_writeService.master().isCurrentThread()) { + performCommit(false); + } else { + std::lock_guard guard(_lock); + if (startCommit(guard, false)) { + _writeService.master().sync(); + } + } } + // Always sync attribute writer threads so attribute vectors are + // properly updated when document retriver rebuilds document + _writeService.attributeFieldWriter().sync(); + _writeService.summary().sync(); } void -VisibilityHandler::commitAndWait(ILidCommitState & unCommittedLidTracker, uint32_t lid) { - ILidCommitState::State state = unCommittedLidTracker.getState(lid); - if (state == ILidCommitState::State::NEED_COMMIT) { - internalCommit(false); - } - if (state != ILidCommitState::State::COMPLETED) { - unCommittedLidTracker.waitComplete(lid); - } +VisibilityHandler::commitAndWait(IPendingLidTracker &, uint32_t ) { + commitAndWait(); } -void -VisibilityHandler::commitAndWait(ILidCommitState & unCommittedLidTracker, const std::vector & lids) { - ILidCommitState::State state = unCommittedLidTracker.getState(lids); - if (state == ILidCommitState::State::NEED_COMMIT) { - internalCommit(false); - } - if (state != ILidCommitState::State::COMPLETED) { - unCommittedLidTracker.waitComplete(lids); - } +void VisibilityHandler::commitAndWait(IPendingLidTracker &, const std::vector & ) { + commitAndWait(); } bool diff --git a/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.h b/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.h index 560b6e75423..6e65b6f6257 100644 --- a/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/visibilityhandler.h @@ -29,13 +29,12 @@ public: vespalib::duration getVisibilityDelay() const { return _visibilityDelay; } bool hasVisibilityDelay() const { return _visibilityDelay != vespalib::duration::zero(); } void commit() override; - void commitAndWait(ILidCommitState & unCommittedLidTracker) override; - void commitAndWait(ILidCommitState &, uint32_t ) override; - void commitAndWait(ILidCommitState &, const std::vector & ) override; + void commitAndWait() override; + void commitAndWait(IPendingLidTracker &, uint32_t ) override; + void commitAndWait(IPendingLidTracker &, const std::vector & ) override; private: bool startCommit(const std::lock_guard &unused, bool force); void performCommit(bool force); - void internalCommit(bool force); const IGetSerialNum & _serial; IThreadingService & _writeService; const FeedViewHolder & _feedView; diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp index 2507abcc9ea..ba9dd7ecc39 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.cpp @@ -17,7 +17,7 @@ DummyFeedView::DummyFeedView(std::shared_ptr d DummyFeedView::~DummyFeedView() = default; -ILidCommitState & +IPendingLidTracker & DummyFeedView::getUncommittedLidsTracker() { assert(false); } diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h index 4fc5cbe5018..122559f9e68 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h @@ -33,7 +33,7 @@ struct DummyFeedView : public IFeedView void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &) override {} void handleCompactLidSpace(const CompactLidSpaceOperation &) override {} void forceCommit(search::SerialNum) override { } - ILidCommitState & getUncommittedLidsTracker() override; + IPendingLidTracker & getUncommittedLidsTracker() override; }; } -- cgit v1.2.3