diff options
Diffstat (limited to 'searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp')
-rw-r--r-- | searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp | 105 |
1 files changed, 30 insertions, 75 deletions
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp index 879942ea5d7..16141bcd268 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp +++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp @@ -4,7 +4,11 @@ #include "document_inverter_context.h" #include "i_field_index_collection.h" #include "field_inverter.h" +#include "invert_task.h" +#include "push_task.h" +#include "remove_task.h" #include "url_field_inverter.h" +#include <vespa/searchlib/common/schedule_sequenced_task_callback.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/retain_guard.h> @@ -12,6 +16,7 @@ namespace search::memoryindex { using document::Document; using index::Schema; +using search::ScheduleSequencedTaskCallback; using search::index::FieldLengthCalculator; using vespalib::ISequencedTaskExecutor; using vespalib::RetainGuard; @@ -56,26 +61,13 @@ DocumentInverter::~DocumentInverter() void DocumentInverter::invertDocument(uint32_t docId, const Document &doc) { - // Might want to batch inverters as we do for attributes _context.set_data_type(doc); - auto& schema_index_fields = _context.get_schema_index_fields(); auto& invert_threads = _context.get_invert_threads(); - for (uint32_t fieldId : schema_index_fields._textFields) { - auto fv = _context.get_field_value(doc, fieldId); - FieldInverter *inverter = _inverters[fieldId].get(); - invert_threads.execute(fieldId,[inverter, docId, fv(std::move(fv))]() { - inverter->invertField(docId, fv); - }); - } - uint32_t urlId = 0; - for (const auto & fi : schema_index_fields._uriFields) { - uint32_t fieldId = fi._all; - auto fv = _context.get_field_value(doc, fieldId); - UrlFieldInverter *inverter = _urlInverters[urlId].get(); - invert_threads.execute(fieldId,[inverter, docId, fv(std::move(fv))]() { - inverter->invertField(docId, fv); - }); - ++urlId; + auto& invert_contexts = _context.get_invert_contexts(); + for (auto& invert_context : invert_contexts) { + auto id = invert_context.get_id(); + auto task = std::make_unique<InvertTask>(_context, invert_context, _inverters, _urlInverters, docId, doc); + invert_threads.executeTask(id, std::move(task)); } } @@ -88,73 +80,36 @@ DocumentInverter::removeDocument(uint32_t docId) { void DocumentInverter::removeDocuments(LidVector lids) { - // Might want to batch inverters as we do for attributes - auto& schema_index_fields = _context.get_schema_index_fields(); auto& invert_threads = _context.get_invert_threads(); - for (uint32_t fieldId : schema_index_fields._textFields) { - FieldInverter *inverter = _inverters[fieldId].get(); - invert_threads.execute(fieldId, [inverter, lids]() { - for (uint32_t lid : lids) { - inverter->removeDocument(lid); - } - }); - } - uint32_t urlId = 0; - for (const auto & fi : schema_index_fields._uriFields) { - uint32_t fieldId = fi._all; - UrlFieldInverter *inverter = _urlInverters[urlId].get(); - invert_threads.execute(fieldId, [inverter, lids]() { - for (uint32_t lid : lids) { - inverter->removeDocument(lid); - } - }); - ++urlId; + auto& invert_contexts = _context.get_invert_contexts(); + for (auto& invert_context : invert_contexts) { + auto id = invert_context.get_id(); + auto task = std::make_unique<RemoveTask>(invert_context, _inverters, _urlInverters, lids); + invert_threads.executeTask(id, std::move(task)); } } -namespace { - -template <typename Inverter> -void push_documents_helper(ISequencedTaskExecutor& invert_threads, - ISequencedTaskExecutor& push_threads, - Inverter &inverter, - uint32_t field_id, - std::shared_ptr<vespalib::IDestructorCallback> on_write_done, - std::shared_ptr<RetainGuard> retain) -{ - auto invert_id = invert_threads.getExecutorId(field_id); - auto push_id = push_threads.getExecutorId(field_id); - invert_threads.execute(invert_id, - [&push_threads, push_id, &inverter, retain(std::move(retain)), on_write_done(std::move(on_write_done))] () mutable - { - push_threads.execute(push_id, - [&inverter, retain(std::move(retain)), on_write_done(std::move(on_write_done))]() - { - inverter.applyRemoves(); - inverter.pushDocuments(); - }); - }); -} - -} - void DocumentInverter::pushDocuments(const std::shared_ptr<vespalib::IDestructorCallback> &onWriteDone) { auto retain = std::make_shared<RetainGuard>(_ref_count); - auto& schema_index_fields = _context.get_schema_index_fields(); - auto& invert_threads = _context.get_invert_threads(); + using PushTasks = std::vector<std::shared_ptr<ScheduleSequencedTaskCallback>>; + PushTasks all_push_tasks; auto& push_threads = _context.get_push_threads(); - for (uint32_t field_id : schema_index_fields._textFields) { - auto& inverter = *_inverters[field_id]; - push_documents_helper(invert_threads, push_threads, inverter, field_id, onWriteDone, retain); + auto& push_contexts = _context.get_push_contexts(); + for (auto& push_context : push_contexts) { + auto task = std::make_unique<PushTask>(push_context, _inverters, _urlInverters, onWriteDone, retain); + all_push_tasks.emplace_back(std::make_shared<ScheduleSequencedTaskCallback>(push_threads, push_context.get_id(), std::move(task))); } - uint32_t uri_field_id = 0; - for (const auto& uri_field : schema_index_fields._uriFields) { - uint32_t field_id = uri_field._all; - auto& inverter = *_urlInverters[uri_field_id]; - push_documents_helper(invert_threads, push_threads, inverter, field_id, onWriteDone, retain); - ++uri_field_id; + auto& invert_threads = _context.get_invert_threads(); + auto& invert_contexts = _context.get_invert_contexts(); + for (auto& invert_context : invert_contexts) { + PushTasks push_tasks; + for (auto& pusher : invert_context.get_pushers()) { + assert(pusher < all_push_tasks.size()); + push_tasks.emplace_back(all_push_tasks[pusher]); + } + invert_threads.execute(invert_context.get_id(), [push_tasks(std::move(push_tasks))]() { }); } } |