aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-09-14 17:43:20 +0200
committerGitHub <noreply@github.com>2022-09-14 17:43:20 +0200
commit58ce86bd1a8006c49f8286bfb73ba0f56276f2fb (patch)
treeb0b2bf1e0eeaf8adac6e06d4a5a66c7b64ae3044
parent4840ba81903a4f030c2541b805124cc0eb271e3a (diff)
parent92649edc2b02a8615f2cdff162deef5c1d9ce337 (diff)
Merge pull request #24056 from vespa-engine/havardpe/more-convenient-thread-bundlev8.53.21
more convenient ThreadBundle::run
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/match_master.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/matcher.cpp4
-rw-r--r--vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp50
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp11
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.h12
-rw-r--r--vespalib/src/vespa/vespalib/util/thread_bundle.cpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/thread_bundle.h35
8 files changed, 111 insertions, 43 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp b/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp
index 6115d642b4a..8f7970f5717 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp
@@ -114,22 +114,6 @@ struct LaterChunk : MyChunk {
}
};
-struct MyWork {
- size_t num_threads;
- std::vector<Runnable::UP> chunks;
- MyWork(ThreadBundle &thread_bundle) : num_threads(thread_bundle.size()), chunks() {
- chunks.reserve(num_threads);
- }
- void run(ThreadBundle &thread_bundle) {
- std::vector<Runnable*> refs;
- refs.reserve(chunks.size());
- for (const auto &task: chunks) {
- refs.push_back(task.get());
- }
- thread_bundle.run(refs);
- }
-};
-
} // unnamed
FeatureSet::UP
@@ -161,24 +145,26 @@ ExtractFeatures::get_match_features(const MatchToolsFactory &mtf, const OrderedD
FeatureResolver resolver(tools->rank_program().get_seeds(false));
result.names = extract_names(resolver, mtf.get_feature_rename_map());
result.values.resize(result.names.size() * docs.size());
- MyWork work(thread_bundle);
- size_t per_thread = docs.size() / work.num_threads;
- size_t rest_docs = docs.size() % work.num_threads;
+ size_t num_threads = thread_bundle.size();
+ std::vector<Runnable::UP> chunks;
+ chunks.reserve(num_threads);
+ size_t per_thread = docs.size() / num_threads;
+ size_t rest_docs = docs.size() % num_threads;
size_t idx = 0;
- for (size_t i = 0; i < work.num_threads; ++i) {
+ for (size_t i = 0; i < num_threads; ++i) {
size_t chunk_size = per_thread + (i < rest_docs);
if (chunk_size == 0) {
break;
}
if (i == 0) {
- work.chunks.push_back(std::make_unique<FirstChunk>(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), tools->search(), resolver));
+ chunks.push_back(std::make_unique<FirstChunk>(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), tools->search(), resolver));
} else {
- work.chunks.push_back(std::make_unique<LaterChunk>(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), mtf));
+ chunks.push_back(std::make_unique<LaterChunk>(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), mtf));
}
idx += chunk_size;
}
assert(idx == docs.size());
- work.run(thread_bundle);
+ thread_bundle.run(chunks);
return result;
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp
index eefb8411df2..063666e9cad 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp
@@ -103,7 +103,6 @@ MatchMaster::match(search::engine::Trace & trace,
DocidRangeScheduler::UP scheduler = createScheduler(threadBundle.size(), numSearchPartitions, params.numDocs);
std::vector<MatchThread::UP> threadState;
- std::vector<vespalib::Runnable*> targets;
for (size_t i = 0; i < threadBundle.size(); ++i) {
IMatchLoopCommunicator &com = (i == 0)
? static_cast<IMatchLoopCommunicator&>(timedCommunicator)
@@ -111,10 +110,9 @@ MatchMaster::match(search::engine::Trace & trace,
threadState.emplace_back(std::make_unique<MatchThread>(i, threadBundle.size(), params, mtf, com, *scheduler,
resultProcessor, mergeDirector, distributionKey,
trace.getRelativeTime(), trace.getLevel(), trace.getProfileDepth()));
- targets.push_back(threadState.back().get());
}
resultProcessor.prepareThreadContextCreation(threadBundle.size());
- threadBundle.run(targets);
+ threadBundle.run(threadState);
auto reply = make_reply(mtf, resultProcessor, threadBundle, threadState[0]->extract_result());
double query_time_s = vespalib::to_s(query_latency_time.elapsed());
double rerank_time_s = vespalib::to_s(timedCommunicator.elapsed);
diff --git a/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp b/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp
index 95fe6fd6c3e..b612f4cd061 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp
@@ -75,8 +75,8 @@ public:
{ }
private:
size_t size() const override { return _maxThreads; }
- void run(const std::vector<vespalib::Runnable*> &targets) override {
- _threadBundle.run(targets);
+ void run(vespalib::Runnable* const* targets, size_t cnt) override {
+ _threadBundle.run(targets, cnt);
}
vespalib::ThreadBundle &_threadBundle;
const uint32_t _maxThreads;
diff --git a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
index debb34724f6..d6e0e473e59 100644
--- a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
+++ b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
@@ -201,4 +201,54 @@ TEST_MT_FF("require that bundle pool works with multiple threads", 32, SimpleThr
f1.release(std::move(bundle));
}
+struct Filler {
+ int stuff;
+ Filler() : stuff(0) {}
+ virtual ~Filler() {}
+};
+
+struct Proxy : Filler, Runnable {
+ Runnable &target;
+ Proxy(Runnable &target_in) : target(target_in) {}
+ void run() override { target.run(); }
+};
+
+struct AlmostRunnable : Runnable {};
+
+TEST("require that Proxy needs fixup to become Runnable") {
+ Cnt cnt;
+ Proxy proxy(cnt);
+ Runnable &runnable = proxy;
+ void *proxy_ptr = &proxy;
+ void *runnable_ptr = &runnable;
+ EXPECT_TRUE(proxy_ptr != runnable_ptr);
+}
+
+TEST_FF("require that various versions of run can be used to invoke targets", SimpleThreadBundle(5), State(5)) {
+ EXPECT_TRUE(ThreadBundle::is_runnable_ptr<Runnable*>());
+ EXPECT_TRUE(ThreadBundle::is_runnable_ptr<Runnable::UP>());
+ EXPECT_FALSE(ThreadBundle::is_runnable_ptr<std::unique_ptr<Proxy>>());
+ EXPECT_FALSE(ThreadBundle::is_runnable_ptr<std::unique_ptr<AlmostRunnable>>());
+ std::vector<Runnable::UP> direct;
+ std::vector<std::unique_ptr<Proxy>> custom;
+ for (Runnable &target: f2.cnts) {
+ direct.push_back(std::make_unique<Proxy>(target));
+ custom.push_back(std::make_unique<Proxy>(target));
+ }
+ std::vector<Runnable*> refs = f2.getTargets(5);
+ f2.check({0,0,0,0,0});
+ f1.run(refs.data(), 3);
+ f2.check({1,1,1,0,0});
+ f1.run(&refs[3], 2);
+ f2.check({1,1,1,1,1});
+ f1.run(refs);
+ f2.check({2,2,2,2,2});
+ f1.run(direct);
+ f2.check({3,3,3,3,3});
+ f1.run(custom);
+ f2.check({4,4,4,4,4});
+ f1.run(f2.cnts);
+ f2.check({5,5,5,5,5});
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
index 99ab298864f..8a66a4f6898 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
@@ -157,20 +157,21 @@ SimpleThreadBundle::size() const
}
void
-SimpleThreadBundle::run(const std::vector<Runnable*> &targets)
+SimpleThreadBundle::run(Runnable* const* targets, size_t cnt)
{
- if (targets.size() > size()) {
+ if (cnt > size()) {
throw IllegalArgumentException("too many targets");
}
- if (targets.empty()) {
+ if (cnt == 0) {
return;
}
- if (targets.size() == 1) {
+ if (cnt == 1) {
targets[0]->run();
return;
}
CountDownLatch latch(size());
- _work.targets = &targets;
+ _work.targets = targets;
+ _work.cnt = cnt;
_work.latch = &latch;
_hook->run();
latch.await();
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
index b7434d09ac3..569500635a5 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
@@ -18,9 +18,10 @@ namespace fixed_thread_bundle {
* support static wiring of signal paths and execution hooks.
**/
struct Work {
- const std::vector<Runnable *> *targets;
+ Runnable* const* targets;
+ size_t cnt;
CountDownLatch *latch;
- Work() : targets(0), latch(0) {}
+ Work() : targets(nullptr), cnt(0), latch(0) {}
};
/**
@@ -30,10 +31,10 @@ struct Part {
const Work &work;
size_t offset;
Part(const Work &w, size_t o) : work(w), offset(o) {}
- bool valid() { return (offset < work.targets->size()); }
+ bool valid() { return (offset < work.cnt); }
void perform() {
if (valid()) {
- (*(work.targets))[offset]->run();
+ work.targets[offset]->run();
}
work.latch->countDown();
}
@@ -130,7 +131,8 @@ public:
: SimpleThreadBundle(size, Runnable::default_init_function, strategy) {}
~SimpleThreadBundle();
size_t size() const override;
- void run(const std::vector<Runnable*> &targets) override;
+ using ThreadBundle::run;
+ void run(Runnable* const* targets, size_t cnt) override;
};
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/thread_bundle.cpp
index 9f953abfce7..d47ffb2f3ef 100644
--- a/vespalib/src/vespa/vespalib/util/thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/thread_bundle.cpp
@@ -9,10 +9,10 @@ ThreadBundle &
ThreadBundle::trivial() {
struct TrivialThreadBundle : ThreadBundle {
size_t size() const override { return 1; }
- void run(const std::vector<Runnable*> &targets) override {
- if (targets.size() == 1) {
+ void run(Runnable* const* targets, size_t cnt) override {
+ if (cnt == 1) {
targets[0]->run();
- } else if (targets.size() > 1) {
+ } else if (cnt > 1) {
throw IllegalArgumentException("too many targets");
}
};
diff --git a/vespalib/src/vespa/vespalib/util/thread_bundle.h b/vespalib/src/vespa/vespalib/util/thread_bundle.h
index 699fd8e27a0..830d7c76e7c 100644
--- a/vespalib/src/vespa/vespalib/util/thread_bundle.h
+++ b/vespalib/src/vespa/vespalib/util/thread_bundle.h
@@ -27,7 +27,34 @@ struct ThreadBundle {
* their completion. This function cannot be called with more
* targets than the size of this bundle.
**/
- virtual void run(const std::vector<Runnable*> &targets) = 0;
+ virtual void run(Runnable* const* targets, size_t cnt) = 0;
+
+ // convenience run wrapper
+ void run(const std::vector<Runnable*> &targets) {
+ run(targets.data(), targets.size());
+ }
+
+ // convenience run wrapper
+ void run(const std::vector<Runnable::UP> &targets) {
+ static_assert(sizeof(Runnable::UP) == sizeof(Runnable*));
+ run(reinterpret_cast<Runnable* const*>(targets.data()), targets.size());
+ }
+
+ template <typename T>
+ static constexpr bool is_runnable_ptr() {
+ return (std::is_same_v<T,Runnable*> || std::is_same_v<T,Runnable::UP>);
+ }
+
+ // convenience run wrapper
+ template <typename Item>
+ std::enable_if_t<!is_runnable_ptr<Item>(),void> run(std::vector<Item> &items) {
+ std::vector<Runnable*> targets;
+ targets.reserve(items.size());
+ for (auto &item: items) {
+ targets.push_back(resolve(item));
+ }
+ run(targets);
+ }
/**
* Empty virtual destructor to enable subclassing.
@@ -36,7 +63,11 @@ struct ThreadBundle {
// a thread bundle that can only run things in the current thread.
static ThreadBundle &trivial();
+
+private:
+ Runnable *resolve(Runnable &target) { return &target; }
+ template <typename T>
+ Runnable *resolve(const std::unique_ptr<T> &target) { return target.get(); }
};
} // namespace vespalib
-