aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-08-13 14:09:40 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-08-13 14:14:49 +0000
commit523f065db9be6ef5defbfcd7d228f46596fc8736 (patch)
tree34fc433e761884bebc9c18d75fcebc06e1be7ef5 /searchlib
parent1292a3ad898b50de44ae8d2967e9d6de8ab536a3 (diff)
Avoid starting a separate thread for completing index insert.
Use a queue and do completion in the foreground. That ensures that only a single thread modifies the attribute.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp107
1 files changed, 73 insertions, 34 deletions
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
index 77e4f665327..62a1072de48 100644
--- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
@@ -213,57 +213,96 @@ public:
};
}
-
+/**
+ * Will load and index documents in parallel. Note that indexing order is not guaranteed,
+ * but that is in line with the guarantees Vespa already has.
+ */
class DenseTensorAttribute::ThreadedLoader : public Loader {
public:
ThreadedLoader(DenseTensorAttribute & attr, vespalib::Executor & shared_executor)
: _attr(attr),
_shared_executor(shared_executor),
- _complete_executor(1, 0x10000),
+ _queue(MAX_PENDING),
_pending(0)
{}
- void load(uint32_t lid, vespalib::datastore::EntryRef ref) override {
+ void load(uint32_t lid, vespalib::datastore::EntryRef ref) override;
+ void wait_complete() override {
+ drainUntilPending(0);
+ }
+private:
+ using Entry = std::pair<uint32_t, std::unique_ptr<PrepareResult>>;
+ using Queue = vespalib::ArrayQueue<Entry>;
+
+ bool pop(Entry & entry) {
+ std::unique_lock guard(_mutex);
+ if (_queue.empty()) return false;
+ entry = std::move(_queue.front());
+ _queue.pop();
+ return true;
+ }
+ void drainQ() {
+ Queue queue(MAX_PENDING);
{
std::unique_lock guard(_mutex);
- while (_pending > MAX_PENDING) {
- _cond.wait(guard);
- }
+ queue.swap(_queue);
+ }
+ while (!queue.empty()) {
+ auto item = std::move(queue.front());
+ queue.pop();
+ complete(item.first, std::move(item.second));
}
- ++_pending;
- _shared_executor.execute(vespalib::makeLambdaTask([this, ref, lid]() {
- auto prepared = _attr._index->prepare_add_document(lid, _attr._denseTensorStore.get_typed_cells(ref), _attr.getGenerationHandler().takeGuard());
- _complete_executor.execute(vespalib::makeLambdaTask([this, lid, result=std::move(prepared)]() mutable {
- _attr.setCommittedDocIdLimit(std::max(_attr.getCommittedDocIdLimit(), lid + 1));
- _attr._index->complete_add_document(lid, std::move(result));
- {
- std::unique_lock guard(_mutex);
- --_pending;
- if ((_pending == MAX_PENDING/2) || (_pending == 0)) {
- _cond.notify_all();
- }
- }
- if ((lid % 256) == 0) {
- _attr.commit();
- };
- }));
- }));
}
- void wait_complete() override {
- std::unique_lock guard(_mutex);
- while (_pending > 0) {
- _cond.wait(guard);
+
+ void complete(uint32_t lid, std::unique_ptr<PrepareResult> prepared) {
+ _attr.setCommittedDocIdLimit(std::max(_attr.getCommittedDocIdLimit(), lid + 1));
+ _attr._index->complete_add_document(lid, std::move(prepared));
+ --_pending;
+ if ((lid % 256) == 0) {
+ _attr.commit();
+ };
+ }
+ void drainUntilPending(uint32_t maxPending) {
+ while (_pending > maxPending) {
+ {
+ std::unique_lock guard(_mutex);
+ while (_queue.empty()) {
+ _cond.wait(guard);
+ }
+ }
+ drainQ();
}
}
-private:
static constexpr uint32_t MAX_PENDING = 1000;
- DenseTensorAttribute & _attr;
- vespalib::Executor & _shared_executor;
- vespalib::ThreadStackExecutor _complete_executor;
- std::mutex _mutex;
+ DenseTensorAttribute & _attr;
+ vespalib::Executor & _shared_executor;
+ std::mutex _mutex;
std::condition_variable _cond;
- uint64_t _pending;
+ Queue _queue;
+ uint64_t _pending; // _pending is only modified in the foreground thread
};
+void
+DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::EntryRef ref) {
+ Entry item;
+ while (pop(item)) {
+ // First process items that are ready to complete
+ complete(item.first, std::move(item.second));
+ }
+ // Then ensure that there are no more than MAX_PENDING in flight
+ drainUntilPending(MAX_PENDING);
+
+ // Then we can issue a new one
+ ++_pending;
+ _shared_executor.execute(vespalib::makeLambdaTask([this, ref, lid]() {
+ auto prepared = _attr._index->prepare_add_document(lid, _attr._denseTensorStore.get_typed_cells(ref),
+ _attr.getGenerationHandler().takeGuard());
+ std::unique_lock guard(_mutex);
+ _queue.push(std::make_pair(lid, std::move(prepared)));
+ if (_queue.size() == 1) {
+ _cond.notify_all();
+ }
+ }));
+}
class DenseTensorAttribute::ForegroundLoader : public Loader {
public:
ForegroundLoader(DenseTensorAttribute & attr) : _attr(attr) {}