diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-08-13 14:09:40 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-08-13 14:14:49 +0000 |
commit | 523f065db9be6ef5defbfcd7d228f46596fc8736 (patch) | |
tree | 34fc433e761884bebc9c18d75fcebc06e1be7ef5 /searchlib | |
parent | 1292a3ad898b50de44ae8d2967e9d6de8ab536a3 (diff) |
Avoid starting a separate thread for completing index insert.
Use a queue and do completion in the foreground. That ensures only a single thread
modifying the attribute.
Diffstat (limited to 'searchlib')
-rw-r--r-- | searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp | 107 |
1 files changed, 73 insertions, 34 deletions
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp index 77e4f665327..62a1072de48 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp @@ -213,57 +213,96 @@ public: }; } - +/** + * Will load and index documents in parallel. Note that indexing order is not guaranteed, + * but that is inline with the guarantees vespa already has. + */ class DenseTensorAttribute::ThreadedLoader : public Loader { public: ThreadedLoader(DenseTensorAttribute & attr, vespalib::Executor & shared_executor) : _attr(attr), _shared_executor(shared_executor), - _complete_executor(1, 0x10000), + _queue(MAX_PENDING), _pending(0) {} - void load(uint32_t lid, vespalib::datastore::EntryRef ref) override { + void load(uint32_t lid, vespalib::datastore::EntryRef ref) override; + void wait_complete() override { + drainUntilPending(0); + } +private: + using Entry = std::pair<uint32_t, std::unique_ptr<PrepareResult>>; + using Queue = vespalib::ArrayQueue<Entry>; + + bool pop(Entry & entry) { + std::unique_lock guard(_mutex); + if (_queue.empty()) return false; + entry = std::move(_queue.front()); + _queue.pop(); + return true; + } + void drainQ() { + Queue queue(MAX_PENDING); { std::unique_lock guard(_mutex); - while (_pending > MAX_PENDING) { - _cond.wait(guard); - } + queue.swap(_queue); + } + while (!queue.empty()) { + auto item = std::move(queue.front()); + queue.pop(); + complete(item.first, std::move(item.second)); } - ++_pending; - _shared_executor.execute(vespalib::makeLambdaTask([this, ref, lid]() { - auto prepared = _attr._index->prepare_add_document(lid, _attr._denseTensorStore.get_typed_cells(ref), _attr.getGenerationHandler().takeGuard()); - _complete_executor.execute(vespalib::makeLambdaTask([this, lid, result=std::move(prepared)]() mutable { - 
_attr.setCommittedDocIdLimit(std::max(_attr.getCommittedDocIdLimit(), lid + 1)); - _attr._index->complete_add_document(lid, std::move(result)); - { - std::unique_lock guard(_mutex); - --_pending; - if ((_pending == MAX_PENDING/2) || (_pending == 0)) { - _cond.notify_all(); - } - } - if ((lid % 256) == 0) { - _attr.commit(); - }; - })); - })); } - void wait_complete() override { - std::unique_lock guard(_mutex); - while (_pending > 0) { - _cond.wait(guard); + + void complete(uint32_t lid, std::unique_ptr<PrepareResult> prepared) { + _attr.setCommittedDocIdLimit(std::max(_attr.getCommittedDocIdLimit(), lid + 1)); + _attr._index->complete_add_document(lid, std::move(prepared)); + --_pending; + if ((lid % 256) == 0) { + _attr.commit(); + }; + } + void drainUntilPending(uint32_t maxPending) { + while (_pending > maxPending) { + { + std::unique_lock guard(_mutex); + while (_queue.empty()) { + _cond.wait(guard); + } + } + drainQ(); } } -private: static constexpr uint32_t MAX_PENDING = 1000; - DenseTensorAttribute & _attr; - vespalib::Executor & _shared_executor; - vespalib::ThreadStackExecutor _complete_executor; - std::mutex _mutex; + DenseTensorAttribute & _attr; + vespalib::Executor & _shared_executor; + std::mutex _mutex; std::condition_variable _cond; - uint64_t _pending; + Queue _queue; + uint64_t _pending; // _pending is only modified in foreground thread }; +void +DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::EntryRef ref) { + Entry item; + while (pop(item)) { + // First process items that are ready to complete + complete(item.first, std::move(item.second)); + } + // Then ensure that there is no more than MAX_PENDING inflight + drainUntilPending(MAX_PENDING); + + // Then we can issue a new one + ++_pending; + _shared_executor.execute(vespalib::makeLambdaTask([this, ref, lid]() { + auto prepared = _attr._index->prepare_add_document(lid, _attr._denseTensorStore.get_typed_cells(ref), + _attr.getGenerationHandler().takeGuard()); + 
std::unique_lock guard(_mutex); + _queue.push(std::make_pair(lid, std::move(prepared))); + if (_queue.size() == 1) { + _cond.notify_all(); + } + })); +} class DenseTensorAttribute::ForegroundLoader : public Loader { public: ForegroundLoader(DenseTensorAttribute & attr) : _attr(attr) {} |