summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-01 15:32:16 +0200
committerGitHub <noreply@github.com>2020-09-01 15:32:16 +0200
commit402ce3fe9c7dbdbf63d8bfe8121683440adc6bc5 (patch)
treebe75321bee8c000899ca473b35faaa95c0699621
parent3b8f7fdff4872bd010286753f6072ec492f14a48 (diff)
parent06310164a5a8a86b0be4934dd2002e9d657e1127 (diff)
Merge pull request #14226 from vespa-engine/balder/counters-2-sequencenumber
Balder/counters 2 sequencenumber
-rw-r--r--searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.cpp35
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.h64
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp86
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h71
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ifeedview.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h2
12 files changed, 143 insertions, 135 deletions
diff --git a/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp b/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp
index 1579b8ec91a..ec314c50f4a 100644
--- a/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp
+++ b/searchcore/src/tests/proton/document_iterator/document_iterator_test.cpp
@@ -9,6 +9,7 @@
#include <vespa/persistence/spi/result.h>
#include <vespa/persistence/spi/test.h>
#include <vespa/searchcore/proton/common/attribute_updater.h>
+#include <vespa/searchcore/proton/common/pendinglidtracker.h>
#include <vespa/searchcore/proton/persistenceengine/document_iterator.h>
#include <vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h>
#include <vespa/searchlib/attribute/attributecontext.h>
diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
index be50b8d67a5..d59a4075dce 100644
--- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
@@ -13,6 +13,7 @@ vespa_add_library(searchcore_pcommon STATIC
feedtoken.cpp
hw_info_sampler.cpp
indexschema_inspector.cpp
+ ipendinglidtracker.cpp
monitored_refcount.cpp
operation_rate_tracker.cpp
pendinglidtracker.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.cpp
new file mode 100644
index 00000000000..2e182fa397f
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.cpp
@@ -0,0 +1,35 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "ipendinglidtracker.h"
+
+namespace proton {
+
+IPendingLidTracker::Token::Token()
+ : _tracker(nullptr),
+ _lid(0u)
+{}
+IPendingLidTracker::Token::Token(uint32_t lid, IPendingLidTracker &tracker)
+ : _tracker(&tracker),
+ _lid(lid)
+{}
+
+IPendingLidTracker::Token::~Token() {
+ if (_tracker != nullptr) {
+ _tracker->consume(_lid);
+ }
+}
+
+void
+ILidCommitState::waitComplete(uint32_t lid) const {
+ waitState(State::COMPLETED, lid);
+}
+void
+ILidCommitState::waitComplete(const LidList & lids) const {
+ waitState(State::COMPLETED, lids);
+}
+void
+ILidCommitState::waitComplete() const {
+ waitState(State::COMPLETED);
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.h b/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.h
new file mode 100644
index 00000000000..9ca94441704
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/ipendinglidtracker.h
@@ -0,0 +1,64 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+namespace proton {
+
+/** Interface for tracking lids in the feed pipeline.
+ * A token is created with produce(lid).
+ * Once the token goes out of scope the lid is then consumed.
+ * This is used to track which lids are inflight in the feed pipeline.
+ */
+class IPendingLidTracker {
+public:
+ class Token {
+ public:
+ Token();
+ Token(uint32_t lid, IPendingLidTracker & tracker);
+ Token(const Token &) = delete;
+ Token & operator = (const Token &) = delete;
+ Token & operator = (Token &&) = delete;
+ Token(Token && rhs) noexcept
+ : _tracker(rhs._tracker),
+ _lid(rhs._lid)
+ {
+ rhs._tracker = nullptr;
+ }
+ ~Token();
+ private:
+ IPendingLidTracker * _tracker;
+ uint32_t _lid;
+ };
+ virtual ~IPendingLidTracker() = default;
+ virtual Token produce(uint32_t lid) = 0;
+private:
+ virtual void consume(uint32_t lid) = 0;
+};
+
+/**
+ * This is an interface for checking/waiting the state of a lid in the feed pipeline
+ * The lid might need a commit (NEED_COMMIT), but if visibility-delay is zero it will go directly to WAITING
+ * as no explicit commit is needed.
+ * After a commit has been started the lid is transferred to WAITING.
+ * Once the commit has gone through the lid is in state COMPLETED.
+ */
+class ILidCommitState {
+public:
+ enum class State {NEED_COMMIT, WAITING, COMPLETED};
+ using LidList = std::vector<uint32_t>;
+ virtual ~ILidCommitState() = default;
+ State getState() const { return waitState(State::NEED_COMMIT); }
+ State getState(uint32_t lid) const { return waitState(State::NEED_COMMIT, lid); }
+ State getState(const LidList & lids) const { return waitState(State::NEED_COMMIT, lids); }
+ void waitComplete(uint32_t lid) const;
+ void waitComplete(const LidList & lids) const;
+ void waitComplete() const;
+private:
+ virtual State waitState(State state, uint32_t lid) const = 0;
+ virtual State waitState(State state, const LidList & lids) const = 0;
+ virtual State waitState(State state) const = 0;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp
index e0a45295dc8..dd6ca70248b 100644
--- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.cpp
@@ -7,34 +7,6 @@
namespace proton {
-IPendingLidTracker::Token::Token()
- : _tracker(nullptr),
- _lid(0u)
-{}
-IPendingLidTracker::Token::Token(uint32_t lid, IPendingLidTracker &tracker)
- : _tracker(&tracker),
- _lid(lid)
-{}
-
-IPendingLidTracker::Token::~Token() {
- if (_tracker != nullptr) {
- _tracker->consume(_lid);
- }
-}
-
-void
-ILidCommitState::waitComplete(uint32_t lid) const {
- waitState(State::COMPLETED, lid);
-}
-void
-ILidCommitState::waitComplete(const LidList & lids) const {
- waitState(State::COMPLETED, lids);
-}
-void
-ILidCommitState::waitComplete() const {
- waitState(State::COMPLETED);
-}
-
PendingLidTrackerBase::PendingLidTrackerBase() = default;
PendingLidTrackerBase::~PendingLidTrackerBase() = default;
@@ -117,7 +89,12 @@ PendingLidTracker::pendingLids() const {
return lids;
}
-TwoPhasePendingLidTracker::TwoPhasePendingLidTracker() = default;
+TwoPhasePendingLidTracker::TwoPhasePendingLidTracker()
+ : _sequenceId(0),
+ _lastCommitStarted(0),
+ _lastCommitCompleted(0),
+ _pending()
+{}
TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() {
assert(_pending.empty());
@@ -126,24 +103,19 @@ TwoPhasePendingLidTracker::~TwoPhasePendingLidTracker() {
IPendingLidTracker::Token
TwoPhasePendingLidTracker::produce(uint32_t lid) {
std::lock_guard guard(_mutex);
- _pending[lid].inflight_feed++;
+ _pending[lid] = ++_sequenceId;
return Token(lid, *this);
}
void
TwoPhasePendingLidTracker::consume(uint32_t lid) {
- std::lock_guard guard(_mutex);
- auto found = _pending.find(lid);
- assert (found != _pending.end());
- assert (found->second.inflight_feed > 0);
- found->second.inflight_feed--;
- found->second.need_commit = true;
+ (void) lid;
}
ILidCommitState::State
TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t lid) const {
for (auto found = _pending.find(lid); found != _pending.end(); found = _pending.find(lid)) {
if (state == State::NEED_COMMIT) {
- if ((found->second.inflight_feed > 0) || found->second.need_commit) {
+ if (found->second > _lastCommitStarted) {
return State::NEED_COMMIT;
}
return State::WAITING;
@@ -154,16 +126,17 @@ TwoPhasePendingLidTracker::waitFor(MonitorGuard & guard, State state, uint32_t l
}
void
-TwoPhasePendingLidTracker::consumeSnapshot(LidList committed) {
+TwoPhasePendingLidTracker::consumeSnapshot(uint64_t sequenceIdWhenStarted) {
MonitorGuard guard(_mutex);
- for (const auto & lid : committed) {
- auto found = _pending.find(lid);
- assert(found != _pending.end());
- assert(found->second.inflight_commit >= 1);
- found->second.inflight_commit --;
- if (found->second.empty()) {
- _pending.erase(found);
- }
+ assert(sequenceIdWhenStarted >= _lastCommitCompleted);
+ _lastCommitCompleted = sequenceIdWhenStarted;
+ std::vector<uint32_t> committed;
+ for (const auto & entry : _pending) {
+ if (entry.second <= sequenceIdWhenStarted)
+ committed.push_back(entry.first);
+ }
+ for (uint32_t lid : committed) {
+ _pending.erase(lid);
}
_cond.notify_all();
}
@@ -184,43 +157,36 @@ namespace common::internal {
class CommitList : public PendingLidTrackerBase::Payload {
public:
using LidList = ILidCommitState::LidList;
- CommitList(LidList lids, TwoPhasePendingLidTracker & tracker)
+ CommitList(uint64_t commitStarted, TwoPhasePendingLidTracker & tracker)
: _tracker(&tracker),
- _lids(std::move(lids))
+ _commitStarted(commitStarted)
{ }
CommitList(const CommitList &) = delete;
CommitList & operator = (const CommitList &) = delete;
CommitList & operator = (CommitList &&) = delete;
CommitList(CommitList && rhs) noexcept
: _tracker(rhs._tracker),
- _lids(std::move(rhs._lids))
+ _commitStarted(rhs._commitStarted)
{
rhs._tracker = nullptr;
}
~CommitList() override {
if (_tracker != nullptr) {
- _tracker->consumeSnapshot(std::move(_lids));
+ _tracker->consumeSnapshot(_commitStarted);
}
}
private:
TwoPhasePendingLidTracker * _tracker;
- LidList _lids;
+ uint64_t _commitStarted;
};
}
PendingLidTrackerBase::Snapshot
TwoPhasePendingLidTracker::produceSnapshot() {
- LidList toCommit;
MonitorGuard guard(_mutex);
- for (auto & entry : _pending) {
- if (entry.second.need_commit) {
- toCommit.emplace_back(entry.first);
- entry.second.inflight_commit ++;
- entry.second.need_commit = false;
- }
- }
- return std::make_unique<common::internal::CommitList>(std::move(toCommit), *this);
+ _lastCommitStarted = _sequenceId;
+ return std::make_unique<common::internal::CommitList>(_lastCommitStarted, *this);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h
index c4ff0d7d639..ef0a1dbb1a3 100644
--- a/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h
+++ b/searchcore/src/vespa/searchcore/proton/common/pendinglidtracker.h
@@ -2,68 +2,13 @@
#pragma once
+#include "ipendinglidtracker.h"
#include <vespa/vespalib/stllike/hash_map.h>
#include <mutex>
#include <condition_variable>
-#include <vector>
namespace proton {
-/** Interface for tracking lids in the feed pipeline.
- * A token is created with produce(lid).
- * Once the token goes out of scope the lid is then consumed.
- * This is used to track which lids are inflight in the feed pipeline.
- */
-class IPendingLidTracker {
-public:
- class Token {
- public:
- Token();
- Token(uint32_t lid, IPendingLidTracker & tracker);
- Token(const Token &) = delete;
- Token & operator = (const Token &) = delete;
- Token & operator = (Token &&) = delete;
- Token(Token && rhs) noexcept
- : _tracker(rhs._tracker),
- _lid(rhs._lid)
- {
- rhs._tracker = nullptr;
- }
- ~Token();
- private:
- IPendingLidTracker * _tracker;
- uint32_t _lid;
- };
- virtual ~IPendingLidTracker() = default;
- virtual Token produce(uint32_t lid) = 0;
-private:
- virtual void consume(uint32_t lid) = 0;
-};
-
-/**
- * This is an interface for checking/waiting the state of a lid in the feed pipeline
- * The lid might need a commit (NEED_COMMIT), but if visibility-delay is zero it will go directly to WAITING
- * as no explicit commit is needed.
- * After a commit has been started the lid is transferred to WAITING.
- * Once the commit has gone through the lid is in state COMPLETED.
- */
-class ILidCommitState {
-public:
- enum class State {NEED_COMMIT, WAITING, COMPLETED};
- using LidList = std::vector<uint32_t>;
- virtual ~ILidCommitState() = default;
- State getState() const { return waitState(State::NEED_COMMIT); }
- State getState(uint32_t lid) const { return waitState(State::NEED_COMMIT, lid); }
- State getState(const LidList & lids) const { return waitState(State::NEED_COMMIT, lids); }
- void waitComplete(uint32_t lid) const;
- void waitComplete(const LidList & lids) const;
- void waitComplete() const;
-private:
- virtual State waitState(State state, uint32_t lid) const = 0;
- virtual State waitState(State state, const LidList & lids) const = 0;
- virtual State waitState(State state) const = 0;
-};
-
/**
* Base class for doing 2 phased lid tracking. The first phase is from when the feed operation
* is in progress and lasts until the OperationDoneContext goes out of scope. This might include commit
@@ -132,17 +77,13 @@ public:
private:
friend common::internal::CommitList;
void consume(uint32_t lid) override;
- void consumeSnapshot(LidList lids);
+ void consumeSnapshot(uint64_t sequenceIdWhenStarted);
LidList pendingLids() const override;
State waitFor(MonitorGuard & guard, State state, uint32_t lid) const override;
- struct Counters {
- Counters() : inflight_feed(0), inflight_commit(0), need_commit(false) {}
- bool empty() const { return (inflight_feed == 0) && ! need_commit && (inflight_commit == 0); }
- uint32_t inflight_feed;
- uint32_t inflight_commit;
- bool need_commit;
- };
- vespalib::hash_map<uint32_t, Counters> _pending;
+ uint64_t _sequenceId;
+ uint64_t _lastCommitStarted;
+ uint64_t _lastCommitCompleted;
+ vespalib::hash_map<uint32_t, uint64_t> _pending;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h
index 2b8264f7eef..8e1ac08fa20 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/commit_and_wait_document_retriever.h
@@ -3,7 +3,7 @@
#pragma once
#include "i_document_retriever.h"
-#include <vespa/searchcore/proton/common/pendinglidtracker.h>
+#include <vespa/searchcore/proton/common/ipendinglidtracker.h>
#include <vespa/searchcore/proton/common/icommitable.h>
namespace proton {
@@ -17,7 +17,7 @@ class CommitAndWaitDocumentRetriever : public IDocumentRetriever
{
IDocumentRetriever::SP _retriever;
ICommitable &_commit;
- ILidCommitState &_uncommittedLidsTracker;
+ ILidCommitState &_uncommittedLidsTracker;
using Bucket = storage::spi::Bucket;
public:
CommitAndWaitDocumentRetriever(IDocumentRetriever::SP retriever, ICommitable &commit, ILidCommitState & unCommittedLidTracker);
diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h
index 0a6774ad4e0..cffe4199e84 100644
--- a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h
+++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h
@@ -2,7 +2,7 @@
#pragma once
-#include <vespa/searchcore/proton/common/pendinglidtracker.h>
+#include <vespa/searchcore/proton/common/ipendinglidtracker.h>
#include <vespa/vespalib/util/executor.h>
namespace proton {
@@ -25,9 +25,9 @@ struct IDocumentMetaStore;
*/
class ForceCommitDoneTask : public vespalib::Executor::Task
{
- std::vector<uint32_t> _lidsToReuse;
- bool _holdUnblockShrinkLidSpace;
- IDocumentMetaStore &_documentMetaStore;
+ std::vector<uint32_t> _lidsToReuse;
+ bool _holdUnblockShrinkLidSpace;
+ IDocumentMetaStore &_documentMetaStore;
public:
ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore);
diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
index 734c27ff09d..8856d5f194d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
@@ -4,7 +4,7 @@
#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchlib/common/serialnum.h>
-#include <vespa/searchcore/proton/common/pendinglidtracker.h>
+#include <vespa/searchcore/proton/common/ipendinglidtracker.h>
namespace document { class DocumentTypeRepo; }
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
index 024876c6c3d..c5e8c558c9e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
@@ -3,7 +3,7 @@
#pragma once
#include "operationdonecontext.h"
-#include <vespa/searchcore/proton/common/pendinglidtracker.h>
+#include <vespa/searchcore/proton/common/ipendinglidtracker.h>
#include <vespa/document/base/globalid.h>
#include <vespa/searchlib/common/serialnum.h>
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
index 6880696fe88..7b6c6be1fe1 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
@@ -4,7 +4,7 @@
#include "operationdonecontext.h"
#include <vespa/searchcore/proton/reference/pending_notify_remove_done.h>
-#include <vespa/searchcore/proton/common/pendinglidtracker.h>
+#include <vespa/searchcore/proton/common/ipendinglidtracker.h>
#include <vespa/vespalib/util/executor.h>
#include <vespa/document/base/globalid.h>
#include <vespa/searchlib/common/serialnum.h>
diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
index ba49317922b..6dad929aa26 100644
--- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
@@ -3,7 +3,7 @@
#pragma once
#include "operationdonecontext.h"
-#include <vespa/searchcore/proton/common/pendinglidtracker.h>
+#include <vespa/searchcore/proton/common/ipendinglidtracker.h>
#include <vespa/document/update/documentupdate.h>
#include <future>