aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-01-19 13:28:44 +0100
committerGitHub <noreply@github.com>2022-01-19 13:28:44 +0100
commit02aa747942ba9dcd03e97c5c1c0f9cbe0eeef55f (patch)
tree19e6aabc654ed573b16b595f4e4d6e5639e93bed
parentf2bd1f3efdfbf4250e731f62dccf31adc7251dca (diff)
parentb3a00a723ead93d6b6245d21c0093748ffd89c02 (diff)
Merge pull request #20872 from vespa-engine/balder/allow-full-use-of-all-executor-threads-for-fusion-too
Since fusion task are now smaller, there is no need to explicit limit…
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp8
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h9
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.cpp5
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.h6
4 files changed, 12 insertions, 16 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp
index e62367c0f37..280800f1b58 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp
@@ -5,15 +5,15 @@
#include "field_merger_task.h"
#include "fusion_output_index.h"
#include <vespa/searchcommon/common/schema.h>
-#include <vespa/vespalib/util/threadexecutor.h>
+#include <vespa/vespalib/util/executor.h>
+#include <cassert>
namespace search::diskindex {
-FieldMergersState::FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token)
+FieldMergersState::FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token)
: _fusion_out_index(fusion_out_index),
_executor(executor),
_flush_token(std::move(flush_token)),
- _concurrent(std::max(1ul, _executor.getNumThreads() / 2)),
_done(_fusion_out_index.get_schema().getNumIndexFields()),
_failed(0u),
_field_mergers(_fusion_out_index.get_schema().getNumIndexFields())
@@ -28,7 +28,6 @@ FieldMergersState::~FieldMergersState()
FieldMerger&
FieldMergersState::alloc_field_merger(uint32_t id)
{
- _concurrent.wait();
assert(id < _field_mergers.size());
auto field_merger = std::make_unique<FieldMerger>(id, _fusion_out_index, _flush_token);
auto& result = *field_merger;
@@ -46,7 +45,6 @@ FieldMergersState::destroy_field_merger(FieldMerger& field_merger)
old_merger = std::move(_field_mergers[id]);
assert(old_merger.get() == &field_merger);
old_merger.reset();
- _concurrent.post();
_done.countDown();
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h
index 34c50c4d3e5..f4bad9a2b8c 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h
+++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.h
@@ -2,12 +2,12 @@
#pragma once
-#include <vespa/document/util/queue.h>
#include <vespa/vespalib/util/count_down_latch.h>
#include <atomic>
+#include <vector>
namespace search { class IFlushToken; }
-namespace vespalib { class ThreadExecutor; }
+namespace vespalib { class Executor; }
namespace search::diskindex {
@@ -20,16 +20,15 @@ class FusionOutputIndex;
*/
class FieldMergersState {
const FusionOutputIndex& _fusion_out_index;
- vespalib::ThreadExecutor& _executor;
+ vespalib::Executor& _executor;
std::shared_ptr<IFlushToken> _flush_token;
- document::Semaphore _concurrent;
vespalib::CountDownLatch _done;
std::atomic<uint32_t> _failed;
std::vector<std::unique_ptr<FieldMerger>> _field_mergers;
void destroy_field_merger(FieldMerger& field_merger);
public:
- FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token);
+ FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token);
~FieldMergersState();
FieldMerger& alloc_field_merger(uint32_t id);
void field_merger_done(FieldMerger& field_merger, bool failed);
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
index 0fa634ef072..7d8bd4bc799 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
@@ -10,7 +10,6 @@
#include <vespa/searchlib/index/schemautil.h>
#include <vespa/searchlib/util/dirtraverse.h>
#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/vespalib/util/count_down_latch.h>
#include <vespa/vespalib/util/error.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/lambdatask.h>
@@ -72,7 +71,7 @@ Fusion::Fusion(const Schema& schema, const vespalib::string& dir,
Fusion::~Fusion() = default;
bool
-Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushToken> flush_token)
+Fusion::mergeFields(vespalib::Executor & executor, std::shared_ptr<IFlushToken> flush_token)
{
FieldMergersState field_mergers_state(_fusion_out_index, executor, flush_token);
const Schema &schema = getSchema();
@@ -104,7 +103,7 @@ Fusion::readSchemaFiles()
}
bool
-Fusion::merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token)
+Fusion::merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token)
{
FastOS_StatInfo statInfo;
if (!FastOS_File::Stat(_fusion_out_index.get_path().c_str(), &statInfo)) {
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h
index 1f5c4471950..04edf77ea81 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h
@@ -3,7 +3,7 @@
#pragma once
#include "fusion_output_index.h"
-#include <vespa/vespalib/util/threadexecutor.h>
+#include <vespa/vespalib/util/executor.h>
namespace search {
class IFlushToken;
@@ -25,7 +25,7 @@ class Fusion
private:
using Schema = index::Schema;
- bool mergeFields(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token);
+ bool mergeFields(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token);
bool readSchemaFiles();
bool checkSchemaCompat();
@@ -43,7 +43,7 @@ public:
~Fusion();
void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); }
void set_force_small_merge_chunk(bool force_small_merge_chunk) { _fusion_out_index.set_force_small_merge_chunk(force_small_merge_chunk); }
- bool merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token);
+ bool merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token);
};
}