aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-11-08 18:34:29 +0100
committerTor Egge <Tor.Egge@online.no>2021-11-08 18:52:06 +0100
commitf1e231eab069edfa5eb499f324e3ee7bcca89a40 (patch)
treea382ceb5a2e9537ec7956cb8468234d7afdcaddf
parent401ef5852764dee9a7e1568057c11f240ae6be9e (diff)
Use alternate executor id for push stage when sharing sequenced task executor
with invert stage.
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h1
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp9
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h9
4 files changed, 30 insertions, 0 deletions
diff --git a/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h b/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h
index 68dac168280..fb1a68d7273 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h
+++ b/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h
@@ -22,6 +22,7 @@ protected:
public:
void add_field(uint32_t field_id);
void add_uri_field(uint32_t uri_field_id);
+ void set_id(vespalib::ISequencedTaskExecutor::ExecutorId id) { _id = id; }
vespalib::ISequencedTaskExecutor::ExecutorId get_id() const noexcept { return _id; }
const std::vector<uint32_t>& get_fields() const noexcept { return _fields; }
const std::vector<uint32_t>& get_uri_fields() const noexcept { return _uri_fields; }
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp
index 84aeff92bed..c82f71906fb 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp
@@ -50,6 +50,13 @@ void make_contexts(const SchemaIndexFields& schema_index_fields, ISequencedTaskE
}
}
+void switch_to_alternate_ids(ISequencedTaskExecutor& executor, std::vector<PushContext>& contexts, uint32_t bias)
+{
+ for (auto& context : contexts) {
+ context.set_id(executor.get_alternate_executor_id(context.get_id(), bias));
+ }
+}
+
class PusherMapping {
std::vector<std::optional<uint32_t>> _pushers;
public:
@@ -195,6 +202,10 @@ DocumentInverterContext::setup_contexts()
{
make_contexts(_schema_index_fields, _invert_threads, _invert_contexts);
make_contexts(_schema_index_fields, _push_threads, _push_contexts);
+ if (&_invert_threads == &_push_threads) {
+ uint32_t bias = _schema_index_fields._textFields.size() + _schema_index_fields._uriFields.size();
+ switch_to_alternate_ids(_push_threads, _push_contexts, bias);
+ }
connect_contexts(_invert_contexts, _push_contexts, _schema.getNumIndexFields(), _schema_index_fields._uriFields.size());
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
index 41653889b8f..c54f182891c 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
@@ -18,4 +18,13 @@ ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) c
return getExecutorId(hashfun(componentId));
}
+ISequencedTaskExecutor::ExecutorId
+ISequencedTaskExecutor::get_alternate_executor_id(ExecutorId id, uint32_t bias) const
+{
+ if ((bias % _numExecutors) == 0) {
+ bias = 1;
+ }
+ return ExecutorId((id.getId() + bias) % _numExecutors);
+}
+
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index 06cc37616c0..0e931838279 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -43,6 +43,15 @@ public:
ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const;
/**
+ * Returns an executor id that is NOT equal to the given executor id,
+ * using the given bias to offset the new id.
+ *
+ * This is relevant for pipelining operations on the same component,
+ * by doing pipeline steps in different executors.
+ */
+ ExecutorId get_alternate_executor_id(ExecutorId id, uint32_t bias) const;
+
+ /**
* Schedule a task to run after all previously scheduled tasks with
* same id.
*