summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2022-01-17 14:15:32 +0100
committerTor Egge <Tor.Egge@online.no>2022-01-17 14:23:51 +0100
commite7bccfc03ca40700e19865c1c7282255458bf09e (patch)
tree1fb0ec213b981bcba0a52cf49e34eb29ac76c7ed /searchlib
parente7f13b646891ddbfad4dfbc5725bfbcbc34d1132 (diff)
Schedule small tasks for field merge.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt2
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.cpp9
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.h4
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger_task.cpp22
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger_task.h30
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp75
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h41
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.cpp22
8 files changed, 179 insertions, 26 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt b/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt
index 74a873a4e29..0f7c77f8451 100644
--- a/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt
@@ -11,6 +11,8 @@ vespa_add_library(searchlib_diskindex OBJECT
docidmapper.cpp
extposocc.cpp
field_merger.cpp
+ field_mergers_state.cpp
+ field_merger_task.cpp
fieldreader.cpp
fieldwriter.cpp
field_length_scanner.cpp
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
index c5ad0c63c21..dfd4d891818 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
@@ -560,13 +560,4 @@ FieldMerger::process_merge_field()
}
}
-bool
-FieldMerger::merge_field()
-{
- while (!_failed && _state != State::MERGE_DONE) {
- process_merge_field();
- }
- return !_failed;
-}
-
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
index 653facfcd51..347367b3aa5 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
@@ -85,7 +85,9 @@ public:
void merge_field_start();
void merge_field_finish();
void process_merge_field(); // Called multiple times
- bool merge_field();
+ uint32_t get_id() const noexcept { return _id; }
+ bool done() const noexcept { return _state == State::MERGE_DONE; }
+ bool failed() const noexcept { return _failed; }
};
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger_task.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger_task.cpp
new file mode 100644
index 00000000000..bd73685a0a9
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger_task.cpp
@@ -0,0 +1,22 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "field_merger_task.h"
+#include "field_merger.h"
+#include "field_mergers_state.h"
+
+namespace search::diskindex {
+
+void
+FieldMergerTask::run()
+{
+ _field_merger.process_merge_field();
+ if (_field_merger.failed()) {
+ _field_mergers_state.field_merger_done(_field_merger, true);
+ } else if (_field_merger.done()) {
+ _field_mergers_state.field_merger_done(_field_merger, false);
+ } else {
+ _field_mergers_state.schedule_task(_field_merger);
+ }
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger_task.h b/searchlib/src/vespa/searchlib/diskindex/field_merger_task.h
new file mode 100644
index 00000000000..179feabe652
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger_task.h
@@ -0,0 +1,30 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/threadexecutor.h>
+
+namespace search::diskindex {
+
+class FieldMerger;
+class FieldMergersState;
+
+/*
+ * Task for processing a portion of a field merge.
+ */
+class FieldMergerTask : public vespalib::Executor::Task
+{
+ FieldMerger& _field_merger;
+ FieldMergersState& _field_mergers_state;
+
+ void run() override;
+public:
+ FieldMergerTask(FieldMerger& field_merger, FieldMergersState& field_mergers_state)
+ : vespalib::Executor::Task(),
+ _field_merger(field_merger),
+ _field_mergers_state(field_mergers_state)
+ {
+ }
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp
new file mode 100644
index 00000000000..e62367c0f37
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp
@@ -0,0 +1,75 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "field_mergers_state.h"
+#include "field_merger.h"
+#include "field_merger_task.h"
+#include "fusion_output_index.h"
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/vespalib/util/threadexecutor.h>
+
+namespace search::diskindex {
+
+FieldMergersState::FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token)
+ : _fusion_out_index(fusion_out_index),
+ _executor(executor),
+ _flush_token(std::move(flush_token)),
+ _concurrent(std::max(1ul, _executor.getNumThreads() / 2)),
+ _done(_fusion_out_index.get_schema().getNumIndexFields()),
+ _failed(0u),
+ _field_mergers(_fusion_out_index.get_schema().getNumIndexFields())
+{
+}
+
+FieldMergersState::~FieldMergersState()
+{
+ wait_field_mergers_done();
+}
+
+FieldMerger&
+FieldMergersState::alloc_field_merger(uint32_t id)
+{
+ _concurrent.wait();
+ assert(id < _field_mergers.size());
+ auto field_merger = std::make_unique<FieldMerger>(id, _fusion_out_index, _flush_token);
+ auto& result = *field_merger;
+ assert(!_field_mergers[id]);
+ _field_mergers[id] = std::move(field_merger);
+ return result;
+}
+
+void
+FieldMergersState::destroy_field_merger(FieldMerger& field_merger)
+{
+ uint32_t id = field_merger.get_id();
+ assert(id < _field_mergers.size());
+ std::unique_ptr<FieldMerger> old_merger;
+ old_merger = std::move(_field_mergers[id]);
+ assert(old_merger.get() == &field_merger);
+ old_merger.reset();
+ _concurrent.post();
+ _done.countDown();
+}
+
+void
+FieldMergersState::field_merger_done(FieldMerger& field_merger, bool failed)
+{
+ if (failed) {
+ ++_failed;
+ }
+ destroy_field_merger(field_merger);
+}
+
+void
+FieldMergersState::wait_field_mergers_done()
+{
+ _done.await();
+}
+
+void
+FieldMergersState::schedule_task(FieldMerger& field_merger)
+{
+ auto rejected = _executor.execute(std::make_unique<FieldMergerTask>(field_merger, *this));
+ assert(!rejected);
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h
new file mode 100644
index 00000000000..34c50c4d3e5
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h
@@ -0,0 +1,41 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/document/util/queue.h>
+#include <vespa/vespalib/util/count_down_latch.h>
+#include <atomic>
+
+namespace search { class IFlushToken; }
+namespace vespalib { class ThreadExecutor; }
+
+namespace search::diskindex {
+
+class FieldMerger;
+class FusionOutputIndex;
+
+/*
+ * This class has ownership of active field mergers until they are
+ * done or failed.
+ */
+class FieldMergersState {
+ const FusionOutputIndex& _fusion_out_index;
+ vespalib::ThreadExecutor& _executor;
+ std::shared_ptr<IFlushToken> _flush_token;
+ document::Semaphore _concurrent;
+ vespalib::CountDownLatch _done;
+ std::atomic<uint32_t> _failed;
+ std::vector<std::unique_ptr<FieldMerger>> _field_mergers;
+
+ void destroy_field_merger(FieldMerger& field_merger);
+public:
+ FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token);
+ ~FieldMergersState();
+ FieldMerger& alloc_field_merger(uint32_t id);
+ void field_merger_done(FieldMerger& field_merger, bool failed);
+ void wait_field_mergers_done();
+ void schedule_task(FieldMerger& field_merger);
+ uint32_t get_failed() const noexcept { return _failed; }
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
index 12552f09027..0fa634ef072 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
@@ -3,6 +3,7 @@
#include "fusion.h"
#include "fusion_input_index.h"
#include "field_merger.h"
+#include "field_mergers_state.h"
#include <vespa/fastos/file.h>
#include <vespa/searchlib/common/documentsummary.h>
#include <vespa/searchlib/common/i_flush_token.h>
@@ -73,29 +74,18 @@ Fusion::~Fusion() = default;
bool
Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushToken> flush_token)
{
+ FieldMergersState field_mergers_state(_fusion_out_index, executor, flush_token);
const Schema &schema = getSchema();
- std::atomic<uint32_t> failed(0);
- uint32_t maxConcurrentThreads = std::max(1ul, executor.getNumThreads()/2);
- document::Semaphore concurrent(maxConcurrentThreads);
- vespalib::CountDownLatch done(schema.getNumIndexFields());
for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) {
- concurrent.wait();
- executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done, &concurrent, flush_token]() {
- FieldMerger merger(index, _fusion_out_index, flush_token);
- if (!merger.merge_field()) {
- failed++;
- }
- concurrent.post();
- done.countDown();
- }));
+ auto& field_merger = field_mergers_state.alloc_field_merger(iter.getIndex());
+ field_mergers_state.schedule_task(field_merger);
}
LOG(debug, "Waiting for %u fields", schema.getNumIndexFields());
- done.await();
+ field_mergers_state.wait_field_mergers_done();
LOG(debug, "Done waiting for %u fields", schema.getNumIndexFields());
- return (failed == 0u);
+ return (field_mergers_state.get_failed() == 0u);
}
-
bool
Fusion::checkSchemaCompat()
{