From 92649edc2b02a8615f2cdff162deef5c1d9ce337 Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Wed, 14 Sep 2022 14:00:22 +0000 Subject: more convenient ThreadBundle::run --- .../proton/matching/extract_features.cpp | 32 ++++---------- .../searchcore/proton/matching/match_master.cpp | 4 +- .../vespa/searchcore/proton/matching/matcher.cpp | 4 +- .../simple_thread_bundle_test.cpp | 50 ++++++++++++++++++++++ .../vespa/vespalib/util/simple_thread_bundle.cpp | 11 ++--- .../src/vespa/vespalib/util/simple_thread_bundle.h | 12 +++--- vespalib/src/vespa/vespalib/util/thread_bundle.cpp | 6 +-- vespalib/src/vespa/vespalib/util/thread_bundle.h | 35 ++++++++++++++- 8 files changed, 111 insertions(+), 43 deletions(-) diff --git a/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp b/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp index 6115d642b4a..8f7970f5717 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp @@ -114,22 +114,6 @@ struct LaterChunk : MyChunk { } }; -struct MyWork { - size_t num_threads; - std::vector chunks; - MyWork(ThreadBundle &thread_bundle) : num_threads(thread_bundle.size()), chunks() { - chunks.reserve(num_threads); - } - void run(ThreadBundle &thread_bundle) { - std::vector refs; - refs.reserve(chunks.size()); - for (const auto &task: chunks) { - refs.push_back(task.get()); - } - thread_bundle.run(refs); - } -}; - } // unnamed FeatureSet::UP @@ -161,24 +145,26 @@ ExtractFeatures::get_match_features(const MatchToolsFactory &mtf, const OrderedD FeatureResolver resolver(tools->rank_program().get_seeds(false)); result.names = extract_names(resolver, mtf.get_feature_rename_map()); result.values.resize(result.names.size() * docs.size()); - MyWork work(thread_bundle); - size_t per_thread = docs.size() / work.num_threads; - size_t rest_docs = docs.size() % work.num_threads; + size_t num_threads = thread_bundle.size(); + std::vector chunks; + chunks.reserve(num_threads); + size_t per_thread = docs.size() / num_threads; + size_t rest_docs = docs.size() % num_threads; size_t idx = 0; - for (size_t i = 0; i < work.num_threads; ++i) { + for (size_t i = 0; i < num_threads; ++i) { size_t chunk_size = per_thread + (i < rest_docs); if (chunk_size == 0) { break; } if (i == 0) { - work.chunks.push_back(std::make_unique(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), tools->search(), resolver)); + chunks.push_back(std::make_unique(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), tools->search(), resolver)); } else { - work.chunks.push_back(std::make_unique(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), mtf)); + chunks.push_back(std::make_unique(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), mtf)); } idx += chunk_size; } assert(idx == docs.size()); - work.run(thread_bundle); + thread_bundle.run(chunks); return result; } diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp index eefb8411df2..063666e9cad 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp @@ -103,7 +103,6 @@ MatchMaster::match(search::engine::Trace & trace, DocidRangeScheduler::UP scheduler = createScheduler(threadBundle.size(), numSearchPartitions, params.numDocs); std::vector threadState; - std::vector targets; for (size_t i = 0; i < threadBundle.size(); ++i) { IMatchLoopCommunicator &com = (i == 0) ? static_cast(timedCommunicator) @@ -111,10 +110,9 @@ MatchMaster::match(search::engine::Trace & trace, threadState.emplace_back(std::make_unique(i, threadBundle.size(), params, mtf, com, *scheduler, resultProcessor, mergeDirector, distributionKey, trace.getRelativeTime(), trace.getLevel(), trace.getProfileDepth())); - targets.push_back(threadState.back().get()); } resultProcessor.prepareThreadContextCreation(threadBundle.size()); - threadBundle.run(targets); + threadBundle.run(threadState); auto reply = make_reply(mtf, resultProcessor, threadBundle, threadState[0]->extract_result()); double query_time_s = vespalib::to_s(query_latency_time.elapsed()); double rerank_time_s = vespalib::to_s(timedCommunicator.elapsed); diff --git a/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp b/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp index 95fe6fd6c3e..b612f4cd061 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp @@ -75,8 +75,8 @@ public: { } private: size_t size() const override { return _maxThreads; } - void run(const std::vector &targets) override { - _threadBundle.run(targets); + void run(vespalib::Runnable* const* targets, size_t cnt) override { + _threadBundle.run(targets, cnt); } vespalib::ThreadBundle &_threadBundle; const uint32_t _maxThreads; diff --git a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp index debb34724f6..d6e0e473e59 100644 --- a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp +++ b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp @@ -201,4 +201,54 @@ TEST_MT_FF("require that bundle pool works with multiple threads", 32, SimpleThr f1.release(std::move(bundle)); } +struct Filler { + int stuff; + Filler() : stuff(0) {} + virtual ~Filler() {} +}; + +struct Proxy : Filler, Runnable { + Runnable ⌖ + Proxy(Runnable &target_in) : target(target_in) {} + void run() override { target.run(); } +}; + +struct AlmostRunnable : Runnable {}; + +TEST("require that Proxy needs fixup to become Runnable") { + Cnt cnt; + Proxy proxy(cnt); + Runnable &runnable = proxy; + void *proxy_ptr = &proxy; + void *runnable_ptr = &runnable; + EXPECT_TRUE(proxy_ptr != runnable_ptr); +} + +TEST_FF("require that various versions of run can be used to invoke targets", SimpleThreadBundle(5), State(5)) { + EXPECT_TRUE(ThreadBundle::is_runnable_ptr()); + EXPECT_TRUE(ThreadBundle::is_runnable_ptr()); + EXPECT_FALSE(ThreadBundle::is_runnable_ptr>()); + EXPECT_FALSE(ThreadBundle::is_runnable_ptr>()); + std::vector direct; + std::vector> custom; + for (Runnable &target: f2.cnts) { + direct.push_back(std::make_unique(target)); + custom.push_back(std::make_unique(target)); + } + std::vector refs = f2.getTargets(5); + f2.check({0,0,0,0,0}); + f1.run(refs.data(), 3); + f2.check({1,1,1,0,0}); + f1.run(&refs[3], 2); + f2.check({1,1,1,1,1}); + f1.run(refs); + f2.check({2,2,2,2,2}); + f1.run(direct); + f2.check({3,3,3,3,3}); + f1.run(custom); + f2.check({4,4,4,4,4}); + f1.run(f2.cnts); + f2.check({5,5,5,5,5}); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp index 99ab298864f..8a66a4f6898 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp @@ -157,20 +157,21 @@ SimpleThreadBundle::size() const } void -SimpleThreadBundle::run(const std::vector &targets) +SimpleThreadBundle::run(Runnable* const* targets, size_t cnt) { - if (targets.size() > size()) { + if (cnt > size()) { throw IllegalArgumentException("too many targets"); } - if (targets.empty()) { + if (cnt == 0) { return; } - if (targets.size() == 1) { + if (cnt == 1) { targets[0]->run(); return; } CountDownLatch latch(size()); - _work.targets = &targets; + _work.targets = targets; + _work.cnt = cnt; _work.latch = &latch; _hook->run(); latch.await(); diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h index b7434d09ac3..569500635a5 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h @@ -18,9 +18,10 @@ namespace fixed_thread_bundle { * support static wiring of signal paths and execution hooks. **/ struct Work { - const std::vector *targets; + Runnable* const* targets; + size_t cnt; CountDownLatch *latch; - Work() : targets(0), latch(0) {} + Work() : targets(nullptr), cnt(0), latch(0) {} }; /** @@ -30,10 +31,10 @@ struct Part { const Work &work; size_t offset; Part(const Work &w, size_t o) : work(w), offset(o) {} - bool valid() { return (offset < work.targets->size()); } + bool valid() { return (offset < work.cnt); } void perform() { if (valid()) { - (*(work.targets))[offset]->run(); + work.targets[offset]->run(); } work.latch->countDown(); } @@ -130,7 +131,8 @@ public: : SimpleThreadBundle(size, Runnable::default_init_function, strategy) {} ~SimpleThreadBundle(); size_t size() const override; - void run(const std::vector &targets) override; + using ThreadBundle::run; + void run(Runnable* const* targets, size_t cnt) override; }; } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/thread_bundle.cpp index 9f953abfce7..d47ffb2f3ef 100644 --- a/vespalib/src/vespa/vespalib/util/thread_bundle.cpp +++ b/vespalib/src/vespa/vespalib/util/thread_bundle.cpp @@ -9,10 +9,10 @@ ThreadBundle & ThreadBundle::trivial() { struct TrivialThreadBundle : ThreadBundle { size_t size() const override { return 1; } - void run(const std::vector &targets) override { - if (targets.size() == 1) { + void run(Runnable* const* targets, size_t cnt) override { + if (cnt == 1) { targets[0]->run(); - } else if (targets.size() > 1) { + } else if (cnt > 1) { throw IllegalArgumentException("too many targets"); } }; diff --git a/vespalib/src/vespa/vespalib/util/thread_bundle.h b/vespalib/src/vespa/vespalib/util/thread_bundle.h index 699fd8e27a0..830d7c76e7c 100644 --- a/vespalib/src/vespa/vespalib/util/thread_bundle.h +++ b/vespalib/src/vespa/vespalib/util/thread_bundle.h @@ -27,7 +27,34 @@ struct ThreadBundle { * their completion. This function cannot be called with more * targets than the size of this bundle. **/ - virtual void run(const std::vector &targets) = 0; + virtual void run(Runnable* const* targets, size_t cnt) = 0; + + // convenience run wrapper + void run(const std::vector &targets) { + run(targets.data(), targets.size()); + } + + // convenience run wrapper + void run(const std::vector &targets) { + static_assert(sizeof(Runnable::UP) == sizeof(Runnable*)); + run(reinterpret_cast(targets.data()), targets.size()); + } + + template + static constexpr bool is_runnable_ptr() { + return (std::is_same_v || std::is_same_v); + } + + // convenience run wrapper + template + std::enable_if_t(),void> run(std::vector &items) { + std::vector targets; + targets.reserve(items.size()); + for (auto &item: items) { + targets.push_back(resolve(item)); + } + run(targets); + } /** * Empty virtual destructor to enable subclassing. @@ -36,7 +63,11 @@ struct ThreadBundle { // a thread bundle that can only run things in the current thread. static ThreadBundle &trivial(); + +private: + Runnable *resolve(Runnable &target) { return ⌖ } + template + Runnable *resolve(const std::unique_ptr &target) { return target.get(); } }; } // namespace vespalib - -- cgit v1.2.3