diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-19 13:28:44 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-19 13:28:44 +0100 |
commit | 02aa747942ba9dcd03e97c5c1c0f9cbe0eeef55f (patch) | |
tree | 19e6aabc654ed573b16b595f4e4d6e5639e93bed | |
parent | f2bd1f3efdfbf4250e731f62dccf31adc7251dca (diff) | |
parent | b3a00a723ead93d6b6245d21c0093748ffd89c02 (diff) |
Merge pull request #20872 from vespa-engine/balder/allow-full-use-of-all-executor-threads-for-fusion-too
Since fusion task are now smaller, there is no need to explicit limit…
4 files changed, 12 insertions, 16 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp index e62367c0f37..280800f1b58 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp @@ -5,15 +5,15 @@ #include "field_merger_task.h" #include "fusion_output_index.h" #include <vespa/searchcommon/common/schema.h> -#include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/vespalib/util/executor.h> +#include <cassert> namespace search::diskindex { -FieldMergersState::FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token) +FieldMergersState::FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token) : _fusion_out_index(fusion_out_index), _executor(executor), _flush_token(std::move(flush_token)), - _concurrent(std::max(1ul, _executor.getNumThreads() / 2)), _done(_fusion_out_index.get_schema().getNumIndexFields()), _failed(0u), _field_mergers(_fusion_out_index.get_schema().getNumIndexFields()) @@ -28,7 +28,6 @@ FieldMergersState::~FieldMergersState() FieldMerger& FieldMergersState::alloc_field_merger(uint32_t id) { - _concurrent.wait(); assert(id < _field_mergers.size()); auto field_merger = std::make_unique<FieldMerger>(id, _fusion_out_index, _flush_token); auto& result = *field_merger; @@ -46,7 +45,6 @@ FieldMergersState::destroy_field_merger(FieldMerger& field_merger) old_merger = std::move(_field_mergers[id]); assert(old_merger.get() == &field_merger); old_merger.reset(); - _concurrent.post(); _done.countDown(); } diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h index 34c50c4d3e5..f4bad9a2b8c 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h +++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h @@ -2,12 +2,12 @@ #pragma once -#include <vespa/document/util/queue.h> #include <vespa/vespalib/util/count_down_latch.h> #include <atomic> +#include <vector> namespace search { class IFlushToken; } -namespace vespalib { class ThreadExecutor; } +namespace vespalib { class Executor; } namespace search::diskindex { @@ -20,16 +20,15 @@ class FusionOutputIndex; */ class FieldMergersState { const FusionOutputIndex& _fusion_out_index; - vespalib::ThreadExecutor& _executor; + vespalib::Executor& _executor; std::shared_ptr<IFlushToken> _flush_token; - document::Semaphore _concurrent; vespalib::CountDownLatch _done; std::atomic<uint32_t> _failed; std::vector<std::unique_ptr<FieldMerger>> _field_mergers; void destroy_field_merger(FieldMerger& field_merger); public: - FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token); + FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token); ~FieldMergersState(); FieldMerger& alloc_field_merger(uint32_t id); void field_merger_done(FieldMerger& field_merger, bool failed); diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 0fa634ef072..7d8bd4bc799 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -10,7 +10,6 @@ #include <vespa/searchlib/index/schemautil.h> #include <vespa/searchlib/util/dirtraverse.h> #include <vespa/vespalib/io/fileutil.h> -#include <vespa/vespalib/util/count_down_latch.h> #include <vespa/vespalib/util/error.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/lambdatask.h> @@ -72,7 +71,7 @@ Fusion::Fusion(const Schema& schema, const vespalib::string& dir, Fusion::~Fusion() = default; bool -Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushToken> flush_token) +Fusion::mergeFields(vespalib::Executor & executor, std::shared_ptr<IFlushToken> flush_token) { FieldMergersState field_mergers_state(_fusion_out_index, executor, flush_token); const Schema &schema = getSchema(); @@ -104,7 +103,7 @@ Fusion::readSchemaFiles() } bool -Fusion::merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token) +Fusion::merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token) { FastOS_StatInfo statInfo; if (!FastOS_File::Stat(_fusion_out_index.get_path().c_str(), &statInfo)) { diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index 1f5c4471950..04edf77ea81 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -3,7 +3,7 @@ #pragma once #include "fusion_output_index.h" -#include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/vespalib/util/executor.h> namespace search { class IFlushToken; @@ -25,7 +25,7 @@ class Fusion private: using Schema = index::Schema; - bool mergeFields(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token); + bool mergeFields(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token); bool readSchemaFiles(); bool checkSchemaCompat(); @@ -43,7 +43,7 @@ public: ~Fusion(); void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); } void set_force_small_merge_chunk(bool force_small_merge_chunk) { _fusion_out_index.set_force_small_merge_chunk(force_small_merge_chunk); } - bool merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token); + bool merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token); }; } |