diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-11-08 18:34:29 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-11-08 18:52:06 +0100 |
commit | f1e231eab069edfa5eb499f324e3ee7bcca89a40 (patch) | |
tree | a382ceb5a2e9537ec7956cb8468234d7afdcaddf | |
parent | 401ef5852764dee9a7e1568057c11f240ae6be9e (diff) |
Use alternate executor id for push stage when sharing sequenced task executor
with invert stage.
4 files changed, 30 insertions, 0 deletions
diff --git a/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h b/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h
index 68dac168280..fb1a68d7273 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h
+++ b/searchlib/src/vespa/searchlib/memoryindex/bundled_fields_context.h
@@ -22,6 +22,7 @@ protected:
 public:
     void add_field(uint32_t field_id);
     void add_uri_field(uint32_t uri_field_id);
+    void set_id(vespalib::ISequencedTaskExecutor::ExecutorId id) { _id = id; }
     vespalib::ISequencedTaskExecutor::ExecutorId get_id() const noexcept { return _id; }
     const std::vector<uint32_t>& get_fields() const noexcept { return _fields; }
     const std::vector<uint32_t>& get_uri_fields() const noexcept { return _uri_fields; }
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp
index 84aeff92bed..c82f71906fb 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_context.cpp
@@ -50,6 +50,13 @@ void make_contexts(const SchemaIndexFields& schema_index_fields, ISequencedTaskE
     }
 }
 
+void switch_to_alternate_ids(ISequencedTaskExecutor& executor, std::vector<PushContext>& contexts, uint32_t bias)
+{
+    for (auto& context : contexts) {
+        context.set_id(executor.get_alternate_executor_id(context.get_id(), bias));
+    }
+}
+
 class PusherMapping {
     std::vector<std::optional<uint32_t>> _pushers;
 public:
@@ -195,6 +202,10 @@ DocumentInverterContext::setup_contexts()
 {
     make_contexts(_schema_index_fields, _invert_threads, _invert_contexts);
     make_contexts(_schema_index_fields, _push_threads, _push_contexts);
+    if (&_invert_threads == &_push_threads) {
+        uint32_t bias = _schema_index_fields._textFields.size() + _schema_index_fields._uriFields.size();
+        switch_to_alternate_ids(_push_threads, _push_contexts, bias);
+    }
     connect_contexts(_invert_contexts,
                      _push_contexts, _schema.getNumIndexFields(), _schema_index_fields._uriFields.size());
 }
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
index 41653889b8f..c54f182891c 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
@@ -18,4 +18,13 @@ ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) c
     return getExecutorId(hashfun(componentId));
 }
 
+ISequencedTaskExecutor::ExecutorId
+ISequencedTaskExecutor::get_alternate_executor_id(ExecutorId id, uint32_t bias) const
+{
+    if ((bias % _numExecutors) == 0) {
+        bias = 1;
+    }
+    return ExecutorId((id.getId() + bias) % _numExecutors);
+}
+
 }
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index 06cc37616c0..0e931838279 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -43,6 +43,15 @@ public:
     ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const;
 
     /**
+     * Returns an executor id that is NOT equal to the given executor id,
+     * using the given bias to offset the new id.
+     *
+     * This is relevant for pipelining operations on the same component,
+     * by doing pipeline steps in different executors.
+     */
+    ExecutorId get_alternate_executor_id(ExecutorId id, uint32_t bias) const;
+
+    /**
      * Schedule a task to run after all previously scheduled tasks with
      * same id.
      *