diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-01 15:32:16 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-01 15:32:16 +0200 |
commit | 402ce3fe9c7dbdbf63d8bfe8121683440adc6bc5 (patch) | |
tree | be75321bee8c000899ca473b35faaa95c0699621 | |
parent | 3b8f7fdff4872bd010286753f6072ec492f14a48 (diff) | |
parent | 06310164a5a8a86b0be4934dd2002e9d657e1127 (diff) |
Merge pull request #14226 from vespa-engine/balder/counters-2-sequencenumber
Balder/counters 2 sequencenumber
12 files changed, 143 insertions, 135 deletions
diff --git a/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp b/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp index 1579b8ec91a..ec314c50f4a 100644 --- a/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp +++ b/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp @@ -9,6 +9,7 @@ #include <vespa/persistence/spi/result.h> #include <vespa/persistence/spi/test.h> #include <vespa/searchcore/proton/common/attribute_updater.h> +#include <vespa/searchcore/proton/common/pendinglidtracker.h> #include <vespa/searchcore/proton/persistenceengine/document_iterator.h> #include <vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h> #include <vespa/searchlib/attribute/attributecontext.h> diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt index be50b8d67a5..d59a4075dce 100644 --- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt @@ -13,6 +13,7 @@ vespa_add_library(searchcore_pcommon STATIC feedtoken.cpp hw_info_sampler.cpp indexschema_inspector.cpp + ipendinglidtracker.cpp monitored_refcount.cpp operation_rate_tracker.cpp pendinglidtracker.cpp diff --git a/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.cpp new file mode 100644 index 00000000000..2e182fa397f --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.cpp @@ -0,0 +1,35 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "ipendinglidtracker.h" + +namespace proton { + +IPendingLidTracker::Token::Token() + : _tracker(nullptr), + _lid(0u) +{} +IPendingLidTracker::Token::Token(uint32_t lid, IPendingLidTracker &tracker) + : _tracker(&tracker), + _lid(lid) +{} + +IPendingLidTracker::Token::~Token() { + if (_tracker != nullptr) { + _tracker->consume(_lid); + } +} + +void +ILidCommitState::waitComplete(uint32_t lid) const { + waitState(State::COMPLETED, lid); +} +void +ILidCommitState::waitComplete(const LidList & lids) const { + waitState(State::COMPLETED, lids); +} +void +ILidCommitState::waitComplete() const { + waitState(State::COMPLETED); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.h b/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.h new file mode 100644 index 00000000000..9ca94441704 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.h @@ -0,0 +1,64 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <cstdint> +#include <vector> + +namespace proton { + +/** Interface for tracking lids in the feed pipeline. + * A token is created with produce(lid). + * Once the token goes out of scope the lid is then consumed. + * This is used to track which lids are inflight in the feed pipeline. + */ +class IPendingLidTracker { +public: + class Token { + public: + Token(); + Token(uint32_t lid, IPendingLidTracker & tracker); + Token(const Token &) = delete; + Token & operator = (const Token &) = delete; + Token & operator = (Token &&) = delete; + Token(Token && rhs) noexcept + : _tracker(rhs._tracker), + _lid(rhs._lid) + { + rhs._tracker = nullptr; + } + ~Token(); + private: + IPendingLidTracker * _tracker; + uint32_t _lid; + }; + virtual ~IPendingLidTracker() = default; + virtual Token produce(uint32_t lid) = 0; +private: + virtual void consume(uint32_t lid) = 0; +}; + +/** + * This is an interface for checking/waiting the state of a lid in the feed pipeline + * The lid might need a commit (NEED_COMMIT), but if visibility-delay is zero it will go directly to WAITING + * as no explicit commit is needed. + * After a commit has been started the lid is transferred to WAITING. + * Once the commit has gone through the lid is in state COMPLETED. + */ +class ILidCommitState { +public: + enum class State {NEED_COMMIT, WAITING, COMPLETED}; + using LidList = std::vector<uint32_t>; + virtual ~ILidCommitState() = default; + State getState() const { return waitState(State::NEED_COMMIT); } + State getState(uint32_t lid) const { return waitState(State::NEED_COMMIT, lid); } + State getState(const LidList & lids) const { return waitState(State::NEED_COMMIT, lids); } + void waitComplete(uint32_t lid) const; + void waitComplete(const LidList & lids) const; + void waitComplete() const; +private: + virtual State waitState(State state, uint32_t lid) const = 0; + virtual State waitState(State state, const LidList & lids) const = 0; + virtual State waitState(State state) const = 0; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp index e0a45295dc8..dd6ca70248b 100644 --- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp @@ -7,34 +7,6 @@ namespace proton { -IPendingLidTracker::Token::Token() - : _tracker(nullptr), - _lid(0u) -{} -IPendingLidTracker::Token::Token(uint32_t lid, IPendingLidTracker &tracker) - : _tracker(&tracker), - _lid(lid) -{} - -IPendingLidTracker::Token::~Token() { - if (_tracker != nullptr) { - _tracker->consume(_lid); - } -} - -void -ILidCommitState::waitComplete(uint32_t lid) const { - waitState(State::COMPLETED, lid); -} -void -ILidCommitState::waitComplete(const LidList & lids) const { - waitState(State::COMPLETED, lids); -} -void -ILidCommitState::waitComplete() const { - waitState(State::COMPLETED); -} - PendingLidTrackerBase::PendingLidTrackerBase() = default; PendingLidTrackerBase::~PendingLidTrackerBase() = default; @@ -117,7 +89,12 @@ PendingLidTracker::pendingLids() const { return lids; } -TwoPhasePendingLidTracker::TwoPhasePendingLidTracker() = default; +TwoPhasePendingLidTracker::TwoPhasePendingLidTracker() + : _sequenceId(0), + _lastCommitStarted(0), + _lastCommitCompleted(0), + _pending() +{} TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() { assert(_pending.empty()); @@ -126,24 +103,19 @@ TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() { IPendingLidTracker::Token TwoPhasePendingLidTracker::produce(uint32_t lid) { std::lock_guard guard(_mutex); - _pending[lid].inflight_feed++; + _pending[lid] = ++_sequenceId; return Token(lid, *this); } void TwoPhasePendingLidTracker::consume(uint32_t lid) { - std::lock_guard guard(_mutex); - auto found = _pending.find(lid); - assert (found != _pending.end()); - assert (found->second.inflight_feed > 0); - found->second.inflight_feed--; - found->second.need_commit = true; + (void) lid; } ILidCommitState::State TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t lid) const { for (auto found = _pending.find(lid); found != _pending.end(); found = _pending.find(lid)) { if (state == State::NEED_COMMIT) { - if ((found->second.inflight_feed > 0) || found->second.need_commit) { + if (found->second > _lastCommitStarted) { return State::NEED_COMMIT; } return State::WAITING; @@ -154,16 +126,17 @@ TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t l } void -TwoPhasePendingLidTracker::consumeSnapshot(LidList committed) { +TwoPhasePendingLidTracker::consumeSnapshot(uint64_t sequenceIdWhenStarted) { MonitorGuard guard(_mutex); - for (const auto & lid : committed) { - auto found = _pending.find(lid); - assert(found != _pending.end()); - assert(found->second.inflight_commit >= 1); - found->second.inflight_commit --; - if (found->second.empty()) { - _pending.erase(found); - } + assert(sequenceIdWhenStarted >= _lastCommitCompleted); + _lastCommitCompleted = sequenceIdWhenStarted; + std::vector<uint32_t> committed; + for (const auto & entry : _pending) { + if (entry.second <= sequenceIdWhenStarted) + committed.push_back(entry.first); + } + for (uint32_t lid : committed) { + _pending.erase(lid); } _cond.notify_all(); } @@ -184,43 +157,36 @@ namespace common::internal { class CommitList : public PendingLidTrackerBase::Payload { public: using LidList = ILidCommitState::LidList; - CommitList(LidList lids, TwoPhasePendingLidTracker & tracker) + CommitList(uint64_t commitStarted, TwoPhasePendingLidTracker & tracker) : _tracker(&tracker), - _lids(std::move(lids)) + _commitStarted(commitStarted) { } CommitList(const CommitList &) = delete; CommitList & operator = (const CommitList &) = delete; CommitList & operator = (CommitList &&) = delete; CommitList(CommitList && rhs) noexcept : _tracker(rhs._tracker), - _lids(std::move(rhs._lids)) + _commitStarted(rhs._commitStarted) { rhs._tracker = nullptr; } ~CommitList() override { if (_tracker != nullptr) { - _tracker->consumeSnapshot(std::move(_lids)); + _tracker->consumeSnapshot(_commitStarted); } } private: TwoPhasePendingLidTracker * _tracker; - LidList _lids; + uint64_t _commitStarted; }; } PendingLidTrackerBase::Snapshot TwoPhasePendingLidTracker::produceSnapshot() { - LidList toCommit; MonitorGuard guard(_mutex); - for (auto & entry : _pending) { - if (entry.second.need_commit) { - toCommit.emplace_back(entry.first); - entry.second.inflight_commit ++; - entry.second.need_commit = false; - } - } - return std::make_unique<common::internal::CommitList>(std::move(toCommit), *this); + _lastCommitStarted = _sequenceId; + return std::make_unique<common::internal::CommitList>(_lastCommitStarted, *this); } } diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h index c4ff0d7d639..ef0a1dbb1a3 100644 --- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h +++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h @@ -2,68 +2,13 @@ #pragma once +#include "ipendinglidtracker.h" #include <vespa/vespalib/stllike/hash_map.h> #include <mutex> #include <condition_variable> -#include <vector> namespace proton { -/** Interface for tracking lids in the feed pipeline. - * A token is created with produce(lid). - * Once the token goes out of scope the lid is then consumed. - * This is used to track which lids are inflight in the feed pipeline. - */ -class IPendingLidTracker { -public: - class Token { - public: - Token(); - Token(uint32_t lid, IPendingLidTracker & tracker); - Token(const Token &) = delete; - Token & operator = (const Token &) = delete; - Token & operator = (Token &&) = delete; - Token(Token && rhs) noexcept - : _tracker(rhs._tracker), - _lid(rhs._lid) - { - rhs._tracker = nullptr; - } - ~Token(); - private: - IPendingLidTracker * _tracker; - uint32_t _lid; - }; - virtual ~IPendingLidTracker() = default; - virtual Token produce(uint32_t lid) = 0; -private: - virtual void consume(uint32_t lid) = 0; -}; - -/** - * This is an interface for checking/waiting the state of a lid in the feed pipeline - * The lid might need a commit (NEED_COMMIT), but if visibility-delay is zero it will go directly to WAITING - * as no explicit commit is needed. - * After a commit has been started the lid is transferred to WAITING. - * Once the commit has gone through the lid is in state COMPLETED. - */ -class ILidCommitState { -public: - enum class State {NEED_COMMIT, WAITING, COMPLETED}; - using LidList = std::vector<uint32_t>; - virtual ~ILidCommitState() = default; - State getState() const { return waitState(State::NEED_COMMIT); } - State getState(uint32_t lid) const { return waitState(State::NEED_COMMIT, lid); } - State getState(const LidList & lids) const { return waitState(State::NEED_COMMIT, lids); } - void waitComplete(uint32_t lid) const; - void waitComplete(const LidList & lids) const; - void waitComplete() const; -private: - virtual State waitState(State state, uint32_t lid) const = 0; - virtual State waitState(State state, const LidList & lids) const = 0; - virtual State waitState(State state) const = 0; -}; - /** * Base class for doing 2 phased lid tracking. The first phase is from when the feed operation * is in progress and lasts until the OperationDoneContext goes out of scope. This might include commit @@ -132,17 +77,13 @@ public: private: friend common::internal::CommitList; void consume(uint32_t lid) override; - void consumeSnapshot(LidList lids); + void consumeSnapshot(uint64_t sequenceIdWhenStarted); LidList pendingLids() const override; State waitFor(MonitorGuard & guard, State state, uint32_t lid) const override; - struct Counters { - Counters() : inflight_feed(0), inflight_commit(0), need_commit(false) {} - bool empty() const { return (inflight_feed == 0) && ! need_commit && (inflight_commit == 0); } - uint32_t inflight_feed; - uint32_t inflight_commit; - bool need_commit; - }; - vespalib::hash_map<uint32_t, Counters> _pending; + uint64_t _sequenceId; + uint64_t _lastCommitStarted; + uint64_t _lastCommitCompleted; + vespalib::hash_map<uint32_t, uint64_t> _pending; }; } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h index 2b8264f7eef..8e1ac08fa20 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h @@ -3,7 +3,7 @@ #pragma once #include "i_document_retriever.h" -#include <vespa/searchcore/proton/common/pendinglidtracker.h> +#include <vespa/searchcore/proton/common/ipendinglidtracker.h> #include <vespa/searchcore/proton/common/icommitable.h> namespace proton { @@ -17,7 +17,7 @@ class CommitAndWaitDocumentRetriever : public IDocumentRetriever { IDocumentRetriever::SP _retriever; ICommitable &_commit; - ILidCommitState &_uncommittedLidsTracker; + ILidCommitState &_uncommittedLidsTracker; using Bucket = storage::spi::Bucket; public: CommitAndWaitDocumentRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, ILidCommitState & unCommittedLidTracker); diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h index 0a6774ad4e0..cffe4199e84 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h @@ -2,7 +2,7 @@ #pragma once -#include <vespa/searchcore/proton/common/pendinglidtracker.h> +#include <vespa/searchcore/proton/common/ipendinglidtracker.h> #include <vespa/vespalib/util/executor.h> namespace proton { @@ -25,9 +25,9 @@ struct IDocumentMetaStore; */ class ForceCommitDoneTask : public vespalib::Executor::Task { - std::vector<uint32_t> _lidsToReuse; - bool _holdUnblockShrinkLidSpace; - IDocumentMetaStore &_documentMetaStore; + std::vector<uint32_t> _lidsToReuse; + bool _holdUnblockShrinkLidSpace; + IDocumentMetaStore &_documentMetaStore; public: ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore); diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h index 734c27ff09d..8856d5f194d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h @@ -4,7 +4,7 @@ #include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchlib/common/serialnum.h> -#include <vespa/searchcore/proton/common/pendinglidtracker.h> +#include <vespa/searchcore/proton/common/ipendinglidtracker.h> namespace document { class DocumentTypeRepo; } diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h index 024876c6c3d..c5e8c558c9e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h @@ -3,7 +3,7 @@ #pragma once #include "operationdonecontext.h" -#include <vespa/searchcore/proton/common/pendinglidtracker.h> +#include <vespa/searchcore/proton/common/ipendinglidtracker.h> #include <vespa/document/base/globalid.h> #include <vespa/searchlib/common/serialnum.h> diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index 6880696fe88..7b6c6be1fe1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -4,7 +4,7 @@ #include "operationdonecontext.h" #include <vespa/searchcore/proton/reference/pending_notify_remove_done.h> -#include <vespa/searchcore/proton/common/pendinglidtracker.h> +#include <vespa/searchcore/proton/common/ipendinglidtracker.h> #include <vespa/vespalib/util/executor.h> #include <vespa/document/base/globalid.h> #include <vespa/searchlib/common/serialnum.h> diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h index ba49317922b..6dad929aa26 100644 --- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h @@ -3,7 +3,7 @@ #pragma once #include "operationdonecontext.h" -#include <vespa/searchcore/proton/common/pendinglidtracker.h> +#include <vespa/searchcore/proton/common/ipendinglidtracker.h> #include <vespa/document/update/documentupdate.h> #include <future> |