summaryrefslogtreecommitdiffstats
path: root/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp')
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp105
1 files changed, 30 insertions, 75 deletions
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
index 879942ea5d7..16141bcd268 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
@@ -4,7 +4,11 @@
#include "document_inverter_context.h"
#include "i_field_index_collection.h"
#include "field_inverter.h"
+#include "invert_task.h"
+#include "push_task.h"
+#include "remove_task.h"
#include "url_field_inverter.h"
+#include <vespa/searchlib/common/schedule_sequenced_task_callback.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/retain_guard.h>
@@ -12,6 +16,7 @@ namespace search::memoryindex {
using document::Document;
using index::Schema;
+using search::ScheduleSequencedTaskCallback;
using search::index::FieldLengthCalculator;
using vespalib::ISequencedTaskExecutor;
using vespalib::RetainGuard;
@@ -56,26 +61,13 @@ DocumentInverter::~DocumentInverter()
void
DocumentInverter::invertDocument(uint32_t docId, const Document &doc)
{
- // Might want to batch inverters as we do for attributes
_context.set_data_type(doc);
- auto& schema_index_fields = _context.get_schema_index_fields();
auto& invert_threads = _context.get_invert_threads();
- for (uint32_t fieldId : schema_index_fields._textFields) {
- auto fv = _context.get_field_value(doc, fieldId);
- FieldInverter *inverter = _inverters[fieldId].get();
- invert_threads.execute(fieldId,[inverter, docId, fv(std::move(fv))]() {
- inverter->invertField(docId, fv);
- });
- }
- uint32_t urlId = 0;
- for (const auto & fi : schema_index_fields._uriFields) {
- uint32_t fieldId = fi._all;
- auto fv = _context.get_field_value(doc, fieldId);
- UrlFieldInverter *inverter = _urlInverters[urlId].get();
- invert_threads.execute(fieldId,[inverter, docId, fv(std::move(fv))]() {
- inverter->invertField(docId, fv);
- });
- ++urlId;
+ auto& invert_contexts = _context.get_invert_contexts();
+ for (auto& invert_context : invert_contexts) {
+ auto id = invert_context.get_id();
+ auto task = std::make_unique<InvertTask>(_context, invert_context, _inverters, _urlInverters, docId, doc);
+ invert_threads.executeTask(id, std::move(task));
}
}
@@ -88,73 +80,36 @@ DocumentInverter::removeDocument(uint32_t docId) {
void
DocumentInverter::removeDocuments(LidVector lids)
{
- // Might want to batch inverters as we do for attributes
- auto& schema_index_fields = _context.get_schema_index_fields();
auto& invert_threads = _context.get_invert_threads();
- for (uint32_t fieldId : schema_index_fields._textFields) {
- FieldInverter *inverter = _inverters[fieldId].get();
- invert_threads.execute(fieldId, [inverter, lids]() {
- for (uint32_t lid : lids) {
- inverter->removeDocument(lid);
- }
- });
- }
- uint32_t urlId = 0;
- for (const auto & fi : schema_index_fields._uriFields) {
- uint32_t fieldId = fi._all;
- UrlFieldInverter *inverter = _urlInverters[urlId].get();
- invert_threads.execute(fieldId, [inverter, lids]() {
- for (uint32_t lid : lids) {
- inverter->removeDocument(lid);
- }
- });
- ++urlId;
+ auto& invert_contexts = _context.get_invert_contexts();
+ for (auto& invert_context : invert_contexts) {
+ auto id = invert_context.get_id();
+ auto task = std::make_unique<RemoveTask>(invert_context, _inverters, _urlInverters, lids);
+ invert_threads.executeTask(id, std::move(task));
}
}
-namespace {
-
-template <typename Inverter>
-void push_documents_helper(ISequencedTaskExecutor& invert_threads,
- ISequencedTaskExecutor& push_threads,
- Inverter &inverter,
- uint32_t field_id,
- std::shared_ptr<vespalib::IDestructorCallback> on_write_done,
- std::shared_ptr<RetainGuard> retain)
-{
- auto invert_id = invert_threads.getExecutorId(field_id);
- auto push_id = push_threads.getExecutorId(field_id);
- invert_threads.execute(invert_id,
- [&push_threads, push_id, &inverter, retain(std::move(retain)), on_write_done(std::move(on_write_done))] () mutable
- {
- push_threads.execute(push_id,
- [&inverter, retain(std::move(retain)), on_write_done(std::move(on_write_done))]()
- {
- inverter.applyRemoves();
- inverter.pushDocuments();
- });
- });
-}
-
-}
-
void
DocumentInverter::pushDocuments(const std::shared_ptr<vespalib::IDestructorCallback> &onWriteDone)
{
auto retain = std::make_shared<RetainGuard>(_ref_count);
- auto& schema_index_fields = _context.get_schema_index_fields();
- auto& invert_threads = _context.get_invert_threads();
+ using PushTasks = std::vector<std::shared_ptr<ScheduleSequencedTaskCallback>>;
+ PushTasks all_push_tasks;
auto& push_threads = _context.get_push_threads();
- for (uint32_t field_id : schema_index_fields._textFields) {
- auto& inverter = *_inverters[field_id];
- push_documents_helper(invert_threads, push_threads, inverter, field_id, onWriteDone, retain);
+ auto& push_contexts = _context.get_push_contexts();
+ for (auto& push_context : push_contexts) {
+ auto task = std::make_unique<PushTask>(push_context, _inverters, _urlInverters, onWriteDone, retain);
+ all_push_tasks.emplace_back(std::make_shared<ScheduleSequencedTaskCallback>(push_threads, push_context.get_id(), std::move(task)));
}
- uint32_t uri_field_id = 0;
- for (const auto& uri_field : schema_index_fields._uriFields) {
- uint32_t field_id = uri_field._all;
- auto& inverter = *_urlInverters[uri_field_id];
- push_documents_helper(invert_threads, push_threads, inverter, field_id, onWriteDone, retain);
- ++uri_field_id;
+ auto& invert_threads = _context.get_invert_threads();
+ auto& invert_contexts = _context.get_invert_contexts();
+ for (auto& invert_context : invert_contexts) {
+ PushTasks push_tasks;
+ for (auto& pusher : invert_context.get_pushers()) {
+ assert(pusher < all_push_tasks.size());
+ push_tasks.emplace_back(all_push_tasks[pusher]);
+ }
+ invert_threads.execute(invert_context.get_id(), [push_tasks(std::move(push_tasks))]() { });
}
}