diff options
Diffstat (limited to 'searchlib')
8 files changed, 179 insertions, 26 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt b/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt index 74a873a4e29..0f7c77f8451 100644 --- a/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt @@ -11,6 +11,8 @@ vespa_add_library(searchlib_diskindex OBJECT docidmapper.cpp extposocc.cpp field_merger.cpp + field_mergers_state.cpp + field_merger_task.cpp fieldreader.cpp fieldwriter.cpp field_length_scanner.cpp diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp index c5ad0c63c21..dfd4d891818 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp @@ -560,13 +560,4 @@ FieldMerger::process_merge_field() } } -bool -FieldMerger::merge_field() -{ - while (!_failed && _state != State::MERGE_DONE) { - process_merge_field(); - } - return !_failed; -} - } diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h index 653facfcd51..347367b3aa5 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h @@ -85,7 +85,9 @@ public: void merge_field_start(); void merge_field_finish(); void process_merge_field(); // Called multiple times - bool merge_field(); + uint32_t get_id() const noexcept { return _id; } + bool done() const noexcept { return _state == State::MERGE_DONE; } + bool failed() const noexcept { return _failed; } }; } diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger_task.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger_task.cpp new file mode 100644 index 00000000000..bd73685a0a9 --- /dev/null +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger_task.cpp @@ -0,0 +1,22 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "field_merger_task.h" +#include "field_merger.h" +#include "field_mergers_state.h" + +namespace search::diskindex { + +void +FieldMergerTask::run() +{ + _field_merger.process_merge_field(); + if (_field_merger.failed()) { + _field_mergers_state.field_merger_done(_field_merger, true); + } else if (_field_merger.done()) { + _field_mergers_state.field_merger_done(_field_merger, false); + } else { + _field_mergers_state.schedule_task(_field_merger); + } +} + +} diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger_task.h b/searchlib/src/vespa/searchlib/diskindex/field_merger_task.h new file mode 100644 index 00000000000..179feabe652 --- /dev/null +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger_task.h @@ -0,0 +1,30 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/threadexecutor.h> + +namespace search::diskindex { + +class FieldMerger; +class FieldMergersState; + +/* + * Task for processing a portion of a field merge. + */ +class FieldMergerTask : public vespalib::Executor::Task +{ + FieldMerger& _field_merger; + FieldMergersState& _field_mergers_state; + + void run() override; +public: + FieldMergerTask(FieldMerger& field_merger, FieldMergersState& field_mergers_state) + : vespalib::Executor::Task(), + _field_merger(field_merger), + _field_mergers_state(field_mergers_state) + { + } +}; + +} diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp new file mode 100644 index 00000000000..e62367c0f37 --- /dev/null +++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp @@ -0,0 +1,75 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "field_mergers_state.h" +#include "field_merger.h" +#include "field_merger_task.h" +#include "fusion_output_index.h" +#include <vespa/searchcommon/common/schema.h> +#include <vespa/vespalib/util/threadexecutor.h> + +namespace search::diskindex { + +FieldMergersState::FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token) + : _fusion_out_index(fusion_out_index), + _executor(executor), + _flush_token(std::move(flush_token)), + _concurrent(std::max(1ul, _executor.getNumThreads() / 2)), + _done(_fusion_out_index.get_schema().getNumIndexFields()), + _failed(0u), + _field_mergers(_fusion_out_index.get_schema().getNumIndexFields()) +{ +} + +FieldMergersState::~FieldMergersState() +{ + wait_field_mergers_done(); +} + +FieldMerger& +FieldMergersState::alloc_field_merger(uint32_t id) +{ + _concurrent.wait(); + assert(id < _field_mergers.size()); + auto field_merger = std::make_unique<FieldMerger>(id, _fusion_out_index, _flush_token); + auto& result = *field_merger; + assert(!_field_mergers[id]); + _field_mergers[id] = std::move(field_merger); + return result; +} + +void +FieldMergersState::destroy_field_merger(FieldMerger& field_merger) +{ + uint32_t id = field_merger.get_id(); + assert(id < _field_mergers.size()); + std::unique_ptr<FieldMerger> old_merger; + old_merger = std::move(_field_mergers[id]); + assert(old_merger.get() == &field_merger); + old_merger.reset(); + _concurrent.post(); + _done.countDown(); +} + +void +FieldMergersState::field_merger_done(FieldMerger& field_merger, bool failed) +{ + if (failed) { + ++_failed; + } + destroy_field_merger(field_merger); +} + +void +FieldMergersState::wait_field_mergers_done() +{ + _done.await(); +} + +void +FieldMergersState::schedule_task(FieldMerger& field_merger) +{ + auto rejected = _executor.execute(std::make_unique<FieldMergerTask>(field_merger, *this)); + assert(!rejected); +} + +} diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h new file mode 100644 index 00000000000..34c50c4d3e5 --- /dev/null +++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h @@ -0,0 +1,41 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/document/util/queue.h> +#include <vespa/vespalib/util/count_down_latch.h> +#include <atomic> + +namespace search { class IFlushToken; } +namespace vespalib { class ThreadExecutor; } + +namespace search::diskindex { + +class FieldMerger; +class FusionOutputIndex; + +/* + * This class has ownership of active field mergers until they are + * done or failed. + */ +class FieldMergersState { + const FusionOutputIndex& _fusion_out_index; + vespalib::ThreadExecutor& _executor; + std::shared_ptr<IFlushToken> _flush_token; + document::Semaphore _concurrent; + vespalib::CountDownLatch _done; + std::atomic<uint32_t> _failed; + std::vector<std::unique_ptr<FieldMerger>> _field_mergers; + + void destroy_field_merger(FieldMerger& field_merger); +public: + FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token); + ~FieldMergersState(); + FieldMerger& alloc_field_merger(uint32_t id); + void field_merger_done(FieldMerger& field_merger, bool failed); + void wait_field_mergers_done(); + void schedule_task(FieldMerger& field_merger); + uint32_t get_failed() const noexcept { return _failed; } +}; + +} diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 12552f09027..0fa634ef072 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -3,6 +3,7 @@ #include "fusion.h" #include "fusion_input_index.h" #include "field_merger.h" +#include "field_mergers_state.h" #include <vespa/fastos/file.h> #include <vespa/searchlib/common/documentsummary.h> #include <vespa/searchlib/common/i_flush_token.h> @@ -73,29 +74,18 @@ Fusion::~Fusion() = default; bool Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushToken> flush_token) { + FieldMergersState field_mergers_state(_fusion_out_index, executor, flush_token); const Schema &schema = getSchema(); - std::atomic<uint32_t> failed(0); - uint32_t maxConcurrentThreads = std::max(1ul, executor.getNumThreads()/2); - document::Semaphore concurrent(maxConcurrentThreads); - vespalib::CountDownLatch done(schema.getNumIndexFields()); for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) { - concurrent.wait(); - executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done, &concurrent, flush_token]() { - FieldMerger merger(index, _fusion_out_index, flush_token); - if (!merger.merge_field()) { - failed++; - } - concurrent.post(); - done.countDown(); - })); + auto& field_merger = field_mergers_state.alloc_field_merger(iter.getIndex()); + field_mergers_state.schedule_task(field_merger); } LOG(debug, "Waiting for %u fields", schema.getNumIndexFields()); - done.await(); + field_mergers_state.wait_field_mergers_done(); LOG(debug, "Done waiting for %u fields", schema.getNumIndexFields()); - return (failed == 0u); + return (field_mergers_state.get_failed() == 0u); } - bool Fusion::checkSchemaCompat() { |