aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Geir Storli <geirst@yahooinc.com> 2021-11-08 11:57:36 +0100
committer GitHub <noreply@github.com> 2021-11-08 11:57:36 +0100
commit 757e30efaec1783c0feea9b6735cc3aa855d11d9 (patch)
tree 770a0ddc72da69a577efa4f4eebad8f53488aef9
parent 25894d3050b763a8184ed9387c56303496495c16 (diff)
parent 4042bb8b492585aa5f88e780a770fce0d09d9284 (diff)
Merge pull request #19891 from vespa-engine/toregge/bundle-fields-using-same-low-level-executor-for-memory-index (v7.497.10)
Bundle fields using same executor for memory index.
-rw-r--r-- searchlib/src/vespa/searchlib/common/CMakeLists.txt 1
-rw-r--r-- searchlib/src/vespa/searchlib/common/schedule_sequenced_task_callback.cpp 22
-rw-r--r-- searchlib/src/vespa/searchlib/common/schedule_sequenced_task_callback.h 28
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt 6
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.cpp 28
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h 30
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp 105
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp 117
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.h 9
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/invert_context.cpp 21
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/invert_context.h 28
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/invert_task.cpp 51
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/invert_task.h 39
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/push_context.cpp 14
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/push_context.h 20
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/push_task.cpp 44
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/push_task.h 36
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/remove_task.cpp 44
-rw-r--r-- searchlib/src/vespa/searchlib/memoryindex/remove_task.h 30
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h 12
20 files changed, 601 insertions, 84 deletions
diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt
index 6495fa8561d..73c8999520b 100644
--- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt
@@ -25,6 +25,7 @@ vespa_add_library(searchlib_common OBJECT
packets.cpp
partialbitvector.cpp
resultset.cpp
+ schedule_sequenced_task_callback.cpp
serialnumfileheadercontext.cpp
sort.cpp
sortdata.cpp
diff --git a/searchlib/src/vespa/searchlib/common/schedule_sequenced_task_callback.cpp b/searchlib/src/vespa/searchlib/common/schedule_sequenced_task_callback.cpp
new file mode 100644
index 00000000000..6001770286e
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/common/schedule_sequenced_task_callback.cpp
@@ -0,0 +1,22 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "schedule_sequenced_task_callback.h"
+
+namespace search {
+
+ScheduleSequencedTaskCallback::ScheduleSequencedTaskCallback(vespalib::ISequencedTaskExecutor& executor,
+ vespalib::ISequencedTaskExecutor::ExecutorId id,
+ std::unique_ptr<vespalib::Executor::Task> task) noexcept
+ : _executor(executor),
+ _id(id),
+ _task(std::move(task))
+{
+}
+
+
+ScheduleSequencedTaskCallback::~ScheduleSequencedTaskCallback()
+{
+ _executor.executeTask(_id, std::move(_task));
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/common/schedule_sequenced_task_callback.h b/searchlib/src/vespa/searchlib/common/schedule_sequenced_task_callback.h
new file mode 100644
index 00000000000..602ef1354e0
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/common/schedule_sequenced_task_callback.h
@@ -0,0 +1,28 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "vespa/vespalib/util/idestructorcallback.h"
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+
+namespace search {
+
+/**
+ * Class that schedules a sequenced task when instance is
+ * destroyed. Typically a shared pointer to an instance is passed
+ * around to multiple worker threads that perform portions of a
+ * larger task before dropping the shared pointer, triggering the
+ * callback when all worker threads have completed.
+ */
+class ScheduleSequencedTaskCallback : public vespalib::IDestructorCallback
+{
+ vespalib::ISequencedTaskExecutor& _executor;
+ vespalib::ISequencedTaskExecutor::ExecutorId _id;
+ std::unique_ptr<vespalib::Executor::Task> _task;
+public:
+ ScheduleSequencedTaskCallback(vespalib::ISequencedTaskExecutor& executor,
+ vespalib::ISequencedTaskExecutor::ExecutorId id,
+ std::unique_ptr<vespalib::Executor::Task> task) noexcept;
+ ~ScheduleSequencedTaskCallback() override;
+};
+
+} // namespace search
diff --git a/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt b/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt
index e845201f5c6..34ac7d8e905 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(searchlib_memoryindex OBJECT
SOURCES
+ bundled_fields_context.cpp
compact_words_store.cpp
document_inverter.cpp
document_inverter_collection.cpp
@@ -11,9 +12,14 @@ vespa_add_library(searchlib_memoryindex OBJECT
field_index_collection.cpp
field_index_remover.cpp
field_inverter.cpp
+ invert_context.cpp
+ invert_task.cpp
memory_index.cpp
ordered_field_index_inserter.cpp
posting_iterator.cpp
+ push_context.cpp
+ push_task.cpp
+ remove_task.cpp
url_field_inverter.cpp
word_store.cpp
DEPENDS
diff --git a/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.cpp b/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.cpp
new file mode 100644
index 00000000000..af7e19ee20d
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.cpp
@@ -0,0 +1,28 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bundled_fields_context.h"
+
+namespace search::memoryindex {
+
+BundledFieldsContext::BundledFieldsContext(vespalib::ISequencedTaskExecutor::ExecutorId id)
+ : _id(id),
+ _fields(),
+ _uri_fields()
+{
+}
+
+BundledFieldsContext::~BundledFieldsContext() = default;
+
+void
+BundledFieldsContext::add_field(uint32_t field_id)
+{
+ _fields.emplace_back(field_id);
+}
+
+void
+BundledFieldsContext::add_uri_field(uint32_t uri_field_id)
+{
+ _uri_fields.emplace_back(uri_field_id);
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h b/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h
new file mode 100644
index 00000000000..68dac168280
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h
@@ -0,0 +1,30 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+
+namespace search::memoryindex {
+
+/*
+ * Base class for PushContext and InvertContext, with mapping to
+ * the fields and uri fields handled by this context. Fields using
+ * the same thread appear in the same context.
+ */
+class BundledFieldsContext
+{
+ vespalib::ISequencedTaskExecutor::ExecutorId _id;
+ std::vector<uint32_t> _fields;
+ std::vector<uint32_t> _uri_fields;
+protected:
+ BundledFieldsContext(vespalib::ISequencedTaskExecutor::ExecutorId id);
+ ~BundledFieldsContext();
+public:
+ void add_field(uint32_t field_id);
+ void add_uri_field(uint32_t uri_field_id);
+ vespalib::ISequencedTaskExecutor::ExecutorId get_id() const noexcept { return _id; }
+ const std::vector<uint32_t>& get_fields() const noexcept { return _fields; }
+ const std::vector<uint32_t>& get_uri_fields() const noexcept { return _uri_fields; }
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
index 879942ea5d7..16141bcd268 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
@@ -4,7 +4,11 @@
#include "document_inverter_context.h"
#include "i_field_index_collection.h"
#include "field_inverter.h"
+#include "invert_task.h"
+#include "push_task.h"
+#include "remove_task.h"
#include "url_field_inverter.h"
+#include <vespa/searchlib/common/schedule_sequenced_task_callback.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/retain_guard.h>
@@ -12,6 +16,7 @@ namespace search::memoryindex {
using document::Document;
using index::Schema;
+using search::ScheduleSequencedTaskCallback;
using search::index::FieldLengthCalculator;
using vespalib::ISequencedTaskExecutor;
using vespalib::RetainGuard;
@@ -56,26 +61,13 @@ DocumentInverter::~DocumentInverter()
void
DocumentInverter::invertDocument(uint32_t docId, const Document &doc)
{
- // Might want to batch inverters as we do for attributes
_context.set_data_type(doc);
- auto& schema_index_fields = _context.get_schema_index_fields();
auto& invert_threads = _context.get_invert_threads();
- for (uint32_t fieldId : schema_index_fields._textFields) {
- auto fv = _context.get_field_value(doc, fieldId);
- FieldInverter *inverter = _inverters[fieldId].get();
- invert_threads.execute(fieldId,[inverter, docId, fv(std::move(fv))]() {
- inverter->invertField(docId, fv);
- });
- }
- uint32_t urlId = 0;
- for (const auto & fi : schema_index_fields._uriFields) {
- uint32_t fieldId = fi._all;
- auto fv = _context.get_field_value(doc, fieldId);
- UrlFieldInverter *inverter = _urlInverters[urlId].get();
- invert_threads.execute(fieldId,[inverter, docId, fv(std::move(fv))]() {
- inverter->invertField(docId, fv);
- });
- ++urlId;
+ auto& invert_contexts = _context.get_invert_contexts();
+ for (auto& invert_context : invert_contexts) {
+ auto id = invert_context.get_id();
+ auto task = std::make_unique<InvertTask>(_context, invert_context, _inverters, _urlInverters, docId, doc);
+ invert_threads.executeTask(id, std::move(task));
}
}
@@ -88,73 +80,36 @@ DocumentInverter::removeDocument(uint32_t docId) {
void
DocumentInverter::removeDocuments(LidVector lids)
{
- // Might want to batch inverters as we do for attributes
- auto& schema_index_fields = _context.get_schema_index_fields();
auto& invert_threads = _context.get_invert_threads();
- for (uint32_t fieldId : schema_index_fields._textFields) {
- FieldInverter *inverter = _inverters[fieldId].get();
- invert_threads.execute(fieldId, [inverter, lids]() {
- for (uint32_t lid : lids) {
- inverter->removeDocument(lid);
- }
- });
- }
- uint32_t urlId = 0;
- for (const auto & fi : schema_index_fields._uriFields) {
- uint32_t fieldId = fi._all;
- UrlFieldInverter *inverter = _urlInverters[urlId].get();
- invert_threads.execute(fieldId, [inverter, lids]() {
- for (uint32_t lid : lids) {
- inverter->removeDocument(lid);
- }
- });
- ++urlId;
+ auto& invert_contexts = _context.get_invert_contexts();
+ for (auto& invert_context : invert_contexts) {
+ auto id = invert_context.get_id();
+ auto task = std::make_unique<RemoveTask>(invert_context, _inverters, _urlInverters, lids);
+ invert_threads.executeTask(id, std::move(task));
}
}
-namespace {
-
-template <typename Inverter>
-void push_documents_helper(ISequencedTaskExecutor& invert_threads,
- ISequencedTaskExecutor& push_threads,
- Inverter &inverter,
- uint32_t field_id,
- std::shared_ptr<vespalib::IDestructorCallback> on_write_done,
- std::shared_ptr<RetainGuard> retain)
-{
- auto invert_id = invert_threads.getExecutorId(field_id);
- auto push_id = push_threads.getExecutorId(field_id);
- invert_threads.execute(invert_id,
- [&push_threads, push_id, &inverter, retain(std::move(retain)), on_write_done(std::move(on_write_done))] () mutable
- {
- push_threads.execute(push_id,
- [&inverter, retain(std::move(retain)), on_write_done(std::move(on_write_done))]()
- {
- inverter.applyRemoves();
- inverter.pushDocuments();
- });
- });
-}
-
-}
-
void
DocumentInverter::pushDocuments(const std::shared_ptr<vespalib::IDestructorCallback> &onWriteDone)
{
auto retain = std::make_shared<RetainGuard>(_ref_count);
- auto& schema_index_fields = _context.get_schema_index_fields();
- auto& invert_threads = _context.get_invert_threads();
+ using PushTasks = std::vector<std::shared_ptr<ScheduleSequencedTaskCallback>>;
+ PushTasks all_push_tasks;
auto& push_threads = _context.get_push_threads();
- for (uint32_t field_id : schema_index_fields._textFields) {
- auto& inverter = *_inverters[field_id];
- push_documents_helper(invert_threads, push_threads, inverter, field_id, onWriteDone, retain);
+ auto& push_contexts = _context.get_push_contexts();
+ for (auto& push_context : push_contexts) {
+ auto task = std::make_unique<PushTask>(push_context, _inverters, _urlInverters, onWriteDone, retain);
+ all_push_tasks.emplace_back(std::make_shared<ScheduleSequencedTaskCallback>(push_threads, push_context.get_id(), std::move(task)));
}
- uint32_t uri_field_id = 0;
- for (const auto& uri_field : schema_index_fields._uriFields) {
- uint32_t field_id = uri_field._all;
- auto& inverter = *_urlInverters[uri_field_id];
- push_documents_helper(invert_threads, push_threads, inverter, field_id, onWriteDone, retain);
- ++uri_field_id;
+ auto& invert_threads = _context.get_invert_threads();
+ auto& invert_contexts = _context.get_invert_contexts();
+ for (auto& invert_context : invert_contexts) {
+ PushTasks push_tasks;
+ for (auto& pusher : invert_context.get_pushers()) {
+ assert(pusher < all_push_tasks.size());
+ push_tasks.emplace_back(all_push_tasks[pusher]);
+ }
+ invert_threads.execute(invert_context.get_id(), [push_tasks(std::move(push_tasks))]() { });
}
}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp
index 8fea82229c8..84aeff92bed 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp
@@ -15,6 +15,110 @@ using document::Document;
using document::DocumentType;
using document::Field;
using vespalib::ISequencedTaskExecutor;
+using index::SchemaIndexFields;
+
+namespace {
+
+template <typename Context>
+void make_contexts(const SchemaIndexFields& schema_index_fields, ISequencedTaskExecutor& executor, std::vector<Context>& contexts)
+{
+ using ExecutorId = ISequencedTaskExecutor::ExecutorId;
+ using IdMapping = std::vector<std::tuple<ExecutorId, bool, uint32_t>>;
+ IdMapping map;
+ for (uint32_t field_id : schema_index_fields._textFields) {
+ // TODO: Add bias when sharing sequenced task executor between document types
+ map.emplace_back(executor.getExecutorId(field_id), false, field_id);
+ }
+ uint32_t uri_field_id = 0;
+ for (auto& uri_field : schema_index_fields._uriFields) {
+ // TODO: Add bias when sharing sequenced task executor between document types
+ map.emplace_back(executor.getExecutorId(uri_field._all), true, uri_field_id);
+ ++uri_field_id;
+ }
+ std::sort(map.begin(), map.end());
+ std::optional<ExecutorId> prev_id;
+ for (auto& entry : map) {
+ if (!prev_id.has_value() || prev_id.value() != std::get<0>(entry)) {
+ contexts.emplace_back(std::get<0>(entry));
+ prev_id = std::get<0>(entry);
+ }
+ if (std::get<1>(entry)) {
+ contexts.back().add_uri_field(std::get<2>(entry));
+ } else {
+ contexts.back().add_field(std::get<2>(entry));
+ }
+ }
+}
+
+class PusherMapping {
+ std::vector<std::optional<uint32_t>> _pushers;
+public:
+ PusherMapping(size_t size);
+ ~PusherMapping();
+
+ void add_mapping(const std::vector<uint32_t>& fields, uint32_t pusher_id) {
+ for (auto field_id : fields) {
+ assert(field_id < _pushers.size());
+ auto& opt_pusher = _pushers[field_id];
+ assert(!opt_pusher.has_value());
+ opt_pusher = pusher_id;
+ }
+ }
+
+ void use_mapping(const std::vector<uint32_t>& fields, std::vector<uint32_t>& pushers) {
+ for (auto field_id : fields) {
+ assert(field_id < _pushers.size());
+ auto& opt_pusher = _pushers[field_id];
+ assert(opt_pusher.has_value());
+ pushers.emplace_back(opt_pusher.value());
+ }
+ }
+};
+
+PusherMapping::PusherMapping(size_t size)
+ : _pushers(size)
+{
+}
+
+PusherMapping::~PusherMapping() = default;
+
+/*
+ * Connect contexts for inverting to contexts for pushing. If we use
+ * different sequenced task executors or add different biases to the
+ * getExecutorId() argument (to enable double buffering) then contexts
+ * for inverting and contexts for pushing will bundle different sets
+ * of fields, preventing a 1:1 mapping. If we use the same sequenced
+ * task executor and drop double buffering then we can simplify this
+ * to a 1:1 mapping.
+ */
+void connect_contexts(std::vector<InvertContext>& invert_contexts,
+ const std::vector<PushContext>& push_contexts,
+ uint32_t num_fields,
+ uint32_t num_uri_fields)
+{
+ PusherMapping field_to_pusher(num_fields);
+ PusherMapping uri_field_to_pusher(num_uri_fields);
+ uint32_t pusher_id = 0;
+ for (auto& push_context : push_contexts) {
+ field_to_pusher.add_mapping(push_context.get_fields(), pusher_id);
+ uri_field_to_pusher.add_mapping(push_context.get_uri_fields(), pusher_id);
+ ++pusher_id;
+ }
+ std::vector<uint32_t> pushers;
+ for (auto& invert_context : invert_contexts) {
+ pushers.clear();
+ field_to_pusher.use_mapping(invert_context.get_fields(), pushers);
+ uri_field_to_pusher.use_mapping(invert_context.get_uri_fields(), pushers);
+ std::sort(pushers.begin(), pushers.end());
+ auto last = std::unique(pushers.begin(), pushers.end());
+ pushers.erase(last, pushers.end());
+ for (auto pusher : pushers) {
+ invert_context.add_pusher(pusher);
+ }
+ }
+}
+
+}
void
DocumentInverterContext::add_field(const DocumentType& doc_type, uint32_t fieldId)
@@ -57,9 +161,12 @@ DocumentInverterContext::DocumentInverterContext(const index::Schema& schema,
_schema_index_fields(),
_invert_threads(invert_threads),
_push_threads(push_threads),
- _field_indexes(field_indexes)
+ _field_indexes(field_indexes),
+ _invert_contexts(),
+ _push_contexts()
{
_schema_index_fields.setup(schema);
+ setup_contexts();
}
DocumentInverterContext::~DocumentInverterContext() = default;
@@ -83,4 +190,12 @@ DocumentInverterContext::get_field_value(const Document& doc, uint32_t field_id)
return {};
}
+void
+DocumentInverterContext::setup_contexts()
+{
+ make_contexts(_schema_index_fields, _invert_threads, _invert_contexts);
+ make_contexts(_schema_index_fields, _push_threads, _push_contexts);
+ connect_contexts(_invert_contexts, _push_contexts, _schema.getNumIndexFields(), _schema_index_fields._uriFields.size());
+}
+
}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.h b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.h
index 7330f4376ea..54a1fff90a4 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.h
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.h
@@ -3,6 +3,8 @@
#pragma once
#include <vespa/searchlib/index/schema_index_fields.h>
+#include "invert_context.h"
+#include "push_context.h"
#include <memory>
#include <vector>
@@ -14,8 +16,6 @@ class Field;
class FieldValue;
}
-namespace vespalib { class ISequencedTaskExecutor; }
-
namespace search::memoryindex {
class IFieldIndexCollection;
@@ -33,8 +33,11 @@ class DocumentInverterContext {
vespalib::ISequencedTaskExecutor& _invert_threads;
vespalib::ISequencedTaskExecutor& _push_threads;
IFieldIndexCollection& _field_indexes;
+ std::vector<InvertContext> _invert_contexts;
+ std::vector<PushContext> _push_contexts;
void add_field(const document::DocumentType& doc_type, uint32_t fieldId);
void build_fields(const document::DocumentType& doc_type, const document::DataType* data_type);
+ void setup_contexts();
public:
DocumentInverterContext(const index::Schema &schema,
vespalib::ISequencedTaskExecutor &invert_threads,
@@ -48,6 +51,8 @@ public:
vespalib::ISequencedTaskExecutor& get_push_threads() noexcept { return _push_threads; }
IFieldIndexCollection& get_field_indexes() noexcept { return _field_indexes; }
std::unique_ptr<document::FieldValue> get_field_value(const document::Document& doc, uint32_t field_id) const;
+ const std::vector<InvertContext>& get_invert_contexts() const noexcept { return _invert_contexts; }
+ const std::vector<PushContext>& get_push_contexts() const noexcept { return _push_contexts; }
};
}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/invert_context.cpp b/searchlib/src/vespa/searchlib/memoryindex/invert_context.cpp
new file mode 100644
index 00000000000..fd58cb6595e
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/invert_context.cpp
@@ -0,0 +1,21 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "invert_context.h"
+
+namespace search::memoryindex {
+
+InvertContext::InvertContext(vespalib::ISequencedTaskExecutor::ExecutorId id)
+ : BundledFieldsContext(id),
+ _pushers()
+{
+}
+
+InvertContext::~InvertContext() = default;
+
+void
+InvertContext::add_pusher(uint32_t pusher_id)
+{
+ _pushers.emplace_back(pusher_id);
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/invert_context.h b/searchlib/src/vespa/searchlib/memoryindex/invert_context.h
new file mode 100644
index 00000000000..4d2ebddd647
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/invert_context.h
@@ -0,0 +1,28 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "bundled_fields_context.h"
+
+namespace search::memoryindex {
+
+/*
+ * Context used by an InvertTask to invert a set of document fields
+ * into corresponding field inverters or by a RemoveTask to remove
+ * documents from a set of field inverters.
+ *
+ * It is also used by DocumentInverter::pushDocuments() to execute
+ * PushTask at the proper time (i.e. when all related InvertTask /
+ * RemoveTask operations have completed).
+ */
+class InvertContext : public BundledFieldsContext
+{
+ std::vector<uint32_t> _pushers;
+public:
+ void add_pusher(uint32_t pusher_id);
+ InvertContext(vespalib::ISequencedTaskExecutor::ExecutorId id);
+ ~InvertContext();
+ const std::vector<uint32_t>& get_pushers() const noexcept { return _pushers; }
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/invert_task.cpp b/searchlib/src/vespa/searchlib/memoryindex/invert_task.cpp
new file mode 100644
index 00000000000..2fb1ccf2444
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/invert_task.cpp
@@ -0,0 +1,51 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "invert_task.h"
+#include "document_inverter_context.h"
+#include "field_inverter.h"
+#include "invert_context.h"
+#include "url_field_inverter.h"
+
+namespace search::memoryindex {
+
+InvertTask::InvertTask(const DocumentInverterContext& inv_context, const InvertContext& context, const std::vector<std::unique_ptr<FieldInverter>>& inverters, const std::vector<std::unique_ptr<UrlFieldInverter>>& uri_inverters, uint32_t lid, const document::Document& doc)
+ : _inv_context(inv_context),
+ _context(context),
+ _inverters(inverters),
+ _uri_inverters(uri_inverters),
+ _field_values(),
+ _uri_field_values(),
+ _lid(lid)
+{
+ _field_values.reserve(_context.get_fields().size());
+ _uri_field_values.reserve(_context.get_uri_fields().size());
+ for (uint32_t field_id : _context.get_fields()) {
+ _field_values.emplace_back(_inv_context.get_field_value(doc, field_id));
+ }
+ const auto& schema_index_fields = _inv_context.get_schema_index_fields();
+ for (uint32_t uri_field_id : _context.get_uri_fields()) {
+ uint32_t field_id = schema_index_fields._uriFields[uri_field_id]._all;
+ _uri_field_values.emplace_back(_inv_context.get_field_value(doc, field_id));
+ }
+}
+
+InvertTask::~InvertTask() = default;
+
+void
+InvertTask::run()
+{
+ assert(_field_values.size() == _context.get_fields().size());
+ assert(_uri_field_values.size() == _context.get_uri_fields().size());
+ auto fv_itr = _field_values.begin();
+ for (auto field_id : _context.get_fields()) {
+ _inverters[field_id]->invertField(_lid, *fv_itr);
+ ++fv_itr;
+ }
+ auto uri_fv_itr = _uri_field_values.begin();
+ for (auto uri_field_id : _context.get_uri_fields()) {
+ _uri_inverters[uri_field_id]->invertField(_lid, *uri_fv_itr);
+ ++uri_fv_itr;
+ }
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/invert_task.h b/searchlib/src/vespa/searchlib/memoryindex/invert_task.h
new file mode 100644
index 00000000000..48f999a963d
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/invert_task.h
@@ -0,0 +1,39 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/executor.h>
+#include <vector>
+
+namespace document {
+class Document;
+class FieldValue;
+}
+
+namespace search::memoryindex {
+
+class DocumentInverterContext;
+class FieldInverter;
+class InvertContext;
+class UrlFieldInverter;
+
+/*
+ * Task to invert a set of document fields into related field
+ * inverters and uri field inverters.
+ */
+class InvertTask : public vespalib::Executor::Task
+{
+ const DocumentInverterContext& _inv_context;
+ const InvertContext& _context;
+ const std::vector<std::unique_ptr<FieldInverter>>& _inverters;
+ const std::vector<std::unique_ptr<UrlFieldInverter>>& _uri_inverters;
+ std::vector<std::unique_ptr<document::FieldValue>> _field_values;
+ std::vector<std::unique_ptr<document::FieldValue>> _uri_field_values;
+ uint32_t _lid;
+public:
+ InvertTask(const DocumentInverterContext& inv_context, const InvertContext& context, const std::vector<std::unique_ptr<FieldInverter>>& inverters, const std::vector<std::unique_ptr<UrlFieldInverter>>& uri_inverters, uint32_t lid, const document::Document& doc);
+ ~InvertTask() override;
+ void run() override;
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/push_context.cpp b/searchlib/src/vespa/searchlib/memoryindex/push_context.cpp
new file mode 100644
index 00000000000..5a4a773a6f5
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/push_context.cpp
@@ -0,0 +1,14 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "push_context.h"
+
+namespace search::memoryindex {
+
+PushContext::PushContext(vespalib::ISequencedTaskExecutor::ExecutorId id)
+ : BundledFieldsContext(id)
+{
+}
+
+PushContext::~PushContext() = default;
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/push_context.h b/searchlib/src/vespa/searchlib/memoryindex/push_context.h
new file mode 100644
index 00000000000..0e96346837e
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/push_context.h
@@ -0,0 +1,20 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "bundled_fields_context.h"
+
+namespace search::memoryindex {
+
+/*
+ * Context for pushing inverted data to memory index structure for a set
+ * of fields and uri fields. Currently used by PushTask.
+ */
+class PushContext : public BundledFieldsContext
+{
+public:
+ PushContext(vespalib::ISequencedTaskExecutor::ExecutorId id);
+ ~PushContext();
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/push_task.cpp b/searchlib/src/vespa/searchlib/memoryindex/push_task.cpp
new file mode 100644
index 00000000000..0eca882836b
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/push_task.cpp
@@ -0,0 +1,44 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "push_task.h"
+#include "push_context.h"
+#include "field_inverter.h"
+#include "url_field_inverter.h"
+
+namespace search::memoryindex {
+
+namespace {
+
+template <typename Inverter>
+void push_inverter(Inverter& inverter)
+{
+ inverter.applyRemoves();
+ inverter.pushDocuments();
+}
+
+}
+
+
+PushTask::PushTask(const PushContext& context, const std::vector<std::unique_ptr<FieldInverter>>& inverters, const std::vector<std::unique_ptr<UrlFieldInverter>>& uri_inverters, std::shared_ptr<vespalib::IDestructorCallback> on_write_done, std::shared_ptr<vespalib::RetainGuard> retain)
+ : _context(context),
+ _inverters(inverters),
+ _uri_inverters(uri_inverters),
+ _on_write_done(std::move(on_write_done)),
+ _retain(std::move(retain))
+{
+}
+
+PushTask::~PushTask() = default;
+
+void
+PushTask::run()
+{
+ for (auto field_id : _context.get_fields()) {
+ push_inverter(*_inverters[field_id]);
+ }
+ for (auto uri_field_id : _context.get_uri_fields()) {
+ push_inverter(*_uri_inverters[uri_field_id]);
+ }
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/push_task.h b/searchlib/src/vespa/searchlib/memoryindex/push_task.h
new file mode 100644
index 00000000000..24474fb2003
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/push_task.h
@@ -0,0 +1,36 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/executor.h>
+#include <vector>
+
+namespace vespalib {
+class IDestructorCallback;
+class RetainGuard;
+}
+
+namespace search::memoryindex {
+
+class FieldInverter;
+class PushContext;
+class UrlFieldInverter;
+
+/*
+ * Task to push inverted data from a set of field inverters and uri
+ * field inverters to the memory index structure.
+ */
+class PushTask : public vespalib::Executor::Task
+{
+ const PushContext& _context;
+ const std::vector<std::unique_ptr<FieldInverter>>& _inverters;
+ const std::vector<std::unique_ptr<UrlFieldInverter>>& _uri_inverters;
+ std::shared_ptr<vespalib::IDestructorCallback> _on_write_done;
+ std::shared_ptr<vespalib::RetainGuard> _retain;
+public:
+ PushTask(const PushContext& context, const std::vector<std::unique_ptr<FieldInverter>>& inverters, const std::vector<std::unique_ptr<UrlFieldInverter>>& uri_inverters, std::shared_ptr<vespalib::IDestructorCallback> on_write_done, std::shared_ptr<vespalib::RetainGuard> retain);
+ ~PushTask() override;
+ void run() override;
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/remove_task.cpp b/searchlib/src/vespa/searchlib/memoryindex/remove_task.cpp
new file mode 100644
index 00000000000..d19abd50274
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/remove_task.cpp
@@ -0,0 +1,44 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "remove_task.h"
+#include "document_inverter_context.h"
+#include "field_inverter.h"
+#include "invert_context.h"
+#include "url_field_inverter.h"
+
+namespace search::memoryindex {
+
+namespace {
+
+template <typename Inverter>
+void remove_documents(Inverter& inverter, const std::vector<uint32_t>& lids)
+{
+ for (auto lid : lids) {
+ inverter.removeDocument(lid);
+ }
+}
+
+}
+
+RemoveTask::RemoveTask(const InvertContext& context, const std::vector<std::unique_ptr<FieldInverter>>& inverters, const std::vector<std::unique_ptr<UrlFieldInverter>>& uri_inverters, const std::vector<uint32_t>& lids)
+ : _context(context),
+ _inverters(inverters),
+ _uri_inverters(uri_inverters),
+ _lids(lids)
+{
+}
+
+RemoveTask::~RemoveTask() = default;
+
+void
+RemoveTask::run()
+{
+ for (auto field_id : _context.get_fields()) {
+ remove_documents(*_inverters[field_id], _lids);
+ }
+ for (auto uri_field_id : _context.get_uri_fields()) {
+ remove_documents(*_uri_inverters[uri_field_id], _lids);
+ }
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/remove_task.h b/searchlib/src/vespa/searchlib/memoryindex/remove_task.h
new file mode 100644
index 00000000000..5eba4390752
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/remove_task.h
@@ -0,0 +1,30 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/executor.h>
+#include <vector>
+
+namespace search::memoryindex {
+
+class FieldInverter;
+class InvertContext;
+class UrlFieldInverter;
+
+/*
+ * Task to remove documents from a set of field inverters and uri
+ * field inverters.
+ */
+class RemoveTask : public vespalib::Executor::Task
+{
+ const InvertContext& _context;
+ const std::vector<std::unique_ptr<FieldInverter>>& _inverters;
+ const std::vector<std::unique_ptr<UrlFieldInverter>>& _uri_inverters;
+ std::vector<uint32_t> _lids;
+public:
+ RemoveTask(const InvertContext& context, const std::vector<std::unique_ptr<FieldInverter>>& inverters, const std::vector<std::unique_ptr<UrlFieldInverter>>& uri_inverters, const std::vector<uint32_t>& lids);
+ ~RemoveTask() override;
+ void run() override;
+};
+
+}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index 8268363d335..06cc37616c0 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -19,12 +19,12 @@ class ISequencedTaskExecutor
public:
class ExecutorId {
public:
- ExecutorId() : ExecutorId(0) { }
- explicit ExecutorId(uint32_t id) : _id(id) { }
- uint32_t getId() const { return _id; }
- bool operator != (ExecutorId rhs) const { return _id != rhs._id; }
- bool operator == (ExecutorId rhs) const { return _id == rhs._id; }
- bool operator < (ExecutorId rhs) const { return _id < rhs._id; }
+ ExecutorId() noexcept : ExecutorId(0) { }
+ explicit ExecutorId(uint32_t id) noexcept : _id(id) { }
+ uint32_t getId() const noexcept { return _id; }
+ bool operator != (ExecutorId rhs) const noexcept { return _id != rhs._id; }
+ bool operator == (ExecutorId rhs) const noexcept { return _id == rhs._id; }
+ bool operator < (ExecutorId rhs) const noexcept { return _id < rhs._id; }
private:
uint32_t _id;
};